BigQuery,Python 批量插入 bigquery 以进行流式传输服务(“告诉”错误)

Posted

技术标签:

【中文标题】BigQuery,Python 批量插入 bigquery 以进行流式传输服务(“告诉”错误)【英文标题】:BigQuery, Python insert to bigquery in batches for streaming service ('tell' error) 【发布时间】:2020-03-11 21:48:01 【问题描述】:

我正在调用一个 API,它返回大约 100,000 行 JSON 对象。我正在尝试使用 Load config 进行加载,但出现错误。我正在加载一个带有字典的列表,然后使用 json.dumps 将 Python 对象转换为 JSON 对象。我还附加到 BigQuery 中的现有表。

错误:AttributeError: 'str' object has no attribute 'tell'(发生在 job = bigquery_client.load_table_from_file)

我确实看到有加载 csv 或 json 文件的选项,但如果可能的话,我宁愿直接加载 json 对象。

def export_items_to_bigquery(self):

        job_config = bq.LoadJobConfig()
        job_config.autodetect = True
        job_config.source_format = 'NEWLINE_DELIMITED_JSON'
        json_delimit = '\n'.join([json.dumps(item) for item in JSONDict])
        table_id = '..'.format(self.project_id, self.dataset_id, self.table)
        table = bigquery_client.get_table(table_id)
        job = bigquery_client.load_table_from_file(
        json_delimit, table, job_config=job_config)

        job.result()
        print("Loaded  rows into :.".format(job.output_rows, self.dataset_id, self.table))

使用上面的 json_delimit 变量的 print 语句测试,下面是 JSON 换行结构。看起来不错,所以不知道为什么会出现错误。

"icao_address": "A69D03", "timestamp": "2020-03-11T17:27:36Z", "latitude": 32.878402, "longitude": -95.075067, "altitude_baro": 26000, "heading": 230.0, "ground_speed": 350.0, "vertical_rate": 0, "on_ground": false, "callsign": "JTL525", "tail_number": "N525RL", "collection_type": "terrestrial", "origin_airport_iata": "TXK", "destination_airport_iata": "ACT"
"icao_address": "A56643", "timestamp": "2020-03-11T17:27:36Z", "latitude": 33.391205, "longitude": -86.070355, "altitude_baro": 26700, "heading": 250.0, "ground_speed": 380.0, "vertical_rate": 890, "squawk_code": "5750", "on_ground": false, "callsign": "SKW3917", "tail_number": "N447SW", "collection_type": "terrestrial", "flight_number": "OO3917", "origin_airport_iata": "ATL", "destination_airport_iata": "MLU"

【问题讨论】:

【参考方案1】:

粘贴代码中的相关行:

        json_delimit = '\n'.join([json.dumps(item) for item in JSONDict])


        job = bigquery_client.load_table_from_file(
           json_delimit, table, job_config=job_config)

请注意,方法 load_table_from_file 需要 file,而不是 string。请提供文件而不是字符串。

【讨论】:

以上是关于BigQuery,Python 批量插入 bigquery 以进行流式传输服务(“告诉”错误)的主要内容,如果未能解决你的问题,请参考以下文章

在bigquery中以编程方式更新/插入数据

我应该为每一行插入调用 BigQuery 还是应该插入一个批量?

如何在 Python 中的 Json 中删除 Null:[]

使用 POST 请求和 Java 客户端库加载到 BigQuery 的任何示例?

使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS

如何在 BigQuery 中获取文件加载插入失败的插入记录