如何同步调用 google-bigquery 删除和插入 API?

Posted

技术标签:

【中文标题】如何同步调用 google-bigquery 删除和插入 API?【英文标题】:How can i call google-bigquery delete and insert API's synchronously? 【发布时间】:2019-01-11 12:52:12 【问题描述】:

我正在维护一个包含交易记录的数据库,这些记录会定期更改数据。

我有一个每半小时运行一次的 cron 从主数据库中提取最新事务并提供给我的 express 节点应用程序(我对节点很陌生),我正在删除与传入匹配的旧事务事务的订单号先插入到大查询表中。

在运行应用程序一天后,我的数据库中出现了重复的事务。即使在检查日志后,我也没有看到删除 api 在任何地方都失败了,不知道重复项是如何以及从哪里来的。

我正在使用 @google-cloud/bigquery: ^2.0.2 ,我正在使用 query api 将数据删除并插入到 bigquery 表中。

我曾尝试使用流式插入,但它不允许我删除最近插入的行,直到 90 分钟,这在我的情况下不起作用。

我的 index.js 让 orderNumbers = '';

                    rows.map(function (value) 
                        orderNumbers += "'" + value.Order_Number+ "',";
                    );

                    orderNumbers = orderNumbers.slice(0, -1);

                    await functions.deleteAllWhere('Order_Number', orderNumbers);

                        let chunkedRowsArray = _.chunk(rows, CONSTANTS.chunkSize);


                        let arrSize = chunkedRowsArray.length;
                        for (var i = 0; i < arrSize; i++) 
                            let insertString = '';

                            chunkedRowsArray[i].forEach(element => 
                                let values = '(';
                                Object.keys(element).forEach(function (key) 
                                    if (typeof element[key] == 'string') 
                                        values += '"' + element[key] + '",';
                                     else 
                                        values += element[key] + ",";
                                    
                                );
                                values = values.slice(0, -1);
                                values += '),';
                                insertString += values;
                            );
                            insertString = insertString.slice(0, -1);

                            let rs = await functions.bulkInsert(insertString,i);
                        

删除函数调用

await functions.deleteAllWhere('Order_Number', orderNumbers);

module.exports.deleteAllWhere = async (conditionKey, params) => 

const DELETEQUERY = `
DELETE FROM
\`$URI\` 
WHERE $conditionKey
IN
($params)`;

const options = 
    query: DELETEQUERY,
    timeoutMs: 300000,
    useLegacySql: false, // Use standard SQL syntax for queries.
;

// // Runs the query
return await bigquery.query(options);
;

类似地在插入函数中使用 200 块的值构建插入查询。

我需要编写一个同步节点程序,它先删除一些行,然后在成功删除行后插入新行。

我不知道这是由代码的异步性质引起的,还是 bigquery 出了问题,或者我从中获取数据的 存储过程 有问题。

抱歉这篇长篇文章我是节点和堆栈溢出的新手。

感谢任何帮助。

【问题讨论】:

合并选项能帮到你吗,查看这个链接:***.com/questions/51171517/… 【参考方案1】:

关于 BigQuery 集成,您应该以这样的方式构建您的数据流,以让 BigQuery 表中的每个新行。然后有只返回最新行的查询,如果您有一个按最新行排序的字段,这很容易做到。

您可以安排 BigQuery 查询来维护此清理数据的具体化表。所以最终你会得到两张表,一张你流到所有行中,一张被物化为只保留最新的。

【讨论】:

以上是关于如何同步调用 google-bigquery 删除和插入 API?的主要内容,如果未能解决你的问题,请参考以下文章

如何优化 google-bigquery 以从大数据表中查找最常见的类别?

如何在没有授权令牌的情况下从 python 脚本查询 google-bigquery 中的私有表?

jaro_winkle_distance 的 google-bigquery UDF

Google-Bigquery:整合聚合

google-bigquery 在查询结果中将日期格式设置为 mm/dd/yyyy

Google-BigQuery - CSV 文件的架构解析