在 Kafka 中发送 ProducerRecord 所需的 Serilizer 类型

Posted

技术标签:

【中文标题】在 Kafka 中发送 ProducerRecord 所需的 Serilizer 类型【英文标题】:Serilizer type needed for sending ProducerRecord in Kafka 【发布时间】:2018-05-25 09:30:32 【问题描述】:

我正在使用 spring kafka (KafkaTemplate) 发送字符串消息。但为了使其与旧代码兼容,我需要在消息中附加额外的 CorrelationId。所以我创建了 ProducerRecord 对象,以我的消息作为它的值,并在它的 HeaderRecord 中设置 CorrelationId。

        producerRecord = new ProducerRecord<>(
            kafkaTemplate_.getDefaultTopic(),
            null,
            null,
            null,
            myStringMessage,
            Collections.singletonList(new RecordHeader("CorrelationID", someIdAsBytes)));
    kafkaTemplate_.sendDefault(producerRecord);

键和值序列化器设置为 StringSerializer,但 about 代码无法说明 ProduderRecord 不是 String 或 StringSerializer 类型。

如果我像下面那样执行 toString(),它会起作用。但是在MessageListener 端,接收到的ConsumerRecord 没有correlationId 作为它的RecordHeader,因为correlationId 是作为ProducerRecord 的RecordHeader 附加的。所以我必须在 MessageListener onMessage(Object msg) 上进行类型转换,例如从 Object 转换为 ConsumerRecord(这是 ProduderRecord 的字符串序列化版本),从 ConsumerRecord.value() 字段解析 ProducerRecord,然后从 ProcuderRecord 标头获取 CorrelationId,并从生产者记录。这看起来很麻烦。我的发送和接收逻辑正常吗?

kafkaTemplate_.sendDefault(producerRecord.toString());

【问题讨论】:

【参考方案1】:

看起来您只是缺少以下区别:

/**
 * Send the data to the default topic with no key or partition.
 * @param data The data.
 * @return a Future for the @link SendResult.
 */
ListenableFuture<SendResult<K, V>> sendDefault(V data);

/**
 * Send the provided @link ProducerRecord.
 * @param record the record.
 * @return a Future for the @link SendResult.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

看,你在ProducerRecord 中有一个topic 属性,所以,根本没有理由考虑sendDefault()。另外,正如您所见,这让您感到困惑。

【讨论】:

由于我的 ProducerRecord 中有主题,我可以使用 send(ProducerRecord record) 方法而不是 sendDefault() 发送。知道了!。但我怀疑即使是 send() 方法也能让我发送 ProducerRecord 而无需转换为 String。 你知道如果我发送(producerRecord),我应该配置什么类型的键和值序列化器? 好吧,你根本不能施放。那是value 的问题,但现在您拥有一个完全配置的ProducerRecord。无论如何都必须配置key/value 序列化程序。不知道你有什么要求。想象一下你发送没有标题!虽然我看到你有myStringMessage。所以,valueSerializer 确实必须是第一个字符串。 使用 StringSerializer 本身从 sendDefault 更改为 send 方法有效。你发现了我的困惑。谢谢 Artem。

以上是关于在 Kafka 中发送 ProducerRecord 所需的 Serilizer 类型的主要内容,如果未能解决你的问题,请参考以下文章

在 Kafka 中发送 ProducerRecord 所需的 Serilizer 类型

Java API中kafka生产者发送消息没有成功

Kafka核心API——Producer生产者

(04)使用kafka脚本发送消息和接收消息

springboot kafka发送消息支持成功失败通知

kafka生产者的发送消息的流程以及代码案例