使用 python 根据 apache Beam 中的条件调用特定的 pubsub 主题

Posted

技术标签:

【中文标题】使用 python 根据 apache Beam 中的条件调用特定的 pubsub 主题【英文标题】:call specific pubsub topic based on condition in apache beam with python 【发布时间】:2021-07-24 08:28:34 【问题描述】:

我想从 pubsub topic1 读取主题,并根据条件将清理后的 json 写入 topic2 和 topic3。

假设:我在 json 中有一个来自 topic1 的标志,我做了一些转换,并检查标志值并根据标志值写入 topic2 和 topic3。

我已经尝试了以下方法,但从这里我无法进一步移动,因为不知道如何根据条件调用管道。

我的 Beam 管道代码如下:

with beam.Pipeline(options=pipeline_options) as p:
    Ingest = ( p
                | 'Read from Topic' >> beam.io.ReadFromPubSub(topic=known_args.topic).with_output_types(bytes)
                | 'Decode' >> beam.Map(decode_message)
                | 'Make One Json' >> beam.Map(make_one)
                | 'Split based on event' >> beam.Map(split)
                # when event_name== 'aa_afo_addtocart_clicked'
                | 'write to topic2'
                # when event_name== 'aa_afo_merchantpage_visited'
                | 'write to topic3'

            )

第四步,我正在调用拆分函数,但请指导我如何将拆分后的输出写入多个主题。

split python 函数执行以下操作。 获取一个单一的输入 json -> 检查标志并将结果拆分为两个 -> 一个应该转到 topic2,另一个应该转到 topic3。

def split(p):
    json_obj_list = json.load(p)
    jb =[]
    for json_obj in json_obj_list:
       if json_obj['event_name']== 'aa_afo_addtocart_clicked':
          filename = json_obj['event_name'] + '.json'
          with open(filename, 'a') as out_json_file:
            json_string = json.dumps(json_obj)
            print(json_string)
            #json.dump(json_obj, out_json_file)
       if json_obj['event_name'] == 'aa_afo_merchantpage_visited':
          filename = json_obj['event_name'] + '.json'
          with open(filename, 'a') as out_json_file:
            json_string = json.dumps(json_obj)
            print(json_string)

【问题讨论】:

【参考方案1】:

这里的解决方案是创建多个输出,如programming guide 中所述。像这样,你执行你的拆分,你有 2 个 PCollection 作为输出。

然后独立处理 2 个 PCollection:在主题 2 和主题 3 中下沉。

【讨论】:

以上是关于使用 python 根据 apache Beam 中的条件调用特定的 pubsub 主题的主要内容,如果未能解决你的问题,请参考以下文章

Python 上的 Apache Beam 将 beam.Map 调用相乘

无法使用 Apache Beam(Python SDK)读取 Pub/Sub 消息

在 python Apache Beam 中打开一个 gzip 文件

如何使用 Apache BEAM 在 BigQuery 中执行快速联接

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表

使用 Python 处理 Apache Beam 管道中的异常