Python BigQuery 存储。并行读取多个流
Posted
技术标签:
【中文标题】Python BigQuery 存储。并行读取多个流【英文标题】:Python BigQuery Storage. Reading multiple streams in parallel 【发布时间】:2019-09-24 12:51:49 【问题描述】:我有以下玩具代码:
import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "libraries_io"
table_ref.table_id = "versions"
parent = "projects/".format(your_project_id)
session = client.create_read_session(
table_ref,
parent,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)
reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000)
reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000)
df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)])
df
我使用 BALANCED ShardingStrategy 启动了多个可以独立读取的流。
BigqueryStorage 文档说:
但是,如果您想分散多个读者,您可以通过 让阅读器处理每个单独的流。
我启动了两个阅读器,一个用于会话中的每个流。之后,将两个数据帧(每个读取器创建一个)连接成一个。然而,与 LIQUID ShardingStrategy 相比,这种方法并没有提高任何速度。
我试图让两个阅读器并行读取行。但是,我在库文档中找不到有关并行流读取的任何信息。
问题是:
1) 如果选择了 BALANCED ShardingStrategy,BugQuery Storage 是否提供任何本机方法来同时读取多个流?
2) 并行读取流的最佳方法是什么?我需要为此使用多处理或异步吗?
3) 如果有人能提供任何关于并行流 reding 的基本示例,我将不胜感激
【问题讨论】:
【参考方案1】:BigQuery Storage API 确实支持多个流,但您的执行方法不支持。您可以创建多个阅读器实例,然后每个阅读器都可以使用单独的流来增加吞吐量。
Parallel processing in python 你有很多选择。但是,最容易使用的是multiprocessing package。
另一种选择是使用Apache Beam,它默认支持并行处理,但可能不适合您的用例。它有一个内置的 BigQuery IO 驱动程序,但它的 python 版本还不支持 BigQuery Storage API,因此您可能需要为 BQ Storage API 编写自己的实现。
【讨论】:
'但是你的执行方法没有'你能详细说明一下吗?我创建了两个阅读器,我有两个流,每个单独的流由 1 个阅读器使用。我究竟做错了什么?需要更改哪些内容才能使用多个流? 您正在以串行方式阅读,只有在处理完 reader1 后才会处理 reader2。这称为逐行执行。您想要的是为 reader1 和 reader2 触发各个进程并让它们并行处理。 @KunalDeo 您能否提供有关梁自定义驱动程序的更多信息?我想在梁中实现这样的事情的一个问题是,默认情况下,基于梁的并行执行环境,我们不能将一个巨大的读取操作分割成更小的部分(因为我们不知道有多少工作人员会运行prd)。然后事情就变成了并行运行许多 单流 读取会话的光束,每个会话都单独做同样的事情,实际上并没有补充彼此的缺失部分。如果您有其他想法,链接或任何东西,如果您能分享,非常感谢! :) 谢谢【参考方案2】:我进行了一些研究,发现您使用了 BigQuery Storage API 中的代码,您是对的,如果您正在消费多个流,则使用平衡策略,需要提及的是它仍在运行测试版。
发生这种情况的一些原因是您可能只看到 1 个流,因为流分配算法的数据相对“小”,流的数量可能低于请求的数量,具体取决于 2 个因素:a表的合理并行性和服务的限制。目前,确定什么是“合理”的算法细节尚未公开,一旦 API 达到一般可用性阶段,这些细节可能会发生变化。
你也可以试试上面推荐的multiprocessing package。
【讨论】:
【参考方案3】:您缺少requested_streams
值:
n_streams = 2
session = client.create_read_session(
table_ref,
parent,
requested_streams=n_streams,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)
您可以在一行中连接数据框:
readers = []
for i in range(n_streams):
stream = session.streams[i]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)
readers.append(reader)
df = pd.concat([reader.to_dataframe(session) for reader in readers])
希望这会有所帮助。
【讨论】:
以上是关于Python BigQuery 存储。并行读取多个流的主要内容,如果未能解决你的问题,请参考以下文章
使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery
使用 BigQuery Storage API(测试版)启动和读取多个流