Kafka Connect - 不适用于更新操作

Posted

技术标签:

【中文标题】Kafka Connect - 不适用于更新操作【英文标题】:Kafka Connect - Not working for UPDATE operation 【发布时间】:2020-10-08 12:31:43 【问题描述】:

我是 Kafka-Connect 源和接收器的新手。我创建了将表数据从一个模式 (Schema1) 传输到另一个模式 (Schema2) 的应用程序,这里我使用 Oracle 作为数据库。我成功地将数据/行从表“Schema1.Header”转移到表“Schema2.Header”,但不能用于更新操作,配置如下。

源配置:

          
           "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:oracle:thin:@localhost:1524:XE",
            "connection.user": "USER",
            "connection.password": "user1234",
            "dialect.name": "OracleDatabaseDialect",
            "topic.prefix": "Schema1.Header",
            "incrementing.column.name": "SC_NO",
            "mode": "incrementing",
            "query": "SELECT * FROM (SELECT HEADER_V1.* FROM Schema1.Header HEADER_V1 INNER JOIN Schema1.LINE_V1 LINE_V1 ON HEADER_V1.SC_NO = LINE_V1.SC_NO AND LINE_V1.CLNAME_CODE ='XXXXXX' AND HEADER_V1.ITEM_TYPE = 'XXX')",
            "transforms": "ReplaceField",
            "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.ReplaceField.blacklist": "col_3,col_10"
            

SINK 配置:

  
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@localhost:1524:XE",
    "connection.user": "USER2",
    "connection.password": "user21234",
    "dialect.name": "OracleDatabaseDialect",
    "topics": "Schema1.Header",
    "table.name.format": "Schema2.Header",
    "tasks.max": "1"

请帮我解决这个问题。

注意:我只需要在 Schema Schema1.Tables 中执行所有 CRUD 操作,使用 Kafka 连接将这些数据传输到另一个 Schema Schema2.Tables。新插入的数据/行已传输,但更新的数据/行未通过 Kafka-Connect 传输。我必须做什么才能实现这一目标?

【问题讨论】:

【参考方案1】:

根据这个blog,您需要将mode 设置为timestamp(或者如果您想同时newupdated 行,最好设置timestamp+incrementing ) 在您的源配置中。

此外,您还需要指定timestamp.column.name,它应该指向一个时间戳列,该列在每次更新行时都会更新。

【讨论】:

以上是关于Kafka Connect - 不适用于更新操作的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect 不适用于主题策略

Connect-redis 商店不适用于 socket.io

Rails 5:更新操作不适用于 AJAXified 表单

PHP MySQL_connect 不适用于 MAMP

Connect By 子句适用于 11g,但不适用于 Oracle 8i:“ORA-01436:用户数据中的 CONNECT BY 循环”

保存、更新或对 iCloud 的任何操作不适用于 ad-hoc 或 appstore 配置文件