使用 Map 替换 Spark 中的列值

Posted

技术标签:

【中文标题】使用 Map 替换 Spark 中的列值【英文标题】:Use Map to replace column values in Spark 【发布时间】:2019-06-12 08:29:19 【问题描述】:

我必须将列列表映射到 Spark 数据集中的另一列:想想这样的事情

val translationMap: Map[Column, Column] = Map(
  lit("foo") -> lit("bar"),
  lit("baz") -> lit("bab")
)

我有一个像这样的数据框:

val df = Seq("foo", "baz").toDF("mov")

所以我打算这样翻译:

df.select(
  col("mov"),
  translationMap(col("mov"))
)

但是这段代码吐出以下错误

key not found: movs
java.util.NoSuchElementException: key not found: movs

有没有办法在不连接数百个whens 的情况下执行这种翻译?认为translationMap 可能有很多键值对。

【问题讨论】:

【参考方案1】:

您应该使用包含地图文字的 Column 而不是 Map[Column, Column]

import org.apache.spark.sql.functions.typedLit

val translationMap: Column = typedLit(Map(
  "foo" -> "bar",
  "baz" -> "bab"
))

您的其余代码可以保持原样:

df.select(
  col("mov"),
  translationMap(col("mov"))
).show
+---+---------------------------------------+
|mov|keys: [foo,baz], values: [bar,bab][mov]|
+---+---------------------------------------+
|foo|                                    bar|
|baz|                                    bab|
+---+---------------------------------------+

【讨论】:

顺便问一下,这如何处理未知密钥的情况?例如,如果我将值不同于foobaz 的列输入translationMap?理想情况下,我想返回未修改的未知值 @mrbolichi, coalesce(translationMap(col("mov")), col("mov")) 会按照你想要的方式处理不匹配的键。【参考方案2】:

您不能像这样在分布式数据帧中引用在驱动程序上声明的 Scala 集合。另一种方法是使用 UDF,如果你有一个大型数据集,那么它的性能效率将不高,因为 Spark 没有优化 UDF。

val translationMap = Map( "foo" -> "bar" , "baz" -> "bab" )
val getTranslationValue = udf ((x: String)=>translationMap.getOrElse(x,null.asInstanceOf[String]) )
df.select(col("mov"), getTranslationValue($"mov").as("value")  ).show

//+---+-----+
//|mov|value|
//+---+-----+
//|foo|  bar|
//|baz|  bab|
//+---+-----+

另一种解决方案是将Map 加载为DataSet[(String, String)],然后以mov 为键加入两个数据集。

【讨论】:

如果可能,由于您提到的性能问题,我宁愿避免使用udfs。此外,这将是一个非常常见的操作,所以我想避免加入,因为有一个函数会返回一个列以将其插入我的selects @philantrovert 我使用的是相同的,但集群translationMap 为空,如何处理?是否可以将 translationMap 作为参数发送到 UDF 中? ***.com/questions/63935600/…

以上是关于使用 Map 替换 Spark 中的列值的主要内容,如果未能解决你的问题,请参考以下文章

匹配 Map 中的键时替换列值

使用库替换python中的列值

如何将具有嵌套StructType的列转换为Spark SQL中的类实例?

如何遍历 spark 数据集并更新 Java 中的列值?

删除pandas数据帧中的重复项后,替换特定的列值

用另一个表中的列值替换列的空值