KSQLDB - 从 debezium cdc 源连接器获取数据并将 Stream 与表连接

Posted

技术标签:

【中文标题】KSQLDB - 从 debezium cdc 源连接器获取数据并将 Stream 与表连接【英文标题】:KSQLDB - Getting data from debezium cdc source connector and joining Stream with Table 【发布时间】:2021-09-24 19:56:03 【问题描述】:

伙计们。

我先介绍一下场景:

我使用Debezium CDC Source ConnectorMS SQL SERVER 中的两个表中获取数据。按照连接器配置:

PROVIDER 表的连接器:

CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_PROVIDER WITH (

    'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector', 
    'database.hostname'= '<URL>',
    'database.port'= '1433',
    'database.user'= '<USER>',
    'database.password'= '<PASS>',
    'database.dbname'= 'a',
    'database.server.name'= 'a',
    'table.whitelist'='dbo.PROVIDER',
    'decimal.handling.mode'='double',
    'transforms'= 'unwrap,addTopicPrefix',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
    'transforms.addTopicPrefix.regex'='(.*)',
    'transforms.addTopicPrefix.replacement'='mssql-01-$1',
    'database.history.kafka.bootstrap.servers'= 'kafka:29092', 
    'database.history.kafka.topic'= 'dbhistory.PROVIDER' 
    );

ORDERS 表的连接器:

CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_ORDER WITH (
    
    'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector', 
    'database.hostname'= '<URL>',
    'database.port'= '1433',
    'database.user'= '<USER>',
    'database.password'= '<PASS>',
    'database.dbname'= 'a',
    'database.server.name'= 'a',
    'table.whitelist'='dbo.ORDER',
    'decimal.handling.mode'='double',
    'transforms'= 'unwrap,addTopicPrefix',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
    'transforms.addTopicPrefix.regex'='(.*)',
    'transforms.addTopicPrefix.replacement'='mssql-01-$1',
    'database.history.kafka.bootstrap.servers'= 'kafka:29092', 
    'database.history.kafka.topic'= 'dbhistory.ORDER'
    );

我认为它可以改进,但现在还可以。

一旦设置了连接器,我们就可以创建我们的流和表:

CREATE TABLE PROVIDER (ID_P VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='mssql-01-a.dbo.PROVIDER', VALUE_FORMAT='AVRO');

CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql-01 a.dbo.ORDERS',VALUE_FORMAT='AVRO');

如您所见,现在它只是使用来自 PROVIDER 表的数据来丰富 ORDERS 流,对吗?是的,但不是。

SELECT P.PROVIDER_COD, O.ID FROM ORDERS AS O JOIN PROVIDER AS P ON O.PROV = P.PROVIDER_COD EMIT CHANGES;

如果我尝试这样做,我会收到错误:

无法重新分区 TABLE 源。如果这是一个联接,请确保 条件使用 TABLE 的键列 ID_P 而不是 [PROVIDER_COD]

好吧,它应该很容易修复,但在这种情况下并非如此。最后我们解决了我的问题:

Provider's id 不在ORDERS stream 中,因为我从中获取数据的数据库就是这样设计的。

我们如何关联这两个数据集?

如果是关系型数据库那就简单了:

SELECT * FROM ORDERS O INNER JOIN PROVIDER P ON O.PROV = P.PROVIDER_COD AND O.SUB_COD = P.SUB_COD;

是的......我之前没有提到它,但我们这里有一个复合键,Provider CodeProvider' Subsidiary Code,我认为这是另一个问题。

拜托,谁能帮我理解如何在KSQLDB 中解决这个问题?

非常感谢。

【问题讨论】:

【参考方案1】:

我在 Confluent 论坛上找到了解决方案。

https://forum.confluent.io/t/join-stream-table-order-provider-when-the-stream-does-not-have-the-providers-id-but-others-informations/2279/7

感谢 Matthias J. Sax

【讨论】:

通常在 SO 上建议,不仅要提供链接,还要复制答案,因为如果链接断开,答案将无法再访问...随意复制我的回答变成了你的回答。

以上是关于KSQLDB - 从 debezium cdc 源连接器获取数据并将 Stream 与表连接的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Debezium SQL Server 连接器与 ksqlDB 嵌入式连接一起使用?

多个表之间 CDC 事件的 Debezium 排序

如何通过 debezium CDC 机制反序列化从 kafka 代理收到的 BigDecimal 值?

使用选定列更改表 - debezium 抛出 ArrayIndexOutOfBoundsException

如何使用 Debezium (cdc) 将从 mysql 获取的更改接收到另一个 mysql db

CDC 与 docker 中的 debezium