Google Dataflow:根据条件仅将消息输出到 PubSub 主题之一

Posted

技术标签:

【中文标题】Google Dataflow:根据条件仅将消息输出到 PubSub 主题之一【英文标题】:Google Dataflow: output message only to one of the PubSub topics based on condition 【发布时间】:2018-07-11 11:39:17 【问题描述】:

在我的管道中,我想根据先前转换的结果将消息输出到 PubSub 主题之一。目前我正在将输出发送到同一主题:

 SearchItemGeneratorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SearchItemGeneratorOptions.class);
 Pipeline p = Pipeline.create(options);
 p.apply(...)
 //other transformations 
 .apply("ParseFile", new ParseFile()) // outputs PCollection<Message>, where each Message has a MessageType property with the name of the topic.
 .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));

这是我的 Message 对象:

class Message 
    private MessageType messageType;
    private String payload;
   //constructor, getters

我的 ParseFile 转换器输出 PCollection,每个 Message 对象都有一个属性 messageType。基于 messageType 属性,我想输出到消息的不同 PubSub 主题有效负载属性。我在this 文章段落中阅读了多个转换处理相同的 PCollection,但仍然不知道如何应用它或其他解决方案。

更新 感谢@Andrew 的解决方案。 我通过使用 TupleTag 解决了我的问题,但方法类似。 我在主管道中创建了两个不同的 TupleTag 对象:

public static final TupleTag<String> full = new TupleTag<>("full");
public static final TupleTag<String> delta = new TupleTag<>("delta");

然后根据我的情况,我在 DoFn 中使用正确的 TupleTag 输出消息:

TupleTag tupleTag = //assign full or delta TupleTag
processContext.output(tupleTag, jsonObject.toString());

并在主管道中由每个 TupleTag 从 PCollectionTuple 中选择发送到 Pub/Sub 主题。

messages.get(full)
            .apply("SendToIndexTopic", PubsubIO.writeStrings().to(options.getOutputIndexTopic()));

messages.get(delta)
            .apply("SendToDeltaTopic", PubsubIO.writeStrings().to(options.getOutputDeltaTopic()));

唯一要提的是,我的 TupleTag 对象是静态对象。

【问题讨论】:

【参考方案1】:

您可以对管道进行分区以将消息发布到多个 Pub/Sub 主题。分区将允许您分隔消息,而不是将它们复制到不同的 Pub/Sub 主题。您需要提前了解所有 Pub/Sub 主题。参考:Partition。

例子:

// partition pipeline

PCollectionList<Message> msgs = p.apply(Partition.of(2, new PartitionFn<Message>() 
    public int partitionFor(Message msg, int numPartitions) 
        // TODO: determine how to partition messages
        if (msg.messageType == "x") 
            return 0;
         else 
            return 1;
        
    
));

// access partitions

PCollection<Message> partition1 = msgs.get(0);
partition1.apply("WriteItemsToTopic1", PubsubIO.writeStrings().to(options.getOutputTopic1()));

PCollection<Message> partition2 = msgs.get(1);
partition2.apply("WriteItemsToTopic2", PubsubIO.writeStrings().to(options.getOutputTopic2()));

【讨论】:

以上是关于Google Dataflow:根据条件仅将消息输出到 PubSub 主题之一的主要内容,如果未能解决你的问题,请参考以下文章

Google Dataflow 定价流模式

根据条件仅将 R 中的某些行转换为绝对值

Google Cloud DataFlow 作业尚不可用.. 在 Airflow

当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub

性能:Google Dataflow 将 avro 文件写入 GCS

Google Maps是否仅将我的IP用于地理定位?