每个用户的 GCP PubSub(或 GCP 任务)同步处理

Posted

技术标签:

【中文标题】每个用户的 GCP PubSub(或 GCP 任务)同步处理【英文标题】:GCP PubSub (or GCP Tasks) Synchronous Processing per User 【发布时间】:2021-03-04 19:26:29 【问题描述】:

我有一个需要处理一组事件的用例。我需要他们对每个用户进行整体并行处理但串行处理。这可以在 PubSub 中完成吗(可能是 GCP 任务?)?

例如:

6 个事件同时进入(User_A_Event_1、User_A_Event_2、User_B_Event_1、User_B_Event_2、User_C_Event_1、User_D_Event_1)。

我想按 UserID 对它们进行分组,并行处理每个用户,然后依次处理每个事件(在成功完成前一个事件之前,不会开始后续事件处理)。比如:

用户 A 串行处理:处理 User_A_Event_1 --> 处理 User_A_Event_2 用户 B 串行处理:处理 User_B_Event_1 --> 处理 User_B_Event_2 用户 C 串行处理:处理 User_C_Event_1 用户 D 串行处理:处理 User_D_Event_1

如果重要的话,我不知道哪些用户会在什么时间举办活动。我们可能几个月都看不到用户的任何事件,然后开始收到大量事件。

我正在尝试找出一种在 GCP PubSub 中实现此目的的方法,但我也对其他解决方案持开放态度。我的偏好是通过推而不是拉来执行此操作,因为我可以在队列中没有任何内容的情况下进行很长时间。

感谢您的帮助。

克雷格

【问题讨论】:

3 个问题: 1. 你怎么知道一个事件在另一个事件之前?你有时间戳吗?增量ID?还要别的吗? 2、你提前知道用户数吗?如果是这样,提供新用户的流程是什么? 3. 您需要实时还是可以按批次(例如每小时)处理事件? 我确实有一个时间戳,但我实际上对 FIFO 没问题。当事件发生时,我也会知道用户 ID。我不知道用户什么时候会有事件,但我系统中的所有用户最终可能每天都会有事件。不幸的是,我确实需要它,批量处理无法满足我的需求。 好吧,事实上,如果你有 2 个事件发生的时间非常接近,你想要一个像“锁”这样的东西来不并行处理它们。您知道同一用户的 2 个事件之间可以有的高频率(或最低间隔)吗?你知道一个事件的最长处理时间吗? 【参考方案1】:

Cloud Pub/Sub's ordered delivery 可以在这里提供帮助。您将使用用户作为订购键。这意味着 Cloud Pub/Sub 将按照服务从您的发布者那里收到消息的顺序将消息传递给您的订阅者。在您事先不知道用户集以及特定用户的事件可能很少或突发的情况下,有序交付将具有您想要的属性。

在订阅方面,所做出的保证取决于订阅者的类型。对于客户端库(使用流式拉取),您提供的回调将针对具有相同键的消息一次执行完成。对于使用拉取的订阅者,每个拉取请求将按照接收顺序包含密钥的消息,并且密钥的消息一次仅在一个拉取响应中未完成。对于推送订阅者,订购密钥的每条消息都将单独发送到您的端点,并且在确认同一密钥的上一条消息之前不会发送下一条消息。

请注意,Cloud Pub/Sub 的有序交付仍然具有至少一次交付语义,这意味着可以重新交付已确认的消息,这也会导致针对同一密钥重新交付后续消息。

有关详细信息,请参阅Medium post about ordering。

【讨论】:

以上是关于每个用户的 GCP PubSub(或 GCP 任务)同步处理的主要内容,如果未能解决你的问题,请参考以下文章

从存储中读取 JSON 数组并发送到 GCP PubSub

如何使用 gsutil 向 GCP 存储桶添加 pubsub 通知

GCP Pubsub 未传递消息的数量不会改变

GCP - 从 PubSub 到 BigQuery 的消息

从现有的 GCP pubsub 订阅中消费

GCP PubSub 主题推送问题