neo4j sink connector - Avro - unknown magic byte


【Posted】2020-06-02 05:48:39 【Question】:

I am getting the following error when adding the neo4j sink connector in Confluent Platform.

    Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
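For context, a sketch of the two JSON shapes this error is distinguishing; the record fields here are hypothetical, not taken from the actual topic:

```python
import json

# With "schemas.enable": "true", JsonConverter expects an envelope containing
# exactly a "schema" and a "payload" field (record fields here are made up):
envelope = {
    "schema": {
        "type": "struct",
        "fields": [{"field": "projectname", "type": "string"}],
        "optional": False,
    },
    "payload": {"projectname": "demo"},
}

# With "schemas.enable": "false", it accepts the bare payload instead:
plain = {"projectname": "demo"}

# Both serialize to ordinary JSON bytes on the topic:
envelope_bytes = json.dumps(envelope).encode()
plain_bytes = json.dumps(plain).encode()
```

The error above fires when schemas.enable is true (the default for some workers) but the message looks like `plain` rather than `envelope`.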

Please see my configuration below:

curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
    "topics": "pls.bookneo",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "*****"
}'

Updated the configuration as follows:

curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
    "topics": "pulseTest.bookneo",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "pulse",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cdc.schema": "pulseTest.bookneo"
}'

Now I am facing this issue:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pulseTest.bookneo to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Now my connector runs without errors, but the nodes are created blank (with no properties) in neo4j.

curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-projectp/config -H "Content-Type: application/json" -d '{
    "topics": "projectsp",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.tolerance": "all",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "pulse",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.projectsp": "MERGE (p:projects {projectname: coalesce(event.projectname, 0), count: coalesce(event.count, 0)})"
}'
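For what it's worth, with `value.converter.schemas.enable=false` the JsonConverter hands the sink the bare JSON object, and the Cypher template reads its keys through `event`. A minimal sketch of that mapping; the message content below is hypothetical:

```python
import json

# Hypothetical record on the "projectsp" topic.
message = b'{"projectname": "demo", "count": 3}'

# JsonConverter (schemas.enable=false) deserializes it to a plain map;
# the sink exposes that map to the Cypher template as `event`, so
# event.projectname and event.count resolve to these values:
event = json.loads(message)
print(event["projectname"], event["count"])  # → demo 3
```

If the incoming JSON lacks those keys (or the Cypher property syntax is wrong), `coalesce(..., 0)` or a parse failure can leave the merged node without the expected properties.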

Here is my Cypher query:

    MERGE (p:projects {projectname: coalesce(event.projectname, 0), count: coalesce(event.count, 0)})

I have attached the data coming from the topic.

【Comments】:

Your error is not specific to neo4j. What research have you done on it? — I am adding the configuration for the neo4j sink connector.

【Answer 1】:

You are also using JsonConverter for the key, so you need to specify schemas.enable there as well:

curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
    "topics": "pls.bookneo",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "*****"
}'

To learn more about converters, see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

【Comments】:

【Answer 2】:

Unknown magic byte!

Your topic does not contain Avro data produced with the Confluent serializers, so the deserializer fails.

You can verify this with the Kafka Avro console consumer (kafka-avro-console-consumer).
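Background on the error: Confluent's Avro serializers prepend a 5-byte header to every message — a 0x00 magic byte followed by a 4-byte schema registry ID — and "Unknown magic byte!" means that header is missing (e.g. the topic holds plain JSON). A small sketch of that framing check, not the actual deserializer code:

```python
import struct

def split_confluent_frame(raw: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload).

    Raises ValueError for anything that does not start with the 0x00
    magic byte -- the same condition behind "Unknown magic byte!".
    """
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("Unknown magic byte!")
    schema_id = struct.unpack(">I", raw[1:5])[0]  # 4-byte big-endian schema ID
    return schema_id, raw[5:]

# A framed message (schema ID 42) parses; plain JSON bytes would not:
print(split_confluent_frame(b"\x00\x00\x00\x00\x2a" + b"avro-bytes"))
```

So an AvroConverter only works if the producer actually wrote through the Confluent Avro serializer; JSON produced by any other client will always trip this check.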

【Comments】:
