仅当rdd的第二个字段中存在时,如何从一个字段rdd中选择值

Posted

技术标签:

【中文标题】仅当rdd的第二个字段中存在时,如何从一个字段rdd中选择值【英文标题】:How to select values from one field rdd only if it is present in second field of rdd 【发布时间】:2016-04-19 20:24:36 【问题描述】:

我有一个包含 3 个字段的 rdd,如下所述。

1,2,6
2,4,6
1,4,9
3,4,7
2,3,8

现在,从上面的rdd,我想得到下面的rdd。

2,4,6
3,4,7
2,3,8

生成的 rdd 没有以 1 开头的行,因为 1 不在输入 rdd 的第二个字段中。

【问题讨论】:

您能否提供输入和输出 RDD 的完整类型,并详细说明您希望如何过滤和转换数据的规则。 Field 和 Field2 是字符串,Field3 是整数。我只想要输出中的那些行,其中 Field1 的值在 Field2 中可用。在上面的示例中,2 和 3 在 rdd 的 Field2 中,而 1 在 Field2 中无处 你需要用更好的解释或更好的例子来更新你的问题。 对我来说似乎很清楚。仅当第一个字段的值与任何行的第二个字段的值相同时才选择一行。如果在任何行的第二个字段中都没有找到第一个字段的值,请不要选择它。 【参考方案1】:

好的,如果我理解正确的话,有两种方法:

    将您的RDD 分成两部分,其中第一个 RDD 包含“第二个字段”的唯一值,第二个 RDD 具有“第一个值”作为键。然后将rdds连接在一起。这种方法的缺点是distinctjoin 操作缓慢。

    val r: RDD[(String, String, Int)] = sc.parallelize(Seq(
      ("1", "2", 6),
      ("2", "4", 6),
      ("1", "4", 9),
      ("3", "4", 7),
      ("2", "3", 8)
    ))
    
    val uniqueValues: RDD[(String, Unit)] = r.map(x => x._2 -> ()).distinct
    val r1: RDD[(String, (String, String, Int))] = r.map(x => x._1 -> x)
    
    val result: RDD[(String, String, Int)] = r1.join(uniqueValues).map case (_, (x, _)) => x
    
    result.collect.foreach(println)
    

    如果您的 RDD 相对较小,并且第二个值的 Set 可以完全适合所有节点的内存,那么您可以创建该内存集作为第一步,将其广播到所有节点,然后过滤您的RDD:

    val r: RDD[(String, String, Int)] = sc.parallelize(Seq(
      ("1", "2", 6),
      ("2", "4", 6),
      ("1", "4", 9),
      ("3", "4", 7),
      ("2", "3", 8)
    ))
    
    val uniqueValues = sc.broadcast(r.map(x => x._2).distinct.collect.toSet)
    
    val result: RDD[(String, String, Int)] = r.filter(x => uniqueValues.value.contains(x._1))
    
    result.collect.foreach(println)
    

两个示例输出:

(2,4,6)
(2,3,8)
(3,4,7)

【讨论】:

以上是关于仅当rdd的第二个字段中存在时,如何从一个字段rdd中选择值的主要内容,如果未能解决你的问题,请参考以下文章

如果 PIG 中的第二个字段具有不同的值,如何过滤/删除记录

Django 表单字段验证 - 如何判断操作是插入还是更新?

php中查询mysql select 第一个字段 第二个字段 from 表名。输出语句应

访问最大功能:返回10个字段中的第二个最高值

根据数组中的重复模式更改 SwiftUI 列表中文本字段的第二次出现

11.21