在 Dataflow Managed Service 中运行时,Dataflow 未读取 PubSub 消息

Posted

技术标签:

【中文标题】在 Dataflow Managed Service 中运行时,Dataflow 未读取 PubSub 消息【英文标题】:Dataflow not reading PubSub messages when running in Dataflow Managed Service 【发布时间】:2020-10-16 12:46:15 【问题描述】:

我们的 python Dataflow 管道在本地运行,但在使用 Google Cloud Platform 上的 Dataflow 托管服务部署时无法运行。它没有显示已连接到 PubSub 订阅的迹象。我们已经尝试订阅订阅和主题,但它们都没有奏效。消息在 PubSub 订阅中累积,并且 Dataflow 管道没有显示被调用的迹象或任何东西。我们已经仔细检查了项目是否相同

任何关于这方面的指导都将非常感激

这是连接到请求订阅的代码

with beam.Pipeline(options=options) as p:
        something = p | "ReadPubSub" >> beam.io.ReadFromPubSub(
            subscription="projects/PROJECT_ID/subscriptions/cloudflow"
        )

这里是使用的选项

 options = PipelineOptions()
 file_processing_options = PipelineOptions().view_as(FileProcessingOptions)
 if options.view_as(GoogleCloudOptions).project is None:
        print(sys.argv[0] + ": error: argument --project is required")
        sys.exit(1)
 options.view_as(SetupOptions).save_main_session = True
 options.view_as(StandardOptions).streaming = True

PubSub 订阅具有以下配置:

Delivery type: Pull
Subscription expiration: Subscription expires in 31 days if there is no activity.
Acknowledgement deadline: 57 Seconds
Subscription filter: —
Message retention duration: 7 Days
Retained acknowledged messages: No
Dead lettering: Disabled
Retry policy : Retry immediately

【问题讨论】:

您能否分享您连接到 PubSub 的管道部分以及管道的选项?您能否详细说明您用于 Dataflow 的类型订阅?你有仔细检查过是同一个项目吗? 更新帖子以提供该信息 没什么奇怪的。您在依赖项中使用的 Beam sdk 版本是什么? 版本为2.20.0 您在管道上使用特殊服务帐户吗?无论如何,使用的服务帐户是否有权访问 pubsub? 【参考方案1】:

我认为对于从订阅中提取我们需要将 with_attributes 参数作为 True 传递。

with_attributes - True - 输出元素将是 PubsubMessage 对象。错误的 - 输出元素将是字节类型(仅限消息数据)。

在这里找到类似的: When using Beam IO ReadFromPubSub module, can you pull messages with attributes in Python? It's unclear if its supported

【讨论】:

以上是关于在 Dataflow Managed Service 中运行时,Dataflow 未读取 PubSub 消息的主要内容,如果未能解决你的问题,请参考以下文章

在 Dataflow 流式传输管道上捕获 BigQuery HttpBadRequestError

如何在 TPL/Dataflow 中发出笛卡尔积?

在 Dataflow 管道中写入 BigQuery 表失败

是否可以在 Dataflow 中暂存模型文件?

managed_shared_memory 与 windows_shared_memory

如何在 google-dataflow 中读取 cassandra