推荐系统实战Task3笔记多路召回上
Posted 姑苏城外娄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了推荐系统实战Task3笔记多路召回上相关的知识,希望对你有一定的参考价值。
多路召回
所谓的“多路召回”策略,就是指采用不同的策略、特征或简单模型,分别召回一部分候选集,然后把候选集混合在一起供后续排序模型使用,可以明显的看出,“多路召回策略”是在“计算速度”和“召回率”之间进行权衡的结果。其中,各种简单策略保证候选集的快速召回,从不同角度设计的策略保证召回率接近理想的状态,不至于损伤排序效果。如下图是多路召回的一个示意图,在多路召回中,每个策略之间毫不相关,所以一般可以写并发多线程同时进行,这样可以更加高效。
也就是说可以使用多种不同的策略来获取用户排序的候选商品集合,而具体使用哪些召回策略其实是与业务强相关的 ,针对不同的任务就会有对于该业务真实场景下需要考虑的召回规则。例如新闻推荐,召回规则可以是“热门新闻”、“作者召回”、“关键词召回”、“主题召回“、”协同过滤召回“等等
读取数据
在一般的rs比赛中读取数据部分主要分为三种模式, 不同的模式对应的不同的数据集:
-
debug模式:这个的目的是帮助我们基于数据先搭建一个简易的baseline并跑通, 保证写的baseline代码没有什么问题。由于推荐比赛的数据往往非常巨大, 如果一上来直接采用全部的数据进行分析,搭建baseline框架, 往往会带来时间和设备上的损耗, 所以这时候我们往往需要从海量数据的训练集中随机抽取一部分样本来进行调试(train_click_log_sample), 先跑通一个baseline。 -
线下验证模式:这个的目的是帮助我们在线下基于已有的训练集数据, 来选择好合适的模型和一些超参数。所以我们这一块只需要加载整个训练集(train_click_log), 然后把整个训练集再分成训练集和验证集。训练集是模型的训练数据, 验证集部分帮助我们调整模型的参数和其他的一些超参数。 -
线上模式:我们用debug模式搭建起一个推荐系统比赛的baseline, 用线下验证模式选择好了模型和一些超参数, 这一部分就是真正的对于给定的测试集进行预测, 提交到线上, 所以这一块使用的训练数据集是全量的数据集(train_click_log+test_click_log)
下面就分别对这三种不同的数据读取模式先建立不同的代导入函数, 方便后面针对不同的模式下导入数据。
# debug模式:从训练集中划出一部分数据来调试代码
def get_all_click_sample(data_path, sample_nums=10000):
"""
训练集中采样一部分数据调试
data_path: 原数据的存储路径
sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做)
"""
all_click = pd.read_csv(data_path + 'train_click_log.csv')
all_user_ids = all_click.user_id.unique()
sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False)
all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
return all_click
# 读取点击数据,这里分成线上和线下,如果是为了获取线上提交结果应该讲测试集中的点击数据合并到总的数据中
# 如果是为了线下验证模型的有效性或者特征的有效性,可以只使用训练集
def get_all_click_df(data_path='./data_raw/', offline=True):
if offline:
all_click = pd.read_csv(data_path + 'train_click_log.csv')
else:
trn_click = pd.read_csv(data_path + 'train_click_log.csv')
tst_click = pd.read_csv(data_path + 'testA_click_log.csv')
all_click = trn_click.append(tst_click)
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
return all_click
# 读取文章的基本属性
def get_item_info_df(data_path):
item_info_df = pd.read_csv(data_path + 'articles.csv')
# 为了方便与训练集中的click_article_id拼接,需要把article_id修改成click_article_id
item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
return item_info_df
# 读取文章的Embedding数据
def get_item_emb_dict(data_path):
item_emb_df = pd.read_csv(data_path + 'articles_emb.csv')
item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
# 进行归一化
item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np))
pickle.dump(item_emb_dict, open(save_path + 'item_content_emb.pkl', 'wb'))
return item_emb_dict
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
# 采样数据
all_click_df = get_all_click_sample(data_path)
# # 全量训练集
# all_click_df = get_all_click_df(data_path, offline=False)
# 对时间戳进行归一化,用于在关联规则的时候计算权重
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)
item_info_df = get_item_info_df(data_path)
item_emb_dict = get_item_emb_dict(data_path)
工具函数
获取用户-文章-时间函数
这个在基于关联规则的用户协同过滤的时候会用到
# 根据点击时间获取用户的点击文章序列 {user1: [(item1, time1), (item2, time2)..]...}
def get_user_item_time(click_df):
click_df = click_df.sort_values('click_timestamp')
def make_item_time_pair(df):
return list(zip(df['click_article_id'], df['click_timestamp']))
user_item_time_df = click_df.groupby('user_id')['click_article_id', 'click_timestamp'].apply(lambda x: make_item_time_pair(x))\
.reset_index().rename(columns={0: 'item_time_list'})
user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
return user_item_time_dict
获取文章-用户-时间函数
这个在基于关联规则的文章协同过滤的时候会用到
# 根据时间获取商品被点击的用户序列 {item1: [(user1, time1), (user2, time2)...]...}
# 这里的时间是用户点击当前商品的时间,好像没有直接的关系。
def get_item_user_time_dict(click_df):
def make_user_time_pair(df):
return list(zip(df['user_id'], df['click_timestamp']))
click_df = click_df.sort_values('click_timestamp')
item_user_time_df = click_df.groupby('click_article_id')['user_id', 'click_timestamp'].apply(lambda x: make_user_time_pair(x))\
.reset_index().rename(columns={0: 'user_time_list'})
item_user_time_dict = dict(zip(item_user_time_df['click_article_id'], item_user_time_df['user_time_list']))
return item_user_time_dict
获取历史和最后一次点击
这个在评估召回结果, 特征工程和制作标签转成监督学习测试集的时候会用到
# 获取当前数据的历史点击和最后一次点击
def get_hist_and_last_click(all_click):
all_click = all_click.sort_values(by=['user_id', 'click_timestamp'])
click_last_df = all_click.groupby('user_id').tail(1)
# 如果用户只有一个点击,hist为空了,会导致训练的时候这个用户不可见,此时默认泄露一下
def hist_func(user_df):
if len(user_df) == 1:
return user_df
else:
return user_df[:-1]
click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)
return click_hist_df, click_last_df
获取文章属性特征
# 获取文章id对应的基本属性,保存成字典的形式,方便后面召回阶段,冷启动阶段直接使用
def get_item_info_dict(item_info_df):
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler)
item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))
item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
return item_type_dict, item_words_dict, item_created_time_dict
获取用户历史点击的文章信息
def get_user_hist_item_info_dict(all_click):
# 获取user_id对应的用户历史点击文章类型的集合字典
user_hist_item_typs = all_click.groupby('user_id')['category_id'].agg(set).reset_index()
user_hist_item_typs_dict = dict(zip(user_hist_item_typs['user_id'], user_hist_item_typs['category_id']))
# 获取user_id对应的用户点击文章的集合
user_hist_item_ids_dict = all_click.groupby('user_id')['click_article_id'].agg(set).reset_index()
user_hist_item_ids_dict = dict(zip(user_hist_item_ids_dict['user_id'], user_hist_item_ids_dict['click_article_id']))
# 获取user_id对应的用户历史点击的文章的平均字数字典
user_hist_item_words = all_click.groupby('user_id')['words_count'].agg('mean').reset_index()
user_hist_item_words_dict = dict(zip(user_hist_item_words['user_id'], user_hist_item_words['words_count']))
# 获取user_id对应的用户最后一次点击的文章的创建时间
all_click_ = all_click.sort_values('click_timestamp')
user_last_item_created_time = all_click_.groupby('user_id')['created_at_ts'].apply(lambda x: x.iloc[-1]).reset_index()
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
user_last_item_created_time['created_at_ts'] = user_last_item_created_time[['created_at_ts']].apply(max_min_scaler)
user_last_item_created_time_dict = dict(zip(user_last_item_created_time['user_id'], \
user_last_item_created_time['created_at_ts']))
return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict
获取点击次数最多的topk个文章
# 获取近期点击最多的文章
def get_item_topk_click(click_df, k):
topk_click = click_df['click_article_id'].value_counts().index[:k]
return topk_click
定义多路召回字典
# 获取文章的属性信息,保存成字典的形式方便查询
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
# 定义一个多路召回的字典,将各路召回的结果都保存在这个字典当中
user_multi_recall_dict = {'itemcf_sim_itemcf_recall': {},
'embedding_sim_item_recall': {},
'youtubednn_recall': {},
'youtubednn_usercf_recall': {},
'cold_start_recall': {}}
# 提取最后一次点击作为召回评估,如果不需要做召回评估直接使用全量的训练集进行召回(线下验证模型)
# 如果不是召回评估,直接使用全量数据进行召回,不用将最后一次提取出来
trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
召回效果评估函数
做完了召回有时候也需要对当前的召回方法或者参数进行调整以达到更好的召回效果,因为召回的结果决定了最终排序的上限,下面也会提供一个召回评估的方法
# 依次评估召回的前10, 20, 30, 40, 50个文章中的击中率
def metrics_recall(user_recall_items_dict, trn_last_click_df, topk=5):
last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['click_article_id']))
user_num = len(user_recall_items_dict)
for k in range(10, topk+1, 10):
hit_num = 0
for user, item_list in user_recall_items_dict.items():
# 获取前k个召回的结果
tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]]
if last_click_item_dict[user] in set(tmp_recall_items):
hit_num += 1
hit_rate = round(hit_num * 1.0 / user_num, 5)
print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)
计算相似性矩阵
这一部分主要是通过协同过滤以及向量检索得到相似性矩阵,相似性矩阵主要分为user2user和item2item,下面依次获取基于itemcf的item2item的相似性矩阵,
itemcf i2i_sim
借鉴KDD2020的去偏商品推荐,在计算item2item相似性矩阵时,使用关联规则,使得计算的文章的相似性还考虑到了:
-
用户点击的时间权重 -
用户点击的顺序权重 -
文章创建的时间权重
def itemcf_sim(df, item_created_time_dict):
"""
文章与文章之间的相似性矩阵计算
:param df: 数据表
:item_created_time_dict: 文章创建时间的字典
return : 文章与文章的相似性矩阵
思路: 基于物品的协同过滤 + 关联规则
"""
user_item_time_dict = get_user_item_time(df)
# 计算物品相似度
i2i_sim = {}
item_cnt = defaultdict(int)
for user, item_time_list in tqdm(user_item_time_dict.items()):
# 在基于商品的协同过滤优化的时候可以考虑时间因素
for loc1, (i, i_click_time) in enumerate(item_time_list):
item_cnt[i] += 1
i2i_sim.setdefault(i, {})
for loc2, (j, j_click_time) in enumerate(item_time_list):
if(i == j):
continue
# 考虑文章的正向顺序点击和反向顺序点击
loc_alpha = 1.0 if loc2 > loc1 else 0.7
# 位置信息权重,其中的参数可以调节
loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
# 点击时间权重,其中的参数可以调节
click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time))
# 两篇文章创建时间的权重,其中的参数可以调节
created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
i2i_sim[i].setdefault(j, 0)
# 考虑多种因素的权重计算最终的文章之间的相似度
i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
i2i_sim_ = i2i_sim.copy()
for i, related_items in i2i_sim.items():
for j, wij in related_items.items():
i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])
# 将得到的相似性矩阵保存到本地
pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))
return i2i_sim_
i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
usercf u2u_sim
在计算用户之间的相似度的时候,也可以使用一些简单的关联规则,比如用户活跃度权重,这里将用户的点击次数作为用户活跃度的指标
def get_user_activate_degree_dict(all_click_df):
all_click_df_ = all_click_df.groupby('user_id')['click_article_id'].count().reset_index()
# 用户活跃度归一化
mm = MinMaxScaler()
all_click_df_['click_article_id'] = mm.fit_transform(all_click_df_[['click_article_id']])
user_activate_degree_dict = dict(zip(all_click_df_['user_id'], all_click_df_['click_article_id']))
return user_activate_degree_dict
def usercf_sim(all_click_df, user_activate_degree_dict):
"""
用户相似性矩阵计算
:param all_click_df: 数据表
:param user_activate_degree_dict: 用户活跃度的字典
return 用户相似性矩阵
思路: 基于用户的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则
"""
item_user_time_dict = get_item_user_time_dict(all_click_df)
u2u_sim = {}
user_cnt = defaultdict(int)
for item, user_time_list in tqdm(item_user_time_dict.items()):
for u, click_time in user_time_list:
user_cnt[u] += 1
u2u_sim.setdefault(u, {})
for v, click_time in user_time_list:
u2u_sim[u].setdefault(v, 0)
if u == v:
continue
# 用户平均活跃度作为活跃度的权重,这里的式子也可以改善
activate_weight = 100 * 0.5 * (user_activate_degree_dict[u] + user_activate_degree_dict[v])
u2u_sim[u][v] += activate_weight / math.log(len(user_time_list) + 1)
u2u_sim_ = u2u_sim.copy()
for u, related_users in u2u_sim.items():
for v, wij in related_users.items():
u2u_sim_[u][v] = wij / math.sqrt(user_cnt[u] * user_cnt[v])
# 将得到的相似性矩阵保存到本地
pickle.dump(u2u_sim_, open(save_path + 'usercf_u2u_sim.pkl', 'wb'))
return u2u_sim_
# 由于usercf计算时候太耗费内存了,这里就不直接运行了
# 如果是采样的话,是可以运行的
# user_activate_degree_dict = get_user_activate_degree_dict(all_click_df)
# u2u_sim = usercf_sim(all_click_df, user_activate_degree_dict)
item embedding sim
使用Embedding计算item之间的相似度是为了后续冷启动的时候可以获取未出现在点击数据中的文章,后面有对冷启动专门的介绍,这里简单的说一下faiss。
faiss是Facebook的AI团队开源的一套用于做聚类或者相似性搜索的软件库,底层是用C++实现。Faiss因为超级优越的性能,被广泛应用于推荐相关的业务当中.
faiss工具包一般使用在推荐系统中的向量召回部分。在做向量召回的时候要么是u2u,u2i或者i2i,这里的u和i指的是user和item.我们知道在实际的场景中user和item的数量都是海量的,我们最容易想到的基于向量相似度的召回就是使用两层循环遍历user列表或者item列表计算两个向量的相似度,但是这样做在面对海量数据是不切实际的,faiss就是用来加速计算某个查询向量最相似的topk个索引向量。
# 向量检索相似度计算
# topk指的是每个item, faiss搜索后返回最相似的topk个item
def embdding_sim(click_df, item_emb_df, save_path, topk):
"""
基于内容的文章embedding相似性矩阵计算
:param click_df: 数据表
:param item_emb_df: 文章的embedding
:param save_path: 保存路径
:patam topk: 找最相似的topk篇
return 文章相似性矩阵
思路: 对于每一篇文章, 基于embedding的相似性返回topk个与其最相似的文章, 只不过由于文章数量太多,这里用了faiss进行加速
"""
# 文章索引与文章id的字典映射
item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['article_id']))
item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)
# 向量进行单位化
item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
# 建立faiss索引
item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
item_index.add(item_emb_np)
# 相似度查询,给每个索引位置上的向量返回topk个item以及相似度
sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表
# 将向量检索的结果保存成原始id的对应关系
item_sim_dict = collections.defaultdict(dict)
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)):
target_raw_id = item_idx_2_rawid_dict[target_idx]
# 从1开始是为了去掉商品本身, 所以最终获得的相似商品只有topk-1
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
rele_raw_id = item_idx_2_rawid_dict[rele_idx]
item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
# 保存i2i相似度矩阵
pickle.dump(item_sim_dict, open(save_path + 'emb_i2i_sim.pkl', 'wb'))
return item_sim_dict
item_emb_df = pd.read_csv(data_path + '/articles_emb.csv')
emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10) # topk可以自行设置
faiss查询的原理:faiss使用了PCA和PQ(Product quantization乘积量化)两种技术进行向量压缩和编码,当然还使用了其他的技术进行优化,但是PCA和PQ是其中最核心部分。
-
PCA降维算法细节参考下面这个链接进行学习主成分分析(PCA)原理总结
-
PQ编码的细节下面这个链接进行学习实例理解product quantization算法
faiss使用 faiss官方教程
以上是关于推荐系统实战Task3笔记多路召回上的主要内容,如果未能解决你的问题,请参考以下文章
推荐系统[六]:混排算法简介研究现状混排技术以及MDP-DOTA信息流第三代混排调控框架,高质量项目实战。
推荐系统[二]:召回算法超详细讲解[召回模型演化过程召回模型主流常见算法(DeepMF/TDM/Airbnb Embedding/Item2vec等)召回路径简介多路召回融合]
推荐系统[八]算法实践总结V1:淘宝逛逛and阿里飞猪个性化推荐:召回算法实践总结冷启动召回复购召回用户行为召回等算法实战
推荐系统[七]:推荐系统通用技术架构(Netfilx等)API服务接口