即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败

Posted

技术标签:

【中文标题】即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败【英文标题】:kafka connect hdfs sink connector is failing even when json data contains schema and payload field 【发布时间】:2017-10-14 02:01:05 【问题描述】:

我正在尝试使用 kafka connect hdfs sink 连接器将 json 数据从 kafka 移动到 hdfs。

即使 kafka 中的 json 数据有模式和有效负载 kafka 连接任务失败并出现错误

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields.

Kafka 中的数据:

./bin/kafka-console-consumer --topic test_hdfs_json_schema_payload_1 --zookeeper localhost:2181 --from-beginning

"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "deepak","company": "BT"
"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "sufi","company": "BT"
"schema": "type": "struct","fields": ["type": "string","optional": false,"field": "Name", "type": "string","optional": false,"field": "company"],"optional": false,"name": "Person","payload": "Name": "vikas","company": "BT"

使用以下命令提交 HDFS 接收器作业:

curl -X POST -H "Content-Type: application/json" --data '"name": "connect-cluster-15may-308pm", "config": "connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max":"2", "hdfs.url": "hdfs://localhost:9000","flush.size": "1","topics":"test_hdfs_json_schema_payload_1","topics.dir":"/deepak/kafka-connect/trial1"' http://localhost:8083/connectors

分布式 kafka connect worker 配置:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

错误信息:

http://localhost:8083/connectors/connect-cluster-15may-308pm/tasks/0/status


    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:400)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n",
    "id": 0,
    "worker_id": "127.0.0.1:8083"

【问题讨论】:

【参考方案1】:

您使用的是哪个版本的 Kafka Connect?在通过堆栈跟踪确定错误来源时,了解这一点会有所帮助。

我认为正在发生的事情是值中有数据,但键中没有。由于您将key.convertervalue.converter 都设置为JsonConverterschemas.enable=true,因此预计会看到包含schemapayload 的信封格式。不过,我猜你的钥匙都是null

这有点像https://issues.apache.org/jira/browse/KAFKA-3832 的相反问题,其中JsonConverter 永远不会生成真正的null 值。相反,它总是生成包含预期可选模式 + null 有效负载的信封。在这种情况下,无法从 Kafka 转换为 Connect 的数据 API,因为它期望密钥中的信封格式相同。

您可以通过将--property print.key=true 添加到您的控制台使用者命令来验证这是不是问题所在。如果它打印出null 键,问题是JsonConverter 无法解码它们。

一个简单的解决方法是只使用其他一些Converter 来表示不关心null 值的键——反正键中没有数据。 Kafka Connect 附带的一个是org.apache.kafka.connect.storage.StringConverter

【讨论】:

我使用的是 3.2.1 版本。你是对的,问题是空键。当我将密钥转换器更改为 org.apache.kafka.connect.storage.StringConverter 时它起作用了。但是当我创建 schemas.enable=false 并将 JsonConverter 用于键和值转换器时,HDFS 中没有同时生成输出,我在日志中看不到任何错误消息。

以上是关于即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败的主要内容,如果未能解决你的问题,请参考以下文章

更新存储在 jwt 有效负载中的字段

使用 Hystrix Feign 记录请求和响应 json 有效负载

如何向 Moya.Response JSON 添加一个字段,该字段不在来自 http 响应的真实有效负载中

Azure 指标警报中的自定义 Json 有效负载

JSON Web 令牌 - 有效负载中的密码?

严格验证可空字段[重复]