如何在火花中连接多个列,同时从另一个表中连接列名(每行不同)
Posted
技术标签:
【中文标题】如何在火花中连接多个列,同时从另一个表中连接列名(每行不同)【英文标题】:how to concat multiple columns in spark while getting the column names to be concatenated from another table (different for each row) 【发布时间】:2017-08-08 19:16:11 【问题描述】:我正在尝试使用 concat 函数在 spark 中连接多个列。
例如下面是我必须为其添加新连接列的表
table - **t**
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
+---+----+
下面是表格,其中包含有关给定 id 连接哪些列的信息(对于 id 1,列 id 和 name 需要连接,对于 id 2 只有 id)
table - **r**
+---+-------+
| id| att |
+---+-------+
| 1|id,name|
| 2| id |
+---+-------+
如果我加入这两个表并执行以下操作,我可以连接但不是基于表 r(因为新列的第一行有 1,a 而第二行应该只有 2)
t.withColumn("new",concat_ws(",",t.select("att").first.mkString.split(",").map(c => col(c)): _*)).show
+---+----+-------+---+
| id|name| att |new|
+---+----+-------+---+
| 1| a|id,name|1,a|
| 2| b| id |2,b|
+---+----+-------+---+
我必须在上述查询中的 select 之前应用过滤器,但我不确定如何在 withColumn 中为每一行执行此操作。
如果可能的话,如下所示。
t.withColumn("new",concat_ws(",",t.**filter**("id="+this.id).select("att").first.mkString.split(",").map(c => col(c)): _*)).show
因为它需要根据 id 过滤每一行。
scala> t.filter("id=1").select("att").first.mkString.split(",").map(c => col(c))
res90: Array[org.apache.spark.sql.Column] = Array(id, name)
scala> t.filter("id=2").select("att").first.mkString.split(",").map(c => col(c))
res89: Array[org.apache.spark.sql.Column] = Array(id)
以下是最终要求的结果。
+---+----+-------+---+
| id|name| att |new|
+---+----+-------+---+
| 1| a|id,name|1,a|
| 2| b| id |2 |
+---+----+-------+---+
【问题讨论】:
【参考方案1】:我们可以使用UDF
此逻辑工作的要求。
表 t 的列名应该与表 r
的列 att 中的列名相同scala> input_df_1.show
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
+---+----+
scala> input_df_2.show
+---+-------+
| id| att|
+---+-------+
| 1|id,name|
| 2| id|
+---+-------+
scala> val join_df = input_df_1.join(input_df_2,Seq("id"),"inner")
join_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> val req_cols = input_df_1.columns
req_cols: Array[String] = Array(id, name)
scala> def new_col_udf = udf((cols : Seq[String],row : String,attr : String) =>
| val row_values = row.split(",")
| val attrs = attr.split(",")
| val req_val = attrs.mapat =>
| val index = cols.indexOf(at)
| row_values(index)
|
| req_val.mkString(",")
| )
new_col_udf: org.apache.spark.sql.expressions.UserDefinedFunction
scala> val intermediate_df = join_df.withColumn("concat_column",concat_ws(",",'id,'name)).withColumn("new_col",new_col_udf(lit(req_cols),'concat_column,'att))
intermediate_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]
scala> val result_df = intermediate_df.select('id,'name,'att,'new_col)
result_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]
scala> result_df.show
+---+----+-------+-------+
| id|name| att|new_col|
+---+----+-------+-------+
| 1| a|id,name| 1,a|
| 2| b| id| 2|
+---+----+-------+-------+
希望它能回答你的问题。
【讨论】:
【参考方案2】:这可以在 UDF 中完成:
val cols: Seq[Column] = dataFrame.columns.map(x => col(x)).toSeq
val indices: Seq[String] = dataFrame.columns.map(x => x).toSeq
val generateNew = udf((values: Seq[Any]) =>
val att = values(indices.indexOf("att")).toString.split(",")
val associatedIndices = indices.filter(x => att.contains(x))
val builder: StringBuilder = StringBuilder.newBuilder
values.filter(x => associatedIndices.contains(values.indexOf(x)))
values.foreach v => builder.append(v).append(";")
builder.toString()
)
val dfColumns = array(cols:_*)
val dNew = dataFrame.withColumn("new", generateNew(dfColumns))
这只是一个草图,但想法是您可以将一系列项目传递给用户定义的函数,并动态选择需要的项目。
请注意,您可以传递其他类型的集合/映射 - 例如 How to pass array to UDF
【讨论】:
以上是关于如何在火花中连接多个列,同时从另一个表中连接列名(每行不同)的主要内容,如果未能解决你的问题,请参考以下文章
将多个(任意数量)火花 DataFrame 列连接成一个“|”分隔字符串