Bigquery 存储 API 多处理段错误

Posted

技术标签:

【中文标题】Bigquery 存储 API 多处理段错误【英文标题】:Bigquery Storage API Multiprocessing segfault 【发布时间】:2021-02-28 23:53:01 【问题描述】:

长期阅读,第一次海报。我正在使用 BigQuery Storage API Python 客户端库,但在使用 Python 多处理拆分读者时遇到了一些麻烦。

文档中有一条注释说:

因为这个客户端使用了grpcio库,所以共享实例是安全的 跨线程。在多处理场景中,最佳实践是 调用 os.fork() 之后创建客户端实例 multiprocessing.Pool 或 multiprocessing.Process.

我认为我这样做是正确的......但我一定不是。

这是我目前的代码。目标是读取多个并行流中的 BQ 表,然后将数据行写入单个 CSV 文件。创建完所有 CSV 文件后,我将执行一个简单的 cat 命令来组合它们。

附带说明一下,此代码实际上适用于小型 BigQuery 表,但在尝试下载大型 BQ 表时会因段错误而失败。

import faulthandler
faulthandler.enable()
from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types
import multiprocessing as mp
import psutil
import os
import sys
import csv
from datetime import datetime


def extract_table(i):

    client_in = BigQueryReadClient()
    reader_in = client_in.read_rows(session.streams[i].name, timeout=10000)

    rows = reader_in.rows(session)

    csv_file = "/home/user/sas/" + table_name + "_" + str(i) + ".csv"
    print(f"Starting at time datetime.now() for file csv_file")

    try:
        with open(csv_file, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
            if i == 0:
                writer.writeheader()
            else:
                pass
            for data in rows:
                # print(data)
                writer.writerow(data)
    except IOError:
        print("I/O error")

    print(f"Finished at time datetime.now() for file csv_file")
    return


if __name__ == '__main__':
    # Get input args
    project_id = sys.argv[1]
    db_name = sys.argv[2]
    table_name = sys.argv[3]

    n = len(sys.argv[4])
    a = sys.argv[4][1:n - 1]
    csv_columns = a.replace("'", '').split(', ')

    output_type = sys.argv[5]  # csv or sas
    bucket_root = sys.argv[6]

    # The read session is created in this project. This project can be
    # different from that which contains the table.
    client = BigQueryReadClient()

    table = "projects//datasets//tables/".format(
        project_id, db_name, table_name
    )

    requested_session = types.ReadSession()
    requested_session.table = table
    
    # This API can also deliver data serialized in Apache Arrow format.
    # This example leverages Apache Avro.
    requested_session.data_format = types.DataFormat.AVRO

    # We limit the output columns to a subset of those allowed in the table
    requested_session.read_options.selected_fields = csv_columns
    
    ncpus = psutil.cpu_count(logical=False)

    if ncpus <= 2:
        ncpus_buffer = 2
    else:
        ncpus_buffer = ncpus - 2

    print(f"You have ncpus cores according to psutil. Using ncpus_buffer cores")

    parent = "projects/".format(project_id)
    session = client.create_read_session(
        parent=parent,
        read_session=requested_session,
        max_stream_count=ncpus_buffer,
    )

    print(f"There are len(session.streams) streams")

    num_streams = int(len(session.streams))

    with mp.Pool(processes=ncpus_buffer) as p:
        result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

使用以下命令样式调用代码:

python /home/user/sas/bq_extract_2.py gc-project-id dataset table "['column1', 'column2']" csv 'path/to/gcs/bucket'

同样,这适用于小表,有几次我已经让它在 50-100 GB 大小范围内的非常大的 BQ 表上工作。但是,大多数情况下,大型表失败并出现以下错误:

有 1000 个流 根据 psutil,您有 2 个内核。 从 2020-11-17 17:46:04.645398 开始使用 2 个内核作为文件 /home/user/sas/diag_0.csv

从 2020-11-17 开始 17:46:04.829381 文件 /home/user/sas/diag_1.csv

致命的 Python 错误:分段错误

线程 0x00007f4293f94700(最近调用优先):文件 "/home/user/anaconda3/envs/sas-controller/lib/python3.8/site-packages/grpc/_channel.py", channel_spin 文件中的第 1235 行 "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", 运行文件中的第 870 行 "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", _bootstrap_inner 文件中的第 932 行 "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", _bootstrap 中的第 890 行

线程 0x00007f42bc4c9740(最近调用优先):文件 "/home/user/anaconda3/envs/sas-controller/lib/python3.8/csv.py", _dict_to_list 文件中的第 151 行 "/home/user/anaconda3/envs/sas-controller/lib/python3.8/csv.py", writerow 文件“/home/user/sas/bq_extract_2.py”中的第 154 行,行 extract_table 文件中的 39 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”, mapstar 文件中的第 48 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”, 工作文件中的第 125 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py”, 运行文件中的第 108 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py”, _bootstrap 文件中的第 315 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/popen_fork.py”, _launch 文件中的第 75 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/popen_fork.py”, init 文件中的第 19 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py”, _Popen 文件中的第 277 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py”, 开始文件中的第 121 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”, _repopulate_pool_static 文件中的第 326 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”, _repopulate_pool 文件中的第 303 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py”, init 文件中的第 212 行 “/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py”, 池文件“/home/user/sas/bq_extract_2.py”中的第 119 行,第 157 行 在模块中

编辑 1: 将 .read_rows 上的超时更新为 10000,以允许从 BQ 读取大型结果。还将 max_stream_count 更改为等于池将使用的核心数。这似乎对我的测试有很大帮助,但是当我在 Google Cloud Compute 实例上将其作为启动脚本运行时,控制台输出中仍然会出现段错误。

编辑 2: 我对此研究得越多,似乎就越不可能有效地将 Python 多处理与 Google BigQuery Storage API 结合使用。鉴于需要在调用 os.fork() 之后创建读取会话,我无法确保为各个进程分配正确的读取行数。每个会话都在与它所附加的 BQ 表创建自己的一对多(一个会话对多个流)关系,并且每个会话似乎在流中拆分表行的方式略有不同。

以一个包含 30 行的表为例,我们要使用 3 个进程导出该表,每个进程处理一个行流。格式化在移动设备上可能看起来很奇怪。

                       os.fork()

Process 1              Process 2              Process 3
Session1               Session2               Session3
*Stream1 - 10 rows     Stream1 - 8 rows       Stream1 - 9 rows
Stream2 - 10 rows      *Stream2 - 12 rows     Stream2 - 11 rows
Stream3 - 10 rows      Stream3 - 10 rows      *Stream3 - 10 rows

在此示例中,我们最终得到 32 个输出行,因为每个会话没有以完全相同的方式定义其流。

我尝试使用 threading(下面的代码)而不是进程,因为 gRPC 是线程安全的,所以这很有效。

# create read session here
    
# Then call the target worker function with one thread per worker
    for i in range(0, num_streams):
        t = threading.Thread(target=extract_table, args=(i,))
        t.start()

然而,最大的问题是使用 8 个线程所花费的时间与使用 1 个线程的时间一样长,并且无论现在如何,跨线程的总吞吐量似乎最高可达约 5 MB/s你使用了很多线程。

这与使用 进程 形成对比,后者的吞吐量似乎会随着工人的增加而线性扩展(我在某些测试中看到高达 ~100 MB/s)...在极少数情况下,我能够让它在没有段错误中断的情况下工作。这似乎只是纯粹的运气。

使用 1 个线程:

总时间:~ 3:11

使用 8 个线程:

总时间:~ 3:15

据我所知,使用多线程基本上没有速度优势。

如果有人对我遗漏的任何东西有任何想法,请告诉我!我希望能够让它发挥作用。我真的很喜欢 BQ Storage API 的功能(行过滤器、列选择、无导出限制),但在找到合适的方式分散读者之前,我们将无法使用它。

【问题讨论】:

看起来段错误发生在 CSV 模块中,而不是 BigQuery Storage API 客户端中。您是否碰巧知道导致此段错误发生的行的形状是否有什么特别之处? 嗨蒂姆,好问题。我得看一下,但据我所知,BQ API 只是返回一个字典数据流,该数据流由 CSV 模块转换为数据行。我对代码(上图)进行了一次编辑,将流数限制为等于机器上的内核数。这似乎有所帮助,但是当我在 Google Cloud Compute 上将其作为启动脚本运行时,控制台上仍然会出现段错误。 @TimSwast:我看不出数据有什么问题。我尝试删除 csv 代码以查看它是否导致问题,但我仍然遇到段错误。看起来你可以为谷歌工作?如果是这样,BQ 团队中是否有人碰巧有一个如何使用 mutilprocessing.Pool 读取多个流的示例?我会假设他们会这样做,因为它在文档中被称为用例……但没有示例。 googleapis.dev/python/bigquerystorage/latest/index.html 当您删除 CSV 代码时,段错误发生在哪里?另外,您是否尝试过使用 Arrow 代替 Avro?这可能是 fastavro 库中的错误。 @TimSwast 我也有这个想法,但是在所有表中动态处理它可能会变得很痛苦。让我看看是否可以编写一个使用公共 BQ 数据演示错误的脚本。这是令人沮丧的不可预测的。有时会发生段错误,有时不会。一旦我得到一个清理过的脚本,我会通过 GitHub 分享它。 【参考方案1】:

有一个带有 grpcio 的 known issue(由 google-cloud-bigquery-storage 库使用)和多处理。根据this code example,“工作子进程在任何 gRPC 服务器启动之前被分叉”。

由于您的工作负载主要受 I/O 限制,因此全局解释器锁不应成为主要的性能瓶颈。我建议使用线程来分配工作,done in the google-cloud-bigquery library 也是如此。

替换:

with mp.Pool(processes=ncpus_buffer) as p:
    result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

与:

with concurrent.futures.ThreadPoolExecutor(max_workers=num_streams) as p:
    result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

【讨论】:

感谢您的建议。不幸的是,我已经尝试过线程,并且它们似乎没有比非多线程实现提供任何速度改进。我尝试了 concurrent.futures.ThreadPoolExecutor 代码,它显示了相同的结果。我只能在它们实际工作的极少数情况下展示进程的线性速度改进。我同意你关于这是受 IO 限制的评估,所以我不确定为什么线程不会提高吞吐量。 Per github.com/grpc/grpc/blob/master/doc/fork_support.md 您可以在构造任何客户端对象之前尝试创建进程池。这应该允许进程在启动任何与 grpc 相关的后台线程之前分叉。 是的,我在这个例子之后给出了一个镜头:gist.github.com/tseaver/95c8f54416c4a5093b4ad2d4755ab819 问题是在 grpc 之前分叉进程会导致为每个进程创建一个新会话。我刚刚运行的一个大测试导致了 8,562,767,649 行输出,而它应该是 8,549,457,649。除非我们能想到其他解决方案,否则我可能不得不对每个表使用单独的过滤器来均匀地分解行。

以上是关于Bigquery 存储 API 多处理段错误的主要内容,如果未能解决你的问题,请参考以下文章

无法将表从 BigQuery 导出到 Google Cloud Storage

Bigquery 流式处理 API 超时错误

多线程中快速定位段错误位置

提取操作中的 BigQuery 错误:错误处理作业意外。请再试一次

400 Bad Request 错误尝试从云存储加载 bigquery 表

使用模板表的 BigQuery 流式插入 - 503 错误