替换 RDD 的一些元素

Posted

技术标签:

【中文标题】替换 RDD 的一些元素【英文标题】:Replace some elements of an RDD 【发布时间】:2016-02-11 04:26:12 【问题描述】:

我有这个文件,其中包含 A、B、C 三种类型的数据。我只想根据某些条件用其他 RDD(称为 RDD2)的元素替换此 RDD 的 A 类元素(称为 RDD1)。RDD1 和 RDD2 有一些共同的字符串。

文件结构[RDD[String]]

1 A 2   
2 B 12 13 4
2 C 67 29  
2 A 5

RDD2 [RDD[行]]

1 A 2 5 6
2 A 5 7 8 

我正在过滤第一个 RDD 并向其附加一些字符串以创建 RDD2。

我知道 RDD 是不可变的,但由于 map 函数不接受参数。我想知道是否有办法实现这一点。

编辑:考虑到下面的 cmets

样本输出 RDD[Any]

1 A 2 5 6
2 B 12 13 4
2 C 67 29
2 A 5 7 8

【问题讨论】:

你能给出这个示例输出吗? @MukrramRahman '1 A 2 5 6' '2 B 12 13 4' '2 C 67 29' '2 A 5 7 8' 你的 RDD 的类型是什么,我的意思是它是 RDD[Row] 或 RDD[String] 还是别的什么? RDD1 属于 RDD[string] 类型,RDD2 属于 RDD[row] 类型,我想将输出 RDD 导出到文本文件中。我认为一旦我们有了最终 RDD(任何类型)。 目前我能想到的唯一解决方案是对 RDD2 和 RDD3 执行联合操作(过滤掉 B 和 C 类型)。 【参考方案1】:

您应该考虑在您的 RDD 中使用更合适和更明确的数据结构,例如 (key, value) 对的 RDD。

然后您可以利用密钥在 RDD1 和 RDD2 之间执行“a la SQL”连接。我相信这就是Gabber 上面已经在做的事情,但是使用了 Scala 语法糖的全部功能。

以更明确的方式: 你的初始RDD,如Gabber:

val rdd1 = sc.parallelize(Seq(List("1", "A", "2"), List("2", "B", "12", "13", "4"), List("2", "C", "67", "29"), List("2", "A", "5")))
val rdd2 = sc.parallelize(Seq(List("1", "A", "2", "5", "6"), List("2", "A", "5", "7", "8")))

然后使用映射创建(键,值)对的RDD,其中键将用于满足您的匹配条件(您的键似乎在您的示例中是前两个元素,例如(1,A))

val rdd1KeyValue = rdd1.map(row => ((row(0),row(1)), row)
val rdd2KeyValue = rdd2.map(row => ((row(0),row(1)), row))

现在,由于您要对键为“A”的值执行“连接”,并留下其他不匹配的值,因此这是一个 SQL 左外连接。所以:

val resultRaw = rdd1KeyValue.leftOuterJoin(rdd2KeyValue)

但 resultRaw 现在是这样的:

((2,C),(List(2, C, 67, 29),None))
((1,A),(List(1, A, 2),Some(List(1, A, 2, 5, 6))))
((2,B),(List(2, B, 12, 13, 4),None))
((2,A),(List(2, A, 5),Some(List(2, A, 5, 7, 8))))

因此,要获取最终结果,您需要再次映射以“选择”您需要的内容(._1 运算符是获取 (key,value) 对的第一个值):

val resFinal = result.map(row => row._2._2.getOrElse(row._2._1))

就我而言,最终结果是:

List(1, A, 2, 5, 6)
List(2, B, 12, 13, 4)
List(2, A, 5, 7, 8)
List(2, C, 67, 29)

【讨论】:

【参考方案2】:

它对我有用

val rdd1 = sc.parallelize(Seq(List("1", "A", "2"), List("2", "B", "12", "13", "4"), List("2", "C", "67", "29"), List("2", "A", "5")))
val rdd2 = sc.parallelize(Seq(List("1", "A", "2", "5", "6"), List("2", "A", "5", "7", "8")))
rdd1.map(row =>//where row(0), row(1) is your condition
  ((row(0), row(1)), row)).leftOuterJoin(rdd2.map(row =>
  ((row(0), row(1)), row))).map(row => 
  row._2._2.getOrElse(row._2._1)
).foreach(println)

【讨论】:

以上是关于替换 RDD 的一些元素的主要内容,如果未能解决你的问题,请参考以下文章

数据帧到 RDD[Row] 用空值替换空间

如何替换/删除 PySpark RDD 中的正则表达式?

在pyspark中替换循环到并行进程

Spark SQL Dataframes - 如果使用 RDD.collectAsMap() 创建地图,则从 DataFrameNaFunctions 替换函数不起作用

HTML - 替换(置换)元素和非替换(置换)元素

一些常用css技巧的为什么我所理解的line-height