Parsely / pykafka

Compare e7665bf ... +40 ... 8b42a20


@@ -62,6 +62,7 @@
Loading
62 62
                 source_host='',
63 63
                 source_port=0,
64 64
                 ssl_config=None,
65 +
                 sasl_authenticator=None,
65 66
                 broker_version="0.9.0",
66 67
                 api_versions=None):
67 68
        """Create a Broker instance.
@@ -92,6 +93,8 @@
Loading
92 93
        :type source_port: int
93 94
        :param ssl_config: Config object for SSL connection
94 95
        :type ssl_config: :class:`pykafka.connection.SslConfig`
96 +
        :param sasl_authenticator: Authenticator to use for authentication using sasl.
97 +
        :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator`
95 98
        :param broker_version: The protocol version of the cluster being connected to.
96 99
            If this parameter doesn't match the actual broker version, some pykafka
97 100
            features may not work properly.
@@ -117,6 +120,7 @@
Loading
117 120
        self._req_handlers = {}
118 121
        self._broker_version = broker_version
119 122
        self._api_versions = api_versions
123 +
        self._sasl_authenticator = sasl_authenticator
120 124
        try:
121 125
            self.connect()
122 126
        except SocketDisconnectedError:
@@ -144,6 +148,7 @@
Loading
144 148
                      source_host='',
145 149
                      source_port=0,
146 150
                      ssl_config=None,
151 +
                      sasl_authenticator=None,
147 152
                      broker_version="0.9.0",
148 153
                      api_versions=None):
149 154
        """Create a Broker using BrokerMetadata
@@ -169,6 +174,8 @@
Loading
169 174
        :type source_port: int
170 175
        :param ssl_config: Config object for SSL connection
171 176
        :type ssl_config: :class:`pykafka.connection.SslConfig`
177 +
        :param sasl_authenticator: Authenticator to use for authentication using sasl.
178 +
        :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator`
172 179
        :param broker_version: The protocol version of the cluster being connected to.
173 180
            If this parameter doesn't match the actual broker version, some pykafka
174 181
            features may not work properly.
@@ -184,6 +191,7 @@
Loading
184 191
                   source_host=source_host,
185 192
                   source_port=source_port,
186 193
                   ssl_config=ssl_config,
194 +
                   sasl_authenticator=sasl_authenticator,
187 195
                   broker_version=broker_version,
188 196
                   api_versions=api_versions)
189 197
@@ -203,6 +211,22 @@
Loading
203 211
            return self._offsets_channel_connection.connected
204 212
        return False
205 213
214 +
    @property
215 +
    def authenticated(self):
216 +
        """Returns True if this object's main connection to the Kafka broker
217 +
            is authenticated
218 +
        """
219 +
        return self._connection.authenticated
220 +
221 +
    @property
222 +
    def offsets_channel_authenticated(self):
223 +
        """Returns True if this object's offsets channel connection to the
224 +
            Kafka broker is authenticated
225 +
        """
226 +
        if self._offsets_channel_connection:
227 +
            return self._offsets_channel_connection.authenticated
228 +
        return False
229 +
206 230
    @property
207 231
    def id(self):
208 232
        """The broker's ID within the Kafka cluster"""
@@ -246,7 +270,8 @@
Loading
246 270
                                            buffer_size=self._buffer_size,
247 271
                                            source_host=self._source_host,
248 272
                                            source_port=self._source_port,
249 -
                                            ssl_config=self._ssl_config)
273 +
                                            ssl_config=self._ssl_config,
274 +
                                            sasl_authenticator=self._sasl_authenticator)
250 275
        self._connection.connect(self._socket_timeout_ms, attempts=attempts)
251 276
        self._req_handler = RequestHandler(self._handler, self._connection)
252 277
        self._req_handler.start()

@@ -91,6 +91,11 @@
Loading
91 91
        self.partition = partition
92 92
93 93
94 +
class AuthenticationException(KafkaException):
95 +
    """Indicates that something went wrong during Authentication."""
96 +
    pass
97 +
98 +
94 99
"""
95 100
Protocol Client Exceptions
96 101
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
@@ -254,6 +259,26 @@
Loading
254 259
    ERROR_CODE = 30
255 260
256 261
262 +
class ClusteAuthorizationFailed(ProtocolClientError):
263 +
    """Cluster authorization failed."""
264 +
    ERROR_CODE = 31
265 +
266 +
267 +
class UnsupportedSaslMechanism(ProtocolClientError, AuthenticationException):
268 +
    """The broker does not support the requested SASL mechanism."""
269 +
    ERROR_CODE = 33
270 +
271 +
272 +
class IllegalSaslState(ProtocolClientError, AuthenticationException):
273 +
    """Request is not valid given the current SASL state."""
274 +
    ERROR_CODE = 34
275 +
276 +
277 +
class SaslAuthenticationFailed(ProtocolClientError, AuthenticationException):
278 +
    """SASL authentication failed."""
279 +
    ERROR_CODE = 58
280 +
281 +
257 282
ERROR_CODES = dict(
258 283
    (exc.ERROR_CODE, exc)
259 284
    for exc in (UnknownError,
@@ -276,7 +301,11 @@
Loading
276 301
                InvalidSessionTimeout,
277 302
                RebalanceInProgress,
278 303
                TopicAuthorizationFailed,
279 -
                GroupAuthorizationFailed)
304 +
                GroupAuthorizationFailed,
305 +
                ClusteAuthorizationFailed,
306 +
                UnsupportedSaslMechanism,
307 +
                IllegalSaslState,
308 +
                SaslAuthenticationFailed)
280 309
)
281 310
282 311

@@ -64,6 +64,7 @@
Loading
64 64
        self._stop_poller_thread = cluster.handler.Event()
65 65
        self._broker_version = cluster._broker_version
66 66
        self._fetch_error_backoff_ms = valid_int(fetch_error_backoff_ms)
67 +
        self._sasl_conf = {} if cluster.sasl_authenticator is None else cluster.sasl_authenticator.get_rd_kafka_opts()
67 68
        # super() must come last for the case where auto_start=True
68 69
        super(RdKafkaSimpleConsumer, self).__init__(**callargs)
69 70
@@ -248,9 +249,10 @@
Loading
248 249
            # queued.max.messages.kbytes so for now we infer the implied
249 250
            # maximum (which, with default settings, is ~2GB per partition):
250 251
            "queued.min.messages": self._queued_max_messages,
251 -
            "queued.max.messages.kbytes": str(
252 -
                self._queued_max_messages
253 -
                * self._fetch_message_max_bytes // 1024),
252 +
            "queued.max.messages.kbytes": min(
253 +
                2097151,
254 +
                self._queued_max_messages * self._fetch_message_max_bytes // 1024
255 +
            ),
254 256
255 257
            "fetch.wait.max.ms": self._fetch_wait_max_ms,
256 258
            "fetch.message.max.bytes": self._fetch_message_max_bytes,
@@ -285,6 +287,10 @@
Loading
285 287
            ##"offset.store.sync.interval.ms"
286 288
            ##"offset.store.method"
287 289
            }
290 +
291 +
        # append configurations necessary for sasl authentication
292 +
        conf.update(self._sasl_conf)
293 +
288 294
        # librdkafka expects all config values as strings:
289 295
        conf = [(key, str(conf[key])) for key in conf]
290 296
        topic_conf = [(key, str(topic_conf[key])) for key in topic_conf]

@@ -0,0 +1,343 @@
Loading
1 +
import base64
2 +
import hashlib
3 +
import hmac
4 +
import logging
5 +
import struct
6 +
from uuid import uuid4
7 +
8 +
import six
9 +
10 +
from .exceptions import AuthenticationException, ERROR_CODES, UnsupportedSaslMechanism
11 +
from .protocol import (
12 +
    SaslHandshakeRequest,
13 +
    SaslHandshakeResponse,
14 +
    ApiVersionsRequest,
15 +
    ApiVersionsResponse,
16 +
    SaslAuthenticateRequest,
17 +
    SaslAuthenticateResponse,
18 +
)
19 +
20 +
log = logging.getLogger(__name__)
21 +
22 +
23 +
if six.PY2:
24 +
    def xor_bytes(left, right):
25 +
        return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right))
26 +
else:
27 +
    def xor_bytes(left, right):
28 +
        return bytes(lb ^ rb for lb, rb in zip(left, right))
29 +
30 +
31 +
class BytesWrapper:
32 +
    """
33 +
    Class that implements :meth:`get_bytes` and wraps some payload so it can be used for
34 +
    :meth:`connection.BrokerConnection.request` during legacy sasl authentication sequence.
35 +
    """
36 +
37 +
    def __init__(self, payload):
38 +
        """
39 +
        Create a new FakeRequest.
40 +
41 +
        :param payload: The payload to wrap
42 +
        :type payload: bytes
43 +
        """
44 +
        self.payload = payload
45 +
46 +
    def get_bytes(self):
47 +
        return struct.pack("!i", len(self.payload)) + self.payload
48 +
49 +
50 +
class BaseAuthenticator:
51 +
    """
52 +
    Base class for authentication mechanisms.
53 +
    Subclasses are supposed to implement:
54 +
     1. :meth:`BaseAuthenticator.get_rd_kafka_opts` which should return a dictionary
55 +
        whose items will be appended to the config given to librdkafka consumers and producers.
56 +
     2. :meth:`BaseAuthenticator.exchange_tokens` which is supposed to use
57 +
        :meth:`BaseAuthenticator.send_and_receive` to send and receive the byte strings necessary to authenticate
58 +
        with the broker.
59 +
    """
60 +
61 +
    MAX_AUTH_VERSION = 1
62 +
    MAX_HANDSHAKE_VERSION = 1
63 +
64 +
    def __init__(self, mechanism, security_protocol=None):
65 +
        """
66 +
        Base class for SASL authentication mechanisms.
67 +
68 +
        :param mechanism: The mechanism this authenticator is supposed to use.
69 +
        :type mechanism: str
70 +
        :param security_protocol: The security protocol determining the broker endpoint this
71 +
                                  authenticator is supposed to authenticate with.
72 +
                                  Only used for rdkafka based consumers and producers.
73 +
        """
74 +
75 +
        self.mechanism = mechanism
76 +
        self.handshake_version = None
77 +
        self.auth_version = None
78 +
        self.security_protocol = security_protocol
79 +
        self._broker_connection = None
80 +
81 +
    def get_rd_kafka_opts(self):
82 +
        """
83 +
        Creates the config entries necessary for librdkafka to successfully authenticate with the broker.
84 +
85 +
        :return: Dictionary to enrich config for librdkafka based consumers and producers.
86 +
        """
87 +
        raise NotImplementedError()
88 +
89 +
    def authenticate(self, broker_connection):
90 +
        """
91 +
        Runs the authentication sequence on the given broker connection.
92 +
93 +
        .. warning::
94 +
           This is not thread safe!
95 +
96 +
        :param broker_connection: The broker connection to authenticate with.
97 +
        :type broker_connection: :class:`pykafka.connection.BrokerConnection`
98 +
        """
99 +
        self._broker_connection = broker_connection
100 +
        if self.handshake_version is None:
101 +
            self._fetch_api_versions()
102 +
        log.debug(
103 +
            "Authenticating to {}:{} using mechanism {}.".format(
104 +
                self._broker_connection.host, self._broker_connection.port, self.mechanism
105 +
            )
106 +
        )
107 +
        self._initialize_authentication()
108 +
        self.exchange_tokens()
109 +
        log.debug("Authentication successful.")
110 +
111 +
    def _initialize_authentication(self):
112 +
        """
113 +
        Initializes the authentication sequence.
114 +
        """
115 +
        self._broker_connection.request(SaslHandshakeRequest.get_versions()[self.handshake_version](self.mechanism))
116 +
        response = SaslHandshakeResponse.get_versions()[self.handshake_version](self._broker_connection.response())
117 +
        if response.error_code != 0:
118 +
            if response.error_code == UnsupportedSaslMechanism.ERROR_CODE:
119 +
                msg = "Broker only supports sasl mechanisms {}, requested was {}"
120 +
                raise UnsupportedSaslMechanism(msg.format(",".join(response.mechanisms), self.mechanism))
121 +
            raise ERROR_CODES[response.error_code]("Authentication Handshake failed")
122 +
123 +
    def exchange_tokens(self):
124 +
        """
125 +
        Runs the authentication sequence. Implementation varies among SASL mechanism and has to be supplied by
126 +
        subclasses. See also :meth:`PlainAuthenticator.exchange_tokens` or :meth:`ScramAuthenticator.exchange_tokens`
127 +
        for exemplary implementations.
128 +
        """
129 +
        raise NotImplementedError()
130 +
131 +
    def send_and_receive(self, token):
132 +
        """
133 +
        Sends the given token to the broker and receives the brokers response.
134 +
        This will automatically use the appropriate mechanism to do so.
135 +
        I.e. use SaslAuthenticateRequest if the server supports it or just send the bytes directly if it doesn't.
136 +
137 +
        :param token: The token to be sent to the broker.
138 +
        :type token: bytes
139 +
        :return: bytes, the servers response
140 +
        """
141 +
        self._send_token(token)
142 +
        return self._receive_token()
143 +
144 +
    def _send_token(self, token):
145 +
        log.debug("Seding auth token")
146 +
        if self.handshake_version == 0:
147 +
            req = BytesWrapper(token)
148 +
        else:
149 +
            req = SaslAuthenticateRequest.get_versions()[self.auth_version](token)
150 +
        self._broker_connection.request(req)
151 +
152 +
    def _receive_token(self):
153 +
        log.debug("Receiving auth token")
154 +
        if self.handshake_version == 0:
155 +
            return self._broker_connection.response_raw()
156 +
157 +
        data = self._broker_connection.response()
158 +
        response = SaslAuthenticateResponse.get_versions()[self.auth_version](data)
159 +
        if response.error_code != 0:
160 +
            raise ERROR_CODES[response.error_code](response.error_message)
161 +
        return response.auth_bytes
162 +
163 +
    def _fetch_api_versions(self):
164 +
        """
165 +
        The api version request can be run without authentication in order to determine which authentication api
166 +
        versions to use. That's what this method does.
167 +
        """
168 +
        log.debug("Fetch SASL authentication api versions.")
169 +
        self._broker_connection.request(ApiVersionsRequest())
170 +
        response = ApiVersionsResponse(self._broker_connection.response())
171 +
172 +
        self.handshake_version = response.api_versions[SaslHandshakeRequest.API_KEY].max
173 +
        self.auth_version = response.api_versions.get(SaslAuthenticateRequest.API_KEY, None)
174 +
175 +
        self.handshake_version = min(self.MAX_HANDSHAKE_VERSION, self.handshake_version)
176 +
        if self.auth_version is not None:
177 +
            self.auth_version = min(self.auth_version.max, self.MAX_AUTH_VERSION)
178 +
        log.debug(
179 +
            "Determinded handshake api version {} and authenticate api version {}".format(
180 +
                self.handshake_version, self.auth_version
181 +
            )
182 +
        )
183 +
184 +
185 +
class ScramAuthenticator(BaseAuthenticator):
186 +
    """
187 +
    Authenticates with Kafka using the salted challenge response authentication mechanism.
188 +
    """
189 +
190 +
    MECHANISMS = {"SCRAM-SHA-256": ("sha256", hashlib.sha256), "SCRAM-SHA-512": ("sha512", hashlib.sha512)}
191 +
192 +
    def __init__(self, mechanism, user, password, security_protocol=None):
193 +
        """
194 +
        Create new ScramAuthenticator
195 +
196 +
        :param mechanism: The mechanism this authenticator is supposed to use.
197 +
        :type mechanism: str, one of 'SCRAM-SHA-256' or 'SCRAM-SHA-512'
198 +
        :param user: The user to authenticate as.
199 +
        :type user: str
200 +
        :param password: The user's password.
201 +
        :type password: str
202 +
        :param security_protocol: The security protocol determining the broker endpoint this
203 +
                                  authenticator is supposed to authenticate with.
204 +
                                  Only used for rdkafka based consumers and producers.
205 +
        """
206 +
        super(ScramAuthenticator, self).__init__(mechanism, security_protocol)
207 +
        self.nonce = None
208 +
        self.auth_message = None
209 +
        self.salted_password = None
210 +
        self.user = user
211 +
        self.password = password.encode()
212 +
        self.hashname, self.hashfunc = self.MECHANISMS[mechanism]
213 +
        self.mechanism = mechanism
214 +
        self.stored_key = None
215 +
        self.client_key = None
216 +
        self.client_signature = None
217 +
        self.client_proof = None
218 +
        self.server_key = None
219 +
        self.server_signature = None
220 +
221 +
    def client_first_message(self):
222 +
        """
223 +
        Create and return the client first message. This will also reset all internal variables.
224 +
        :return: str, the client first message
225 +
        """
226 +
        self.nonce = str(uuid4()).replace("-", "")
227 +
        client_first_bare = "n={},r={}".format(self.user, self.nonce)
228 +
        self.auth_message = client_first_bare
229 +
        return "n,," + client_first_bare
230 +
231 +
    def process_server_first_message(self, server_first_message):
232 +
        """
233 +
        Parse and process server first message, this will extract all necessary information from the server's first
234 +
        response such as iteration count or salt and use it to prepare the client final message.
235 +
236 +
        :param server_first_message: The first message sent by the server
237 +
        :type server_first_message: str
238 +
        """
239 +
        self.auth_message += "," + server_first_message
240 +
        params = dict(pair.split("=", 1) for pair in server_first_message.split(","))
241 +
        server_nonce = params["r"]
242 +
        if not server_nonce.startswith(self.nonce):
243 +
            raise AuthenticationException("Server nonce, did not start with client nonce!")
244 +
        self.nonce = server_nonce
245 +
        self.auth_message += ",c=biws,r=" + self.nonce
246 +
247 +
        salt = base64.b64decode(params["s"].encode())
248 +
        iterations = int(params["i"])
249 +
        self._create_salted_password(salt, iterations)
250 +
251 +
        self.client_key = self._hmac(self.salted_password, b"Client Key")
252 +
        self.stored_key = self.hashfunc(self.client_key).digest()
253 +
        self.client_signature = self._hmac(self.stored_key, self.auth_message.encode())
254 +
        self.client_proof = xor_bytes(self.client_key, self.client_signature)
255 +
        self.server_key = self._hmac(self.salted_password, b"Server Key")
256 +
        self.server_signature = self._hmac(self.server_key, self.auth_message.encode())
257 +
258 +
    def _hmac(self, key, msg):
259 +
        """
260 +
        Run the hmac algorithm on `key` and `msg` using the appropriate digest method for the configures scram
261 +
        mechanism.
262 +
        :param key: The key for the hmac algorithm
263 +
        :type key: bytes
264 +
        :param msg: The message for the hmac algorithm
265 +
        :type msg: bytes
266 +
        :return: bytes, the result of applying hmac on `key` and `msg`
267 +
        """
268 +
        return hmac.new(key, msg, digestmod=self.hashfunc).digest()
269 +
270 +
    def _create_salted_password(self, salt, iterations):
271 +
        self.salted_password = hashlib.pbkdf2_hmac(self.hashname, self.password, salt, iterations)
272 +
273 +
    def client_final_message(self):
274 +
        """
275 +
        Create and return the client final message.
276 +
        :return: str, the client final message
277 +
        """
278 +
        return "c=biws,r={},p={}".format(self.nonce, base64.b64encode(self.client_proof).decode())
279 +
280 +
    def process_server_final_message(self, server_final_message):
281 +
        """
282 +
        Parse and process server final message. This will run validation on the server's response to make sure that
283 +
        everything is all right.
284 +
285 +
        :param server_final_message: The first message sent by the server
286 +
        :type server_final_message: str
287 +
        """
288 +
        params = dict(pair.split("=", 1) for pair in server_final_message.split(","))
289 +
        if self.server_signature != base64.b64decode(params["v"].encode()):
290 +
            raise AuthenticationException("Server sent wrong signature!")
291 +
292 +
    def get_rd_kafka_opts(self):
293 +
        return {
294 +
            "sasl.mechanisms": self.mechanism,
295 +
            "sasl.username": self.user,
296 +
            "sasl.password": self.password.decode(),
297 +
            "security.protocol": self.security_protocol,
298 +
        }
299 +
300 +
    def exchange_tokens(self):
301 +
        client_first = self.client_first_message()
302 +
        server_first = self.send_and_receive(client_first.encode()).decode()
303 +
        self.process_server_first_message(server_first)
304 +
305 +
        client_final = self.client_final_message()
306 +
        server_final = self.send_and_receive(client_final.encode()).decode()
307 +
        self.process_server_final_message(server_final)
308 +
309 +
310 +
class PlainAuthenticator(BaseAuthenticator):
311 +
    """
312 +
    Authenticates with kafka using the Plain mechanism. I.e. sending user and password in plaintext.
313 +
    """
314 +
315 +
    def __init__(self, user, password, security_protocol=None):
316 +
        """
317 +
        Create new PlainAuthenticator.
318 +
319 +
        :param user: The user to authenticate as.
320 +
        :type user: str
321 +
        :param password: The user's password.
322 +
        :type password: str
323 +
        :param security_protocol: The security protocol determining the broker endpoint this
324 +
                                  authenticator is supposed to authenticate with.
325 +
                                  Only used for rdkafka based consumers and producers.
326 +
        """
327 +
        super(PlainAuthenticator, self).__init__("PLAIN", security_protocol)
328 +
        self.user = user
329 +
        self.password = password
330 +
331 +
    def get_rd_kafka_opts(self):
332 +
        return {
333 +
            "sasl.mechanisms": self.mechanism,
334 +
            "sasl.username": self.user,
335 +
            "sasl.password": self.password,
336 +
            "security.protocol": self.security_protocol,
337 +
        }
338 +
339 +
    def exchange_tokens(self):
340 +
        token = "\0".join([self.user, self.user, self.password]).encode()
341 +
        response = self.send_and_receive(token)
342 +
        if response != b"":
343 +
            raise AuthenticationException("Server sent unexpected response!")

@@ -117,7 +117,8 @@
Loading
117 117
                 buffer_size=1024 * 1024,
118 118
                 source_host='',
119 119
                 source_port=0,
120 -
                 ssl_config=None):
120 +
                 ssl_config=None,
121 +
                 sasl_authenticator=None):
121 122
        """Initialize a socket connection to Kafka.
122 123
123 124
        :param host: The host to which to connect
@@ -139,6 +140,8 @@
Loading
139 140
        :type source_port: int
140 141
        :param ssl_config: Config object for SSL connection
141 142
        :type ssl_config: :class:`pykafka.connection.SslConfig`
143 +
        :param sasl_authenticator: Authenticator to use for authentication using sasl.
144 +
        :type sasl_authenticator: :class:`pykafka.sasl_authenticators.BaseAuthenticator`
142 145
        """
143 146
        self._buff = bytearray(buffer_size)
144 147
        self.host = host
@@ -149,6 +152,8 @@
Loading
149 152
        self.source_port = source_port
150 153
        self._wrap_socket = (
151 154
            ssl_config.wrap_socket if ssl_config else lambda x: x)
155 +
        self._sasl_authenticator = sasl_authenticator
156 +
        self.authenticated = sasl_authenticator is None
152 157
153 158
    def __del__(self):
154 159
        """Close this connection when the object is deleted."""
@@ -161,6 +166,7 @@
Loading
161 166
162 167
    def connect(self, timeout, attempts=1):
163 168
        """Connect to the broker, retrying if specified."""
169 +
        self.authenticated = False
164 170
        log.debug("Connecting to %s:%s", self.host, self.port)
165 171
        for attempt in range(0, attempts):
166 172
            try:
@@ -172,6 +178,9 @@
Loading
172 178
                    )
173 179
                )
174 180
                log.debug("Successfully connected to %s:%s", self.host, self.port)
181 +
                if self._sasl_authenticator is not None:
182 +
                    self._sasl_authenticator.authenticate(self)
183 +
                    self.authenticated = True
175 184
                return
176 185
            except (self._handler.SockErr, self._handler.GaiError) as err:
177 186
                log.info("Attempt %s: failed to connect to %s:%s", attempt, self.host, self.port)
@@ -193,6 +202,7 @@
Loading
193 202
            pass
194 203
        finally:
195 204
            self._socket = None
205 +
            self.authenticated = False
196 206
197 207
    def reconnect(self):
198 208
        """Disconnect from the broker, then reconnect"""
@@ -203,6 +213,7 @@
Loading
203 213
        """Send a request over the socket connection"""
204 214
        bytes_ = request.get_bytes()
205 215
        if not self._socket:
216 +
            self.authenticated = False
206 217
            raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
207 218
        try:
208 219
            self._socket.sendall(bytes_)
@@ -211,7 +222,7 @@
Loading
211 222
            self.disconnect()
212 223
            raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
213 224
214 -
    def response(self):
225 +
    def response_raw(self):
215 226
        """Wait for a response from the broker"""
216 227
        size = bytes()
217 228
        expected_len = 4  # Size => int32
@@ -231,5 +242,9 @@
Loading
231 242
        except SocketDisconnectedError:
232 243
            self.disconnect()
233 244
            raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
245 +
        return self._buff[:size]
246 +
247 +
    def response(self):
234 248
        # Drop CorrelationId => int32
235 -
        return buffer(self._buff[4:4 + size])
249 +
        return buffer(self.response_raw()[4:])
250 +

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Click to load this diff.
Loading diff...

Learn more Showing 18 files with coverage changes found.

Changes in pykafka/utils/compat.py
-37
-3
+40
Loading file...
Changes in pykafka/utils/socket.py
-1
+1
Loading file...
Changes in pykafka/topic.py
-5
+1
+4
Loading file...
Changes in pykafka/utils/compression.py
-6
+4
+2
Loading file...
Changes in pykafka/rdkafka/producer.py
-2
+1
+1
Loading file...
Changes in pykafka/protocol/fetch.py
-2
+1
+1
Loading file...
Changes in pykafka/rdkafka/simple_consumer.py
-2
+1
+1
Loading file...
Changes in pykafka/broker.py
-3
+3
Loading file...
Changes in pykafka/balancedconsumer.py
-5
+1
+4
Loading file...
Changes in pykafka/managedbalancedconsumer.py
-1
+1
Loading file...
Changes in pykafka/handlers.py
-1
+1
Loading file...
Changes in pykafka/protocol/message.py
New
Loading file...
New file pykafka/rdkafka/_rd_kafkamodule.c
New
Loading file...
New file pykafka/protocol/sasl.py
New
Loading file...
New file pykafka/sasl_authenticators.py
New
Loading file...
Changes in pykafka/simpleconsumer.py
-1
-2
+3
Loading file...
Changes in pykafka/cluster.py
-5
+2
+3
Loading file...
Changes in pykafka/connection.py
-2
+2
Loading file...

42 Commits

Hiding 40 contexual commits
+3 Files
+698
+454
+104
+140
Files Coverage
pykafka -2.87% 80.66%
Project Totals (39 files) 80.66%
Loading