在 Dataflow Python 中从 PubSub 读取 AVRO 消息

Posted

技术标签:

【中文标题】在 Dataflow Python 中从 PubSub 读取 AVRO 消息【英文标题】:Read AVRO messages from PubSub in Dataflow Python 【发布时间】:2020-07-27 16:13:10 【问题描述】:

我需要从另一个 GCP 项目的 PubSub 主题中读取 AVRO 消息。我之前实现了 Python 数据流管道,它从 PubSub 读取 JSON 消息并写入 BigQuery。但我是处理 AVRO 消息的新手。我试图查找 AVRO 的 Python 文档,它指向此链接 https://avro.apache.org/docs/current/gettingstartedpython.html

在这个链接中有一些从文件读取和写入文件的例子,但我认为这些函数对于从 PubSub 读取没有用。我正在使用以下转换从输出为字节串的 PubSub 中读取。

"Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)

我需要一种读取这些字节的方法(AVRO 格式)

【问题讨论】:

【参考方案1】:

这是一个您可以使用的示例代码

    从 Pub/Sub 读取消息
from fastavro import parse_schema, schemaless_reader

messages = (p
            | beam.io.ReadFromPubSub(
                subscription=known_args.input_subscription)
            .with_output_types(bytes))
    使用 Fastavro 包通过类定义定义架构和读取器
class AvroReader:
    def __init__(self, schema):
        self.schema = schema

    def deserialize(self, record):
        bytes_reader = io.BytesIO(record)
        dict_record = schemaless_reader(bytes_reader, self.schema)
        return dict_record
    现在映射字节元素并指定架构
schema = avro.schema.parse(open("avro.avsc", "rb").read())
avroReader = AvroReader(schema)

lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))

这些行应该有PCollection,格式为Avro。

【讨论】:

感谢 Jayadeep 的回答。我对“模式”变量有疑问。我从其他团队收到了模式作为“.avsc”文件。所以在你分享的这个脚本中,我猜模式是这个文件 .avsc 文件的内容。所以我正在尝试这样,""" with open(schema_path) as fd: schema = json.load(fd) """ 嗨 Jayadeep,我没有得到您建议 ReadFromAvro 的最后一行。我猜这个函数是从文件中读取的。在我的情况下,记录仍将来自 PubSub 主题。 谢谢。我现在将尝试创建 AVRO 消息并将它们放在 PubSub 中,以便我可以测试这个管道。目前我们依赖其他团队在这里发布消息。他们也使用过Java。如果你有发布者代码也请在这里分享,否则我会做更多的搜索。 会不会是这样,records = [ , ] fo = BytesIO() writer(fo, schema, records)

以上是关于在 Dataflow Python 中从 PubSub 读取 AVRO 消息的主要内容,如果未能解决你的问题,请参考以下文章

在Dataflow作业中查找重复的数据 - Python

DataFlow 使用 Airflow DataflowHook.start_python_dataflow 失败并返回代码 1

在 Python 中为 Dataflow 管道使用 WriteToBigquery 时出错。 Unicode 对象没有属性“项目”

在 GCP Dataflow 上的 python apache 光束中使用 scipy

将 Pub/Sub 连接到 Dataflow Python 管道

使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery