BigQuery,Python 批量插入 bigquery 以进行流式传输服务(“告诉”错误)
Posted
技术标签:
【中文标题】BigQuery,Python 批量插入 bigquery 以进行流式传输服务(“告诉”错误)【英文标题】:BigQuery, Python insert to bigquery in batches for streaming service ('tell' error) 【发布时间】:2020-03-11 21:48:01 【问题描述】:我正在调用一个 API,它返回大约 100,000 行 JSON 对象。我正在尝试使用 Load config 进行加载,但出现错误。我正在加载一个带有字典的列表,然后使用 json.dumps 将 Python 对象转换为 JSON 对象。我还附加到 BigQuery 中的现有表。
错误:AttributeError: 'str' object has no attribute 'tell'(发生在 job = bigquery_client.load_table_from_file)
我确实看到有加载 csv 或 json 文件的选项,但如果可能的话,我宁愿直接加载 json 对象。
def export_items_to_bigquery(self):
job_config = bq.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = 'NEWLINE_DELIMITED_JSON'
json_delimit = '\n'.join([json.dumps(item) for item in JSONDict])
table_id = '..'.format(self.project_id, self.dataset_id, self.table)
table = bigquery_client.get_table(table_id)
job = bigquery_client.load_table_from_file(
json_delimit, table, job_config=job_config)
job.result()
print("Loaded rows into :.".format(job.output_rows, self.dataset_id, self.table))
使用上面的 json_delimit 变量的 print 语句测试,下面是 JSON 换行结构。看起来不错,所以不知道为什么会出现错误。
"icao_address": "A69D03", "timestamp": "2020-03-11T17:27:36Z", "latitude": 32.878402, "longitude": -95.075067, "altitude_baro": 26000, "heading": 230.0, "ground_speed": 350.0, "vertical_rate": 0, "on_ground": false, "callsign": "JTL525", "tail_number": "N525RL", "collection_type": "terrestrial", "origin_airport_iata": "TXK", "destination_airport_iata": "ACT"
"icao_address": "A56643", "timestamp": "2020-03-11T17:27:36Z", "latitude": 33.391205, "longitude": -86.070355, "altitude_baro": 26700, "heading": 250.0, "ground_speed": 380.0, "vertical_rate": 890, "squawk_code": "5750", "on_ground": false, "callsign": "SKW3917", "tail_number": "N447SW", "collection_type": "terrestrial", "flight_number": "OO3917", "origin_airport_iata": "ATL", "destination_airport_iata": "MLU"
【问题讨论】:
【参考方案1】:粘贴代码中的相关行:
json_delimit = '\n'.join([json.dumps(item) for item in JSONDict])
job = bigquery_client.load_table_from_file(
json_delimit, table, job_config=job_config)
请注意,方法 load_table_from_file
需要 file
,而不是 string
。请提供文件而不是字符串。
【讨论】:
以上是关于BigQuery,Python 批量插入 bigquery 以进行流式传输服务(“告诉”错误)的主要内容,如果未能解决你的问题,请参考以下文章
我应该为每一行插入调用 BigQuery 还是应该插入一个批量?
如何在 Python 中的 Json 中删除 Null:[]
使用 POST 请求和 Java 客户端库加载到 BigQuery 的任何示例?