Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?
Posted
技术标签:
【中文标题】Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?【英文标题】:Kafka Connect: How can I send protobuf data from Kafka topics to HDFS using hdfs sink connector? 【发布时间】:2017-04-07 22:09:19 【问题描述】:我有一个生产者正在为某个主题生成 protobuf 消息。我有一个反序列化 protobuf 消息的消费者应用程序。但是 hdfs sink 连接器直接从 Kafka 主题中获取消息。 etc/schema-registry/connect-avro-standalone.properties
中的键值转换器将设置为什么?最好的方法是什么?提前致谢!
【问题讨论】:
What should I do when someone answers my question? 【参考方案1】:Kafka Connect 旨在通过converters 的概念将 Kafka 中的序列化格式问题与单个连接器分离。正如您似乎发现的那样,您需要将key.converter
和value.converter
类调整为支持protobufs 的实现。这些类通常作为普通的 Kafka Deserializer 实现,然后执行从特定于序列化的运行时格式(例如 protobufs 中的消息)到 Kafka Connect 的运行时 API(没有任何关联的序列化格式——它只是一个一组 Java 类型和一个用于定义模式的类)。
我不知道现有的实现。实现这一点的主要挑战是 protobufs 是自描述的(即您可以在不访问原始模式的情况下对其进行反序列化),但由于它的字段只是整数 ID,因此如果没有任何一个 a) 要求,您可能无法获得有用的模式信息转换器可以使用特定的模式,例如通过配置(这使得迁移模式更加复杂)或 b) 模式注册服务 + 数据包装格式,允许您动态查找模式。
【讨论】:
我有一个几乎无法正常工作的实现。我使用avro-protobuf
扩展了AvroConter
类的Deserializer
。我知道 Kafka Connect 希望限制支持的格式的数量,即 JSON 和 Avro,所以我不会按原样发布它。再说一次,我不想将整个avro-converter
复制并重命名为protobuf-converter
。贡献项目的最佳方法是什么?
Kafka Connect 绝对不想限制支持的格式数量。恰恰相反,我们包含了转换器,并使连接器的数据 API 成为通用的,以支持插入不同的序列化格式。 protobuf 实现绝对有价值,我建议发布它。我们愿意将它与 AvroConverter 一起包含在我们的存储库中,尽管我们希望获得一个相当完整的实现。我看到的最大问题是要获得有用的实现,我希望您需要类似于模式注册表的东西。以上是关于Kafka Connect:如何使用 hdfs sink 连接器将 Kafka 主题的 protobuf 数据发送到 HDFS?的主要内容,如果未能解决你的问题,请参考以下文章
使用 Kafka HDFS Connect 写入 HDFS 时出错
使用JsonConverter的Kafka Connect HDFS Sink for JSON格式
使用kafka connect,将数据批量写到hdfs完整过程
Kafka 到 Elasticsearch、带有 Logstash 的 HDFS 或 Kafka Streams/Connect
有哪些开源解决方案可以使用 Kafka Connect 将数据从 Kafka 移动到 HDFS3?
需要使用 Kafka Connect 将小型 JSON 消息从 Kafka 移动到 HDFS,但不使用 Confluent 库,如果不是完全免费的话