GCP Pubsub 中的消息丢失和重复
Posted
技术标签:
【中文标题】GCP Pubsub 中的消息丢失和重复【英文标题】:Message lost and duplicates in GCP Pubsub 【发布时间】:2017-10-23 16:11:58 【问题描述】:我在从 Dataflow 读取 GCP PubSub 时遇到问题,当在短时间内发布大量消息时,Dataflow 将收到大部分发送的消息,除了一些消息会丢失,还有一些其他消息会丢失重复。而最奇怪的是,丢失消息的数量会与被复制的消息数量完全相同。
在其中一个示例中,我在 5 秒内发送了 4,000 条消息,总共收到了 4,000 条消息,但丢失了 9 条消息,恰好有 9 条消息重复。
我确定重复项的方法是通过日志记录。我正在记录发布到 Pubsub 的每条消息以及 pubsub 生成的消息 ID。我也在 Pardo 转换中从 PubsubIO 读取后立即记录消息。
我在 Dataflow 中从 Pubsub 读取的方式是使用 org.apache.beam.sdk.ioPubsubIO
:
public interface Options extends GcpOptions, DataflowPipelineOptions
// PUBSUB URL
@Description("Pubsub URL")
@Default.String("https://pubsub.googleapis.com")
String getPubsubRootUrl();
void setPubsubRootUrl(String value);
// TOPIC
@Description("Topic")
@Default.String("projects/test-project/topics/test_topic")
String getTopic();
void setTopic(String value);
...
public static void main(String[] args)
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
options.setRunner(DataflowRunner.class);
...
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
.apply("Logging data coming out of Pubsub", ParDo
.of(some_logging_transformation)
)
.apply("Saving data into db", ParDo
.of(some_output_transformation)
)
;
pipeline.run().waitUntilFinish();
我想知道这是否是 Pubsub 或 PubsubIO 中的已知问题?
更新: 用 pubsub 模拟器尝试了 4000 请求,没有丢失数据,没有重复
更新 #2:
我又进行了一些实验,发现重复的消息正在从丢失的消息中获取message_id
。因为问题的方向已经偏离了它的起源,所以我决定发布另一个问题,其中包含详细的日志以及我用来发布和接收消息的代码。
新问题的链接:Google Cloud Pubsub Data lost
【问题讨论】:
能分享一下管道的作业ID吗? 感谢您的回复。我做了一个简单的管道,它使用 pubsbuIO 读取 pubsub,解析为对象,保存到 Neo4j 数据库。在发送 3000 个请求的快速运行中,13 个丢失,13 个重复。作业编号为 2017-05-24_08_46_18-1297374047328352490 您如何确定重复消息和丢失消息? 在我的设置中,发送到 pubsub 的每条消息本质上都是一个带有递增会话 ID 的假用户会话。从 PubsubIO 读取后,我记录了在 pubsubIO 之后的第一次转换期间收到的所有消息,然后将消息转储到数据库中。从那里,通过比较收到的每条消息的会话 ID,我可以列出与多条消息关联的所有会话 ID。而且由于会话 id 每次递增 1,我可以跟踪缺少哪个 id 那么您是通过日志记录确定重复项吗? ParDos 可以重新执行,所以这并不一定意味着有重复。 【参考方案1】:我与 PubSub 团队的一位 Google 人员进行了交谈。这似乎是由 Python 客户端的线程安全问题引起的。谷歌的回复请参考Google Cloud Pubsub Data lost接受的答案
【讨论】:
python客户端0.29版本似乎解决了这个问题以上是关于GCP Pubsub 中的消息丢失和重复的主要内容,如果未能解决你的问题,请参考以下文章