无法读取 Kafka 主题 avro 消息

Posted

技术标签:

【中文标题】无法读取 Kafka 主题 avro 消息【英文标题】:Unable to read Kafka topic avro messages 【发布时间】:2018-01-12 00:07:02 【问题描述】:

Debezium 连接器的 Kafka 连接事件是 Avro 编码的。

在传递给 Kafka 连接独立服务的 connect-standalone.properties 中提到了以下内容。

key.converter=io.confluent.connect.avro.AvroConverter
value.confluent=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081

使用这些属性配置 Kafka 消费者代码:

Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto comit to false to ensure that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");

在消费者实现中,以下是读取键和值组件的代码。我正在使用 REST 从架构注册表中获取键和值的架构。

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));

解析密钥工作正常。在解析消息的值部分时,我得到了 ArrayIndexOutOfBoundsException。

下载了 Avro 的源代码并进行了调试。发现GenericDatumReader.readInt方法返回的是负值。这个值应该是数组(符号)的索引,因此应该是正数。

尝试使用 kafka-avro-standalone-consumer 消费事件,但它也抛出了 ArrayIndexOutOfBoundsException。所以,我的猜测是消息在 Kafka 连接(生产者)处编码不正确,问题出在配置上。

以下是问题:

    在生产者或消费者处传递的配置有什么问题吗? 为什么密钥反序列化有效但价值无效? 还有什么其他需要做的事情吗? (比如在某处指定字符编码)。 可以在生产环境中使用带有 Avro 的 Debezium,还是目前它是一个实验性功能? Debezium Avro 上的帖子明确表示,将来会包含涉及 Avro 的示例。

有很多帖子 Avro 反序列化抛出 ArrayIndexOutOfBoundsException 但无法将其与我面临的问题联系起来。

【问题讨论】:

【参考方案1】:

按照http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html 中的步骤操作,现在一切正常。

【讨论】:

以上是关于无法读取 Kafka 主题 avro 消息的主要内容,如果未能解决你的问题,请参考以下文章

如何查看kafka消息

仅读取来自 kafka 主题的特定消息

无法使用 JDBCSinkConnector 将数据从 Kafka 主题加载到 Postgres

在火花结构化流中反序列化 kafka avro 主题的 int 编码无效

使用 Apache Beam 反序列化 Kafka AVRO 消息

无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]