ES Sink 连接器 debezium 因错误而停止

Posted

技术标签:

【中文标题】ES Sink 连接器 debezium 因错误而停止【英文标题】:ES Sink connector debezium stops with error 【发布时间】:2020-01-05 12:03:57 【问题描述】:

要了解 cdc 的工作原理, 我一直在使用 debezium 网站https://debezium.io/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/ 提供的以下示例。

如果我尝试将接收器连接器从 mongo db 更改为弹性搜索,然后启动 es-sink 连接器。它显示以下错误

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Mysql Debezium 源连接器属性是这样的(请忽略更正网址)


"name": "mysql-source",
"config": 
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "cdc",
    "database.password": "passwrod",
    "database.server.id": "1840514",
    "database.server.name": "dbserver1",
    "table.whitelist": "inventory.customers,inventory.addresses",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "unwrap",
    "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones":"false"

本示例中使用的是弹性搜索接收器连接器 https://debezium.io/blog/2018/01/17/streaming-to-elasticsearch/

Elastic sink 连接器 属性是这样的(请忽略更正网址)


"name": "elastic-sink",
"config": 
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "product-cdc,final_ddd_aggregates,dbserver1.inventory.customers,dbserver1.inventory.addresses",
    "connection.url": "https://localhost:9243",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id",
    "key.ignore": "false",
    "schema.ignore" : "true",
    "value.converter.schemas.enable":"true",
    "type.name":"final_ddd_aggregates"

请协助我。

【问题讨论】:

【参考方案1】:

在您的配置中,您需要按照错误消息告诉您的操作,并设置schemas.enable=false。使用文章中的示例,而不是:


    "name": "mongodb-sink",
    "config": 
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "final_ddd_aggregates",
        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
        "mongodb.collection": "customers_with_addresses",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
        "mongodb.delete.on.null.values": true
    

你会:


    "name": "mongodb-sink",
    "config": 
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "final_ddd_aggregates",
        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
        "mongodb.collection": "customers_with_addresses",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
        "mongodb.delete.on.null.values": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false"
    

要了解有关转换器等的更多信息,请参阅 https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained 和 http://rmoff.dev/ksldn19-kafka-connect

【讨论】:

嘿罗宾,似乎谁被这个问题误导了,实际上 mongodb sink 连接器可以正常工作,如示例中所述。所以 mongodb sink 没有问题,它可以在没有转换器的情况下正常工作。问题出在弹性接收器连接器上,我从其他示例中引用了它。 您能否用您的源和接收器连接器配置的详细信息更新您的问题,我们可以从那里得到它。我的答案基于您引用的错误消息。 嘿@Robin 按照指示,我已经使用源和接收器属性更新了问题,并使用了虚拟 url 连接器。 您的 ES 接收器中有 "value.converter.schemas.enable":"true",。您需要将其设置为"value.converter.schemas.enable": "false"(请参阅我当前答案中的示例)。【参考方案2】:

因为错误消息暗示您可能在正在阅读的主题中存储了没有架构的 JSON 消息。您需要在源端启用它或在​​接收端禁用它。

详情请查看FAQ entry。

【讨论】:

你能告诉我如何在接收端禁用它,因为主题源是发布它的 kstream 代码。

以上是关于ES Sink 连接器 debezium 因错误而停止的主要内容,如果未能解决你的问题,请参考以下文章

Debezium 消息与 kafka-connect sink 连接器期望的格式兼容

Debezium Mysql 连接器因 IllegalStateException 而失败,历史主题具有无限保留

使用 SQL Server 上的 JDBC Sink 连接器自动创建适当的 DATETIME 类型字段

如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?

如何使用在 docker 上运行的 debezium 和 confluent-sink-connector 将所有更改从源数据库复制到目标数据库

由于错误代码 1236,无法启动 debezium MySQL 连接器