在 Kafka 中发送 ProducerRecord 所需的 Serilizer 类型
Posted
技术标签:
【中文标题】在 Kafka 中发送 ProducerRecord 所需的 Serilizer 类型【英文标题】:Serilizer type needed for sending ProducerRecord in Kafka 【发布时间】:2018-05-25 09:30:32 【问题描述】:我正在使用 spring kafka (KafkaTemplate) 发送字符串消息。但为了使其与旧代码兼容,我需要在消息中附加额外的 CorrelationId。所以我创建了 ProducerRecord 对象,以我的消息作为它的值,并在它的 HeaderRecord 中设置 CorrelationId。
producerRecord = new ProducerRecord<>(
kafkaTemplate_.getDefaultTopic(),
null,
null,
null,
myStringMessage,
Collections.singletonList(new RecordHeader("CorrelationID", someIdAsBytes)));
kafkaTemplate_.sendDefault(producerRecord);
键和值序列化器设置为 StringSerializer,但 about 代码无法说明 ProduderRecord 不是 String 或 StringSerializer 类型。
如果我像下面那样执行 toString(),它会起作用。但是在MessageListener 端,接收到的ConsumerRecord 没有correlationId 作为它的RecordHeader,因为correlationId 是作为ProducerRecord 的RecordHeader 附加的。所以我必须在 MessageListener onMessage(Object msg)
上进行类型转换,例如从 Object 转换为 ConsumerRecord(这是 ProduderRecord 的字符串序列化版本),从 ConsumerRecord.value() 字段解析 ProducerRecord,然后从 ProcuderRecord 标头获取 CorrelationId,并从生产者记录。这看起来很麻烦。我的发送和接收逻辑正常吗?
kafkaTemplate_.sendDefault(producerRecord.toString());
【问题讨论】:
【参考方案1】:看起来您只是缺少以下区别:
/**
* Send the data to the default topic with no key or partition.
* @param data The data.
* @return a Future for the @link SendResult.
*/
ListenableFuture<SendResult<K, V>> sendDefault(V data);
和
/**
* Send the provided @link ProducerRecord.
* @param record the record.
* @return a Future for the @link SendResult.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
看,你在ProducerRecord
中有一个topic
属性,所以,根本没有理由考虑sendDefault()
。另外,正如您所见,这让您感到困惑。
【讨论】:
由于我的 ProducerRecord 中有主题,我可以使用 send(ProducerRecordvalue
的问题,但现在您拥有一个完全配置的ProducerRecord
。无论如何都必须配置key/value
序列化程序。不知道你有什么要求。想象一下你发送没有标题!虽然我看到你有myStringMessage
。所以,valueSerializer
确实必须是第一个字符串。
使用 StringSerializer 本身从 sendDefault 更改为 send 方法有效。你发现了我的困惑。谢谢 Artem。以上是关于在 Kafka 中发送 ProducerRecord 所需的 Serilizer 类型的主要内容,如果未能解决你的问题,请参考以下文章