带键的 kafka 消息似乎总是去同一个分区

Posted

技术标签:

【中文标题】带键的 kafka 消息似乎总是去同一个分区【英文标题】:Keyed kafka messages always seem to go to the same partition 【发布时间】:2020-06-30 03:07:30 【问题描述】:

我的节点应用程序使用 kafka-node 节点模块。

我有一个包含三个分区的 kafka 主题,如下所示:

Topic: NotifierTemporarye3df:/opPartitionCount: 3in$ kafReplicationFactor: 3ibe Configs: segment.bytes=1073741824 --topic NotifierTemporary
    Topic: NotifierTemporary        Partition: 0    Leader: 1001    Replicas: 1001,1003,1002        Isr: 1001,1003,1002
    Topic: NotifierTemporary        Partition: 1    Leader: 1002    Replicas: 1002,1001,1003        Isr: 1002,1001,1003
    Topic: NotifierTemporary        Partition: 2    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1002,1001

当我向主题写入一系列键控消息时,它们似乎都写入了同一个分区。我希望将一些不同的键控消息发送到分区 1 和 2。

这是我从消费者 onMessage 事件函数中输出的几条消息的日志:

the message is: "topic":"NotifierTemporary","value":"\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":345,"partition":0,"highWaterOffset":346,"key":"66","timestamp":"2020-03-19T00:16:57.783Z"
the message is: "topic":"NotifierTemporary","value":"\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":346,"partition":0,"highWaterOffset":347,"key":"222","timestamp":"2020-03-19T00:16:57.786Z"
the message is: "topic":"NotifierTemporary","value":"\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":347,"partition":0,"highWaterOffset":348,"key":"13","timestamp":"2020-03-19T00:16:57.791Z"
the message is: "topic":"NotifierTemporary","value":"\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":348,"partition":0,"highWaterOffset":349,"key":"316","timestamp":"2020-03-19T00:16:57.798Z"
the message is: "topic":"NotifierTemporary","value":"\"recipient\":446,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":349,"partition":0,"highWaterOffset":350,"key":"446","timestamp":"2020-03-19T00:16:57.806Z"
the message is: "topic":"NotifierTemporary","value":"\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":350,"partition":0,"highWaterOffset":351,"key":"66","timestamp":"2020-03-19T00:17:27.918Z"
the message is: "topic":"NotifierTemporary","value":"\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":351,"partition":0,"highWaterOffset":352,"key":"222","timestamp":"2020-03-19T00:17:27.920Z"
the message is: "topic":"NotifierTemporary","value":"\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":352,"partition":0,"highWaterOffset":353,"key":"13","timestamp":"2020-03-19T00:17:27.929Z"
the message is: "topic":"NotifierTemporary","value":"\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"","offset":353,"partition":0,"highWaterOffset":354,"key":"316","timestamp":"2020-03-19T00:17:27.936Z"

这是发送消息的kafka-node生产者代码:

  * @description Adds a notification message to the Kafka topic that is not saved in a database.
  * @param Int recipientId - accountId of recipient of notification message
  * @param Object message - message payload to send to recipient
  */
  async sendTemporaryNotification(recipientId, subject, message) 
    const notificationMessage = 
      recipient: recipientId,
      subject,
      message,
    ;
    // we need to validate this message schema - this will throw if invalid
    Joi.assert(notificationMessage, NotificationMessage);
    // partition based on the recipient
    const payloads = [
       topic: KAFKA_TOPIC_TEMPORARY, messages: JSON.stringify(notificationMessage), key: notificationMessage.recipient ,
    ];
    if (this.isReady) 
      await this.producer.sendAsync(payloads);
    
    else 
      throw new ProducerNotReadyError('Notifier Producer not ready');
    
  

如您所见,它们都不是来自分区 1 和 2。即使在几分钟内不断发送带有随机整数键的消息后也是如此。我可能做错了什么?

【问题讨论】:

你能调试生产者库,看看使用了什么分区策略吗? 【参考方案1】:

创建生产者时需要配置正确的partitionerType

// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
new Producer(client, paritionerType: 3);

查看文档:https://www.npmjs.com/package/kafka-node#producerkafkaclient-options-custompartitioner

【讨论】:

【参考方案2】:

关于我没有指定分区器类型,Scarysize 是正确的。对于任何想知道完整的分区生产者是什么样子的人,您可以参考此代码。我已经验证这会根据提供的密钥分发消息。我在这里使用了 HighLevelProducer,因为 kafka-node 库的主要贡献者之一建议其他人使用它来解决分区问题。我尚未验证此解决方案是否适用于常规 Producer 而不是 HighLevelProducer。

在此示例中,我根据用户的 userId 向用户发送通知消息。这是消息被分区的关键。

const  KafkaClient, HighLevelProducer, KeyedMessage  = require('kafka-node');
const Promise = require('bluebird');
const NotificationMessage = require(__dirname + '/../models/notificationMessage.js');
const ProducerNotReadyError = require(__dirname + '/../errors/producerNotReadyError.js');
const Joi = require('@hapi/joi');

const KAFKA_TOPIC_TEMPORARY = 'NotifierTemporary';

/**
 * @classdesc Producer that sends notification messages to Kafka.
 * @class
 */
class NotifierProducer 

  /**
  * Create NotifierProducer.
  * @constructor
  * @param String kafkaHost - address of kafka server
  */
  constructor(kafkaHost) 
    const client = Promise.promisifyAll(new KafkaClient(kafkaHost));
    const producerOptions = 
      partitionerType: HighLevelProducer.PARTITIONER_TYPES.keyed, // this is a keyed partitioner
    ;
    this.producer = Promise.promisifyAll(new HighLevelProducer(client, producerOptions));
    this.isReady = false;

    this.producer.on('ready', async () => 
      await client.refreshMetadataAsync([KAFKA_TOPIC_TEMPORARY]);
      console.log('Notifier Producer is operational');
      this.isReady = true;
    );

    this.producer.on('error', err => 
      console.error('Notifier Producer error: ', err);
      this.isReady = false;
    );
  
  /**
  * @description Adds a notification message to the Kafka topic that is not saved in a database.
  * @param Int recipientId - accountId of recipient of notification message
  * @param String subject - subject header of the message
  * @param Object message - message payload to send to recipient
  */
  async sendTemporaryNotification(recipientId, subject, message) 
    const notificationMessage = 
      recipient: recipientId,
      subject,
      message,
    ;
    // we need to validate this message schema - this will throw if invalid
    Joi.assert(notificationMessage, NotificationMessage);
    // partition based on the recipient
    const messageKM = new KeyedMessage(notificationMessage.recipient, JSON.stringify(notificationMessage));
    const payloads = [
       topic: KAFKA_TOPIC_TEMPORARY, messages: messageKM, key: notificationMessage.recipient ,
    ];
    if (this.isReady) 
      await this.producer.sendAsync(payloads);
    
    else 
      throw new ProducerNotReadyError('Notifier Producer not ready');
    
  


/**
 * Kafka topic that the producer and corresponding consumer will use to send temporary messages.
 * @type string
*/
NotifierProducer.KAFKA_TOPIC_TEMPORARY = KAFKA_TOPIC_TEMPORARY;

module.exports = NotifierProducer;

【讨论】:

以上是关于带键的 kafka 消息似乎总是去同一个分区的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 消费者是否可以消费指定分区消息?

Kafka 消费者是否可以消费指定分区消息?

kafka消息通信原理学习

Kafka的顺序保证序列化及分区

Kafka消费者手动提交消息偏移

Quarkus Kafka Partitions 配置似乎只处理其中一个分区