Python BigQuery 存储。并行读取多个流

Posted

技术标签:

【中文标题】Python BigQuery 存储。并行读取多个流【英文标题】:Python BigQuery Storage. Reading multiple streams in parallel 【发布时间】:2019-09-24 12:51:49 【问题描述】:

我有以下玩具代码:

import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "libraries_io"
table_ref.table_id = "versions"

parent = "projects/".format(your_project_id)
session = client.create_read_session(
    table_ref,
    parent,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000)
reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000)

df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)])
df

我使用 BALANCED ShardingStrategy 启动了多个可以独立读取的流。

BigqueryStorage 文档说:

但是,如果您想分散多个读者,您可以通过 让阅读器处理每个单独的流。

我启动了两个阅读器,一个用于会话中的每个流。之后,将两个数据帧(每个读取器创建一个)连接成一个。然而,与 LIQUID ShardingStrategy 相比,这种方法并没有提高任何速度。

我试图让两个阅读器并行读取行。但是,我在库文档中找不到有关并行流读取的任何信息。

问题是:

1) 如果选择了 BALANCED ShardingStrategy,BugQuery Storage 是否提供任何本机方法来同时读取多个流?

2) 并行读取流的最佳方法是什么?我需要为此使用多处理或异步吗?

3) 如果有人能提供任何关于并行流 reding 的基本示例,我将不胜感激

【问题讨论】:

【参考方案1】:

BigQuery Storage API 确实支持多个流,但您的执行方法不支持。您可以创建多个阅读器实例,然后每个阅读器都可以使用单独的流来增加吞吐量。

Parallel processing in python 你有很多选择。但是,最容易使用的是multiprocessing package。

另一种选择是使用Apache Beam,它默认支持并行处理,但可能不适合您的用例。它有一个内置的 BigQuery IO 驱动程序,但它的 python 版本还不支持 BigQuery Storage API,因此您可能需要为 BQ Storage API 编写自己的实现。

【讨论】:

'但是你的执行方法没有'你能详细说明一下吗?我创建了两个阅读器,我有两个流,每个单独的流由 1 个阅读器使用。我究竟做错了什么?需要更改哪些内容才能使用多个流? 您正在以串行方式阅读,只有在处理完 reader1 后才会处理 reader2。这称为逐行执行。您想要的是为 reader1 和 reader2 触发各个进程并让它们并行处理。 @KunalDeo 您能否提供有关梁自定义驱动程序的更多信息?我想在梁中实现这样的事情的一个问题是,默认情况下,基于梁的并行执行环境,我们不能将一个巨大的读取操作分割成更小的部分(因为我们不知道有多少工作人员会运行prd)。然后事情就变成了并行运行许多 单流 读取会话的光束,每个会话都单独做同样的事情,实际上并没有补充彼此的缺失部分。如果您有其他想法,链接或任何东西,如果您能分享,非常感谢! :) 谢谢【参考方案2】:

我进行了一些研究,发现您使用了 BigQuery Storage API 中的代码,您是对的,如果您正在消费多个流,则使用平衡策略,需要提及的是它仍在运行测试版。

发生这种情况的一些原因是您可能只看到 1 个流,因为流分配算法的数据相对“小”,流的数量可能低于请求的数量,具体取决于 2 个因素:a表的合理并行性和服务的限制。目前,确定什么是“合理”的算法细节尚未公开,一旦 API 达到一般可用性阶段,这些细节可能会发生变化。

你也可以试试上面推荐的multiprocessing package。

【讨论】:

【参考方案3】:

您缺少requested_streams 值:

n_streams = 2
session = client.create_read_session(
    table_ref,
    parent,
    requested_streams=n_streams,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

您可以在一行中连接数据框:

readers = []
for i in range(n_streams):
    stream = session.streams[i]
    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
    reader = bqstorageclient.read_rows(position)
    readers.append(reader)
df = pd.concat([reader.to_dataframe(session) for reader in readers])

希望这会有所帮助。

【讨论】:

以上是关于Python BigQuery 存储。并行读取多个流的主要内容,如果未能解决你的问题,请参考以下文章

使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery

使用 BigQuery Storage API(测试版)启动和读取多个流

在 Jupyter Windows 上使用 pool 并行读取多个文件需要很长时间:

从 Python 并行批量读取文件

在 Bigquery 中为多个 CSV 文件自动创建表

使用Java 8 Parallel Stream在并行读取多个文件时排除某些文件