如何在 Kafka Connect JDBC Source 连接器中添加显式 WHERE 子句

Posted

技术标签:

【中文标题】如何在 Kafka Connect JDBC Source 连接器中添加显式 WHERE 子句【英文标题】:How to add explicit WHERE clause in Kafka Connect JDBC Source connector 【发布时间】:2019-10-30 22:22:24 【问题描述】:

我正在使用 kafka 连接从 DB2 到 kafka 主题的源数据,并且我正在配置 sql 查询以从 DB2 读取数据,下面是查询

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'

我正在使用设置 "timestamp.column.name": "CREATE_TS" 这里问题出在查询中,它们已经是 WHERE 子句,并且 kafka connect 尝试添加另一个带有时间戳列的 where 子句,它正在创建问题,另一个问题是如果我删除 where 子句来自下面的 sql 子句

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR

然后出现 substr 错误,如下所示

SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26

任何人都可以就这两个问题提出建议吗,我卡在这一点上。

【问题讨论】:

【参考方案1】:

这是因为您尝试同时使用"mode": "timestamp"queryTimestampIncrementingTableQuerierWHERE 子句附加到与query 中现有WHERE 子句冲突的查询。

JDBC source connector docs 对此很清楚:

query

如果指定,则执行查询以选择新的或更新的行。采用 如果要连接表,请选择此设置中的列子集 表格或过滤数据。如果使用,此连接器将仅复制数据 使用这个查询——全表复制将被禁用。不同的 查询模式仍可用于增量更新,但为了 正确构造增量查询,必须可以 将 WHERE 子句附加到此查询(即,可能没有 WHERE 子句 用过的)。 如果使用 WHERE 子句,它必须处理增量查询 本身


作为一种解决方法,您可以将查询修改为(取决于您使用的 SQL 风格)

SELECT * FROM ( SELECT * FROM table WHERE ...)

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

例如,在您的情况下,查询应该是

"query":"SELECT * FROM (SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL') o"

【讨论】:

以上是关于如何在 Kafka Connect JDBC Source 连接器中添加显式 WHERE 子句的主要内容,如果未能解决你的问题,请参考以下文章

如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?

Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?

Kafka Connect JDBC 接收器连接器

Kafka Connect JDBC Source MySQL 全量同步

pyflink消费kafka-connect-jdbc消息(带schema)

使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题