即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败

Posted

技术标签:

【中文标题】即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败【英文标题】:kafka connect hdfs sink connector is failing even when json data contains schema and payload field 【发布时间】:2017-10-14 02:01:05 【问题描述】:

我正在尝试使用 kafka connect hdfs sink 连接器将 json 数据从 kafka 移动到 hdfs。

即使 kafka 中的 json 数据有模式和有效负载 kafka 连接任务失败并出现错误

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields.

Kafka 中的数据:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning

"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "deepak","company": "BT"
"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "sufi","company": "BT"
"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "vikas","company": "BT"

使用以下命令提交 HDFS 接收器作业:

curl -X POST -H "Content-Type: application/json" --data '"name": "connect-cluster-15may-308pm", "config": "connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"' http://localhost:8083/connectors

分布式 kafka connect worker 配置:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

错误信息:

http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status


    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
    "id": 0,
    "worker_id": "127.0.0.1:8083"

【问题讨论】:

【参考方案1】:

您使用的是哪个版本的 Kafka Connect?在通过堆栈跟踪确定错误来源时,了解这一点会有所帮助。

我认为正在发生的事情是值中有数据,但键中没有。由于您将key.convertervalue.converter 都设置为JsonConverterschemas.enable=true,因此预计会看到包含schemapayload 的信封格式。不过,我猜你的钥匙都是null

这有点像https://issues.apache.org/jira/browse/KAFKA-3832 的相反问题,其中JsonConverter 永远不会生成真正的null 值。相反,它总是生成包含预期可选模式 + null 有效负载的信封。在这种情况下,无法从 Kafka 转换为 Connect 的数据 API,因为它期望密钥中的信封格式相同。

您可以通过将--property print.key=true 添加到您的控制台使用者命令来验证这是不是问题所在。如果它打印出null 键,问题是JsonConverter 无法解码它们。

一个简单的解决方法是只使用其他一些Converter 来表示不关心null 值的键——反正键中没有数据。 Kafka Connect 附带的一个是org.apache.kafka.connect.storage.StringConverter

【讨论】:

我使用的是 3.2.1 版本。你是对的,问题是空键。当我将密钥转换器更改为 org.apache.kafka.connect.storage.StringConverter 时它起作用了。但是当我创建 schemas.enable=false 并将 JsonConverter 用于键和值转换器时,HDFS 中没有同时生成输出,我在日志中看不到任何错误消息。

以上是关于即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败的主要内容,如果未能解决你的问题,请参考以下文章