在 Dataflow SQL 中解析属性

Posted

技术标签:

【中文标题】在 Dataflow SQL 中解析属性【英文标题】:Parsing attributes in Dataflow SQL 【发布时间】:2021-02-13 05:48:33 【问题描述】:

给定一个 Pub/Sub 主题,BigQuery 允许使用 Dataflow SQL 语法将数据流式传输到表中。

假设您将此消息"a": 1, "b": 2, "c": 3 发布到某个主题。在 BigQuery 中,使用 Dataflow 引擎,您需要将 my_topic 架构定义为

第一步

event_timestamp: TIMESTAMP
a: INT64
b: INT64
c: INT64

然后使用该命令创建 Dataflow 流式传输作业,以便将每条消息流式传输到目标 BigQuery 表。

第二步

gcloud dataflow sql query 'SELECT * FROM pubsub.topic.my_project.my_topic' \
  --job-name my_job --region europe-west1 --bigquery-write-disposition write-append \
  --bigquery-project my_project --bigquery-dataset staging --bigquery-table my_topic

gcloud pubsub topics publish my_topic --message='"a": 1, "b": 2, "c": 3'
​
bq query --nouse_legacy_sql \
  'SELECT * FROM my_project.staging.my_topic ORDER BY event_timestamp DESC LIMIT 10'

+---------------------+-----+-----+-----+
|   event_timestamp   |  a  |  b  |  c  |
+---------------------+-----+-----+-----+
| 2020-10-28 14:21:40 |  1  |  2  |  3  |

第 2 步,我还想将 --attribute="origin=gcloud,username=gcp" 发送到 Pub/Sub 主题。是否可以在 Step 1 定义架构以便自动写入表?

我一直在尝试不同的事情:

attributes: STRUCT 在架构中,在 this Beam extensions documentation 之后,但我得到的只是数据流中的 JSON 解析错误 gcloud pubsub topics publish my_topic --message='"a": 1, "b": 2' --attribute='c=3' 期望消息像 piece of code 一样被展平,但我在结果表中得到了 cNULL 值。

谢谢。

【问题讨论】:

我做不到同样的事情。这可能是不可能的! 实现相同行为的唯一方法似乎是使用主题架构中的WHERE sql 语句来过滤数据流作业中的消息。 Dataflow SQL 错过了过滤订阅等属性的可能性。 【参考方案1】:

Pub/Sub 属性属于 MAP 类型,但这不是 Dataflow SQL 的 supported types 之一。有关于增加支持的讨论,但我不知道那是什么状态。

如果属性很重要,我建议使用ReadFromPubSub 创建自定义管道

【讨论】:

我认为“Pub/Sub to BigQuery”数据流模板也可以提供帮助,它使用 PubsubIO 类。 github.com/GoogleCloudPlatform/DataflowTemplates/blob/…

以上是关于在 Dataflow SQL 中解析属性的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 中为 Dataflow 管道使用 WriteToBigquery 时出错。 Unicode 对象没有属性“项目”

如何使用 Stream 为 Spring Cloud Dataflow 中的子任务设置全局属性 - Task-Launcher-Dataflow

如何从 Dataflow 中的 PCollection 读取 bigQuery

从 ValueProvider 读取的 Dataflow BigQuery:“StaticValueProvider”对象没有属性“projectId”

我正在尝试在 PL/SQL 中解析 XML。我无法从标签中检索属性值,我做错了啥?

使用 DataflowRunner 和 Dataflow 服务运行时,PubsubIO 不会将自定义时间戳属性输出为 context.timestamp