在 Dataflow Python 中从 PubSub 读取 AVRO 消息
Posted
技术标签:
【中文标题】在 Dataflow Python 中从 PubSub 读取 AVRO 消息【英文标题】:Read AVRO messages from PubSub in Dataflow Python 【发布时间】:2020-07-27 16:13:10 【问题描述】:我需要从另一个 GCP 项目的 PubSub 主题中读取 AVRO 消息。我之前实现了 Python 数据流管道,它从 PubSub 读取 JSON 消息并写入 BigQuery。但我是处理 AVRO 消息的新手。我试图查找 AVRO 的 Python 文档,它指向此链接 https://avro.apache.org/docs/current/gettingstartedpython.html
在这个链接中有一些从文件读取和写入文件的例子,但我认为这些函数对于从 PubSub 读取没有用。我正在使用以下转换从输出为字节串的 PubSub 中读取。
"Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
我需要一种读取这些字节的方法(AVRO 格式)
【问题讨论】:
【参考方案1】:这是一个您可以使用的示例代码
-
从 Pub/Sub 读取消息
from fastavro import parse_schema, schemaless_reader
messages = (p
| beam.io.ReadFromPubSub(
subscription=known_args.input_subscription)
.with_output_types(bytes))
-
使用 Fastavro 包通过类定义定义架构和读取器
class AvroReader:
def __init__(self, schema):
self.schema = schema
def deserialize(self, record):
bytes_reader = io.BytesIO(record)
dict_record = schemaless_reader(bytes_reader, self.schema)
return dict_record
-
现在映射字节元素并指定架构
schema = avro.schema.parse(open("avro.avsc", "rb").read())
avroReader = AvroReader(schema)
lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))
这些行应该有PCollection
,格式为Avro。
【讨论】:
感谢 Jayadeep 的回答。我对“模式”变量有疑问。我从其他团队收到了模式作为“.avsc”文件。所以在你分享的这个脚本中,我猜模式是这个文件 .avsc 文件的内容。所以我正在尝试这样,""" with open(schema_path) as fd: schema = json.load(fd) """ 嗨 Jayadeep,我没有得到您建议 ReadFromAvro 的最后一行。我猜这个函数是从文件中读取的。在我的情况下,记录仍将来自 PubSub 主题。 谢谢。我现在将尝试创建 AVRO 消息并将它们放在 PubSub 中,以便我可以测试这个管道。目前我们依赖其他团队在这里发布消息。他们也使用过Java。如果你有发布者代码也请在这里分享,否则我会做更多的搜索。 会不会是这样,records = [以上是关于在 Dataflow Python 中从 PubSub 读取 AVRO 消息的主要内容,如果未能解决你的问题,请参考以下文章
DataFlow 使用 Airflow DataflowHook.start_python_dataflow 失败并返回代码 1
在 Python 中为 Dataflow 管道使用 WriteToBigquery 时出错。 Unicode 对象没有属性“项目”
在 GCP Dataflow 上的 python apache 光束中使用 scipy