如何使用循环中迭代的变量在 for 循环中创建数据框
Posted
技术标签:
【中文标题】如何使用循环中迭代的变量在 for 循环中创建数据框【英文标题】:How to create a data frame in a for loop with the variable that is iterating in loop 【发布时间】:2019-06-19 21:15:46 【问题描述】:所以我有一个巨大的数据框,它是单个表的组合,它在末尾有一个标识符列,指定表号,如下所示
+----------------------------+
| col1 col2 .... table_num |
+----------------------------+
| x y 1 |
| a b 1 |
| . . . |
| . . . |
| q p 2 |
+----------------------------+
(original table)
我必须根据表 num 将其拆分为多个小数据框。组合起来创建它的表的数量非常大,因此单独创建不相交的子集数据帧是不可行的,所以我在想如果我做一个 for 循环迭代 table_num 的最小值到最大值我可以完成这个任务,但我可以' t似乎这样做,任何帮助表示赞赏。
这是我想出来的
for (x < min(table_num) to max(table_num))
var df(x)= spark.sql("select * from df1 where state = x")
df(x).collect()
但我不认为该声明是正确的。
所以本质上我需要的是看起来像这样的 df
+-----------------------------+
| col1 col2 ... table_num |
+-----------------------------+
| x y 1 |
| a b 1 |
+-----------------------------+
+------------------------------+
| col1 col2 ... table_num |
+------------------------------+
| xx xy 2 |
| aa bb 2 |
+------------------------------+
+-------------------------------+
| col1 col2 ... table_num |
+-------------------------------+
| xxy yyy 3 |
| aaa bbb 3 |
+-------------------------------+
...等等...
(how I would like the Dataframes split)
【问题讨论】:
将一个数据帧拆分为多个的最终结果是什么?您会在某处写入数据吗? 【参考方案1】:在 Spark Arrays 中几乎可以是数据类型。当制作为 vars 时,您可以动态地添加和删除其中的元素。下面我将把表 nums 隔离到它们自己的数组中,这样我就可以轻松地遍历它们。隔离后,我通过一个 while 循环将每个表作为唯一元素添加到 DF Holder Array。要查询数组的元素,请使用 DFHolderArray(n-1) 其中 n 是您要查询的位置,0 是第一个元素。
//This will go and turn the distinct row nums in a queriable (this is 100% a word) array
val tableIDArray = inputDF.selectExpr("table_num").distinct.rdd.map(x=>x.mkString.toInt).collect
//Build the iterator
var iterator = 1
//holders for DF and transformation step
var tempDF = spark.sql("select 'foo' as bar")
var interimDF = tempDF
//This will be an array for dataframes
var DFHolderArray : Array[org.apache.spark.sql.DataFrame] = Array(tempDF)
//loop while the you have note reached end of array
while(iterator<=tableIDArray.length)
//Call the table that is stored in that location of the array
tempDF = spark.sql("select * from df1 where state = '" + tableIDArray(iterator-1) + "'")
//Fluff
interimDF = tempDF.withColumn("User_Name", lit("Stack_Overflow"))
//If logic to overwrite or append the DF
DFHolderArray = if (iterator==1)
Array(interimDF)
else
DFHolderArray ++ Array(interimDF)
iterator = iterator + 1
//To query the data
DFHolderArray(0).show(10,false)
DFHolderArray(1).show(10,false)
DFHolderArray(2).show(10,false)
//....
【讨论】:
【参考方案2】:方法是收集所有唯一键并构建相应的数据框。我给它添加了一些功能性的味道。
样本数据集:
name,year,country,id
Bayern Munich,2014,Germany,7747
Bayern Munich,2014,Germany,7747
Bayern Munich,2014,Germany,7746
Borussia Dortmund,2014,Germany,7746
Borussia Mönchengladbach,2014,Germany,7746
Schalke 04,2014,Germany,7746
Schalke 04,2014,Germany,7753
Lazio,2014,Germany,7753
代码:
val df = spark.read.format(source = "csv")
.option("header", true)
.option("delimiter", ",")
.option("inferSchema", true)
.load("groupby.dat")
import spark.implicits._
//collect data for each key into a data frame
val uniqueIds = df.select("id").distinct().map(x => x.mkString.toInt).collect()
// List buffer to hold separate data frames
var dataframeList: ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()
println(uniqueIds.toList)
// filter data
uniqueIds.foreach(x =>
val tempDF = df.filter(col("id") === x)
dataframeList += tempDF
)
//show individual data frames
for (tempDF1 <- dataframeList)
tempDF1.show()
【讨论】:
【参考方案3】:一种方法是将write
DataFrame 划分为Parquet files 和read
将它们重新划分为Map
,如下所示:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("a", "b", 1), ("c", "d", 1), ("e", "f", 1),
("g", "h", 2), ("i", "j", 2)
).toDF("c1", "c2", "table_num")
val filePath = "/path/to/parquet/files"
df.write.partitionBy("table_num").parquet(filePath)
val tableNumList = df.select("table_num").distinct.map(_.getAs[Int](0)).collect
// tableNumList: Array[Int] = Array(1, 2)
val dfMap = ( for n <- tableNumList yield
(n, spark.read.parquet(s"$filePath/table_num=$n").withColumn("table_num", lit(n)))
).toMap
从Map
访问各个数据帧:
dfMap(1).show
// +---+---+---------+
// | c1| c2|table_num|
// +---+---+---------+
// | a| b| 1|
// | c| d| 1|
// | e| f| 1|
// +---+---+---------+
dfMap(2).show
// +---+---+---------+
// | c1| c2|table_num|
// +---+---+---------+
// | g| h| 2|
// | i| j| 2|
// +---+---+---------+
【讨论】:
以上是关于如何使用循环中迭代的变量在 for 循环中创建数据框的主要内容,如果未能解决你的问题,请参考以下文章