如何在 spark DataFrame 中将多个浮点列连接到一个 ArrayType(FloatType()) 中?
Posted
技术标签:
【中文标题】如何在 spark DataFrame 中将多个浮点列连接到一个 ArrayType(FloatType()) 中?【英文标题】:How can I concat several float columns into one ArrayType(FloatType()) in spark DataFrame? 【发布时间】:2019-02-26 01:21:12 【问题描述】:在读取 CSV 文件后,我有一个火花 DataFrame
,其中包含许多浮点列。
我想将所有浮点列合并为一个ArrayType(FloatType())
。
任何想法如何使用 PySpark(或 Scala)做到这一点?
【问题讨论】:
你的意思是编程合并所有而不使用这样的代码:df.select(concat(col("col1"), col("col2")))? 【参考方案1】:如果你知道所有的浮点列名。你可以试试这个(scala)
val names = Seq("float_col1", "float_col2","float_col3"...."float_col10");
df.withColumn("combined", array(names.map(frame(_)):_*))
【讨论】:
【参考方案2】:这是 Scala 的另一个版本:
data.printSchema
root
|-- Int_Col1: integer (nullable = false)
|-- Str_Col1: string (nullable = true)
|-- Float_Col1: float (nullable = false)
|-- Float_Col2: float (nullable = false)
|-- Str_Col2: string (nullable = true)
|-- Float_Col3: float (nullable = false)
data.show()
+--------+--------+----------+----------+--------+----------+
|Int_Col1|Str_Col1|Float_Col1|Float_Col2|Str_Col2|Float_Col3|
+--------+--------+----------+----------+--------+----------+
| 1| ABC| 10.99| 20.99| a| 9.99|
| 2| XYZ| 999.1343| 9858.1| b| 488.99|
+--------+--------+----------+----------+--------+----------+
添加一个新的array<float>
字段以连接所有float
值。
val df = data.withColumn("Float_Arr_Col",array().cast("array<float>"))
然后过滤需要的数据类型并使用foldLeft
连接浮点列
df.dtypes
.collect case (dn, dt) if dt.startsWith("FloatType") => dn
.foldLeft(df)((accDF, c) => accDF.withColumn("Float_Arr_Col",
array_union(col("Float_Arr_Col"),array(col(c)))))
.show(false)
输出:
+--------+--------+----------+----------+--------+----------+--------------------------+
|Int_Col1|Str_Col1|Float_Col1|Float_Col2|Str_Col2|Float_Col3|Float_Arr_Col |
+--------+--------+----------+----------+--------+----------+--------------------------+
|1 |ABC |10.99 |20.99 |a |9.99 |[10.99, 20.99, 9.99] |
|2 |XYZ |999.1343 |9858.1 |b |488.99 |[999.1343, 9858.1, 488.99]|
+--------+--------+----------+----------+--------+----------+--------------------------+
希望这会有所帮助!
【讨论】:
【参考方案3】:找到了解决办法。非常简单,但很难找到。
float_cols = ['_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10']
df.withColumn('combined', array([col(c) for c in float_cols]))
【讨论】:
以上是关于如何在 spark DataFrame 中将多个浮点列连接到一个 ArrayType(FloatType()) 中?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 中将两个 DataFrame 与组合列连接起来?
如何在 Spark Streaming 中将 RDD 转换为 DataFrame,而不仅仅是 Spark
如何在 Spark 中将 JavaPairInputDStream 转换为 DataSet/DataFrame
如何在 Scala(Spark 2.0)中将带有字符串的 DataFrame 转换为带有 Vectors 的 DataFrame