30 Days from Getting Started to Giving Up: My Machine Learning Journey, Part 8
Baseline steps:
0. Preparation 20210113
1. Exploratory data analysis (EDA) 20210114
2. Feature engineering 20210116
3. Model training 20210117
4. Offline validation 20210117
II. Feature Engineering

Having settled on a machine-learning modeling approach, the core idea is to extract window features from the time series (a toy illustration follows the list below).
1. Convert the time series into feature-label form
2. Encode the categorical labels
3. Join in the other dimensions:
   date
   price
4. Extract window features:
   sales 7 days earlier
   sales 28 days earlier
   mean over the previous 7 days
   mean over the previous 28 days
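To make these window features concrete, here is a minimal sketch on a single made-up item (the column names id, date and sales mirror the ones used in the code below; only the 7-day lag and 7-day window are shown, and the numbers are invented):

import pandas as pd

# Made-up toy series: one item, 16 days of sales
toy = pd.DataFrame({
    "id": ["item_A"] * 16,
    "date": pd.date_range("2016-01-01", periods=16),
    "sales": [3, 0, 5, 2, 4, 1, 0, 6, 2, 3, 1, 0, 4, 2, 5, 1],
})
# Lag feature: the sales value 7 days earlier, computed per id
toy["lag_7"] = toy.groupby("id")["sales"].shift(7)
# Rolling-mean feature: the 7-day mean of that lagged series, again per id
toy["rmean_7_7"] = toy.groupby("id")["lag_7"].transform(lambda x: x.rolling(7).mean())
print(toy.tail(3))  # the last rows have both features populated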
import sys
import numpy as np
import pandas as pd
import lightgbm as lgb
from datetime import datetime, timedelta
# Build the training set (this does the preprocessing: steps 1, 2 and 3 above)
def create_train_data(train_start=750, test_start=1800, is_train=True):  # two start-day constants plus a flag, so the same function builds either the train or the test frame
    # Basic parameters
    PRICE_DTYPES = {"store_id": "category", "item_id": "category", "wm_yr_wk": "int16", "sell_price": "float32"}  # declared dtypes for the price table
    CAL_DTYPES = {"event_name_1": "category", "event_name_2": "category", "event_type_1": "category",
                  "event_type_2": "category", "weekday": "category", 'wm_yr_wk': 'int16', "wday": "int16",
                  "month": "int16", "year": "int16", "snap_CA": "float32", 'snap_TX': 'float32', 'snap_WI': 'float32'}  # declared dtypes for the calendar table
    start_day = train_start if is_train else test_start  # define start_day
    numcols = [f"d_{day}" for day in range(start_day, 1914)]  # list of day columns for the chosen period
    catcols = ['id', 'item_id', 'dept_id', 'store_id', 'cat_id', 'state_id']  # the categorical feature columns
    SALE_DTYPES = {numcol: "float32" for numcol in numcols}  # sales columns in the period become float32, collected into a dict
    SALE_DTYPES.update({col: "category" for col in catcols if col != "id"})  # add every categorical column except id to the dict
    # Load the price data
    price_data = pd.read_csv('./sell_prices.csv', dtype=PRICE_DTYPES)
    # Load the calendar data
    cal_data = pd.read_csv('./calendar.csv', dtype=CAL_DTYPES)
    # Load the sales data
    sale_data = pd.read_csv('./sales_train_validation.csv', dtype=SALE_DTYPES, usecols=catcols + numcols)
    # Encode the categorical labels
    for col, col_dtype in PRICE_DTYPES.items():
        if col_dtype == "category":
            price_data[col] = price_data[col].cat.codes.astype("int16")  # category codes become int16, which can be ordered
            price_data[col] -= price_data[col].min()  # after encoding, shift so the smallest code is 0
    cal_data["date"] = pd.to_datetime(cal_data["date"])
    for col, col_dtype in CAL_DTYPES.items():
        if col_dtype == "category":
            cal_data[col] = cal_data[col].cat.codes.astype("int16")
            cal_data[col] -= cal_data[col].min()
    for col in catcols:
        if col != "id":
            sale_data[col] = sale_data[col].cat.codes.astype("int16")
            sale_data[col] -= sale_data[col].min()
    # Note: part of the submission horizon has no sales yet, so the test frame needs extra empty day columns
    if not is_train:
        for day in range(1913 + 1, 1913 + 2 * 28 + 1):
            sale_data[f"d_{day}"] = np.nan
    # Unpivot from wide to long
    sale_data = pd.melt(sale_data,
                        id_vars=catcols,  # the identifier columns (the composite key)
                        value_vars=[col for col in sale_data.columns if col.startswith("d_")],  # the day columns to unpivot
                        var_name="d",  # name of the melted variable column
                        value_name="sales")  # name of the new value column
    sale_data = sale_data.merge(cal_data, on="d", copy=False)  # join the calendar data
    sale_data = sale_data.merge(price_data, on=["store_id", "item_id", "wm_yr_wk"], copy=False)  # join the price data
    return sale_data
# Extract features (more feature-extraction logic can be added here)
def create_feature(sale_data, is_train=True, day=None):
    # Sales 7 days ago and 28 days ago
    lags = [7, 28]
    lag_cols = [f"lag_{lag}" for lag in lags]
    # For the test set only one day's features are needed, which cuts the computation
    # Note: feature generation must be identical for the training and test sets
    if is_train:
        for lag, lag_col in zip(lags, lag_cols):  # zip pairs the two lists element by element
            sale_data[lag_col] = sale_data[["id", "sales"]].groupby("id")["sales"].shift(lag)  # new columns holding the shifted (lagged) sales
    else:
        for lag, lag_col in zip(lags, lag_cols):
            sale_data.loc[sale_data.date == day, lag_col] = sale_data.loc[sale_data.date == day - timedelta(days=lag), 'sales'].values
    # Rolling means over the lag-7 and lag-28 series
    wins = [7, 28]
    if is_train:
        for win in wins:
            for lag, lag_col in zip(lags, lag_cols):
                sale_data[f"rmean_{lag}_{win}"] = sale_data[["id", lag_col]].groupby("id")[lag_col].transform(lambda x: x.rolling(win).mean())
    else:
        for win in wins:
            for lag in lags:
                df_window = sale_data[(sale_data.date <= day - timedelta(days=lag)) & (sale_data.date > day - timedelta(days=lag + win))]
                df_window_grouped = df_window.groupby("id").agg({'sales': 'mean'}).reindex(sale_data.loc[sale_data.date == day, 'id'])
                sale_data.loc[sale_data.date == day, f"rmean_{lag}_{win}"] = df_window_grouped.sales.values
    # Date features
    # Any date feature missing from the calendar is generated from the datetime accessor
    # (note: .dt.weekofyear was removed in pandas 2.0; use .dt.isocalendar().week there)
    date_features = {
        "wday": "weekday",
        "week": "weekofyear",
        "month": "month",
        "quarter": "quarter",
        "year": "year",
        "mday": "day",
    }
    for date_feat_name, date_feat_func in date_features.items():
        if date_feat_name in sale_data.columns:
            sale_data[date_feat_name] = sale_data[date_feat_name].astype("int16")
        else:
            sale_data[date_feat_name] = getattr(sale_data["date"].dt, date_feat_func).astype("int16")
    return sale_data
sale_data = create_train_data(train_start=350, is_train=True)
sale_data = create_feature(sale_data)
# Clean the data and pick the columns used for training
sale_data.dropna(inplace=True)  # with no arguments this is axis=0, how='any': drop every row containing a null; inplace=True avoids creating a new DataFrame
cat_feats = ['item_id', 'dept_id', 'store_id', 'cat_id', 'state_id'] + ["event_name_1", "event_name_2", "event_type_1", "event_type_2"]
useless_cols = ["id", "date", "sales", "d", "wm_yr_wk", "weekday"]
train_cols = sale_data.columns[~sale_data.columns.isin(useless_cols)]  # everything except the unusable columns becomes a training feature
X_train = sale_data[train_cols]
y_train = sale_data["sales"]
III. Model Training

1. Train a LightGBM model.
   - choice of loss function (see the note on the Tweedie deviance after this list)
   - tricks applied at prediction time
2. The tweedie_variance_power parameter is chosen inside the interval [1, 2].
3. LightGBM is a variant of GBDT, so its predictions cannot exceed the range of targets seen in training (which motivates the alpha multipliers applied at prediction time below).
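For context (this formula is added here and is not from the original post): with 1 < p < 2 the Tweedie distribution is a compound Poisson-Gamma, which puts positive probability mass on exactly zero, so it suits daily sales with many zero days; p close to 1 behaves more like Poisson, close to 2 more like Gamma. Up to terms that do not depend on the prediction, training against it amounts to minimizing the Tweedie unit deviance

$$
d_p(y, \hat{y}) = 2\left[\frac{y^{\,2-p}}{(1-p)(2-p)} - \frac{y\,\hat{y}^{\,1-p}}{1-p} + \frac{\hat{y}^{\,2-p}}{2-p}\right], \qquad 1 < p < 2,
$$

where p is the tweedie_variance_power; the code below uses p = 1.1.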
def train_model(train_data, valid_data):
    params = {
        "objective": "tweedie",              # Tweedie loss suits non-negative, zero-heavy sales
        "tweedie_variance_power": 1.1,
        "metric": "rmse",
        "force_row_wise": True,
        "learning_rate": 0.075,
        "sub_feature": 0.8,
        "sub_row": 0.75,
        "bagging_freq": 1,
        "lambda_l2": 0.1,
        "nthread": 8,
        "verbosity": 1,
        "num_iterations": 1500,
        "num_leaves": 128,
        "min_data_in_leaf": 104,
    }
    # Note: verbose_eval was removed in LightGBM 4.0; use callbacks=[lgb.log_evaluation(50)] there
    m_lgb = lgb.train(params, train_data, valid_sets=[valid_data], verbose_eval=50)
    return m_lgb
def predict_ensemble(train_cols, m_lgb):
    date = datetime(2016, 4, 25)
    # alphas = [1.035, 1.03, 1.025, 1.02]
    # alphas = [1.028, 1.023, 1.018]
    alphas = [1.035, 1.03, 1.025]
    weights = [1 / len(alphas)] * len(alphas)
    sub = 0.
    test_data = create_train_data(is_train=False)
    for icount, (alpha, weight) in enumerate(zip(alphas, weights)):
        test_data_c = test_data.copy()
        cols = [f"F{i}" for i in range(1, 29)]
        # Predict one day at a time, because each day's lag features depend on earlier predictions
        for i in range(0, 28):
            day = date + timedelta(days=i)
            print(i, day)
            tst = test_data_c[(test_data_c.date >= day - timedelta(days=57)) & (test_data_c.date <= day)].copy()
            tst = create_feature(tst, is_train=False, day=day)
            tst = tst.loc[tst.date == day, train_cols]
            test_data_c.loc[test_data_c.date == day, "sales"] = alpha * m_lgb.predict(tst)
        # Reshape into the submission format
        test_sub = test_data_c.loc[test_data_c.date >= date, ["id", "sales"]].copy()
        test_sub["F"] = [f"F{rank}" for rank in test_sub.groupby("id")["id"].cumcount() + 1]
        test_sub = test_sub.set_index(["id", "F"]).unstack()["sales"][cols].reset_index()
        test_sub.fillna(0., inplace=True)
        test_sub.sort_values("id", inplace=True)
        test_sub.reset_index(drop=True, inplace=True)
        test_sub.to_csv(f"submission_{icount}.csv", index=False)
        if icount == 0:
            sub = test_sub
            sub[cols] *= weight
        else:
            sub[cols] += test_sub[cols] * weight
        print(icount, alpha, weight)
    sub2 = sub.copy()
    # Duplicate the predictions for the second 28-day horizon, renaming the 'validation' ids to 'evaluation'
    sub2["id"] = sub2["id"].str.replace("validation$", "evaluation", regex=True)
    sub = pd.concat([sub, sub2], axis=0, sort=False)
    sub.to_csv("submissionV3.csv", index=False)

train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=cat_feats, free_raw_data=False)
valid_inds = np.random.choice(len(X_train), 10000)
valid_data = lgb.Dataset(X_train.iloc[valid_inds], label=y_train.iloc[valid_inds], categorical_feature=cat_feats, free_raw_data=False)
m_lgb = train_model(train_data, valid_data)
predict_ensemble(train_cols, m_lgb)
IV. Offline Validation

WRMSSE is evaluated quite differently from plain RMSE, so we need to work out the weight carried by every individual time series. That helps in two ways: it gives us an offline validation score, and it lets us think about whether a custom loss function could be used.
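Per the M5 competition definition, every one of the 42,840 aggregated series gets an RMSSE, and the final score is their weighted combination:

$$
\mathrm{RMSSE} = \sqrt{\frac{\frac{1}{h}\sum_{t=n+1}^{n+h}(y_t - \hat{y}_t)^2}{\frac{1}{n-1}\sum_{t=2}^{n}(y_t - y_{t-1})^2}}, \qquad \mathrm{WRMSSE} = \sum_i w_i \,\mathrm{RMSSE}_i,
$$

where h = 28 is the forecast horizon, n is the length of the training history (counted from each series' first non-zero sale), and the weights w_i are proportional to each series' dollar sales over the final 28 training days, normalized so that each of the 12 aggregation levels contributes equally. The code below precomputes the RMSSE denominator for every series (S), the weights (W), and their combination SW = W / sqrt(S); in this implementation W sums to 1 within each level and the final score is divided by 12, which is equivalent.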
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
from scipy.sparse import csr_matrix
import gc
# Downcast numeric dtypes to reduce memory usage
def reduce_mem_usage(df, verbose=True):
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
    end_mem = df.memory_usage().sum() / 1024**2
    if verbose: print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
    return df
# Sales data
sales = pd.read_csv('./sales_train_validation.csv')
# Calendar data
calendar = pd.read_csv('./calendar.csv')
calendar = reduce_mem_usage(calendar)
# Price data
sell_prices = pd.read_csv('./sell_prices.csv')
sell_prices = reduce_mem_usage(sell_prices)

# Compute dollar sales
# By definition only the most recent 28 days of dollar sales (units sold * price) are needed; these give the weights
# The cutoff does not have to be day 1914
cols = ["d_{}".format(i) for i in range(1914 - 28, 1914)]
data = sales[["id", 'store_id', 'item_id'] + cols]
# Unpivot from wide to long
data = data.melt(id_vars=["id", 'store_id', 'item_id'],
                 var_name="d", value_name="sale")
# Join the calendar data
data = pd.merge(data, calendar, how='left',
                left_on=['d'], right_on=['d'])
data = data[["id", 'store_id', 'item_id', "sale", "d", "wm_yr_wk"]]
# Join the price data
data = data.merge(sell_prices, on=['store_id', 'item_id', 'wm_yr_wk'], how='left')
data.drop(columns=['wm_yr_wk'], inplace=True)
# Dollar sales for each row
data['sale_usd'] = data['sale'] * data['sell_price']
# Build the aggregation matrix
# 30490 bottom-level series -> 42840 series across all 12 levels
# Enumerate the grouping columns that define each aggregation level
dummies_list = [sales.state_id, sales.store_id,
                sales.cat_id, sales.dept_id,
                sales.state_id + sales.cat_id, sales.state_id + sales.dept_id,
                sales.store_id + sales.cat_id, sales.store_id + sales.dept_id,
                sales.item_id, sales.state_id + sales.item_id, sales.id]
# The top level: everything aggregated into a single series
dummies_df_list = [pd.DataFrame(np.ones(sales.shape[0]).astype(np.int8),
                                index=sales.index, columns=['all']).T]
# One-hot encode each of the remaining levels in turn
for i, cats in enumerate(dummies_list):
    dummies_df_list += [pd.get_dummies(cats, drop_first=False, dtype=np.int8).T]
# Stack everything into the aggregation matrix
roll_mat_df = pd.concat(dummies_df_list, keys=list(range(12)),
                        names=['level', 'id'])  # .astype(np.int8, copy=False)
# Save the aggregation matrix
roll_index = roll_mat_df.index
roll_mat_csr = csr_matrix(roll_mat_df.values)
roll_mat_df.to_pickle('roll_mat_df.pkl')
# Free memory
del dummies_df_list, roll_mat_df
gc.collect()
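To see what the aggregation matrix does, here is a small made-up sketch (two stores, two items, three days, and only two of the twelve levels: the overall total and the per-store totals). The data and names are invented for illustration; one sparse multiplication produces every aggregate at once, which is exactly how roll_mat_csr is used below.

import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix

# Toy data: 4 bottom-level series (2 stores x 2 items), 3 days of made-up sales
toy = pd.DataFrame({
    "store_id": ["CA_1", "CA_1", "TX_1", "TX_1"],
    "item_id":  ["A", "B", "A", "B"],
    "d_1": [1, 0, 2, 1], "d_2": [0, 3, 1, 0], "d_3": [2, 1, 0, 4],
})
# Aggregation matrix: one all-ones row (grand total) plus one row per store
blocks = [pd.DataFrame(np.ones((1, 4), dtype=np.int8), columns=toy.index, index=["all"]),
          pd.get_dummies(toy.store_id, dtype=np.int8).T]
toy_roll = csr_matrix(pd.concat(blocks).values)   # shape (3, 4)
day_cols = ["d_1", "d_2", "d_3"]
print(toy_roll * toy[day_cols].values)            # rows: all, CA_1, TX_1
# [[4 4 7]
#  [1 3 3]
#  [3 1 4]]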
# Compute the RMSSE scaling term S for every time series, following the definition:
def get_s(drop_days=0):
    """
    drop_days: int, equals 0 by default, so S is calculated on all data.
    If equals 28, last 28 days won't be used in calculating S.
    """
    # Day columns included in the calculation
    d_name = ['d_' + str(i + 1) for i in range(1913 - drop_days)]
    # Aggregate the sales to all 42,840 series
    sales_train_val = roll_mat_csr * sales[d_name].values
    # By definition, the leading run of zeros before the first sale is excluded
    start_no = np.argmax(sales_train_val > 0, axis=1)
    # Mask those leading zeros as nan
    flag = np.dot(np.diag(1 / (start_no + 1)), np.tile(np.arange(1, 1914 - drop_days), (roll_mat_csr.shape[0], 1))) < 1
    sales_train_val = np.where(flag, np.nan, sales_train_val)
    # Per the formula: the mean squared one-step naive-forecast error of each series (the RMSSE denominator)
    weight1 = np.nansum(np.diff(sales_train_val, axis=1) ** 2, axis=1) / (1913 - start_no - 1)
    return weight1

S = get_s(drop_days=0)
# Compute the WRMSSE weights w, following the definition
def get_w(sale_usd):
    """
    sale_usd: DataFrame with columns ['id', 'sale_usd'] covering the last 28 days.
    """
    # Dollar sales of every bottom-level series over the last 28 days
    total_sales_usd = sale_usd.groupby(
        ['id'], sort=False)['sale_usd'].apply(np.sum).values
    # Push the weights through the aggregation matrix to get every level
    weight2 = roll_mat_csr * total_sales_usd
    return 12 * weight2 / np.sum(weight2)  # normalized so the weights within each of the 12 levels sum to 1

W = get_w(data[['id', 'sale_usd']])
SW = W / np.sqrt(S)
sw_df = pd.DataFrame(np.stack((S, W, SW), axis=-1), index=roll_index, columns=['s', 'w', 'sw'])
sw_df.to_pickle('sw_df.pkl')
# Scoring helpers
# Aggregate a (30490, N) array to all 42,840 series
def rollup(v):
    '''
    v - np.array of size (30490 rows, N day columns); returns the (42840, N) aggregated array.
    '''
    return (v.T * roll_mat_csr.T).T

# Compute the WRMSSE metric
def wrmsse(preds, y_true, score_only=False, s=S, w=W, sw=SW):
    '''
    preds - Predictions: pd.DataFrame of size (30490 rows, N day columns)
    y_true - True values: pd.DataFrame of size (30490 rows, N day columns)
    s - RMSSE scaling terms: np.array of size (42840,)
    w - sales weights based on last 28 days: np.array of size (42840,)
    '''
    if score_only:
        return np.sum(
            np.sqrt(
                np.mean(
                    np.square(rollup(preds.values - y_true.values))
                    , axis=1)) * sw) / 12  # divide by 12: average over the 12 aggregation levels
    else:
        score_matrix = (np.square(rollup(preds.values - y_true.values)) * np.square(w)[:, None]) / s[:, None]
        score = np.sum(np.sqrt(np.mean(score_matrix, axis=1))) / 12
        return score, score_matrix
# Load the precomputed weights
file_pass = './'
sw_df = pd.read_pickle(file_pass + 'sw_df.pkl')
S = sw_df.s.values
W = sw_df.w.values
SW = sw_df.sw.values
roll_mat_df = pd.read_pickle(file_pass + 'roll_mat_df.pkl')
roll_index = roll_mat_df.index
roll_mat_csr = csr_matrix(roll_mat_df.values)
# Inspect the sw values at the finest level (level 11, the individual id series)
print(sw_df.loc[(11, slice(None))].sw)
np.max(sw_df.loc[(11, slice(None))].sw)
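The wrmsse function is never actually called above, so here is a minimal usage sketch (an assumption on my part, not from the original post): it scores a placeholder forecast against the last 28 observed training days, just to show the expected shapes. In a real offline validation you would hold those 28 days out, recompute S with get_s(drop_days=28) and the weights from the preceding 28 days, and pass your model's 28-day forecasts reshaped to a (30490, 28) frame in the same row order as sales.

# Hypothetical usage sketch: score a placeholder forecast of all zeros
valid_cols = [f"d_{i}" for i in range(1914 - 28, 1914)]
y_true = sales[valid_cols]        # ground truth, shape (30490, 28)
y_pred = y_true.copy() * 0.0      # placeholder predictions; substitute real model output here
print(wrmsse(y_pred, y_true, score_only=True))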
--------
Honestly, by the time I got to the end I was no longer sure what I had written =_=.
Next week I still need to go back and shore up some fundamentals:
1. Starting from the basic decision tree, how we get step by step to GBDT, XGBoost and LightGBM.
2. Understanding the formulas behind the evaluation metrics and being able to implement them in code.