如何将记录拆分为不同的流,从一个主题到不同的流?

Posted

技术标签:

【中文标题】如何将记录拆分为不同的流,从一个主题到不同的流?【英文标题】:How to split records into different streams, from one topic to different streams? 【发布时间】:2020-07-25 16:54:00 【问题描述】:

我有一个包含不同大小记录的单个源 CSV 文件,它将每条记录推送到一个源主题中。我想将记录拆分为来自该源主题的不同 KStreams/KTables。我有一个用于一个表加载的管道,我将源主题中的记录以分隔格式推送到 stream1,然后将记录推送到另一个 AVRO 格式的流中,然后将其推送到 JDBC 接收器连接器中,该连接器将记录推送到 mysql 数据库中.管道必须相同。但是我想将不同表的记录推送到一个源主题中,然后根据一个值将记录拆分为不同的流。这可能吗?我试图寻找方法来做到这一点,但不能。我也可以以某种方式改进管道或使用 KTable 代替 KStreams 或任何其他修改吗?

我目前的流程 - 一个源 CSV 文件(source.csv)-> 源主题(名称 - 包含 test1 记录的源主题)-> 流 1(分隔值格式)-> 流 2(作为 AVRO 值格式)-> 结束主题(名称 - sink-db-test1 ) -> JDBC sink 连接器 -> MySQL DB (name - test1)

我有一个不同的 MySQL 表 test2 具有不同的架构,并且该表的记录也存在于 source.csv 文件中。由于架构不同,我无法按照test1 的当前管道将数据插入test2 表中。

示例 - 在 CSV 源文件中,

line 1 - 9,atm,mun,ronaldo line 2- 10,atm,mun,bravo,num2 line 3 - 11,atm,sign,bravo,sick

在此示例中,要拆分的值为column 4ronaldobravo) 所有这些数据应分别加载到table 1table 2table 3 关键是第 4 列。

if col4==ronaldo, go to table 1 if col4==bravo and col3==mun, go to table 2 if col4==bravo and col3 ==sign go to table 3

我对 Kafka 很陌生,从上周开始开发 Kafka。

【问题讨论】:

对不起,我隐藏了我的答案,因为它似乎不相关,因为您必须编写一个 Kafka Streams 应用程序才能从输入主题读取不同的主题。您应该将数据从不同的表发送到不同的主题,而不是像评论中所述的不同主题的分区。为什么要将每种类型发送到一个主题的不同分区。 我在单个源 CSV 文件中获取所有数据。从那里我必须拆分记录,然后再做。所以目前我有一个源主题,它从源 CSV 文件中读取数据,然后插入到数据库中。源文件是一个,所以我想只能使用一个主题来读取该源文件。 所以流程应该是这样的 - 一个包含所有数据的源 CSV 文件 -> 将数据拆分为不同的 MySQLA 表的不同结束主题。 明白了,我的解决方案是编写一个单独的Kafka Streams应用程序来读取输入主题,然后根据一些自定义逻辑应用分支逻辑 好的。 Kafka 流可以根据创建流时提到的模式直接获取数据。当我们拥有不同表但具有相同列的数据时,就会出现问题。我尝试了不同列大小的记录,并且能够拆分。但是,当我为不同的表数据使用相同的列大小时,就会出现问题。 【参考方案1】:

您可以使用KStream#branch() operator 编写一个单独的 Kafka Streams 应用程序来将记录从输入主题拆分到不同的 KStream 或输出主题:

KStream<K, V>[] branches = streamsBuilder.branch(
        (key, value) -> filter logic for topic 1 here,
        (key, value) -> filter logic for topic 2 here,
        (key, value) -> true//get all messages for this branch
);

// KStream branches[0] records for logic 1
// KStream branches[1] records for logic 2
// KStream branches[2] records for logic 3

或者您可以像这样手动分支您的 KStream:

KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerdes));

inputKStream
        .filter((key, value) -> filter logic for topic 1 here)
        .to("your_1st_output_topic");

inputKStream
        .filter((key, value) -> filter logic for topic 2 here)
        .to("your_2nd_output_topic");
...

【讨论】:

所以为此我需要编写一个自定义源连接器吗?我目前正在使用 SpoolDir 连接器 (SpoolDirLineDelimitedSourceConnector)。我正在尝试为此添加一些配置。如果我将特定类型的数据发送到特定分区,然后根据分区加载流,是否可以考虑或可能这样做?【参考方案2】:

我能够拆分数据并将 KSQL 用于我在下面分享的方法。 1. 使用value_format='JSON'payload 列创建输入流STRING 2.有效载荷将包含整个记录为STRING 3. 然后使用WHERE 子句中的LIKE 运算符将记录拆分为不同的流,同时根据要求将有效负载放入不同的流中。在这里,我使用了 KSQL 的 SPLIT 运算符从有效载荷中获取逗号分隔格式的记录

【讨论】:

以上是关于如何将记录拆分为不同的流,从一个主题到不同的流?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams - 根据 Streams 数据发送不同的主题

Spark:并行处理多个kafka主题

为啥压缩然后未压缩不同长度的流

MSDeploy 抛出奇怪的错误:xxxxx.dll 的流数据尚不可用

如何在单个查询中计算不同类型列的流数据帧的统计信息?

从 Azure Application Insights 连续导出的 ParsedStack 异常的流分析查询