Spark SQL Dataframes - 如果使用 RDD.collectAsMap() 创建地图,则从 DataFrameNaFunctions 替换函数不起作用

Posted

技术标签:

【中文标题】Spark SQL Dataframes - 如果使用 RDD.collectAsMap() 创建地图,则从 DataFrameNaFunctions 替换函数不起作用【英文标题】:Spark SQL Dataframes - replace function from DataFrameNaFunctions does not work if the Map is created with RDD.collectAsMap() 【发布时间】:2021-02-24 06:39:14 【问题描述】:

来自 DataFrameNaFunctions 我正在使用 replace 函数将数据框中的列值替换为 Map 中的值。

地图的键和值以分隔文件的形式提供。这些被读入RDD,然后转换为pair RDD并转换为Map。 例如,月份编号和月份名称的文本文件可作为文件使用,如下所示:

01,January
02,February
03,March
...   ...
...   ...

val mRDD1 = sc.textFile("file:///.../monthlist.txt")

当使用下面给出的 RDD.collect().toMap 将此数据转换为地图时,dataframe.na.replace 函数可以正常工作,我指的是方法一。

val monthMap1= mRDD1.map(_.split(",")).map(line => (line(0),line(1))).collect().toMap
monthMap1: scala.collection.immutable.Map[String,String] = Map(12 -> December, 08 -> August, 09 ->         September, 11 -> November, 05 -> May, 04 -> April, 10 -> October, 03 -> March, 06 -> June, 02 -> February, 07 -> July, 01 -> January)

val df2 = df1.na.replace("monthname", monthMap1)
df2: org.apache.spark.sql.DataFrame = [col1: int, col2: string ... 13 more fields]

但是,当使用 RDD.collectAsMap() 将该数据转换为 Map 时,如下所示,因为它不是 不可变的 Map,所以我调用它时它不起作用方法2。 有没有一种简单的方法可以将此 scala.collection.Map 转换为 scala.collection.immutable.Map 以使其不会出现此错误?

val monthMap2= mRDD1.map(_.split(",")).map(line => (line(0),line(1))).collectAsMap()
monthMap2: scala.collection.Map[String,String] = Map(12 -> December, 09 -> September, 03 -> March, 06 -> June, 11 -> November, 05 -> May, 08 -> August, 02 -> February, 01 -> January, 10 -> October, 04 -> April, 07 -> July)

val df3 = df1.na.replace("monthname", monthMap2)
<console>:30: error: overloaded method value replace with alternatives:
  [T](cols: Seq[String], replacement: scala.collection.immutable.Map[T,T])org.apache.spark.sql.DataFrame <and>
  [T](col: String, replacement: scala.collection.immutable.Map[T,T])org.apache.spark.sql.DataFrame <and>
  [T](cols: Array[String], replacement: java.util.Map[T,T])org.apache.spark.sql.DataFrame <and>
  [T](col: String, replacement: java.util.Map[T,T])org.apache.spark.sql.DataFrame
 cannot be applied to (String, scala.collection.Map[String,String])
       val cdf3 = cdf2.na.replace("monthname", monthMap2)
                          ^

上面提到的方法 1 工作正常。 但是,对于使用方法 2,我想知道将 scala.collection.Map 转换为 scala.collection.immutable.Map 和我还需要导入哪些库。

谢谢

【问题讨论】:

【参考方案1】:

你可以试试这个:

val monthMap2 = mRDD1.map(_.split(",")).map(line => (line(0),line(1))).collectAsMap()

// create an immutable map from monthMap2
val monthMap = collection.immutable.Map(monthMap2.toSeq: _*)

val df3 = df1.na.replace("monthname", monthMap)

方法replace也是一个java map,你也可以这样转换:

import scala.jdk.CollectionConverters._

val df3 = df1.na.replace("monthname", monthMap2.asJava)

【讨论】:

以上是关于Spark SQL Dataframes - 如果使用 RDD.collectAsMap() 创建地图,则从 DataFrameNaFunctions 替换函数不起作用的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL 教程翻译Datasets and DataFrames 概述

如何在 spark dataframes/spark sql 中使用模式读取 json

[Spark2.0]Spark SQL, DataFrames 和Datasets指南

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets 指南

spark知识体系04-SQL,DataFrames,DateSets