使用 BigQuery Storage API(测试版)启动和读取多个流
Posted
技术标签:
【中文标题】使用 BigQuery Storage API(测试版)启动和读取多个流【英文标题】:Initiating and reading from multiple streams with the BigQuery Storage API (Beta) 【发布时间】:2019-05-18 06:36:46 【问题描述】:BigQuery Storage API (https://googleapis.github.io/google-cloud-python/latest/bigquery_storage/gapic/v1beta1/api.html) 在从 BigQuery 表中读取数据方面非常有用,几乎比标准 BigQuery API 快 10 倍。为了让它更快,它支持多个读取流,每个读取流从相关表中读取一组动态分配的行。
我的问题是:虽然您可能会请求多个流,但请求后分配的流不在您的控制范围内。因此,我无法启动超过 1 个流。
我正在读取的数据由 3 列和 600 万行组成,如下所示。我将创建的流总数打印到控制台。
from google.cloud import bigquery_storage_v1beta1
project_id = 'myproject'
client = bigquery_storage_v1beta1.BigQueryStorageClient()
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("year")
read_options.selected_fields.append("name")
read_options.selected_fields.append("number")
# I request 3 streams to be created!
requested_streams = 3
parent = "projects/".format(project_id)
session = client.create_read_session(
table_ref, parent, table_modifiers=modifiers, read_options=read_options,
requested_streams=requested_streams
)
response = client.batch_create_read_session_streams(session, requested_streams)
# I see only 1 stream being created.
print("Streams created: " + str(len(session.streams)))
print("Stream names array: " + str(session.streams))
reader = client.read_rows(
bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
)
rows = reader.rows(session)
names = set()
import time
start = time.time()
#---------------------------------------------------
i=0
for row in rows:
i += 1
names.add(row["name"])
if i > 6000000:
break
#---------------------------------------------------
end = time.time()
print(end - start)
print("Got unique names and total rows.".format(len(names), i))
我有几个问题:
1) 我是否只看到 1 个流,因为多流实现不完整(API 处于 Beta 版本)?
2) 我是否只看到 1 个流,因为流分配算法的数据相对“小”? 6m 行已经相当大了。
3) 如果我开始看到创建了多个流,API 文档没有描述如何并行读取这些流。关于如何做到这一点的任何想法?
【问题讨论】:
这个问题被否决了。如果您不赞成,请告诉我可以改进的地方。 【参考方案1】:问题是您正在读取的表只有一个可用的输入文件。虽然它有 600 万行,但数据是高度可压缩的,因此数据只有一个后备列文件。目前,存储 API 不会比这更精细地拆分数据。
如果您检查从该表中 SELECT 的查询计划,您会看到相同的内容(只有一个输入)。
【讨论】:
以上是关于使用 BigQuery Storage API(测试版)启动和读取多个流的主要内容,如果未能解决你的问题,请参考以下文章
通过 Spark 使用 BigQuery Storage API:请求多个分区但仅获得 1 个
Bigquery API:如何为 load_table_from_storage 调用提供架构
如何使用 API 存储在 Google Cloud Storage 中的架构文件在 BigQuery 加载作业上设置架构?
BigQuery Storage API 无法读取由有序 (ORDER BY) 查询创建的临时表
bigquery storage API:是不是可以将 AVRO 文件直接流式传输/保存到 Google Cloud Storage?