使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出
Posted
技术标签:
【中文标题】使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出【英文标题】:Dataflow Streaming using Python SDK: Transform for PubSub Messages to BigQuery Output 【发布时间】:2018-04-01 22:11:00 【问题描述】:我正在尝试使用数据流来读取 pubsub 消息并将其写入大查询。 Google 团队为我提供了 alpha 访问权限,并且提供的示例可以正常工作,但现在我需要将其应用到我的场景中。
Pubsub 有效负载:
Message
data: 'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1
attributes:
大查询架构:
schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',
我的目标是简单地读取消息有效负载并插入 bigquery。我正在努力了解转换以及如何将键/值映射到大查询模式。
我对此很陌生,因此感谢您提供任何帮助。
当前代码:https://codeshare.io/ayqX8w
谢谢!
【问题讨论】:
【参考方案1】:我能够通过定义一个将其加载到 json 对象中的函数来成功解析 pubsub 字符串(请参阅 parse_pubsub())。我遇到的一个奇怪问题是我无法在全局范围内导入 json。我收到“NameError:未定义全局名称'json'”错误。我必须在函数中导入 json。
在下面查看我的工作代码:
from __future__ import absolute_import
import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
'''Normalize pubsub string to json object'''
# Lines look like this:
# 'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1
def parse_pubsub(line):
import json
record = json.loads(line)
return (record['mac']), (record['status']), (record['datetime'])
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_topic', required=True,
help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
parser.add_argument(
'--output_table', required=True,
help=
('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
with beam.Pipeline(argv=pipeline_args) as p:
# Read the pubsub topic into a PCollection.
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
| beam.Map(parse_pubsub)
| beam.Map(lambda (mac_bq, status_bq, datetime_bq): 'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq)
| beam.io.WriteToBigQuery(
known_args.output_table,
schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
【讨论】:
如果您将 save_main_session 设置为 true,您应该能够全局导入模块 - 例如:github.com/GoogleCloudPlatform/python-docs-samples/blob/master/…【参考方案2】:写入 Python SDK 的 BigQuery 接收器的数据应采用字典的形式,其中字典的每个键给出 BigQuery 表的一个字段,对应的值给出要写入该字段的值。对于 BigQuery RECORD 类型,值本身应该是具有对应键值对的字典。
我提交了一个 JIRA 来改进 Beam 中相应 python 模块的文档:https://issues.apache.org/jira/browse/BEAM-3090
【讨论】:
感谢您的反馈。经过一些更多的实验后,看起来传入的发布/订阅消息是作为字符串传入的(显然)。我必须应用将线条对象转换为字典的转换。我在数据流中遇到的错误消息是:Group: Expected Tuple[TypeVariable[K], TypeVariable[V]], got我有一个类似的用例(从 PubSub 将行作为字符串读取,将它们转换为 dicts 然后处理它们)。
我正在使用ast.literal_eval()
,这似乎对我有用。此命令将评估字符串,但以比eval()
更安全的方式(请参阅here)。它应该返回一个 dict,其键是字符串,并且值被评估为最可能的类型(int、str、float ...)。不过,您可能希望确保这些值采用正确的类型。
这会给你一个像这样的管道
import ast
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
| "JSON row to dict" >> beam.Map(
lambda s: ast.literal_eval(s))
| beam.io.WriteToBigQuery( ... )
)
我还没有使用过 BigQuery,所以我无法在最后一行为您提供帮助,但您所写的内容乍一看似乎是正确的。
【讨论】:
以上是关于使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出的主要内容,如果未能解决你的问题,请参考以下文章
如何将从 Spotify 流式传输的音乐可播放数据推送到不使用 Spotify 提供的 SDK 的设备
使用 aws-sdk 将 gm 调整大小的图像流上传到 s3
从 BigQuery 缓慢更改查找缓存 - Dataflow Python 流式 SDK