在 KSQL 中加入两个(或更多)kafka 主题的最佳方法是从所有主题中发出变化?

Posted

技术标签:

【中文标题】在 KSQL 中加入两个(或更多)kafka 主题的最佳方法是从所有主题中发出变化?【英文标题】:Best way to join two (or more) kafka topics in KSQL emiting changes from all topics? 【发布时间】:2021-01-09 18:24:03 【问题描述】:

我们有一个“微服务”平台,我们正在使用 debezium 从这些平台上的数据库中捕获变更数据,效果很好。

现在,我们想让我们更轻松地加入这些主题并将结果流式传输到一个新主题中,供多个服务使用。

免责声明:这假定 v0.11 ksqldb 和 cli(似乎其中大部分可能不适用于旧版本)

来自两个数据库实例的两个表流入 Kafka 主题的示例:

-- source identity microservice (postgres)
CREATE TABLE public.user_entity (
    id varchar(36) NOT NULL,
    first_name varchar(255) NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');

-- source organization microservice (postgres)
CREATE TABLE public.user_info (
    id varchar(36) NOT NULL,
    user_entity_id varchar(36) NOT NULL,
    business_unit varchar(255) NOT NULL,
    cost_center varchar(255) NOT NULL,
    PRIMARY KEY (id)
);
-- ksql stream 
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

选项 1:流

CREATE STREAM stream_user_info_by_user_entity_id
AS SELECT * FROM stream_user_info
PARTITION BY user_entity_id
EMIT CHANGES;

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN stream_user_info_by_user_entity_id ui WITHIN 365 DAYS ON ue.id = ui.user_entity_id 
EMIT CHANGES;

注意WITHIN 365 DAYS,从概念上讲,这些表可能会持续很长时间而不会被更改,因此这个窗口在技术上会无限大。这看起来很可疑,似乎暗示这不是一个好方法。

选项 2:表格

CREATE TABLE ktable_user_info_by_user_entity_id (
    user_entity_id,
    first_name,
    business_unit,
    cost_center
)
with (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');

SELECT 
    user_entity_id,
    first_name,
    business_unit,
    cost_center
FROM stream_user_entity ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id 
EMIT CHANGES;

我们不再需要窗口WITHIN 365 DAYS,所以这感觉更正确。 然而这只会在消息被发送到流而不是表时发出变化。

在这个例子中: 用户更新 first_name -> 发出更改 用户更新 business_unit -> 未发出任何更改

也许有一种方法可以创建一个由 user_entity_id 分区的合并流,并加入到将保持当前状态的子表,这导致我......

选项 3:合并流和表

-- "master" change stream with merged stream output
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

CREATE STREAM stream_user_entity_by_id
AS SELECT * FROM stream_user_entity
PARTITION BY id
EMIT CHANGES;

CREATE TABLE ktable_user_entity_by_id (
    id VARCHAR PRIMARY KEY,
    first_name VARCHAR
) with (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');

SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

这个看起来最好,但似乎每个表都有很多移动组件,我们有 2 个流、1 个插入查询、1 个 ktable。此处的另一个潜在问题可能是隐藏的竞争条件,其中流在表更新之前发出更改。

选项 4:更多合并的表和流

CREATE STREAM stream_user_entity_changes_enriched
AS SELECT 
    ue.id AS user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_by_id ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

CREATE STREAM stream_user_info_changes_enriched
AS SELECT 
    ui.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_info_by_user_entity_id ui
LEFT JOIN ktable_user_entity_by_id ue ON ui.user_entity_id = ue.id
EMIT CHANGES;


CREATE STREAM stream_user_changes_enriched (user_entity_id VARCHAR, first_name VARCHAR, business_unit VARCHAR, cost_center VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes_enriched', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_entity_changes_enriched;
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_info_changes_enriched;

这在概念上与之前的相同,但“合并”发生在连接之后。可以想象,这可能会消除任何潜在的竞争条件,因为我们主要从流而不是表中进行选择。

缺点是复杂性甚至比选项 3 还要糟糕,并且为具有两个以上表的任何连接编写和跟踪所有这些流将有点让人麻木......

问题: 什么方法最适合这个用例和/或我们是否正在尝试做一些不应该使用 ksql 的事情?我们最好将其卸载到传统的 RDBMS 或 spark 替代品上吗?

【问题讨论】:

我认为选项 2 是预期的行为。尽管表中的更改不会立即发出事件,但此后对流的任何后续更改都会在输出流上发出带有the new information from the table 的事件。如果更改是对现有行的更新,我不确定如果表中的更改立即发出事件应该是什么预期行为,我们可能需要返回输出流中的旧事件进行更新,但是由于流是不可变的,这不应该发生。 是的,我同意这种预期的行为。我正在寻找一种基本上可以解决这种行为的解决方案。 你觉得table-table(ksql表)更适合这种情况吗?你想要的输出在这里听起来不像一个流。 我想要一个流,我希望其他应用程序能够使用该流并收到这两个组合流的任何更新的通知。例如,我可以将它们加入消费者,但如果我有多个需要了解这些的服务,则最好在 kafka 端加入一个流,消费者不需要了解所涉及的逻辑. 连接表在保留历史记录的意义上不是一个流。如果我是正确的,使用连接表的应用程序仍然会通过连接记录的最新状态通知两个流的任何更新。这听起来像你想要的 【参考方案1】:

我将尝试回答我自己的问题,只有在赞成的情况下才接受。

答案是:选项 3

这是这个用例的原因,这将是最好的,虽然可能是主观的

按主键和外键划分的流是常见且简单的。 基于这些流的表很常见且很简单。 以这种方式使用的表不会成为竞争条件。

所有选项都有优点,例如如果您不关心发出所有更改,或者数据的行为类似于流(日志或事件)而不是缓慢变化的维度(sql 表)。

至于“竞争条件”,“表”这个词会让您误以为您实际上是在处理和保存数据。实际上,它们实际上并不是物理表,它们实际上更像是流上的子查询。注意:对于实际产生主题的聚合表可能是一个例外(我建议这是一个不同的主题,但希望看到 cmets)

最后(语法可能有一些小错误):

---------------------------------------------------------
-- shared objects (likely to be used by multiple queries)
---------------------------------------------------------

-- shared streams wrapping topics
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');

-- shared keyed streams (i like to think of them as "indexes")
CREATE STREAM stream_user_entity_by_id AS 
SELECT * FROM stream_user_entity PARTITION BY id
EMIT CHANGES;
CREATE STREAM stream_user_info_by_user_entity_id AS 
SELECT * FROM stream_user_info PARTITION BY user_entity_id
EMIT CHANGES;

-- shared keyed tables (inferring columns with schema registry)
CREATE TABLE ktable_user_entity_by_id (id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');
CREATE TABLE ktable_user_info_by_user_entity_id (user_entity_id VARCHAR PRIMARY KEY) 
WITH (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');


---------------------------------------------------------
-- query objects (specific to the produced data)
---------------------------------------------------------
-- "master" change stream (include all tables in join)
CREATE STREAM stream_user_changes (user_entity_id VARCHAR) 
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;

-- pretty simple looking query
SELECT 
    uec.user_entity_id,
    ue.first_name,
    ui.business_unit,
    ui.cost_center
FROM stream_user_entity_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id 
EMIT CHANGES;

“共享”对象基本上是流式架构(诱惑是为我们所有的主题创建,但这是另一个问题),第二部分类似于查询架构。归根结底,它是一种实用、干净且可重复的模式。

【讨论】:

如果可以直接加入流,为什么还要创建表?

以上是关于在 KSQL 中加入两个(或更多)kafka 主题的最佳方法是从所有主题中发出变化?的主要内容,如果未能解决你的问题,请参考以下文章

带有 JSON 格式消息的 KSQL EXTRACTJSONFIELD 返回 null

kafka sql入门

Kafka KSQL入门

在ELFK架构中加入kafka

“注册”是 Ksql 中的保留关键字,如果是,我如何选择具有该名称的字段

使用 KSQL 将表从 Kafka 转储到 MariaDB