Transform columns in Spark DataFrame based on map without using UDFs
Posted: 2020-07-20 09:49:05
[Question] I want to transform some columns in my DataFrame based on a configuration represented by a Scala map.
I have two cases:
- Case 1: I receive a map Map[String, Seq[String]] and columns col1, col2; transform col3 if the map contains an entry whose key equals col1 and col2 appears in that entry's list of values.
- Case 2: I receive a map Map[String, (Long, Long)] and columns col1, col2; transform col3 if the map contains an entry whose key equals col1 and col2 falls within the range described by the tuple of Longs (start, end). In Scala, the two configurations look like the sketch below.
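For illustration only, the two configuration shapes populated with the values from the examples that follow (the names case1Conf/case2Conf are mine, not from the question):

// Case 1: key -> allowed values of col2
val case1Conf: Map[String, Seq[String]] = Map("u1" -> Seq("w1", "w11"), "u2" -> Seq("w2", "w22"))
// Case 2: key -> (start, end) range for col2
val case2Conf: Map[String, (Long, Long)] = Map("u1" -> (1L, 5L), "u2" -> (2L, 4L))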
Example:
Case 1: given this table and the map Map("u1" -> Seq("w1","w11"), "u2" -> Seq("w2","w22"))
+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1 | w1 | v1 |
+------+------+------+
| u2 | w2 | v2 |
+------+------+------+
| u3 | w3 | v3 |
+------+------+------+
I want to add an "x-" prefix to col3 only when (col1, col2) matches an entry in the map:
+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1 | w1 | x-v1 |
+------+------+------+
| u2 | w2 | x-v2 |
+------+------+------+
| u3 | w3 | v3 |
+------+------+------+
Case 2: given this table and the map Map("u1" -> (1,5), "u2" -> (2,4))
+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1 | 2 | v1 |
+------+------+------+
| u1 | 6 | v11 |
+------+------+------+
| u2 | 3 | v3 |
+------+------+------+
| u3 | 4 | v3 |
+------+------+------+
The expected output should be:
+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1 | 2 | x-v1 |
+------+------+------+
| u1 | 6 | v11 |
+------+------+------+
| u2 | 3 | x-v3 |
+------+------+------+
| u3 | 4 | v3 |
+------+------+------+
This can easily be done with a UDF (for example, something like the sketch below), but for performance reasons I don't want to use one.
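A minimal sketch of the UDF-based approach for Case 1, for reference (my own illustration, not from the question; df stands for the example input DataFrame):

import org.apache.spark.sql.functions.{udf, col}

val conf = Map("u1" -> Seq("w1", "w11"), "u2" -> Seq("w2", "w22"))
// The UDF closes over the driver-side map and returns the (possibly prefixed) col3 value
val prefixUdf = udf { (c1: String, c2: String, c3: String) =>
  if (conf.get(c1).exists(_.contains(c2))) "x-" + c3 else c3
}
df.withColumn("col3", prefixUdf(col("col1"), col("col2"), col("col3")))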
Is there a way to achieve this in Spark 2.4.2 without UDFs?
Thanks
[Comments]
Can you also add sample input and the expected output? And which Spark version?
@Srinivas Examples added, thanks.
Spark version??
@Srinivas Spark 2.4.2
Would it be OK if this Map("u1" -> (1,5), u2 -> (2, 4)) were converted into Map("u1" -> Seq(1,5), u2 -> Seq(2, 4))?
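(For what it's worth, that conversion is a one-liner; a sketch, with the variable names mine:)

val tupleMap = Map("u1" -> (1L, 5L), "u2" -> (2L, 4L))
val seqMap   = tupleMap.map { case (k, (start, end)) => k -> Seq(start, end) }   // Map[String, Seq[Long]]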
[Answer 1]
Check the code below.
Note -
I changed your second case's map values to Map("u1" -> Seq(1,5), u2 -> Seq(2, 4)).
The idea is to convert the map into a JSON string, add that JSON map to the DataFrame as a column value, and then apply the logic on the DataFrame.
If you can, you can keep the values directly as a JSON map, which avoids the map-to-JSON conversion (a sketch follows).
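A minimal sketch of that shortcut, assuming the configuration is already held as a JSON string (the variable names are mine):

import org.apache.spark.sql.functions.{from_json, lit}
import org.apache.spark.sql.types.{ArrayType, MapType, StringType}

val caseOneJson = """{"u1":["w1","w11"],"u2":["w2","w22"]}"""
val caseOneExpr = from_json(lit(caseOneJson), MapType(StringType, ArrayType(StringType)))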
Import the required libraries.
import org.apache.spark.sql.functions._   // lit, from_json, when, expr, concat, ...
import org.apache.spark.sql.types._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
Case 1 logic
scala> val caseOneDF = Seq(("u1","w1","v1"),("u2","w2","v2"),("u3","w3","v3")).toDF("col1","col2","col3")
caseOneDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]
scala> val caseOneMap = Map("u1" -> Seq("w1","w11"),"u2" -> Seq("w2","w22"))
caseOneMap: scala.collection.immutable.Map[String,Seq[String]] = Map(u1 -> List(w1, w11), u2 -> List(w2, w22))
scala> val caseOneJsonMap = lit(compact(render(caseOneMap)))
caseOneJsonMap: org.apache.spark.sql.Column = {"u1":["w1","w11"],"u2":["w2","w22"]}
scala> val caseOneSchema = MapType(StringType,ArrayType(StringType))
caseOneSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(StringType,true),true)
scala> val caseOneExpr = from_json(caseOneJsonMap,caseOneSchema)
caseOneExpr: org.apache.spark.sql.Column = entries
Case 1 final output
scala> caseOneDF
.withColumn("data",caseOneExpr)
.withColumn("col3",when(expr("array_contains(data[col1],col2)"),concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)
+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1 |w1 |x-v1|
|u2 |w2 |x-v2|
|u3 |w3 |v3 |
+----+----+----+
Case 2 logic
scala> val caseTwoDF = Seq(("u1",2,"v1"),("u1",6,"v11"),("u2",3,"v3"),("u3",4,"v3")).toDF("col1","col2","col3")
caseTwoDF: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]
scala> val caseTwoMap = Map("u1" -> Seq(1,5),"u2" -> Seq(2,4))
caseTwoMap: scala.collection.immutable.Map[String,Seq[Int]] = Map(u1 -> List(1, 5), u2 -> List(2, 4))
scala> val caseTwoJsonMap = lit(compact(render(caseTwoMap)))
caseTwoJsonMap: org.apache.spark.sql.Column = {"u1":[1,5],"u2":[2,4]}
scala> val caseTwoSchema = MapType(StringType,ArrayType(IntegerType))
caseTwoSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(IntegerType,true),true)
scala> val caseTwoExpr = from_json(caseTwoJsonMap,caseTwoSchema)
caseTwoExpr: org.apache.spark.sql.Column = entries
Case 2 final output
scala> caseTwoDF
.withColumn("data",caseTwoExpr)
.withColumn("col3",when(expr("array_contains(sequence(data[col1][0],data[col1][1]),col2)"), concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)
+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1 |2 |x-v1|
|u1 |6 |v11 |
|u2 |3 |x-v3|
|u3 |4 |v3 |
+----+----+----+
[Comments]
This works for me!!! Now I'm trying to convince myself that it will actually improve the performance of my job. Thanks for the great suggestion!!!
[Answer 2]
Another option -
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.functions.{when, array_contains, expr, lit, concat}
import spark.implicits._   // for the $"col" syntax (assumes a SparkSession named spark)
Case 1
df1.show(false)
df1.printSchema()
/**
* +----+----+----+
* |col1|col2|col3|
* +----+----+----+
* |u1 |w1 |v1 |
* |u2 |w2 |v2 |
* |u3 |w3 |v3 |
* +----+----+----+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: string (nullable = true)
* |-- col3: string (nullable = true)
*/
val case1 = Map("u1" -> Seq("w1","w11"), "u2" -> Seq("w2","w22"))
val p1 = df1.withColumn("case1", typedLit(case1))
.withColumn("col3",
when(array_contains(expr("case1[col1]"), $"col2"), concat(lit("x-"), $"col3"))
.otherwise($"col3")
)
p1.show(false)
p1.printSchema()
/**
* +----+----+----+----------------------------------+
* |col1|col2|col3|case1 |
* +----+----+----+----------------------------------+
* |u1 |w1 |x-v1|[u1 -> [w1, w11], u2 -> [w2, w22]]|
* |u2 |w2 |x-v2|[u1 -> [w1, w11], u2 -> [w2, w22]]|
* |u3 |w3 |v3 |[u1 -> [w1, w11], u2 -> [w2, w22]]|
* +----+----+----+----------------------------------+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: string (nullable = true)
* |-- col3: string (nullable = true)
* |-- case1: map (nullable = false)
* | |-- key: string
* | |-- value: array (valueContainsNull = true)
* | | |-- element: string (containsNull = true)
*/
Case 2
df2.show(false)
df2.printSchema()
/**
* +----+----+----+
* |col1|col2|col3|
* +----+----+----+
* |u1 |2 |v1 |
* |u1 |6 |v11 |
* |u2 |3 |v3 |
* |u3 |4 |v3 |
* +----+----+----+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: integer (nullable = true)
* |-- col3: string (nullable = true)
*/
val case2 = Map("u1" -> (1,5), "u2" -> (2, 4))
val p = df2.withColumn("case2", typedLit(case2))
.withColumn("col3",
when(expr("col2 between case2[col1]._1 and case2[col1]._2"), concat(lit("x-"), $"col3"))
.otherwise($"col3")
)
p.show(false)
p.printSchema()
/**
* +----+----+----+----------------------------+
* |col1|col2|col3|case2 |
* +----+----+----+----------------------------+
* |u1 |2 |x-v1|[u1 -> [1, 5], u2 -> [2, 4]]|
* |u1 |6 |v11 |[u1 -> [1, 5], u2 -> [2, 4]]|
* |u2 |3 |x-v3|[u1 -> [1, 5], u2 -> [2, 4]]|
* |u3 |4 |v3 |[u1 -> [1, 5], u2 -> [2, 4]]|
* +----+----+----+----------------------------+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: integer (nullable = true)
* |-- col3: string (nullable = true)
* |-- case2: map (nullable = false)
* | |-- key: string
* | |-- value: struct (valueContainsNull = true)
* | | |-- _1: integer (nullable = false)
* | | |-- _2: integer (nullable = false)
*/
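For context on the typedLit call above: the plain lit only handles simple literal types and fails at runtime on Scala collections, whereas typedLit captures the full type via a TypeTag and builds a proper map column. A minimal illustration (my own, not part of the answer):

import org.apache.spark.sql.functions.{lit, typedLit}

// lit(Map("u1" -> Seq("w1")))            // throws java.lang.RuntimeException: Unsupported literal type
val m = typedLit(Map("u1" -> Seq("w1")))  // OK: a column of type map<string, array<string>>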
[Comments]
Nice and simple solution.. learned about the new typedLit function.. :)