如何从 PubSub 主题读取数据并将其解析到光束管道中并打印
Posted
技术标签:
【中文标题】如何从 PubSub 主题读取数据并将其解析到光束管道中并打印【英文标题】:How to read and parse data from PubSub topic into a beam pipeline and print it 【发布时间】:2020-01-17 02:04:47 【问题描述】:我有一个程序,它在 pubSub 中创建一个主题并将消息发布到该主题。我还有一个自动数据流作业(使用模板),它将这些消息保存到我的 BigQuery 表中。现在我打算用 python 管道替换基于模板的作业,我的要求是从 PubSub 读取数据,应用转换并将数据保存到 BigQuery/发布到另一个 PubSub 主题。我开始用 python 编写脚本并做了很多试验和错误来实现它,但令我沮丧的是,我无法实现它。代码如下所示:
import apache_beam as beam
from apache_beam.io import WriteToText
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"
def run():
o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
print("I reached here")
# # Read from PubSub into a PCollection.
data = (
p
| "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
)
data | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
print("Lines: ", data)
run()
如果我能尽早得到一些帮助,我将不胜感激。 注意:我在谷歌云上设置了我的项目,并且我的脚本在本地运行。
【问题讨论】:
【参考方案1】:基于Beam programming guide,您只需在管道中添加一个转换步骤。这里是一个例子或转换:
class PrintValue(beam.DoFn):
def process(self, element):
print(element)
return [element]
将其添加到您的管道中
data | beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
您可以添加所需的转换数量。您可以测试该值并将标记为 PCollection(用于具有多个输出)中的元素设置为扇出,或使用侧输入作为 PCollection 中的风扇。
【讨论】:
感谢您的帮助。我试过了,但我再次无法将任何数据发送到输出 pubsub 主题,也无法打印任何元素。我不确定我做错了什么。我所做的是data = ( p | "Read From Pub/Sub"
>>beam.io.ReadFromPubSub(topic=TOPIC_PATH)
| beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH) )
另外,我的 printValue 类在同一个文件中。请提出什么问题?
好吧,你错了。我的简短回应很好;我发布了一个更完整的(和更长的!)【参考方案2】:
这里是工作代码。
import apache_beam as beam
TOPIC_PATH = "projects/test-pipeline-253103/topics/test-pipeline-topic"
OUTPUT_PATH = "projects/test-pipeline-253103/topics/topic-repub"
class PrintValue(beam.DoFn):
def process(self, element):
print(element)
return [element]
def run():
o = beam.options.pipeline_options.PipelineOptions()
# Replace this by --stream execution param
standard_options = o.view_as(beam.options.pipeline_options.StandardOptions)
standard_options.streaming = True
p = beam.Pipeline(options=o)
print("I reached here")
# # Read from PubSub into a PCollection.
data = p | beam.io.ReadFromPubSub(topic=TOPIC_PATH) | beam.ParDo(PrintValue()) | beam.io.WriteToPubSub(topic=OUTPUT_PATH)
# Don't forget to run the pipeline!
result = p.run()
result.wait_until_finish()
run()
总结
您错过了运行管道。事实上,Beam 是一种图编程模型。因此,在您之前的代码中,您构建了图表,但从未运行过它。在这里,最后,运行它(不是阻塞调用)并等待结束(阻塞调用) 当您启动管道时,Beam 提到 PubSub 仅在流模式下工作。因此,您可以使用--streaming
参数开始您的代码,或者按照我的代码所示以编程方式进行
请注意,流媒体模式意味着在 PubSub 上无限期收听。如果您在 Dataflow 上运行它,您的管道将始终处于运行状态,直到您停止它。如果您的消息很少,这可能会很昂贵。确保这是目标模型
另一种方法是在有限的时间内使用您的管道(您使用调度程序来启动它,而另一个用于停止它)。但是,此时,您必须堆叠消息。在这里,您使用Topic
作为管道的入口。此选项强制 Beam 创建临时订阅并在此订阅上收听消息。这意味着在此订阅创建之前发布的消息将不会被接收和处理。
这个想法是创建一个订阅,通过这种方式,消息将被堆叠在其中(默认情况下最多 7 天)。然后,在管道 beam.io.ReadFromPubSub(subscription=SUB_PATH)
的条目中使用订阅名称。消息将被 Beam 拆散和处理(不保证订单!)
【讨论】:
以上是关于如何从 PubSub 主题读取数据并将其解析到光束管道中并打印的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Node.js 控制 Cloud PubSub 中的确认