Product Association Analysis in Spark, Illustrated
Posted by 小基基o_O
Rough analysis with Spark SQL
General idea
Collect the set of goods each user has bought, keep only users who bought more than one distinct good, then count how often each identical set occurs.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
//Create the SparkSession and take the SparkContext from it
val conf: SparkConf = new SparkConf().setAppName("a0").setMaster("local")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = spark.sparkContext
//Implicit conversions (RDD to DataFrame, Seq to DataFrame, ...)
import spark.implicits._
//Raw order-detail table (user_id, order_id, good_id)
sc.makeRDD(Seq(
("u1", "o1", "g1"),
("u1", "o1", "g2"),
("u2", "o2", "g1"),
("u1", "o3", "g1"),
("u1", "o3", "g2"),
("u1", "o3", "g3"),
("u3", "o4", "g3"),
("u4", "o5", "g1"),
("u5", "o6", "g4"),
)).toDF("user_id", "order_id", "good_id").createTempView("t0")
//Collect each user's set of goods (note: grouped by user, not by order)
spark.sql(
"""
|SELECT COLLECT_SET(good_id) AS good_set FROM t0
|GROUP BY user_id
|HAVING SIZE(good_set) > 1
|""".stripMargin).createTempView("t1")
spark.sql(
"""
|SELECT good_set, COUNT(good_set) AS c FROM t1
|GROUP BY good_set
|ORDER BY c DESC
|""".stripMargin).show
The plain SQL result is crude: it only counts users whose full item sets are identical, so partial overlaps (say, two users who share only g1 and g2) contribute nothing. The frequent-itemset model below handles this properly.
Co-occurrence frequency model
General idea
Treat each order as a basket of goods and mine frequent itemsets with FP-Growth; the frequency of each itemset measures how often those goods are bought together, and association rules with confidence and lift are derived from those frequencies.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
//Create the SparkSession and take the SparkContext from it
val conf: SparkConf = new SparkConf().setAppName("a0").setMaster("local")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = spark.sparkContext
//Implicit conversions (RDD to DataFrame, Seq to DataFrame, ...)
import spark.implicits._
//Order-detail table (simulating Hive: SELECT order_id, good_id FROM dwd_order_detail)
sc.makeRDD(Seq(
("o1", "g1"),
("o1", "g2"),
("o2", "g1"),
("o3", "g1"),
("o3", "g2"),
("o3", "g3"),
("o4", "g3"),
("o5", "g1"),
("o6", "g4"),
)).toDF("order_id", "good_id").createTempView("dwd_order_detail")
//Group the goods in each order into a basket
val df = spark.sql(
"""
|SELECT COLLECT_SET(good_id) FROM dwd_order_detail
|GROUP BY order_id
|""".stripMargin).toDF("items") //FPGrowth expects the input column to be named "items" by default
//Frequent-itemset model
import org.apache.spark.ml.fpm.FPGrowth
val fpGrowth = new FPGrowth().setMinSupport(0).setMinConfidence(0) //thresholds of 0 keep every itemset and rule
val model = fpGrowth.fit(df)
//Co-occurrence frequency of each itemset
model.freqItemsets.show
/*
+------------+----+
| items|freq|
+------------+----+
| [g1]| 4|
| [g2]| 2|
| [g2, g1]| 2|
| [g3]| 2|
| [g3, g2]| 1|
|[g3, g2, g1]| 1|
| [g3, g1]| 1|
| [g4]| 1|
+------------+----+
*/
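Reading the table: {g2, g1} co-occurs in 2 of the 6 orders, so its support is
$$Support_{g1,g2}=\frac{Freq_{g1,g2}}{N}=\frac{2}{6}\approx 0.33$$
and with setMinSupport(0) no itemset is filtered out.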
//Association rules between goods
model.associationRules.show
/*
+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
| [g3, g1]| [g2]| 1.0| 3.0|
| [g2]| [g1]| 1.0| 1.5|
| [g2]| [g3]| 0.5| 1.5|
| [g2, g1]| [g3]| 0.5| 1.5|
| [g3, g2]| [g1]| 1.0| 1.5|
| [g1]| [g2]| 0.5| 1.5|
| [g1]| [g3]| 0.25|0.75|
| [g3]| [g2]| 0.5| 1.5|
| [g3]| [g1]| 0.5|0.75|
+----------+----------+----------+----+
*/
//Prediction: recommend goods for a given basket
model.transform(Seq(
"g1",
"g2",
"g3",
"g4",
"g1 g2",
"g1 g3",
"g2 g3",
).map(_.split(" ")).toDF("items")).show
/*
+--------+----------+
| items|prediction|
+--------+----------+
| [g1]| [g2, g3]|
| [g2]| [g1, g3]|
| [g3]| [g2, g1]|
| [g4]| []|
|[g1, g2]| [g3]|
|[g1, g3]| [g2]|
|[g2, g3]| [g1]|
+--------+----------+
*/
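transform gathers the consequents of every rule whose antecedent is contained in the input basket, excluding goods already in it. A minimal sketch that re-derives the [g1] row from the rules DataFrame under that reading (my own code, not the library's):
//Manually re-derive the prediction for the basket Set("g1")
val basket = Set("g1")
val predicted = model.associationRules.collect()
.filter(r => r.getAs[Seq[String]]("antecedent").toSet.subsetOf(basket))
.flatMap(r => r.getAs[Seq[String]]("consequent"))
.filterNot(basket.contains)
.distinct
//predicted should contain g2 and g3, matching the [g1] row above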
$$Confidence_{g3\rightarrow g1}=\frac{Freq_{g1,g3}}{Freq_{g3}}=\frac{1}{2}$$
$$Confidence_{g1\rightarrow g3}=\frac{Freq_{g1,g3}}{Freq_{g1}}=\frac{1}{4}$$
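The lift column is the confidence divided by the consequent's baseline support (there are N = 6 orders in total); for g1 → g3:
$$Lift_{g1\rightarrow g3}=\frac{Confidence_{g1\rightarrow g3}}{Freq_{g3}/N}=\frac{1/4}{2/6}=0.75$$
which matches the 0.75 shown in the rules table.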
Word vectors
A different angle: treat each user's purchase history as a sentence, train Word2Vec on it, and relate goods through the cosine similarity of their vectors.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
//Create the SparkSession and take the SparkContext from it
val conf: SparkConf = new SparkConf().setAppName("a0").setMaster("local")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = spark.sparkContext
//Implicit conversions (RDD to DataFrame, Seq to DataFrame, ...)
import spark.implicits._
//Order-detail table (simulating Hive: SELECT user_id, good_id FROM dwd_order_detail)
sc.makeRDD(Seq(
("u1", "g1"),("u1", "g2"),("u1", "g1"),("u1", "g2"),("u1", "g3"),
("u2", "g1"),("u2", "g3"),("u2", "g2"),
("u3", "g4"),("u3", "g5"),("u3", "g4"),
("u4", "g5"),("u4", "g4"),
("u5", "g5"),("u5", "g4"),("u5", "g4"),("u5", "g4"),("u5", "g4"),("u5", "g5"),
)).toDF("user_id", "good_id").createTempView("dwd_order_detail")
//Group each user's purchases into a sequence
val df = spark.sql(
"""
|SELECT COLLECT_LIST(good_id) FROM dwd_order_detail
|GROUP BY user_id
|""".stripMargin).toDF("goods") //column name must match setInputCol below
//Word2Vec model
import org.apache.spark.ml.feature.Word2Vec
val word2Vec = new Word2Vec()
.setInputCol("goods") //input column name
.setOutputCol("result") //output column name
.setVectorSize(20) //dimensionality of the word vectors
.setWindowSize(4) //context window size
.setMinCount(0) //minimum token frequency to keep a good
.setMaxIter(20) //maximum number of training iterations
val model = word2Vec.fit(df)
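The fitted model exposes one vector per good through getVectors (the values depend on random initialization, so I only show the call):
model.getVectors.show(false) //columns: word, vector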
//Relate goods by cosine similarity of their vectors
val goods = spark.sql(
"SELECT DISTINCT good_id AS goods FROM dwd_order_detail ORDER BY goods"
).rdd.map(_.getString(0)).collect.iterator
for (good <- goods) {
println(good)
model.findSynonyms(good, 4).show(false)
}
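To keep the neighbours instead of printing them, each lookup can be folded into a single similarity table. A minimal sketch: the names allGoods, simDF, good_id, similar_good_id and cos_sim are my own, while findSynonyms really does return columns word and similarity:
//Collect the top-4 neighbours of every good into one DataFrame
val allGoods = spark.sql("SELECT DISTINCT good_id FROM dwd_order_detail")
.rdd.map(_.getString(0)).collect
val simDF = allGoods.flatMap { g =>
model.findSynonyms(g, 4).collect.map(r => (g, r.getString(0), r.getDouble(1)))
}.toSeq.toDF("good_id", "similar_good_id", "cos_sim")
simDF.show(false)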