卡夫卡流加入
Posted
技术标签:
【中文标题】卡夫卡流加入【英文标题】:Kafka stream join 【发布时间】:2018-03-06 20:30:39 【问题描述】:我有 2 个 kafka 主题 - recommendations
和 clicks
。第一个主题具有由唯一 ID(称为recommendationsId
)作为键的推荐对象。每个产品都有一个用户可以点击的 URL。
clicks
主题获取通过点击推荐给用户的产品 URL 生成的消息。它已经如此设置,这些点击消息也由recommendationId
键入。
注意
推荐和点击之间的关系是一对多的。一条建议可能会导致多次点击,但一次点击始终与一条建议相关联。
每个点击对象都会有一个相应的推荐对象。
点击对象的时间戳会晚于推荐对象。
推荐和相应点击之间的差距可能是几秒到几天(比如最多 7 天)。
我的目标是使用 Kafka 流加入来加入这两个主题。我不清楚的是我应该使用 KStream x KStream 连接还是 KStream x KTable 连接。
我通过recommendations
表加入clicks
流来实现KStream x KTable
连接。但是,如果建议是在加入者启动之前生成的并且在加入者开始之后点击到达,我将看不到任何加入的点击推荐对。
我是否使用了正确的连接?我应该使用KStream x KStream
加入吗?如果是这样,为了能够加入最多过去 7 天的推荐点击,我应该将窗口大小设置为 7 天吗?在这种情况下我是否还需要设置“保留”期限?
我执行KStream x KTable
加入的代码如下。请注意,我已经定义了类Recommendations
和Click
以及它们对应的serde。点击消息只是简单的String
(网址)。此 URL 字符串与 Recommendations
对象连接以创建一个 Click
对象,该对象被发送到 jointTopic
。
public static void main(String[] args)
if(args.length!=4)
throw new RuntimeException("Expected 3 params: bootstraplist clickTopic recsTopic jointTopic");
final String booststrapList = args[0];
final String clicksTopic = args[1];
final String recsTopic = args[2];
final String jointTopic = args[3];
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
// load clicks as KStream
KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);
// load recommendations as KTable
KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);
// join the two
KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));
// emit the join to the jointTopic
join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);
// let the action begin
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
只要在加入者(上述程序)运行之后生成推荐和点击,就可以正常工作。但是,如果在运行加入者之前生成推荐的点击到达,我看不到任何加入发生。我该如何解决这个问题?
如果解决方案是使用KStream x KSTream
加入,那么请帮助我了解我应该选择什么窗口大小以及选择什么保留期。
【问题讨论】:
这可能会有所帮助:confluent.io/blog/crossing-streams-joins-apache-kafka 【参考方案1】:您的总体观察是正确的。从概念上讲,您可以通过两种方式获得正确的结果。如果你使用流表连接,你有两个缺点(这可能会在未来的 Kafka 版本中重新审视和改进)
您已经提到,如果在相应推荐之前处理了点击获取,则(内部)连接将失败。但是,您知道会有推荐,您可以使用左连接而不是内连接,检查连接结果,如果推荐是null
,则将点击事件写回输入主题(即,您获取重试逻辑)——当然,对单个推荐的连续点击可能会出现问题,您可能需要在应用程序代码中考虑这一点。
KTable
的第二个缺点是,它会随着时间的推移而无限增长,因为您将为其添加越来越多的独特推荐。因此,您需要通过将<recommendationsId, null>
形式的墓碑记录发送到推荐主题来实现一些“过期逻辑”,以删除您不再关心的旧推荐。
这种方法的优点是,与流-流连接相比,您总共需要更少的内存/磁盘空间,因为您只需要缓冲应用程序中的所有建议(但无需点击)。
如果您使用流-流加入,并且点击可能在推荐后 7 天发生,则您的窗口大小必须为 7 天 - 否则,点击不会加入推荐。
这种方法的缺点是,您需要更多的内存/磁盘,因为您将在应用程序中缓冲过去 7 天的所有点击和所有建议。 优点是,顺序或处理(即推荐与点击)不再重要(即,您不需要实施上述重试策略) 此外,旧的建议会自动过期,因此您无需实施特殊的“过期逻辑”。对于流-流连接,保留时间的答案略有不同。它必须至少为 7 天,因为窗口大小为 7 天。否则,您将删除“运行窗口”的记录。您还可以将保留期设置得更长,以便能够处理“迟到的数据”。假设用户在窗口时间范围的最后(推荐的 7 天时间跨度结束前 5 分钟)点击,但点击仅在 1 小时后报告给您的应用程序。如果您的保留期为 7 天作为您的窗口大小,则无法再处理此迟到的记录(因为建议已被删除)。如果您设置更长的保留期,例如 8 天,您仍然可以处理迟到的记录。这取决于您的应用程序/语义需要您想要使用的保留时间。
总结: 从实现的角度来看,使用流-流连接比使用流-表连接更简单。但是,内存/磁盘节省是预期的,并且可能会很大,具体取决于您的点击流数据速率。
【讨论】:
感谢您的解释(顺便说一句很棒的博客!)。我确实有一个后续问题。假设我实现了KStream x KStream
inner-join,那么运行这个 joiner 的机器是否会下载并保存过去 7 天内的 all 建议和点击消息(对于相应的分区,假设机器数量= 分区数)?这听起来像很多物理内存。有没有办法扩大规模(比如机器数量是分区数量的两倍)?
它需要保存过去 7 天的所有数据,但不在内存中。我们在内部使用可以溢出到磁盘的 RocksDB。所以你可以保持比主内存大得多的状态。 -- 关于扩展:实例不能多于分区。如果您需要更高的并行度进行处理,则需要有更多的分区 - 一种方法是创建一个具有所需分区数量的主题,并在执行连接之前调用 through()
对输入数据进行重新分区.由于这个新主题仅用于扩展,因此保留时间短(如 1 小时?)。
@MatthiasJ.Sax 我想永远做stream-stream join(我的意思是直到我用完磁盘,所以我无法指定任何时间)或者说另一个我想运行stream-steam join查询多年。考虑到中间存储在rocksdb中,我真的需要担心内存吗?您是否发现任何其他问题,或者您能否就如何处理此用例提供一般性建议?
不确定我是否理解您想要正确执行的操作:当您说“我想运行流-蒸汽连接查询多年”时,您是否意味着您想潜在地加入一条记录从现在开始记录一岁? -- 流-流连接(与任何其他 Kafka Streams 程序一样)旨在运行多年;但这并不意味着您必须存储所有已处理的原始输入记录。保存在应用程序中的数据通常受窗口限制,但窗口不限制您运行程序的时间。以上是关于卡夫卡流加入的主要内容,如果未能解决你的问题,请参考以下文章