在 python 中使用 BigQuery 接收器流式传输管道

Posted

技术标签:

【中文标题】在 python 中使用 BigQuery 接收器流式传输管道【英文标题】:Streaming pipelines with BigQuery sinks in python 【发布时间】:2018-07-31 20:44:19 【问题描述】:

我正在构建一个源为 Pubsub 且接收器为 BigQuery 的 apache 光束流管道。我收到了错误消息:

“工作流程失败。原因:未知消息代码。”

尽管这条消息很神秘,但我现在认为 BigQuery 不支持作为流式管道的接收器,它在这里说: Streaming from Pub/Sub to BigQuery

我肯定是正确的,这是导致问题的原因吗?或者如果不是,它在任何情况下仍然不受支持?

谁能暗示这个功能什么时候发布?很遗憾,我很高兴能使用它。

【问题讨论】:

【参考方案1】:

如 Beam 文档 here 中所述,自 Beam 2.5.0 起,Python 流式传输管道在实验上可用

因此您需要安装 apache-beam 2.5.0 和 apache-beam[gcp]

pip install apache-beam==2.5.0
pip install apache-beam[gcp]

我运行了这个命令:

python pubsub_to_bq.py --runner DataflowRunner --input_topic=projects/pubsub-public-data/topics/taxirides-realtime --project <my-project> --temp_location gs://<my-bucket>/tmp --staging_location gs://<my-bucket>/staging --streaming

使用下面的代码,一切正常:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam

def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['ride_id']), (record['point_idx']), (record['latitude']), (record['longitude']), (record['timestamp']), (record['meter_increment']), (record['ride_status']), (record['meter_reading']), (record['passenger_count'])

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', dest='input_topic', required=True,
      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:

    # Read from PubSub
    lines = p | beam.io.ReadFromPubSub(known_args.input_topic)
    #Adapt messages from PubSub to BQ table
    lines = lines | beam.Map(parse_pubsub)
    lines = lines | beam.Map(lambda (ride_id, point_idx, latitude, longitude, timestamp, meter_increment, ride_status,meter_reading, passenger_count): 'ride_id':ride_id, 'point_idx':point_idx, 'latitude':latitude, 'longitude':longitude, 'timestamp':timestamp, 'meter_increment':meter_increment,'ride_status': ride_status,'meter_reading':meter_reading,'passenger_count': passenger_count)
    #Write to a BQ table 
    lines | beam.io.WriteToBigQuery(table ='<my-table>',dataset='<my-dataset>',project='<my-project>' )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

此代码使用publicly available topic"--topic projects/pubsub-public-data/topics/taxirides-realtime”和我使用正确架构创建的 BQ 表。

如果您使用此示例,请注意不要让它继续运行,否则您将产生成本,因为您将收到来自此 PubSub 主题的大量消息。

【讨论】:

感谢 Victor - 我唯一需要更改的行是打开了 WriteToBigQuery 的行。也许我只是犯了一个简单的语法错误。

以上是关于在 python 中使用 BigQuery 接收器流式传输管道的主要内容,如果未能解决你的问题,请参考以下文章

在 BigQuery 中,有没有办法增加一个阶段的输出接收器数量?

BigQuery 源/接收器的数据流管道详细信息未显示

使用自定义目标接收器将日志导出到 BigQuery(表分区)

在 BigQuery 接收器中进行一次性处理的情况下,重新洗牌是啥意思?

在数据流管道中动态设置 bigquery 表 id

在 Beam 管道中以编程方式生成 BigQuery 架构