使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出

Posted

技术标签:

【中文标题】使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出【英文标题】:Dataflow Streaming using Python SDK: Transform for PubSub Messages to BigQuery Output 【发布时间】:2018-04-01 22:11:00 【问题描述】:

我正在尝试使用数据流来读取 pubsub 消息并将其写入大查询。 Google 团队为我提供了 alpha 访问权限,并且提供的示例可以正常工作,但现在我需要将其应用到我的场景中。

Pubsub 有效负载:

Message 
    data: 'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1
    attributes: 

大查询架构:

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',

我的目标是简单地读取消息有效负载并插入 bigquery。我正在努力了解转换以及如何将键/值映射到大查询模式。

我对此很陌生,因此感谢您提供任何帮助。

当前代码:https://codeshare.io/ayqX8w

谢谢!

【问题讨论】:

【参考方案1】:

我能够通过定义一个将其加载到 json 对象中的函数来成功解析 pubsub 字符串(请参阅 parse_pubsub())。我遇到的一个奇怪问题是我无法在全局范围内导入 json。我收到“NameError:未定义全局名称'json'”错误。我必须在函数中导入 json。

在下面查看我的工作代码:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window

'''Normalize pubsub string to json object'''
# Lines look like this:
  # 'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1
def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['mac']), (record['status']), (record['datetime'])

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', required=True,
      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
  parser.add_argument(
      '--output_table', required=True,
      help=
      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:
    # Read the pubsub topic into a PCollection.
    lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(parse_pubsub)
                | beam.Map(lambda (mac_bq, status_bq, datetime_bq): 'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq)
                | beam.io.WriteToBigQuery(
                    known_args.output_table,
                    schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

【讨论】:

如果您将 save_main_session 设置为 true,您应该能够全局导入模块 - 例如:github.com/GoogleCloudPlatform/python-docs-samples/blob/master/…【参考方案2】:

写入 Python SDK 的 BigQuery 接收器的数据应采用字典的形式,其中字典的每个键给出 BigQuery 表的一个字段,对应的值给出要写入该字段的值。对于 BigQuery RECORD 类型,值本身应该是具有对应键值对的字典。

我提交了一个 JIRA 来改进 Beam 中相应 python 模块的文档:https://issues.apache.org/jira/browse/BEAM-3090

【讨论】:

感谢您的反馈。经过一些更多的实验后,看起来传入的发布/订阅消息是作为字符串传入的(显然)。我必须应用将线条对象转换为字典的转换。我在数据流中遇到的错误消息是:Group: Expected Tuple[TypeVariable[K], TypeVariable[V]], got 【参考方案3】:

我有一个类似的用例(从 PubSub 将行作为字符串读取,将它们转换为 dicts 然后处理它们)。

我正在使用ast.literal_eval(),这似乎对我有用。此命令将评估字符串,但以比eval() 更安全的方式(请参阅here)。它应该返回一个 dict,其键是字符串,并且值被评估为最可能的类型(int、str、float ...)。不过,您可能希望确保这些值采用正确的类型。

这会给你一个像这样的管道

import ast
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
            | "JSON row to dict" >> beam.Map(
                        lambda s: ast.literal_eval(s))
            | beam.io.WriteToBigQuery( ... )
        )

我还没有使用过 BigQuery,所以我无法在最后一行为您提供帮助,但您所写的内容乍一看似乎是正确的。

【讨论】:

以上是关于使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出的主要内容,如果未能解决你的问题,请参考以下文章

使用 Media Foundation SDK 进行直播

如何将从 Spotify 流式传输的音乐可播放数据推送到不使用 Spotify 提供的 SDK 的设备

使用 aws-sdk 将 gm 调整大小的图像流上传到 s3

从 BigQuery 缓慢更改查找缓存 - Dataflow Python 流式 SDK

在 SDK 8 中使用 Android MediaPlayer 进行流式传输

流式处理 WCF 大量对象