Spark 中的并行 FP 增长
Posted
技术标签:
【中文标题】Spark 中的并行 FP 增长【英文标题】:Parallel FP Growth in Spark 【发布时间】:2020-07-30 13:09:22 【问题描述】:我试图了解 FPTree 类的“添加”和“提取”方法: (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala)。
-
“summaries”变量的用途是什么?
组列表在哪里?
我假设它是以下内容,我是否正确:
val numParts = if (numPartitions > 0) numPartitions else data.partitions.length val partitioner = new HashPartitioner(numParts)
-
对于 a,b,c , a,b , b,c 的 3 个交易,所有频繁的交易的“摘要”将包含哪些内容?
def add(t: Iterable[T], count: Long = 1L): FPTree[T] = require(count > 0) var curr = root curr.count += count t.foreach item => val summary = summaries.getOrElseUpdate(item, new Summary) summary.count += count val child = curr.children.getOrElseUpdate(item, val newNode = new Node(curr) newNode.item = item summary.nodes += newNode newNode ) child.count += count curr = child this def extract( minCount: Long, validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = summaries.iterator.flatMap case (item, summary) => if (validateSuffix(item) && summary.count >= minCount) Iterator.single((item :: Nil, summary.count)) ++ project(item).extract(minCount).map case (t, c) => (item :: t, c) else Iterator.empty
【问题讨论】:
【参考方案1】:经过一些实验,它非常简单:
1+2) 分区确实是集团代表。 这也是条件交易的计算方式:
private def genCondTransactions[Item: ClassTag](
transaction: Array[Item],
itemToRank: Map[Item, Int],
partitioner: Partitioner): mutable.Map[Int, Array[Int]] =
val output = mutable.Map.empty[Int, Array[Int]]
// Filter the basket by frequent items pattern and sort their ranks.
val filtered = transaction.flatMap(itemToRank.get)
ju.Arrays.sort(filtered)
val n = filtered.length
var i = n - 1
while (i >= 0)
val item = filtered(i)
val part = partitioner.getPartition(item)
if (!output.contains(part))
output(part) = filtered.slice(0, i + 1)
i -= 1
output
-
摘要只是一个帮助保存事务中的项目计数
提取/项目将通过使用向上/向下递归和依赖 FP 树(项目)生成 FIS,同时检查摘要是否需要遍历该路径。
节点“a”的摘要将具有 b:2,c:1,节点“a”的子节点是“b”和“c”。
【讨论】:
以上是关于Spark 中的并行 FP 增长的主要内容,如果未能解决你的问题,请参考以下文章
手推FP-growth (频繁模式增长)算法------挖掘频繁项集