1 周后如何处理来自分布式日志代理(例如 Kafka)的日志?

Posted

技术标签:

【中文标题】1 周后如何处理来自分布式日志代理(例如 Kafka)的日志?【英文标题】:How to process logs from distributed log broker (Eg Kafka) exactly after 1 week? 【发布时间】:2020-10-22 05:13:17 【问题描述】:

如果我想处理来自 Kafka 的恰好 1 周前的日志,可以进行什么设置?

用例是我维护过去 1 周用户活动的累积统计信息。我对最终的一致性很好,不需要统计数据恰好是 1 周。

我有一个流设置,它处理来自 Kafka 的传入日志并更新统计信息。任何超过 1 周的活动都应从统计数据中删除。我可以实现的一种方法是使用批处理(例如 Spark)从统计数据中删除超过 1 周的活动。

有什么方法可以使用流处理从统计数据中删除超过 1 周的用户活动?各种方法的优缺点是什么?

如果我在 Kafka 中至少使用过一次并且统计数据偏离了基本事实,有什么方法可以定期更正统计数据?

【问题讨论】:

你可以 watermarking 来自 Spark 结构化流的概念! 【参考方案1】:

如果您的 Kafka 消息具有正确的时间戳,那么您可以获得前一周时间戳的偏移量。所以你可以使用..

Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)

documentation 说:

按时间戳查找给定分区的偏移量。这 每个分区的返回偏移量是其最早的偏移量 时间戳大于或等于给定时间戳 对应的分区。

要获取主题分区列表,您可以调用consumer.assignment()(在subscribe()assign() 之后)返回分配给消费者的Set&lt;TopicPartition&gt;。地图中的Long 值基本上就是时间戳。因此,对于您案例中的所有键,它将是相同的值(即 1 周大的时间戳)

现在,您有一个Map&lt;TopicPartition, OffsetAndTimestamp&gt;。您现在可以使用seek(TopicPartition partition, long offset) 查找每个偏移量。

consumer.subscribe(topics);
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> map = new LinkedHashMap<>();
partitions.forEach(partition -> map.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetForTimes(map);
offsetsMap.forEach((partition, offsetTimestamp) -> consumer.seek(partition, offsetTimestamp.offset()));

现在,您的消费者将处于一周前的消息位置。所以,当你poll() 时,你从上周到现在进行投票。

您可以更改时间戳以满足您的要求,例如,任何超过 1 周的时间都意味着,从时间戳 0 到上周时间戳。

所有前一周数据均指2weekOldTimestamp - 1weekOldTimestamp

因此,在这种情况下,您必须寻找2weekOldTimestamp,然后处理每个分区,直到遇到1weekOldTimestamp

【讨论】:

以上是关于1 周后如何处理来自分布式日志代理(例如 Kafka)的日志?的主要内容,如果未能解决你的问题,请参考以下文章

表单关闭后如何处理非托管资源? [复制]

加入多个表后如何处理空值

React context dispatch 状态改变后如何处理

读写分离中间件 MaxScale 在 slave 有故障后如何处理?

分组后如何处理大型集合聚合?

从 QIODevice.read() 读取后如何处理数据?