pyspark vs scala中的FPgrowth计算关联

Posted

技术标签:

【中文标题】pyspark vs scala中的FPgrowth计算关联【英文标题】:FPgrowth computing association in pyspark vs scala 【发布时间】:2016-10-18 11:57:53 【问题描述】:

使用

http://spark.apache.org/docs/1.6.1/mllib-frequent-pattern-mining.html

Python 代码:

from pyspark.mllib.fpm import FPGrowth
model = FPGrowth.train(dataframe,0.01,10)

斯卡拉:

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val data = sc.textFile("data/mllib/sample_fpgrowth.txt")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

val fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach  itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)


val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach  rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent .mkString("[", ",", "]")
      + ", " + rule.confidence)

从代码here 可以看出,scala 部分没有最低置信度。

def trainFPGrowthModel(
      data: JavaRDD[java.lang.Iterable[Any]],
      minSupport: Double,
      numPartitions: Int): FPGrowthModel[Any] = 
    val fpg = new FPGrowth()
      .setMinSupport(minSupport)
      .setNumPartitions(numPartitions)

    val model = fpg.run(data.rdd.map(_.asScala.toArray))
    new FPGrowthModelWrapper(model)
  

在pyspark的情况下如何添加minConfidence生成关联规则?可以看到scala有例子,python没有例子。

【问题讨论】:

【参考方案1】:

火花 >= 2.2

有一个DataFrame 基础ml API 提供AssociationRules

from pyspark.ml.fpm import FPGrowth

data = ...

fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules.

火花

目前 PySpark 不支持提取关联规则(DataFrame 基于 FPGrowth 支持 Python 的 API 正在开发中 SPARK-1450),但我们可以轻松解决这个问题。

首先,您必须安装 SBT(只需转到 the downloads page)并按照您的操作系统的说明进行操作。

接下来,您必须创建一个只有两个文件的简单 Scala 项目:

.
├── AssociationRulesExtractor.scala
└── build.sbt

您可以稍后调整它以关注the established directory structure。

接下来在build.sbt 中添加以下内容(调整 Scala 版本和 Spark 版本以匹配您使用的版本):

name := "fpm"

version := "1.0"

scalaVersion := "2.10.6"

val sparkVersion = "1.6.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

并关注AssociationRulesExtractor.scala:

package com.example.fpm

import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD

object AssociationRulesExtractor 
  def apply(rdd: RDD[Rule[String]]) = 
    rdd.map(rule => Array(
      rule.confidence, rule.javaAntecedent, rule.javaConsequent
    ))
  

打开你选择的终端模拟器,进入项目根目录调用:

sbt package

它将在目标目录中生成一个jar文件。例如在 Scala 2.10 中它将是:

target/scala-2.10/fpm_2.10-1.0.jar

启动 PySpark shell 或使用 spark-submit 并将生成的 jar 文件的路径传递给 --driver-class-path:

bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar

在非本地模式下:

bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar

在集群模式下,jar 应该存在于所有节点上。

添加一些方便的包装器:

from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.common import _java2py
from collections import namedtuple


rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])

def generateAssociationRules(model, minConfidence):
    # Get active context
    sc = SparkContext.getOrCreate()

    # Retrieve extractor object
    extractor = sc._gateway.jvm.com.example.fpm.AssociationRulesExtractor

    # Compute rules
    java_rules = model._java_model.generateAssociationRules(minConfidence)

    # Convert rules to Python RDD
    return _java2py(sc, extractor.apply(java_rules)).map(lambda x:rule(*x))

最后,您可以将这些助手用作函数:

generateAssociationRules(model, 0.9)

或作为一种方法:

FPGrowthModel.generateAssociationRules = generateAssociationRules
model.generateAssociationRules(0.9)

此解决方案依赖于内部 PySpark 方法,因此不能保证它可以在版本之间移植。

【讨论】:

你可以在 PySpark 中使用 Spark 【参考方案2】:

您可以使用 Spark

# model was produced by FPGrowth.train() method
rules = sorted(model._java_model.generateAssociationRules(0.9).collect(), 
    key=lambda x: x.confidence(), reverse=True)
for rule in rules[:200]:
    # rule variable has confidence(), consequent() and antecedent() 
    # methods for individual value access.
    print rule

【讨论】:

以上是关于pyspark vs scala中的FPgrowth计算关联的主要内容,如果未能解决你的问题,请参考以下文章

我如何使用 s & $ 访问 Pyspark 中的变量,就像在 Scala 中一样

PySpark 中的 mkString 等价物是啥?

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

使用 Scala 类作为带有 pyspark 的 UDF

在 scala 中编写 udf 函数并在 pyspark 作业中使用它们

如何在 Scala Spark 项目中使用 PySpark UDF?