替换 RDD 的一些元素
Posted
技术标签:
【中文标题】替换 RDD 的一些元素【英文标题】:Replace some elements of an RDD 【发布时间】:2016-02-11 04:26:12 【问题描述】:我有这个文件,其中包含 A、B、C 三种类型的数据。我只想根据某些条件用其他 RDD(称为 RDD2)的元素替换此 RDD 的 A 类元素(称为 RDD1)。RDD1 和 RDD2 有一些共同的字符串。
文件结构[RDD[String]]
1 A 2
2 B 12 13 4
2 C 67 29
2 A 5
RDD2 [RDD[行]]
1 A 2 5 6
2 A 5 7 8
我正在过滤第一个 RDD 并向其附加一些字符串以创建 RDD2。
我知道 RDD 是不可变的,但由于 map 函数不接受参数。我想知道是否有办法实现这一点。
编辑:(考虑到下面的 cmets)
样本输出 RDD[Any]
1 A 2 5 6
2 B 12 13 4
2 C 67 29
2 A 5 7 8
【问题讨论】:
你能给出这个示例输出吗? @MukrramRahman '1 A 2 5 6' '2 B 12 13 4' '2 C 67 29' '2 A 5 7 8' 你的 RDD 的类型是什么,我的意思是它是 RDD[Row] 或 RDD[String] 还是别的什么? RDD1 属于 RDD[string] 类型,RDD2 属于 RDD[row] 类型,我想将输出 RDD 导出到文本文件中。我认为一旦我们有了最终 RDD(任何类型)。 目前我能想到的唯一解决方案是对 RDD2 和 RDD3 执行联合操作(过滤掉 B 和 C 类型)。 【参考方案1】:您应该考虑在您的 RDD 中使用更合适和更明确的数据结构,例如 (key, value) 对的 RDD。
然后您可以利用密钥在 RDD1 和 RDD2 之间执行“a la SQL”连接。我相信这就是Gabber 上面已经在做的事情,但是使用了 Scala 语法糖的全部功能。
以更明确的方式: 你的初始RDD,如Gabber:
val rdd1 = sc.parallelize(Seq(List("1", "A", "2"), List("2", "B", "12", "13", "4"), List("2", "C", "67", "29"), List("2", "A", "5")))
val rdd2 = sc.parallelize(Seq(List("1", "A", "2", "5", "6"), List("2", "A", "5", "7", "8")))
然后使用映射创建(键,值)对的RDD,其中键将用于满足您的匹配条件(您的键似乎在您的示例中是前两个元素,例如(1,A))
val rdd1KeyValue = rdd1.map(row => ((row(0),row(1)), row)
val rdd2KeyValue = rdd2.map(row => ((row(0),row(1)), row))
现在,由于您要对键为“A”的值执行“连接”,并留下其他不匹配的值,因此这是一个 SQL 左外连接。所以:
val resultRaw = rdd1KeyValue.leftOuterJoin(rdd2KeyValue)
但 resultRaw 现在是这样的:
((2,C),(List(2, C, 67, 29),None))
((1,A),(List(1, A, 2),Some(List(1, A, 2, 5, 6))))
((2,B),(List(2, B, 12, 13, 4),None))
((2,A),(List(2, A, 5),Some(List(2, A, 5, 7, 8))))
因此,要获取最终结果,您需要再次映射以“选择”您需要的内容(._1 运算符是获取 (key,value) 对的第一个值):
val resFinal = result.map(row => row._2._2.getOrElse(row._2._1))
就我而言,最终结果是:
List(1, A, 2, 5, 6)
List(2, B, 12, 13, 4)
List(2, A, 5, 7, 8)
List(2, C, 67, 29)
【讨论】:
【参考方案2】:它对我有用
val rdd1 = sc.parallelize(Seq(List("1", "A", "2"), List("2", "B", "12", "13", "4"), List("2", "C", "67", "29"), List("2", "A", "5")))
val rdd2 = sc.parallelize(Seq(List("1", "A", "2", "5", "6"), List("2", "A", "5", "7", "8")))
rdd1.map(row =>//where row(0), row(1) is your condition
((row(0), row(1)), row)).leftOuterJoin(rdd2.map(row =>
((row(0), row(1)), row))).map(row =>
row._2._2.getOrElse(row._2._1)
).foreach(println)
【讨论】:
以上是关于替换 RDD 的一些元素的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL Dataframes - 如果使用 RDD.collectAsMap() 创建地图,则从 DataFrameNaFunctions 替换函数不起作用