如何按 RDD 中的选定字段数进行分组,以查找基于这些字段的重复项

Posted

技术标签:

【中文标题】如何按 RDD 中的选定字段数进行分组,以查找基于这些字段的重复项【英文标题】:How to group by a select number of fields in an RDD looking for duplicates based on those fields 【发布时间】:2015-07-24 16:50:45 【问题描述】:

我是 Scala 和 Spark 的新手。我在 Spark Shell 中工作。 我需要 Group By 并按此文件的前 三个字段 进行排序,以查找重复项。如果我在组中找到重复项,我需要在 第三个字段 中附加一个计数器,从“1”开始,并以“1”递增,用于重复组中的每条记录。读取新组时将计数器重置回“1”。如果没有找到重复项,则只需附加计数器,即“1”。

CSV 文件 包含以下内容: ("00111","00111651","4444","PY","MA") ("00111","00111651","4444","XX","MA") ("00112","00112P11","5555","TA","MA")

val csv = sc.textFile("file.csv")val recs = csv.map(line => line.split(",")

如果我在上面的示例中正确应用逻辑,则生成的 recs 的 RDD 将如下所示: ("00111","00111651","44441","PY","MA") ("00111","00111651","44442","XX","MA") ("00112","00112P11","55551","TA","MA")

【问题讨论】:

这有点像“请写我的代码”。你试过什么?你考虑过什么方法? 再次。这不是“写我的代码”。我是 SCALA 的新手。我能够使用以下方法识别重复项: var addrDupes = loadAddresses.map(a => (a.field1,a.field2,a.field3)).countByValue.toList 只是不知道如何更改字段 这不是指责,但没有任何代码的问题陈述往往会被这样解释。不管怎样,你现在有两个答案,一个来自我。 下次我会记得包含我的代码。然而,我的实际代码包括 300 个 Java 类和我正在处理的更复杂的数据结构。我发布的示例是我在不包括所有其他内容的情况下尝试做的最基本的方面。 【参考方案1】:

如何将数据分组,更改并放回:

val csv = sc.parallelize(List(
  "00111,00111651,4444,PY,MA",
  "00111,00111651,4444,XX,MA",
  "00112,00112P11,5555,TA,MA"
))
val recs = csv.map(_.split(","))
val grouped = recs.groupBy(line=>(line(0),line(1), line(2)))
val numbered = grouped.mapValues(dataList=>
      dataList.zipWithIndex.mapcase(data, idx) => data match 
          case Array(fst,scd,thd,rest@_*) => Array(fst,scd,thd+(idx+1)) ++ rest
      
    )
numbered.flatMapcase (key, values)=>values

【讨论】:

我认为(但我不确定)它的前 3 个字段需要是唯一的,而不仅仅是第三个。 OP,请确认 没错。前三个字段必须是唯一的。贾斯汀的解决方案行不通。谢谢。 是的,看起来不错。我们现在的结果几乎相同,只是在风格上略有不同【参考方案2】:

还对数据进行分组、更改、放回。

 val lists= List(("00111","00111651","4444","PY","MA"),
("00111","00111651","4444","XX","MA"),
("00112","00112P11","5555","TA","MA"))

val grouped = lists.groupBycase(a,b,c,d,e) => (a,b,c)
val indexed = grouped.mapValues(
               _.zipWithIndex
                .map case ((a,b,c,d,e), idx) => (a,b,c + (idx+1).toString,d,e)

val unwrapped = indexed.flatMap(_._2) 
//List((00112,00112P11,55551,TA,MA),
//     (00111,00111651,44442,XX ,MA),
//     (00111,00111651,44441,PY,MA))

适用于数组的版本(任意长度 >= 3)

val lists= List(Array("00111","00111651","4444","PY","MA"),
Array("00111","00111651","4444","XX","MA"),
Array("00112","00112P11","5555","TA","MA"))
val grouped = lists.groupBy_.take(3) 
val indexed = grouped.mapValues(
      _.zipWithIndex
       .map case (Array(a,b,c, rest@_*), idx) => Array(a,b,c+ (idx+1).toString) ++ rest)

val unwrapped = indexed.flatMap(_._2)  
// List(Array(00112, 00112P11, 55551, TA, MA),
//      Array(00111, 00111651, 44441, XX, MA),
//      Array(00111, 00111651, 44441, PY, MA))

【讨论】:

谢谢保罗!这行得通。只需使其适用于无限数量的字段。再次感谢。 如果您使用 split 等数组(如示例代码中)而不是元组(如示例列表中),那应该很简单。也简化了一点,我把事情弄得太复杂了 又简化了。向@JustinPihony 致敬 zipWithIndex 理念 添加了数组的版本 投反对票,因为这与我发布的完全相同。你只调整了分组是什么......

以上是关于如何按 RDD 中的选定字段数进行分组,以查找基于这些字段的重复项的主要内容,如果未能解决你的问题,请参考以下文章

如何按 ID 分组并查找日期中的空白以确定 Alteryx 中的开始和结束日期?

如何在 Oracle SQL 上进行查询以获取时间间隔,按特定字段分组

使用 Scala 根据 RDD 中的多个键列对值进行分组的最快方法是啥? [复制]

按 Unix 时间戳字段的月份分组

如何从单元素集到双元素集对 rdd-pyspark 中的元素进行分组

sql中如何按某字段值的首字母分组?