构建AI前的数据准备,SQL要比Python强

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了构建AI前的数据准备,SQL要比Python强相关的知识,希望对你有一定的参考价值。

参考技术A

作为一名 Web 开发人员,我第一次与数据库和 SQL 产生交集是使用对象关系映射(ORM)。我使用的是 Django 查询集 API,这个界面用户体验很好。之后,我转向数据工程方向,更多地利用数据集来构建 AI。我的职责是从用户应用程序中获取数据,并将其转换为数据科学家可利用的内容,这一过程通常称为 ETL(extract, transform and load)。

随着产业发展,生产系统中的数据非常混乱,需要进行大量转换才能用于构建 AI。有些 JSON 列每行模式都不相同,有些列包含混合数据类型,有些行有错误值。此外,还需要计算「用户成为访问者的时间」以及「他们在两次访问间的等待时间」等特征。当我着手清理、聚合和管理数据特征时,我想确定哪种语言最适合该任务。在之前的工作中我每天都使用 Python,我知道它可以完成工作。但是,这次经历使我了解到,Python 可以完成一项任务并不意味着这个任务就应该使用 Python 来做。

我对 SQL 的第一个误解是:SQL 无法进行复杂的转换

我们正在处理一个时间序列数据集,我们希望能够跟踪特定用户。隐私法规不允许获取用户访问的具体日期,因此我们决定将记录日期归一化为用户首次访问的日期(如首次访问后 5 天等)。对于我们的分析,重要的是要知道离上次访问过去了多久以及离首次访问过去了多久。A 有两个样本数据集,一个有大约 750 万行,大小为 6.5 GB,另一个有 55 万行,大小为 900MB。

我使用下面的 Python 和 SQL 代码先在较小的数据集上测试转换。Python 和 SQL 分别花费 591 秒和 40.9 秒完成了任务。这意味着 SQL 的速度是 Python 的大约 14.5 倍!

SQL 转换不仅速度更快,而且代码也更易读,更易于维护。在这里,我使用 lag 和 first_value 函数来查找用户 历史 记录中的特定记录(即分区)。然后使用 age 函数来确定两次访问间的时间差。

更有趣的是,当这些转换脚本应用于 6.5 GB 的数据集时,Python 完全失败。在 3 次尝试中,Python 崩溃了 2 次,第三次我的计算机完全崩溃...... 而 SQL 只耗时 226 秒。

更多信息参见:

我对 SQL 的第一个误解是:SQL 无法扁平化不规则的 json

对我来说,另一个改变是我意识到 Postgres 可以很好地处理 json。我最初认为用 Postgres 扁平化或解析 json 是不可能的...... 我不敢相信自己竟然如此愚蠢。如果你想关联 json 并且它的模式在行间是一致的,那么最好的选择可能就是使用 Postgres 内置功能来解析 json。

另一方面,我的样本数据集中一半 json 不是有效的,因此存储为文本。在这种情况下,我要么重新编码数据使其有效,或者删除无效的行。为此,我创建了一个名为 is_json 的新 SQL 函数,然后使用该函数来验证 WHERE 子句中的 json 是否有效。

不幸的是,我发现 user_json 具有不同的模式,具体取决于用户所使用的 app 版本。虽然从应用程序开发的角度来看这是有道理的,但是有条件地解析每行的每种可能性代价是很高昂的。难道我的最终归宿还是 Python?不不不!我在 Stack Overflow 上找到了一个由 Postgres 大神编写的 klin 函数(https://stackoverflow.com/users/1995738/klin)。

这个函数能够成功地扁平化 json,轻松解决我的噩梦。

结语

有一种说法叫「Python 是做任何事情的第二好语言」。我相信这是真的,并且在某些情况下 Python 和「最好」语言之间的性能差异可以忽略不计。但是在本文介绍的情况下,Python 无法与 SQL 比肩。这些发现完全改变了我做 ETL 的方法。我现在的工作模式是「不要将数据移动到代码中,而是将代码移动到数据中」。Python 将数据移动到代码中,而 SQL 执行后者。更重要的是,我知道我只是触及了 SQL 和 postgres 的皮毛。我期待能发掘出更多出色的功能,使用分析库实现加速。

原文链接:https://towardsdatascience.com/python-vs-sql-comparison-for-data-pipelines-8ca727b34032

⭐openGauss数据库源码解析系列文章—— AI查询时间预测⭐

上一篇介绍了“8.5 指标采集、预测与异常检测”的相关内容,本篇我们介绍“8.6 AI查询时间预测”的相关精彩内容介绍。

8.6 AI查询时间预测

在前面介绍过“慢SQL发现”特性,该特性的典型场景是新业务上线前的检查,输入源是提前采集到的SQL流水数据。慢SQL发现功能主要主要应用在多条SQL语句的批量检查上,要求之前执行过SQL语句,因此给出的结果主要是定性的,在某些场景下可能难以满足用户对于评估精度的要求。
因此,为了弥补上述场景的不足,满足用户更精确的SQL时间预测需求,同时为AI优化器做铺垫,实现了本章所述的功能。
由于实际业务场景具有复杂的特质,现有的数据库静态代价估计模型往往统计结果失准,从而选择了一些执行计划较差的路径。因此,针对上述复杂场景,需要数据库的代价估计模型具备自我更新的能力。本特性主要功能为基于查询语句的历史数据,对当前执行的SQL语句进行查询耗时和基数的估算。

8.6.1 使用场景

AI查询分析的前提是需要获取执行计划。首先需要根据用户需求在查询执行时收集复杂查询实际查询计划(包括计划结构、算子类型、相关数据源、过滤条件等)、各算子节点实际执行时间、优化器估算代价、实际返回行数、优化器估算行数、SMP并发线程数、等信息。将其记录在数据表中,并进行持久化管理包括定期进行数据失效清理。
本功能主要分为两个方面,一个是行数估算,一个是查询预测,前者是后者预测好坏的前提。目前openGauss基于在线学习对执行计划各层的结果集大小进行估算,仅起到展示作用,并未影响到执行计划的生成。后续可帮助优化器更准确地进行结果集估算,从而获取更优的执行计划。
当前阶段本需求会提供系统函数来进行预测,并加入到explain中进行实际比较验证。

8.6.2 现有技术

当前学术界在AI4DB领域,对基于机器学习的行数估算和查询时延预测有许多尝试。

1. 传统方法

正如数据库优化器专家Guy Lohman在博客Is query optimization a “solved” problem中所说,传统数据库查询性能预测的“阿喀琉斯之踵”便是中间结果集大小的估算。对于行数估算传统基于统计信息行数估算方法主要基于三类假设。
(1) 数据独立分布假设。
(2) 均匀分布假设。
(3) 主外键假设。
而实际场景中数据往往存在一定的相关性和倾斜性,此时上述假设可能会被打破,导致传统数据库优化器在多表连接中间结果集大小估算中可能会存在数个数量级的误差。
2000年以来,以基于采样的估算、基于采样的核密度函数估算、基于多列直方图为代表的统计学方法被提出,用于解决数据相关性带来的估算问题。然而这些方法都存在一个共性问题,就是模型无法进行增量维护,而收集这些额外的统计信息会增加巨大的数据库维护开销,虽然在一些特定的问题场景(如多列Range条件选择率)取得了很大的准确率提升,但并没有被各大数据库厂商广泛采用。
传统性能预测方法主要依赖代价模型,在以下几个方面存在明显劣势。
(1) 准确性:随着底层硬件架构和优化技术不断演进,实际性能预测模型的复杂度远不可以用线性模型来建模。
(2) 可扩展性:代价模型的开发成本较高,不能面面俱到地对用户具体场景进行优化。
(3) 可校准性:代价模型灵活性仅局限于各资源维度线性相加时使用的系数,以及部分惩罚代价,灵活性较差,用户实际使用时难以校准。
(4) 时效性:代价模型依赖统计信息的收集和使用,目前缺乏增量维护方法,导致数据流动性较大的场景下统计信息长期处于失效状态。

2. 机器学习方法

机器学习模型在模型复杂度、可校准性、可增量维护性几个维度的优势能够弥补传统优化器代价模型的不足,基于机器学习的查询性能预测逐渐成为数据库学术界和产业界的主流研究方向之一。
除前文8.3节慢SQL发现部分介绍过相关方法外,清华大学的Learned Cost Estimator模型基于Multi-task Learning和字符条件的Word-Embedding方法进一步提升了预测准确率。
至此,机器学习方法虽然从实验效果上看达到了较高的准确率,但现实业务场景持续性的数据分布变化对模型的在线学习能力提出了要求。openGauss采用了数据驱动的在线学习模式,通过内核不断收集历史作业性能信息,并在AI Engine侧使用了R-LSTM(recursive long short term memory,递归长短期记忆网络)模型对算子级查询时延和中间结果集大小进行预测。

8.6.3 实现原理

图8-15 AI查询性能预测架构示意图

总体而言,查询性能预测由数据库内核侧和AI Engine侧两个部分组成,如图8-15所示。
(1) 数据库内核侧除提供数据库基本功能外还需要对历史数据进行收集和持久化管理,并通过curl向AI Engine侧发送HTTPS请求。
(2) AI Engine提供模型训练、执行预测、模型管理等接口,基于Flask框架的服务端接受HTTPS请求,该流程如图8-16所示。

图 8-16 数据库内核和AI Engine进程关系示意图

开启数据收集相关参数后(其对性能可能有5%左右的影响,取决于实际业务负载情况),历史性能数据被持久化收集在数据库的系统表中,用于模型的训练。
模型训练之前,用户需要对模型参数进行配置(详见8.6.5使用示例)。用户训练指令下发之后,内核进程会向AI Engine侧发送configure请求,用于初始化机器学习模型。configure流程时序如图8-17所示。

图8-17 configure流程时序图

模型配置成功后,内核进程向AI Engine侧发送train请求,触发训练,该流程如图8-18所示。

图8-18 train流程时序图 模型训练之后,用户下发预测指令,数据库会先向AI Engine侧发送setup请求,用于模型加载,加载成功后发送predict请求得到预测结果,如图8-19所示。 图8-19 模型预测完整流程时序图,分为setup和predict两个阶段 本特性架构上支持多模型,目前已实现R- LSTM模型,该模型架构如图8-20所示。 计划中,算子间的执行顺序也会影响算子的性能。基于这种特性,我们使用了LSTM神经网络模型来学习计划中算子间这种有意义的依赖关系,并根据行数/时间预测的场景对模型的结构、损失函数、优化算法等方面进行针对性的优化,提高此场景下学习和预测的准确率。 输入:查询计划树,各节点上的算子类型,对应表名列名以及过滤条件。 输出:行数、startup time、total time、Peak Memory。 在编码(encoding)阶段,每个计划节点(plan node)被编码成固定长度,连接成序列作为输入LSTM神经网络的特征值。 LSTM具有多个重复神经网络模块组成的链式网络,在每个模块中都有三个函数来决定历史时序中的哪些信息将被传递到下一个时序的网络模块中。最后一个模块的输出值h_t即为模型返回的预测结果。



其中,Хt是当前时序模块的输入,ht﹣₁是当前时序模块的输入,是前一个时序的输出信息,使用sigmoid(σ)函数得到当前细胞状态中将要输出的部分Οt;Ct表示所有历史时序保留的信息,通过tanh函数处理后与当前状态输出信息Οt相乘得到此状态的输出ht,将具有三个元素的一维向量 [startup time, total time, cardinality] 的预测结果同真实数据进行比较,使用ratio-error计算模型的损失函数。

图8-20 模型架构图

8.6.4 关键源码解析

1. 项目结构

AI Engine侧涉及的主要文件路径为openGauss-server/src/gausskernel/dbmind/tools/predictor,其文件结构如表8-13所示。

表8-13 AI Engine文件结构

文件结构

说明

install

部署所需文件路径

install/ca_ext.txt

证书配置文件

install/requirements-gpu.txt

使用GPU(graphics processing unit,图形处理器)训练依赖库列表

install/requirements.txt

使用CPU训练依赖库列表

install/ssl.sh

证书生成脚本

python

项目代码路径

python/certs.py

加密通信

python/e_log

系统日志路径

python/log

模型训练日志路径

python/log.conf

配置文件

python/model.py

机器学习模型

python/run.py

服务端主函数

python/saved_models

模型训练checkpoint

python/settings.py

工程配置文件

python/uploads

Curl传输的文件存放路径

内核侧主要涉及的文件路径为openGauss-server/src/gausskernel/optimizer/util/learn,其文件结构如表8-14所示。

表8-14 内核端主要文件结构

文件结构

说明

comm.cpp

通信层代码实现

encoding.cpp

数据编码

ml_model.cpp

通用模型调用接口

plan_tree_model.cpp

树状模型调用接口

2. 训练流程

内核侧的模型训练接口通过ModelTrainInternal函数实现,该函数的关键部分如下:

static void ModelTrainInternal(const char* templateName, const char* modelName, ModelAccuracy** mAcc)
{
  …
    /* 对于树形模型调用对应的训练接口 */
    char* trainResultJson = TreeModelTrain(modelinfo, labels);
    /* 解析返回结果 */
    …
    ModelTrainInfo* info = GetModelTrainInfo(jsonObj);
    cJSON_Delete(jsonObj);
    /* 更新模型信息 */
    Relation modelRel = heap_open(OptModelRelationId, RowExclusiveLock);
   …
    UpdateTrainRes(values, datumsMax, datumsAcc, nLabel, mAcc, info, labels);

    HeapTuple modelTuple = SearchSysCache1(OPTMODEL, CStringGetDatum(modelName));
   …
    HeapTuple newTuple = heap_modify_tuple(modelTuple, RelationGetDescr(modelRel), values, nulls, replaces);
    simple_heap_update(modelRel, &newTuple->t_self, newTuple);
CatalogUpdateIndexes(modelRel, newTuple);
…
}

内核侧的树状模型训练接口通过TreeModelTrain函数实现,核心代码如下:

char* TreeModelTrain(Form_gs_opt_model modelinfo, char* labels)
{
    char* filename = (char*)palloc0(sizeof(char) * MAX_LEN_TEXT);
    char* buf = NULL;
    /* configure阶段 */
    ConfigureModel(modelinfo, labels, &filename);

    /* 将编码好的数据写入临时文件 */
    SaveDataToFile(filename);

    /* Train阶段 */
    buf = TrainModel(modelinfo, filename);
    return buf;
}

AI Engine侧配置的Web服务的URI是/configure,训练阶段的URI是/train.下面的代码段展示了训练过程。

  def fit(self, filename):
        keras.backend.clear_session()
        set_session(self.session)
        with self.graph.as_default():
            # 根据模型入参和出参维度变化情况,判断是否需要初始化模型
            feature, label, need_init = self.parse(filename) 
            os.environ['CUDA_VISIBLE_DEVICES'] = '0'
            epsilon = self.model_info.make_epsilon()
            if need_init: # 冷启动训练
                epoch_start = 0
                self.model = self._build_model(epsilon)
            else: # 增量训练
                epoch_start = int(self.model_info.last_epoch)
                ratio_error = ratio_error_loss_wrapper(epsilon)
                ratio_acc_2 = ratio_error_acc_wrapper(epsilon, 2)
                self.model = load_model(self.model_info.model_path,
                                        custom_objects={'ratio_error': ratio_error, 'ratio_acc': ratio_acc_2})
            self.model_info.last_epoch = int(self.model_info.max_epoch) + epoch_start
            self.model_info.dump_dict()
            log_path = os.path.join(settings.PATH_LOG, self.model_info.model_name + '_log.json')
            if not os.path.exists(log_path):
                os.mknod(log_path, mode=0o600)
            # 训练日志记录回调函数
            json_logging_callback = LossHistory(log_path, self.model_info.model_name, self.model_info.last_epoch)
            # 数据分割
            X_train, X_val, y_train, y_val = \\
                train_test_split(feature, label, test_size=0.1)
            # 模型训练
            self.model.fit(X_train, y_train, epochs=self.model_info.last_epoch,
                           batch_size=int(self.model_info.batch_size), validation_data=(X_val, y_val),
                           verbose=0, initial_epoch=epoch_start, callbacks=[json_logging_callback])
            # 记录模型checkpoint
            self.model.save(self.model_info.model_path)
            val_pred = self.model.predict(X_val)
            val_re = get_ratio_errors_general(val_pred, y_val, epsilon)
            self.model_logger.debug(val_re)
            del self.model
            return val_re

3. 预测流程

内核侧的模型预测过程主要通过ModelPredictInternal函数实现。树状模型预测过程通过TreeModelPredict函数实现。内核侧的树状模型预测过程会占用一些与AI Engine进行通信的信令,该通信过程如下:

char* TreeModelPredict(const char* modelName, char* filepath, const char* ip, int port)
{
    …
    if (!TryConnectRemoteServer(conninfo, &buf)) {
        DestroyConnInfo(conninfo);
        ParseResBuf(buf, filepath, "AI engine connection failed.");
        return buf;
    }

    switch (buf[0]) {
        case '0': {
            ereport(NOTICE, (errmodule(MOD_OPT_AI), errmsg("Model setup successfully.")));
            break;
        }
        case 'M': {
            ParseResBuf(buf, filepath, "Internal error: missing compulsory key.");
            break;
        }
…
    }
    /* Predict阶段 */
    …
    if (!TryConnectRemoteServer(conninfo, &buf)) {
        ParseResBuf(buf, filepath, "AI engine connection failed.");
        return buf;
    }
    switch (buf[0]) {
        case 'M': {
            ParseResBuf(buf, filepath, "Internal error: fail to load the file to predict.");
            break;
        }
        case 'S': {
            ParseResBuf(buf, filepath, "Internal error: session is not loaded, model setup required.");
            break;
        }
        default: {
            break;
        }
    }
    return buf;
}

AI Engine侧的Setup过程的Web接口是/model_setup,预测阶段的Web接口是/predict,他们的协议都是Post。

4. 数据编码

数据编码分为以下两个维度。
(1) 算子维度:包括每个执行计划算子的属性,如表8-15所示。

表8-15 算子维度

属性名

含义

编码策略

Optname

算子类型

One-hot

Orientation

返回元组存储格式

One-hot

Strategy

逻辑属性

One-hot

Options

物理属性

One-hot

Quals

谓词

hash

Projection

返回投影列

hash

(2) 计划维度。
对于每个算子,在其固有属性之外,openGauss还对query id,plan node id和parent node id进行了记录,在训练/预测阶段,使用这些信息将算子信息重建为树状计划结构,且可以递归构建子计划树来进行数据增强,从而提升模型泛化能力。树状数据结构如图8-21所示。

图8-21 树状数据结构示意图 内核侧的树状数据编码通过GetOPTEncoding函数实现。

5. 模型结构

AI Engine的模型解析、训练和预测见8.6.4章节,下面的代码展示了模型的结构。

class RnnModel():
    def _build_model(self, epsilon):
        model = Sequential()
        model.add(LSTM(units=int(self.model_info.hidden_units), return_sequences=True, input_shape=(None, int(self.model_info.feature_length))))
        model.add(LSTM(units=int(self.model_info.hidden_units), return_sequences=False))
        model.add(Dense(units=int(self.model_info.hidden_units), activation='relu'))
        model.add(Dense(units=int(self.model_info.hidden_units), activation='relu'))
        model.add(Dense(units=int(self.model_info.label_length), activation='sigmoid'))
        optimizer = keras.optimizers.Adadelta(lr=float(self.model_info.learning_rate), rho=0.95)
        ratio_error = ratio_error_loss_wrapper(epsilon)
        ratio_acc_2 = ratio_error_acc_wrapper(epsilon, 2)
        model.compile(loss=ratio_error, metrics=[ratio_acc_2], optimizer=optimizer)
        return model

AI Engine的损失函数使用ratio error(部分文献中使用qerror代称),该损失函数相较于MRE和MSE的优势在于其能够等价地惩罚高估和低估两种情况,公式为:

ε声明为性能预测值的无穷小值,防止分母为0的情况发生。

8.6.5 使用示例

AI查询时间预测功能使用示例如下。
① 定义性能预测模型,代码如下:

INSERT INTO gs_opt_model VALUES(‘rlstm’, ‘model_name’, ‘host_ip’, ‘port’);

② 通过GUC参数开启数据收集,配置的参数列表,代码如下:

enable_resource_track = on;
enable_resource_record = on;

③ 编码训练数据,代码如下:

SELECT gather_encoding_info('db_name');

④ 校准模型,代码如下:

SELECT model_train_opt('template_name', 'model_name');

⑤ 监控训练状态,代码如下:

SELECT track_train_process('host_ip', 'port');

⑥ 通过explain + SQL语句来预测SQL查询的性能,代码如下:

EXPLAIN (..., predictor 'model_name') SELECT ...

获得结果,其中,“p-time”列为标签预测值。

Row Adapter  (cost=110481.35..110481.35 rows=100 p-time=99..182 width=100) (actual time=375.158..375.160 rows=2 loops=1)

8.6.6 演进路线

目前模型的泛化能力不足,依赖外置的AI Engine组件,且深度学习网络比较重,这会为部署造成困难;模型需要数据进行训练,冷启动阶段的衔接不够顺畅,后续从以下几个方面演进。
(1) 加入不同复杂度模型,并支持多模型融合分析,提供更健壮的模型预测结果和置信度。
(2) AI Engine考虑加入任务队列,目前仅支持单并发预测/训练,可以考虑建立多个服务端进行并发业务。
(3) 基于在线学习/迁移学习的增强,考虑对损失函数加入锚定惩罚代价来避免灾难遗忘问题,同时优化数据管理模式,考虑data score机制,根据数据时效性赋权。
(4) 将本功能与优化器深度结合,探索基于AI的路径选择方法。

感谢大家学习第8章 AI技术中“8.6 AI查询时间预测”的精彩内容,下一篇我们开启“8.7 DeepSQL”的相关内容的介绍。
敬请期待。

以上是关于构建AI前的数据准备,SQL要比Python强的主要内容,如果未能解决你的问题,请参考以下文章

python入门前的准备

实习前的准备

测试开发系列之Python开发mock接口

测试开发系列之Python开发mock接口

Linux 0.11-读取硬盘前的准备工作有哪些-15

对于应对目标攻击前的准备