Spark 中的查找表

Posted

技术标签:

【中文标题】Spark 中的查找表【英文标题】:Lookup table in Spark 【发布时间】:2017-12-19 07:24:06 【问题描述】:

我在 Spark 中有一个没有明确定义的架构的数据框,我想将其用作查找表。例如下面的数据框:

+------------------------------------------------------------------------+
|lookupcolumn                                                            |
+------------------------------------------------------------------------+
|[val1,val2,val3,val4,val5,val6]                                         |
+------------------------------------------------------------------------+

架构如下所示:

 |-- lookupcolumn: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |    |-- key2: string (nullable = true)
 |    |-- key3: string (nullable = true)
 |    |-- key4: string (nullable = true)
 |    |-- key5: string (nullable = true)
 |    |-- key6: string (nullable = true)

我说的是“未明确定义的模式”,因为在读取数据时键的数量是未知的,所以我将其留给 Spark 来推断模式。

现在,如果我有另一个数据框,其列如下:

+-----------------+
|       datacolumn|
+-----------------+
|         key1    |
|         key3    |
|         key5    |
|         key2    |
|         key4    |
+-----------------+

我希望结果是:

+-----------------+
|     resultcolumn|
+-----------------+
|         val1    |
|         val3    |
|         val5    |
|         val2    |
|         val4    |
+-----------------+

我尝试了这样的UDF

val get_val = udf((keyindex: String) => 
    val res = lookupDf.select($"lookupcolumn"(keyindex).alias("result"))
    res.head.toString
)

但是会抛出空指针异常错误。

谁能告诉我UDF 有什么问题,以及是否有更好/更简单的方法在 Spark 中进行此查找?

【问题讨论】:

您的查找数据框是单排还是多排? 只有一行。我想如果我可以将它分解为多行,键和值在不同的列中会更容易,这样我就可以进行连接,但我不知道该怎么做。 嗯,不。结果列有值,而数据列有键。 【参考方案1】:

我假设查找表非常小,在这种情况下,将它收集到驱动程序并将其转换为普通的Map 会更有意义。然后在UDF 函数中使用这个Map。它可以通过多种方式完成,例如:

val values = lookupDf.select("lookupcolumn.*").head.toSeq.map(_.toString)
val keys = lookupDf.select("lookupcolumn.*").columns
val lookup_map = keys.zip(values).toMap

使用上面的lookup_map 变量,UDF 将是:

val lookup = udf((key: String) => lookup_map.get(key))

最终的dataframe可以通过:

val df2 = df.withColumn("resultcolumn", lookup($"datacolumn"))

【讨论】:

谢谢,这行得通。但是,当键不在表中时,有没有办法让 UDF 返回 null?目前它会抛出一个错误。 @PramodKumar:是的,这是可能的。我稍微更改了 udf,现在当密钥不存在时它应该返回 null。也可以通过将get() 更改为getOrElse() 来返回默认值。

以上是关于Spark 中的查找表的主要内容,如果未能解决你的问题,请参考以下文章

使用查找表中的 withColumn 动态添加新列

Spark:对 RDD 中的高效批量查找

使用 Scala 在以 Spark 中的列值为条件的广播 Map 上执行查找

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区

在 Spark 数据框中的 n 列中按行查找最频繁的值

PySpark 单元测试方法