1 周后如何处理来自分布式日志代理(例如 Kafka)的日志?
Posted
技术标签:
【中文标题】1 周后如何处理来自分布式日志代理(例如 Kafka)的日志?【英文标题】:How to process logs from distributed log broker (Eg Kafka) exactly after 1 week? 【发布时间】:2020-10-22 05:13:17 【问题描述】:如果我想处理来自 Kafka 的恰好 1 周前的日志,可以进行什么设置?
用例是我维护过去 1 周用户活动的累积统计信息。我对最终的一致性很好,不需要统计数据恰好是 1 周。
我有一个流设置,它处理来自 Kafka 的传入日志并更新统计信息。任何超过 1 周的活动都应从统计数据中删除。我可以实现的一种方法是使用批处理(例如 Spark)从统计数据中删除超过 1 周的活动。
有什么方法可以使用流处理从统计数据中删除超过 1 周的用户活动?各种方法的优缺点是什么?
如果我在 Kafka 中至少使用过一次并且统计数据偏离了基本事实,有什么方法可以定期更正统计数据?
【问题讨论】:
你可以watermarking
来自 Spark 结构化流的概念!
【参考方案1】:
如果您的 Kafka 消息具有正确的时间戳,那么您可以获得前一周时间戳的偏移量。所以你可以使用..
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
documentation 说:
按时间戳查找给定分区的偏移量。这 每个分区的返回偏移量是其最早的偏移量 时间戳大于或等于给定时间戳 对应的分区。
要获取主题分区列表,您可以调用consumer.assignment()
(在subscribe()
或assign()
之后)返回分配给消费者的Set<TopicPartition>
。地图中的Long
值基本上就是时间戳。因此,对于您案例中的所有键,它将是相同的值(即 1 周大的时间戳)
现在,您有一个Map<TopicPartition, OffsetAndTimestamp>
。您现在可以使用seek(TopicPartition partition, long offset)
查找每个偏移量。
consumer.subscribe(topics);
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> map = new LinkedHashMap<>();
partitions.forEach(partition -> map.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetForTimes(map);
offsetsMap.forEach((partition, offsetTimestamp) -> consumer.seek(partition, offsetTimestamp.offset()));
现在,您的消费者将处于一周前的消息位置。所以,当你poll()
时,你从上周到现在进行投票。
您可以更改时间戳以满足您的要求,例如,任何超过 1 周的时间都意味着,从时间戳 0 到上周时间戳。
所有前一周数据均指2weekOldTimestamp - 1weekOldTimestamp
。
因此,在这种情况下,您必须寻找2weekOldTimestamp
,然后处理每个分区,直到遇到1weekOldTimestamp
【讨论】:
以上是关于1 周后如何处理来自分布式日志代理(例如 Kafka)的日志?的主要内容,如果未能解决你的问题,请参考以下文章
React context dispatch 状态改变后如何处理