Python 2.7 和 GCP Google BigQuery:捕获文件加载错误?

Posted

技术标签:

【中文标题】Python 2.7 和 GCP Google BigQuery:捕获文件加载错误?【英文标题】:Python 2.7 & GCP Google BigQuery: Capturing file load errors? 【发布时间】:2017-11-16 13:23:55 【问题描述】:

全部,

我正在获取一些 Python 2.7 BiqQuery (BQ) 数据加载“操作就绪”,并且我正在努力以与我在过去。

在 BQ 中,我可以从中访问错误,示例如下:bigquery_client.load_table_from_uri.errors



    'reason': 'invalid', 
        'message': "Could not parse 'r2501' as int for field lineNum (position 0) starting at location 56708 ", 
        'location': 'gs://bucketNameHere/fake-data.csv' 
    'reason': 'invalid', 
        'message': 'CSV table references column position 2, but line starting at position:56731 contains only 2 columns.', 
        'location': 'gs://bucketNameHere/fake-data.csv'
    'reason': 'invalid', 
        'message': "Could not parse 'a' as int for field lineNum (position 0) starting at location 56734 ", 
        'location': 'gs://bucketNameHere/fake-data.csv'
    'reason': 'invalid', 
        'message': "Could not parse 'a' as int for field lineNum (position 0) starting at location 56739 ", 
        'location': 'gs://bucketNameHere/fake-data.csv'
    'reason': 'invalid', 
        'message': 'CSV table references column position 1, but line starting at position:56751 contains only 1 columns.', 
        'location': 'gs://bucketNameHere/fake-data.csv'


这很好,但我真的需要一些更好的信息,尤其是错误的行号,这是我遇到的主要问题。

在 Redshift 中:stl_loaderror_detail & stl_load_errors http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html

在 SnowflakeDB 中:load_history & TABLE(VALIDATE(table_name, job_id => '_last')); https://docs.snowflake.net/manuals/sql-reference/functions/validate.html

总而言之,我需要加载我能加载的数据(将我的 max_bad_records 设置得相当高),当记录失败时,我需要知道:

    加载的文件名(如果我执行通配符文件加载),这个 目前提供 发生错误的行号,目前未提供,但字节# 嵌入在消息中 - “从位置开始”或“位置:”。我真的需要行号作为一个独立的元素 错误消息,已提供此消息,当前消息已足够

任何指导将不胜感激。

谢谢,最好的...丰富

附言我将跟进包含我的加载脚本的评论,我认为我获取统计数据的方式可能对人们有所帮助,因为我花了一段时间才弄清楚。

附言 在 Linux 上运行并设置了 GOOGLE_APPLICATION_CREDENTIALS 蟒蛇2.7

库版本如下:

google-cloud==0.29.0 
google-cloud-bigquery==0.28.0
google-cloud-core==0.28.0




# load a table to bq from gcs with the schema
def load_table_from_gcs(dataset_name, table_name, schema, source, skip_leading_rows=1, source_format='CSV', max_bad_records=0, write_disposition='WRITE_EMPTY', project=None):
    try:

        # convert the schema json string to a list
        schemaList = convert_schema(schema)

        bigquery_client = bigquery.Client(project=project)
        dataset_ref = bigquery_client.dataset(dataset_name)
        table_ref = dataset_ref.table(table_name)
        table = bigquery.Table(table_ref, schema=schemaList)

        bigquery_client.create_table(table)

        job_id_prefix = "bqTools_load_job"
        job_config = bigquery.LoadJobConfig()
        job_config.create_disposition = 'NEVER'
        job_config.skip_leading_rows = skip_leading_rows
        job_config.source_format = source_format
        job_config.write_disposition = write_disposition

        if max_bad_records:
            job_config.max_bad_records = max_bad_records

        load_job = bigquery_client.load_table_from_uri(
            source, table_ref, job_config=job_config,
            job_id_prefix=job_id_prefix)

        # the following waits for table load to complete
        load_job.result()

        print("------ load_job\n")
        print("load_job: " + str(type(load_job)))
        print(dir(load_job))

        print("------ load_job.result\n")
        job_result = load_job.result
        print("job_result: " + str(type(job_result)))
        print(job_result)

        job_exception = load_job.exception
        job_id = load_job.job_id
        job_state = load_job.state
        error_result = load_job.error_result
        job_statistics = load_job._job_statistics()
        badRecords = job_statistics['badRecords']
        outputRows = job_statistics['outputRows']
        inputFiles = job_statistics['inputFiles']
        inputFileBytes = job_statistics['inputFileBytes']
        outputBytes = job_statistics['outputBytes']

        print("\n ***************************** ")
        print(" job_state:      " + str(job_state))
        print(" error_result:   " + str(error_result))
        print(" job_id:         " + str(job_id))
        print(" badRecords:     " + str(badRecords))
        print(" outputRows:     " + str(outputRows))
        print(" inputFiles:     " + str(inputFiles))
        print(" inputFileBytes: " + str(inputFileBytes))
        print(" outputBytes:    " + str(outputBytes))
        print(" type(job_exception):  " + str(type(job_exception)))
        print(" job_exception:  " + str(job_exception))
        print(" ***************************** ")

        print("------ load_job.errors \n")
        myErrors = load_job.errors
        # print("myErrors: " + str(type(myErrors)))
        for errorRecord in myErrors:
            print(errorRecord)

        print("------ ------ ------ ------\n")

        # TODO:  need to figure out how to get # records failed, and which ones they are
        # research shoed "statistics.load_job" - but not sure how that works

        returnMsg = 'load_table_from_gcs : '.format(dataset_name, table_name, source)

        return returnMsg

    except Exception as e:
        errorStr = 'ERROR (load_table_from_gcs): ' + str(e)
        print(errorStr)
        raise

【问题讨论】:

编辑主题以添加我的加载代码 我不确定这是否可能,因为这与 API 无关,而是 BQ 的后端作为响应返回的内容(一种可能是在 BQ 跟踪器上启动 new issue 并查看团队认为) 感谢 Willian,我添加了一张票 (69405901),如果我得到回复,我会继续发布此帖子。 【参考方案1】:

BigQuery 不报告错误行号的原因是文件被许多并行工作人员拆分和解析。假设一个工作人员负责文件的偏移量 10000~20000,它将寻找到 10000 并从那里开始解析。当它无法解析一行时,它只知道该行的起始偏移量。要知道它需要从文件开头扫描的行号。

您可以找出给定起始偏移量的行。您需要行号是否有特定原因?

【讨论】:

我需要原始记录和有关失败原因的信息。拥有此信息将让数据提供者了解他们在发送给我们之前应该对数据进行的更改,或者让我们对负载进行更改。非常标准的东西,什么失败了,为什么?如果我不能得到线#,没什么大不了的,它有用但不紧急。失败的数据及其原因是最重要的。 BigQuery 允许单个 CSV 行最大为 10MB。在错误消息中返回 10MB 行会非常低效。更糟糕的是在 100 个警告中返回它。而是返回一个偏移量,以便您可以从文件中检索失败的行。为什么已经在错误消息中? “无法将'r2501'解析为字段lineNum(位置0)的int”,就像您的问题一样? 从操作的角度来看,我需要知道哪些记录失败(来自哪些文件)以及原因。我也想知道失败行的行号,但这似乎很困难(尽管其他人正在这样做),请从操作的角度考虑这一点,您每晚都会收到数百个文件,提供不同的系统/表,哪些文件有错误,它们是什么(消息和失败的记录)。 感谢您的链接。看起来 Redshift 返回了字符(1024)行的样本。我们可能也可以这样做。我提交了功能请求issuetracker.google.com/issues/69640730。由于他们不需要 JSON 以换行符分隔,因此我假设他们按顺序而不是并行处理文件,因为扫描 1TB 文件以找出行号会非常昂贵。

以上是关于Python 2.7 和 GCP Google BigQuery:捕获文件加载错误?的主要内容,如果未能解决你的问题,请参考以下文章

通过 pip 安装的 Google Cloud SDK:“gsutil 需要 python 2.7”

如何从 Python 运行时云函数访问 Google Cloud Platform Firestore 触发器

如何在Python中更有效地审计GCP存储桶中的数千个对象

在 Google App Engine Datastore (python 2.7) 中存储 lambda 函数

在 Mac OS X 10.6 上使用带有 Python 2.7 的 Google App Engine SDK

使用 python 对 GCP 计算 API 端点进行身份验证