使用JsonConverter的Kafka Connect HDFS Sink for JSON格式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用JsonConverter的Kafka Connect HDFS Sink for JSON格式相关的知识,希望对你有一定的参考价值。

通过JSON向Kafka制作/消费。使用以下属性保存到JSON中的HDFS:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

制片人:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" 
      --data '{"schema": {"type": "boolean", "optional": false, "name": "bool", "version": 2, "doc": "the documentation", "parameters": {"foo": "bar" }}, "payload": true }' "http://localhost:8082/topics/test_hdfs_json"

消费者:

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-hdfs/quickstart-hdfs.properties

问题1:

key.converter.schemas.enable=true

value.converter.schemas.enable=true

获得例外:

org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)

问题2:

启用上述两个属性不会引发任何问题,但不会通过hdfs写入任何数据。

任何建议将受到高度赞赏。

谢谢

答案

转换器指的是如何从Kafka主题转换数据以由连接器解释并写入HDFS。 HDFS连接器仅支持以avro或镶木地板开箱即用于写入HDFS。您可以找到有关如何将格式扩展为JSON here的信息。如果您进行此类扩展,我建议您将其贡献给连接器的开源项目。

另一答案

要将输入的Json格式消息写入HDFS,请在下面设置属性

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

以上是关于使用JsonConverter的Kafka Connect HDFS Sink for JSON格式的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

Web API - JsonConverter - 自定义属性

如何修复 Spark Streaming Kafka Consumer 中的“java.io.NotSerializableException:org.apache.kafka.clients.con

如何调用 JsonConvert.DeserializeObject 并禁用通过 [JsonConverter] 应用于基本类型的 JsonConverter?

.net HttpClient 与自定义 JsonConverter

JsonConverter 等效于使用 System.Text.Json