是否可以从 PubSub 读取消息并将其数据分隔在 PCollection<String> 的不同元素中?如果是这样,怎么做?

Posted

技术标签:

【中文标题】是否可以从 PubSub 读取消息并将其数据分隔在 PCollection<String> 的不同元素中?如果是这样,怎么做?【英文标题】:Is it possible to read a message from a PubSub and separate its data in different elements of a PCollection<String>? If so, how? 【发布时间】:2015-08-25 09:55:12 【问题描述】:

现在,我有以下代码:

PCollection<String> input_data =
    pipeline
        .apply(PubsubIO
            .Read
            .withCoder(StringUtf8Coder.of())
            .named("ReadFromPubSub")
            .subscription("/subscriptions/project_name/subscription_name"));

【问题讨论】:

【参考方案1】:

您可以输出Iterable&lt;A&gt;,然后使用Flatten 压缩它。不出所料,这在许多下一代数据处理平台中被称为flatMap,c.f.火花/闪烁。

【讨论】:

【参考方案2】:

看起来您想从 pubsub 读取一些消息,并通过在空格字符上拆分消息将每个消息转换为多个部分,然后将这些部分提供给管道的其余部分。不需要对 PubsubIO 进行特殊配置,因为它不是“读取数据”问题——它是“转换你已经读取的数据”问题——你只需要插入一个 ParDo 来获取你的“复合”记录并将其分解到你想要的方式,例如:

PCollection<String> input_data =
pipeline
    .apply(PubsubIO
        .Read
        .withCoder(StringUtf8Coder.of())
        .named("ReadFromPubSub")
        .subscription("/subscriptions/project_name/subscription_name"))
    .apply(ParDo.of(new DoFn<String, String>() 
      public void processElement(ProcessContext c) 
        String composite = c.element();
        for (String part : composite.split(" ")) 
          c.output(part);
        
      ));
    ));

【讨论】:

非常感谢您的回复。【参考方案3】:

我认为您的意思是您想要的数据存在于 PCollection 的不同元素中,并且希望以某种方式对其进行提取和分组。

一种可能的方法是编写一个 DoFn 函数来处理 PCollection 中的每个字符串。您为要分组的每条数据输出一个键值对。然后,您可以使用 GroupByKey 转换将所有相关数据组合在一起。

例如,您的 PCollection 中有以下来自 pubsub 的消息:

    用户 1234 购买了商品 A 用户 1234 购买了商品 B

DoFn 函数将输出一个键值对,其中用户 id 为键,购买的物品为值。 ( , )。 使用 GroupByKey 转换,您可以将两个值组合到一个元素中。然后,您可以对该元素执行进一步处理。

这是大数据中一种非常常见的模式,称为 mapreduce。

【讨论】:

不,我的意思是我的 PubSub 消息例如: message: "123|HOK|6789 547|EDF|6878" 如果 DataflowPubSub 读取此消息,它会将该消息写入我的唯一一个元素中PCollection。我想写,例如一个元素中的字符串“123|HOK|6789”和不同元素中的字符串“547|EDF|6878”。有可能吗?

以上是关于是否可以从 PubSub 读取消息并将其数据分隔在 PCollection<String> 的不同元素中?如果是这样,怎么做?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出

Bigquery 使用数据流发布订阅消息推送

使用来自 Google Pubsub 的消息并将其发布到 Kafka

在 Dataflow Python 中从 PubSub 读取 AVRO 消息

使用 pubsub 推送触发器运行云功能

Go GCP Cloud PubSub 不批量发布消息