Spark:对 RDD 中的高效批量查找

Posted

技术标签:

【中文标题】Spark:对 RDD 中的高效批量查找【英文标题】:Spark: Efficient mass lookup in pair RDD's 【发布时间】:2015-01-16 11:42:36 【问题描述】:

在 Apache Spark 中,我有两个 RDD。第一个 data : RDD[(K,V)] 包含键值形式的数据。第二个pairs : RDD[(K,K)] 包含一组有趣的数据密钥对。

如何有效地构造一个 RDD pairsWithData : RDD[((K,K)),(V,V))],使其包含来自 pairs 的所有元素作为键元组及其对应的值(来自 data)作为值-元组?

数据的一些属性:

data 中的键是唯一的 pairs 中的所有条目都是唯一的 对于pairs 中的所有对(k1,k2),保证k1 <= k2 “对”的大小只是数据大小的常数|pairs| = O(|data|) 当前数据大小(预计会增长):|data| ~ 10^8, |pairs| ~ 10^10

当前尝试

以下是 Scala 中的一些示例代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

// This kind of show the idea, but fails at runtime.
def massPairLookup1(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = 
  keyPairs map case (k1,k2) =>
    val v1 : String = data lookup k1 head;
    val v2 : String = data lookup k2 head;
    ((k1, k2), (v1,v2))
  


// Works but is O(|data|^2)
def massPairLookup2(keyPairs : RDD[(Int, Int)], data : RDD[(Int, String)]) = 
  // Construct all possible pairs of values
  val cartesianData = data cartesian data map case((k1,v1),(k2,v2)) => ((k1,k2),(v1,v2))
  // Select only the values who's keys are in keyPairs
  keyPairs map (_,0) join cartesianData mapValues _._2


// Example function that find pairs of keys
// Runs in O(|data|) in real life, but cannot maintain the values
def relevantPairs(data : RDD[(Int, String)]) = 
  val keys = data map (_._1)
  keys cartesian keys filter case (x,y) => x*y == 12 && x < y


// Example run
val data = sc parallelize(1 to 12) map (x => (x, "Number " + x))
val pairs = relevantPairs(data)
val pairsWithData = massPairLookup2(pairs, data) 


// Print: 
// ((1,12),(Number1,Number12))
// ((2,6),(Number2,Number6))
// ((3,4),(Number3,Number4))
pairsWithData.foreach(println)

尝试 1

首先我尝试在data 上使用lookup 函数,但在执行时会引发运行时错误。似乎 selfPairRDDFunctions 特征中为空。

另外我不确定lookup 的性能。 The documentation 说 如果 RDD 具有已知的分区器,只需搜索键映射到的分区即可有效地完成此操作。 这听起来像 n 查找需要 O(n*|partition|)充其量是时间,我怀疑可以优化。

尝试 2

这种尝试有效,但我创建了|data|^2 对,这会影响性能。我不希望 Spark 能够优化它。

【问题讨论】:

【参考方案1】:

您的查找 1 不起作用,因为您无法在工作人员(在另一个转换内)执行 RDD 转换。

在查找2中,我认为没有必要执行全笛卡尔...

你可以这样做:

val firstjoin = pairs.map(case (k1,k2) => (k1, (k1,k2)))
    .join(data)
    .map(case (_, ((k1, k2), v1)) => ((k1, k2), v1))
val result = firstjoin.map(case ((k1,k2),v1) => (k2, ((k1,k2),v1)))
    .join(data)
    .map(case(_, (((k1,k2), v1), v2))=>((k1, k2), (v1, v2)))

或者以更密集的形式:

    val firstjoin = pairs.map(x => (x._1, x)).join(data).map(_._2)
    val result = firstjoin.map(case (x,y) => (x._2, (x,y)))
        .join(data).map(case(x, (y, z))=>(y._1, (y._2, z)))

我认为你不能更有效地做到这一点,但我可能错了......

【讨论】:

效果很好。我做了一个快速的本地基准测试。对于|pairs| = 144 244|data|=10 000,运行时间为 2.91 秒,massPairLookup2 为 980.45 秒。见the code at this gist 我稍微修改了代码。我认为这样更容易阅读。你能检查一下以确保我没有搞砸吗? 你是对的,它不是很可读。谢谢你。但我认为你在那里犯了一个错误......我会修复它。

以上是关于Spark:对 RDD 中的高效批量查找的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 pyspark 从 Spark 获取批量行

深入探究Spark -- RDD详解

《spark常见调优》

flink和spark对比

高效批量删除 couchDB 中的文档

高效跑批设计思路——针对系统中的批量日终任务