使用 keras 的自定义数据生成器功能预处理海量数据

Posted

技术标签:

【中文标题】使用 keras 的自定义数据生成器功能预处理海量数据【英文标题】:Preprocess huge data with a custom data generator function for keras 【发布时间】:2019-01-29 16:35:02 【问题描述】:

实际上,我正在构建一个 keras 模型,并且我有一个 msg 格式的数据集,其中包含超过 1000 万个实例和 40 个分类特征。目前我只使用它的一个样本,因为读取所有数据集并对其进行编码以适应内存是不可能的。这是我正在使用的部分代码:

import pandas as pd
from category_encoders import BinaryEncoder as be
from sklearn.preprocessing import StandardScaler


def model():
   model = Sequential()
   model.add(Dense(120, input_dim=233, kernel_initializer='uniform', activation='selu'))
   model.add(Dense(12, kernel_initializer='uniform', activation='sigmoid'))
   model.compile(SGD(lr=0.008),loss='mean_squared_error', metrics=['accuracy'])
   return model

def addrDataLoading():

   data=pd.read_msgpack('datum.msg')
   data=data.dropna(subset=['s_address','d_address'])
   data=data.sample(300000) # taking a sample of all the dataset to make the    encoding possible
   y=data[['s_address','d_address']]
   x=data.drop(['s_address','d_address'],1)

   encX = be().fit(x, y)
   numeric_X= encX.transform(x)
   encY=be().fit(y,y)
   numeric_Y=encY.transform(y)
   scaler=StandardScaler()
   X_all=scaler.fit_transform(numeric_X)
   x_train=X_all[0:250000,:]
   y_train=numeric_Y.iloc[0:250000,:]
   x_val=X_all[250000:,:]    
   y_val=numeric_Y.iloc[250000:,:]

   return x_train,y_train,x_val,y_val 



x_train,y_train,x_val,y_val=addrDataLoading()

model.fit(x_train, y_train,validation_data=(x_val,y_val),nb_epoch=20, batch_size=200)

所以我的问题是如何使用自定义数据生成器函数来读取和处理我拥有的所有数据而不仅仅是样本,然后使用 fit_generator() 函数来训练我的模型?

编辑

这是数据样本: netData

我认为从数据中抽取不同的样本会导致不同的编码维度。

对于这个示例,有 16 个不同的类别:4 个地址(3 位)、4 个主机名(3 位)、1 个子网掩码(1 位)、5 个基础设施(3 位)、1 个访问区域(1 位),所以二进制编码将给我们 11 位,数据的新维度是 11,之前是 5。所以假设对于 address 列中的另一个样本,我们有 8 个不同的类别,这将给出 4 位二进制,我们让相同数量的类别进入其他列,因此整体编码将产生 12 个维度。我相信是什么导致了问题。

【问题讨论】:

【参考方案1】:

略慢的解决方案(重复相同的动作)

编辑 - 在创建生成器之前安装 BinatyEncoder

首先删除 NA 并进一步处理干净的数据以避免重新分配数据框。

data = pd.read_msgpack('datum.msg')
data.dropna(subset=['s_address','d_address']).to_msgpack('datum_clean.msg')

在此解决方案中,data_generator 可以多次处理相同的数据。如果不重要,您可以使用此解决方案。

定义读取数据和分割索引的函数来训练和测试。它不会消耗大量内存。

import pandas as pd
from category_encoders import BinaryEncoder as be
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import numpy as np

def model():
   #some code defining the model


def train_test_index_split():
    # if there's enough memory to add one more column
    data = pd.read_msgpack('datum_cleaned.msg')
    train_idx, test_idx = train_test_split(data.index) 
    return data, train_idx, test_idx


data, train_idx, test_idx = train_test_index_split()

定义和初始化数据生成器,用于训练和验证

def data_generator(data, encX, encY,  bathc_size, n_steps, index):
    # EDIT: As the data was cleaned, you don't need dropna
    # data = data.dropna(subset=['s_address','d_address'])
    for i in range(n_steps):
        batch_idx = np.random.choice(index, batch_size)
        sample = data.loc[batch_idx]
        y = sample[['s_address', 'd_address']]
        x = sample.drop(['s_address', 'd_address'], 1)
        numeric_X = encX.transform(x)
        numeric_Y = encY.transform(y)
        scaler = StandardScaler()
        X_all = scaler.fit_transform(numeric_X)
        yield X_all, numeric_Y

编辑部分现在训练二进制编码器。您应该对您的数据进行二次抽样,以便为编码器创建有代表性的训练集。我猜数据形状的错误是由训练不正确的BinaryEncoder (Error when checking input: expected dense_9_input to have shape (233,) but got array with shape (234,)) 引起的:

def get_minimal_unique_frame(df):
    return (pd.Series([df[column].unique() for column in df], index=df.columns)  
           .apply(pd.Series)  # tranform list on unique values to pd.Series
           .T  # transope frame: columns is columns again
           .fillna(method='ffill'))  # fill NaNs with last value

x = get_minimal_unique_frame(data.drop(['s_address', 'd_address'], 1))
y = get_minimal_unique_frame(data[['s_address', 'd_address']])

注意:我没用过category_encoders,系统配置不兼容,无法安装检查。因此,以前的代码可能会引发问题。在这种情况下,我猜你应该比较 xy 数据帧的长度并使其相同,并可能更改数据帧的索引。

encX = be().fit(x, y)
encY = be().fit(y, y)
batch_size = 200
train_steps = 100000  
val_steps = 5000

train_gen = data_generator(data, encX, encY, batch_size, train_steps, train_idx)
test_gen = data_generator(data, encX, encY, batch_size, test_steps, test_idx)

编辑请提供x_sample的示例,运行train_gen并保存输出,然后发布x_samplesy_smaples

x_samples = []
y_samples = []
for i in range(10):
    x_sample, y_sample = next(train_gen)
    x_samples.append(x_sample)
    y_samples.append(y_sample)

注意:数据生成器不会自行停止。但是在train_steps之后会被fit_generator方法停止。

使用生成器拟合模型:

model.fit_generator(generator=train_gen, steps_per_epoch=train_steps, epochs=1,
                    validation_data=test_gen, validation_steps=val_steps)

据我所知,python 不会复制 pandas 数据帧,如果您不明确使用 copy() 左右的话。因此,两个生成器都使用相同的对象。但是如果你使用 Jupyter Notebook,可能会发生数据泄漏/未收集的数据,并且随之而来的是内存问题。

更高效的解决方案——scketch

清理你的数据

data = pd.read_msgpack('datum.msg')
data.dropna(subset=['s_address','d_address']).to_msgpack('datum_clean.msg')

如果您有足够的磁盘空间,则创建训练/测试拆分,对其进行预处理并存储为 numpy 数组。

data, train_idx, test_idx = train_test_index_split()

def data_preprocessor(data, path, index):
    # data = data.dropna(subset=['s_address','d_address'])
    sample = data.loc[batch_idx]
    y = sample[['s_address', 'd_address']]
    x = sample.drop(['s_address', 'd_address'], 1)
    encX = be().fit(x, y)
    numeric_X = encX.transform(x)
    encY = be().fit(y, y)
    numeric_Y = encY.transform(y)
    scaler = StandardScaler()
    X_all = scaler.fit_transform(numeric_X)
    np.save(path + '_X', X_all)
    np.save(path + '_y', numeric_Y)

data_preprocessor(data, 'train', train_idx)
data_preprocessor(data, 'test', test_idx)

删除不必要的数据:

del data

加载您的文件并使用以下生成器:

train_X = np.load('train_X.npy')
train_y = np.load('train_y.npy')

test_X = np.load('test_X.npy')
test_y = np.load('test_y.npy')

def data_generator(X, y, batch_size, n_steps):
    idxs = np.arange(len(X))
    np.random.shuffle(idxs)
    ptr = 0

    for _ in range(n_steps):
        batch_idx = idxs[ptr:ptr+batch_size]
        x_sample = X[batch_idx]
        y_sample = y[batch_idx]
        ptr += batch_size
        if ptr > len(X):
            ptr = 0
        yield x_sapmple, y_sample

准备生成器:

train_gen = data_generator(train_X, train_y, batch_size, train_steps)
test_gen = data_generator(test_X, test_y, batch_size, test_steps)

最后拟合模型。希望其中一个解决方案会有所帮助。至少如果 python 确实通过数组和数据框购买参考,而不是按价值。 *** answer about it.

【讨论】:

谢谢。但这并不能解决我的问题。 addrDataLoading() 函数使用 data = data.sample(300000) 并且始终存在对大量数据的编码,这是问题的核心。 我知道了,会在一小时内解决我的答案 我认为我的解决方案中还有一个错误,不要重新分配data,因为在这种情况下会被复制。 第一种方法似乎有效,但我有这个问题ValueError: Error when checking input: expected dense_9_input to have shape (233,) but got array with shape (234,)。我认为应该调整model() 的参数。我已经编辑了上面的代码以包含model() 功能代码。 @mEdiHan 看起来您的模型获得了比预期更多的列。如果您现在使用我的代码,您应该检查生成器返回的 x_sample 是否具有正确的形状 (233,)(不是错的,是的)。可能有一些额外的列被添加到您的数据中并且没有从中删除。没有数据,我无法弄清楚问题所在。如果您从该代码中发布 x_sample 的print 以及它在您的原始代码中的情况,我可以更准确地回答。现在我认为某些索引列仍保留在您的数据中(可能)。

以上是关于使用 keras 的自定义数据生成器功能预处理海量数据的主要内容,如果未能解决你的问题,请参考以下文章

Keras 中基于输入数据的自定义损失函数

R Keras 中的自定义损失函数

tf2.0 Keras:使用 RNN 的自定义张量流代码时无法保存权重

用于不适合内存的大型 hdf5 文件的 Keras 自定义数据生成器

Keras 中的自定义层 - 维度问题

如何通过 tf.data API 使用 Keras 生成器