图解Spark商品关联分析

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了图解Spark商品关联分析相关的知识,希望对你有一定的参考价值。

SparkSQL粗分析

大致原理

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
//创建SparkContext对象
val c0: SparkConf = new SparkConf().setAppName("a0").setMaster("local")
val sc: SparkContext = new SparkContext(c0)
//创建SparkSession对象
val c1: SparkConf = new SparkConf().setAppName("a1").setMaster("local")
val spark: SparkSession = SparkSession.builder().config(c1).getOrCreate()
//隐式转换支持
import spark.implicits._
//原始子订单表
sc.makeRDD(Seq(
  ("u1", "o1", "g1"),
  ("u1", "o1", "g2"),
  ("u2", "o2", "g1"),
  ("u1", "o3", "g1"),
  ("u1", "o3", "g2"),
  ("u1", "o3", "g3"),
  ("u3", "o4", "g3"),
  ("u4", "o5", "g1"),
  ("u5", "o6", "g4"),
)).toDF("user_id", "order_id", "good_id").createTempView("t0")
//按订单关联
spark.sql(
  """
    |SELECT COLLECT_SET(good_id)good_set FROM t0
    |GROUP BY user_id
    |HAVING SIZE(good_set) > 1
    |""".stripMargin).createTempView("t1")
spark.sql(
  """
    |SELECT good_set,count(good_set)c FROM t1
    |GROUP BY good_set
    |ORDER BY c DESC
    |""".stripMargin).show

SQL计算结果略为粗糙

共现频数模型

大致原理

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
//创建SparkContext对象
val c0: SparkConf = new SparkConf().setAppName("a0").setMaster("local")
val sc: SparkContext = new SparkContext(c0)
//创建SparkSession对象
val c1: SparkConf = new SparkConf().setAppName("a1").setMaster("local")
val spark: SparkSession = SparkSession.builder().config(c1).getOrCreate()
//隐式转换支持
import spark.implicits._
//子订单表(模拟HIVE:SELECT order_id,good_id FROM dwd_order_detail)
sc.makeRDD(Seq(
  ("o1", "g1"),
  ("o1", "g2"),
  ("o2", "g1"),
  ("o3", "g1"),
  ("o3", "g2"),
  ("o3", "g3"),
  ("o4", "g3"),
  ("o5", "g1"),
  ("o6", "g4"),
)).toDF("order_id", "good_id").createTempView("dwd_order_detail")
//按订单关联商品
val df = spark.sql(
  """
    |SELECT COLLECT_SET(good_id) FROM dwd_order_detail
    |GROUP BY order_id
    |""".stripMargin).toDF("items") //下面模型默认输入列名叫items
//共现频数模型
import org.apache.spark.ml.fpm.FPGrowth
val fpGrowth = new FPGrowth().setMinSupport(0).setMinConfidence(0)
val model = fpGrowth.fit(df)
//商品共现频数
model.freqItemsets.show
/*
+------------+----+
|       items|freq|
+------------+----+
|        [g1]|   4|
|        [g2]|   2|
|    [g2, g1]|   2|
|        [g3]|   2|
|    [g3, g2]|   1|
|[g3, g2, g1]|   1|
|    [g3, g1]|   1|
|        [g4]|   1|
+------------+----+
*/
//商品关联规则
model.associationRules.show
/*
+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
|  [g3, g1]|      [g2]|       1.0| 3.0|
|      [g2]|      [g1]|       1.0| 1.5|
|      [g2]|      [g3]|       0.5| 1.5|
|  [g2, g1]|      [g3]|       0.5| 1.5|
|  [g3, g2]|      [g1]|       1.0| 1.5|
|      [g1]|      [g2]|       0.5| 1.5|
|      [g1]|      [g3]|      0.25|0.75|
|      [g3]|      [g2]|       0.5| 1.5|
|      [g3]|      [g1]|       0.5|0.75|
+----------+----------+----------+----+
*/
//预测
model.transform(Seq(
  "g1",
  "g2",
  "g3",
  "g4",
  "g1 g2",
  "g1 g3",
  "g2 g3",
).map(_.split(" ")).toDF("items")).show
/*
+--------+----------+
|   items|prediction|
+--------+----------+
|    [g1]|  [g2, g3]|
|    [g2]|  [g1, g3]|
|    [g3]|  [g2, g1]|
|    [g4]|        []|
|[g1, g2]|      [g3]|
|[g1, g3]|      [g2]|
|[g2, g3]|      [g1]|
+--------+----------+
*/

C o n f i d e n c e g 3 − g 1 = F r e q g 1 , g 3 F r e q g 3 = 1 2 Confidence_{g3-g1}=\\frac{Freq_{g1,g3}}{Freq_{g3}}=\\frac{1}{2} Confidenceg3g1=Freqg3Freqg1,g3=21
C o n f i d e n c e g 1 − g 3 = F r e q g 1 , g 3 F r e q g 1 = 1 4 Confidence_{g1-g3}=\\frac{Freq_{g1,g3}}{Freq_{g1}}=\\frac{1}{4} Confidenceg1g3=Freqg1Freqg1,g3=41

词向量

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
//创建SparkContext对象
val c0: SparkConf = new SparkConf().setAppName("a0").setMaster("local")
val sc: SparkContext = new SparkContext(c0)
//创建SparkSession对象
val c1: SparkConf = new SparkConf().setAppName("a1").setMaster("local")
val spark: SparkSession = SparkSession.builder().config(c1).getOrCreate()
//隐式转换支持
import spark.implicits._
//子订单表(模拟HIVE:SELECT user_id,good_id FROM dwd_order_detail)
sc.makeRDD(Seq(
  ("u1", "g1"),("u1", "g2"),("u1", "g1"),("u1", "g2"),("u1", "g3"),
  ("u2", "g1"),("u2", "g3"),("u2", "g2"),
  ("u3", "g4"),("u3", "g5"),("u3", "g4"),
  ("u4", "g5"),("u4", "g4"),
  ("u5", "g5"),("u5", "g4"),("u5", "g4"),("u5", "g4"),("u5", "g4"),("u5", "g5"),
)).toDF("user_id", "good_id").createTempView("dwd_order_detail")
//按用户关联商品
val df = spark.sql(
  """
    |SELECT COLLECT_LIST(good_id) FROM dwd_order_detail
    |GROUP BY user_id
    |""".stripMargin).toDF("goods") //下面模型默认输入列名叫items
//词向量模型
import org.apache.spark.ml.feature.Word2Vec
val word2Vec = new Word2Vec()
  .setInputCol("goods") //输入列名称
  .setOutputCol("result") //输出列名称
  .setVectorSize(20) //词向量维度
  .setWindowSize(4) //词窗大小
  .setMinCount(0) //最低词频门槛
  .setMaxIter(20) //最大训练迭代次数
val model = word2Vec.fit(df)
//计算余弦相似度来关联商品
val goods = spark.sql(
  "SELECT DISTINCT(good_id)goods FROM dwd_order_detail ORDER BY goods"
).rdd.map(_.getString(0)).collect.iterator
for (good <- goods) {
  println(good)
  model.findSynonyms(good, 4).show(false)
}

以上是关于图解Spark商品关联分析的主要内容,如果未能解决你的问题,请参考以下文章

Apriori商品关联分析

PowerBI:关联商品分析

图解大数据 | 基于Spark RDD的大数据处理分析

图解大数据 | 综合案例-使用Spark分析挖掘音乐专辑数据

商品交叉销售分析-关联分析

商品零售购物篮分析