Apache Beam 数据流 BigQuery

Posted

技术标签:

【中文标题】Apache Beam 数据流 BigQuery【英文标题】:Apache Beam Dataflow BigQuery 【发布时间】:2018-07-16 10:17:34 【问题描述】:

如何使用 apache Beam 和 DataflowRunner 从 Google BigQuery 数据集中获取表列表?

我找不到如何从指定数据集中获取表。我想使用 Dataflow 的并行处理编程模型将表从位于美国的数据集中迁移到位于欧盟的数据集中。

【问题讨论】:

如果您使用的是 java 或 python,请做标记。谢谢! 使用 java, apache dataflow with python 有一些未解决的问题... 【参考方案1】:

声明库

from google.cloud import bigquery

准备一个 bigquery 客户端

client = bigquery.Client(project='your_project_name')

准备对新数据集的引用

dataset_ref = client.dataset('your_data_set_name')

发出 API 请求

tables = list(client.list_tables(dataset_ref))
if tables:
    for table in tables:
        print('\t'.format(table.table_id))

参考: https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/usage.html#datasets

【讨论】:

我需要java中的代码,beam 2.4也使用com.google.api.services.bigquery v2-rev374-1.22.0。这也是您应该与 Beam 2.4 一起使用的库。 代码参考:***.com/questions/51101842/…【参考方案2】:

您可以尝试使用 google-cloud-examples maven repo。有一个名为 BigQuerySnippets 的类,它调用 API 来获取表元数据,您可以获取模式。请注意,限制 API 配额为每秒最多 6 个并发请求。

【讨论】:

【参考方案3】:

Dataflow 的目的是创建管道,因此不包括发出某些 API 请求的能力。您必须使用 BigQuery Java 客户端库来获取数据,然后将其提供给您的 Apache Pipeline。

DatasetId datasetId = DatasetId.of(projectId, datasetName);
Page<Table> tables = bigquery.listTables(datasetId, TableListOption.pageSize(100));
for (Table table : tables.iterateAll()) 
  // do something

【讨论】:

以上是关于Apache Beam 数据流 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

结合 BigQuery 和 Pub/Sub Apache Beam

使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS

使用 Apache Beam 向 BigQuery 传播插入时如何指定 insertId

Apache Beam - 将 BigQuery TableRow 写入 Cassandra

Apache-beam Bigquery .fromQuery ClassCastException

使用 apache beam Json Time Partitioning 在 bigquery 中创建时间分区表