推荐系统实战task01笔记(赛题理解+baseline)

Posted 姑苏城外娄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了推荐系统实战task01笔记(赛题理解+baseline)相关的知识,希望对你有一定的参考价值。

目录

  • 题目背景
  • 数据概况
  • 评价方式
  • 题目理解
  • 代码示例
  • 总结

题目背景

数据概况

该数据来自某新闻APP平台的用户交互数据,包括30万用户,近300万次点击,共36万多篇不同的新闻文章,同时每篇新闻文章有对应的embedding向量表示。为了保证比赛的公平性,从中抽取20万用户的点击日志数据作为训练集,5万用户的点击日志数据作为测试集A,5万用户的点击日志数据作为测试集B。

评价方式

理解评价方式, 我们需要结合着最后的提交文件来看, 根据sample.submit.csv, 我们最后提交的格式是针对每个用户, 我们都会给出五篇文章的推荐结果,按照点击概率从前往后排序。而真实的每个用户最后一次点击的文章只会有一篇的真实答案, 所以我们就看我们推荐的这五篇里面是否有命中真实答案的。比如对于user1来说, 我们的提交会是:

user1, article1, article2, article3, article4, article5.

评价指标的公式如下:

假如article1就是真实的用户点击文章,也就是article1命中, 则s(user1,1)=1, s(user1,2-4)都是0, 如果article2是用户点击的文章, 则s(user,2)=1/2,s(user,1,3,4,5)都是0。也就是score(user)=命中第几条的倒数。如果都没中, 则score(user1)=0。这个是合理的, 因为我们希望的就是命中的结果尽量靠前, 而此时分数正好比较高

题目理解

根据赛题简介,我们首先要明确我们此次比赛的目标:根据用户历史浏览点击新闻的数据信息预测用户最后一次点击的新闻文章。从这个目标上看, 会发现此次比赛和我们之前遇到的普通的结构化比赛不太一样, 主要有两点:

  • 首先是目标上: 要预测最后一次点击的新闻文章,也就是我们给用户推荐的是新闻文章, 并不是像之前那种预测一个数或者预测数据哪一类那样的问题
  • 数据上: 通过给出的数据我们会发现, 这种数据也不是我们之前遇到的那种特征+标签的数据,而是基于了真实的业务场景, 拿到的用户的点击日志 我们的思考方向就是结合我们的目标,把该预测问题转成一个监督学习的问题(特征+标签),然后我们才能进行ML,DL等建模预测。


那么我们自然而然的就应该在心里会有这么几个问题:如何转成一个监督学习问题呢?转成一个什么样的监督学习问题呢?我们能利用的特征又有哪些呢?又有哪些模型可以尝试呢?此次面对数万级别的文章推荐,我们又有哪些策略呢?

当然这些问题不会在我们刚看到赛题之后就一下出来答案, 但是只要有了问题之后, 我们就能想办法解决问题了, 比如上面的第二个问题,转成一个什么样的监督学习问题?由于我们是预测用户最后一次点击的新闻文章,从36万篇文章中预测某一篇的话我们首先可能会想到这可能是一个多分类的问题(36万类里面选1), 但是如此庞大的分类问题, 我们做起来可能比较困难, 那么能不能转化一下?既然是要预测最后一次点击的文章, 那么如果我们能预测出某个用户最后一次对于某一篇文章会进行点击的概率, 是不是就间接性的解决了这个问题呢?概率最大的那篇文章不就是用户最后一次可能点击的新闻文章吗?这样就把原问题变成了一个点击率预测的问题(用户, 文章) --> 点击的概率(软分类), 而这个问题, 就是我们所熟悉的监督学习领域分类问题了, 这样我们后面建模的时候, 对于模型的选择就基本上有大致方向了,比如最简单的逻辑回归模型。这样, 我们对于该赛题的解决方案应该有了一个大致的解决思路,要先转成一个分类问题来做, 而分类的标签就是用户是否会点击某篇文章,分类问题的特征中会有用户和文章,我们要训练一个分类模型, 对某用户最后一次点击某篇文章的概率进行预测。

代码示例

导入工具包

# import packagesimport time, math, osfrom tqdm import tqdmimport gcimport pickleimport randomfrom datetime import datetimefrom operator import itemgetterimport numpy as npimport pandas as pdimport warningsfrom collections import defaultdictimport collectionswarnings.filterwarnings('ignore')

数据地址

# read datadata_path = '/home/work/louxuezheng/news/data/' # 原始数据save_path = '/home/work/louxuezheng/news/temp_results/' # 临时结果路径

节约内存的一个标配函数

# 节约内存的一个标配函数def reduce_mem(df): starttime = time.time() numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64'] start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtypes if col_type in numerics: c_min = df[col].min() c_max = df[col].max() if pd.isnull(c_min) or pd.isnull(c_max): continue if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) end_mem = df.memory_usage().sum() / 1024**2 print('-- Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction),time spend:{:2.2f} min'.format(end_mem, 100*(start_mem-end_mem)/start_mem, (time.time()-starttime)/60)) return df

读取采样或全量数据

# debug模式:从训练集中划出一部分数据来调试代码def get_all_click_sample(data_path, sample_nums=10000): """ 训练集中采样一部分数据调试 data_path: 原数据的存储路径 sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做) """ all_click = pd.read_csv(data_path + 'train_click_log.csv') all_user_ids = all_click.user_id.unique()
sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False) all_click = all_click[all_click['user_id'].isin(sample_user_ids)] all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp'])) return all_click
# 读取点击数据,这里分成线上和线下,如果是为了获取线上提交结果应该讲测试集中的点击数据合并到总的数据中# 如果是为了线下验证模型的有效性或者特征的有效性,可以只使用训练集def get_all_click_df(data_path='./data/', offline=True): if offline: all_click = pd.read_csv(data_path + 'train_click_log.csv') else: trn_click = pd.read_csv(data_path + 'train_click_log.csv') tst_click = pd.read_csv(data_path + 'testA_click_log.csv')
all_click = trn_click.append(tst_click) all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp'])) return all_click
# 全量训练集all_click_df = get_all_click_df(data_path, offline=False

获取 用户 - 文章 - 点击时间字典

# 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}def get_user_item_time(click_df):      click_df = click_df.sort_values('click_timestamp') def make_item_time_pair(df):        return list(zip(df['click_article_id'], df['click_timestamp']))    user_item_time_df = click_df.groupby('user_id')['click_article_id''click_timestamp'].apply(lambda x: make_item_time_pair(x))\                                                        .reset_index().rename(columns={0'item_time_list'})    user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list'])) return user_item_time_dict

获取点击最多的topk个文章

# 获取近期点击最多的文章def get_item_topk_click(click_df, k): topk_click = click_df['click_article_id'].value_counts().index[:k] return topk_click

itemcf的物品相似度计算

def itemcf_sim(df): """ 文章与文章之间的相似性矩阵计算 :param df: 数据表 :item_created_time_dict: 文章创建时间的字典 return : 文章与文章的相似性矩阵 思路: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习), 在多路召回部分会加上关联规则的召回策略 """  user_item_time_dict = get_user_item_time(df)  # 计算物品相似度 i2i_sim = {} item_cnt = defaultdict(int) for user, item_time_list in tqdm(user_item_time_dict.items()): # 在基于商品的协同过滤优化的时候可以考虑时间因素 for i, i_click_time in item_time_list: item_cnt[i] += 1 i2i_sim.setdefault(i, {}) for j, j_click_time in item_time_list: if(i == j): continue i2i_sim[i].setdefault(j, 0)  i2i_sim[i][j] += 1 / math.log(len(item_time_list) + 1)  i2i_sim_ = i2i_sim.copy() for i, related_items in i2i_sim.items(): for j, wij in related_items.items(): i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])  # 将得到的相似性矩阵保存到本地 pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))  return i2i_sim_

itemcf召回文章

# 基于商品的召回i2idef item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click): """ 基于文章协同过滤的召回 :param user_id: 用户id :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...} :param i2i_sim: 字典,文章相似性矩阵 :param sim_item_topk: 整数, 选择与当前文章最相似的前k篇文章 :param recall_item_num: 整数, 最后的召回文章数量 :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全  return: 召回的文章列表 {item1:score1, item2: score2...} 注意: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习), 在多路召回部分会加上关联规则的召回策略 """  # 获取用户历史交互的文章 user_hist_items = user_item_time_dict[user_id] user_hist_items_ = {user_id for user_id, _ in user_hist_items}  item_rank = {} for loc, (i, click_time) in enumerate(user_hist_items): for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]: if j in user_hist_items_: continue  item_rank.setdefault(j, 0) item_rank[j] += wij  # 不足10个,用热门商品补全 if len(item_rank) < recall_item_num: for i, item in enumerate(item_topk_click): if item in item_rank.items(): # 填充的item应该不在原来的列表中 continue item_rank[item] = - i - 100 # 随便给个负数就行 if len(item_rank) == recall_item_num: break  item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]  return item_rank

给每个用户根据物品的协同过滤推荐文章

# 定义user_recall_items_dict = collections.defaultdict(dict)# 获取 用户 - 文章 - 点击时间的字典user_item_time_dict = get_user_item_time(all_click_df)
itemcf_sim(all_click_df);# 去取文章相似度i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))

召回字典转换成df

# 将字典的形式转换成dfuser_item_score_list = []for user, items in tqdm(user_recall_items_dict.items()): for item, score in items: user_item_score_list.append([user, item, score])
recall_df = pd.DataFrame(user_item_score_list, columns=['user_id', 'click_article_id', 'pred_score'])

生成提交文件

# 生成提交文件def submit(recall_df, topk=5, model_name=None): recall_df = recall_df.sort_values(by=['user_id', 'pred_score']) recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')  # 判断是不是每个用户都有5篇文章及以上 tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max()) assert tmp.min() >= topk  del recall_df['pred_score'] submit = recall_df[recall_df['rank'] <= topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()  submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)] # 按照提交格式定义列名 submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2',  3: 'article_3', 4: 'article_4', 5: 'article_5'})  save_name = save_path + model_name + '_' + datetime.today().strftime('%m-%d') + '.csv' submit.to_csv(save_name, index=False, header=True) # 获取测试集tst_click = pd.read_csv(data_path + 'testA_click_log.csv')tst_users = tst_click['user_id'].unique()# 从所有的召回数据中将测试集中的用户选出来tst_recall = recall_df[recall_df['user_id'].isin(tst_users)]
# 生成提交文件submit(tst_recall, topk=5, model_name='itemcf_baseline')

总结

示例参考天池竞赛baseline代码,主要学习了协同过滤算法,跑通流程,对题目有个大致的了解。

以上是关于推荐系统实战task01笔记(赛题理解+baseline)的主要内容,如果未能解决你的问题,请参考以下文章

推荐系统实战Task3笔记多路召回上

Python ML实战-工业蒸汽量预测01-赛题理解

Task1 赛题理解

Datawhale 零基础入门数据挖掘-Task1 赛题理解

Datawhale 学CV--task1 赛题理解

王喆-深度学习推荐系统实战基础架构篇-(task1)DL推荐系统架构