如何在火花中连接多个列,同时从另一个表中连接列名(每行不同)

Posted

技术标签:

【中文标题】如何在火花中连接多个列,同时从另一个表中连接列名(每行不同)【英文标题】:how to concat multiple columns in spark while getting the column names to be concatenated from another table (different for each row) 【发布时间】:2017-08-08 19:16:11 【问题描述】:

我正在尝试使用 concat 函数在 spark 中连接多个列。

例如下面是我必须为其添加新连接列的表

table - **t**
+---+----+  
| id|name|
+---+----+  
|  1|   a|  
|  2|   b|
+---+----+

下面是表格,其中包含有关给定 id 连接哪些列的信息(对于 id 1,列 id 和 name 需要连接,对于 id 2 只有 id)

table - **r**
+---+-------+
| id|   att |
+---+-------+
|  1|id,name|
|  2|   id  |
+---+-------+

如果我加入这两个表并执行以下操作,我可以连接但不是基于表 r(因为新列的第一行有 1,a 而第二行应该只有 2)

t.withColumn("new",concat_ws(",",t.select("att").first.mkString.split(",").map(c => col(c)): _*)).show
+---+----+-------+---+
| id|name|  att  |new|
+---+----+-------+---+
|  1|   a|id,name|1,a|
|  2|   b|  id   |2,b|
+---+----+-------+---+

我必须在上述查询中的 select 之前应用过滤器,但我不确定如何在 withColumn 中为每一行执行此操作。

如果可能的话,如下所示。

t.withColumn("new",concat_ws(",",t.**filter**("id="+this.id).select("att").first.mkString.split(",").map(c => col(c)): _*)).show

因为它需要根据 id 过滤每一行。

scala> t.filter("id=1").select("att").first.mkString.split(",").map(c => col(c))
res90: Array[org.apache.spark.sql.Column] = Array(id, name)

scala> t.filter("id=2").select("att").first.mkString.split(",").map(c => col(c))
res89: Array[org.apache.spark.sql.Column] = Array(id)

以下是最终要求的结果。

+---+----+-------+---+
| id|name|  att  |new|
+---+----+-------+---+
|  1|   a|id,name|1,a|
|  2|   b|  id   |2  |
+---+----+-------+---+

【问题讨论】:

【参考方案1】:

我们可以使用UDF

此逻辑工作的要求。

t 的列名应该与表 r

的列 att 中的列名相同
scala> input_df_1.show
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
+---+----+

scala> input_df_2.show
+---+-------+
| id|    att|
+---+-------+
|  1|id,name|
|  2|     id|
+---+-------+

scala> val join_df = input_df_1.join(input_df_2,Seq("id"),"inner")
join_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> val req_cols = input_df_1.columns
req_cols: Array[String] = Array(id, name)

scala> def new_col_udf = udf((cols : Seq[String],row : String,attr : String) => 
     |     val row_values = row.split(",")
     |     val attrs = attr.split(",")
     |     val req_val = attrs.mapat =>
     |     val index = cols.indexOf(at)
     |     row_values(index)
     |     
     |     req_val.mkString(",")
     |     )
new_col_udf: org.apache.spark.sql.expressions.UserDefinedFunction

scala>  val intermediate_df = join_df.withColumn("concat_column",concat_ws(",",'id,'name)).withColumn("new_col",new_col_udf(lit(req_cols),'concat_column,'att))
intermediate_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val result_df = intermediate_df.select('id,'name,'att,'new_col)
result_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> result_df.show
+---+----+-------+-------+
| id|name|    att|new_col|
+---+----+-------+-------+
|  1|   a|id,name|    1,a|
|  2|   b|     id|      2|
+---+----+-------+-------+

希望它能回答你的问题。

【讨论】:

【参考方案2】:

这可以在 UDF 中完成:

val cols: Seq[Column] = dataFrame.columns.map(x => col(x)).toSeq
val indices: Seq[String] = dataFrame.columns.map(x => x).toSeq

val generateNew = udf((values: Seq[Any]) => 
  val att = values(indices.indexOf("att")).toString.split(",")
  val associatedIndices = indices.filter(x => att.contains(x))
  val builder: StringBuilder  = StringBuilder.newBuilder
  values.filter(x => associatedIndices.contains(values.indexOf(x)))
  values.foreach v => builder.append(v).append(";") 
  builder.toString()
)

val dfColumns = array(cols:_*)
val dNew = dataFrame.withColumn("new", generateNew(dfColumns))

这只是一个草图,但想法是您可以将一系列项目传递给用户定义的函数,并动态选择需要的项目。

请注意,您可以传递其他类型的集合/映射 - 例如 How to pass array to UDF

【讨论】:

以上是关于如何在火花中连接多个列,同时从另一个表中连接列名(每行不同)的主要内容,如果未能解决你的问题,请参考以下文章

将多个(任意数量)火花 DataFrame 列连接成一个“|”分隔字符串

从另一个表中查找列的总值?(使用连接或其他函数)

如何使用不同的表和不同的列名连接多个查询

如何使用 join codeigniter 从另一个表中获取值

在所有表中搜索多个值

2020年2月22日-日报