# path=/home/travis/build/Parsely/pykafka/pykafka#rdkafka#_rd_kafkamodule.c.gcov
-: 0:Source:pykafka/rdkafka/_rd_kafkamodule.c -: 0:Graph:./build/temp.linux-x86_64-2.7/pykafka/rdkafka/_rd_kafkamodule.gcno -: 0:Data:./build/temp.linux-x86_64-2.7/pykafka/rdkafka/_rd_kafkamodule.gcda -: 0:Runs:1 -: 0:Programs:1 -: 1:#define PY_SSIZE_T_CLEAN -: 2: -: 3:#include <Python.h> -: 4:#include <stddef.h> -: 5:#include <structmember.h> -: 6: -: 7:#include <errno.h> -: 8:#include <pthread.h> -: 9:#include <syslog.h> -: 10: -: 11:#include <librdkafka/rdkafka.h> -: 12: -:
13: -: 14:static PyObject *pykafka_exceptions; /* ~ import pykafka.exceptions */ -: 15:static PyObject *Message; /* ~ from pykafka.protocol import Message */ -: 16: -: 17:static PyObject *logger; /* ~ logging.getLogger */ -: 18: -: 19: -: 20:/** -: 21: * Logging -: 22: */ -: 23: -: 24: -: 25:/* If used as librdkafka log_cb, passes log messages into python logging */ -: 26:static void function logging_callback called 3 returned 100% blocks executed 67% 3: 27:logging_callback(const rd_kafka_t *rk, -: 28: int level, -: 29: const char *fac, -: 30: const char *buf) -: 31:{ -: 32: /* Map syslog levels to python logging levels */ 3: 33: char *lvl = NULL; 3: 34: if (level == LOG_DEBUG) lvl = "debug"; branch 0 taken 3 (fallthrough) branch 1 taken 0 3: 35: else if (level == LOG_INFO || level == LOG_NOTICE) lvl = "info"; branch 0 taken 0 (fallthrough) branch 1 taken 3 #####: 36: else if (level == LOG_WARNING) lvl = "warning"; branch 0 never executed branch 1 never executed #####: 37: else if (level == LOG_ERR) lvl = "error"; branch 0 never executed branch 1 never executed #####: 38: else lvl = "critical"; -: 39: -: 40: /* Grab the GIL, as rdkafka callbacks may come from non-python threads */ 3: 41: PyGILState_STATE gstate = PyGILState_Ensure(); call 0 returned 3 -: 42: -: 43: /* NB librdkafka docs say that rk may be NULL, so check that */ -: 44: /* NB2 because we hold the GIL we don't need the handle's rwlock */ 3: 45: const char *rk_name = rk ? rd_kafka_name(rk) : "rk_handle null"; branch 0 taken 3 (fallthrough) branch 1 taken 0 call 2 returned 3 3: 46: const char *format = "%s [%s] %s"; /* format rk_name + fac + buf */ -: 47: 3: 48: PyObject *res = PyObject_CallMethod( call 0 returned 3 -: 49: logger, lvl, "ssss", format, rk_name, fac, buf); -: 50: /* Any errors here we'll just have to swallow: we're probably on some -: 51: background thread, and we can't log either (logging just failed!) */ 3: 52: if (! res) PyErr_Clear(); branch 0 taken 0 (fallthrough) branch 1 taken 3 call 2 never executed 3: 53: else Py_DECREF(res); branch 0 taken 0 (fallthrough) branch 1 taken 3 call 2 never executed -: 54: 3: 55: PyGILState_Release(gstate); call 0 returned 3 3: 56:} -: 57: -: 58: -: 59:/** -: 60: * Exception helpers -: 61: */ -: 62: -: 63: -: 64:/* Raise an exception from pykafka.exceptions (always returns NULL, to allow -: 65: * shorthand `return set_pykafka_error("Exception", "error message")`) */ -: 66:static PyObject * function set_pykafka_error called 15 returned 100% blocks executed 83% 15: 67:set_pykafka_error(const char *err_name, const char *err_msg) -: 68:{ 15: 69: PyObject *error = PyObject_GetAttrString(pykafka_exceptions, err_name); call 0 returned 15 15: 70: if (! error) return NULL; branch 0 taken 15 (fallthrough) branch 1 taken 0 15: 71: PyErr_SetString(error, err_msg); call 0 returned 15 15: 72: Py_DECREF(error); branch 0 taken 0 (fallthrough) branch 1 taken 15 call 2 never executed -: 73: return NULL; -: 74:} -: 75: -: 76: -: 77:/* Given an error code, return most suitable class from pykafka.exceptions */ -: 78:static PyObject * function find_pykafka_error called 1 returned 100% blocks executed 80% 1: 79:find_pykafka_error(rd_kafka_resp_err_t err) -: 80:{ 1: 81: PyObject *error_codes = NULL; 1: 82: PyObject *errcode = NULL; 1: 83: PyObject *Exc = NULL; -: 84: -: 85: /* See if there's a standard Kafka error for this */ 1: 86: error_codes = PyObject_GetAttrString(pykafka_exceptions, "ERROR_CODES"); call 0 returned 1 1: 87: if (! 
error_codes) goto cleanup; branch 0 taken 1 (fallthrough) branch 1 taken 0 1: 88: errcode = PyLong_FromLong(err); call 0 returned 1 1: 89: if (! errcode) goto cleanup; branch 0 taken 1 (fallthrough) branch 1 taken 0 1: 90: Exc = PyObject_GetItem(error_codes, errcode); call 0 returned 1 -: 91: 1: 92: if (! Exc) { /* raise a generic exception instead */ branch 0 taken 0 (fallthrough) branch 1 taken 1 #####: 93: PyErr_Clear(); call 0 never executed #####: 94: Exc = PyObject_GetAttrString(pykafka_exceptions, "RdKafkaException"); call 0 never executed -: 95: } -: 96:cleanup: 1: 97: Py_XDECREF(error_codes); branch 0 taken 1 (fallthrough) branch 1 taken 0 branch 2 taken 0 (fallthrough) branch 3 taken 1 call 4 never executed 1: 98: Py_XDECREF(errcode); branch 0 taken 1 (fallthrough) branch 1 taken 0 branch 2 taken 1 (fallthrough) branch 3 taken 0 call 4 returned 1 1: 99: return Exc; -: 100:} -: 101: -: 102: -: 103:/* Given an error code, set a suitable exception; or, if return_error is not -: 104: * NULL, pass the exception instance back through return_error instead */ -: 105:static void function set_pykafka_error_from_code called 1 returned 100% blocks executed 81% 1: 106:set_pykafka_error_from_code(rd_kafka_resp_err_t err, PyObject **return_error) -: 107:{ 1: 108: PyObject *error = NULL; 1: 109: PyObject *err_args = NULL; -: 110: 1: 111: error = find_pykafka_error(err); call 0 returned 1 1: 112: if (! error) goto cleanup; branch 0 taken 1 (fallthrough) branch 1 taken 0 1: 113: err_args = Py_BuildValue("ls", (long)err, rd_kafka_err2str(err)); call 0 returned 1 call 1 returned 1 1: 114: if (! err_args) goto cleanup; branch 0 taken 1 (fallthrough) branch 1 taken 0 -: 115: 1: 116: if (! return_error) PyErr_SetObject(error, err_args); branch 0 taken 0 (fallthrough) branch 1 taken 1 call 2 never executed 1: 117: else (*return_error) = PyObject_CallObject(error, err_args); call 0 returned 1 -: 118:cleanup: 1: 119: Py_XDECREF(error); branch 0 taken 1 (fallthrough) branch 1 taken 0 branch 2 taken 0 (fallthrough) branch 3 taken 1 call 4 never executed 1: 120: Py_XDECREF(err_args); branch 0 taken 1 (fallthrough) branch 1 taken 0 branch 2 taken 0 (fallthrough) branch 3 taken 1 call 4 never executed 1: 121:} -: 122: -: 123: -: 124:/** -: 125: * Shared bits of Producer and Consumer types -: 126: */ -: 127: -: 128: -: 129:/* Note that with this RdkHandle, we hold a separate rd_kafka_t handle for each -: 130: * rd_kafka_topic_t, whereas librdkafka would allow sharing the same rd_kafka_t -: 131: * handle between many topic handles, which would be far more efficient. The -: 132: * problem with that is that it would require the same rd_kafka_conf_t settings -: 133: * across all class instances sharing a handle, which is somewhat incompatible -: 134: * with the current pykafka API. -: 135: * -: 136: * We need a pthread rwlock here, because in many methods we release the GIL -: 137: * (this for various reasons - one key reason is that it prevents us -: 138: * deadlocking if we do blocking calls into librdkafka and then get callbacks -: 139: * out of librdkafka, and the callbacks would try to grab the GIL). Once we -: 140: * release the GIL however, there may be other threads calling RdkHandle_stop -: 141: * (which in pykafka can happen on any thread). The rule here then is that -: 142: * RdkHandle_stop needs to take out an exclusive lock (wrlock), whereas most -: 143: * other calls are safe when taking out a shared lock (rdlock). 
*/ -: 144:typedef struct { -: 145: PyObject_HEAD -: 146: pthread_rwlock_t rwlock; -: 147: rd_kafka_t *rdk_handle; -: 148: rd_kafka_conf_t *rdk_conf; -: 149: rd_kafka_topic_t *rdk_topic_handle; -: 150: rd_kafka_topic_conf_t *rdk_topic_conf; -: 151: -: 152: /* Consumer-specific fields */ -: 153: rd_kafka_queue_t *rdk_queue_handle; -: 154: PyObject *partition_ids; -: 155:} RdkHandle; -: 156: -: 157: -: 158:/* Only for inspection; if you'd manipulate these from outside the module, we -: 159: * have no error checking in place to protect from ensuing mayhem */ -: 160:static PyMemberDef RdkHandle_members[] = { -: 161: {"_partition_ids", T_OBJECT_EX, offsetof(RdkHandle, partition_ids), -: 162: READONLY, "Partitions fetched from by this consumer"}, -: 163: {NULL} -: 164:}; -: 165: -: 166: -: 167:static PyObject * function RdkHandle_new called 72 returned 100% blocks executed 63% 72: 168:RdkHandle_new(PyTypeObject *type, PyObject *args, PyObject *kwds) -: 169:{ 72: 170: PyObject *self = PyType_GenericNew(type, args, kwds); call 0 returned 72 72: 171: if (self) { branch 0 taken 72 (fallthrough) branch 1 taken 0 72: 172: int res = pthread_rwlock_init(&((RdkHandle *)self)->rwlock, NULL); call 0 returned 72 72: 173: if (res) { branch 0 taken 0 (fallthrough) branch 1 taken 72 #####: 174: Py_DECREF(self); branch 0 never executed branch 1 never executed call 2 never executed #####: 175: return set_pykafka_error("RdKafkaException", "Failed rwlock init"); call 0 never executed -: 176: } -: 177: } -: 178: return self; -: 179:} -: 180: -: 181: -: 182:/* Release RdkHandle.rwlock (see RdkHandle docstring) */ -: 183:static int function RdkHandle_unlock called 7350 returned 100% blocks executed 60% 7350: 184:RdkHandle_unlock(RdkHandle *self) -: 185:{ 7350: 186: if (pthread_rwlock_unlock(&self->rwlock)) { call 0 returned 7350 branch 1 taken 0 (fallthrough) branch 2 taken 7350 #####: 187: set_pykafka_error("RdKafkaException", "Failed to release rwlock"); call 0 never executed #####: 188: return -1; -: 189: } -: 190: return 0; -: 191:} -: 192: -: 193: -: 194:/* Get shared lock and optionally check handle is running. Returns non-zero -: 195: * if error has been set. -: 196: * -: 197: * Should be used by any method that accesses an RdkHandle, to prevent the -: 198: * handle being concurrently destroyed on another thread (by calling -: 199: * RdkHandle_stop). See also RdkHandle docstring. */ -: 200:static int function RdkHandle_safe_lock called 7046 returned 100% blocks executed 100% 7046: 201:RdkHandle_safe_lock(RdkHandle *self, int check_running) -: 202:{ -: 203: int res; 7046: 204: Py_BEGIN_ALLOW_THREADS call 0 returned 7046 7046: 205: res = pthread_rwlock_rdlock(&self->rwlock); call 0 returned 7046 7046: 206: Py_END_ALLOW_THREADS call 0 returned 7046 7046: 207: if (res) { branch 0 taken 1 (fallthrough) branch 1 taken 7045 1: 208: set_pykafka_error("RdKafkaException", "Failed to get shared lock"); call 0 returned 1 1: 209: return -1; -: 210: } 7045: 211: if (check_running && !self->rdk_handle) { branch 0 taken 6827 (fallthrough) branch 1 taken 218 branch 2 taken 10 (fallthrough) branch 3 taken 6817 10: 212: set_pykafka_error("RdKafkaStoppedException", ""); call 0 returned 10 10: 213: RdkHandle_unlock(self); call 0 returned 10 10: 214: return -1; -: 215: } -: 216: return 0; -: 217:} -: 218: -: 219: -: 220:/* Get exclusive lock on handle. Returns non-zero if error has been set. 
-: 221: * -: 222: * Should be used by any method that might render accessing RdkHandle members -: 223: * unsafe (in particular, RdkHandle_stop). See also RdkHandle docstring. */ -: 224:static int function RdkHandle_excl_lock called 315 returned 100% blocks executed 71% 315: 225:RdkHandle_excl_lock(RdkHandle *self) -: 226:{ -: 227: int res; 315: 228: Py_BEGIN_ALLOW_THREADS call 0 returned 315 315: 229: res = pthread_rwlock_wrlock(&self->rwlock); call 0 returned 315 315: 230: Py_END_ALLOW_THREADS call 0 returned 315 315: 231: if (res) { branch 0 taken 0 (fallthrough) branch 1 taken 315 #####: 232: set_pykafka_error("RdKafkaException", "Failed to get exclusive lock"); call 0 never executed #####: 233: return -1; -: 234: } -: 235: return 0; -: 236:} -: 237: -: 238: -: 239:PyDoc_STRVAR(RdkHandle_outq_len__doc__, -: 240: "outq_len(self) -> int\n" -: 241: "\n" -: 242: "Number of messages pending shipping to a broker."); -: 243:static PyObject * function RdkHandle_outq_len called 26 returned 100% blocks executed 100% 26: 244:RdkHandle_outq_len(RdkHandle *self) -: 245:{ 26: 246: if (RdkHandle_safe_lock(self, /* check_running= */ 1)) return NULL; call 0 returned 26 branch 1 taken 26 (fallthrough) branch 2 taken 0 -: 247: 26: 248: int outq_len = -1; 26: 249: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 26 26: 250: outq_len = rd_kafka_outq_len(self->rdk_handle); call 0 returned 26 26: 251: Py_END_ALLOW_THREADS call 0 returned 26 -: 252: 26: 253: if (RdkHandle_unlock(self)) return NULL; call 0 returned 26 branch 1 taken 26 (fallthrough) branch 2 taken 0 26: 254: return Py_BuildValue("i", outq_len); call 0 returned 26 -: 255:} -: 256: -: 257: -: 258:PyDoc_STRVAR(RdkHandle_poll__doc__, -: 259: "poll(self, timeout_ms) -> int\n" -: 260: "\n" -: 261: "Poll the handle for events (in particular delivery callbacks) and\n" -: 262: "return the total number of callbacks triggered."); -: 263:static PyObject * function RdkHandle_poll called 1460 returned 99% blocks executed 100% 1460: 264:RdkHandle_poll(RdkHandle *self, PyObject *args, PyObject *kwds) -: 265:{ 1460: 266: char *keywords[] = {"timeout_ms", NULL}; 1460: 267: int timeout_ms = 0; 1460: 268: if (! PyArg_ParseTupleAndKeywords(args, kwds, "i", keywords, &timeout_ms)) { call 0 returned 1460 branch 1 taken 1460 (fallthrough) branch 2 taken 0 -: 269: return NULL; -: 270: } -: 271: 1460: 272: if (RdkHandle_safe_lock(self, /* check_running= */ 1)) return NULL; call 0 returned 1460 branch 1 taken 1451 (fallthrough) branch 2 taken 9 -: 273: 1451: 274: int n_events = 0; 1451: 275: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 1451 1451: 276: n_events = rd_kafka_poll(self->rdk_handle, timeout_ms); call 0 returned 1444 1444: 277: Py_END_ALLOW_THREADS call 0 returned 1442 -: 278: 1442: 279: if (RdkHandle_unlock(self)) return NULL; call 0 returned 1442 branch 1 taken 1442 (fallthrough) branch 2 taken 0 1442: 280: return Py_BuildValue("i", n_events); call 0 returned 1442 -: 281:} -: 282: -: 283: -: 284:PyDoc_STRVAR(RdkHandle_stop__doc__, -: 285: "stop(self)\n" -: 286: "\n" -: 287: "Shut down the librdkafka handle and clean up. This may block until\n" -: 288: "other methods accessing the same handle concurrently have finished.\n"); -: 289:static PyObject * function RdkHandle_stop called 128 returned 100% blocks executed 83% 128: 290:RdkHandle_stop(RdkHandle *self) -: 291:{ -: 292: /* We'll only ever get a locking error if we programmed ourselves into a -: 293: * deadlock. 
We'd have to admit defeat, abort, and leak this RdkHandle */ 128: 294: if (RdkHandle_excl_lock(self)) return NULL; call 0 returned 128 branch 1 taken 128 (fallthrough) branch 2 taken 0 -: 295: 128: 296: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 128 128: 297: if (self->rdk_queue_handle) { branch 0 taken 38 (fallthrough) branch 1 taken 90 38: 298: rd_kafka_queue_destroy(self->rdk_queue_handle); call 0 returned 38 38: 299: self->rdk_queue_handle = NULL; -: 300: } 128: 301: if (self->rdk_topic_handle) { branch 0 taken 61 (fallthrough) branch 1 taken 67 61: 302: rd_kafka_topic_destroy(self->rdk_topic_handle); call 0 returned 61 61: 303: self->rdk_topic_handle = NULL; -: 304: } 128: 305: if (self->rdk_handle) { branch 0 taken 61 (fallthrough) branch 1 taken 67 61: 306: PyObject *opaque = (PyObject *)rd_kafka_opaque(self->rdk_handle); call 0 returned 61 61: 307: Py_XDECREF(opaque); branch 0 taken 23 (fallthrough) branch 1 taken 38 branch 2 taken 0 (fallthrough) branch 3 taken 23 call 4 never executed 61: 308: rd_kafka_destroy(self->rdk_handle); call 0 returned 61 61: 309: self->rdk_handle = NULL; -: 310: } 128: 311: if (self->rdk_conf) { branch 0 taken 0 (fallthrough) branch 1 taken 128 #####: 312: rd_kafka_conf_destroy(self->rdk_conf); call 0 never executed #####: 313: self->rdk_conf = NULL; -: 314: } 128: 315: if (self->rdk_topic_conf) { branch 0 taken 0 (fallthrough) branch 1 taken 128 #####: 316: rd_kafka_topic_conf_destroy(self->rdk_topic_conf); call 0 never executed #####: 317: self->rdk_topic_conf = NULL; -: 318: } 128: 319: Py_END_ALLOW_THREADS call 0 returned 128 -: 320: 128: 321: Py_CLEAR(self->partition_ids); branch 0 taken 38 (fallthrough) branch 1 taken 90 branch 2 taken 38 (fallthrough) branch 3 taken 0 call 4 returned 38 -: 322: 128: 323: if (RdkHandle_unlock(self)) return NULL; call 0 returned 128 branch 1 taken 128 (fallthrough) branch 2 taken 0 128: 324: Py_INCREF(Py_None); 128: 325: return Py_None; -: 326:} -: 327: -: 328: -: 329:static void function RdkHandle_dealloc called 63 returned 100% blocks executed 75% 63: 330:RdkHandle_dealloc(PyObject *self, PyObject *(*stop_func) (RdkHandle *)) -: 331:{ 63: 332: PyObject *stop_result = stop_func((RdkHandle *)self); call 0 returned 63 63: 333: if (!stop_result) { branch 0 taken 1 (fallthrough) branch 1 taken 62 -: 334: /* We'll swallow the exception, so let's try to log info first */ 1: 335: PyObject *res = PyObject_CallMethod( call 0 returned 1 -: 336: logger, "exception", "s", "In dealloc: stop() failed."); 1: 337: PyErr_Clear(); call 0 returned 1 1: 338: Py_XDECREF(res); branch 0 taken 0 (fallthrough) branch 1 taken 1 branch 2 never executed branch 3 never executed call 4 never executed -: 339: } else { 62: 340: Py_DECREF(stop_result); branch 0 taken 0 (fallthrough) branch 1 taken 62 call 2 never executed -: 341: } 63: 342: pthread_rwlock_destroy(&((RdkHandle *)self)->rwlock); call 0 returned 63 63: 343: self->ob_type->tp_free(self); call 0 returned 63 63: 344:} -: 345: -: 346: -: 347:PyDoc_STRVAR(RdkHandle_configure__doc__, -: 348: "configure(self, conf) OR configure(self, topic_conf)\n" -: 349: "\n" -: 350: "Set up and populate the rd_kafka_(topic_)conf_t. 
Somewhat inelegantly\n" -: 351: "(for the benefit of code reuse, whilst avoiding some harrowing partial\n" -: 352: "binding for C functions) this requires that you call it twice, once\n" -: 353: "with a `conf` list only, and again with `topic_conf` only.\n" -: 354: "\n" -: 355: "Repeated calls work incrementally; you can wipe configuration completely\n" -: 356: "by calling Consumer_stop()\n"); -: 357:static PyObject * function RdkHandle_configure called 140 returned 100% blocks executed 88% 140: 358:RdkHandle_configure(RdkHandle *self, PyObject *args, PyObject *kwds) -: 359:{ 140: 360: char *keywords[] = {"conf", "topic_conf", NULL}; 140: 361: PyObject *conf = NULL; 140: 362: PyObject *topic_conf = NULL; 140: 363: if (! PyArg_ParseTupleAndKeywords(args, call 0 returned 140 branch 1 taken 140 (fallthrough) branch 2 taken 0 -: 364: kwds, -: 365: "|OO", -: 366: keywords, -: 367: &conf, -: 368: &topic_conf)) { -: 369: return NULL; -: 370: } -: 371: 140: 372: if (RdkHandle_safe_lock(self, /* check_running= */ 0)) return NULL; call 0 returned 140 branch 1 taken 140 (fallthrough) branch 2 taken 0 140: 373: if ((conf && topic_conf) || (!conf && !topic_conf)) { branch 0 taken 70 (fallthrough) branch 1 taken 70 branch 2 taken 70 (fallthrough) branch 3 taken 0 branch 4 taken 70 (fallthrough) branch 5 taken 70 branch 6 taken 0 (fallthrough) branch 7 taken 70 #####: 374: return set_pykafka_error( call 0 never executed -: 375: "RdKafkaException", -: 376: "You need to specify *either* `conf` *or* `topic_conf`."); -: 377: } 140: 378: if (self->rdk_handle) { branch 0 taken 0 (fallthrough) branch 1 taken 140 #####: 379: return set_pykafka_error( call 0 never executed -: 380: "RdKafkaException", -: 381: "Cannot configure: seems instance was started already?"); -: 382: } -: 383: 140: 384: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 140 140: 385: if (! self->rdk_conf) { branch 0 taken 70 (fallthrough) branch 1 taken 70 70: 386: self->rdk_conf = rd_kafka_conf_new(); call 0 returned 70 70: 387: rd_kafka_conf_set_log_cb(self->rdk_conf, logging_callback); call 0 returned 70 -: 388: } 140: 389: if (! self->rdk_topic_conf) { branch 0 taken 70 (fallthrough) branch 1 taken 70 70: 390: self->rdk_topic_conf = rd_kafka_topic_conf_new(); call 0 returned 70 -: 391: } 140: 392: Py_END_ALLOW_THREADS call 0 returned 140 -: 393: 140: 394: PyObject *retval = Py_None; 140: 395: PyObject *conf_or_topic_conf = topic_conf ? topic_conf : conf; branch 0 taken 70 (fallthrough) branch 1 taken 70 140: 396: Py_ssize_t i, len = PyList_Size(conf_or_topic_conf); 1246: 397: for (i = 0; i != len; ++i) { call 0 returned 140 branch 1 taken 966 branch 2 taken 140 (fallthrough) 966: 398: PyObject *conf_pair = PyList_GetItem(conf_or_topic_conf, i); call 0 returned 966 966: 399: const char *name = NULL; 966: 400: const char *value = NULL; 966: 401: if (! 
PyArg_ParseTuple(conf_pair, "ss", &name, &value)) { call 0 returned 966 branch 1 taken 966 (fallthrough) branch 2 taken 0 -: 402: retval = NULL; #####: 403: break; -: 404: } -: 405: char errstr[512]; -: 406: rd_kafka_conf_res_t res; 966: 407: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 966 966: 408: if (topic_conf) { branch 0 taken 186 (fallthrough) branch 1 taken 780 186: 409: res = rd_kafka_topic_conf_set( call 0 returned 186 -: 410: self->rdk_topic_conf, name, value, errstr, sizeof(errstr)); -: 411: } else { 780: 412: res = rd_kafka_conf_set( call 0 returned 780 -: 413: self->rdk_conf, name, value, errstr, sizeof(errstr)); -: 414: } 966: 415: Py_END_ALLOW_THREADS call 0 returned 966 966: 416: if (res != RD_KAFKA_CONF_OK) { branch 0 taken 0 (fallthrough) branch 1 taken 966 #####: 417: retval = set_pykafka_error("RdKafkaException", errstr); call 0 never executed #####: 418: break; -: 419: } -: 420: } -: 421: 140: 422: if (RdkHandle_unlock(self)) return NULL; call 0 returned 140 branch 1 taken 140 (fallthrough) branch 2 taken 0 140: 423: Py_XINCREF(retval); branch 0 taken 140 (fallthrough) branch 1 taken 0 -: 424: return retval; -: 425:} -: 426: -: 427: -: 428:/* Cleanup helper for *_start(), returns NULL to allow shorthand in use. -: 429: * NB: assumes self->rwlock is held and releases it. */ -: 430:static PyObject * function RdkHandle_start_fail called 0 returned 0% blocks executed 0% #####: 431:RdkHandle_start_fail(RdkHandle *self, PyObject *(*stop_func) (RdkHandle *)) -: 432:{ -: 433: /* Something went wrong so we expect an exception has been set */ -: 434: PyObject *err_type, *err_value, *err_traceback; #####: 435: PyErr_Fetch(&err_type, &err_value, &err_traceback); call 0 never executed -: 436: #####: 437: RdkHandle_unlock(self); call 0 never executed #####: 438: PyObject *stop_result = stop_func(self); call 0 never executed -: 439: -: 440: /* stop_func is likely to raise exceptions, as start was incomplete */ #####: 441: if (! stop_result) PyErr_Clear(); branch 0 never executed branch 1 never executed call 2 never executed #####: 442: else Py_DECREF(stop_result); branch 0 never executed branch 1 never executed call 2 never executed -: 443: #####: 444: PyErr_Restore(err_type, err_value, err_traceback); call 0 never executed #####: 445: return NULL; -: 446:} -: 447: -: 448: -: 449:/* Shared logic of Consumer_start and Producer_start */ -: 450:static PyObject * function RdkHandle_start called 70 returned 100% blocks executed 66% 70: 451:RdkHandle_start(RdkHandle *self, -: 452: rd_kafka_type_t rdk_type, -: 453: const char *brokers, -: 454: const char *topic_name) -: 455:{ 70: 456: if (RdkHandle_excl_lock(self)) return NULL; call 0 returned 70 branch 1 taken 70 (fallthrough) branch 2 taken 0 70: 457: if (self->rdk_handle) { branch 0 taken 0 (fallthrough) branch 1 taken 70 #####: 458: set_pykafka_error("RdKafkaException", "Already started!"); call 0 never executed #####: 459: return RdkHandle_start_fail(self, RdkHandle_stop); call 0 never executed -: 460: } -: 461: -: 462: /* Configure and start rdk_handle */ -: 463: char errstr[512]; 70: 464: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 70 70: 465: self->rdk_handle = rd_kafka_new( call 0 returned 70 -: 466: rdk_type, self->rdk_conf, errstr, sizeof(errstr)); 70: 467: self->rdk_conf = NULL; /* deallocated by rd_kafka_new() */ 70: 468: Py_END_ALLOW_THREADS call 0 returned 70 70: 469: if (! 
self->rdk_handle) { branch 0 taken 0 (fallthrough) branch 1 taken 70 #####: 470: set_pykafka_error("RdKafkaException", errstr); call 0 never executed #####: 471: return RdkHandle_start_fail(self, RdkHandle_stop); call 0 never executed -: 472: } -: 473: -: 474: /* Set brokers */ -: 475: int brokers_added; 70: 476: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 70 70: 477: brokers_added = rd_kafka_brokers_add(self->rdk_handle, brokers); call 0 returned 70 70: 478: Py_END_ALLOW_THREADS call 0 returned 70 70: 479: if (brokers_added == 0) { branch 0 taken 0 (fallthrough) branch 1 taken 70 #####: 480: set_pykafka_error("RdKafkaException", "adding brokers failed"); call 0 never executed #####: 481: return RdkHandle_start_fail(self, RdkHandle_stop); call 0 never executed -: 482: } -: 483: -: 484: /* Configure and take out a topic handle */ 70: 485: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 70 70: 486: self->rdk_topic_handle = rd_kafka_topic_new(self->rdk_handle, call 0 returned 70 -: 487: topic_name, -: 488: self->rdk_topic_conf); 70: 489: self->rdk_topic_conf = NULL; /* deallocated by rd_kafka_topic_new() */ 70: 490: Py_END_ALLOW_THREADS call 0 returned 70 70: 491: if (! self->rdk_topic_handle) { branch 0 taken 0 (fallthrough) branch 1 taken 70 #####: 492: set_pykafka_error_from_code(rd_kafka_errno2err(errno), NULL); call 0 never executed call 1 never executed call 2 never executed #####: 493: return RdkHandle_start_fail(self, RdkHandle_stop); call 0 never executed -: 494: } -: 495: 70: 496: if (RdkHandle_unlock(self)) return NULL; call 0 returned 70 branch 1 taken 70 (fallthrough) branch 2 taken 0 70: 497: Py_INCREF(Py_None); 70: 498: return Py_None; -: 499:} -: 500: -: 501: -: 502:/** -: 503: * Producer type -: 504: */ -: 505: -: 506: -: 507:/* NB this doesn't check if RdkHandle_outq_len is zero, and generally assumes -: 508: * the wrapping python class will take care of ensuring any such preconditions -: 509: * for a clean termination */ -: 510:static void function Producer_dealloc called 23 returned 100% blocks executed 100% 23: 511:Producer_dealloc(PyObject *self) -: 512:{ 23: 513: RdkHandle_dealloc(self, RdkHandle_stop); call 0 returned 23 23: 514:} -: 515: -: 516: -: 517:/* Helper function for Producer_delivery_report_callback: find an exception -: 518: * corresponding to `err`, and send that into the report queue. Returns -1 -: 519: * on failure, or 0 on success */ -: 520:static int function Producer_delivery_report_put called 45 returned 100% blocks executed 82% 45: 521:Producer_delivery_report_put(PyObject *put_func, -: 522: PyObject *message, -: 523: rd_kafka_resp_err_t err) -: 524:{ 45: 525: PyObject *exc = NULL; 45: 526: if (err == RD_KAFKA_RESP_ERR_NO_ERROR) { branch 0 taken 44 (fallthrough) branch 1 taken 1 44: 527: exc = Py_None; 44: 528: Py_INCREF(Py_None); -: 529: } else { 1: 530: set_pykafka_error_from_code(err, &exc); call 0 returned 1 1: 531: if (! exc) return -1; branch 0 taken 1 (fallthrough) branch 1 taken 0 -: 532: } 45: 533: PyObject *res = PyObject_CallFunctionObjArgs(put_func, message, exc, NULL); call 0 returned 45 -: 534: 45: 535: Py_DECREF(exc); branch 0 taken 0 (fallthrough) branch 1 taken 45 call 2 never executed 45: 536: if (! res) return -1; branch 0 taken 45 (fallthrough) branch 1 taken 0 45: 537: else Py_DECREF(res); branch 0 taken 0 (fallthrough) branch 1 taken 45 call 2 never executed -: 538: return 0; -: 539:} -: 540: -: 541: -: 542:/* Callback to be used as librdkafka dr_msg_cb. 
Note that this must always -: 543: * be configured to run, even if we do not care about delivery reporting, -: 544: * because we keep the Message objects that we produce from alive until this -: 545: * callback releases them, in order to avoid having to copy the payload */ -: 546:static void function Producer_delivery_report_callback called 44 returned 100% blocks executed 75% 44: 547:Producer_delivery_report_callback(rd_kafka_t *rk, -: 548: const rd_kafka_message_t *rkmessage, -: 549: void *opaque) -: 550:{ 44: 551: PyGILState_STATE gstate = PyGILState_Ensure(); call 0 returned 44 -: 552: -: 553: /* Producer_produce sent *Message as msg_opaque == rkmessage->_private */ 44: 554: PyObject *message = (PyObject *)rkmessage->_private; 44: 555: PyObject *put_func = (PyObject *)opaque; 44: 556: if (rkmessage->offset != -1) { branch 0 taken 44 (fallthrough) branch 1 taken 0 44: 557: PyObject* offset = PyLong_FromUnsignedLongLong(rkmessage->offset); call 0 returned 44 44: 558: PyObject_SetAttrString(message, "offset", offset); call 0 returned 44 -: 559: } 44: 560: if (-1 == Producer_delivery_report_put(put_func, message, rkmessage->err)) { call 0 returned 44 branch 1 taken 0 (fallthrough) branch 2 taken 44 -: 561: /* Must swallow exception as this is a non-python callback */ #####: 562: PyObject *res = PyObject_CallMethod( call 0 never executed -: 563: logger, "exception", "s", "Failure in delivery callback"); #####: 564: Py_XDECREF(res); branch 0 never executed branch 1 never executed branch 2 never executed branch 3 never executed call 4 never executed #####: 565: PyErr_Clear(); call 0 never executed -: 566: } else { -: 567: // Set timestamp to the one created by librdkafka -: 568: rd_kafka_timestamp_type_t timestamp_type; 44: 569: int64_t timestamp = rd_kafka_message_timestamp(rkmessage, &timestamp_type); call 0 returned 44 44: 570: if (timestamp_type != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { branch 0 taken 44 (fallthrough) branch 1 taken 0 44: 571: PyObject* timestamp_o = PyLong_FromUnsignedLongLong(timestamp); call 0 returned 44 44: 572: PyObject_SetAttrString(message, "timestamp", timestamp_o); call 0 returned 44 -: 573: } -: 574: } -: 575: 44: 576: Py_DECREF(message); /* We INCREF'd this in Producer_produce() */ branch 0 taken 2 (fallthrough) branch 1 taken 42 call 2 returned 2 44: 577: PyGILState_Release(gstate); call 0 returned 44 44: 578:} -: 579: -: 580: -: 581:PyDoc_STRVAR(Producer_start__doc__, -: 582: "start(self, brokers, topic_name, delivery_put)\n" -: 583: "\n" -: 584: "Starts the underlying rdkafka producer, given a broker connection\n" -: 585: "string, a topic name, and a (bound) method that we can send delivery\n" -: 586: "reports (as in `put(msg, exception)`). Configuration should have been\n" -: 587: "provided through earlier calls to configure(). Note that following\n" -: 588: "start, you _must_ ensure that poll() is called regularly, or delivery\n" -: 589: "reports may pile up\n"); -: 590:static PyObject * function Producer_start called 24 returned 100% blocks executed 73% 24: 591:Producer_start(RdkHandle *self, PyObject *args, PyObject *kwds) -: 592:{ 24: 593: if (RdkHandle_excl_lock(self)) return NULL; call 0 returned 24 branch 1 taken 24 (fallthrough) branch 2 taken 0 -: 594: 24: 595: char *keywords[] = {"brokers", "topic_name", "delivery_put", NULL}; 24: 596: PyObject *brokers = NULL; 24: 597: PyObject *topic_name = NULL; 24: 598: PyObject *delivery_put = NULL; 24: 599: if (!
PyArg_ParseTupleAndKeywords( call 0 returned 24 branch 1 taken 24 (fallthrough) branch 2 taken 0 -: 600: args, kwds, "SSO", keywords, &brokers, &topic_name, &delivery_put)) { -: 601: goto failed; -: 602: } -: 603: -: 604: /* Configure delivery-reporting */ 24: 605: if (! self->rdk_conf) { branch 0 taken 0 (fallthrough) branch 1 taken 24 #####: 606: set_pykafka_error("RdKafkaException", call 0 never executed -: 607: "Please run configure() before starting."); #####: 608: goto failed; -: 609: } 24: 610: rd_kafka_conf_set_dr_msg_cb(self->rdk_conf, call 0 returned 24 -: 611: Producer_delivery_report_callback); 24: 612: Py_INCREF(delivery_put); 24: 613: rd_kafka_conf_set_opaque(self->rdk_conf, delivery_put); call 0 returned 24 -: 614: 24: 615: if (RdkHandle_unlock(self)) return NULL; call 0 returned 24 branch 1 taken 24 (fallthrough) branch 2 taken 0 24: 616: return RdkHandle_start( call 0 returned 24 -: 617: self, -: 618: RD_KAFKA_PRODUCER, 24: 619: PyBytes_AS_STRING(brokers), 24: 620: PyBytes_AS_STRING(topic_name)); -: 621:failed: #####: 622: RdkHandle_unlock(self); call 0 never executed #####: 623: return NULL; -: 624:} -: 625: -: 626: -: 627:PyDoc_STRVAR(Producer_produce__doc__, -: 628: "produce(self, message)\n" -: 629: "\n" -: 630: "Produces a `pykafka.protocol.Message` through librdkafka. Will keep\n" -: 631: "a reference to the message internally until delivery is confirmed\n"); -: 632:static PyObject * function Producer_produce called 47 returned 100% blocks executed 84% 47: 633:Producer_produce(RdkHandle *self, PyObject *message) -: 634:{ 47: 635: if (RdkHandle_safe_lock(self, /* check_running= */ 1)) return NULL; call 0 returned 47 branch 1 taken 47 (fallthrough) branch 2 taken 0 -: 636: 47: 637: PyObject *value = NULL; 47: 638: PyObject *partition_key = NULL; 47: 639: PyObject *partition_id = NULL; -: 640: -: 641: /* Keep message alive until the delivery-callback runs. Needed both -: 642: * because we may want to put the message on a report queue when the -: 643: * callback runs, and because we'll tell rd_kafka_produce() not to copy -: 644: * the payload and it can safely use the raw Message bytes directly */ 47: 645: Py_INCREF(message); -: 646: -: 647: /* Get pointers to raw Message contents */ 47: 648: value = PyObject_GetAttrString(message, "value"); call 0 returned 47 47: 649: if (! value) goto failed; branch 0 taken 46 (fallthrough) branch 1 taken 1 46: 650: partition_key = PyObject_GetAttrString(message, "partition_key"); call 0 returned 46 46: 651: if (! partition_key) goto failed; branch 0 taken 46 (fallthrough) branch 1 taken 0 46: 652: partition_id = PyObject_GetAttrString(message, "partition_id"); call 0 returned 46 46: 653: if (! partition_id) goto failed; branch 0 taken 46 (fallthrough) branch 1 taken 0 -: 654: 46: 655: char *v = NULL; 46: 656: Py_ssize_t v_len = 0; 46: 657: if (value != Py_None) { branch 0 taken 44 (fallthrough) branch 1 taken 2 44: 658: v = PyBytes_AsString(value); call 0 returned 44 44: 659: if (! v) goto failed; branch 0 taken 44 (fallthrough) branch 1 taken 0 44: 660: v_len = PyBytes_GET_SIZE(value); -: 661: } 46: 662: char *pk = NULL; 46: 663: Py_ssize_t pk_len = 0; 46: 664: if (partition_key != Py_None) { branch 0 taken 2 (fallthrough) branch 1 taken 44 2: 665: pk = PyBytes_AsString(partition_key); call 0 returned 2 2: 666: if (! 
pk) goto failed; branch 0 taken 2 (fallthrough) branch 1 taken 0 2: 667: pk_len = PyBytes_GET_SIZE(partition_key); -: 668: } 46: 669: int32_t p_id = PyLong_AsLong(partition_id); call 0 returned 46 46: 670: if (p_id == -1 && PyErr_Occurred()) goto failed; branch 0 taken 0 (fallthrough) branch 1 taken 46 call 2 never executed branch 3 never executed branch 4 never executed -: 671: 46: 672: int res = 0; 46: 673: Py_BEGIN_ALLOW_THREADS call 0 returned 46 46: 674: res = rd_kafka_produce(self->rdk_topic_handle, call 0 returned 46 -: 675: p_id, -: 676: 0, /* ie don't copy and don't dealloc v */ -: 677: v, v_len, -: 678: pk, pk_len, -: 679: (void *)message); 46: 680: Py_END_ALLOW_THREADS call 0 returned 46 46: 681: if (res == -1) { branch 0 taken 2 (fallthrough) branch 1 taken 44 2: 682: rd_kafka_resp_err_t err = rd_kafka_errno2err(errno); call 0 returned 2 call 1 returned 2 2: 683: if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) { branch 0 taken 1 (fallthrough) branch 1 taken 1 1: 684: set_pykafka_error("ProducerQueueFullError", ""); call 0 returned 1 1: 685: goto failed; -: 686: } else { -: 687: /* Any other errors should go through the report queue, -: 688: * because that's where pykafka.Producer would put them */ 1: 689: PyObject *put_func = (PyObject *)rd_kafka_opaque(self->rdk_handle); call 0 returned 1 1: 690: if (-1 == Producer_delivery_report_put(put_func, message, err)) { call 0 returned 1 branch 1 taken 1 (fallthrough) branch 2 taken 0 -: 691: goto failed; -: 692: } -: 693: } 1: 694: Py_DECREF(message); /* There won't be a delivery-callback */ branch 0 taken 0 (fallthrough) branch 1 taken 1 call 2 never executed -: 695: } -: 696: 45: 697: Py_DECREF(value); branch 0 taken 0 (fallthrough) branch 1 taken 45 call 2 never executed 45: 698: Py_DECREF(partition_key); branch 0 taken 0 (fallthrough) branch 1 taken 45 call 2 never executed 45: 699: Py_DECREF(partition_id); branch 0 taken 0 (fallthrough) branch 1 taken 45 call 2 never executed 45: 700: if (RdkHandle_unlock(self)) return NULL; call 0 returned 45 branch 1 taken 45 (fallthrough) branch 2 taken 0 -: 701: 45: 702: Py_INCREF(Py_None); 45: 703: return Py_None; -: 704:failed: 2: 705: Py_XDECREF(value); branch 0 taken 1 (fallthrough) branch 1 taken 1 branch 2 taken 0 (fallthrough) branch 3 taken 1 call 4 never executed 2: 706: Py_XDECREF(partition_key); branch 0 taken 1 (fallthrough) branch 1 taken 1 branch 2 taken 0 (fallthrough) branch 3 taken 1 call 4 never executed 2: 707: Py_XDECREF(partition_id); branch 0 taken 1 (fallthrough) branch 1 taken 1 branch 2 taken 0 (fallthrough) branch 3 taken 1 call 4 never executed 2: 708: RdkHandle_unlock(self); call 0 returned 2 2: 709: return NULL; -: 710:} -: 711: -: 712: -: 713:static PyMethodDef Producer_methods[] = { -: 714: {"produce", (PyCFunction)Producer_produce, -: 715: METH_O, Producer_produce__doc__}, -: 716: {"stop", (PyCFunction)RdkHandle_stop, -: 717: METH_NOARGS, RdkHandle_stop__doc__}, -: 718: {"configure", (PyCFunction)RdkHandle_configure, -: 719: METH_VARARGS | METH_KEYWORDS, RdkHandle_configure__doc__}, -: 720: {"start", (PyCFunction)Producer_start, -: 721: METH_VARARGS | METH_KEYWORDS, Producer_start__doc__}, -: 722: {"outq_len", (PyCFunction)RdkHandle_outq_len, -: 723: METH_NOARGS, RdkHandle_outq_len__doc__}, -: 724: {"poll", (PyCFunction)RdkHandle_poll, -: 725: METH_VARARGS | METH_KEYWORDS, RdkHandle_poll__doc__}, -: 726: {NULL, NULL, 0, NULL} -: 727:}; -: 728: -: 729: -: 730:static PyTypeObject ProducerType = { -: 731: PyVarObject_HEAD_INIT(NULL, 0) -: 732: 
"pykafka.rd_kafka.Producer", -: 733: sizeof(RdkHandle), -: 734: 0, /* tp_itemsize */ -: 735: (destructor)Producer_dealloc, /* tp_dealloc */ -: 736: 0, /* tp_print */ -: 737: 0, /* tp_getattr */ -: 738: 0, /* tp_setattr */ -: 739: 0, /* tp_compare */ -: 740: 0, /* tp_repr */ -: 741: 0, /* tp_as_number */ -: 742: 0, /* tp_as_sequence */ -: 743: 0, /* tp_as_mapping */ -: 744: 0, /* tp_hash */ -: 745: 0, /* tp_call */ -: 746: 0, /* tp_str */ -: 747: 0, /* tp_getattro */ -: 748: 0, /* tp_setattro */ -: 749: 0, /* tp_as_buffer */ -: 750: Py_TPFLAGS_DEFAULT, /* tp_flags */ -: 751: 0, /* tp_doc */ -: 752: 0, /* tp_traverse */ -: 753: 0, /* tp_clear */ -: 754: 0, /* tp_richcompare */ -: 755: 0, /* tp_weaklistoffset */ -: 756: 0, /* tp_iter */ -: 757: 0, /* tp_iternext */ -: 758: Producer_methods, /* tp_methods */ -: 759: RdkHandle_members, /* tp_members */ -: 760: 0, /* tp_getset */ -: 761: 0, /* tp_base */ -: 762: 0, /* tp_dict */ -: 763: 0, /* tp_descr_get */ -: 764: 0, /* tp_descr_set */ -: 765: 0, /* tp_dictoffset */ -: 766: 0, /* tp_init */ -: 767: 0, /* tp_alloc */ -: 768: RdkHandle_new, /* tp_new */ -: 769:}; -: 770: -: 771: -: 772:/** -: 773: * Consumer type -: 774: */ -: 775: -: 776: -: 777:/* Destroy all internal state of the consumer */ -: 778:static PyObject * function Consumer_stop called 79 returned 100% blocks executed 54% 79: 779:Consumer_stop(RdkHandle *self) -: 780:{ 79: 781: if (RdkHandle_safe_lock(self, /* check_running= */ 0)) return NULL; call 0 returned 79 branch 1 taken 78 (fallthrough) branch 2 taken 1 -: 782: 78: 783: int errored = 0; 78: 784: if (self->rdk_topic_handle && self->partition_ids) { branch 0 taken 38 (fallthrough) branch 1 taken 40 branch 2 taken 38 (fallthrough) branch 3 taken 0 38: 785: Py_ssize_t i, len = PyList_Size(self->partition_ids); 137: 786: for (i = 0; i != len; ++i) { call 0 returned 38 branch 1 taken 99 branch 2 taken 38 (fallthrough) -: 787: /* Error handling here is a bit poor; we cannot bail out directly -: 788: if we want to clean up as much as we can. 
*/ 99: 789: long part_id = PyLong_AsLong( call 0 returned 99 call 1 returned 99 -: 790: PyList_GetItem(self->partition_ids, i)); 99: 791: if (part_id == -1) { branch 0 taken 0 (fallthrough) branch 1 taken 99 #####: 792: errored += 1; #####: 793: PyObject *log_res = PyObject_CallMethod( call 0 never executed -: 794: logger, "exception", "s", "In Consumer_stop:"); #####: 795: Py_XDECREF(log_res); branch 0 never executed branch 1 never executed branch 2 never executed branch 3 never executed call 4 never executed #####: 796: continue; -: 797: } -: 798: int res; 99: 799: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 99 99: 800: res = rd_kafka_consume_stop(self->rdk_topic_handle, part_id); call 0 returned 99 99: 801: Py_END_ALLOW_THREADS call 0 returned 99 99: 802: if (res == -1) { branch 0 taken 0 (fallthrough) branch 1 taken 99 #####: 803: set_pykafka_error_from_code(rd_kafka_errno2err(errno), NULL); call 0 never executed call 1 never executed call 2 never executed #####: 804: errored += 1; #####: 805: PyObject *log_res = PyObject_CallMethod( call 0 never executed -: 806: logger, "exception", "sl", -: 807: "Error in rd_kafka_consume_stop, part_id=%s", -: 808: part_id); #####: 809: Py_XDECREF(log_res); branch 0 never executed branch 1 never executed branch 2 never executed branch 3 never executed call 4 never executed #####: 810: continue; -: 811: } -: 812: } -: 813: } -: 814: 78: 815: RdkHandle_unlock(self); call 0 returned 78 78: 816: PyObject *res = RdkHandle_stop(self); call 0 returned 78 78: 817: if (errored) { branch 0 taken 0 (fallthrough) branch 1 taken 78 #####: 818: Py_XDECREF(res); branch 0 never executed branch 1 never executed branch 2 never executed branch 3 never executed call 4 never executed -: 819: return NULL; -: 820: } -: 821: return res; -: 822:} -: 823: -: 824: -: 825:static void function Consumer_dealloc called 40 returned 100% blocks executed 100% 40: 826:Consumer_dealloc(PyObject *self) -: 827:{ 40: 828: RdkHandle_dealloc(self, Consumer_stop); call 0 returned 40 40: 829:} -: 830: -: 831: -: 832:/* Cleanup helper for Consumer_start. -: 833: * NB: assumes self->rwlock is held and releases it. */ -: 834:static PyObject * -: 835:Consumer_start_fail(RdkHandle *self) -: 836:{ #####: 837: return RdkHandle_start_fail(self, Consumer_stop); -: 838:} -: 839: -: 840: -: 841:PyDoc_STRVAR(Consumer_start__doc__, -: 842: "start(self, brokers, topic_name, partition_ids, start_offsets)\n" -: 843: "\n" -: 844: "Starts the underlying rdkafka consumer. Configuration should have been\n" -: 845: "provided through earlier calls to configure(). Note that following\n" -: 846: "start, you must ensure that poll() is called regularly, as required\n" -: 847: "by librdkafka\n"); -: 848:static PyObject * function Consumer_start called 47 returned 100% blocks executed 67% 47: 849:Consumer_start(RdkHandle *self, PyObject *args, PyObject *kwds) -: 850:{ 47: 851: char *keywords[] = { -: 852: "brokers", -: 853: "topic_name", -: 854: "partition_ids", -: 855: "start_offsets", /* same order as partition_ids */ -: 856: NULL}; 47: 857: PyObject *brokers = NULL; 47: 858: PyObject *topic_name = NULL; 47: 859: PyObject *partition_ids = NULL; 47: 860: PyObject *start_offsets = NULL; 47: 861: if (! 
PyArg_ParseTupleAndKeywords(args, call 0 returned 47 branch 1 taken 47 (fallthrough) branch 2 taken 0 -: 862: kwds, -: 863: "SSOO", -: 864: keywords, -: 865: &brokers, -: 866: &topic_name, -: 867: &partition_ids, -: 868: &start_offsets)) { -: 869: return NULL; -: 870: } -: 871: -: 872: /* Set the application opaque NULL, just to make stop() simpler */ 47: 873: if (RdkHandle_excl_lock(self)) return NULL; call 0 returned 47 branch 1 taken 47 (fallthrough) branch 2 taken 0 47: 874: if (! self->rdk_conf) { branch 0 taken 1 (fallthrough) branch 1 taken 46 1: 875: set_pykafka_error("RdKafkaException", call 0 returned 1 -: 876: "Please run configure() before starting."); 1: 877: return NULL; -: 878: } 46: 879: rd_kafka_conf_set_opaque(self->rdk_conf, NULL); call 0 returned 46 46: 880: if (RdkHandle_unlock(self)) return NULL; call 0 returned 46 branch 1 taken 46 (fallthrough) branch 2 taken 0 -: 881: -: 882: /* Basic setup */ 46: 883: PyObject *res = RdkHandle_start( call 0 returned 46 -: 884: self, -: 885: RD_KAFKA_CONSUMER, 46: 886: PyBytes_AS_STRING(brokers), 46: 887: PyBytes_AS_STRING(topic_name)); 46: 888: if (! res) return NULL; branch 0 taken 46 (fallthrough) branch 1 taken 0 46: 889: else Py_DECREF(res); branch 0 taken 0 (fallthrough) branch 1 taken 46 call 2 never executed -: 890: 46: 891: if (RdkHandle_excl_lock(self)) return NULL; call 0 returned 46 branch 1 taken 46 (fallthrough) branch 2 taken 0 46: 892: if (! self->rdk_handle) { branch 0 taken 0 (fallthrough) branch 1 taken 46 #####: 893: set_pykafka_error("RdKafkaStoppedException", call 0 never executed -: 894: "Stopped in the middle of starting."); #####: 895: return Consumer_start_fail(self); call 0 never executed -: 896: } -: 897: -: 898: /* We'll keep our own copy of partition_ids, because the one handed to us -: 899: might be mutable, and weird things could happen if the list used on init -: 900: is different than that on dealloc */ 46: 901: self->partition_ids = PySequence_List(partition_ids); call 0 returned 46 46: 902: if (! self->partition_ids) return Consumer_start_fail(self); branch 0 taken 0 (fallthrough) branch 1 taken 46 call 2 never executed -: 903: -: 904: /* Start a queue and add all partition_ids to it */ 46: 905: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 46 46: 906: self->rdk_queue_handle = rd_kafka_queue_new(self->rdk_handle); call 0 returned 46 46: 907: Py_END_ALLOW_THREADS call 0 returned 46 46: 908: if (! self->rdk_queue_handle) { branch 0 taken 0 (fallthrough) branch 1 taken 46 #####: 909: set_pykafka_error("RdKafkaException", "could not get queue"); call 0 never executed #####: 910: return Consumer_start_fail(self); call 0 never executed -: 911: } 46: 912: Py_ssize_t i, len = PyList_Size(self->partition_ids); 158: 913: for (i = 0; i != len; ++i) { call 0 returned 46 branch 1 taken 112 branch 2 taken 46 (fallthrough) -: 914: /* We don't do much type-checking on partition_ids/start_offsets as -: 915: this module is intended solely for use with the python class that -: 916: wraps it */ 112: 917: int32_t part_id = PyLong_AsLong( call 0 returned 112 call 1 returned 112 -: 918: PyList_GetItem(self->partition_ids, i)); 112: 919: if (part_id == -1 && PyErr_Occurred()) { branch 0 taken 0 (fallthrough) branch 1 taken 112 call 2 never executed branch 3 never executed branch 4 never executed #####: 920: return Consumer_start_fail(self); call 0 never executed -: 921: } 112: 922: PyObject *offset_obj = PySequence_GetItem(start_offsets, i); call 0 returned 112 112: 923: if (! 
offset_obj) { /* start_offsets shorter than partition_ids? */ branch 0 taken 0 (fallthrough) branch 1 taken 112 #####: 924: return Consumer_start_fail(self); call 0 never executed -: 925: } 112: 926: int64_t offset = PyLong_AsLongLong(offset_obj); call 0 returned 112 -: 927: int res; 112: 928: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 112 112: 929: res = rd_kafka_consume_start_queue(self->rdk_topic_handle, call 0 returned 112 -: 930: part_id, -: 931: offset, -: 932: self->rdk_queue_handle); 112: 933: Py_END_ALLOW_THREADS call 0 returned 112 112: 934: if (res == -1) { branch 0 taken 0 (fallthrough) branch 1 taken 112 #####: 935: set_pykafka_error_from_code(rd_kafka_errno2err(errno), NULL); call 0 never executed call 1 never executed call 2 never executed #####: 936: return Consumer_start_fail(self); call 0 never executed -: 937: } -: 938: } 46: 939: if (RdkHandle_unlock(self)) return NULL; call 0 returned 46 branch 1 taken 46 (fallthrough) branch 2 taken 0 46: 940: Py_INCREF(Py_None); 46: 941: return Py_None; -: 942:} -: 943: -: 944: -: 945:PyDoc_STRVAR(Consumer_consume__doc__, -: 946: "consume(self, timeout_ms) -> pykafka.protocol.Message or None \n" -: 947: "\n" -: 948: "Block until a message can be returned, or return None if timeout_ms has\n" -: 949: "been reached.\n"); -: 950:static PyObject * function Consumer_consume called 5294 returned 100% blocks executed 91% 5294: 951:Consumer_consume(RdkHandle *self, PyObject *args) -: 952:{ 5294: 953: int timeout_ms = 0; 5294: 954: if (! PyArg_ParseTuple(args, "i", &timeout_ms)) return NULL; call 0 returned 5294 branch 1 taken 5294 (fallthrough) branch 2 taken 0 -: 955: 5294: 956: PyObject *retval = NULL; 5294: 957: PyObject *empty_args = NULL; 5294: 958: PyObject *kwargs = NULL; -: 959: rd_kafka_message_t *rkmessage; 5294: 960: int protocol_version = 0; -: 961: 5294: 962: if (RdkHandle_safe_lock(self, /* check_running= */ 1)) return NULL; call 0 returned 5294 branch 1 taken 5293 (fallthrough) branch 2 taken 1 5293: 963: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 5293 5293: 964: rkmessage = rd_kafka_consume_queue(self->rdk_queue_handle, timeout_ms); call 0 returned 5293 5293: 965: Py_END_ALLOW_THREADS call 0 returned 5293 5293: 966: if (RdkHandle_unlock(self)) goto cleanup; call 0 returned 5293 branch 1 taken 5293 (fallthrough) branch 2 taken 0 -: 967: 5293: 968: if (!rkmessage) { branch 0 taken 429 (fallthrough) branch 1 taken 4864 -: 969: /* Either ETIMEDOUT or ENOENT occurred, but the latter would imply we -: 970: forgot to call rd_kafka_consume_start_queue, which is unlikely in -: 971: this setup. We'll assume it was ETIMEDOUT then: */ 429: 972: Py_INCREF(Py_None); 429: 973: return Py_None; -: 974: } -: 975: 4864: 976: if (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) { branch 0 taken 4853 (fallthrough) branch 1 taken 11 -: 977: /* Build a pykafka.protocol.Message */ -: 978: rd_kafka_timestamp_type_t timestamp_type; 4853: 979: int64_t timestamp = rd_kafka_message_timestamp(rkmessage, &timestamp_type); call 0 returned 4853 4853: 980: if (timestamp_type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { branch 0 taken 4853 (fallthrough) branch 1 taken 0 -: 981: timestamp = 0; -: 982: } else { -: 983: // infer protocol from presence of timestamp.
Fragile, but protocol -: 984: // not transmitted by librdkafka 4853: 985: protocol_version = 1; -: 986: } -: 987:#if PY_MAJOR_VERSION >= 3 -: 988: const char *format = "{s:y#,s:y#,s:l,s:L,s:i,s:L}"; -: 989:#else 4853: 990: const char *format = "{s:s#,s:s#,s:l,s:L,s:i,s:L}"; -: 991:#endif 4853: 992: kwargs = Py_BuildValue( call 0 returned 4853 -: 993: format, -: 994: "value", rkmessage->payload, rkmessage->len, -: 995: "partition_key", rkmessage->key, rkmessage->key_len, 4853: 996: "partition_id", (long)rkmessage->partition, 4853: 997: "offset", (PY_LONG_LONG)rkmessage->offset, -: 998: "protocol_version", protocol_version, -: 999: "timestamp", (PY_LONG_LONG)timestamp); 4853: 1000: if (! kwargs) goto cleanup; branch 0 taken 4853 (fallthrough) branch 1 taken 0 4853: 1001: empty_args = PyTuple_New(0); call 0 returned 4853 4853: 1002: if (! empty_args) goto cleanup; branch 0 taken 4853 (fallthrough) branch 1 taken 0 4853: 1003: retval = PyObject_Call(Message, empty_args, kwargs); call 0 returned 4853 11: 1004: } else if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { branch 0 taken 11 (fallthrough) branch 1 taken 0 -: 1005: /* Whenever we get to the head of a partition, we get this. There -: 1006: * may be messages available in other partitions, so if we want to -: 1007: * match pykafka.SimpleConsumer behaviour, we ought to avoid breaking -: 1008: * any iteration loops, and simply skip over this one altogether: */ 11: 1009: retval = Consumer_consume(self, args); call 0 returned 11 -: 1010: } else { #####: 1011: set_pykafka_error_from_code(rkmessage->err, NULL); call 0 never executed -: 1012: } -: 1013:cleanup: 4864: 1014: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 4864 4864: 1015: rd_kafka_message_destroy(rkmessage); call 0 returned 4864 4864: 1016: Py_END_ALLOW_THREADS call 0 returned 4864 4864: 1017: Py_XDECREF(empty_args); branch 0 taken 4853 (fallthrough) branch 1 taken 11 branch 2 taken 0 (fallthrough) branch 3 taken 4853 call 4 never executed 4864: 1018: Py_XDECREF(kwargs); branch 0 taken 4853 (fallthrough) branch 1 taken 11 branch 2 taken 4853 (fallthrough) branch 3 taken 0 call 4 returned 4853 -: 1019: return retval; -: 1020:} -: 1021: -: 1022: -: 1023:static PyMethodDef Consumer_methods[] = { -: 1024: {"consume", (PyCFunction)Consumer_consume, -: 1025: METH_VARARGS, Consumer_consume__doc__}, -: 1026: {"stop", (PyCFunction)Consumer_stop, -: 1027: METH_NOARGS, RdkHandle_stop__doc__}, -: 1028: {"configure", (PyCFunction)RdkHandle_configure, -: 1029: METH_VARARGS | METH_KEYWORDS, RdkHandle_configure__doc__}, -: 1030: {"start", (PyCFunction)Consumer_start, -: 1031: METH_VARARGS | METH_KEYWORDS, Consumer_start__doc__}, -: 1032: {"poll", (PyCFunction)RdkHandle_poll, -: 1033: METH_VARARGS | METH_KEYWORDS, RdkHandle_poll__doc__}, -: 1034: {NULL, NULL, 0, NULL} -: 1035:}; -: 1036: -: 1037: -: 1038:static PyTypeObject ConsumerType = { -: 1039: PyVarObject_HEAD_INIT(NULL, 0) -: 1040: "pykafka.rd_kafka.Consumer", -: 1041: sizeof(RdkHandle), -: 1042: 0, /* tp_itemsize */ -: 1043: (destructor)Consumer_dealloc, /* tp_dealloc */ -: 1044: 0, /* tp_print */ -: 1045: 0, /* tp_getattr */ -: 1046: 0, /* tp_setattr */ -: 1047: 0, /* tp_compare */ -: 1048: 0, /* tp_repr */ -: 1049: 0, /* tp_as_number */ -: 1050: 0, /* tp_as_sequence */ -: 1051: 0, /* tp_as_mapping */ -: 1052: 0, /* tp_hash */ -: 1053: 0, /* tp_call */ -: 1054: 0, /* tp_str */ -: 1055: 0, /* tp_getattro */ -: 1056: 0, /* tp_setattro */ -: 1057: 0, /* tp_as_buffer */ -: 1058: Py_TPFLAGS_DEFAULT, /* tp_flags 
*/ -: 1059: 0, /* tp_doc */ -: 1060: 0, /* tp_traverse */ -: 1061: 0, /* tp_clear */ -: 1062: 0, /* tp_richcompare */ -: 1063: 0, /* tp_weaklistoffset */ -: 1064: 0, /* tp_iter */ -: 1065: 0, /* tp_iternext */ -: 1066: Consumer_methods, /* tp_methods */ -: 1067: RdkHandle_members, /* tp_members */ -: 1068: 0, /* tp_getset */ -: 1069: 0, /* tp_base */ -: 1070: 0, /* tp_dict */ -: 1071: 0, /* tp_descr_get */ -: 1072: 0, /* tp_descr_set */ -: 1073: 0, /* tp_dictoffset */ -: 1074: 0, /* tp_init */ -: 1075: 0, /* tp_alloc */ -: 1076: RdkHandle_new, /* tp_new */ -: 1077:}; -: 1078: -: 1079: -: 1080:/** -: 1081: * Debugging helpers -: 1082: */ -: 1083: -: 1084: -: 1085:PyDoc_STRVAR(debug_thread_cnt__doc__, -: 1086: "_thread_cnt() -> int\n" -: 1087: "\n" -: 1088: "Debugging helper, reports number of librdkafka threads running.\n"); -: 1089:static PyObject * function debug_thread_cnt called 4 returned 100% blocks executed 100% 4: 1090:debug_thread_cnt(PyObject *self, PyObject *args) -: 1091:{ 4: 1092: return PyLong_FromLong(rd_kafka_thread_cnt()); call 0 returned 4 call 1 returned 4 -: 1093:} -: 1094: -: 1095: -: 1096:PyDoc_STRVAR(debug_wait_destroyed__doc__, -: 1097: "_wait_destroyed(timeout_ms)\n" -: 1098: "\n" -: 1099: "Debugging helper, blocks until all rdkafka handles have been destroyed.\n" -: 1100: "Raises exception if timeout_ms is reached before then\n"); -: 1101:static PyObject * function debug_wait_destroyed called 2 returned 100% blocks executed 73% 2: 1102:debug_wait_destroyed(PyObject *self, PyObject *arg) -: 1103:{ 2: 1104: int timeout_ms = PyLong_AsLong(arg); call 0 returned 2 2: 1105: if (timeout_ms == -1 && PyErr_Occurred()) return NULL; branch 0 taken 0 (fallthrough) branch 1 taken 2 call 2 never executed branch 3 never executed branch 4 never executed -: 1106: -: 1107: int res; 2: 1108: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */ call 0 returned 2 2: 1109: res = rd_kafka_wait_destroyed(timeout_ms); call 0 returned 2 2: 1110: Py_END_ALLOW_THREADS call 0 returned 2 -: 1111: 2: 1112: if (res == -1) { branch 0 taken 2 (fallthrough) branch 1 taken 0 2: 1113: return set_pykafka_error("RdKafkaException", call 0 returned 2 -: 1114: "rd_kafka_wait_destroyed timed out"); -: 1115: } #####: 1116: Py_INCREF(Py_None); #####: 1117: return Py_None; -: 1118:} -: 1119: -: 1120: -: 1121:/** -: 1122: * Module init -: 1123: */ -: 1124: -: 1125:static const char module_name[] = "pykafka.rdkafka._rd_kafka"; -: 1126: -: 1127: -: 1128:static PyMethodDef pyrdk_methods[] = { -: 1129: {"_thread_cnt", debug_thread_cnt, -: 1130: METH_NOARGS, debug_thread_cnt__doc__}, -: 1131: {"_wait_destroyed", debug_wait_destroyed, -: 1132: METH_O, debug_wait_destroyed__doc__}, -: 1133: {NULL, NULL, 0, NULL} -: 1134:}; -: 1135: -: 1136: -: 1137:#if PY_MAJOR_VERSION >= 3 -: 1138: static struct PyModuleDef moduledef = { -: 1139: PyModuleDef_HEAD_INIT, -: 1140: module_name, -: 1141: NULL, /* m_doc */ -: 1142: -1, /* m_size */ -: 1143: pyrdk_methods, -: 1144: NULL, /* m_reload */ -: 1145: NULL, /* m_traverse */ -: 1146: NULL, /* m_clear */ -: 1147: NULL, /* m_free */ -: 1148: }; -: 1149:#endif -: 1150: -: 1151: -: 1152:static PyObject * function _rd_kafkamodule_init called 1 returned 100% blocks executed 93% 1: 1153:_rd_kafkamodule_init(void) -: 1154:{ -: 1155:#if PY_MAJOR_VERSION >= 3 -: 1156: PyObject *mod = PyModule_Create(&moduledef); -: 1157:#else 1: 1158: PyObject *mod = Py_InitModule(module_name, pyrdk_methods); call 0 returned 1 -: 1159:#endif 1: 1160: if (mod == NULL) return NULL; branch 0 taken 1 
(fallthrough) branch 1 taken 0 -: 1161: -: 1162: /* Callback logging requires the GIL */ 1: 1163: PyEval_InitThreads(); call 0 returned 1 -: 1164: 1: 1165: PyObject *logging = PyImport_ImportModule("logging"); call 0 returned 1 1: 1166: if (! logging) return NULL; branch 0 taken 1 (fallthrough) branch 1 taken 0 1: 1167: logger = PyObject_CallMethod(logging, "getLogger", "s", module_name); call 0 returned 1 1: 1168: Py_DECREF(logging); branch 0 taken 0 (fallthrough) branch 1 taken 1 call 2 never executed 1: 1169: if (! logger) return NULL; branch 0 taken 1 (fallthrough) branch 1 taken 0 -: 1170: 1: 1171: pykafka_exceptions = PyImport_ImportModule("pykafka.exceptions"); call 0 returned 1 1: 1172: if (! pykafka_exceptions) return NULL; branch 0 taken 1 (fallthrough) branch 1 taken 0 -: 1173: 1: 1174: PyObject *pykafka_protocol = PyImport_ImportModule("pykafka.protocol"); call 0 returned 1 1: 1175: if (! pykafka_protocol) return NULL; branch 0 taken 1 (fallthrough) branch 1 taken 0 1: 1176: Message = PyObject_GetAttrString(pykafka_protocol, "Message"); call 0 returned 1 1: 1177: Py_DECREF(pykafka_protocol); branch 0 taken 0 (fallthrough) branch 1 taken 1 call 2 never executed 1: 1178: if (! Message) return NULL; branch 0 taken 1 (fallthrough) branch 1 taken 0 -: 1179: 1: 1180: if (PyType_Ready(&ProducerType)) return NULL; call 0 returned 1 branch 1 taken 1 (fallthrough) branch 2 taken 0 1: 1181: Py_INCREF(&ProducerType); 1: 1182: if (PyModule_AddObject(mod, "Producer", (PyObject *)&ProducerType)) { call 0 returned 1 branch 1 taken 1 (fallthrough) branch 2 taken 0 -: 1183: return NULL; -: 1184: } -: 1185: 1: 1186: if (PyType_Ready(&ConsumerType)) return NULL; call 0 returned 1 branch 1 taken 1 (fallthrough) branch 2 taken 0 1: 1187: Py_INCREF(&ConsumerType); 1: 1188: if (PyModule_AddObject(mod, "Consumer", (PyObject *)&ConsumerType)) { call 0 returned 1 branch 1 taken 1 (fallthrough) branch 2 taken 0 -: 1189: return NULL; -: 1190: } -: 1191: 1: 1192: return mod; -: 1193:} -: 1194: -: 1195: -: 1196:#if PY_MAJOR_VERSION >= 3 -: 1197: PyMODINIT_FUNC -: 1198: PyInit__rd_kafka(void) -: 1199: { -: 1200: return _rd_kafkamodule_init(); -: 1201: } -: 1202:#else -: 1203: PyMODINIT_FUNC function init_rd_kafka called 1 returned 100% blocks executed 100% 1: 1204: init_rd_kafka(void) -: 1205: { 1: 1206: _rd_kafkamodule_init(); call 0 returned 1 1: 1207: } -: 1208:#endif
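
/* ------------------------------------------------------------------------
 * Editorial sketches follow; they are not part of the gcov run above.
 *
 * Sketch 1: the GIL pattern behind logging_callback. librdkafka fires
 * callbacks on threads Python has never seen, so every touch of the C-API
 * is bracketed by PyGILState_Ensure/Release, and any exception is swallowed
 * because there is no Python frame to propagate it to. A minimal sketch,
 * assuming an already-initialized interpreter; demo_log_bridge is a
 * hypothetical name. */
#define PY_SSIZE_T_CLEAN
#include <Python.h>

static void
demo_log_bridge(const char *msg)
{
    /* Safe to call from a thread the interpreter has never registered */
    PyGILState_STATE gstate = PyGILState_Ensure();

    PyObject *logging = PyImport_ImportModule("logging");
    if (logging) {
        PyObject *res = PyObject_CallMethod(logging, "warning", "s", msg);
        if (! res) PyErr_Clear();  /* nowhere to raise: swallow, as above */
        else Py_DECREF(res);
        Py_DECREF(logging);
    } else {
        PyErr_Clear();
    }

    PyGILState_Release(gstate);
}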
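
/* Sketch 2: the locking discipline from the RdkHandle comment, distilled.
 * The GIL is dropped around blocking pthread calls so that a librdkafka
 * callback needing the GIL (e.g. logging_callback) can never deadlock
 * against a thread that holds the GIL while waiting on the rwlock; stop()
 * takes the write lock, everything else the shared lock. DemoHandle and
 * demo_safe_rdlock are hypothetical names. */
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <pthread.h>

typedef struct {
    PyObject_HEAD
    pthread_rwlock_t rwlock;   /* guards the fields torn down by stop() */
} DemoHandle;

static int
demo_safe_rdlock(DemoHandle *self)
{
    int res;
    Py_BEGIN_ALLOW_THREADS     /* let GIL-hungry callbacks make progress */
    res = pthread_rwlock_rdlock(&self->rwlock);
    Py_END_ALLOW_THREADS
    if (res) {
        PyErr_SetString(PyExc_RuntimeError, "Failed to get shared lock");
        return -1;
    }
    return 0;
}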
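
/* Sketch 3: the zero-copy ownership handoff around rd_kafka_produce() used
 * by Producer_produce and Producer_delivery_report_callback. The Message is
 * INCREF'd and passed as msg_opaque with msgflags 0 (neither copy nor free),
 * so the payload bytes stay valid until the delivery callback surfaces the
 * object as rkmessage->_private and drops the reference. demo_produce and
 * demo_dr_msg_cb are hypothetical names. */
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <librdkafka/rdkafka.h>

static int
demo_produce(rd_kafka_topic_t *rkt, int32_t partition,
             char *payload, size_t len, PyObject *message)
{
    Py_INCREF(message);                        /* delivery callback owns this */
    if (rd_kafka_produce(rkt, partition, 0,    /* 0: don't copy, don't free */
                         payload, len, NULL, 0, message) == -1) {
        Py_DECREF(message);                    /* no callback will ever run */
        return -1;
    }
    return 0;
}

static void
demo_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
{
    PyGILState_STATE gstate = PyGILState_Ensure();
    Py_DECREF((PyObject *)rkmessage->_private);  /* msg_opaque round-trips here */
    PyGILState_Release(gstate);
}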
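
/* Sketch 4: the set_pykafka_error() idiom — look the exception class up by
 * name at call time (so the C module never hard-links against
 * pykafka.exceptions) and return NULL, letting call sites write the one-line
 * shorthand `return demo_set_error("SomeError", "message");`.
 * demo_exceptions stands in for the module object cached at init. */
#define PY_SSIZE_T_CLEAN
#include <Python.h>

static PyObject *demo_exceptions;   /* set once during module init */

static PyObject *
demo_set_error(const char *err_name, const char *err_msg)
{
    PyObject *error = PyObject_GetAttrString(demo_exceptions, err_name);
    if (! error) return NULL;        /* AttributeError is already set */
    PyErr_SetString(error, err_msg);
    Py_DECREF(error);                /* the raised exception holds its own ref */
    return NULL;                     /* lets callers bail out in one line */
}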
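
/* Sketch 5: the timestamp-based inference from Consumer_consume. librdkafka
 * does not report the message-format (protocol) version, so the presence of
 * a timestamp is used as a proxy: format v0 carried no timestamp field. As
 * the source comment concedes, this is fragile. A sketch with the
 * hypothetical name demo_infer_protocol_version. */
#include <stdint.h>
#include <librdkafka/rdkafka.h>

static int
demo_infer_protocol_version(const rd_kafka_message_t *rkmessage,
                            int64_t *timestamp_out)
{
    rd_kafka_timestamp_type_t timestamp_type;
    int64_t timestamp = rd_kafka_message_timestamp(rkmessage, &timestamp_type);
    if (timestamp_type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
        *timestamp_out = 0;
        return 0;   /* treat as message format v0: no timestamp field */
    }
    *timestamp_out = timestamp;
    return 1;       /* v1+: a timestamp travels with the message */
}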