BigQuery table truncation before streaming not working

Posted: 2016-04-25 16:53:37

[Question]
We are using the BigQuery Python API to run some analysis. For that, we created the following adapter:
    def stream_data(self, table, data, schema, how=None):
        # List the dataset's tables to check whether `table` already exists.
        r = self.connector.tables().list(projectId=self._project_id,
                                         datasetId='lbanor').execute()
        table_exists = [row['tableReference']['tableId'] for row in
                        r['tables'] if
                        row['tableReference']['tableId'] == table]
        if table_exists:
            if how == 'WRITE_TRUNCATE':
                # Simulate truncation: delete the table and re-create it
                # with the same schema.
                self.connector.tables().delete(projectId=self._project_id,
                                               datasetId='lbanor',
                                               tableId=table).execute()
                body = {
                    'tableReference': {
                        'tableId': table,
                        'projectId': self._project_id,
                        'datasetId': 'lbanor'
                    },
                    'schema': schema
                }
                self.connector.tables().insert(projectId=self._project_id,
                                               datasetId='lbanor',
                                               body=body).execute()
        else:
            # The table does not exist yet: create it.
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': self._project_id,
                    'datasetId': 'lbanor'
                },
                'schema': schema
            }
            self.connector.tables().insert(projectId=self._project_id,
                                           datasetId='lbanor',
                                           body=body).execute()

        # Stream the row into the table (uuid must be imported at module level).
        body = {
            'rows': [
                {
                    'json': data,
                    'insertId': str(uuid.uuid4())
                }
            ]
        }
        self.connector.tabledata().insertAll(projectId=self._project_id,
                                             datasetId='lbanor',
                                             tableId=table,
                                             body=body).execute(num_retries=5)
where connector is just the build() object.
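For context, a minimal sketch of how that connector might be built with the Google API discovery client (the key-file path and scopes below are illustrative assumptions, not our exact setup):

    import uuid  # used by stream_data above for the insertId

    from googleapiclient.discovery import build
    from oauth2client.service_account import ServiceAccountCredentials

    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        'service_account.json',  # hypothetical service-account key file
        scopes=['https://www.googleapis.com/auth/bigquery'])
    # The discovery-built BigQuery v2 service object used as self.connector.
    connector = build('bigquery', 'v2', credentials=credentials)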
Its main purpose is to stream data into a given table. If the table already exists and "WRITE_TRUNCATE" is passed as the how parameter, the table is first deleted and created again; after that, the data is streamed in.

Everything works fine as long as the table is not being deleted and re-created over and over again.

For example, this is the result when we run the script without simulating the write-truncate option (a for loop keeps calling stream_data with how=None):
[
    {
        "date": "2016-04-25",
        "unix_date": "1461606664981207",
        "init_cv_date": "2016-03-12",
        "end_cv_date": "2016-03-25",
        "days_trained": "56",
        "days_validated": "14",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.31729249914663893"
    },
    {
        "date": "2016-04-25",
        "unix_date": "1461606599745107",
        "init_cv_date": "2016-03-06",
        "end_cv_date": "2016-03-25",
        "days_trained": "80",
        "days_validated": "20",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.32677143128667446"
    },
    {
        "date": "2016-04-25",
        "unix_date": "1461606688950415",
        "init_cv_date": "2016-03-14",
        "end_cv_date": "2016-03-25",
        "days_trained": "48",
        "days_validated": "12",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.3129267723358932"
    },
    {
        "date": "2016-04-25",
        "unix_date": "1461606707195122",
        "init_cv_date": "2016-03-16",
        "end_cv_date": "2016-03-25",
        "days_trained": "40",
        "days_validated": "10",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.310620987663015"
    },
    {
        "date": "2016-04-25",
        "unix_date": "1461606622432947",
        "init_cv_date": "2016-03-08",
        "end_cv_date": "2016-03-25",
        "days_trained": "72",
        "days_validated": "18",
        "navigated_score": "1",
        "carted_score": "3",
        "purchased_score": "10",
        "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
        "metric": "rank",
        "result": "0.32395802949369296"
    }
]
But when we use the same adapter with the input how="WRITE_TRUNCATE", its behavior changes and becomes unpredictable.

Sometimes it works and the data is saved to the table. But sometimes, even though no error is raised, no data gets saved to the table.

When we try to query the table, no data is returned; it just says "Query returned zero results".

Something goes wrong when deleting the table, re-creating it, and streaming the data. Are we making some mistake?

Please let me know if you need more information. Thanks in advance!
[Answer 1]
See Jordan Tigani's answer and Sean Chen's comments on https://***.com/a/36417177/132438 (both are BigQuery engineers).

The summary is: when you re-create or truncate a table, "you need to wait >2 minutes before streaming into it in order to avoid data being dropped". That would explain why you are getting this non-deterministic behavior.
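A minimal sketch of one way you could apply that advice in your adapter: pause after the delete/re-create before the first insertAll call. The helper name and the exact delay value below are illustrative assumptions, not part of your original code or the linked answer:

    import time

    # ">2 minutes" per the linked answer; the exact value is an assumption.
    STREAM_DELAY_SECONDS = 150

    def recreate_table_and_wait(connector, project_id, dataset_id, table, schema):
        """Delete and re-create a table, then wait before streaming into it."""
        connector.tables().delete(projectId=project_id,
                                  datasetId=dataset_id,
                                  tableId=table).execute()
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': project_id,
                'datasetId': dataset_id
            },
            'schema': schema
        }
        connector.tables().insert(projectId=project_id,
                                  datasetId=dataset_id,
                                  body=body).execute()
        # Streaming inserts issued right after a delete/re-create can be
        # silently dropped, so give the backend time before calling insertAll.
        time.sleep(STREAM_DELAY_SECONDS)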
[Discussion]
Thank you so much Hoffa! I will adjust our scripts accordingly.