即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败
Posted
技术标签:
【中文标题】即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败【英文标题】:kafka connect hdfs sink connector is failing even when json data contains schema and payload field 【发布时间】:2017-10-14 02:01:05 【问题描述】:我正在尝试使用 kafka connect hdfs sink 连接器将 json 数据从 kafka 移动到 hdfs。
即使 kafka 中的 json 数据有模式和有效负载 kafka 连接任务失败并出现错误
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields.
Kafka 中的数据:
./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning
"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "deepak","company": "BT"
"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "sufi","company": "BT"
"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "vikas","company": "BT"
使用以下命令提交 HDFS 接收器作业:
curl -X POST -H "Content-Type: application/json" --data '"name": "connect-cluster-15may-308pm", "config": "connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"' http://localhost:8083/connectors
分布式 kafka connect worker 配置:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
错误信息:
http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
"id": 0,
"worker_id": "127.0.0.1:8083"
【问题讨论】:
【参考方案1】:您使用的是哪个版本的 Kafka Connect?在通过堆栈跟踪确定错误来源时,了解这一点会有所帮助。
我认为正在发生的事情是值中有数据,但键中没有。由于您将key.converter
和value.converter
都设置为JsonConverter
和schemas.enable=true
,因此预计会看到包含schema
和payload
的信封格式。不过,我猜你的钥匙都是null
。
这有点像https://issues.apache.org/jira/browse/KAFKA-3832 的相反问题,其中JsonConverter
永远不会生成真正的null
值。相反,它总是生成包含预期可选模式 + null
有效负载的信封。在这种情况下,无法从 Kafka 转换为 Connect 的数据 API,因为它期望密钥中的信封格式相同。
您可以通过将--property print.key=true
添加到您的控制台使用者命令来验证这是不是问题所在。如果它打印出null
键,问题是JsonConverter 无法解码它们。
一个简单的解决方法是只使用其他一些Converter
来表示不关心null
值的键——反正键中没有数据。 Kafka Connect 附带的一个是org.apache.kafka.connect.storage.StringConverter
。
【讨论】:
我使用的是 3.2.1 版本。你是对的,问题是空键。当我将密钥转换器更改为 org.apache.kafka.connect.storage.StringConverter 时它起作用了。但是当我创建 schemas.enable=false 并将 JsonConverter 用于键和值转换器时,HDFS 中没有同时生成输出,我在日志中看不到任何错误消息。以上是关于即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败的主要内容,如果未能解决你的问题,请参考以下文章
使用 Hystrix Feign 记录请求和响应 json 有效负载