将 BigQuery 表流式传输到 Google Pub/Sub
Posted
技术标签:
【中文标题】将 BigQuery 表流式传输到 Google Pub/Sub【英文标题】:Stream BigQuery table into Google Pub/Sub 【发布时间】:2017-05-10 08:54:15 【问题描述】:我有一个 Google bigQuery 表,我想将整个表流式传输到 pub-sub 主题
什么应该是简单/快速的方法?
提前谢谢你,
【问题讨论】:
【参考方案1】:2019 年更新:
现在,使用 Pub/Sub 中的点击大查询选项真的很容易:
找到它:https://console.cloud.google.com/cloudpubsub/topicList
我知道的最简单的方法是通过 Google Cloud Dataflow,它本身就知道如何访问 BigQuery 和 Pub/Sub。
理论上它应该像以下 Python 行一样简单:
p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))
这种 Python/Dataflow/BigQuery/PubSub 组合目前无法使用(Python Dataflow 处于测试阶段,但 keep an eye on the changelog)。
我们可以用 Java 做同样的事情,而且效果很好——我刚刚测试了它。它既可以在本地运行,也可以在托管的 Dataflow 运行器中运行:
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection<TableRow> weatherData = p.apply(
BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>()
@Override
public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception
c.output(c.element().toString());
)).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));
p.run();
测试消息是否存在:
gcloud --project myproject beta pubsub subscriptions pull --auto-ack sub1
托管数据流屏幕截图:
【讨论】:
你有这个例子的完整脚本在 github 上吗?我有兴趣将其从 pub/sub 部署到 bigquery。谢谢【参考方案2】:这真的取决于桌子的大小。
如果它是一个小表(几千条记录,几个打瞌睡的列),那么您可以设置一个流程来查询整个表,将响应转换为 JSON 数组,然后推送到 pub-sub。
如果它是一张大表(数百万/数十亿条记录,数百列),您必须导出到文件,然后准备/运送到 pub-sub
这还取决于您的分区策略 - 如果您的表设置为按日期分区,您也许可以再次查询而不是导出。
最后但同样重要的是,它还取决于频率 - 这是一次性交易(然后导出)还是连续过程(然后使用表装饰器仅查询最新数据)?
如果您想要一个真正有用的答案,需要更多信息。
编辑
根据您对表格大小的 cmets,我认为最好的方法是编写一个脚本:
将表格导出到GCS,作为换行符分隔的 JSON
处理文件(逐行读取)并发送到 pub-sub
大多数编程语言都有client libraries。我用 Python 做过类似的事情,而且非常简单。
【讨论】:
表格有 3M 行和 ~7 列 一次性上传,还是连续上传? 它不是连续的,但我想偶尔做一次.. :) 问题是关于流媒体的,这个答案没有解决。请参阅 Felipe 的答案,这是正确的 imo以上是关于将 BigQuery 表流式传输到 Google Pub/Sub的主要内容,如果未能解决你的问题,请参考以下文章
Google BigQuery - 将数据流式传输到 BigQuery
使用 GET 方法将数据流式传输到 Google BigQuery?
将数据从 Google 表格流式传输到 BigQuery 以在 Tableau 中进行可视化
直接流式传输到 BigQuery 与通过 Google Pub/Sub + Dataflow 流式传输的优缺点
bigquery storage API:是不是可以将 AVRO 文件直接流式传输/保存到 Google Cloud Storage?
GoogleApiException:流式传输到 BigQuery 时,Google.Apis.Requests.RequestError 后端错误 [500]