无法读取 Kafka 主题 avro 消息
Posted
技术标签:
【中文标题】无法读取 Kafka 主题 avro 消息【英文标题】:Unable to read Kafka topic avro messages 【发布时间】:2018-01-12 00:07:02 【问题描述】:Debezium 连接器的 Kafka 连接事件是 Avro 编码的。
在传递给 Kafka 连接独立服务的 connect-standalone.properties 中提到了以下内容。
key.converter=io.confluent.connect.avro.AvroConverter
value.confluent=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081
使用这些属性配置 Kafka 消费者代码:
Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto comit to false to ensure that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");
在消费者实现中,以下是读取键和值组件的代码。我正在使用 REST 从架构注册表中获取键和值的架构。
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
解析密钥工作正常。在解析消息的值部分时,我得到了 ArrayIndexOutOfBoundsException。
下载了 Avro 的源代码并进行了调试。发现GenericDatumReader.readInt方法返回的是负值。这个值应该是数组(符号)的索引,因此应该是正数。
尝试使用 kafka-avro-standalone-consumer 消费事件,但它也抛出了 ArrayIndexOutOfBoundsException。所以,我的猜测是消息在 Kafka 连接(生产者)处编码不正确,问题出在配置上。
以下是问题:
-
在生产者或消费者处传递的配置有什么问题吗?
为什么密钥反序列化有效但价值无效?
还有什么其他需要做的事情吗? (比如在某处指定字符编码)。
可以在生产环境中使用带有 Avro 的 Debezium,还是目前它是一个实验性功能? Debezium Avro 上的帖子明确表示,将来会包含涉及 Avro 的示例。
有很多帖子 Avro 反序列化抛出 ArrayIndexOutOfBoundsException 但无法将其与我面临的问题联系起来。
【问题讨论】:
【参考方案1】:按照http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html 中的步骤操作,现在一切正常。
【讨论】:
以上是关于无法读取 Kafka 主题 avro 消息的主要内容,如果未能解决你的问题,请参考以下文章
无法使用 JDBCSinkConnector 将数据从 Kafka 主题加载到 Postgres
在火花结构化流中反序列化 kafka avro 主题的 int 编码无效