将 BigQuery 表流式传输到 Google Pub/Sub

Posted

技术标签:

【中文标题】将 BigQuery 表流式传输到 Google Pub/Sub【英文标题】:Stream BigQuery table into Google Pub/Sub 【发布时间】:2017-05-10 08:54:15 【问题描述】:

我有一个 Google bigQuery 表,我想将整个表流式传输到 pub-sub 主题

什么应该是简单/快速的方法?

提前谢谢你,

【问题讨论】:

【参考方案1】:

2019 年更新:

现在,使用 Pub/Sub 中的点击大查询选项真的很容易:

找到它:https://console.cloud.google.com/cloudpubsub/topicList


我知道的最简单的方法是通过 Google Cloud Dataflow,它本身就知道如何访问 BigQuery 和 Pub/Sub。

理论上它应该像以下 Python 行一样简单:

p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
  beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
  beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))

这种 Python/Dataflow/BigQuery/PubSub 组合目前无法使用(Python Dataflow 处于测试阶段,但 keep an eye on the changelog)。

我们可以用 Java 做同样的事情,而且效果很好——我刚刚测试了它。它既可以在本地运行,也可以在托管的 Dataflow 运行器中运行:

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<TableRow> weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() 
    @Override
    public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception 
        c.output(c.element().toString());
    
)).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));

p.run();

测试消息是否存在:

gcloud --project myproject beta pubsub subscriptions  pull --auto-ack sub1

托管数据流屏幕截图:

【讨论】:

你有这个例子的完整脚本在 github 上吗?我有兴趣将其从 pub/sub 部署到 bigquery。谢谢【参考方案2】:

这真的取决于桌子的大小。

如果它是一个小表(几千条记录,几个打瞌睡的列),那么您可以设置一个流程来查询整个表,将响应转换为 JSON 数组,然后推送到 pub-sub。

如果它是一张大表(数百万/数十亿条记录,数百列),您必须导出到文件,然后准备/运送到 pub-sub

这还取决于您的分区策略 - 如果您的表设置为按日期分区,您也许可以再次查询而不是导出。

最后但同样重要的是,它还取决于频率 - 这是一次性交易(然后导出)还是连续过程(然后使用表装饰器仅查询最新数据)?

如果您想要一个真正有用的答案,需要更多信息。

编辑

根据您对表格大小的 cmets,我认为最好的方法是编写一个脚本:

    将表格导出到GCS,作为换行符分隔的 JSON

    处理文件(逐行读取)并发送到 pub-sub

大多数编程语言都有client libraries。我用 Python 做过类似的事情,而且非常简单。

【讨论】:

表格有 3M 行和 ~7 列 一次性上传,还是连续上传? 它不是连续的,但我想偶尔做一次.. :) 问题是关于流媒体的,这个答案没有解决。请参阅 Felipe 的答案,这是正确的 imo

以上是关于将 BigQuery 表流式传输到 Google Pub/Sub的主要内容,如果未能解决你的问题,请参考以下文章

Google BigQuery - 将数据流式传输到 BigQuery

使用 GET 方法将数据流式传输到 Google BigQuery?

将数据从 Google 表格流式传输到 BigQuery 以在 Tableau 中进行可视化

直接流式传输到 BigQuery 与通过 Google Pub/Sub + Dataflow 流式传输的优缺点

bigquery storage API:是不是可以将 AVRO 文件直接流式传输/保存到 Google Cloud Storage?

GoogleApiException:流式传输到 BigQuery 时,Google.Apis.Requests.RequestError 后端错误 [500]