如何在 GCP 上执行数据非规范化?

Posted

技术标签:

【中文标题】如何在 GCP 上执行数据非规范化?【英文标题】:How can I perform data denormalization on GCP? 【发布时间】:2019-06-24 13:40:07 【问题描述】:

我正在尝试对 Bigquery 中名为 order、order_item 和 user 的三个表进行非规范化和连接。我已经有一个管道,用于使用 pubsub 和数据流更新这些表。现在我想创建一个管道,只要在提到的三个表中的任何一个发生更新或插入时,它就会更新一个新的非规范化表。 order 和 order_item 的这些更新必须同步,并且只有在 order 发生插入时才应加入用户(以便在创建新订单时显示用户状态)。

到目前为止,我想出了两个解决方案。

一种方法是在通过数据流读取来自 pubsub 的消息时捕获每个表上的更改,然后使用非规范化表中的最新相应记录进行丰富。最后,旧记录被新记录替换。 另一个是查询 order 和 order_item 表以获取更新或新插入的行,然后使用 bigquery sql 将它们连接起来,从而将结果附加到非规范化表中。此作业在 Airflow 的帮助下定期运行。

订购

id(主键) 上次更新时间 created_at user_id(外键)

======================

order_item

id(主键) 上次更新时间 created_at order_id(外键)

======================

用户

id(主键) 上次更新时间 created_at

我对数据流不是很熟悉,也找不到任何说明如何使用它完成工作的教程或示例(尽管有示例代码显示如何完成 ETL 操作,但没有解决方案同步问题)。有没有我可以研究的例子,哪种方法看起来更有效?

【问题讨论】:

对您的问题有一些疑问: 1. 您是否有三个 pubsub 主题(用户、订单、订单项)? 2. 你想让 order 和 order_item 同步更新。这是什么意思? 3.order和order_item表有新更新后,你想order_item加入order_id,然后加入user表,对吧? 1.对于所有三个表,我只有一个 pubsub 主题。 2.我的意思是例如在决赛桌中,不应该有过期的订单记录与其他表格连接导致表格错误。新添加的行必须始终包含除 user 之外的每个表的最新值。 3. 是的。 【参考方案1】:

我想以此作为开头,BigQuery 不是一个事务性数据库,因此在事后尝试保持一致性将非常困难。在这种情况下,我建议使用 Cloud Spanner 或 Cloud SQL(请参阅Quora post 了解两者之间的区别)。例如,这在 Cloud Spanner 中非常简单。有transactions 的概念,您可以在其中保持非规范化表在任何给定时间与其他表完美同步。

从好的方面来说,如果您对非规范化表可能与其他表不同步没有问题,那么有更简单的解决方案。

在这种情况下,我假设切换到另一种存储产品的成本过高,并且表可能不同步是可以的。如果您需要进行批处理或流式数据分析,Cloud Dataflow 是一款很棒的产品,但是在像您这样的用例(基于事件的处理)中使用 API 会变得很尴尬。如果您想继续使用 Dataflow,您的第一个解决方案似乎是最好的,但我实际上建议使用类似 Cloud Functions 的东西。

设置如下所示:

    Pub/Sub 写入数据流 Dataflow 将更新的行写入 BigQuery Dataflow 写入包含增量的 Pub/Sub 消息(例如,将行 X 插入 order,更新 order_item 中的 Y 行)。 拥有一个在 Pub/Sub 订阅上触发的云函数,该订阅具有您指定的逻辑以从规范化表中读取正确的行,然后写入非规范化表。

您的云函数可能类似于以下内容(用 javascript 编写),灵感来自 here 和 here。 :

// Import the Google Cloud client library
const BigQuery = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
const dataset = bigquery.dataset('my-dataset');
const orders = dataset.table('orders');
const order_items = dataset.table('order_items');
const users = dataset.table('users');
const denorm = dataset.table('denormalized');

function GetOrder(order_id) 
  let [order] = await orders.row(order_id);
  return order;


function GetOrderItem(...)  ... 
function GetUser(...)  ... 

/**
 * HTTP Cloud Function.
 *
 * @param Object req Cloud Function request context.
 * @param Object res Cloud Function response context.
 */
exports.get = (req, res) => 
  const method = req.params.method;
  const table = req.params.table;

  let query = '';
  if (method === 'insert' && table === 'order') 
    let order = GetOrder(req.params.order_id);
    let order_item = GetOrderItem(order.id);
    let user = GetUser(order.user_id);
    denorm.insert(
      ORDER: order.my_data,
      ORDER_ITEM: order_item.my_data,
      USER: user.my_data
    );
   else if ( ... )  ... 

How to insert How to query

【讨论】:

以上是关于如何在 GCP 上执行数据非规范化?的主要内容,如果未能解决你的问题,请参考以下文章

如何在Firebase中编写非规范化数据

如何在 Firebase 中写入非规范化数据

如何在 javascript 中最有效地对规范化数据进行非规范化

Cassandra 非规范化数据模型

AngularFire - 如何查询非规范化数据?

尝试了解在我的场景中使用 firebase 数据库进行非规范化工作的效果如何