bigquery storage API:是不是可以将 AVRO 文件直接流式传输/保存到 Google Cloud Storage?

Posted

技术标签:

【中文标题】bigquery storage API:是不是可以将 AVRO 文件直接流式传输/保存到 Google Cloud Storage?【英文标题】:bigquery storage API: Is it possible to stream / save AVRO files directly to Google Cloud Storage?bigquery storage API:是否可以将 AVRO 文件直接流式传输/保存到 Google Cloud Storage? 【发布时间】:2020-05-20 07:35:16 【问题描述】:

我想将 90 TB BigQuery 表导出到 Google Cloud Storage。根据the documentation 的说法,由于与其他方法相关的导出大小配额(例如 ExtractBytesPerDay),BigQuery Storage API(测试版)应该是可行的方法。

该表是按日期分区的,每个分区占用约 300 GB。我有一个在 GCP 上运行的 Python AI Notebook,它通过这个改编自 docs 的脚本运行分区(并行)。

from google.cloud import bigquery_storage_v1

client = bigquery_storage_v1.BigQueryReadClient()

table = "projects//datasets//tables/".format(
    "bigquery-public-data", "usa_names", "usa_1910_current"
) # I am using my private table instead of this one.

requested_session = bigquery_storage_v1.types.ReadSession()
requested_session.table = table
requested_session.data_format = bigquery_storage_v1.enums.DataFormat.AVRO

parent = "projects/".format(project_id)
session = client.create_read_session(
    parent,
    requested_session,
    max_stream_count=1,
)
reader = client.read_rows(session.streams[0].name)

# The read stream contains blocks of Avro-encoded bytes. The rows() method
# uses the fastavro library to parse these blocks as an iterable of Python
# dictionaries.

rows = reader.rows(session)

是否可以将数据流中的数据直接保存到 Google Cloud Storage?

我尝试使用 fastavro 将表作为 AVRO 文件保存到我的 AI 实例,然后使用 Blob.upload_from_filename() 将它们上传到 GCS,但这个过程非常很慢。我希望可以将流指向我的 GCS 存储桶。我尝试了 Blob.upload_from_file,但无法弄清楚。

我无法将整个流解码到内存并使用 Blob.upload_from_string,因为我没有超过 300 GB 的 RAM。

过去两天我一直在解析 GCP 文档,但找不到任何东西,因此我希望您能提供帮助,如果可能的话,最好使用代码 sn-p。 (如果使用另一种文件格式更容易,我完全赞成。)

谢谢!

【问题讨论】:

【参考方案1】:

是否可以将流中的数据直接保存到 Google Cloud Storage?

BigQuery Storage API 本身无法直接写入 GCS;您需要将 API 与代码配对以解析数据,将其写入本地存储,然后上传到 GCS。这可能是您手动编写的代码,也可能是来自某种框架的代码。

看起来您共享的代码 sn-p 以单线程方式处理每个分区,这将您的吞吐量限制为单个读取流的吞吐量。存储 API 旨在通过并行实现高吞吐量,因此它适用于并行处理框架,例如 Google Cloud Dataflow 或 Apache Spark。如果您想使用 Dataflow,可以从 Google-provided template 开始;对于 Spark,您可以使用 David 已经分享的代码 sn-ps。

【讨论】:

谢谢,肯尼斯!我认为我的过程中的瓶颈是保存到我的 VM 实例中的本地存储,而不是单个读取流的吞吐量。我没有使用过 Google Cloud Dataflow 或 Apache Spark,但我会尝试一下。 (我使用 parsl)。【参考方案2】:

一个简单的方法是使用Spark 和spark-bigquery-connector?它使用 BigQuery Storage API 将表直接读入 Spark 的 DataFrame。您可以在Dataproc 上创建一个 Spark 集群,该集群与 BigQuery 和 GCS 位于同一数据中心,从而使读写速度更快。

代码示例如下所示:

df = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data.usa_names.usa_1910_current") \
  .load()

df.write.format("avro").save("gs://bucket/path")

您还可以过滤数据并分别在每个分区上工作:

df = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data.usa_names.usa_1910_current") \
  .option("filter", "the_date='2020-05-12'") \
  .load()

# OR, in case you don't need to give the partition at load

df = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data.usa_names.usa_1910_current") \
  .load()

df.where("the_date='2020-05-12'").write....

请注意,为了读取大量数据,您需要一个足够大的集群。

【讨论】:

谢谢你,大卫。我没有使用过 Dataproc 和 Spark,但我会尽量听从您的建议。在这个项目之前,我只使用了 PLX。

以上是关于bigquery storage API:是不是可以将 AVRO 文件直接流式传输/保存到 Google Cloud Storage?的主要内容,如果未能解决你的问题,请参考以下文章

使用 BigQuery Storage API(测试版)启动和读取多个流

Bigquery API:如何为 load_table_from_storage 调用提供架构

通过 Spark 使用 BigQuery Storage API:请求多个分区但仅获得 1 个

如何使用 API 存储在 Google Cloud Storage 中的架构文件在 BigQuery 加载作业上设置架构?

BigQuery Storage API 无法读取由有序 (ORDER BY) 查询创建的临时表

从 Cloud Storage Json 加载数据时出现 BigQuery 错误