将 Spark 中的多个 ArrayType 列合并为一个 ArrayType 列
Posted
技术标签:
【中文标题】将 Spark 中的多个 ArrayType 列合并为一个 ArrayType 列【英文标题】:Combine multiple ArrayType Columns in Spark into one ArrayType Column 【发布时间】:2018-08-30 06:20:12 【问题描述】:我想在 spark 中合并多个 ArrayType[StringType] 列来创建一个 ArrayType[StringType]。为了合并两列,我在这里找到了解决方案:
Merge two spark sql columns of type Array[string] into a new Array[string] column
但是,如果我不知道编译时的列数,我该如何进行组合。在运行时,我会知道要合并的所有列的名称。
一种选择是使用上述***问题中定义的UDF,在一个循环中多次添加两列。但这涉及对整个数据帧的多次读取。有没有办法一次性做到这一点?
+------+------+---------+
| col1 | col2 | combined|
+------+------+---------+
| [a,b]| [i,j]|[a,b,i,j]|
| [c,d]| [k,l]|[c,d,k,l]|
| [e,f]| [m,n]|[e,f,m,n]|
| [g,h]| [o,p]|[g,h,o,p]|
+------+----+-----------+
【问题讨论】:
【参考方案1】:val arrStr: Array[String] = Array("col1", "col2")
val arrCol: Array[Column] = arrString.map(c => df(c))
val assembleFunc = udf r: Row => assemble(r.toSeq: _*)
val outputDf = df.select(col("*"), assembleFunc(struct(arrCol:
_*)).as("combined"))
def assemble(rowEntity: Any*):
collection.mutable.WrappedArray[String] =
var outputArray =
rowEntity(0).asInstanceOf[collection.mutable.WrappedArray[String]]
rowEntity.drop(1).foreach
case v: collection.mutable.WrappedArray[String] =>
outputArray ++= v
case null =>
throw new SparkException("Values to assemble cannot be
null.")
case o =>
throw new SparkException(s"$o of type $o.getClass.getName
is not supported.")
outputArray
outputDf.show(false)
【讨论】:
【参考方案2】:处理数据框架构并获取ArrayType[StringType]
类型的所有列。
使用前两列的functions.array_union 创建一个新数据框
遍历其余列并将它们中的每一个添加到组合列中
>>>from pyspark import Row
>>>from pyspark.sql.functions import array_union
>>>df = spark.createDataFrame([Row(col1=['aa1', 'bb1'],
col2=['aa2', 'bb2'],
col3=['aa3', 'bb3'],
col4= ['a', 'ee'], foo="bar"
)])
>>>df.show()
+----------+----------+----------+-------+---+
| col1| col2| col3| col4|foo|
+----------+----------+----------+-------+---+
|[aa1, bb1]|[aa2, bb2]|[aa3, bb3]|[a, ee]|bar|
+----------+----------+----------+-------+---+
>>>cols = [col_.name for col_ in df.schema
... if col_.dataType == ArrayType(StringType())
... or col_.dataType == ArrayType(StringType(), False)
... ]
>>>print(cols)
['col1', 'col2', 'col3', 'col4']
>>>
>>>final_df = df.withColumn("combined", array_union(cols[:2][0], cols[:2][1]))
>>>
>>>for col_ in cols[2:]:
... final_df = final_df.withColumn("combined", array_union(col('combined'), col(col_)))
>>>
>>>final_df.select("combined").show(truncate=False)
+-------------------------------------+
|combined |
+-------------------------------------+
|[aa1, bb1, aa2, bb2, aa3, bb3, a, ee]|
+-------------------------------------+
【讨论】:
以上是关于将 Spark 中的多个 ArrayType 列合并为一个 ArrayType 列的主要内容,如果未能解决你的问题,请参考以下文章
Spark DataFrame ArrayType 或 MapType 用于检查列中的值
将 Spark 数据帧写入 Redshift:保存 StructField(user_agent,ArrayType(StringType,true),true)
是否可以使用Crealytics spark-excel软件包将具有ArrayType列的Spark数据框写入Excel?