确认 Apache Beam 上的 Google Pub/Sub 消息

Posted

技术标签:

【中文标题】确认 Apache Beam 上的 Google Pub/Sub 消息【英文标题】:Acknowledge Google Pub/Sub message on Apache Beam 【发布时间】:2017-10-15 15:51:26 【问题描述】:

我正在尝试使用以下代码从 pub/sub 读取

Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() 
    @Override
    public String apply(PubsubMessage input) 
        LOG.info("hola " + input.getAttributeMap());
        return new String(input.getMessage());
    
);
PCollection<String> pps = p.apply(pubsub)
        .apply(
                Window.<String>into(
                    FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() 
    @ProcessElement
    public void processElement(ProcessContext c) 
        LOG.info("hola amigo "+c.element());
        c.output(c.element());
    
  ));

与我在 NodeJS 上收到的相比,我收到的消息将包含在 data 字段中。我怎样才能得到ackId 字段(我以后可以用它来确认消息)?我正在打印的属性映射是null。 有没有其他方法可以确认所有消息而无需找出 ackId?

【问题讨论】:

我使用的是 v0.6.0 您使用的是哪个跑步者?我认为PubsubIO.read() 应该在消息成功处理后为您确认消息——您确定有必要自己确认吗? 我正在使用 flink-runner。消息似乎没有得到确认,但我会再次检查。 我再次检查,消息肯定没有得到确认。但是我错误地认为 ackId 会在属性中 - 属性映射值是正确的。所以我只需要知道如何让我的消息得到确认。 PubsubIO 阅读器负责确认消息。我相信它与跑步者的检查点行为有关。具体来说,源将仅在读出的元素已被检查点时确认。你是如何配置 flink-runner 的 checkpointing 行为的? 【参考方案1】:

PubsubIO 阅读器负责确认消息。它与跑步者的检查点行为有关。具体来说,源只会在结果元素被检查点后才确认消息。

在这种情况下,您应该查看 Flink 运行程序检查点何时提供该源的状态信息。我相信这与检查点频率的 Flink 配置有关。

【讨论】:

以上是关于确认 Apache Beam 上的 Google Pub/Sub 消息的主要内容,如果未能解决你的问题,请参考以下文章

Python 上的 Apache Beam 将 beam.Map 调用相乘

Dataflow/Apache Beam 在啥阶段确认发布/订阅消息?

Apache Beam 不会将文件写入本地环境或 Google 存储

使用 Python / Apache Beam 进行 Google Cloud Storage 并发控制?

Scio:Apache Beam和Google Cloud Dataflow的Scala API

如何在Apache Beam / Google Dataflow中使用ParseJsons?