Problem
Previously, for Kafka 2.5, we set group.instance.id in StaticPartitioningKafkaClientSupplierSupplier via config. This approach does not work when there are multiple stream threads: every stream thread's consumer ends up with the same group.instance.id, so the consumers fence each other off and fail with the group.instance.id fencing exception (FencedInstanceIdException).
Solution
Move the configuration directly into the Streams config. This works because Kafka Streams appends the thread index to group.instance.id when it builds the consumer config for each StreamThread (StreamsConfig.java in the Kafka source):
// Suffix each thread consumer with thread.id to enforce uniqueness of group.instance.id.
if (groupInstanceId != null) {
    consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId + "-" + threadIdx);
}
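For illustration, here is a minimal sketch of the effect of that suffixing. It uses plain Java maps instead of the real Kafka classes, so it runs without the Kafka dependency. The class name, the helper consumerPropsForThread, the instance ID "my-app-host-0", and the thread indices 1 to 3 are hypothetical. Only the "group.instance.id" key and the "-" + threadIdx suffix come from the snippet above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class GroupInstanceIdSketch {
    // Same key string that ConsumerConfig.GROUP_INSTANCE_ID_CONFIG resolves to.
    static final String GROUP_INSTANCE_ID = "group.instance.id";

    // Hypothetical helper mimicking the quoted Kafka logic: copy the
    // stream-level props and suffix group.instance.id with the thread index.
    static Map<String, Object> consumerPropsForThread(Map<String, Object> streamsProps, int threadIdx) {
        Map<String, Object> consumerProps = new HashMap<>(streamsProps);
        String groupInstanceId = (String) consumerProps.get(GROUP_INSTANCE_ID);
        if (groupInstanceId != null) {
            consumerProps.put(GROUP_INSTANCE_ID, groupInstanceId + "-" + threadIdx);
        }
        return consumerProps;
    }

    // Counts how many distinct group.instance.id values a set of threads would get.
    static int distinctIds(Map<String, Object> streamsProps, int numThreads) {
        Set<Object> ids = new HashSet<>();
        for (int threadIdx = 1; threadIdx <= numThreads; threadIdx++) {
            ids.add(consumerPropsForThread(streamsProps, threadIdx).get(GROUP_INSTANCE_ID));
        }
        return ids.size();
    }

    public static void main(String[] args) {
        Map<String, Object> streamsProps = new HashMap<>();
        streamsProps.put(GROUP_INSTANCE_ID, "my-app-host-0"); // hypothetical value

        // Each simulated thread gets its own ID derived from the shared base ID.
        for (int threadIdx = 1; threadIdx <= 3; threadIdx++) {
            System.out.println(consumerPropsForThread(streamsProps, threadIdx).get(GROUP_INSTANCE_ID));
        }
    }
}
```

Setting the ID once at the Streams level therefore yields one distinct ID per thread. Setting it per consumer in the client supplier bypasses the suffixing, so every thread shares a single ID.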
Differential Revision: https://phabricator.twitter.biz/D603905