Messages not getting distributed in round-robin order after increasing the number of partitions in Kafka
Posted: 2017-07-24 18:57:39

Hi, I am new to Kafka. I am using Kafka version 0.10.2 and ZooKeeper version 3.4.9. I have a topic with two partitions and two consumers running. To increase processing speed, I decided to raise the partition count to 10 so that I could also increase the number of consumers. So I ran the command:
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic --partitions 10
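(A quick way to confirm the alter actually took effect is to describe the topic; the topic name here is a stand-in for the real one:)

./kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic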
I then observed two strange things:

My consumers are still connected to only the two old partitions; the remaining partitions have no consumer at all. (The expected behavior is that the two consumers should between them be listening to all 10 partitions.)

Messages are being pushed to only the two old partitions; the new partitions receive no messages. (The expected behavior is that messages should be distributed across all partitions in round-robin fashion.)

I am using this command to see the details about the partitions:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group topic-group
My consumer code:
class KafkaPollingConsumer implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
    private static final String TAG = "[KafkaPollingConsumer]"
    private final KafkaConsumer<String, byte[]> kafkaConsumer
    private Map<TopicPartition, OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
    List topicNameList
    Map kafkaTopicConfigMap = new HashMap<String, Object>()
    Map kafkaTopicMessageListMap = new HashMap<String, List>()

    public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex) {
        logger.debug("{} [Constructor] [Enter] Thread Name {} serverType {} groupName {} topicNameRegex {}", TAG, Thread.currentThread().getName(), serverType, groupName, topicNameRegex)
        logger.debug("Populating properties for Kafka consumer")
        Properties kafkaConsumerProperties = new Properties()
        kafkaConsumerProperties.put("group.id", groupName)
        kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer")
        switch (serverType) {
            case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString():
                kafkaConsumerProperties.put("bootstrap.servers", ConfigLoader.conf.kafkaServer.priority.kafkaNode)
                kafkaConsumerProperties.put("enable.auto.commit", ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
                kafkaConsumerProperties.put("auto.offset.reset", ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
                break
            case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString():
                kafkaConsumerProperties.put("bootstrap.servers", ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
                kafkaConsumerProperties.put("enable.auto.commit", ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
                kafkaConsumerProperties.put("auto.offset.reset", ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
                kafkaConsumerProperties.put("max.poll.records", 10)
                kafkaConsumerProperties.put("max.poll.interval.ms", 900000)
                kafkaConsumerProperties.put("request.timeout.ms", 900000)
                break
            default:
                // Groovy only allows Throwables to be thrown, so wrap the message in an exception
                throw new IllegalArgumentException("Invalid server type")
        }
        logger.debug("{} [Constructor] KafkaConsumer properties populated {}", TAG, kafkaConsumerProperties.toString())
        kafkaConsumer = new KafkaConsumer<String, byte[]>(kafkaConsumerProperties)
        topicNameList = topicNameRegex.split(Pattern.quote('|'))
        logger.debug("{} [Constructor] Kafka topic list {}", TAG, topicNameList.toString())
        logger.debug("{} [Constructor] Exit", TAG)
    }

    private class HandleRebalance implements ConsumerRebalanceListener {

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Commit whatever has been processed so far before partitions are reassigned
            if (currentOffsetsMap != null && !currentOffsetsMap.isEmpty()) {
                logger.debug("{} In onPartitionsRevoked, rebalanced", TAG)
                kafkaConsumer.commitSync(currentOffsetsMap)
            }
        }
    }

    @Override
    void run() {
        logger.debug("{} Starting thread {}", TAG, Thread.currentThread().getName())
        populateKafkaConfigMap()
        initializeKafkaTopicMessageListMap()
        String topicName
        String consumerClassName
        String consumerMethodName
        Boolean isBatchJob
        Integer batchSize = 0
        final Thread mainThread = Thread.currentThread()
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.error("{} Gracefully shutting down thread {}", TAG, mainThread.getName())
                kafkaConsumer.wakeup()
                try {
                    mainThread.join()
                } catch (InterruptedException exception) {
                    logger.error("{} Error: {}", TAG, exception.getStackTrace().join("\n"))
                }
            }
        })
        kafkaConsumer.subscribe(topicNameList, new HandleRebalance())
        try {
            while (true) {
                logger.debug("{} Starting consumer with polling time in ms 100", TAG)
                ConsumerRecords kafkaRecords = kafkaConsumer.poll(100)
                for (ConsumerRecord record : kafkaRecords) {
                    topicName = record.topic()
                    DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                    consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                    consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                    isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                    logger.debug("Details about message")
                    logger.debug("Thread {}", mainThread.getName())
                    logger.debug("Topic {}", topicName)
                    logger.debug("Partition {}", record.partition().toString())
                    logger.debug("Offset {}", record.offset().toString())
                    logger.debug("className {}", consumerClassName)
                    logger.debug("methodName {}", consumerMethodName)
                    logger.debug("isBatchJob {}", isBatchJob.toString())
                    if (isBatchJob == true) {
                        batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                        logger.debug("batchSize {}", batchSize.toString())
                    }
                    Object message = record.value()
                    logger.debug("message {}", message.toString())
                    publishMessageToConsumers(consumerClassName, consumerMethodName, isBatchJob, batchSize, message, topicName)
                    Thread.sleep(60000)
                    currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1))
                }
                logger.debug("{} Committing messages to Kafka", TAG)
                kafkaConsumer.commitSync(currentOffsetsMap)
            }
        } catch (InterruptException exception) {
            logger.error("{} In InterruptException", TAG)
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        } catch (WakeupException exception) {
            logger.error("{} In WakeupException", TAG)
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        } catch (Exception exception) {
            logger.error("{} In Exception", TAG)
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        } finally {
            logger.error("{} In finally, committing remaining offsets", TAG)
            publishAllKafkaTopicBatchMessages()
            kafkaConsumer.commitSync(currentOffsetsMap)
            kafkaConsumer.close()
            logger.error("{} Exiting consumer", TAG)
        }
    }

    private void publishMessageToConsumers(String consumerClassName, String consumerMethodName, Boolean isBatchJob, Integer batchSize, Object message, String topicName) {
        logger.debug("{} [publishMessageToConsumers] Enter", TAG)
        if (isBatchJob == true) {
            publishMessageToBatchConsumer(consumerClassName, consumerMethodName, batchSize, message, topicName)
        } else {
            publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
        }
        logger.debug("{} [publishMessageToConsumers] Exit", TAG)
    }

    private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message) {
        logger.debug("{} [publishMessageToNonBatchConsumer] Enter", TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, message)
        logger.debug("{} [publishMessageToNonBatchConsumer] Exit", TAG)
    }

    private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName) {
        logger.debug("{} [publishMessageToBatchConsumer] Enter", TAG)
        List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
        consumerMessageList.add(message)
        if (consumerMessageList.size() == batchSize) {
            logger.debug("{} [publishMessageToBatchConsumer] Pushing messages in batches", TAG)
            executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
            consumerMessageList.clear()
        }
        kafkaTopicMessageListMap.put(topicName, consumerMessageList)
        logger.debug("{} [publishMessageToBatchConsumer] Exit", TAG)
    }

    private void populateKafkaConfigMap() {
        logger.debug("{} [populateKafkaConfigMap] Enter", TAG)
        KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
        topicNameList.each { topicName ->
            DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
            kafkaTopicConfigMap.put(topicName, kafkaTopicDBObject)
        }
        logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}", TAG, kafkaTopicConfigMap.toString())
        logger.debug("{} [populateKafkaConfigMap] Exit", TAG)
    }

    private void initializeKafkaTopicMessageListMap() {
        logger.debug("{} [initializeKafkaTopicMessageListMap] Enter", TAG)
        topicNameList.each { topicName ->
            kafkaTopicMessageListMap.put(topicName, [])
        }
        logger.debug("{} [initializeKafkaTopicMessageListMap] kafkaTopicMessageListMap {}", TAG, kafkaTopicMessageListMap.toString())
        logger.debug("{} [initializeKafkaTopicMessageListMap] Exit", TAG)
    }

    private void executeConsumerMethod(String className, String methodName, def messages) {
        try {
            logger.debug("{} [executeConsumerMethod] Enter", TAG)
            logger.debug("{} [executeConsumerMethod] className {} methodName {} messages {}", TAG, className, methodName, messages.toString())
            // Dynamically dispatch to the configured static handler method
            Class.forName(className)."$methodName"(messages)
        } catch (Exception exception) {
            logger.error("{} [{}] Error while executing method {} of class {} with params {} - {}", TAG, Thread.currentThread().getName(), methodName,
                    className, messages.toString(), exception.getStackTrace().join("\n"))
        }
        logger.debug("{} [executeConsumerMethod] Exit", TAG)
    }

    private void publishAllKafkaTopicBatchMessages() {
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter", TAG)
        String consumerClassName = null
        String consumerMethodName = null
        kafkaTopicMessageListMap.each { topicName, messageList ->
            DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
            consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
            consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
            logger.debug("{} Pushing messages for topic {} className {} methodName {}", TAG, topicName, consumerClassName, consumerMethodName)
            if (messageList != null && messageList.size() > 0) {
                executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
                messageList.clear()
                kafkaTopicMessageListMap.put(topicName, messageList)
            }
        }
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit", TAG)
    }
}
The consumer properties are:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = t1
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 36000000
max.poll.records = 10
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 36000000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer
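Two things worth noticing in this dump (an observation, not something stated in the original post): metadata.max.age.ms is 300000, so the consumer refreshes its view of the cluster only every 5 minutes, and newly added partitions can only be assigned (via a rebalance) after such a refresh; and partition.assignment.strategy is RangeAssignor, which with 10 partitions and 2 consumers would still hand 5 partitions to each consumer once that rebalance happens. A minimal sketch of shortening the refresh interval, assuming the bootstrap server and group id from the dump, with a plain byte-array deserializer standing in for the custom one:

import org.apache.kafka.clients.consumer.KafkaConsumer

Properties props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "t1")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
// Default metadata.max.age.ms is 300000 (5 minutes); a lower value makes the client
// notice newly added partitions, and trigger a rebalance, sooner.
props.put("metadata.max.age.ms", "30000")
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props)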
Producer code:
Properties kafkaProducerProperties = getKafkaProducerProperties(topicName)
if (kafkaProducerProperties != null) {
    priorityKafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(kafkaProducerProperties)
    ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topicName, messageMap)
    try {
        priorityKafkaProducer.send(record).get()
        priorityKafkaProducer.close()
    } catch (Exception e) {
        e.printStackTrace()
    }
} else {
    throw new IllegalArgumentException("Invalid Producer Properties for " + topicName)
}
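Note that the ProducerRecord above is built with only a topic and a value, so its key is null; with the DefaultPartitioner shown in the config below, null-keyed records are spread round-robin, but only over the partitions present in the producer's cached metadata. Until that cache expires (metadata.max.age.ms, 300000 ms in the config below), the producer keeps writing to the two partitions it already knew about. A sketch of lowering the refresh interval on the producer side, with a byte-array serializer and the stand-in topic name "topic" replacing the custom pieces:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

Properties props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
// Default is 300000 (5 minutes); a lower value makes the producer re-fetch
// the partition list sooner after partitions are added.
props.put("metadata.max.age.ms", "30000")
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props)
// No key supplied, so the DefaultPartitioner round-robins over the cached partition list.
producer.send(new ProducerRecord<String, byte[]>("topic", "payload".getBytes()))
producer.close()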
Producer config:
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class com.abhimanyu.kafkaproducer.serializer.CustomObjectSerializer
Is what I am seeing expected behavior, or am I missing something?
Answer 1:

Did you wait 5 minutes (or whatever metadata refresh interval is configured)?
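A quick way to check what the producer actually sees is KafkaProducer.partitionsFor(), which returns the topic's partition list from the client metadata, fetching it first if needed (blocking up to max.block.ms). A minimal sketch, reusing the producer and stand-in topic name from the sketch above:

import org.apache.kafka.common.PartitionInfo

// If this still reports 2 after adding partitions, the metadata cache has not refreshed yet.
List<PartitionInfo> partitions = producer.partitionsFor("topic")
println "Producer currently sees ${partitions.size()} partitions"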
Discussion:
Hi Hans, I have already set the property metadata.max.age.ms = 5000, but it still takes 5 minutes for an added partition to be picked up. Did I set the wrong property?

Try setting the producer property topic.metadata.refresh.interval.ms, see ***.com/questions/27722871/…

Thanks for the reply. As far as I can tell, topic.metadata.refresh.interval.ms applies to Kafka 0.8, and I am using Kafka 0.10. I added it to my producer.properties anyway, but it did not help.

Yes, you are right of course. Sorry for mixing up the old and new properties.