GCP Pubsub 中的消息丢失和重复

Posted

技术标签:

【中文标题】GCP Pubsub 中的消息丢失和重复【英文标题】:Message lost and duplicates in GCP Pubsub 【发布时间】:2017-10-23 16:11:58 【问题描述】:

我在从 Dataflow 读取 GCP PubSub 时遇到问题,当在短时间内发布大量消息时,Dataflow 将收到大部分发送的消息,除了一些消息会丢失,还有一些其他消息会丢失重复。而最奇怪的是,丢失消息的数量会与被复制的消息数量完全相同。

在其中一个示例中,我在 5 秒内发送了 4,000 条消息,总共收到了 4,000 条消息,但丢失了 9 条消息,恰好有 9 条消息重复。

我确定重复项的方法是通过日志记录。我正在记录发布到 Pubsub 的每条消息以及 pubsub 生成的消息 ID。我也在 Pardo 转换中从 PubsubIO 读取后立即记录消息。

我在 Dataflow 中从 Pubsub 读取的方式是使用 org.apache.beam.sdk.ioPubsubIO:

public interface Options extends GcpOptions, DataflowPipelineOptions 

    // PUBSUB URL
    @Description("Pubsub URL")
    @Default.String("https://pubsub.googleapis.com")
    String getPubsubRootUrl();
    void setPubsubRootUrl(String value);

    // TOPIC
    @Description("Topic")
    @Default.String("projects/test-project/topics/test_topic")
    String getTopic();
    void setTopic(String value);

...


public static void main(String[] args) 
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);


    options.setStreaming(true);
    options.setRunner(DataflowRunner.class);

    ...

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(PubsubIO
                 .<String>read()
                 .topic(options.getTopic())
                 .withCoder(StringUtf8Coder.of())
            )

            .apply("Logging data coming out of Pubsub", ParDo
                .of(some_logging_transformation)
            )

            .apply("Saving data into db", ParDo
                .of(some_output_transformation)
            )
            ;


    pipeline.run().waitUntilFinish();



我想知道这是否是 Pubsub 或 PubsubIO 中的已知问题?

更新: 用 pubsub 模拟器尝试了 4000 请求,没有丢失数据,没有重复

更新 #2:

我又进行了一些实验,发现重复的消息正在从丢失的消息中获取message_id。因为问题的方向已经偏离了它的起源,所以我决定发布另一个问题,其中包含详细的日志以及我用来发布和接收消息的代码。 新问题的链接:Google Cloud Pubsub Data lost

【问题讨论】:

能分享一下管道的作业ID吗? 感谢您的回复。我做了一个简单的管道,它使用 pubsbuIO 读取 pubsub,解析为对象,保存到 Neo4j 数据库。在发送 3000 个请求的快速运行中,13 个丢失,13 个重复。作业编号为 2017-05-24_08_46_18-1297374047328352490 您如何确定重复消息和丢失消息? 在我的设置中,发送到 pubsub 的每条消息本质上都是一个带有递增会话 ID 的假用户会话。从 PubsubIO 读取后,我记录了在 pubsubIO 之后的第一次转换期间收到的所有消息,然后将消息转储到数据库中。从那里,通过比较收到的每条消息的会话 ID,我可以列出与多条消息关联的所有会话 ID。而且由于会话 id 每次递增 1,我可以跟踪缺少哪个 id 那么您是通过日志记录确定重复项吗? ParDos 可以重新执行,所以这并不一定意味着有重复。 【参考方案1】:

我与 PubSub 团队的一位 Google 人员进行了交谈。这似乎是由 Python 客户端的线程安全问题引起的。谷歌的回复请参考Google Cloud Pubsub Data lost接受的答案

【讨论】:

python客户端0.29版本似乎解决了这个问题

以上是关于GCP Pubsub 中的消息丢失和重复的主要内容,如果未能解决你的问题,请参考以下文章

GCP 云功能未正确接收/确认 PubSub 消息

GCP - 从 PubSub 到 BigQuery 的消息

GCP - 如何添加关于发送到 pubsub 死信队列的消息数量的警报?

为啥 GCP 存储桶上的新文件不发送 pubsub 消息?

GCP Pubsub 低消息/秒的高延迟

如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息