Messages are not distributed in round-robin order after increasing the number of partitions in Kafka

Posted: 2017-07-24 18:57:39

Question:

Hi, I am new to Kafka. I am using Kafka 0.10.2 and ZooKeeper 3.4.9. I have a topic with two partitions and two running consumers. To improve throughput, I decided to increase the partition count to 10 so that I could also increase the number of consumers. So I ran:

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topic-name> --partitions 10
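To confirm the partition count actually changed, the same tool can describe the topic (using the same <topic-name> placeholder as above):

./kafka-topics.sh --zookeeper localhost:2181 --describe --topic <topic-name>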

I then observed two strange things:

    My consumers are still attached to only two partitions; the remaining partitions have no consumer. (The expected behavior is that the two consumers would cover all 10 partitions between them.)

    Messages are pushed only to the two old partitions; the new partitions receive no messages. (The expected behavior is that messages would be distributed across all partitions in round-robin fashion.)

I am using this command to inspect the partition details:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group-name>

My consumer code:

// Imports for the Kafka client and SLF4J types used below. Application-specific
// classes (ConfigLoader, KafkaTopicConfigEntity, KafkaTopicConfigDBService, DBObject)
// come from the poster's own codebase.
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

class KafkaPollingConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
    private static final String TAG = "[KafkaPollingConsumer]"
    private final KafkaConsumer<String, byte[]> kafkaConsumer
    private Map<TopicPartition, OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
    List topicNameList
    Map kafkaTopicConfigMap = new HashMap<String, Object>()
    Map kafkaTopicMessageListMap = new HashMap<String, List>()

    public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex) {
        logger.debug("{} [Constructor] [Enter] Thread Name {} serverType {} group Name {} TopicNameRegex {}", TAG, Thread.currentThread().getName(), serverType, groupName, topicNameRegex)
        logger.debug("Populating properties for Kafka consumer")
        Properties kafkaConsumerProperties = new Properties()
        kafkaConsumerProperties.put("group.id", groupName)
        kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer")
        switch (serverType) {
            case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString():
                kafkaConsumerProperties.put("bootstrap.servers", ConfigLoader.conf.kafkaServer.priority.kafkaNode)
                kafkaConsumerProperties.put("enable.auto.commit", ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
                kafkaConsumerProperties.put("auto.offset.reset", ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
                break
            case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString():
                kafkaConsumerProperties.put("bootstrap.servers", ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
                kafkaConsumerProperties.put("enable.auto.commit", ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
                kafkaConsumerProperties.put("auto.offset.reset", ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
                kafkaConsumerProperties.put("max.poll.records", 10)
                kafkaConsumerProperties.put("max.poll.interval.ms", 900000)
                kafkaConsumerProperties.put("request.timeout.ms", 900000)
                break
            default:
                // a String cannot be thrown; use a real exception type
                throw new IllegalArgumentException("Invalid server type")
        }
        logger.debug("[Constructor] KafkaConsumer properties populated {}", kafkaConsumerProperties.toString())
        kafkaConsumer = new KafkaConsumer<String, byte[]>(kafkaConsumerProperties)
        topicNameList = topicNameRegex.split(Pattern.quote('|'))
        logger.debug("[Constructor] Kafka topic list {}", topicNameList.toString())
        logger.debug("{} [Constructor] Exit", TAG)
    }

    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            if (currentOffsetsMap != null && !currentOffsetsMap.isEmpty()) {
                logger.debug("{} In onPartitionsRevoked Rebalanced", TAG)
                kafkaConsumer.commitSync(currentOffsetsMap)
            }
        }
    }

    @Override
    void run() {
        logger.debug("{} Starting Thread ThreadName {}", TAG, Thread.currentThread().getName())
        populateKafkaConfigMap()
        initializeKafkaTopicMessageListMap()
        String topicName
        String consumerClassName
        String consumerMethodName
        Boolean isBatchJob
        Integer batchSize = 0
        final Thread mainThread = Thread.currentThread()
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.error("{} gracefully shutting down thread {}", TAG, mainThread.getName())
                kafkaConsumer.wakeup()
                try {
                    mainThread.join()
                } catch (InterruptedException exception) {
                    logger.error("{} Error : {}", TAG, exception.getStackTrace().join("\n"))
                }
            }
        })
        kafkaConsumer.subscribe(topicNameList, new HandleRebalance())
        try {
            while (true) {
                logger.debug("{} Starting Consumer with polling time in ms 100", TAG)
                ConsumerRecords kafkaRecords = kafkaConsumer.poll(100)
                for (ConsumerRecord record : kafkaRecords) {
                    topicName = record.topic()
                    DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                    consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                    consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                    isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                    logger.debug("Details about Message")
                    logger.debug("Thread {}", mainThread.getName())
                    logger.debug("Topic {}", topicName)
                    logger.debug("Partition {}", record.partition().toString())
                    logger.debug("Offset {}", record.offset().toString())
                    logger.debug("className {}", consumerClassName)
                    logger.debug("methodName {}", consumerMethodName)
                    logger.debug("isBatchJob {}", isBatchJob.toString())
                    if (isBatchJob == true) {
                        batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                        logger.debug("batchSize {}", batchSize.toString())
                    }
                    Object message = record.value()
                    logger.debug("message {}", message.toString())
                    publishMessageToConsumers(consumerClassName, consumerMethodName, isBatchJob, batchSize, message, topicName)
                    Thread.sleep(60000)
                    currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1))
                }
                logger.debug("{} Committing Messages to Kafka", TAG)
                kafkaConsumer.commitSync(currentOffsetsMap)
            }
        } catch (InterruptException exception) {
            logger.error("{} In InterruptException", TAG)
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        } catch (WakeupException exception) {
            logger.error("{} In WakeUp Exception", TAG)
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        } catch (Exception exception) {
            logger.error("{} In Exception", TAG)
            logger.error("{} Exception {}", TAG, exception.getStackTrace().join("\n"))
        } finally {
            logger.error("{} In finally committing remaining offsets", TAG)
            publishAllKafkaTopicBatchMessages()
            kafkaConsumer.commitSync(currentOffsetsMap)
            kafkaConsumer.close()
            logger.error("{} Exiting Consumer", TAG)
        }
    }


    private void publishMessageToConsumers(String consumerClassName, String consumerMethodName, Boolean isBatchJob, Integer batchSize, Object message, String topicName) {
        logger.debug("{} [publishMessageToConsumer] Enter", TAG)
        if (isBatchJob == true) {
            publishMessageToBatchConsumer(consumerClassName, consumerMethodName, batchSize, message, topicName)
        } else {
            publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
        }
        logger.debug("{} [publishMessageToConsumer] Exit", TAG)
    }

    private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message) {
        logger.debug("{} [publishMessageToNonBatchConsumer] Enter", TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, message)
        logger.debug("{} [publishMessageToNonBatchConsumer] Exit", TAG)
    }

    private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName) {
        logger.debug("{} [publishMessageToBatchConsumer] Enter", TAG)
        List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
        consumerMessageList.add(message)
        if (consumerMessageList.size() == batchSize) {
            logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches", TAG)
            executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
            consumerMessageList.clear()
        }
        kafkaTopicMessageListMap.put(topicName, consumerMessageList)
        logger.debug("{} [publishMessageToBatchConsumer] Exit", TAG)
    }

    private void populateKafkaConfigMap() {
        logger.debug("{} [populateKafkaConfigMap] Enter", TAG)
        KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
        topicNameList.each { topicName ->
            DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
            kafkaTopicConfigMap.put(topicName, kafkaTopicDBObject)
        }
        logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}", TAG, kafkaTopicConfigMap.toString())
        logger.debug("{} [populateKafkaConfigMap] Exit", TAG)
    }

    private void initializeKafkaTopicMessageListMap() {
        logger.debug("{} [initializeKafkaTopicMessageListMap] Enter", TAG)
        topicNameList.each { topicName ->
            kafkaTopicMessageListMap.put(topicName, [])
        }
        logger.debug("{} [initializeKafkaTopicMessageListMap] kafkaTopicMessageListMap {}", TAG, kafkaTopicMessageListMap.toString())
        logger.debug("{} [initializeKafkaTopicMessageListMap] Exit", TAG)
    }

    private void executeConsumerMethod(String className, String methodName, def messages) {
        try {
            logger.debug("{} [executeConsumerMethod] Enter", TAG)
            logger.debug("{} [executeConsumerMethod] className {} methodName {} messages {}", TAG, className, methodName, messages.toString())
            Class.forName(className)."$methodName"(messages)
        } catch (Exception exception) {
            logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                    className, messages.toString(), exception.getStackTrace().join("\n"))
        }
        logger.debug("{} [executeConsumerMethod] Exit", TAG)
    }

    private void publishAllKafkaTopicBatchMessages() {
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter", TAG)
        String consumerClassName = null
        String consumerMethodName = null
        kafkaTopicMessageListMap.each { topicName, messageList ->
            DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
            consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
            consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
            logger.debug("{} Pushing message in topic {} className {} methodName {}", TAG, topicName, consumerClassName, consumerMethodName)
            if (messageList != null && messageList.size() > 0) {
                executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
                messageList.clear()
                kafkaTopicMessageListMap.put(topicName, messageList)
            }
        }
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit", TAG)
    }
}

The consumer properties (as logged at startup) are:

auto.commit.interval.ms = 5000 
auto.offset.reset = earliest 
bootstrap.servers = [localhost:9092] 
check.crcs = true 
client.id = consumer-1 
connections.max.idle.ms = 540000 
enable.auto.commit = false 
exclude.internal.topics = true 
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
group.id = t1 
heartbeat.interval.ms = 3000 
interceptor.classes = null 
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 36000000 
max.poll.records = 10 
metadata.max.age.ms = 300000 
metric.reporters = [] 
metrics.num.samples = 2 
metrics.recording.level = INFO 
metrics.sample.window.ms = 30000 
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] 
receive.buffer.bytes = 65536 
reconnect.backoff.ms = 50 
request.timeout.ms = 36000000 
retry.backoff.ms = 100 
sasl.jaas.config = null 
sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.mechanism = GSSAPI 
security.protocol = PLAINTEXT 
send.buffer.bytes = 131072 
session.timeout.ms = 10000 
ssl.cipher.suites = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.endpoint.identification.algorithm = null 
ssl.key.password = null 
ssl.keymanager.algorithm = SunX509 
ssl.keystore.location = null 
ssl.keystore.password = null 
ssl.keystore.type = JKS 
ssl.protocol = TLS 
ssl.provider = null 
ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX 
ssl.truststore.location = null 
ssl.truststore.password = null 
ssl.truststore.type = JKS 
value.deserializer = class com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer
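One thing visible in this dump: partition.assignment.strategy = RangeAssignor, the default for this client version. With 10 partitions and two consumers in the same group, either built-in assignor would split the partitions 5/5 once a rebalance actually fires, so this setting alone does not explain consumers sticking to two partitions. Still, if round-robin assignment of partitions to consumers is preferred, it can be set explicitly; a minimal sketch, assuming the kafkaConsumerProperties object from the constructor above:

    kafkaConsumerProperties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor")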

Producer code:

Properties kafkaProducerProperties = getKafkaProducerProperties(topicName)
if (kafkaProducerProperties != null) {
    priorityKafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(kafkaProducerProperties)
    ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topicName, messageMap)
    try {
        priorityKafkaProducer.send(record).get()
        priorityKafkaProducer.close()
    } catch (Exception e) {
        e.printStackTrace()
    }
} else {
    // a String cannot be thrown; use a real exception type
    throw new IllegalArgumentException("Invalid Producer Properties for " + topicName)
}
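A side note on distribution: the two-argument ProducerRecord constructor used above sends records with a null key, and for null keys the 0.10.x DefaultPartitioner spreads records round-robin over the partitions it currently knows about from its cached metadata. With a non-null key, the partition is chosen by hashing the key (murmur2) modulo the partition count, so adding partitions changes which partition a given key lands on. A sketch of the two variants, reusing topicName and messageMap from the snippet above ("some-key" is purely illustrative):

    // null key: DefaultPartitioner round-robins across the partitions it knows about
    ProducerRecord<String, byte[]> unkeyed = new ProducerRecord<String, byte[]>(topicName, messageMap)
    // explicit key: partition = murmur2("some-key") % numPartitions
    ProducerRecord<String, byte[]> keyed = new ProducerRecord<String, byte[]>(topicName, "some-key", messageMap)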

Producer configuration:

acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class com.abhimanyu.kafkaproducer.serializer.CustomObjectSerializer

Is what I am seeing the expected behavior, or am I missing something?


Answer 1:

Did you wait 5 minutes (or whatever metadata refresh interval you have configured)?
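For context: a producer only learns about new partitions when it refreshes its cluster metadata, and the producer configuration above shows metadata.max.age.ms = 300000, i.e. a refresh at most every 5 minutes. Lowering that value on the producer makes newly added partitions visible sooner. A minimal sketch (the 30-second value is an arbitrary example, not a recommendation):

    Properties kafkaProducerProperties = new Properties()
    kafkaProducerProperties.put("bootstrap.servers", "localhost:9092")
    // refresh cluster metadata every 30 s instead of the default 5 min,
    // so the producer discovers newly added partitions sooner
    kafkaProducerProperties.put("metadata.max.age.ms", "30000")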

Comments:

Hi Hans, I have already set the property metadata.max.age.ms = 5000, but it still takes 5 minutes before the newly added partitions are used. Did I set the wrong property?

Try setting the producer property topic.metadata.refresh.interval.ms, see ***.com/questions/27722871/…

Thanks for the reply. I believe topic.metadata.refresh.ms applies to Kafka 0.8, and I am on Kafka 0.10. I added it to my producer.properties anyway, but it did not work.

Yes, you are of course right. Sorry for mixing up the old and new properties.
