使用 kafka lib 反序列化 PRIMITIVE AVRO KEY

Posted

技术标签:

【中文标题】使用 kafka lib 反序列化 PRIMITIVE AVRO KEY【英文标题】:Deserialization PRIMITIVE AVRO KEY with kafka lib 【发布时间】:2019-12-04 19:07:29 【问题描述】:

我目前无法在 KSTREAM 应用程序中反序列化 avro PRIMITIVE 密钥

用 avro 模式编码的密钥(在模式注册表中注册),

当我使用 kafka-avro-console-consumer 时,我可以看到密钥已正确反序列化

但不可能让它在 KSTREAM 应用程序中运行

密钥的 avro 模式是一个 PRIMITIVE:

"type":"string"

我已经按照confluent的文档了

final Serde<V> valueSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
valueSpecificAvroSerde.configure(serdeConfig, false);

final Serdes.StringSerde keySpecificAvroSerde = new Serdes.StringSerde();
keySpecificAvroSerde.configure(serdeConfig, true);

Consumed<String, totoAvro> inputConf = Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde);

final KStream<String, totoAvro> mystream = builder.stream("name topic", inputConf);

mystream.peek((key, value) -> logger.info("topic KEY :" + key))

它对值很有效,但键将是一个字符串,其中包含模式注册表中的字节,而不仅仅是“卷轴”键

https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format

所以字符串键是 /§/./11016015201 ,但我想要卷轴值:1016015201

如果我打印字符串中的字节,它是 [0x00 0x00 0x00 0x02 0x31 0x14 0x31 0x30 0x31 0x36 0x30 0x31 0x35 0x32 0x30 0x31]

【问题讨论】:

如果你 key if Avro,你需要使用 AvroSerde,而不是 StringSerde (StringSerde) 仅用于纯字符串类型。 您好,感谢您的帮助。对不起,我应该说我已经尝试过使用 GenericAvroSerde java Exception caught during Deserialization, taskId: 6_3, topic: myTopic, partition: 3, offset: 0 (org.apache.kafka.streams.errors.LogAndFailExceptionHandler) java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord 这是一个原始的 avro 模式,所以它没有我可以 get() 的命名字段。 Schema Registry 不支持原始 Avro 类型。您需要在 Streams 代码中“硬编码”Avro Serde。 @MatthiasJ.Sax 原语是指非记录类型吗?他们为我们工作得很好......curl schema-registry:8081/subjects/topic-key/versions/1 显示"subject":"topic-key","version":1,"id":181,"schema":"\"string\"" 架构注册表中提供的 Avro Serdes 始终假定它是记录类型。因此,如果您使用原始类型,则会遇到类转换异常。 【参考方案1】:

更新

它现在正在工作:https://***.com/a/51957801/6227500

原答案

该功能目前在架构注册表项目中不可用。

但通过实现自定义 SERDE,您可以管理案例,

Thiyaga Rajan 提出了一个可行的实现

Serde class for AVRO primitive type

【讨论】:

以上是关于使用 kafka lib 反序列化 PRIMITIVE AVRO KEY的主要内容,如果未能解决你的问题,请参考以下文章

从Kafka主题消费消息时反序列化的问题

使用 Apache Beam 反序列化 Kafka AVRO 消息

如何使用来自 Kafka 的 Python 解码/反序列化 Avro

反序列化Avro序列化Kafka流的问题

LocalDateTime 的自定义 spring-kafka 反序列化器

在火花结构化流中反序列化 kafka avro 主题的 int 编码无效