Spark - 寻找重叠值或寻找共同朋友的变体
Posted
技术标签:
【中文标题】Spark - 寻找重叠值或寻找共同朋友的变体【英文标题】:Spark - Finding overlapping values Or A variation of finding mutual friends 【发布时间】:2016-06-01 20:06:50 【问题描述】:我有一个问题,我正在尝试使用 Spark 解决。我对 Spark 还很陌生,所以我不确定设计它的最佳方法是什么。
输入:
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5
我想查找每对用户之间的相互组数。所以对于上面的输入,我期望的输出是:
输出:
1st user || 2nd user || mutual/intersection count || union count
------------------------------------------------------------
user1 user2 2 7
user1 user3 1 6
user1 user4 1 9
user2 user4 3 8
我认为有几种方法可以解决这个问题,其中一种解决方案可能是:
创建一个键值对,其中键是用户,值是组 按键分组,然后我们将有一个用户所属的组列表 然后找到两个组之间的交集/联合例子:
(1st stage): Map
group1=user1,user2 ==>
user1, group1
user2, group1
group2=user1,user2,user3 ==>
user1, group2
user2, group2
user3, group2
....
....
....
(2nd stage): Reduce by key
user1 -> group1, group2, group4, group8
user2 -> group1, group2, group3, group7, group9
但我的问题是,在我按键减少计数后,以我想要的方式表示计数的最佳方式是什么?
有没有更好的方法来处理这个问题?最大用户数是恒定的,不会超过 5000,所以这是它将创建的最大密钥数。但输入可能包含几行接近 1B 行。我认为这不会是一个问题,如果我错了,请纠正我。
更新:
这是我用我对Spark的一点了解(上个月刚开始学习Spark)解决这个问题的一段代码:
def createPair(line: String): Array[(String, String)] =
val splits = line.split("=")
val kuid = splits(0)
splits(1).split(",").map segment => (segment, kuid)
val input = sc.textFile("input/test.log")
val pair = input.flatMap line => createPair(line)
val pairListDF = pair
.aggregateByKey(scala.collection.mutable.ListBuffer.empty[String])(
(kuidList, kuid) => kuidList += kuid; kuidList ,
(kuidList1, kuidList2) => kuidList1.appendAll(kuidList2); kuidList1 )
.mapValues(_.toList).toDF().select($"_1".alias("user"), $"_2".alias("groups"))
pairListDF.registerTempTable("table")
sqlContext.udf.register("intersectCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.intersect(list2).size)
sqlContext.udf.register("unionCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.union(list2).distinct.size)
val populationDF = sqlContext.sql("SELECT t1.user AS user_first,"
+ "t2.user AS user_second,"
+ "intersectCount(t1.groups, t2.groups) AS intersect_count,"
+ "unionCount(t1.groups, t2.groups) AS union_count"
+ " FROM table t1 INNER JOIN table t2"
+ " ON t1.user < t2.user"
+ " ORDER BY user_first,user_second")
输出:
+----------+-----------+---------------+-----------+
|user_first|user_second|intersect_count|union_count|
+----------+-----------+---------------+-----------+
| user1| user2| 2| 7|
| user1| user3| 1| 6|
| user1| user4| 1| 9|
| user1| user5| 1| 8|
| user2| user3| 1| 7|
| user2| user4| 3| 8|
| user2| user5| 1| 9|
| user3| user4| 1| 8|
| user3| user5| 2| 6|
| user4| user5| 3| 8|
+----------+-----------+---------------+-----------+
希望得到一些关于我的代码和我缺少的东西的反馈。当我刚开始学习 Spark 时,请随时批评我的代码。再次感谢 @axiom 的回答,比我预期的更小更好的解决方案。
【问题讨论】:
【参考方案1】:总结:
获取对数,然后使用
联合(a, b) = count(a) + count(b) - 交点(a, b)
val data = sc.textFile("test")
//optionally data.cache(), depending on size of data.
val pairCounts = data.flatMap(pairs).reduceByKey(_ + _)
val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
val result = pairCounts.mapcase ((user1, user2), intersectionCount) =>(user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)
详情:
总共有 5000 个用户,2500 万个密钥(每对 1 个)不应该太多。我们可以使用reduceByKey
来计算交叉点数。
地图中的个人计数很容易成为Broadcasted
。
现在众所周知:
Union(user1, user2) = count(user1) + count(user2) - Intersection(user1, user2)
。
前两个计数是从广播的地图中读取的,而我们在配对计数的 rdd 上进行映射。
代码:
//generate ((user1, user2), 1) for pair counts
def pairs(str: String) =
val users = str.split("=")(1).split(",")
val n = users.length
for(i <- 0 until n; j <- i + 1 until n) yield
val pair = if(users(i) < users(j))
(users(i), users(j))
else
(users(j), users(i))
//order of the user in a list shouldn't matter
(pair, 1)
//generate (user, 1), to obtain single counts
def singles(str: String) =
for(user <- str.split("=")(1).split(",")) yield (user, 1)
//read the rdd
scala> val data = sc.textFile("test")
scala> data.collect.map(println)
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5
//get the pair counts
scala> val pairCounts = data.flatMap(pairs).reduceByKey(_ + _)
pairCounts: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[16] at reduceByKey at <console>:25
//just checking
scala> pairCounts.collect.map(println)
((user2,user3),1)
((user1,user3),1)
((user3,user4),1)
((user2,user5),1)
((user1,user5),1)
((user2,user4),3)
((user4,user5),3)
((user1,user4),1)
((user3,user5),2)
((user1,user2),2)
//single counts
scala> val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
singleCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25
scala> singleCounts.collect.map(println)
(user5,5)
(user3,3)
(user1,4)
(user2,5)
(user4,6)
//broadcast single counts
scala> val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
//calculate the results:
最后:
scala> val res = pairCounts.mapcase ((user1, user2), intersectionCount) => (user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)
res: org.apache.spark.rdd.RDD[(String, String, Int, Int)] = MapPartitionsRDD[23] at map at <console>:33
scala> res.collect.map(println)
(user2,user3,1,7)
(user1,user3,1,6)
(user3,user4,1,8)
(user2,user5,1,9)
(user1,user5,1,8)
(user2,user4,3,8)
(user4,user5,3,8)
(user1,user4,1,9)
(user3,user5,2,6)
(user1,user2,2,7)
注意:
在生成对时,我对元组进行排序,因为我们不希望列表中的用户顺序很重要。
将用户名字符串编码为整数,您可能会获得显着的性能提升。
【讨论】:
以上是关于Spark - 寻找重叠值或寻找共同朋友的变体的主要内容,如果未能解决你的问题,请参考以下文章