Spark 中的并行 FP 增长

Posted

技术标签:

【中文标题】Spark 中的并行 FP 增长【英文标题】:Parallel FP Growth in Spark 【发布时间】:2020-07-30 13:09:22 【问题描述】:

我试图了解 FPTree 类的“添加”和“提取”方法: (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala)。

    “summaries”变量的用途是什么? 组列表在哪里? 我假设它是以下内容,我是否正确:
val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
val partitioner = new HashPartitioner(numParts)
    对于 a,b,c , a,b , b,c 的 3 个交易,所有频繁的交易的“摘要”将包含哪些内容?
def add(t: Iterable[T], count: Long = 1L): FPTree[T] = 
  require(count > 0)
  var curr = root
  curr.count += count
  t.foreach  item =>
    val summary = summaries.getOrElseUpdate(item, new Summary)
    summary.count += count
    val child = curr.children.getOrElseUpdate(item, 
      val newNode = new Node(curr)
      newNode.item = item
      summary.nodes += newNode
      newNode
    )
    child.count += count
    curr = child
  
  this


def extract(
    minCount: Long,
    validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = 
  summaries.iterator.flatMap  case (item, summary) =>
    if (validateSuffix(item) && summary.count >= minCount) 
      Iterator.single((item :: Nil, summary.count)) ++
        project(item).extract(minCount).map  case (t, c) =>
          (item :: t, c)
        
     else 
      Iterator.empty
    
  

【问题讨论】:

【参考方案1】:

经过一些实验,它非常简单:

1+2) 分区确实是集团代表。 这也是条件交易的计算方式:

  private def genCondTransactions[Item: ClassTag](
      transaction: Array[Item],
      itemToRank: Map[Item, Int],
      partitioner: Partitioner): mutable.Map[Int, Array[Int]] = 
    val output = mutable.Map.empty[Int, Array[Int]]
    // Filter the basket by frequent items pattern and sort their ranks.
    val filtered = transaction.flatMap(itemToRank.get)
    ju.Arrays.sort(filtered)
    val n = filtered.length
    var i = n - 1
    while (i >= 0) 
      val item = filtered(i)
      val part = partitioner.getPartition(item)
      if (!output.contains(part)) 
        output(part) = filtered.slice(0, i + 1)
      
      i -= 1
    
    output
  
    摘要只是一个帮助保存事务中的项目计数 提取/项目将通过使用向上/向下递归和依赖 FP 树(项目)生成 FIS,同时检查摘要是否需要遍历该路径。 节点“a”的摘要将具有 b:2,c:1,节点“a”的子节点是“b”和“c”。

【讨论】:

以上是关于Spark 中的并行 FP 增长的主要内容,如果未能解决你的问题,请参考以下文章

FP增长算法

手推FP-growth (频繁模式增长)算法------挖掘频繁项集

Spark join 速度呈指数级增长

Spark操作dataFrame进行写入mysql,自定义sql的方式

FP并行算法的几个相关方向

业务增长400%,Uber如何快准稳扩容HDFS集群?