在 Dataflow SQL 中解析属性
Posted
技术标签:
【中文标题】在 Dataflow SQL 中解析属性【英文标题】:Parsing attributes in Dataflow SQL 【发布时间】:2021-02-13 05:48:33 【问题描述】:给定一个 Pub/Sub 主题,BigQuery 允许使用 Dataflow SQL 语法将数据流式传输到表中。
假设您将此消息"a": 1, "b": 2, "c": 3
发布到某个主题。在 BigQuery 中,使用 Dataflow 引擎,您需要将 my_topic
架构定义为
第一步
event_timestamp: TIMESTAMP
a: INT64
b: INT64
c: INT64
然后使用该命令创建 Dataflow 流式传输作业,以便将每条消息流式传输到目标 BigQuery 表。
第二步
gcloud dataflow sql query 'SELECT * FROM pubsub.topic.my_project.my_topic' \
--job-name my_job --region europe-west1 --bigquery-write-disposition write-append \
--bigquery-project my_project --bigquery-dataset staging --bigquery-table my_topic
gcloud pubsub topics publish my_topic --message='"a": 1, "b": 2, "c": 3'
bq query --nouse_legacy_sql \
'SELECT * FROM my_project.staging.my_topic ORDER BY event_timestamp DESC LIMIT 10'
+---------------------+-----+-----+-----+
| event_timestamp | a | b | c |
+---------------------+-----+-----+-----+
| 2020-10-28 14:21:40 | 1 | 2 | 3 |
在第 2 步,我还想将 --attribute="origin=gcloud,username=gcp"
发送到 Pub/Sub 主题。是否可以在 Step 1 定义架构以便自动写入表?
我一直在尝试不同的事情:
attributes: STRUCT
在架构中,在 this Beam extensions documentation 之后,但我得到的只是数据流中的 JSON 解析错误
gcloud pubsub topics publish my_topic --message='"a": 1, "b": 2' --attribute='c=3'
期望消息像 piece of code 一样被展平,但我在结果表中得到了 c
的 NULL
值。
谢谢。
【问题讨论】:
我做不到同样的事情。这可能是不可能的! 实现相同行为的唯一方法似乎是使用主题架构中的WHERE
sql 语句来过滤数据流作业中的消息。 Dataflow SQL 错过了过滤订阅等属性的可能性。
【参考方案1】:
Pub/Sub 属性属于 MAP
类型,但这不是 Dataflow SQL 的 supported types 之一。有关于增加支持的讨论,但我不知道那是什么状态。
如果属性很重要,我建议使用ReadFromPubSub 创建自定义管道
【讨论】:
我认为“Pub/Sub to BigQuery”数据流模板也可以提供帮助,它使用 PubsubIO 类。 github.com/GoogleCloudPlatform/DataflowTemplates/blob/…以上是关于在 Dataflow SQL 中解析属性的主要内容,如果未能解决你的问题,请参考以下文章
在 Python 中为 Dataflow 管道使用 WriteToBigquery 时出错。 Unicode 对象没有属性“项目”
如何使用 Stream 为 Spring Cloud Dataflow 中的子任务设置全局属性 - Task-Launcher-Dataflow
如何从 Dataflow 中的 PCollection 读取 bigQuery
从 ValueProvider 读取的 Dataflow BigQuery:“StaticValueProvider”对象没有属性“projectId”
我正在尝试在 PL/SQL 中解析 XML。我无法从标签中检索属性值,我做错了啥?
使用 DataflowRunner 和 Dataflow 服务运行时,PubsubIO 不会将自定义时间戳属性输出为 context.timestamp