02_基于用户点击推荐
Posted 梦如汐
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了02_基于用户点击推荐相关的知识,希望对你有一定的参考价值。
在推荐之前我们先获取用户的点击行为数据
完整的数据格式如下
“user_info”:“vid”:“CtJvamG6rxFTomeTA16oAg==”,“user_id”:“”,“session_id”:“I1w6cWv2nLv8DEe9T56x”,“device”:“site_info”:“site”:“ms”,“lang”:“en”,“visit_time”:“time”:1677740262466,“timezone”:-8,“event_body”:“event_type”:“click”,“event_name”:“click”,“event_data”:“event_entity_info”:“link_type”:“click”,“item_code”:“618769781”,“link_url”:“”,“link_title”:“”,“resource_id”:“”,“spm_link”:“msen.select.productdetail.buynow.KWQhMbMg5c8gWmIVGJXu”,“event_time”:1677740262465
数据源过滤筛选点击事件数据
val stream_click = streamEnv.addSource(kafka_click).setParallelism(1)
.map(x =>
val str = JSON.parseObject(x.substring(x.indexOf(""), x.lastIndexOf("") + 1))
val user_info = str.getJSONObject("user_info")
val event_body = str.getJSONObject("event_body")
val event_type: String = event_body.getString("event_type")
val event_name: String = event_body.getString("event_name")
(event_type, event_name, event_body.getJSONObject("event_data").getJSONObject("event_entity_info"), user_info.getString("user_id"))
)
val splitDataStream = stream_click.filter(line =>
line._1.equalsIgnoreCase("click") && line._2.equalsIgnoreCase("click") && line._3.getString("item_code") != "" && line._3.getString("link_title") != "" && line._4 != ""
)
通过flink的商品标题,调用bert获取商品标题向量
//1. 通过flink传商品标题调取bert接口获取商品的向量IIV
val bert: String = BertMilvusUtil.getVevtorFromRedisAndUpdate(item_code, jedis, bertTitle, bert_url)
val text: JSONObject = JSON.parseObject(bert).getJSONArray("texts").getJSONObject(0)
var IIV: mutable.Buffer[Double] = ArrayDoubleUtil.getArrayDouble(text.getJSONArray("vector"))
获取商品类目对应的向量
val category_bert: String = BertMilvusUtil.getVectortFromBert(str, bert_url)
val category_text: JSONObject = JSON.parseObject(category_bert).getJSONArray("texts").getJSONObject(0)
//获取二三级类目的向量,
val IV: mutable.Buffer[Double] = ArrayDoubleUtil.getArrayDouble(category_text.getJSONArray("vector"))
商品标题向量和类目向量累加
IIV = IIV.zip(IV).map(z => z._1 + (z._2))
从redis获取长期用户向量和点击次数,如果是第一次点击就直接计算用户向量
if (jedis.exists(s"recommend:LUIV:$in._4"))
val long_UIV_T: String = jedis.get(s"recommend:LUIV:$in._4")
val long_UIV_T_Json = JSON.parseObject(long_UIV_T)
val click_t = long_UIV_T_Json.getInteger("click_t")
//重新赋值 long_UIV
long_UIV = long_UIV_T_Json.getJSONArray("long_UIV").toArray.map(x => x.toString.toDouble).toBuffer
T = click_t
val d: Double = 1 - scala.math.pow(beat, T)
//3.计算用户短期兴趣向量 公式算法团队提供
val left = long_UIV.map x => beat * x
val right = IIV.map y => (1 - beat) * y
val c = left.zip(right).map(z => z._1 + (z._2))
long_UIV = c.map x => x / d
//获取不到key的话就用默认值long_UIV = 0, T=1 走catch块
else
val right = IIV.map y => (1 - beat) * y
long_UIV = right
用户长期向量结果保存到redis中,然后根据计算的用户长期向量调优milvus库,得到相似商品推荐数组,并带得分,把推荐的结果(具体的IC存到redis)
//5. Flink根据计算好的短期兴趣从Milvus里获取推荐商品数组,带上得分,
val str: String = BertMilvusUtil.getDistanceFromMilvus(long_UIV, top, ef, milvus_url)
val result: JSONArray = JSON.parseObject(str).getJSONArray("result")
//6. 获取到的milvus吐出来的商品itemcode列表 去关联商品信息redis 的key product_info:itemcode拉宽,带上类目和价格
val array: JSONArray = result.getJSONArray(0)
val recommend_itemcodes = new util.ArrayList[JSONObject]()
//TODO 距离为1 并且是第0个元素 才要剔除
for (i <- 0 until array.size())
val recommend_itemcode = new JSONObject()
val nObject: JSONObject = JSON.parseObject(array.getJSONObject(i).toString)
val itemcode: String = nObject.getString("id")
//去关联商品信息redis 的key product_info:$itemcode拉宽,带上类目和价格
// val dim_Product_info: util.Map[String, String] = jedis.hgetAll(s"product_info:$itemcode")
if (!flag)
recommend_itemcode.put("itemcode", itemcode)
recommend_itemcode.put("cid2", category_id_2)
recommend_itemcode.put("low_price", low_price)
val score: Double = nObject.getDouble("distance")
//distance范围从[-1, 1]转化到[0, 1]
recommend_itemcode.put("oldScore", ((1 + score) / 2))
recommend_itemcode.put("recall", "long")
//过滤商品本身
if (!itemcode.equals(in._3.getString("item_code")))
recommend_itemcodes.add(recommend_itemcode)
长期兴趣推荐完成
以上是关于02_基于用户点击推荐的主要内容,如果未能解决你的问题,请参考以下文章