Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?

Posted

技术标签:

【中文标题】Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?【英文标题】:Kafka Connect: How can I send protobuf data from Kafka topics to HDFS using hdfs sink connector? 【发布时间】:2017-04-07 22:09:19 【问题描述】:

我有一个生产者正在为某个主题生成 protobuf 消息。我有一个反序列化 protobuf 消息的消费者应用程序。但是 hdfs sink 连接器直接从 Kafka 主题中获取消息。 etc/schema-registry/connect-avro-standalone.properties 中的键值转换器将设置为什么?最好的方法是什么?提前致谢!

【问题讨论】:

What should I do when someone answers my question? 【参考方案1】:

Kafka Connect 旨在通过converters 的概念将 Kafka 中的序列化格式问题与单个连接器分离。正如您似乎发现的那样,您需要将key.convertervalue.converter 类调整为支持protobufs 的实现。这些类通常作为普通的 Kafka Deserializer 实现,然后执行从特定于序列化的运行时格式(例如 protobufs 中的消息)到 Kafka Connect 的运行时 API(没有任何关联的序列化格式——它只是一个一组 Java 类型和一个用于定义模式的类)。

我不知道现有的实现。实现这一点的主要挑战是 protobufs 是自描述的(即您可以在不访问原始模式的情况下对其进行反序列化),但由于它的字段只是整数 ID,因此如果没有任何一个 a) 要求,您可能无法获得有用的模式信息转换器可以使用特定的模式,例如通过配置(这使得迁移模式更加复杂)或 b) 模式注册服务 + 数据包装格式,允许您动态查找模式。

【讨论】:

我有一个几乎无法正常工作的实现。我使用avro-protobuf 扩展了AvroConter 类的Deserializer。我知道 Kafka Connect 希望限制支持的格式的数量,即 JSON 和 Avro,所以我不会按原样发布它。再说一次,我不想将整个avro-converter 复制并重命名为protobuf-converter。贡献项目的最佳方法是什么? Kafka Connect 绝对不想限制支持的格式数量。恰恰相反,我们包含了转换器,并使连接器的数据 API 成为通用的,以支持插入不同的序列化格式。 protobuf 实现绝对有价值,我建议发布它。我们愿意将它与 AvroConverter 一起包含在我们的存储库中,尽管我们希望获得一个相当完整的实现。我看到的最大问题是要获得有用的实现,我希望您需要类似于模式注册表的东西。

以上是关于Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Kafka HDFS Connect 写入 HDFS 时出错

使用JsonConverter的Kafka Connect HDFS Sink for JSON格式

使用kafka connect,将数据批量写到hdfs完整过程

Kafka 到 Elasticsearch、带有 Logstash 的 HDFS 或 Kafka Streams/Connect

有哪些开源解决方案可以使用 Kafka Connect 将数据从 Kafka 移动到 HDFS3?

需要使用 Kafka Connect 将小型 JSON 消息从 Kafka 移动到 HDFS,但不使用 Confluent 库,如果不是完全免费的话