受 BigQuery 查询影响的行数

Posted

技术标签:

【中文标题】受 BigQuery 查询影响的行数【英文标题】:Number of rows affected by BigQuery query 【发布时间】:2020-05-25 02:31:33 【问题描述】:

我每天运行命令将新记录插入 BigQuery 表,并想记录每天插入的记录数。

我创建了一个 QueryJob 对象,其中包含一个 SELECT 查询和一个 destination 表。我将write_disposition 设置为WRITE_APPEND,以便将新数据追加到表中。

我找到了两个做类似事情的选项,但都没有达到我想要的效果:

query_job.num_dml_affected_rows:这只是返回 None,因为查询不使用 DML INSERT,而是附加到目标表。 query_job.result().total_rows:这将返回表中的总行数,而不是新行数。

我可以想出多种方法来达到预期的结果,但不确定最好的方法是什么:

将查询更改为 DML 插入 - 但这意味着创建动态 SQL 而不仅仅是使用目标表是一个 Python 变量。 将结果转储到临时表中,计算行数,然后追加数据 - 但这似乎效率低下。 计算查询前后的记录并记录增量 - 这可能会导致并行运行查询出现问题。

有没有最好的方法的建议?


跟进马斯喀特的回答,我认为当查询并行运行时这将不起作用:

Get number of rows: 1000 rows
Function to call queries in paralell:

 - Query 1 --> Adds 100 rows --> finishes 3rd --> Counts 1200 rows
 - Query 2 --> Adds 80 rows --> finishes 2nd --> Counts 1100 rows
 - Query 3 --> Adds 20 rows --> finishes 1st --> Counts 1020 rows

因为无法知道这些查询将完成哪个顺序(因为它们都是使用 multiprocessing 库并行调用的),所以我不确定如何知道每个查询添加了多少行?


更新 2

示例代码:

    ...

    # We compile a list of which datasets need to be loaded from
    brands = self._bq.select(f"Select brand,  gaDataset From self.BRAND_DATASET.self.BRAND_TABLE")
    brands = list(brands.iterrows())
    _, brands = zip(*brands)

    # Define the function for parallel population
    def populate_fn(brand):
        return self._populate(brand, self.predicates)

    logging.info("Populating daily stats for brands in parallel")
    error = self._parallel_apply(populate_fn, brands)
    if error is not None:
        return error

def _populate(self, brand, predicates):
    # We can't just call <bq_load_data> because we need to update the predicates for each brand
    predicates.add_predicate('gaDataset', brand['gaDataset'], operator="_")
    query_job = self._load_data(self.table_name, predicates=predicates)
    logging.info(f"Started for brand['gaDataset']: brand['brand']")

    self._run_query_job(query_job)
    logging.info(f"brand['gaDataset']: brand['brand'] is now populated.")

_populate 函数针对每个品牌并行运行。

predicates 只是一个处理如何修改 Jinja 模板化 SQL 的对象,具有来自主对象的一些常用参数,以及一些特定于品牌的参数。

_load_data 是一个函数,它实际加载带有适当参数的 Jinja 模板化 SQL,并构造并返回一个 QueryJob 对象。

【问题讨论】:

【参考方案1】:

有效且推荐的方法是在运行查询之前和之后计算记录。并行运行查询没有问题,因为我们可以等待查询作业完成,然后再检查更新的行数。我已经准备了如何检查新添加的行数的示例:

from google.cloud import bigquery

client = bigquery.Client()

# Define destination table.
table_id = "<PROJECT_ID>.<DATASET>.<TABLE>"

# Inspect the number of rows in the table before running the query.
table = client.get_table(table_id)
num_rows_begin = table.num_rows
print("Number of rows before running the query job: " + str(num_rows_begin))

sql = """
    SELECT word, word_count
    FROM `bigquery-public-data.samples.shakespeare`
    LIMIT 10
"""

job_config = bigquery.QueryJobConfig(destination=table_id, write_disposition="WRITE_APPEND")

# Make an API request.
query_job = client.query(sql, job_config=job_config)

# Wait for the job to complete.  
query_job.result()

# Inspect the number of newly added rows in the table after running the query.
# First way:
num_rows_end = query_job._query_results.total_rows - num_rows_begin
print("Loaded  rows into ".format(str(num_rows_end), table_id))

# Second way:
table = client.get_table(table_id)
print("Loaded  rows into ".format(table.num_rows - num_rows_begin, table_id))

如您所见,检查新添加的行数的方法很少。第一个是query_job的结果:query_job._query_results.total_rows,和query_job.result().total_rows基本一致。第二种方法获取有关项目中数据集的信息。这里重要的是,在检查行数之前,我们需要再次调用table = client.get_table(table_id) 方法。如果我们不这样做,系统将打印:Loaded 0 rows into table,因为它指的是在运行查询之前指定的行数。

希望以上信息对您有用。

【讨论】:

感谢 muscat,我只是不确定如何并行实现。 OP 编辑​​了进一步的解释。 感谢帖子中的进一步解释,现在更清楚了。如果我错了,请纠正我,您是否很少有将数据插入同一个 BQ 表的查询作业?您想知道每个查询添加了多少行? 是的,没错。我有一个 Jinja 模板化的 SQL 查询,我使用不同的参数(不同的数据源)并行加载和运行。所有查询都附加到同一个表,但同时运行;我想知道每个查询附加了多少行,以检查原始数据源中的数据是否丢失。 我明白了。至于现在,我建议您查看以下thread,我正在考虑的另一种方法是创建查询作业列表:querys = [query_1, query_2, query_3] 并通过检查数字来查询它们FOR 循环中的行数。但它不是平行的。

以上是关于受 BigQuery 查询影响的行数的主要内容,如果未能解决你的问题,请参考以下文章

postgresql函数-获取受更新查询影响的行数[重复]

如何在 WordPress 查询中获得受影响的行数?

受 Microsoft JDBC 驱动程序的 SELECT INTO 查询影响的行数

返回受 UPDATE 语句影响的行数

在 SQLite-WinRT 包装器中使用“更新”或“删除”查询获取受影响的行数

在 UPDATE 查询中使用 DateTime 或字符串参数时,Informix 11.7 返回 -1 作为受影响的行数