使用 Scala 在以 Spark 中的列值为条件的广播 Map 上执行查找

Posted

技术标签:

【中文标题】使用 Scala 在以 Spark 中的列值为条件的广播 Map 上执行查找【英文标题】:Perform lookup on a broadcasted Map conditoned on column value in Spark using Scala 【发布时间】:2019-06-11 16:00:48 【问题描述】:

我想对myMap 执行查找。当col2 值为“0000”时,我想用与col1 键相关的值来更新它。否则我想保留现有的col2 值。

val myDF :

+-----+-----+
|col1 |col2 |
+-----+-----+
|1    |a    | 
|2    |0000 |
|3    |c    |
|4    |0000 |
+-----+-----+

val myMap : Map[String, String] ("2" -> "b", "4" -> "d")
val broadcastMyMap = spark.sparkContext.broadcast(myMap)

def lookup = udf((key:String) => broadcastMyMap.value.get(key))

myDF.withColumn("col2", when ($"col2" === "0000", lookup($"col1")).otherwise($"col2"))

我在 spark-shell 中使用了上面的代码,它工作正常,但是当我构建应用程序 jar 并使用 spark-submit 将其提交给 Spark 时,它会引发错误:

org.apache.spark.SparkException: Failed to execute user defined  function(anonfun$5: (string) => string)

Caused by: java.lang.NullPointerException

有没有办法在不使用 UDF 的情况下执行查找,这在性能方面不是最佳选择,或者修复错误? 我认为我不能只使用 join,因为必须保留的 myDF.col2 的某些值可能会在操作中被替换。

【问题讨论】:

我可以在发布问题时看到缺少作业。它自身的语法是错误的,例如val myMap : Map[String, String] ("2" -> "b", "4" -> "d") 没有= 是什么意思第二件事是如果你想发布这样的准备变量` val mydf = Seq((1, "a"), (2 , "0000"), (3, "c"), (4, "0000")).toDF("col1", "col2") `否则,回答者必须从表中准备样本数据的双重工作。下次照顾所有这些事情。 【参考方案1】:

您的 NullPointerException 无效。我用下面的示例程序证明了这一点。 它工作正常。你执行下面的程序。

package com.example

import org.apache.log4j.Level, Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction


object MapLookupDF 
  Logger.getLogger("org").setLevel(Level.OFF)

  def main(args: Array[String]) 
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder.
      master("local[*]")
      .appName("MapLookupDF")
      .getOrCreate()
    import spark.implicits._
    val mydf = Seq((1, "a"), (2, "0000"), (3, "c"), (4, "0000")).toDF("col1", "col2")
    mydf.show
    val myMap: Map[String, String] = Map("2" -> "b", "4" -> "d")
    println(myMap.toString)
    val broadcastMyMap = spark.sparkContext.broadcast(myMap)

    def lookup: UserDefinedFunction = udf((key: String) => 
      println("getting the value for the key " + key)
      broadcastMyMap.value.get(key)
    
    )

    val finaldf = mydf.withColumn("col2", when($"col2" === "0000", lookup($"col1")).otherwise($"col2"))
    finaldf.show
  

结果:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|0000|
|   3|   c|
|   4|0000|
+----+----+

Map(2 -> b, 4 -> d)
getting the value for the key 2
getting the value for the key 4
+----+----+
|col1|col2|
+----+----+
|   1|   a|
|   2|   b|
|   3|   c|
|   4|   d|
+----+----+

注意:广播的小地图不会有明显的降级。

如果您想使用数据框,可以将地图转换为数据框

val df = myMap.toSeq.toDF("key", "val")

Map(2 -> b, 4 -> d) in dataframe format will be like
+----+----+
|key|val  |
+----+----+
|   2|   b|
|   4|   d|
+----+----+

然后像this一样加入

DIY...

【讨论】:

感谢您的回答!缺少的.value 是输入错误,不幸的是我已经在使用此代码,但它仍然无法正常工作。

以上是关于使用 Scala 在以 Spark 中的列值为条件的广播 Map 上执行查找的主要内容,如果未能解决你的问题,请参考以下文章

根据火花数据框scala中的列值过滤行

使用 Map 替换 Spark 中的列值

比较Scala中连续行中的列值

将 Spark Dataframes 的每一行转换为一个字符串,并在 scala 中的每列值之间使用分隔符

如何遍历 spark 数据集并更新 Java 中的列值?

根据 C#2.0 中的列值过滤 DataTable 行