使用 python 根据 apache Beam 中的条件调用特定的 pubsub 主题
Posted
技术标签:
【中文标题】使用 python 根据 apache Beam 中的条件调用特定的 pubsub 主题【英文标题】:call specific pubsub topic based on condition in apache beam with python 【发布时间】:2021-07-24 08:28:34 【问题描述】:我想从 pubsub topic1 读取主题,并根据条件将清理后的 json 写入 topic2 和 topic3。
假设:我在 json 中有一个来自 topic1 的标志,我做了一些转换,并检查标志值并根据标志值写入 topic2 和 topic3。
我已经尝试了以下方法,但从这里我无法进一步移动,因为不知道如何根据条件调用管道。
我的 Beam 管道代码如下:
with beam.Pipeline(options=pipeline_options) as p:
Ingest = ( p
| 'Read from Topic' >> beam.io.ReadFromPubSub(topic=known_args.topic).with_output_types(bytes)
| 'Decode' >> beam.Map(decode_message)
| 'Make One Json' >> beam.Map(make_one)
| 'Split based on event' >> beam.Map(split)
# when event_name== 'aa_afo_addtocart_clicked'
| 'write to topic2'
# when event_name== 'aa_afo_merchantpage_visited'
| 'write to topic3'
)
第四步,我正在调用拆分函数,但请指导我如何将拆分后的输出写入多个主题。
split python 函数执行以下操作。 获取一个单一的输入 json -> 检查标志并将结果拆分为两个 -> 一个应该转到 topic2,另一个应该转到 topic3。
def split(p):
json_obj_list = json.load(p)
jb =[]
for json_obj in json_obj_list:
if json_obj['event_name']== 'aa_afo_addtocart_clicked':
filename = json_obj['event_name'] + '.json'
with open(filename, 'a') as out_json_file:
json_string = json.dumps(json_obj)
print(json_string)
#json.dump(json_obj, out_json_file)
if json_obj['event_name'] == 'aa_afo_merchantpage_visited':
filename = json_obj['event_name'] + '.json'
with open(filename, 'a') as out_json_file:
json_string = json.dumps(json_obj)
print(json_string)
【问题讨论】:
【参考方案1】:这里的解决方案是创建多个输出,如programming guide 中所述。像这样,你执行你的拆分,你有 2 个 PCollection 作为输出。
然后独立处理 2 个 PCollection:在主题 2 和主题 3 中下沉。
【讨论】:
以上是关于使用 python 根据 apache Beam 中的条件调用特定的 pubsub 主题的主要内容,如果未能解决你的问题,请参考以下文章
Python 上的 Apache Beam 将 beam.Map 调用相乘
无法使用 Apache Beam(Python SDK)读取 Pub/Sub 消息
在 python Apache Beam 中打开一个 gzip 文件
如何使用 Apache BEAM 在 BigQuery 中执行快速联接