02_基于用户点击推荐

Posted 梦如汐

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了02_基于用户点击推荐相关的知识,希望对你有一定的参考价值。

在推荐之前我们先获取用户的点击行为数据
完整的数据格式如下
“user_info”:“vid”:“CtJvamG6rxFTomeTA16oAg==”,“user_id”:“”,“session_id”:“I1w6cWv2nLv8DEe9T56x”,“device”:“site_info”:“site”:“ms”,“lang”:“en”,“visit_time”:“time”:1677740262466,“timezone”:-8,“event_body”:“event_type”:“click”,“event_name”:“click”,“event_data”:“event_entity_info”:“link_type”:“click”,“item_code”:“618769781”,“link_url”:“”,“link_title”:“”,“resource_id”:“”,“spm_link”:“msen.select.productdetail.buynow.KWQhMbMg5c8gWmIVGJXu”,“event_time”:1677740262465
数据源过滤筛选点击事件数据

 val stream_click = streamEnv.addSource(kafka_click).setParallelism(1)
        .map(x => 
      val str = JSON.parseObject(x.substring(x.indexOf(""), x.lastIndexOf("") + 1))
      val user_info = str.getJSONObject("user_info")
      val event_body = str.getJSONObject("event_body")
      val event_type: String = event_body.getString("event_type")
      val event_name: String = event_body.getString("event_name")
      (event_type, event_name, event_body.getJSONObject("event_data").getJSONObject("event_entity_info"), user_info.getString("user_id"))
    )

    val splitDataStream = stream_click.filter(line => 
      line._1.equalsIgnoreCase("click") && line._2.equalsIgnoreCase("click") && line._3.getString("item_code") != "" && line._3.getString("link_title") != "" && line._4 != ""
    )

通过flink的商品标题,调用bert获取商品标题向量

 //1.  通过flink传商品标题调取bert接口获取商品的向量IIV
   val bert: String = BertMilvusUtil.getVevtorFromRedisAndUpdate(item_code, jedis, bertTitle, bert_url)
   val text: JSONObject = JSON.parseObject(bert).getJSONArray("texts").getJSONObject(0)
   var IIV: mutable.Buffer[Double] = ArrayDoubleUtil.getArrayDouble(text.getJSONArray("vector"))

获取商品类目对应的向量

   val category_bert: String = BertMilvusUtil.getVectortFromBert(str, bert_url)
   val category_text: JSONObject = JSON.parseObject(category_bert).getJSONArray("texts").getJSONObject(0)
    //获取二三级类目的向量,
   val IV: mutable.Buffer[Double] = ArrayDoubleUtil.getArrayDouble(category_text.getJSONArray("vector"))

商品标题向量和类目向量累加

IIV = IIV.zip(IV).map(z => z._1 + (z._2))

从redis获取长期用户向量和点击次数,如果是第一次点击就直接计算用户向量

          if (jedis.exists(s"recommend:LUIV:$in._4")) 
            val long_UIV_T: String = jedis.get(s"recommend:LUIV:$in._4")
            val long_UIV_T_Json = JSON.parseObject(long_UIV_T)
            val click_t = long_UIV_T_Json.getInteger("click_t")
            //重新赋值 long_UIV
            long_UIV = long_UIV_T_Json.getJSONArray("long_UIV").toArray.map(x => x.toString.toDouble).toBuffer
            T = click_t
            val d: Double = 1 - scala.math.pow(beat, T)
            //3.计算用户短期兴趣向量 公式算法团队提供
            val left = long_UIV.map  x => beat * x 
            val right = IIV.map  y => (1 - beat) * y 
            val c = left.zip(right).map(z => z._1 + (z._2))
            long_UIV = c.map  x => x / d 
            //获取不到key的话就用默认值long_UIV = 0, T=1 走catch块
           else 
            val right = IIV.map  y => (1 - beat) * y 
            long_UIV = right
          

用户长期向量结果保存到redis中,然后根据计算的用户长期向量调优milvus库,得到相似商品推荐数组,并带得分,把推荐的结果(具体的IC存到redis)

//5.   Flink根据计算好的短期兴趣从Milvus里获取推荐商品数组,带上得分,
          val str: String = BertMilvusUtil.getDistanceFromMilvus(long_UIV, top, ef, milvus_url)
          val result: JSONArray = JSON.parseObject(str).getJSONArray("result")
          //6.   获取到的milvus吐出来的商品itemcode列表 去关联商品信息redis 的key product_info:itemcode拉宽,带上类目和价格
          val array: JSONArray = result.getJSONArray(0)
          val recommend_itemcodes = new util.ArrayList[JSONObject]()
          //TODO 距离为1 并且是第0个元素 才要剔除
          for (i <- 0 until array.size()) 
            val recommend_itemcode = new JSONObject()
            val nObject: JSONObject = JSON.parseObject(array.getJSONObject(i).toString)
            val itemcode: String = nObject.getString("id")
            //去关联商品信息redis 的key product_info:$itemcode拉宽,带上类目和价格
//            val dim_Product_info: util.Map[String, String] = jedis.hgetAll(s"product_info:$itemcode")
            if (!flag) 
              recommend_itemcode.put("itemcode", itemcode)
              recommend_itemcode.put("cid2", category_id_2)
              recommend_itemcode.put("low_price", low_price)
              val score: Double = nObject.getDouble("distance")
              //distance范围从[-1, 1]转化到[0, 1]
              recommend_itemcode.put("oldScore", ((1 + score) / 2))
              recommend_itemcode.put("recall", "long")
              //过滤商品本身
              if (!itemcode.equals(in._3.getString("item_code"))) 
                recommend_itemcodes.add(recommend_itemcode)
              
            
          

长期兴趣推荐完成

以上是关于02_基于用户点击推荐的主要内容,如果未能解决你的问题,请参考以下文章

基于 Flink 实现的商品实时推荐系统(附源码)

大数据技术之_24_电影推荐系统项目_04_推荐系统算法详解

基于Flink商品实时推荐系统项目

基于Flink SQL构建实时数据仓库

Flink视频教程_大数据Flink教程下载

Flink视频教程_大数据Flink教程下载