在 Spark Scala 中将 RDD[(String, String, String)] 转换为 RDD[(String, (String, String))]

Posted

技术标签:

【中文标题】在 Spark Scala 中将 RDD[(String, String, String)] 转换为 RDD[(String, (String, String))]【英文标题】:Convert RDD[(String, String, String)] to RDD[(String, (String, String))] in Spark Scala 【发布时间】:2021-09-28 10:02:16 【问题描述】:

有 2 个 rdds ,我正在尝试加入: 当每个 rdd 中有 2 个参数时,它就会加入,但是当我在现有GTINs rdd 中添加一个新参数时,我面临以下错误:

下面是代码:

newGTS.collect()
(00070137115045,00070137115045)
(00799999150451,00799999150451)

existingGTS.collect()
(00799999150451,(00003306-808b-46da-bc7f-419c5ae223a7,2016-10-10 10:23:12.0))
(00016700000653,(00006d79-94ea-4651-be0c-0ce77958cd45,2021-05-31 01:20:39.291))
(00923846453024,(0000704b-b40d-4b9e-b266-f7c66723df0e,null))
(00610074049265,(0000a7a1-587c-4b13-a155-7846df82fdee,2020-03-20 12:16:55.873))
(00034100516079,(0002495f-6084-49dd-aadb-20cd137d9694,null))


val join1 = newGTINs.leftOuterJoin(existingGTINs) mapValues 
      case (gtin, iUUID, createDt) => (iUUID.isEmpty, iUUID.getOrElse(UUID.randomUUID.toString))
    


 error: constructor cannot be instantiated to expected type;
 found   : (T1, T2, T3)
 required: (String, Option[(String, String)])
                 case (gtin, iUUID, createDt) => (iUUID.isEmpty, iUUID.getOrElse(UUID.randomUUID.toString))
                      ^

PS: UUID.randomUUID.toString --> 这个函数是创建一个随机id

【问题讨论】:

【参考方案1】:

我猜测连接中使用的newGTINsexistingGTINs 应该与collects 中显示的newGTSexistingGTS 相同。

由于您的newGTSINs 看起来是RDD[(String, String)]existingGTINSRDD[(String, (String, String))],因此您的newGTINs.leftOuterJoin(existingGTINs) 将是RDD[(String,(String, Option[(String, String)]))]

这意味着您的mapValues 将需要一个函数(String, Option[(String, String)]) => SomeNewType 或作为参数。它也可以接受满足相似类型语义的偏函数。

但是您的 case (gtin, iUUID, createDt) => (iUUID.isEmpty, iUUID.getOrElse(UUID.randomUUID.toString)) 是一个偏函数,对应于类型 (String, String, String) => SomeNewType

注意差异,因此错误。您可以通过提供适当的偏函数来满足 mapValues 要求来解决此问题。

val join1 = 
  newGTINs
    .leftOuterJoin(existingGTINs)
    .mapValues 
      case (gtin, Some(iUUID, createDt)) =>
        (iUUID.isEmpty, iUUID.getOrElse(UUID.randomUUID.toString))
      case (gtin, None) =>
        // what heppens for gtins without matching element in existing one's
        (true, UUID.randomUUID.toString)
    

【讨论】:

以上是关于在 Spark Scala 中将 RDD[(String, String, String)] 转换为 RDD[(String, (String, String))]的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark Scala 中将一行从一个数据集添加到另一个数据集

Spark RDD API(scala)

如何在Scala中将rdd对象转换为数据框

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

Scala/Spark 减少类对象的 RDD