Kafka JDBC连接器加载所有数据,然后增量

Posted

技术标签:

【中文标题】Kafka JDBC连接器加载所有数据,然后增量【英文标题】:Kafka JDBC connector load all data, then incremental 【发布时间】:2017-10-01 23:50:22 【问题描述】:

我试图弄清楚如何最初从查询中获取所有数据,然后只使用 kafka 连接器进行增量更改。这样做的原因是我想将所有数据加载到弹性搜索中,然后保持 es 与我的 kafka 流同步。 目前,我首先使用带有模式=批量的连接器,然后将其更改为时间戳。这工作正常。

但是,如果我们想要将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本以某种方式清理或删除 kafka 流和 es 索引数据,修改 connect ini 以将模式设置为批量,重新启动一切,给它时间加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动所有内容(需要这样一个脚本的原因是偶尔,批量更新会通过我们还没有的 etl 进程纠正历史数据有控制权,而且这个过程不会更新时间戳)

有没有人在做类似的事情并找到了更优雅的解决方案?

【问题讨论】:

【参考方案1】:

过了很久才回来。该方法能够解决这个问题,并且永远不必使用批量模式

    停止连接器 擦除每个连接器 jvm 的偏移文件 (可选)如果您想要完全擦除和加载,您可能还想使用 kafka/connect utils/rest api 删除您的主题(并且不要忘记状态主题) 重新启动连接。

【讨论】:

我正在尝试解决同样的问题(即,从一开始就重新加载数据)。您的解决方案似乎比将配置更新为批量并返回时间戳更复杂。它解决了其他方法没有解决的什么问题?【参考方案2】:

如何最初从查询中获取所有数据,然后仅使用 kafka 连接器增量更改。

也许这对你有帮助。比如我有一张桌子:

╔════╦═════════════╦═══════════╗
║ Id ║    Name     ║  Surname  ║
╠════╬═════════════╬═══════════╣
║  1 ║ Martin      ║ Scorsese  ║
║  2 ║ Steven      ║ Spielberg ║
║  3 ║ Christopher ║ Nolan     ║
╚════╩═════════════╩═══════════╝

在这种情况下,我将创建一个视图:

CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;

在你可以使用的 kafka jdbc 连接器的属性文件中:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS

所以kafka jdbc连接器会采取措施:

    首先是EXID = 0的所有数据; 它将在connector.offsets文件中存储偏移值= 0; 将在 DIRECTORS 表中插入新行。 Kafka JDBC 连接器将 执行:Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS 并将 请注意 EXID 已增加。 数据将在 Kafka Streams 中更新。

【讨论】:

不正是我要问的。目前我正在使用时间戳列。我必须将模式更改为批量重新加载所有内容,然后更改回时间戳以让 kafka 然后增量加载更改或新数据(它在查询中附加一个往返时间戳来执行此操作)。我希望避免每次我想从“干净”的状态开始时都必须切换模式。

以上是关于Kafka JDBC连接器加载所有数据,然后增量的主要内容,如果未能解决你的问题,请参考以下文章

pyflink消费kafka-connect-jdbc消息(带schema)

5. JDBC之数据库连接池——Part1

带有 MSSQL 的 Kafka JDBC 连接器仅流式传输 100 行

kafka JDBC源连接器无法获取postgres表

kafka-connect sink 连接器 pk.mode 用于具有自动增量的表

带有旧数据库的JDBC Kafka连接器