TRAVIS_OS_NAME=linux
TRAVIS_PYTHON_VERSION=3.4
<<<<<< ENV
.coveragerc
.gitignore
.travis.yml
CHANGES.rst
CODE_OF_CONDUCT.md
CONTRIBUTING.rst
ISSUE_TEMPLATE.rst
LICENSE
MANIFEST.in
Makefile
README.rst
add-license.sh
benchmark/simple_consumer_bench.py
benchmark/simple_consumer_bench_analyse.py
doc/.gitignore
doc/Makefile
doc/api/balancedconsumer.rst
doc/api/broker.rst
doc/api/client.rst
doc/api/cluster.rst
doc/api/common.rst
doc/api/connection.rst
doc/api/exceptions.rst
doc/api/handlers.rst
doc/api/managedbalancedconsumer.rst
doc/api/membershipprotocol.rst
doc/api/partition.rst
doc/api/partitioners.rst
doc/api/producer.rst
doc/api/protocol.rst
doc/api/sasl_authenticators.rst
doc/api/simpleconsumer.rst
doc/api/topic.rst
doc/conf.py
doc/index.rst
doc/introduction.rst
doc/usage.rst
doc/utils/compression.rst
doc/utils/error_handlers.rst
doc/utils/socket.rst
doc/utils/struct_helpers.rst
publish.sh
pykafka/__init__.py
pykafka/balancedconsumer.py
pykafka/broker.py
pykafka/cli/__init__.py
pykafka/cli/__main__.py
pykafka/cli/kafka_tools.py
pykafka/client.py
pykafka/cluster.py
pykafka/common.py
pykafka/connection.py
pykafka/exceptions.py
pykafka/handlers.py
pykafka/managedbalancedconsumer.py
pykafka/membershipprotocol.py
pykafka/partition.py
pykafka/partitioners.py
pykafka/producer.py
pykafka/protocol/__init__.py
pykafka/protocol/admin.py
pykafka/protocol/base.py
pykafka/protocol/fetch.py
pykafka/protocol/group_membership.py
pykafka/protocol/message.py
pykafka/protocol/metadata.py
pykafka/protocol/offset.py
pykafka/protocol/offset_commit.py
pykafka/protocol/produce.py
pykafka/protocol/sasl.py
pykafka/rdkafka/__init__.py
pykafka/rdkafka/_rd_kafkamodule.c
pykafka/rdkafka/helpers.py
pykafka/rdkafka/producer.py
pykafka/rdkafka/simple_consumer.py
pykafka/sasl_authenticators.py
pykafka/simpleconsumer.py
pykafka/test/__init__.py
pykafka/test/kafka_instance.py
pykafka/test/utils.py
pykafka/topic.py
pykafka/utils/__init__.py
pykafka/utils/compat.py
pykafka/utils/compression.py
pykafka/utils/error_handlers.py
pykafka/utils/socket.py
pykafka/utils/struct_helpers.py
setup.cfg
setup.py
test-requirements.txt
tests/__init__.py
tests/conftest.py
tests/debug-logging.conf
tests/pykafka/__init__.py
tests/pykafka/rdkafka/__init__.py
tests/pykafka/rdkafka/test_rd_kafka_consumer.py
tests/pykafka/rdkafka/test_sasl.py
tests/pykafka/rdkafka/test_simple_consumer.py
tests/pykafka/rdkafka/test_ssl.py
tests/pykafka/test_balancedconsumer.py
tests/pykafka/test_cluster.py
tests/pykafka/test_connection.py
tests/pykafka/test_partition.py
tests/pykafka/test_partitioners.py
tests/pykafka/test_producer.py
tests/pykafka/test_protocol.py
tests/pykafka/test_sasl.py
tests/pykafka/test_simpleconsumer.py
tests/pykafka/test_ssl.py
tests/pykafka/utils/__init__.py
tests/pykafka/utils/test_compression.py
tests/pykafka/utils/test_struct_helpers.py
tox.ini
<<<<<< network
# path=/home/travis/build/Parsely/pykafka/coverage.xml
/home/travis/build/Parsely/pykafka/pykafka
<<<<<< EOF
# path=/home/travis/build/Parsely/pykafka/pykafka#rdkafka#_rd_kafkamodule.c.gcov
-: 0:Source:pykafka/rdkafka/_rd_kafkamodule.c
-: 0:Graph:./build/temp.linux-x86_64-3.4/pykafka/rdkafka/_rd_kafkamodule.gcno
-: 0:Data:./build/temp.linux-x86_64-3.4/pykafka/rdkafka/_rd_kafkamodule.gcda
-: 0:Runs:1
-: 0:Programs:1
-: 1:#define PY_SSIZE_T_CLEAN
-: 2:
-: 3:#include
-: 4:#include
-: 5:#include
-: 6:
-: 7:#include
-: 8:#include
-: 9:#include
-: 10:
-: 11:#include
-: 12:
-: 13:
-: 14:static PyObject *pykafka_exceptions; /* ~ import pykafka.exceptions */
-: 15:static PyObject *Message; /* ~ from pykafka.protocol import Message */
-: 16:
-: 17:static PyObject *logger; /* ~ logging.getLogger */
-: 18:
-: 19:
-: 20:/**
-: 21: * Logging
-: 22: */
-: 23:
-: 24:
-: 25:/* If used as librdkafka log_cb, passes log messages into python logging */
-: 26:static void
function logging_callback called 3 returned 100% blocks executed 67%
3: 27:logging_callback(const rd_kafka_t *rk,
-: 28: int level,
-: 29: const char *fac,
-: 30: const char *buf)
-: 31:{
-: 32: /* Map syslog levels to python logging levels */
3: 33: char *lvl = NULL;
3: 34: if (level == LOG_DEBUG) lvl = "debug";
branch 0 taken 3 (fallthrough)
branch 1 taken 0
3: 35: else if (level == LOG_INFO || level == LOG_NOTICE) lvl = "info";
branch 0 taken 0 (fallthrough)
branch 1 taken 3
#####: 36: else if (level == LOG_WARNING) lvl = "warning";
branch 0 never executed
branch 1 never executed
#####: 37: else if (level == LOG_ERR) lvl = "error";
branch 0 never executed
branch 1 never executed
#####: 38: else lvl = "critical";
-: 39:
-: 40: /* Grab the GIL, as rdkafka callbacks may come from non-python threads */
3: 41: PyGILState_STATE gstate = PyGILState_Ensure();
call 0 returned 3
-: 42:
-: 43: /* NB librdkafka docs say that rk may be NULL, so check that */
-: 44: /* NB2 because we hold the GIL we don't need the handle's rwlock */
3: 45: const char *rk_name = rk ? rd_kafka_name(rk) : "rk_handle null";
branch 0 taken 3 (fallthrough)
branch 1 taken 0
call 2 returned 3
3: 46: const char *format = "%s [%s] %s"; /* format rk_name + fac + buf */
-: 47:
3: 48: PyObject *res = PyObject_CallMethod(
call 0 returned 3
-: 49: logger, lvl, "ssss", format, rk_name, fac, buf);
-: 50: /* Any errors here we'll just have to swallow: we're probably on some
-: 51: background thread, and we can't log either (logging just failed!) */
3: 52: if (! res) PyErr_Clear();
branch 0 taken 0 (fallthrough)
branch 1 taken 3
call 2 never executed
3: 53: else Py_DECREF(res);
branch 0 taken 0 (fallthrough)
branch 1 taken 3
call 2 never executed
-: 54:
3: 55: PyGILState_Release(gstate);
call 0 returned 3
3: 56:}
-: 57:
-: 58:
-: 59:/**
-: 60: * Exception helpers
-: 61: */
-: 62:
-: 63:
-: 64:/* Raise an exception from pykafka.exceptions (always returns NULL, to allow
-: 65: * shorthand `return set_pykafka_error("Exception", "error message")`) */
-: 66:static PyObject *
function set_pykafka_error called 13 returned 100% blocks executed 83%
13: 67:set_pykafka_error(const char *err_name, const char *err_msg)
-: 68:{
13: 69: PyObject *error = PyObject_GetAttrString(pykafka_exceptions, err_name);
call 0 returned 13
13: 70: if (! error) return NULL;
branch 0 taken 13 (fallthrough)
branch 1 taken 0
13: 71: PyErr_SetString(error, err_msg);
call 0 returned 13
13: 72: Py_DECREF(error);
branch 0 taken 0 (fallthrough)
branch 1 taken 13
call 2 never executed
-: 73: return NULL;
-: 74:}
-: 75:
-: 76:
-: 77:/* Given an error code, return most suitable class from pykafka.exceptions */
-: 78:static PyObject *
function find_pykafka_error called 1 returned 100% blocks executed 73%
1: 79:find_pykafka_error(rd_kafka_resp_err_t err)
-: 80:{
1: 81: PyObject *error_codes = NULL;
1: 82: PyObject *errcode = NULL;
1: 83: PyObject *Exc = NULL;
-: 84:
-: 85: /* See if there's a standard Kafka error for this */
1: 86: error_codes = PyObject_GetAttrString(pykafka_exceptions, "ERROR_CODES");
call 0 returned 1
1: 87: if (! error_codes) goto cleanup;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
1: 88: errcode = PyLong_FromLong(err);
call 0 returned 1
1: 89: if (! errcode) goto cleanup;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
1: 90: Exc = PyObject_GetItem(error_codes, errcode);
call 0 returned 1
-: 91:
1: 92: if (! Exc) { /* raise a generic exception instead */
branch 0 taken 0 (fallthrough)
branch 1 taken 1
#####: 93: PyErr_Clear();
call 0 never executed
#####: 94: Exc = PyObject_GetAttrString(pykafka_exceptions, "RdKafkaException");
call 0 never executed
-: 95: }
-: 96:cleanup:
1: 97: Py_XDECREF(error_codes);
branch 0 taken 1 (fallthrough)
branch 1 taken 0
branch 2 taken 0 (fallthrough)
branch 3 taken 1
call 4 never executed
1: 98: Py_XDECREF(errcode);
branch 0 taken 1 (fallthrough)
branch 1 taken 0
branch 2 taken 0 (fallthrough)
branch 3 taken 1
call 4 never executed
1: 99: return Exc;
-: 100:}
-: 101:
-: 102:
-: 103:/* Given an error code, set a suitable exception; or, if return_error is not
-: 104: * NULL, pass the exception instance back through return_error instead */
-: 105:static void
function set_pykafka_error_from_code called 1 returned 100% blocks executed 81%
1: 106:set_pykafka_error_from_code(rd_kafka_resp_err_t err, PyObject **return_error)
-: 107:{
1: 108: PyObject *error = NULL;
1: 109: PyObject *err_args = NULL;
-: 110:
1: 111: error = find_pykafka_error(err);
call 0 returned 1
1: 112: if (! error) goto cleanup;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
1: 113: err_args = Py_BuildValue("ls", (long)err, rd_kafka_err2str(err));
call 0 returned 1
call 1 returned 1
1: 114: if (! err_args) goto cleanup;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
-: 115:
1: 116: if (! return_error) PyErr_SetObject(error, err_args);
branch 0 taken 0 (fallthrough)
branch 1 taken 1
call 2 never executed
1: 117: else (*return_error) = PyObject_CallObject(error, err_args);
call 0 returned 1
-: 118:cleanup:
1: 119: Py_XDECREF(error);
branch 0 taken 1 (fallthrough)
branch 1 taken 0
branch 2 taken 0 (fallthrough)
branch 3 taken 1
call 4 never executed
1: 120: Py_XDECREF(err_args);
branch 0 taken 1 (fallthrough)
branch 1 taken 0
branch 2 taken 0 (fallthrough)
branch 3 taken 1
call 4 never executed
1: 121:}
-: 122:
-: 123:
-: 124:/**
-: 125: * Shared bits of Producer and Consumer types
-: 126: */
-: 127:
-: 128:
-: 129:/* Note that with this RdkHandle, we hold a separate rd_kafka_t handle for each
-: 130: * rd_kafka_topic_t, whereas librdkafka would allow sharing the same rd_kafka_t
-: 131: * handle between many topic handles, which would be far more efficient. The
-: 132: * problem with that is that it would require the same rd_kafka_conf_t settings
-: 133: * across all class instances sharing a handle, which is somewhat incompatible
-: 134: * with the current pykafka API.
-: 135: *
-: 136: * We need a pthread rwlock here, because in many methods we release the GIL
-: 137: * (this for various reasons - one key reason is that it prevents us
-: 138: * deadlocking if we do blocking calls into librdkafka and then get callbacks
-: 139: * out of librdkafka, and the callbacks would try to grab the GIL). Once we
-: 140: * release the GIL however, there may be other threads calling RdkHandle_stop
-: 141: * (which in pykafka can happen on any thread). The rule here then is that
-: 142: * RdkHandle_stop needs to take out an exclusive lock (wrlock), whereas most
-: 143: * other calls are safe when taking out a shared lock (rdlock). */
-: 144:typedef struct {
-: 145: PyObject_HEAD
-: 146: pthread_rwlock_t rwlock;
-: 147: rd_kafka_t *rdk_handle;
-: 148: rd_kafka_conf_t *rdk_conf;
-: 149: rd_kafka_topic_t *rdk_topic_handle;
-: 150: rd_kafka_topic_conf_t *rdk_topic_conf;
-: 151:
-: 152: /* Consumer-specific fields */
-: 153: rd_kafka_queue_t *rdk_queue_handle;
-: 154: PyObject *partition_ids;
-: 155:} RdkHandle;
-: 156:
-: 157:
-: 158:/* Only for inspection; if you'd manipulate these from outside the module, we
-: 159: * have no error checking in place to protect from ensuing mayhem */
-: 160:static PyMemberDef RdkHandle_members[] = {
-: 161: {"_partition_ids", T_OBJECT_EX, offsetof(RdkHandle, partition_ids),
-: 162: READONLY, "Partitions fetched from by this consumer"},
-: 163: {NULL}
-: 164:};
-: 165:
-: 166:
-: 167:static PyObject *
function RdkHandle_new called 72 returned 100% blocks executed 63%
72: 168:RdkHandle_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
-: 169:{
72: 170: PyObject *self = PyType_GenericNew(type, args, kwds);
call 0 returned 72
72: 171: if (self) {
branch 0 taken 72 (fallthrough)
branch 1 taken 0
72: 172: int res = pthread_rwlock_init(&((RdkHandle *)self)->rwlock, NULL);
call 0 returned 72
72: 173: if (res) {
branch 0 taken 0 (fallthrough)
branch 1 taken 72
#####: 174: Py_DECREF(self);
branch 0 never executed
branch 1 never executed
call 2 never executed
#####: 175: return set_pykafka_error("RdKafkaException", "Failed rwlock init");
call 0 never executed
-: 176: }
-: 177: }
-: 178: return self;
-: 179:}
-: 180:
-: 181:
-: 182:/* Release RdkHandle.rwlock (see RdkHandle docstring) */
-: 183:static int
function RdkHandle_unlock called 6948 returned 100% blocks executed 60%
6948: 184:RdkHandle_unlock(RdkHandle *self)
-: 185:{
6948: 186: if (pthread_rwlock_unlock(&self->rwlock)) {
call 0 returned 6948
branch 1 taken 0 (fallthrough)
branch 2 taken 6948
#####: 187: set_pykafka_error("RdKafkaException", "Failed to release rwlock");
call 0 never executed
#####: 188: return -1;
-: 189: }
-: 190: return 0;
-: 191:}
-: 192:
-: 193:
-: 194:/* Get shared lock and optionally check handle is running. Returns non-zero
-: 195: * if error has been set.
-: 196: *
-: 197: * Should be used by any method that accesses an RdkHandle, to prevent the
-: 198: * handle being concurrently destroyed on another thread (by calling
-: 199: * RdkHandle_stop). See also RdkHandle docstring. */
-: 200:static int
function RdkHandle_safe_lock called 6641 returned 100% blocks executed 100%
6641: 201:RdkHandle_safe_lock(RdkHandle *self, int check_running)
-: 202:{
-: 203: int res;
6641: 204: Py_BEGIN_ALLOW_THREADS
call 0 returned 6641
6641: 205: res = pthread_rwlock_rdlock(&self->rwlock);
call 0 returned 6641
6641: 206: Py_END_ALLOW_THREADS
call 0 returned 6641
6641: 207: if (res) {
branch 0 taken 1 (fallthrough)
branch 1 taken 6640
1: 208: set_pykafka_error("RdKafkaException", "Failed to get shared lock");
call 0 returned 1
1: 209: return -1;
-: 210: }
6640: 211: if (check_running && !self->rdk_handle) {
branch 0 taken 6422 (fallthrough)
branch 1 taken 218
branch 2 taken 10 (fallthrough)
branch 3 taken 6412
10: 212: set_pykafka_error("RdKafkaStoppedException", "");
call 0 returned 10
10: 213: RdkHandle_unlock(self);
call 0 returned 10
10: 214: return -1;
-: 215: }
-: 216: return 0;
-: 217:}
-: 218:
-: 219:
-: 220:/* Get exclusive lock on handle. Returns non-zero if error has been set.
-: 221: *
-: 222: * Should be used by any method that might render accessing RdkHandle members
-: 223: * unsafe (in particular, RdkHandle_stop). See also RdkHandle docstring. */
-: 224:static int
function RdkHandle_excl_lock called 317 returned 100% blocks executed 71%
317: 225:RdkHandle_excl_lock(RdkHandle *self)
-: 226:{
-: 227: int res;
317: 228: Py_BEGIN_ALLOW_THREADS
call 0 returned 317
317: 229: res = pthread_rwlock_wrlock(&self->rwlock);
call 0 returned 317
317: 230: Py_END_ALLOW_THREADS
call 0 returned 317
317: 231: if (res) {
branch 0 taken 0 (fallthrough)
branch 1 taken 317
#####: 232: set_pykafka_error("RdKafkaException", "Failed to get exclusive lock");
call 0 never executed
#####: 233: return -1;
-: 234: }
-: 235: return 0;
-: 236:}
-: 237:
-: 238:
-: 239:PyDoc_STRVAR(RdkHandle_outq_len__doc__,
-: 240: "outq_len(self) -> int\n"
-: 241: "\n"
-: 242: "Number of messages pending shipping to a broker.");
-: 243:static PyObject *
function RdkHandle_outq_len called 25 returned 100% blocks executed 100%
25: 244:RdkHandle_outq_len(RdkHandle *self)
-: 245:{
25: 246: if (RdkHandle_safe_lock(self, /* check_running= */ 1)) return NULL;
call 0 returned 25
branch 1 taken 25 (fallthrough)
branch 2 taken 0
-: 247:
25: 248: int outq_len = -1;
25: 249: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 25
25: 250: outq_len = rd_kafka_outq_len(self->rdk_handle);
call 0 returned 25
25: 251: Py_END_ALLOW_THREADS
call 0 returned 25
-: 252:
25: 253: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 25
branch 1 taken 25 (fallthrough)
branch 2 taken 0
25: 254: return Py_BuildValue("i", outq_len);
call 0 returned 25
-: 255:}
-: 256:
-: 257:
-: 258:PyDoc_STRVAR(RdkHandle_poll__doc__,
-: 259: "poll(self, timeout_ms) -> int\n"
-: 260: "\n"
-: 261: "Poll the handle for events (in particular delivery callbacks) and\n"
-: 262: "return the total number of callbacks triggered.");
-: 263:static PyObject *
function RdkHandle_poll called 1110 returned 99% blocks executed 100%
1110: 264:RdkHandle_poll(RdkHandle *self, PyObject *args, PyObject *kwds)
-: 265:{
1110: 266: char *keywords[] = {"timeout_ms", NULL};
1110: 267: int timeout_ms = 0;
1110: 268: if (! PyArg_ParseTupleAndKeywords(args, kwds, "i", keywords, &timeout_ms)) {
call 0 returned 1110
branch 1 taken 1110 (fallthrough)
branch 2 taken 0
-: 269: return NULL;
-: 270: }
-: 271:
1110: 272: if (RdkHandle_safe_lock(self, /* check_running= */ 1)) return NULL;
call 0 returned 1110
branch 1 taken 1101 (fallthrough)
branch 2 taken 9
-: 273:
1101: 274: int n_events = 0;
1101: 275: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 1101
1101: 276: n_events = rd_kafka_poll(self->rdk_handle, timeout_ms);
call 0 returned 1101
1101: 277: Py_END_ALLOW_THREADS
call 0 returned 1093
-: 278:
1093: 279: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 1093
branch 1 taken 1093 (fallthrough)
branch 2 taken 0
1093: 280: return Py_BuildValue("i", n_events);
call 0 returned 1093
-: 281:}
-: 282:
-: 283:
-: 284:PyDoc_STRVAR(RdkHandle_stop__doc__,
-: 285: "stop(self)\n"
-: 286: "\n"
-: 287: "Shut down the librdkafka handle and clean up. This may block until\n"
-: 288: "other methods accessing the same handle concurrently have finished.\n");
-: 289:static PyObject *
function RdkHandle_stop called 130 returned 100% blocks executed 83%
130: 290:RdkHandle_stop(RdkHandle *self)
-: 291:{
-: 292: /* We'll only ever get a locking error if we programmed ourselves into a
-: 293: * deadlock. We'd have to admit defeat, abort, and leak this RdkHandle */
130: 294: if (RdkHandle_excl_lock(self)) return NULL;
call 0 returned 130
branch 1 taken 130 (fallthrough)
branch 2 taken 0
-: 295:
130: 296: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 130
130: 297: if (self->rdk_queue_handle) {
branch 0 taken 38 (fallthrough)
branch 1 taken 92
38: 298: rd_kafka_queue_destroy(self->rdk_queue_handle);
call 0 returned 38
38: 299: self->rdk_queue_handle = NULL;
-: 300: }
130: 301: if (self->rdk_topic_handle) {
branch 0 taken 62 (fallthrough)
branch 1 taken 68
62: 302: rd_kafka_topic_destroy(self->rdk_topic_handle);
call 0 returned 62
62: 303: self->rdk_topic_handle = NULL;
-: 304: }
130: 305: if (self->rdk_handle) {
branch 0 taken 62 (fallthrough)
branch 1 taken 68
62: 306: PyObject *opaque = (PyObject *)rd_kafka_opaque(self->rdk_handle);
call 0 returned 62
62: 307: Py_XDECREF(opaque);
branch 0 taken 24 (fallthrough)
branch 1 taken 38
branch 2 taken 0 (fallthrough)
branch 3 taken 24
call 4 never executed
62: 308: rd_kafka_destroy(self->rdk_handle);
call 0 returned 62
62: 309: self->rdk_handle = NULL;
-: 310: }
130: 311: if (self->rdk_conf) {
branch 0 taken 0 (fallthrough)
branch 1 taken 130
#####: 312: rd_kafka_conf_destroy(self->rdk_conf);
call 0 never executed
#####: 313: self->rdk_conf = NULL;
-: 314: }
130: 315: if (self->rdk_topic_conf) {
branch 0 taken 0 (fallthrough)
branch 1 taken 130
#####: 316: rd_kafka_topic_conf_destroy(self->rdk_topic_conf);
call 0 never executed
#####: 317: self->rdk_topic_conf = NULL;
-: 318: }
130: 319: Py_END_ALLOW_THREADS
call 0 returned 130
-: 320:
130: 321: Py_CLEAR(self->partition_ids);
branch 0 taken 38 (fallthrough)
branch 1 taken 92
branch 2 taken 38 (fallthrough)
branch 3 taken 0
call 4 returned 38
-: 322:
130: 323: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 130
branch 1 taken 130 (fallthrough)
branch 2 taken 0
130: 324: Py_INCREF(Py_None);
130: 325: return Py_None;
-: 326:}
-: 327:
-: 328:
-: 329:static void
function RdkHandle_dealloc called 64 returned 100% blocks executed 75%
64: 330:RdkHandle_dealloc(PyObject *self, PyObject *(*stop_func) (RdkHandle *))
-: 331:{
64: 332: PyObject *stop_result = stop_func((RdkHandle *)self);
call 0 returned 64
64: 333: if (!stop_result) {
branch 0 taken 1 (fallthrough)
branch 1 taken 63
-: 334: /* We'll swallow the exception, so let's try to log info first */
1: 335: PyObject *res = PyObject_CallMethod(
call 0 returned 1
-: 336: logger, "exception", "s", "In dealloc: stop() failed.");
1: 337: PyErr_Clear();
call 0 returned 1
1: 338: Py_XDECREF(res);
branch 0 taken 0 (fallthrough)
branch 1 taken 1
branch 2 never executed
branch 3 never executed
call 4 never executed
-: 339: } else {
63: 340: Py_DECREF(stop_result);
branch 0 taken 0 (fallthrough)
branch 1 taken 63
call 2 never executed
-: 341: }
64: 342: pthread_rwlock_destroy(&((RdkHandle *)self)->rwlock);
call 0 returned 64
64: 343: self->ob_type->tp_free(self);
call 0 returned 64
64: 344:}
-: 345:
-: 346:
-: 347:PyDoc_STRVAR(RdkHandle_configure__doc__,
-: 348: "configure(self, conf) OR configure(self, topic_conf)\n"
-: 349: "\n"
-: 350: "Set up and populate the rd_kafka_(topic_)conf_t. Somewhat inelegantly\n"
-: 351: "(for the benefit of code reuse, whilst avoiding some harrowing partial\n"
-: 352: "binding for C functions) this requires that you call it twice, once\n"
-: 353: "with a `conf` list only, and again with `topic_conf` only.\n"
-: 354: "\n"
-: 355: "Repeated calls work incrementally; you can wipe configuration completely\n"
-: 356: "by calling Consumer_stop()\n");
-: 357:static PyObject *
function RdkHandle_configure called 140 returned 100% blocks executed 88%
140: 358:RdkHandle_configure(RdkHandle *self, PyObject *args, PyObject *kwds)
-: 359:{
140: 360: char *keywords[] = {"conf", "topic_conf", NULL};
140: 361: PyObject *conf = NULL;
140: 362: PyObject *topic_conf = NULL;
140: 363: if (! PyArg_ParseTupleAndKeywords(args,
call 0 returned 140
branch 1 taken 140 (fallthrough)
branch 2 taken 0
-: 364: kwds,
-: 365: "|OO",
-: 366: keywords,
-: 367: &conf,
-: 368: &topic_conf)) {
-: 369: return NULL;
-: 370: }
-: 371:
140: 372: if (RdkHandle_safe_lock(self, /* check_running= */ 0)) return NULL;
call 0 returned 140
branch 1 taken 140 (fallthrough)
branch 2 taken 0
140: 373: if ((conf && topic_conf) || (!conf && !topic_conf)) {
branch 0 taken 70 (fallthrough)
branch 1 taken 70
branch 2 taken 70 (fallthrough)
branch 3 taken 0
branch 4 taken 70 (fallthrough)
branch 5 taken 70
branch 6 taken 0 (fallthrough)
branch 7 taken 70
#####: 374: return set_pykafka_error(
call 0 never executed
-: 375: "RdKafkaException",
-: 376: "You need to specify *either* `conf` *or* `topic_conf`.");
-: 377: }
140: 378: if (self->rdk_handle) {
branch 0 taken 0 (fallthrough)
branch 1 taken 140
#####: 379: return set_pykafka_error(
call 0 never executed
-: 380: "RdKafkaException",
-: 381: "Cannot configure: seems instance was started already?");
-: 382: }
-: 383:
140: 384: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 140
140: 385: if (! self->rdk_conf) {
branch 0 taken 70 (fallthrough)
branch 1 taken 70
70: 386: self->rdk_conf = rd_kafka_conf_new();
call 0 returned 70
70: 387: rd_kafka_conf_set_log_cb(self->rdk_conf, logging_callback);
call 0 returned 70
-: 388: }
140: 389: if (! self->rdk_topic_conf) {
branch 0 taken 70 (fallthrough)
branch 1 taken 70
70: 390: self->rdk_topic_conf = rd_kafka_topic_conf_new();
call 0 returned 70
-: 391: }
140: 392: Py_END_ALLOW_THREADS
call 0 returned 140
-: 393:
140: 394: PyObject *retval = Py_None;
140: 395: PyObject *conf_or_topic_conf = topic_conf ? topic_conf : conf;
branch 0 taken 70 (fallthrough)
branch 1 taken 70
140: 396: Py_ssize_t i, len = PyList_Size(conf_or_topic_conf);
1246: 397: for (i = 0; i != len; ++i) {
call 0 returned 140
branch 1 taken 966
branch 2 taken 140 (fallthrough)
966: 398: PyObject *conf_pair = PyList_GetItem(conf_or_topic_conf, i);
call 0 returned 966
966: 399: const char *name = NULL;
966: 400: const char *value = NULL;
966: 401: if (! PyArg_ParseTuple(conf_pair, "ss", &name, &value)) {
call 0 returned 966
branch 1 taken 966 (fallthrough)
branch 2 taken 0
-: 402: retval = NULL;
#####: 403: break;
-: 404: }
-: 405: char errstr[512];
-: 406: rd_kafka_conf_res_t res;
966: 407: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 966
966: 408: if (topic_conf) {
branch 0 taken 186 (fallthrough)
branch 1 taken 780
186: 409: res = rd_kafka_topic_conf_set(
call 0 returned 186
-: 410: self->rdk_topic_conf, name, value, errstr, sizeof(errstr));
-: 411: } else {
780: 412: res = rd_kafka_conf_set(
call 0 returned 780
-: 413: self->rdk_conf, name, value, errstr, sizeof(errstr));
-: 414: }
966: 415: Py_END_ALLOW_THREADS
call 0 returned 966
966: 416: if (res != RD_KAFKA_CONF_OK) {
branch 0 taken 0 (fallthrough)
branch 1 taken 966
#####: 417: retval = set_pykafka_error("RdKafkaException", errstr);
call 0 never executed
#####: 418: break;
-: 419: }
-: 420: }
-: 421:
140: 422: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 140
branch 1 taken 140 (fallthrough)
branch 2 taken 0
140: 423: Py_XINCREF(retval);
branch 0 taken 140 (fallthrough)
branch 1 taken 0
-: 424: return retval;
-: 425:}
-: 426:
-: 427:
-: 428:/* Cleanup helper for *_start(), returns NULL to allow shorthand in use.
-: 429: * NB: assumes self->rwlock is held and releases it. */
-: 430:static PyObject *
function RdkHandle_start_fail called 0 returned 0% blocks executed 0%
#####: 431:RdkHandle_start_fail(RdkHandle *self, PyObject *(*stop_func) (RdkHandle *))
-: 432:{
-: 433: /* Something went wrong so we expect an exception has been set */
-: 434: PyObject *err_type, *err_value, *err_traceback;
#####: 435: PyErr_Fetch(&err_type, &err_value, &err_traceback);
call 0 never executed
-: 436:
#####: 437: RdkHandle_unlock(self);
call 0 never executed
#####: 438: PyObject *stop_result = stop_func(self);
call 0 never executed
-: 439:
-: 440: /* stop_func is likely to raise exceptions, as start was incomplete */
#####: 441: if (! stop_result) PyErr_Clear();
branch 0 never executed
branch 1 never executed
call 2 never executed
#####: 442: else Py_DECREF(stop_result);
branch 0 never executed
branch 1 never executed
call 2 never executed
-: 443:
#####: 444: PyErr_Restore(err_type, err_value, err_traceback);
call 0 never executed
#####: 445: return NULL;
-: 446:}
-: 447:
-: 448:
-: 449:/* Shared logic of Consumer_start and Producer_start */
-: 450:static PyObject *
function RdkHandle_start called 70 returned 100% blocks executed 66%
70: 451:RdkHandle_start(RdkHandle *self,
-: 452: rd_kafka_type_t rdk_type,
-: 453: const char *brokers,
-: 454: const char *topic_name)
-: 455:{
70: 456: if (RdkHandle_excl_lock(self)) return NULL;
call 0 returned 70
branch 1 taken 70 (fallthrough)
branch 2 taken 0
70: 457: if (self->rdk_handle) {
branch 0 taken 0 (fallthrough)
branch 1 taken 70
#####: 458: set_pykafka_error("RdKafkaException", "Already started!");
call 0 never executed
#####: 459: return RdkHandle_start_fail(self, RdkHandle_stop);
call 0 never executed
-: 460: }
-: 461:
-: 462: /* Configure and start rdk_handle */
-: 463: char errstr[512];
70: 464: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 70
70: 465: self->rdk_handle = rd_kafka_new(
call 0 returned 70
-: 466: rdk_type, self->rdk_conf, errstr, sizeof(errstr));
70: 467: self->rdk_conf = NULL; /* deallocated by rd_kafka_new() */
70: 468: Py_END_ALLOW_THREADS
call 0 returned 70
70: 469: if (! self->rdk_handle) {
branch 0 taken 0 (fallthrough)
branch 1 taken 70
#####: 470: set_pykafka_error("RdKafkaException", errstr);
call 0 never executed
#####: 471: return RdkHandle_start_fail(self, RdkHandle_stop);
call 0 never executed
-: 472: }
-: 473:
-: 474: /* Set brokers */
-: 475: int brokers_added;
70: 476: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 70
70: 477: brokers_added = rd_kafka_brokers_add(self->rdk_handle, brokers);
call 0 returned 70
70: 478: Py_END_ALLOW_THREADS
call 0 returned 70
70: 479: if (brokers_added == 0) {
branch 0 taken 0 (fallthrough)
branch 1 taken 70
#####: 480: set_pykafka_error("RdKafkaException", "adding brokers failed");
call 0 never executed
#####: 481: return RdkHandle_start_fail(self, RdkHandle_stop);
call 0 never executed
-: 482: }
-: 483:
-: 484: /* Configure and take out a topic handle */
70: 485: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 70
70: 486: self->rdk_topic_handle = rd_kafka_topic_new(self->rdk_handle,
call 0 returned 70
-: 487: topic_name,
-: 488: self->rdk_topic_conf);
70: 489: self->rdk_topic_conf = NULL; /* deallocated by rd_kafka_topic_new() */
70: 490: Py_END_ALLOW_THREADS
call 0 returned 70
70: 491: if (! self->rdk_topic_handle) {
branch 0 taken 0 (fallthrough)
branch 1 taken 70
#####: 492: set_pykafka_error_from_code(rd_kafka_errno2err(errno), NULL);
call 0 never executed
call 1 never executed
call 2 never executed
#####: 493: return RdkHandle_start_fail(self, RdkHandle_stop);
call 0 never executed
-: 494: }
-: 495:
70: 496: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 70
branch 1 taken 70 (fallthrough)
branch 2 taken 0
70: 497: Py_INCREF(Py_None);
70: 498: return Py_None;
-: 499:}
-: 500:
-: 501:
-: 502:/**
-: 503: * Producer type
-: 504: */
-: 505:
-: 506:
-: 507:/* NB this doesn't check if RdkHandle_outq_len is zero, and generally assumes
-: 508: * the wrapping python class will take care of ensuring any such preconditions
-: 509: * for a clean termination */
-: 510:static void
function Producer_dealloc called 24 returned 100% blocks executed 100%
24: 511:Producer_dealloc(PyObject *self)
-: 512:{
24: 513: RdkHandle_dealloc(self, RdkHandle_stop);
call 0 returned 24
24: 514:}
-: 515:
-: 516:
-: 517:/* Helper function for Producer_delivery_report_callback: find an exception
-: 518: * corresponding to `err`, and send that into the report queue. Returns -1
-: 519: * on failure, or 0 on success */
-: 520:static int
function Producer_delivery_report_put called 45 returned 100% blocks executed 82%
45: 521:Producer_delivery_report_put(PyObject *put_func,
-: 522: PyObject *message,
-: 523: rd_kafka_resp_err_t err)
-: 524:{
45: 525: PyObject *exc = NULL;
45: 526: if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
branch 0 taken 44 (fallthrough)
branch 1 taken 1
44: 527: exc = Py_None;
44: 528: Py_INCREF(Py_None);
-: 529: } else {
1: 530: set_pykafka_error_from_code(err, &exc);
call 0 returned 1
1: 531: if (! exc) return -1;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
-: 532: }
45: 533: PyObject *res = PyObject_CallFunctionObjArgs(put_func, message, exc, NULL);
call 0 returned 45
-: 534:
45: 535: Py_DECREF(exc);
branch 0 taken 0 (fallthrough)
branch 1 taken 45
call 2 never executed
45: 536: if (! res) return -1;
branch 0 taken 45 (fallthrough)
branch 1 taken 0
45: 537: else Py_DECREF(res);
branch 0 taken 0 (fallthrough)
branch 1 taken 45
call 2 never executed
-: 538: return 0;
-: 539:}
-: 540:
-: 541:
-: 542:/* Callback to be used as librdkafka dr_msg_cb. Note that this must always
-: 543: * be configured to run, even if we do not care about delivery reporting,
-: 544: * because we keep the Message objects that we produce from alive until this
-: 545: * callback releases them, in order to avoid having to copy the payload */
-: 546:static void
function Producer_delivery_report_callback called 44 returned 100% blocks executed 75%
44: 547:Producer_delivery_report_callback(rd_kafka_t *rk,
-: 548: const rd_kafka_message_t *rkmessage,
-: 549: void *opaque)
-: 550:{
44: 551: PyGILState_STATE gstate = PyGILState_Ensure();
call 0 returned 44
-: 552:
-: 553: /* Producer_produce sent *Message as msg_opaque == rkmessage->_private */
44: 554: PyObject *message = (PyObject *)rkmessage->_private;
44: 555: PyObject *put_func = (PyObject *)opaque;
44: 556: if (rkmessage->offset != -1) {
branch 0 taken 44 (fallthrough)
branch 1 taken 0
44: 557: PyObject* offset = PyLong_FromUnsignedLongLong(rkmessage->offset);
call 0 returned 44
44: 558: PyObject_SetAttrString(message, "offset", offset);
call 0 returned 44
-: 559: }
44: 560: if (-1 == Producer_delivery_report_put(put_func, message, rkmessage->err)) {
call 0 returned 44
branch 1 taken 0 (fallthrough)
branch 2 taken 44
-: 561: /* Must swallow exception as this is a non-python callback */
#####: 562: PyObject *res = PyObject_CallMethod(
call 0 never executed
-: 563: logger, "exception", "s", "Failure in delivery callback");
#####: 564: Py_XDECREF(res);
branch 0 never executed
branch 1 never executed
branch 2 never executed
branch 3 never executed
call 4 never executed
#####: 565: PyErr_Clear();
call 0 never executed
-: 566: } else {
-: 567: // Set timestamp to the one created by librdkafka
-: 568: rd_kafka_timestamp_type_t timestamp_type;
44: 569: int64_t timestamp = rd_kafka_message_timestamp(rkmessage, ×tamp_type);
call 0 returned 44
44: 570: if (timestamp_type != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
branch 0 taken 44 (fallthrough)
branch 1 taken 0
44: 571: PyObject* timestamp_o = PyLong_FromUnsignedLongLong(timestamp);
call 0 returned 44
44: 572: PyObject_SetAttrString(message, "timestamp", timestamp_o);
call 0 returned 44
-: 573: }
-: 574: }
-: 575:
44: 576: Py_DECREF(message); /* We INCREF'd this in Producer_produce() */
branch 0 taken 2 (fallthrough)
branch 1 taken 42
call 2 returned 2
44: 577: PyGILState_Release(gstate);
call 0 returned 44
44: 578:}
-: 579:
-: 580:
-: 581:PyDoc_STRVAR(Producer_start__doc__,
-: 582: "start(self, brokers, topic_name, delivery_put)\n"
-: 583: "\n"
-: 584: "Starts the underlying rdkafka producer, given a broker connection\n"
-: 585: "string, a topic name, and a (bound) method that we can send delivery\n"
-: 586: "reports (as in `put(msg, exception)`). Configuration should have been\n"
-: 587: "provided through earlier calls to configure(). Note that following\n"
-: 588: "start, you _must_ ensure that poll() is called regularly, or delivery\n"
-: 589: "reports may pile up\n");
-: 590:static PyObject *
function Producer_start called 24 returned 100% blocks executed 73%
24: 591:Producer_start(RdkHandle *self, PyObject *args, PyObject *kwds)
-: 592:{
24: 593: if (RdkHandle_excl_lock(self)) return NULL;
call 0 returned 24
branch 1 taken 24 (fallthrough)
branch 2 taken 0
-: 594:
24: 595: char *keywords[] = {"brokers", "topic_name", "delivery_put", NULL};
24: 596: PyObject *brokers = NULL;
24: 597: PyObject *topic_name = NULL;
24: 598: PyObject *delivery_put = NULL;
24: 599: if (! PyArg_ParseTupleAndKeywords(
call 0 returned 24
branch 1 taken 24 (fallthrough)
branch 2 taken 0
-: 600: args, kwds, "SSO", keywords, &brokers, &topic_name, &delivery_put)) {
-: 601: goto failed;
-: 602: }
-: 603:
-: 604: /* Configure delivery-reporting */
24: 605: if (! self->rdk_conf) {
branch 0 taken 0 (fallthrough)
branch 1 taken 24
#####: 606: set_pykafka_error("RdKafkaException",
call 0 never executed
-: 607: "Please run configure() before starting.");
#####: 608: goto failed;
-: 609: }
24: 610: rd_kafka_conf_set_dr_msg_cb(self->rdk_conf,
call 0 returned 24
-: 611: Producer_delivery_report_callback);
24: 612: Py_INCREF(delivery_put);
24: 613: rd_kafka_conf_set_opaque(self->rdk_conf, delivery_put);
call 0 returned 24
-: 614:
24: 615: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 24
branch 1 taken 24 (fallthrough)
branch 2 taken 0
24: 616: return RdkHandle_start(
call 0 returned 24
-: 617: self,
-: 618: RD_KAFKA_PRODUCER,
24: 619: PyBytes_AS_STRING(brokers),
24: 620: PyBytes_AS_STRING(topic_name));
-: 621:failed:
#####: 622: RdkHandle_unlock(self);
call 0 never executed
#####: 623: return NULL;
-: 624:}
-: 625:
-: 626:
-: 627:PyDoc_STRVAR(Producer_produce__doc__,
-: 628: "produce(self, message)\n"
-: 629: "\n"
-: 630: "Produces a `pykafka.protocol.Message` through librdkafka. Will keep\n"
-: 631: "a reference to the message internally until delivery is confirmed\n");
-: 632:static PyObject *
function Producer_produce called 47 returned 100% blocks executed 84%
47: 633:Producer_produce(RdkHandle *self, PyObject *message)
-: 634:{
47: 635: if (RdkHandle_safe_lock(self, /* check_running= */ 1)) return NULL;
call 0 returned 47
branch 1 taken 47 (fallthrough)
branch 2 taken 0
-: 636:
47: 637: PyObject *value = NULL;
47: 638: PyObject *partition_key = NULL;
47: 639: PyObject *partition_id = NULL;
-: 640:
-: 641: /* Keep message alive until the delivery-callback runs. Needed both
-: 642: * because we may want to put the message on a report queue when the
-: 643: * callback runs, and because we'll tell rd_kafka_produce() not to copy
-: 644: * the payload and it can safely use the raw Message bytes directly */
47: 645: Py_INCREF(message);
-: 646:
-: 647: /* Get pointers to raw Message contents */
47: 648: value = PyObject_GetAttrString(message, "value");
call 0 returned 47
47: 649: if (! value) goto failed;
branch 0 taken 46 (fallthrough)
branch 1 taken 1
46: 650: partition_key = PyObject_GetAttrString(message, "partition_key");
call 0 returned 46
46: 651: if (! partition_key) goto failed;
branch 0 taken 46 (fallthrough)
branch 1 taken 0
46: 652: partition_id = PyObject_GetAttrString(message, "partition_id");
call 0 returned 46
46: 653: if (! partition_id) goto failed;
branch 0 taken 46 (fallthrough)
branch 1 taken 0
-: 654:
46: 655: char *v = NULL;
46: 656: Py_ssize_t v_len = 0;
46: 657: if (value != Py_None) {
branch 0 taken 44 (fallthrough)
branch 1 taken 2
44: 658: v = PyBytes_AsString(value);
call 0 returned 44
44: 659: if (! v) goto failed;
branch 0 taken 44 (fallthrough)
branch 1 taken 0
44: 660: v_len = PyBytes_GET_SIZE(value);
-: 661: }
46: 662: char *pk = NULL;
46: 663: Py_ssize_t pk_len = 0;
46: 664: if (partition_key != Py_None) {
branch 0 taken 2 (fallthrough)
branch 1 taken 44
2: 665: pk = PyBytes_AsString(partition_key);
call 0 returned 2
2: 666: if (! pk) goto failed;
branch 0 taken 2 (fallthrough)
branch 1 taken 0
2: 667: pk_len = PyBytes_GET_SIZE(partition_key);
-: 668: }
46: 669: int32_t p_id = PyLong_AsLong(partition_id);
call 0 returned 46
46: 670: if (p_id == -1 && PyErr_Occurred()) goto failed;
branch 0 taken 0 (fallthrough)
branch 1 taken 46
call 2 never executed
branch 3 never executed
branch 4 never executed
-: 671:
46: 672: int res = 0;
46: 673: Py_BEGIN_ALLOW_THREADS
call 0 returned 46
46: 674: res = rd_kafka_produce(self->rdk_topic_handle,
call 0 returned 46
-: 675: p_id,
-: 676: 0, /* ie don't copy and don't dealloc v */
-: 677: v, v_len,
-: 678: pk, pk_len,
-: 679: (void *)message);
46: 680: Py_END_ALLOW_THREADS
call 0 returned 46
46: 681: if (res == -1) {
branch 0 taken 2 (fallthrough)
branch 1 taken 44
2: 682: rd_kafka_resp_err_t err = rd_kafka_errno2err(errno);
call 0 returned 2
call 1 returned 2
2: 683: if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
branch 0 taken 1 (fallthrough)
branch 1 taken 1
1: 684: set_pykafka_error("ProducerQueueFullError", "");
call 0 returned 1
1: 685: goto failed;
-: 686: } else {
-: 687: /* Any other errors should go through the report queue,
-: 688: * because that's where pykafka.Producer would put them */
1: 689: PyObject *put_func = (PyObject *)rd_kafka_opaque(self->rdk_handle);
call 0 returned 1
1: 690: if (-1 == Producer_delivery_report_put(put_func, message, err)) {
call 0 returned 1
branch 1 taken 1 (fallthrough)
branch 2 taken 0
-: 691: goto failed;
-: 692: }
-: 693: }
1: 694: Py_DECREF(message); /* There won't be a delivery-callback */
branch 0 taken 0 (fallthrough)
branch 1 taken 1
call 2 never executed
-: 695: }
-: 696:
45: 697: Py_DECREF(value);
branch 0 taken 0 (fallthrough)
branch 1 taken 45
call 2 never executed
45: 698: Py_DECREF(partition_key);
branch 0 taken 0 (fallthrough)
branch 1 taken 45
call 2 never executed
45: 699: Py_DECREF(partition_id);
branch 0 taken 0 (fallthrough)
branch 1 taken 45
call 2 never executed
45: 700: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 45
branch 1 taken 45 (fallthrough)
branch 2 taken 0
-: 701:
45: 702: Py_INCREF(Py_None);
45: 703: return Py_None;
-: 704:failed:
2: 705: Py_XDECREF(value);
branch 0 taken 1 (fallthrough)
branch 1 taken 1
branch 2 taken 0 (fallthrough)
branch 3 taken 1
call 4 never executed
2: 706: Py_XDECREF(partition_key);
branch 0 taken 1 (fallthrough)
branch 1 taken 1
branch 2 taken 0 (fallthrough)
branch 3 taken 1
call 4 never executed
2: 707: Py_XDECREF(partition_id);
branch 0 taken 1 (fallthrough)
branch 1 taken 1
branch 2 taken 0 (fallthrough)
branch 3 taken 1
call 4 never executed
2: 708: RdkHandle_unlock(self);
call 0 returned 2
2: 709: return NULL;
-: 710:}
-: 711:
-: 712:
-: 713:static PyMethodDef Producer_methods[] = {
-: 714: {"produce", (PyCFunction)Producer_produce,
-: 715: METH_O, Producer_produce__doc__},
-: 716: {"stop", (PyCFunction)RdkHandle_stop,
-: 717: METH_NOARGS, RdkHandle_stop__doc__},
-: 718: {"configure", (PyCFunction)RdkHandle_configure,
-: 719: METH_VARARGS | METH_KEYWORDS, RdkHandle_configure__doc__},
-: 720: {"start", (PyCFunction)Producer_start,
-: 721: METH_VARARGS | METH_KEYWORDS, Producer_start__doc__},
-: 722: {"outq_len", (PyCFunction)RdkHandle_outq_len,
-: 723: METH_NOARGS, RdkHandle_outq_len__doc__},
-: 724: {"poll", (PyCFunction)RdkHandle_poll,
-: 725: METH_VARARGS | METH_KEYWORDS, RdkHandle_poll__doc__},
-: 726: {NULL, NULL, 0, NULL}
-: 727:};
-: 728:
-: 729:
-: 730:static PyTypeObject ProducerType = {
-: 731: PyVarObject_HEAD_INIT(NULL, 0)
-: 732: "pykafka.rd_kafka.Producer",
-: 733: sizeof(RdkHandle),
-: 734: 0, /* tp_itemsize */
-: 735: (destructor)Producer_dealloc, /* tp_dealloc */
-: 736: 0, /* tp_print */
-: 737: 0, /* tp_getattr */
-: 738: 0, /* tp_setattr */
-: 739: 0, /* tp_compare */
-: 740: 0, /* tp_repr */
-: 741: 0, /* tp_as_number */
-: 742: 0, /* tp_as_sequence */
-: 743: 0, /* tp_as_mapping */
-: 744: 0, /* tp_hash */
-: 745: 0, /* tp_call */
-: 746: 0, /* tp_str */
-: 747: 0, /* tp_getattro */
-: 748: 0, /* tp_setattro */
-: 749: 0, /* tp_as_buffer */
-: 750: Py_TPFLAGS_DEFAULT, /* tp_flags */
-: 751: 0, /* tp_doc */
-: 752: 0, /* tp_traverse */
-: 753: 0, /* tp_clear */
-: 754: 0, /* tp_richcompare */
-: 755: 0, /* tp_weaklistoffset */
-: 756: 0, /* tp_iter */
-: 757: 0, /* tp_iternext */
-: 758: Producer_methods, /* tp_methods */
-: 759: RdkHandle_members, /* tp_members */
-: 760: 0, /* tp_getset */
-: 761: 0, /* tp_base */
-: 762: 0, /* tp_dict */
-: 763: 0, /* tp_descr_get */
-: 764: 0, /* tp_descr_set */
-: 765: 0, /* tp_dictoffset */
-: 766: 0, /* tp_init */
-: 767: 0, /* tp_alloc */
-: 768: RdkHandle_new, /* tp_new */
-: 769:};
-: 770:
-: 771:
-: 772:/**
-: 773: * Consumer type
-: 774: */
-: 775:
-: 776:
-: 777:/* Destroy all internal state of the consumer */
-: 778:static PyObject *
function Consumer_stop called 79 returned 100% blocks executed 54%
79: 779:Consumer_stop(RdkHandle *self)
-: 780:{
79: 781: if (RdkHandle_safe_lock(self, /* check_running= */ 0)) return NULL;
call 0 returned 79
branch 1 taken 78 (fallthrough)
branch 2 taken 1
-: 782:
78: 783: int errored = 0;
78: 784: if (self->rdk_topic_handle && self->partition_ids) {
branch 0 taken 38 (fallthrough)
branch 1 taken 40
branch 2 taken 38 (fallthrough)
branch 3 taken 0
38: 785: Py_ssize_t i, len = PyList_Size(self->partition_ids);
137: 786: for (i = 0; i != len; ++i) {
call 0 returned 38
branch 1 taken 99
branch 2 taken 38 (fallthrough)
-: 787: /* Error handling here is a bit poor; we cannot bail out directly
-: 788: if we want to clean up as much as we can. */
99: 789: long part_id = PyLong_AsLong(
call 0 returned 99
call 1 returned 99
-: 790: PyList_GetItem(self->partition_ids, i));
99: 791: if (part_id == -1) {
branch 0 taken 0 (fallthrough)
branch 1 taken 99
#####: 792: errored += 1;
#####: 793: PyObject *log_res = PyObject_CallMethod(
call 0 never executed
-: 794: logger, "exception", "s", "In Consumer_stop:");
#####: 795: Py_XDECREF(log_res);
branch 0 never executed
branch 1 never executed
branch 2 never executed
branch 3 never executed
call 4 never executed
#####: 796: continue;
-: 797: }
-: 798: int res;
99: 799: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 99
99: 800: res = rd_kafka_consume_stop(self->rdk_topic_handle, part_id);
call 0 returned 99
99: 801: Py_END_ALLOW_THREADS
call 0 returned 99
99: 802: if (res == -1) {
branch 0 taken 0 (fallthrough)
branch 1 taken 99
#####: 803: set_pykafka_error_from_code(rd_kafka_errno2err(errno), NULL);
call 0 never executed
call 1 never executed
call 2 never executed
#####: 804: errored += 1;
#####: 805: PyObject *log_res = PyObject_CallMethod(
call 0 never executed
-: 806: logger, "exception", "sl",
-: 807: "Error in rd_kafka_consume_stop, part_id=%s",
-: 808: part_id);
#####: 809: Py_XDECREF(log_res);
branch 0 never executed
branch 1 never executed
branch 2 never executed
branch 3 never executed
call 4 never executed
#####: 810: continue;
-: 811: }
-: 812: }
-: 813: }
-: 814:
78: 815: RdkHandle_unlock(self);
call 0 returned 78
78: 816: PyObject *res = RdkHandle_stop(self);
call 0 returned 78
78: 817: if (errored) {
branch 0 taken 0 (fallthrough)
branch 1 taken 78
#####: 818: Py_XDECREF(res);
branch 0 never executed
branch 1 never executed
branch 2 never executed
branch 3 never executed
call 4 never executed
-: 819: return NULL;
-: 820: }
-: 821: return res;
-: 822:}
-: 823:
-: 824:
-: 825:static void
function Consumer_dealloc called 40 returned 100% blocks executed 100%
40: 826:Consumer_dealloc(PyObject *self)
-: 827:{
40: 828: RdkHandle_dealloc(self, Consumer_stop);
call 0 returned 40
40: 829:}
-: 830:
-: 831:
-: 832:/* Cleanup helper for Consumer_start.
-: 833: * NB: assumes self->rwlock is held and releases it. */
-: 834:static PyObject *
-: 835:Consumer_start_fail(RdkHandle *self)
-: 836:{
#####: 837: return RdkHandle_start_fail(self, Consumer_stop);
-: 838:}
-: 839:
-: 840:
-: 841:PyDoc_STRVAR(Consumer_start__doc__,
-: 842: "start(self, brokers, topic_name, partition_ids, start_offsets)\n"
-: 843: "\n"
-: 844: "Starts the underlying rdkafka consumer. Configuration should have been\n"
-: 845: "provided through earlier calls to configure(). Note that following\n"
-: 846: "start, you must ensure that poll() is called regularly, as required\n"
-: 847: "by librdkafka\n");
-: 848:static PyObject *
function Consumer_start called 47 returned 100% blocks executed 67%
47: 849:Consumer_start(RdkHandle *self, PyObject *args, PyObject *kwds)
-: 850:{
47: 851: char *keywords[] = {
-: 852: "brokers",
-: 853: "topic_name",
-: 854: "partition_ids",
-: 855: "start_offsets", /* same order as partition_ids */
-: 856: NULL};
47: 857: PyObject *brokers = NULL;
47: 858: PyObject *topic_name = NULL;
47: 859: PyObject *partition_ids = NULL;
47: 860: PyObject *start_offsets = NULL;
47: 861: if (! PyArg_ParseTupleAndKeywords(args,
call 0 returned 47
branch 1 taken 47 (fallthrough)
branch 2 taken 0
-: 862: kwds,
-: 863: "SSOO",
-: 864: keywords,
-: 865: &brokers,
-: 866: &topic_name,
-: 867: &partition_ids,
-: 868: &start_offsets)) {
-: 869: return NULL;
-: 870: }
-: 871:
-: 872: /* Set the application opaque NULL, just to make stop() simpler */
47: 873: if (RdkHandle_excl_lock(self)) return NULL;
call 0 returned 47
branch 1 taken 47 (fallthrough)
branch 2 taken 0
47: 874: if (! self->rdk_conf) {
branch 0 taken 1 (fallthrough)
branch 1 taken 46
1: 875: set_pykafka_error("RdKafkaException",
call 0 returned 1
-: 876: "Please run configure() before starting.");
1: 877: return NULL;
-: 878: }
46: 879: rd_kafka_conf_set_opaque(self->rdk_conf, NULL);
call 0 returned 46
46: 880: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 46
branch 1 taken 46 (fallthrough)
branch 2 taken 0
-: 881:
-: 882: /* Basic setup */
46: 883: PyObject *res = RdkHandle_start(
call 0 returned 46
-: 884: self,
-: 885: RD_KAFKA_CONSUMER,
46: 886: PyBytes_AS_STRING(brokers),
46: 887: PyBytes_AS_STRING(topic_name));
46: 888: if (! res) return NULL;
branch 0 taken 46 (fallthrough)
branch 1 taken 0
46: 889: else Py_DECREF(res);
branch 0 taken 0 (fallthrough)
branch 1 taken 46
call 2 never executed
-: 890:
46: 891: if (RdkHandle_excl_lock(self)) return NULL;
call 0 returned 46
branch 1 taken 46 (fallthrough)
branch 2 taken 0
46: 892: if (! self->rdk_handle) {
branch 0 taken 0 (fallthrough)
branch 1 taken 46
#####: 893: set_pykafka_error("RdKafkaStoppedException",
call 0 never executed
-: 894: "Stopped in the middle of starting.");
#####: 895: return Consumer_start_fail(self);
call 0 never executed
-: 896: }
-: 897:
-: 898: /* We'll keep our own copy of partition_ids, because the one handed to us
-: 899: might be mutable, and weird things could happen if the list used on init
-: 900: is different than that on dealloc */
46: 901: self->partition_ids = PySequence_List(partition_ids);
call 0 returned 46
46: 902: if (! self->partition_ids) return Consumer_start_fail(self);
branch 0 taken 0 (fallthrough)
branch 1 taken 46
call 2 never executed
-: 903:
-: 904: /* Start a queue and add all partition_ids to it */
46: 905: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 46
46: 906: self->rdk_queue_handle = rd_kafka_queue_new(self->rdk_handle);
call 0 returned 46
46: 907: Py_END_ALLOW_THREADS
call 0 returned 46
46: 908: if (! self->rdk_queue_handle) {
branch 0 taken 0 (fallthrough)
branch 1 taken 46
#####: 909: set_pykafka_error("RdKafkaException", "could not get queue");
call 0 never executed
#####: 910: return Consumer_start_fail(self);
call 0 never executed
-: 911: }
46: 912: Py_ssize_t i, len = PyList_Size(self->partition_ids);
157: 913: for (i = 0; i != len; ++i) {
call 0 returned 46
branch 1 taken 111
branch 2 taken 46 (fallthrough)
-: 914: /* We don't do much type-checking on partition_ids/start_offsets as
-: 915: this module is intended solely for use with the python class that
-: 916: wraps it */
111: 917: int32_t part_id = PyLong_AsLong(
call 0 returned 111
call 1 returned 111
-: 918: PyList_GetItem(self->partition_ids, i));
111: 919: if (part_id == -1 && PyErr_Occurred()) {
branch 0 taken 0 (fallthrough)
branch 1 taken 111
call 2 never executed
branch 3 never executed
branch 4 never executed
#####: 920: return Consumer_start_fail(self);
call 0 never executed
-: 921: }
111: 922: PyObject *offset_obj = PySequence_GetItem(start_offsets, i);
call 0 returned 111
111: 923: if (! offset_obj) { /* start_offsets shorter than partition_ids? */
branch 0 taken 0 (fallthrough)
branch 1 taken 111
#####: 924: return Consumer_start_fail(self);
call 0 never executed
-: 925: }
111: 926: int64_t offset = PyLong_AsLongLong(offset_obj);
call 0 returned 111
-: 927: int res;
111: 928: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 111
111: 929: res = rd_kafka_consume_start_queue(self->rdk_topic_handle,
call 0 returned 111
-: 930: part_id,
-: 931: offset,
-: 932: self->rdk_queue_handle);
111: 933: Py_END_ALLOW_THREADS
call 0 returned 111
111: 934: if (res == -1) {
branch 0 taken 0 (fallthrough)
branch 1 taken 111
#####: 935: set_pykafka_error_from_code(rd_kafka_errno2err(errno), NULL);
call 0 never executed
call 1 never executed
call 2 never executed
#####: 936: return Consumer_start_fail(self);
call 0 never executed
-: 937: }
-: 938: }
46: 939: if (RdkHandle_unlock(self)) return NULL;
call 0 returned 46
branch 1 taken 46 (fallthrough)
branch 2 taken 0
46: 940: Py_INCREF(Py_None);
46: 941: return Py_None;
-: 942:}
-: 943:
-: 944:
-: 945:PyDoc_STRVAR(Consumer_consume__doc__,
-: 946: "consume(self, timeout_ms) -> pykafka.protocol.Message or None \n"
-: 947: "\n"
-: 948: "Block until a message can be returned, or return None if timeout_ms has\n"
-: 949: "been reached.\n");
-: 950:static PyObject *
function Consumer_consume called 5240 returned 100% blocks executed 91%
5240: 951:Consumer_consume(RdkHandle *self, PyObject *args)
-: 952:{
5240: 953: int timeout_ms = 0;
5240: 954: if (! PyArg_ParseTuple(args, "i", &timeout_ms)) return NULL;
call 0 returned 5240
branch 1 taken 5240 (fallthrough)
branch 2 taken 0
-: 955:
5240: 956: PyObject *retval = NULL;
5240: 957: PyObject *empty_args = NULL;
5240: 958: PyObject *kwargs = NULL;
-: 959: rd_kafka_message_t *rkmessage;
5240: 960: int protocol_version = 0;
-: 961:
5240: 962: if (RdkHandle_safe_lock(self, /* check_running= */ 1)) return NULL;
call 0 returned 5240
branch 1 taken 5239 (fallthrough)
branch 2 taken 1
5239: 963: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 5239
5239: 964: rkmessage = rd_kafka_consume_queue(self->rdk_queue_handle, timeout_ms);
call 0 returned 5239
5239: 965: Py_END_ALLOW_THREADS
call 0 returned 5239
5239: 966: if (RdkHandle_unlock(self)) goto cleanup;
call 0 returned 5239
branch 1 taken 5239 (fallthrough)
branch 2 taken 0
-: 967:
5239: 968: if (!rkmessage) {
branch 0 taken 371 (fallthrough)
branch 1 taken 4868
-: 969: /* Either ETIMEDOUT or ENOENT occurred, but the latter would imply we
-: 970: forgot to call rd_kafka_consume_start_queue, which is unlikely in
-: 971: this setup. We'll assume it was ETIMEDOUT then: */
371: 972: Py_INCREF(Py_None);
371: 973: return Py_None;
-: 974: }
-: 975:
4868: 976: if (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
branch 0 taken 4853 (fallthrough)
branch 1 taken 15
-: 977: /* Build a pykafka.protocol.Message */
-: 978: rd_kafka_timestamp_type_t timestamp_type;
4853: 979: int64_t timestamp = rd_kafka_message_timestamp(rkmessage, ×tamp_type);
call 0 returned 4853
4853: 980: if (timestamp_type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
branch 0 taken 4853 (fallthrough)
branch 1 taken 0
-: 981: timestamp = 0;
-: 982: } else {
-: 983: // infer protocol from presence of timestamp. Fragile, but protocol
-: 984: // not transmitted by librdkafka
4853: 985: protocol_version = 1;
-: 986: }
-: 987:#if PY_MAJOR_VERSION >= 3
4853: 988: const char *format = "{s:y#,s:y#,s:l,s:L,s:i,s:L}";
-: 989:#else
-: 990: const char *format = "{s:s#,s:s#,s:l,s:L,s:i,s:L}";
-: 991:#endif
4853: 992: kwargs = Py_BuildValue(
call 0 returned 4853
-: 993: format,
-: 994: "value", rkmessage->payload, rkmessage->len,
-: 995: "partition_key", rkmessage->key, rkmessage->key_len,
4853: 996: "partition_id", (long)rkmessage->partition,
4853: 997: "offset", (PY_LONG_LONG)rkmessage->offset,
-: 998: "protocol_version", protocol_version,
-: 999: "timestamp", (PY_LONG_LONG)timestamp);
4853: 1000: if (! kwargs) goto cleanup;
branch 0 taken 4853 (fallthrough)
branch 1 taken 0
4853: 1001: empty_args = PyTuple_New(0);
call 0 returned 4853
4853: 1002: if (! empty_args) goto cleanup;
branch 0 taken 4853 (fallthrough)
branch 1 taken 0
4853: 1003: retval = PyObject_Call(Message, empty_args, kwargs);
call 0 returned 4853
15: 1004: } else if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
branch 0 taken 15 (fallthrough)
branch 1 taken 0
-: 1005: /* Whenever we get to the head of a partition, we get this. There
-: 1006: * may be messages available in other partitions, so if we want to
-: 1007: * match pykafka.SimpleConsumer behaviour, we ought to avoid breaking
-: 1008: * any iteration loops, and simply skip over this one altogether: */
15: 1009: retval = Consumer_consume(self, args);
call 0 returned 15
-: 1010: } else {
#####: 1011: set_pykafka_error_from_code(rkmessage->err, NULL);
call 0 never executed
-: 1012: }
-: 1013:cleanup:
4868: 1014: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 4868
4868: 1015: rd_kafka_message_destroy(rkmessage);
call 0 returned 4868
4868: 1016: Py_END_ALLOW_THREADS
call 0 returned 4868
4868: 1017: Py_XDECREF(empty_args);
branch 0 taken 4853 (fallthrough)
branch 1 taken 15
branch 2 taken 0 (fallthrough)
branch 3 taken 4853
call 4 never executed
4868: 1018: Py_XDECREF(kwargs);
branch 0 taken 4853 (fallthrough)
branch 1 taken 15
branch 2 taken 4853 (fallthrough)
branch 3 taken 0
call 4 returned 4853
-: 1019: return retval;
-: 1020:}
-: 1021:
-: 1022:
-: 1023:static PyMethodDef Consumer_methods[] = {
-: 1024: {"consume", (PyCFunction)Consumer_consume,
-: 1025: METH_VARARGS, Consumer_consume__doc__},
-: 1026: {"stop", (PyCFunction)Consumer_stop,
-: 1027: METH_NOARGS, RdkHandle_stop__doc__},
-: 1028: {"configure", (PyCFunction)RdkHandle_configure,
-: 1029: METH_VARARGS | METH_KEYWORDS, RdkHandle_configure__doc__},
-: 1030: {"start", (PyCFunction)Consumer_start,
-: 1031: METH_VARARGS | METH_KEYWORDS, Consumer_start__doc__},
-: 1032: {"poll", (PyCFunction)RdkHandle_poll,
-: 1033: METH_VARARGS | METH_KEYWORDS, RdkHandle_poll__doc__},
-: 1034: {NULL, NULL, 0, NULL}
-: 1035:};
-: 1036:
-: 1037:
-: 1038:static PyTypeObject ConsumerType = {
-: 1039: PyVarObject_HEAD_INIT(NULL, 0)
-: 1040: "pykafka.rd_kafka.Consumer",
-: 1041: sizeof(RdkHandle),
-: 1042: 0, /* tp_itemsize */
-: 1043: (destructor)Consumer_dealloc, /* tp_dealloc */
-: 1044: 0, /* tp_print */
-: 1045: 0, /* tp_getattr */
-: 1046: 0, /* tp_setattr */
-: 1047: 0, /* tp_compare */
-: 1048: 0, /* tp_repr */
-: 1049: 0, /* tp_as_number */
-: 1050: 0, /* tp_as_sequence */
-: 1051: 0, /* tp_as_mapping */
-: 1052: 0, /* tp_hash */
-: 1053: 0, /* tp_call */
-: 1054: 0, /* tp_str */
-: 1055: 0, /* tp_getattro */
-: 1056: 0, /* tp_setattro */
-: 1057: 0, /* tp_as_buffer */
-: 1058: Py_TPFLAGS_DEFAULT, /* tp_flags */
-: 1059: 0, /* tp_doc */
-: 1060: 0, /* tp_traverse */
-: 1061: 0, /* tp_clear */
-: 1062: 0, /* tp_richcompare */
-: 1063: 0, /* tp_weaklistoffset */
-: 1064: 0, /* tp_iter */
-: 1065: 0, /* tp_iternext */
-: 1066: Consumer_methods, /* tp_methods */
-: 1067: RdkHandle_members, /* tp_members */
-: 1068: 0, /* tp_getset */
-: 1069: 0, /* tp_base */
-: 1070: 0, /* tp_dict */
-: 1071: 0, /* tp_descr_get */
-: 1072: 0, /* tp_descr_set */
-: 1073: 0, /* tp_dictoffset */
-: 1074: 0, /* tp_init */
-: 1075: 0, /* tp_alloc */
-: 1076: RdkHandle_new, /* tp_new */
-: 1077:};
-: 1078:
-: 1079:
-: 1080:/**
-: 1081: * Debugging helpers
-: 1082: */
-: 1083:
-: 1084:
-: 1085:PyDoc_STRVAR(debug_thread_cnt__doc__,
-: 1086: "_thread_cnt() -> int\n"
-: 1087: "\n"
-: 1088: "Debugging helper, reports number of librdkafka threads running.\n");
-: 1089:static PyObject *
function debug_thread_cnt called 4 returned 100% blocks executed 100%
4: 1090:debug_thread_cnt(PyObject *self, PyObject *args)
-: 1091:{
4: 1092: return PyLong_FromLong(rd_kafka_thread_cnt());
call 0 returned 4
call 1 returned 4
-: 1093:}
-: 1094:
-: 1095:
-: 1096:PyDoc_STRVAR(debug_wait_destroyed__doc__,
-: 1097: "_wait_destroyed(timeout_ms)\n"
-: 1098: "\n"
-: 1099: "Debugging helper, blocks until all rdkafka handles have been destroyed.\n"
-: 1100: "Raises exception if timeout_ms is reached before then\n");
-: 1101:static PyObject *
function debug_wait_destroyed called 2 returned 100% blocks executed 73%
2: 1102:debug_wait_destroyed(PyObject *self, PyObject *arg)
-: 1103:{
2: 1104: int timeout_ms = PyLong_AsLong(arg);
call 0 returned 2
2: 1105: if (timeout_ms == -1 && PyErr_Occurred()) return NULL;
branch 0 taken 0 (fallthrough)
branch 1 taken 2
call 2 never executed
branch 3 never executed
branch 4 never executed
-: 1106:
-: 1107: int res;
2: 1108: Py_BEGIN_ALLOW_THREADS /* avoid callbacks deadlocking */
call 0 returned 2
2: 1109: res = rd_kafka_wait_destroyed(timeout_ms);
call 0 returned 2
2: 1110: Py_END_ALLOW_THREADS
call 0 returned 2
-: 1111:
2: 1112: if (res == -1) {
branch 0 taken 0 (fallthrough)
branch 1 taken 2
#####: 1113: return set_pykafka_error("RdKafkaException",
call 0 never executed
-: 1114: "rd_kafka_wait_destroyed timed out");
-: 1115: }
2: 1116: Py_INCREF(Py_None);
2: 1117: return Py_None;
-: 1118:}
-: 1119:
-: 1120:
-: 1121:/**
-: 1122: * Module init
-: 1123: */
-: 1124:
-: 1125:static const char module_name[] = "pykafka.rdkafka._rd_kafka";
-: 1126:
-: 1127:
-: 1128:static PyMethodDef pyrdk_methods[] = {
-: 1129: {"_thread_cnt", debug_thread_cnt,
-: 1130: METH_NOARGS, debug_thread_cnt__doc__},
-: 1131: {"_wait_destroyed", debug_wait_destroyed,
-: 1132: METH_O, debug_wait_destroyed__doc__},
-: 1133: {NULL, NULL, 0, NULL}
-: 1134:};
-: 1135:
-: 1136:
-: 1137:#if PY_MAJOR_VERSION >= 3
-: 1138: static struct PyModuleDef moduledef = {
-: 1139: PyModuleDef_HEAD_INIT,
-: 1140: module_name,
-: 1141: NULL, /* m_doc */
-: 1142: -1, /* m_size */
-: 1143: pyrdk_methods,
-: 1144: NULL, /* m_reload */
-: 1145: NULL, /* m_traverse */
-: 1146: NULL, /* m_clear */
-: 1147: NULL, /* m_free */
-: 1148: };
-: 1149:#endif
-: 1150:
-: 1151:
-: 1152:static PyObject *
function _rd_kafkamodule_init called 1 returned 100% blocks executed 93%
1: 1153:_rd_kafkamodule_init(void)
-: 1154:{
-: 1155:#if PY_MAJOR_VERSION >= 3
1: 1156: PyObject *mod = PyModule_Create(&moduledef);
call 0 returned 1
-: 1157:#else
-: 1158: PyObject *mod = Py_InitModule(module_name, pyrdk_methods);
-: 1159:#endif
1: 1160: if (mod == NULL) return NULL;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
-: 1161:
-: 1162: /* Callback logging requires the GIL */
1: 1163: PyEval_InitThreads();
call 0 returned 1
-: 1164:
1: 1165: PyObject *logging = PyImport_ImportModule("logging");
call 0 returned 1
1: 1166: if (! logging) return NULL;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
1: 1167: logger = PyObject_CallMethod(logging, "getLogger", "s", module_name);
call 0 returned 1
1: 1168: Py_DECREF(logging);
branch 0 taken 0 (fallthrough)
branch 1 taken 1
call 2 never executed
1: 1169: if (! logger) return NULL;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
-: 1170:
1: 1171: pykafka_exceptions = PyImport_ImportModule("pykafka.exceptions");
call 0 returned 1
1: 1172: if (! pykafka_exceptions) return NULL;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
-: 1173:
1: 1174: PyObject *pykafka_protocol = PyImport_ImportModule("pykafka.protocol");
call 0 returned 1
1: 1175: if (! pykafka_protocol) return NULL;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
1: 1176: Message = PyObject_GetAttrString(pykafka_protocol, "Message");
call 0 returned 1
1: 1177: Py_DECREF(pykafka_protocol);
branch 0 taken 0 (fallthrough)
branch 1 taken 1
call 2 never executed
1: 1178: if (! Message) return NULL;
branch 0 taken 1 (fallthrough)
branch 1 taken 0
-: 1179:
1: 1180: if (PyType_Ready(&ProducerType)) return NULL;
call 0 returned 1
branch 1 taken 1 (fallthrough)
branch 2 taken 0
1: 1181: Py_INCREF(&ProducerType);
1: 1182: if (PyModule_AddObject(mod, "Producer", (PyObject *)&ProducerType)) {
call 0 returned 1
branch 1 taken 1 (fallthrough)
branch 2 taken 0
-: 1183: return NULL;
-: 1184: }
-: 1185:
1: 1186: if (PyType_Ready(&ConsumerType)) return NULL;
call 0 returned 1
branch 1 taken 1 (fallthrough)
branch 2 taken 0
1: 1187: Py_INCREF(&ConsumerType);
1: 1188: if (PyModule_AddObject(mod, "Consumer", (PyObject *)&ConsumerType)) {
call 0 returned 1
branch 1 taken 1 (fallthrough)
branch 2 taken 0
-: 1189: return NULL;
-: 1190: }
-: 1191:
1: 1192: return mod;
-: 1193:}
-: 1194:
-: 1195:
-: 1196:#if PY_MAJOR_VERSION >= 3
-: 1197: PyMODINIT_FUNC
function PyInit__rd_kafka called 1 returned 100% blocks executed 100%
1: 1198: PyInit__rd_kafka(void)
-: 1199: {
1: 1200: return _rd_kafkamodule_init();
call 0 returned 1
-: 1201: }
-: 1202:#else
-: 1203: PyMODINIT_FUNC
-: 1204: init_rd_kafka(void)
-: 1205: {
-: 1206: _rd_kafkamodule_init();
-: 1207: }
-: 1208:#endif
<<<<<< EOF