Delete bulk rows in Big Query table based on list of unique ids

Posted: 2021-01-25 18:00:39

【Question】:

So I'm trying to delete some rows in a Big Query table with a simple query like this:

from google.cloud import bigquery

client = bigquery.Client()
# primary_key_list is rendered directly into the SQL text,
# e.g. "... WHERE Sample_Table_Id IN (123, 124, 125)"
query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN {}".format(tuple(primary_key_list))
query_job = client.query(query, location="us-west1")

primary_key_list is a Python list containing the Sample_Table unique ids, for example: [123, 124, 125, ...]
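
For illustration with those hypothetical ids, the formatted statement embeds every id literally:

DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN (123, 124, 125)

so the query text grows linearly with the size of primary_key_list.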

This works fine for the small batches of data I retrieve, but when primary_key_list grows it gives me an error:

The query is too large. Maximum standard SQL query length is 1024.00K characters, including comments and white space characters.

I realized the query would exceed the maximum query length, and after searching Stack Overflow I found a solution using parameterized queries, so I changed my code to:

client = bigquery.Client()
query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id IN UNNEST(@List_Sample_Table_Id)"
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter("List_Sample_Table_Id", "INT64", primary_key_list),
    ]
)
query_job = client.query(query, job_config=job_config)

It stopped giving me the maximum standard SQL query length error, but it returned another exception. Any idea how to delete bulk rows in Big Query?

I don't know whether this information is useful, but I'm running this Python code on Google Cloud Dataproc. It's a function I added only recently, and everything worked fine before I added it. Here is the log I got when running the delete with the parameterized query:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 320, in _send_until_done
    return self.connection.send(data)
  File "/opt/conda/default/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1737, in send
    self._raise_ssl_error(self._ssl, result)
  File "/opt/conda/default/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1639, in _raise_ssl_error
    raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 354, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1244, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1290, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1239, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 331, in sendall
    sent = self._send_until_done(data[total_sent:total_sent + SSL_WRITE_BLOCKSIZE])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 326, in _send_until_done
    raise SocketError(str(e))
OSError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/default/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 638, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/util/retry.py", line 368, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/connectionpool.py", line 354, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1244, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1290, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1239, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/opt/conda/default/lib/python3.7/http/client.py", line 987, in send
    self.sock.sendall(data)
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 331, in sendall
    sent = self._send_until_done(data[total_sent:total_sent + SSL_WRITE_BLOCKSIZE])
  File "/opt/conda/default/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 326, in _send_until_done
    raise SocketError(str(e))
urllib3.exceptions.ProtocolError: ('Connection aborted.', OSError("(32, 'EPIPE')"))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/a05a15822be04a9abdc1ba05e317bb2f/ItemHistory-get.py", line 92, in <module>
    delete_duplicate_pk()
  File "/tmp/a05a15822be04a9abdc1ba05e317bb2f/ItemHistory-get.py", line 84, in delete_duplicate_pk
    query_job = client.query(query, job_config=job_config, location="asia-southeast2")
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 2893, in query
    query_job._begin(retry=retry, timeout=timeout)
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/job/query.py", line 1069, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/job/base.py", line 438, in _begin
    timeout=timeout,
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 643, in _call_api
    return call()
  File "/opt/conda/default/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/opt/conda/default/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 434, in api_request
    timeout=timeout,
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 292, in _make_request
    method, url, headers, data, target_object, timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/google/cloud/_http.py", line 330, in _do_request
    url=url, method=method, headers=headers, data=data, timeout=timeout
  File "/opt/conda/default/lib/python3.7/site-packages/google/auth/transport/requests.py", line 470, in request
    **kwargs
  File "/opt/conda/default/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/opt/conda/default/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/opt/conda/default/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', OSError("(32, 'EPIPE')"))

【Comments】:

According to this post in the Google group, you can try upgrading the requests package and/or the OpenSSL version.

@blackbishop Already tried upgrading requests and can confirm the previous error log is gone. But since I need to delete or update bulk rows (let's say 500,000+ rows), I found that the MERGE statement is more suitable for my case (I need to delete or update data in the Big Query table using the data I just retrieved): cloud.google.com/bigquery/docs/reference/standard-sql/…

Glad it helped solve that error ;) Yes, MERGE is meant for combining insert/update/delete operations.

@chairul Could you post your solution as an answer instead of a comment?

@IgorDvorzhak Thanks for the reminder, posted as an answer.

【Answer 1】:

As @blackbishop mentioned, you can try upgrading requests to the latest version (in my case it fixed that error). But since I was trying to update bulk rows (let's say 500,000+ rows in Big Query, each with a unique id), it turned out to give me a timeout on the small machine type Dataproc cluster I was using (if anyone has the resources to try a better Dataproc cluster and succeeds, feel free to edit this answer).
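
For reference, the upgrade itself is a one-liner (a sketch; upgrading pyOpenSSL as well is only an assumption based on the comments above):

pip install --upgrade requests pyOpenSSL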

So I used the MERGE statement documented here: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement

The script updates existing rows like this (assuming I have already retrieved the new data and loaded it into Staging_Sample_Dataset.Staging_Sample_Table):

from google.cloud import bigquery

def merge_data():
    client = bigquery.Client()
    query = """MERGE INTO Sample_Dataset.Sample_Table st
            USING Staging_Sample_Dataset.Staging_Sample_Table ss
            ON st.Sample_Table_Id = ss.Sample_Table_Id
            WHEN MATCHED THEN UPDATE SET st.Your_Column = ss.Your_Column -- and the list goes on...
            WHEN NOT MATCHED THEN INSERT ROW
            """
    query_job = client.query(query, location="asia-southeast2")
    results = query_job.result()  # wait for the DML job to finish
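
Before the merge runs, the staging table has to be populated. A minimal sketch, assuming the newly retrieved data sits in a pandas DataFrame named df (the WRITE_TRUNCATE disposition is an assumption, chosen so each run replaces the previous staging contents):

import pandas as pd
from google.cloud import bigquery

def load_staging(df: pd.DataFrame):
    client = bigquery.Client()
    # overwrite the staging table with the freshly retrieved rows
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    job = client.load_table_from_dataframe(
        df, "Staging_Sample_Dataset.Staging_Sample_Table", job_config=job_config
    )
    job.result()  # wait for the load job to finish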

Or I can bulk delete the rows and call another function to load the new data after this function executes:

def bulk_delete():
    client = bigquery.Client()
    query = """MERGE INTO Sample_Dataset.Sample_Table st
            USING Staging_Sample_Dataset.Staging_Sample_Table sst
            ON st.Sample_Table_Id = sst.Sample_Table_Id
            WHEN MATCHED THEN DELETE
            """
    query_job = client.query(query, location="asia-southeast2")
    results = query_job.result()
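
To confirm how many rows a run actually touched, the finished QueryJob exposes a DML row count; a small usage sketch (same names as above):

query_job = client.query(query, location="asia-southeast2")
query_job.result()  # wait for completion
print("rows affected:", query_job.num_dml_affected_rows)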

【Discussion】:

【Answer 2】:

The idea is to loop over the array of ids you have and run a delete for each of them.

This is just the structure, not actual code, so it may need more than a little tweaking:

from google.cloud import bigquery

client = bigquery.Client()
for pk in primary_key_list:
    # one DELETE statement per id
    query = "DELETE FROM Sample_Dataset.Sample_Table WHERE Sample_Table_Id = {}".format(pk)
    query_job = client.query(query, location="us-west1")
    query_job.result()  # surface failures per id

【Discussion】:

But isn't that 1 DML operation per loop iteration? (Correct me if I'm wrong.) If so, I would hit the table operation limits right away, according to this: cloud.google.com/bigquery/quotas#standard_tables (can anyone confirm? The last sentence says: "DML statements are counted toward the quota's daily operation count, but they will not fail due to this limit.")

Honestly, I didn't know about those limits. If every transaction hits them, you may need to contact them to clarify and ask for a possible way to handle this.
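
One way to keep the DML statement count low while still avoiding the query-length limit from the question is to delete the ids in chunks with the parameterized UNNEST form. A hedged sketch, not from either answer (the chunk size of 10,000 is an arbitrary assumption):

from google.cloud import bigquery

def delete_in_chunks(primary_key_list, chunk_size=10000):
    client = bigquery.Client()
    query = (
        "DELETE FROM Sample_Dataset.Sample_Table "
        "WHERE Sample_Table_Id IN UNNEST(@ids)"
    )
    for start in range(0, len(primary_key_list), chunk_size):
        chunk = primary_key_list[start:start + chunk_size]
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ArrayQueryParameter("ids", "INT64", chunk)]
        )
        # one DML statement deletes a whole chunk of ids
        client.query(query, job_config=job_config).result()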
