如何在 Spark Udf 中传递地图?

Posted

技术标签:

【中文标题】如何在 Spark Udf 中传递地图?【英文标题】:How to pass a map in Spark Udf? 【发布时间】:2018-05-16 22:23:33 【问题描述】:

我有一个问题。我有一个带有几列的 spark 数据框,如下所示:

id 颜色 1 红、蓝、黑 2 红、绿 3 蓝色、黄色、绿色 ...

我还有一个地图文件,看起来像: 红色,0 蓝色,1 绿色,2 黑色,3 黄色,4

我需要做的是将颜色名称映射成不同的id,比如将“红、蓝、黑”映射成[1,1,0,1,0]的数组。 我这样写代码:

def mapColor(label_string:String):Array[Int]=
var labels = label_string.split(",")
var index_array = new Array[Int](COLOR_LENGTH)
for (label<-labels)
  if(COLOR_MAP.contains(label))
    index_array(COLOR_MAP(label))=1
  
  else
    //dictionary does not contain the label, the last index set to be one
    index_array(COLOR_LENGTH-1)=1
  

index_array 

COLOR_LENGTH 是字典的长度,COLOR_MAP 是包含字符串->id 关系的字典。

我这样调用这个函数:

 val color_function = udf(mapColor:(String)=>Array[Int])
 sql.withColumn("color_idx",color_function(col("Color")))

由于我有多个列需要这个操作,但不同的列需要不同的字典。目前,我为每一列复制了这个函数(只需更改字典和长度信息)。但是代码看起来很乏味。有没有什么方法,可以把长度和字典传给映射函数,比如

def map(label_string:String,map:Map[String,Integer],len:Int):Array[Int] 

但是我应该如何在 spark 数据框中调用这个函数呢?由于我无法在声明中传递参数

val color_function = udf(mapColor:(String)=>Array[Int])

【问题讨论】:

【参考方案1】:

您可以使用颜色映射附带的 UDF 作为基本参数,如下例所示:

val df = Seq(
  (1, "Red, Blue, Black"),
  (2, "Red, Green"),
  (3, "Blue, Yellow, Green")
).toDF("id", "color")

val colorMap = Map("Red"-> 0, "Blue"->1, "Green"->2, "Black"->3, "Yellow"->4)

def mapColorCode(m: Map[String, Int]) = udf( (s: String) =>
  s.split("""\s*,\s*""").map(c => m.getOrElse(c, -99))
)

df.select($"id", mapColorCode(colorMap)($"color").as("colorcode")).show
// +---+----------+
// | id| colorcode|
// +---+----------+
// |  1| [0, 1, 3]|
// |  2|    [0, 2]|
// |  3| [1, 4, 2]|
// +---+----------+

【讨论】:

关于如何处理这个 UDF 中的地图的任何建议 ***.com/questions/63935600/…【参考方案2】:

这里是简洁的完整代码 -

val colrMapList = List("Red" -> 0, "Blue" -> 1, "Green" -> 2).toMap

def getColor = udf((colors: Seq[String]) =>  if(!colors.isEmpty) colors.map(color => colrMapList.getOrElse(color,"0")).mkString(",") else "0"   )

val colors = List((1, Array("Red","Blue","Black")),(2,Array("Red", "Green")))
val colrDF = sc.parallelize(colors).toDF

colrDF.withColumn("colorMap", getColor($"colors")).show

说明

    创建一个map 用于颜色到整数的映射。 getColor 函数提取相应的整数给定颜色 最后你应用colrDF的函数得到输出

【讨论】:

虽然这看起来像是一个解决方案,但如果你能在其中添加一些单词会很棒。 我可以知道拒绝投票答案的原因吗?是因为没有解释就发布了答案吗?

以上是关于如何在 Spark Udf 中传递地图?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Spark Dataframe 列的每个值作为字符串传递给 python UDF?

Pyspark 和使用 UDF:如何将 Python 参数(sys.argv、argparse)传递给 Python Worker?

如何在spark shell中注册Java SPark UDF?

如何在 Spark 中使用 Timestamp/Date 类型的参数创建 UDF

如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?

Spark UDF:如何在每一行上编写一个 UDF 以提取嵌套结构中的特定值?