Spark生成包含(SQL LIKE)字符串的列名列表

Posted

技术标签:

【中文标题】Spark生成包含(SQL LIKE)字符串的列名列表【英文标题】:Spark generate a list of column names that contains(SQL LIKE) a string 【发布时间】:2019-01-11 00:36:26 【问题描述】:

下面这个是一个简单的语法,用于在特定列中搜索字符串,使用 SQL Like 功能。

val dfx = df.filter($"name".like(s"%$productName%"))

问题是我如何获取 每一个列 NAME,在其 VALUES 中包含特定字符串,并生成一个新列,其中包含每一行的这些“列名”列表。 p>

到目前为止,这是我采用的方法,但由于我无法在 UDF 中使用 spark-sql “Like”函数,因此卡住了。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._

import spark.implicits._
val df1 = Seq(
  (0, "mango", "man", "dit"), 
  (1, "i-man", "man2", "mane"),
  (2, "iman", "mango", "ho"),
  (3, "dim",  "kim", "sim")
).toDF("id", "col1", "col2", "col3")

val df2 = df1.columns.foldLeft(df1) 
  (acc: DataFrame, colName: String) =>
    acc.withColumn(colName, concat(lit(colName + "="), col(colName)))


val df3 = df2.withColumn("merged_cols", split(concat_ws("X",  df2.columns.map(c=> col(c)):_*), "X"))

这是一个示例输出。 注意这里只有 3 列,但在实际工作中,我将读取多个可以包含动态列数的表。

+--------------------------------------------+
|id  |   col1|  col2|  col3|      merged_cols
+--------------------------------------------+
  0  |  mango| man  |  dit | col1, col2
  1  |  i-man| man2 | mane | col1, col2, col3
  2  |  iman | mango| ho   | col1, col2
  3  |   dim |  kim |   sim| 
+--------------------------------------------+

【问题讨论】:

【参考方案1】:

这可以在列上使用foldLeft 以及whenotherwise 来完成:

val e = "%man%"

val df2 = df1.columns.foldLeft(df.withColumn("merged_cols", lit("")))(df, c) => 
    df.withColumn("merged_cols", when(col(c).like(e), concat($"merged_cols", lit(s"$c,"))).otherwise($"merged_cols"))
  .withColumn("merged_cols", expr("substring(merged_cols, 1, length(merged_cols)-1)"))

满足条件e 的所有列都将附加到merged_cols 列中的字符串。请注意,该列必须存在才能使第一个追加工作,因此在发送到foldLeft 时将其添加(包含一个空字符串)到数据帧中。

代码中的最后一行只是删除了最后添加的额外,。如果您希望将结果作为一个数组,只需添加 .withColumn("merged_cols", split($"merged_cols", ",")) 即可。


另一种方法是使用UDF。这在处理许多列时可能是首选,因为foldLeft 将创建多个数据帧副本。这里使用了正则表达式(不是 SQL,因为它对整列进行操作)。

val e = ".*man.*"

val concat_cols = udf((vals: Seq[String], names: Seq[String]) => 
  vals.zip(names).filtercase (v, n) => v.matches(e).map(_._2)
)

val df2 = df.withColumn("merged_cols", concat_cols(array(df.columns.map(col(_)): _*), typedLit(df.columns.toSeq)))

注意typedLit 可用于 Spark 2.2+ 版本,使用旧版本时请改用 array(df.columns.map(lit(_)): _*)

【讨论】:

有一个问题,它适用于 3 列而不是 18 列! foldLeft 不能在 Spark/scala 上并行化,即使只有 1 行 18 列,驱动程序也会死掉。 @data-maniac:我做了一些测试,当我在本地运行时它可以处理 18 列或更多列,但速度非常慢。我使用上面的 udf 添加了一种替代方法,使用更多列时应该会更好。

以上是关于Spark生成包含(SQL LIKE)字符串的列名列表的主要内容,如果未能解决你的问题,请参考以下文章

使用 spark sql 重命名 Parquet 文件中列名中的空格

如何在 Spark SQL 中使用连字符转义列名

spark-sql/Scala 中的反透视列名是数字

SQL中的一些关键字用法

Spark Scala:如何在 LIKE 语句中使用通配符作为文字

SQL C# 使用 LIKE 和通配符生成 SQL 语句;在“bla”附近给出不正确的语法