使用 kafka lib 反序列化 PRIMITIVE AVRO KEY
Posted
技术标签:
【中文标题】使用 kafka lib 反序列化 PRIMITIVE AVRO KEY【英文标题】:Deserialization PRIMITIVE AVRO KEY with kafka lib 【发布时间】:2019-12-04 19:07:29 【问题描述】:我目前无法在 KSTREAM 应用程序中反序列化 avro PRIMITIVE 密钥
用 avro 模式编码的密钥(在模式注册表中注册),
当我使用 kafka-avro-console-consumer 时,我可以看到密钥已正确反序列化
但不可能让它在 KSTREAM 应用程序中运行
密钥的 avro 模式是一个 PRIMITIVE:
"type":"string"
我已经按照confluent的文档了
final Serde<V> valueSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
valueSpecificAvroSerde.configure(serdeConfig, false);
final Serdes.StringSerde keySpecificAvroSerde = new Serdes.StringSerde();
keySpecificAvroSerde.configure(serdeConfig, true);
Consumed<String, totoAvro> inputConf = Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde);
final KStream<String, totoAvro> mystream = builder.stream("name topic", inputConf);
mystream.peek((key, value) -> logger.info("topic KEY :" + key))
它对值很有效,但键将是一个字符串,其中包含模式注册表中的字节,而不仅仅是“卷轴”键
https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format
所以字符串键是 /§/./11016015201 ,但我想要卷轴值:1016015201
如果我打印字符串中的字节,它是 [0x00 0x00 0x00 0x02 0x31 0x14 0x31 0x30 0x31 0x36 0x30 0x31 0x35 0x32 0x30 0x31]
【问题讨论】:
如果你 key if Avro,你需要使用 AvroSerde,而不是StringSerde
(StringSerde
) 仅用于纯字符串类型。
您好,感谢您的帮助。对不起,我应该说我已经尝试过使用 GenericAvroSerde java Exception caught during Deserialization, taskId: 6_3, topic: myTopic, partition: 3, offset: 0 (org.apache.kafka.streams.errors.LogAndFailExceptionHandler) java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
这是一个原始的 avro 模式,所以它没有我可以 get() 的命名字段。
Schema Registry 不支持原始 Avro 类型。您需要在 Streams 代码中“硬编码”Avro Serde。
@MatthiasJ.Sax 原语是指非记录类型吗?他们为我们工作得很好......curl schema-registry:8081/subjects/topic-key/versions/1
显示"subject":"topic-key","version":1,"id":181,"schema":"\"string\""
架构注册表中提供的 Avro Serdes 始终假定它是记录类型。因此,如果您使用原始类型,则会遇到类转换异常。
【参考方案1】:
更新
它现在正在工作:https://***.com/a/51957801/6227500
原答案
该功能目前在架构注册表项目中不可用。
但通过实现自定义 SERDE,您可以管理案例,
Thiyaga Rajan 提出了一个可行的实现
Serde class for AVRO primitive type
【讨论】:
以上是关于使用 kafka lib 反序列化 PRIMITIVE AVRO KEY的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Beam 反序列化 Kafka AVRO 消息
如何使用来自 Kafka 的 Python 解码/反序列化 Avro