Google-cloud-dataflow:无法通过带有“BigQueryDisposition.WRITE_TRUNCATE”的“WriteToBigQuery/BigQuerySink”将 jso
Posted
技术标签:
【中文标题】Google-cloud-dataflow:无法通过带有“BigQueryDisposition.WRITE_TRUNCATE”的“WriteToBigQuery/BigQuerySink”将 json 数据插入到 bigquery【英文标题】:Google-cloud-dataflow: Failed to insert json data to bigquery through `WriteToBigQuery/BigQuerySink` with `BigQueryDisposition.WRITE_TRUNCATE` 【发布时间】:2018-09-10 09:21:17 【问题描述】:给定数据集如下
"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544
"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545
...more type json data here
我尝试过滤这些 json 数据并使用 python sdk 将它们插入到 bigquery 中,如下所示
ba_schema = 'slot:STRING,result:INTEGER,play_type:STRING,level:INTEGER'
class ParseJsonDoFn(beam.DoFn):
B_TYPE = 'tag_B'
def process(self, element):
text_line = element.trip()
data = json.loads(text_line)
if data['type'] == 'ba':
ba = 'slot': data['slot'], 'result': data['result'], 'p_type': data['p_type'], 'level': data['level']
yield pvalue.TaggedOutput(self.B_TYPE, ba)
def run():
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='data/path/data',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DirectRunner',
'--project=project-id',
'--job_name=data-job',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input)
multiple_lines = (
lines
| 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
ParseJsonDoFn.B_TYPE)))
b_line = multiple_lines.tag_B
(b_line
| "output_b" >> beam.io.WriteToBigQuery(
'temp.ba',
schema = B_schema,
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
并且调试日志显示
INFO:root:finish <DoOperation output_b/WriteToBigQuery output_tags=['out'], receivers=[ConsumerSet[output_b/WriteToBigQuery.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
DEBUG:root:Successfully wrote 2 rows.
这两个带有type:ba
的数据似乎被插入到bigquery 表temp.ba
中。但是,我运行
select * from `temp.ba` limit 100;
此表中没有数据temp.ba
。
我的代码有什么问题或者我遗漏了什么吗?
更新:
感谢@Eric Schmidt 的回答,我知道初始数据可能会有一些滞后。但是,运行上述脚本 5 分钟后,表格中还没有没有数据。
当我尝试删除 BigQuerySink
中的 write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE
时
| "output_b" >> beam.io.Write(
beam.io.BigQuerySink(
table = 'ba',
dataset = 'temp',
project = 'project-id',
schema = ba_schema,
#write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
))
可以立即找到这两条记录。
而表信息是
也许我还没有理解初始数据可用性滞后的含义。谁能给我更多的信息?
【问题讨论】:
【参考方案1】:需要考虑的两件事:
1) Direct(本地)运行器使用流式插入。 see this post 存在初始数据可用性延迟。
2) 确保您完全符合您正在流入的项目。使用 BigQuerySink() project="foo",dataset="bar",table="biz"。
我怀疑你的问题是 #1。
【讨论】:
当我用BigQuerySink
注释这行`#write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,`时,数据可以成功插入到bigquery中。
我还遇到了另一个管道问题,管道在我的代码中运行了两次?对我的问题很清楚,另一个问题***.com/questions/52270674/… 是帖子
当您删除截断选项时,该表不会被删除,因此数据会立即显示。写入已删除的表名有 150 秒的延迟。我现在看看你的另一个帖子。
感谢您的耐心,我再次更新了我的问题。以上是关于Google-cloud-dataflow:无法通过带有“BigQueryDisposition.WRITE_TRUNCATE”的“WriteToBigQuery/BigQuerySink”将 jso的主要内容,如果未能解决你的问题,请参考以下文章
Google-cloud-dataflow:无法通过带有“BigQueryDisposition.WRITE_TRUNCATE”的“WriteToBigQuery/BigQuerySink”将 jso