流式缓冲区 - Google BigQuery
Posted
技术标签:
【中文标题】流式缓冲区 - Google BigQuery【英文标题】:Streaming buffer - Google BigQuery 【发布时间】:2018-11-05 15:34:24 【问题描述】:我正在开发一个 Python 程序,以便像 Google Dataflow 模板一样使用。
我正在做的是从 PubSub 在 BigQuery 中写入数据:
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
(p
# This is the source of the pipeline.
| 'Read from PubSub' >> beam.io.ReadFromPubSub('projects/.../topics/...')
#<Transformation code if needed>
# Destination
| 'String To BigQuery Row' >> beam.Map(lambda s: dict(Trama=s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='Trama:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
))
)
p.run().wait_until_finish()
代码在本地运行,尚未在 Google Dataflow 中运行
这“有效”但不是我想要的方式,因为目前数据存储在 BigQuery 缓冲区流中,我看不到它(即使等待了一段时间)。
什么时候可以在 BigQuery 中使用? 为什么存储在缓冲流而不是“普通”表中?
【问题讨论】:
数据在那里,但您需要运行查询来检索它,而不是查看表预览,这不会显示流缓冲区中的数据。 谢谢。在发布 Q 之前,我尝试查看运行查询的数据:SELECT * FROMTable
LIMIT 1000 即使使用 C# 代码而不是使用 Google云控制台,但仍然丢失。我想这可能是因为我在本地运行这个管道,就像一个流管道它没有完成,所以要完成它我必须按下停止按钮。 Here 是我可以看到停止和排水之间的不同之处,但不确定这是否是问题
【参考方案1】:
在您的示例中,您创建了一个将数据流式传输到 BigQuery 的数据流。流式传输意味着 - 正如您所写的那样 - 数据不会立即到达其永久位置,而是在一段时间后(最多 2 小时),该状态实际上是流式传输缓冲区。在这种情况下,跑步者之间没有区别 - 您在本地 (DirectRunner) 或在云中 (DataflowRunner) 运行它 - 因为两种解决方案都使用云资源(直接写入云 BigQuery)。如果您使用模拟器进行本地开发,那是另一种情况(但据我所知,BQ 还没有)。
您可以在此处找到一篇很好的文章,了解此架构的外观以及流式传输到 BigQuery 的深入工作原理:https://cloud.google.com/blog/products/gcp/life-of-a-bigquery-streaming-insert。
您无法立即看到数据的原因是预览按钮可能适用于 BQ 的列式永久存储。
如果您想查看缓冲区中的数据,请使用如下查询:
SELECT * FROM `project_id.dataset_id.table_id` WHERE _PARTITIONTIME IS NULL
顺便说一下,查询缓冲区是免费的。
我希望它有助于解决问题。
【讨论】:
【参考方案2】:这就是问题所在:
beam.io.Write(beam.io.BigQuerySink
应该是:
beam.io.WriteToBigQuery
第一个在我从文件中读取时运行良好,第二个在我从 pub/sub 读取时运行良好
【讨论】:
0 我处于同样的情况,用户 beam.io.WriteToBigQuery 但数据仍停留在 BQ 流缓冲区中。使用 SELECT * FROM Table 但仍无法检索数据。必须等待数小时才能取回。使用 beam.io.WriteToBigQuery 是否解决了您的问题?以上是关于流式缓冲区 - Google BigQuery的主要内容,如果未能解决你的问题,请参考以下文章
Google bigquery中的提取工作者是否将数据保存到Columnar Storage遵循FIFO模式?