BigQuery BQ.insert_rows_json 和 BQ.load_from_json 的区别?

Posted

技术标签:

【中文标题】BigQuery BQ.insert_rows_json 和 BQ.load_from_json 的区别?【英文标题】:Differences between BigQuery BQ.insert_rows_json and BQ.load_from_json? 【发布时间】:2021-03-17 08:41:33 【问题描述】:

我想将数据流式传输到 BigQuery 并且我正在考虑使用 PubSub + Cloud Functions,因为不需要转换(至少目前是这样),并且使用 Cloud Data Flow 感觉就像插入行有点过头了到一张桌子。我说的对吗?

数据使用 Python 脚本从 GCP 虚拟机流式传输到 PubSub,格式如下:

'SEGMENT':'datetime':'2020-12-05 11:25:05.64684','values':(2568.025,2567.03)

BigQuery 架构是 datetime:timestamp, value_A: float, value_B: float

我的问题是:

a) 我是否需要将其作为 json/dictionary 并将所有值作为字符串推送到 BigQuery 中,还是必须使用表的数据类型?

b) 使用BQ.insert_rows_jsonBQ.load_table_from_json 有什么区别,我应该使用哪一个来完成这项任务?


编辑:

我想要获取的实际上是一些资产的市场数据。说出大约 28 种乐器并捕捉它们的所有滴答声。平均每天,每个工具有约 60.k 次滴答,因此我们谈论的是每月约 3360 万次调用。 (目前)需要将它们插入表中以进行进一步分析。我目前不确定是否应该执行真正的流式传输或每批加载。由于项目还在做分析,我觉得不需要数据流,但应该使用 PubSub,因为它允许在时机成熟时更容易地扩展到数据流。这是我第一次实现流式传输管道,我正在使用我通过课程和阅读学到的所有知识。如果我有错误的方法,请纠正我:)。

我绝对想做的是,例如,当一个刻度和第 n 个刻度之间的价格差为 10 时,执行另一个插入到另一个表中。为此,我应该使用数据流还是云函数方法仍然有效吗?因为这就像一个触发条件。基本上,触发器是这样的:

if price difference >= 10:
     process all these ticks
     insert the results in this table

但我不确定如何实现此触发器。

【问题讨论】:

【参考方案1】:

除了 Marton (Pentium10) 的出色回答

a) 您可以在 BigQuery 中流式传输 JSON,即 VALID json。你的例子不是。关于类型,有一个根据您的模式的自动强制/转换。你可以看到这个here

b) 加载作业加载 GCS 中的文件或您放入请求中的内容。批处理是异步的,可能需要几秒钟或几分钟。此外,您仅限于1500 load per days and per table -> 每分钟 1 个作品(每天 1440 分钟)。加载作业有几个有趣的方面。

    首先,它是免费的! 您的数据会立即加载到正确的分区中,并且可以立即在分区中请求 如果加载失败,则不插入数据。因此,最简单的方法是重放文件而不用加倍值。

相反,流式作业将数据实时插入 BigQuery。当您有实时限制时(尤其是对于可视化、异常检测......),这很有趣。但也有不好的一面

    您被限制为 500k rows per seconds (in EU and US), 100k rows in other regions,并且每秒最大 1Gb 数据不是立即在分区中,而是在buffer name UNPARTITIONED for a while or up to have this buffer full. 中。因此,您在构建和测试实时应用程序时必须考虑到这种特殊性。 是not free。最便宜的区域是每 Gb 0.05 美元。

既然您已经意识到了这一点,请问问自己关于您的用例。

如果您需要实时(延迟少于 2 分钟),毫无疑问,流式传输非常适合您。 如果您每月只有几 Gb,流式传输也是最简单的解决方案,只需几美元 如果您有大量数据(每秒超过 1Gb),BigQuery 不是很好的服务,请考虑 BigTable(you can request with BigQuery as a federated table) 如果您的数据量很大(每分钟 1 或 2Gb),并且您的用例需要每分钟保持数据新鲜度+,您可以考虑特殊设计
    创建 PubSub 拉取订阅 创建一个 HTTP 触发的 Cloud Function(或 Cloud Run 服务),它会拉取订阅 1 分钟,然后将拉取的内容作为加载作业提交给 BigQuery(不需要文件,您可以将内存中的内容直接发布到 BigQuery) .然后优雅地存在 创建一个每分钟触发一次服务的 Cloud Scheduler。

编辑 1:

成本不应驱动您的用例。

如果目前仅用于分析,您只需想象每天触发一次您的工作以获取完整订阅。使用您的指标:60k 指标 * 28 个仪器 * 100 字节(24 + 内存丢失),您只有 168Mb。您可以将其存储在 Cloud Functions 或 Cloud Run 内存中并执行加载作业。

流媒体对于实时来说真的很重要!

流模式下的数据流将花费您每月至少 20 美元(1 个 n1-standard1 类型的小型工作人员。使用 Cloud Functions 在 BigQuery 中插入超过 1.5Gb 的流式插入。

最终,关于流式或批量插入的智能触发器,这实际上是不可能的,如果您更改逻辑,则必须重新设计数据摄取。但首先,只有当你的用例需要这个时!!

【讨论】:

感谢您非常完整的回答,@guillaume blaquiere。我添加了一个Edit 部分,我在其中更详细地解释了我在做什么。顺便说一句,每个tick 大约是 24 字节,流式传输费用至少为 1 kb,无论数据大小是否小于 1 kb 好吧,只加载当天的批次以执行分析是很有意义的。然后,在进入生产或测试阶段时,我应该将其更改为实际流式传输。是的,Data Flow 比使用云功能进行流式插入更昂贵。使用价格计算器,我得到流插入的成本为 1.64 美元,PubSub 约为 9.53 美元,云功能约为 18.45 美元,而仅数据流为 39.78 美元。您建议使用数据流还是使用云函数执行批量加载? Dataflow 更具可扩展性,Cloud Function 更便宜。现在,这是你的选择! 谢谢。由于是现阶段唯一的分析,云函数+调度器+pubsub就很有意义了。顺便说一句,我是reading,发布请求的最大数量是 1.000。这是否意味着在任何时候我都不能在任何主题中“存储”超过 1.000 条消息,或者只是我不能同时发布超过 1.000 条消息? PubSub 允许您公开每条消息和batching messages 的消息。在批量的情况下,您不能同时发送超过 1000 条消息【参考方案2】:

回答您的问题:

a) 您需要使用库的接受格式(通常是集合或格式化为表定义的 JSON 文档)推送到 BigQuery。

b) 要将数据添加到 BigQuery,您可以流式传输数据或加载文件。

对于您的示例,您需要流式传输数据,因此请使用“流式传输 api”方法 insert_rows* family。

【讨论】:

嗨@Pentium10,谢谢你的回答。所以,如果我理解得很好,我应该使用 BQ 表架构(timestamp, float, float) 的格式将数据推送到 BigQuery 表中,对吧?我有点困惑,因为如果我从 csv 文件加载数据,它会加载一堆由, 分隔的字符串,所以我认为我会很相似。 关于答案b,你告诉我使用insert_rows 方法,这是有道理的,但我仍然看不出这两种方法之间的区别。我问这个是因为我有一个云功能,第二种方法是从 GCS 存储桶加载数据,两者看起来都和我很相似。 每张桌子每天从文件加载 1500 个的限制。当心你是否在一天内过于频繁地运行它。对于流式插入,限制为 100 000 行/秒。从文件加载是免费的,插入流媒体需要一些费用。

以上是关于BigQuery BQ.insert_rows_json 和 BQ.load_from_json 的区别?的主要内容,如果未能解决你的问题,请参考以下文章

数据处理 - BigQuery 与 Data Proc+BigQuery

BigQuery:写入查询结果时使用 bigquery 作业的意外行为

Google BigQuery - 将数据流式传输到 BigQuery

Python BigQuery 脚本 bigquery.jobs.create 错误

阻止用户删除 BigQuery 表

Bigquery:在 Bigquery 中计算余额或重写 SQL 脚本