如何使用循环中迭代的变量在 for 循环中创建数据框

Posted

技术标签:

【中文标题】如何使用循环中迭代的变量在 for 循环中创建数据框【英文标题】:How to create a data frame in a for loop with the variable that is iterating in loop 【发布时间】:2019-06-19 21:15:46 【问题描述】:

所以我有一个巨大的数据框,它是单个表的组合,它在末尾有一个标识符列,指定表号,如下所示

+----------------------------+
| col1 col2 .... table_num   |
+----------------------------+
| x     y            1       |
| a     b            1       |
| .     .            .       |
| .     .            .       |
| q     p            2       |
+----------------------------+

(original table)

我必须根据表 num 将其拆分为多个小数据框。组合起来创建它的表的数量非常大,因此单独创建不相交的子集数据帧是不可行的,所以我在想如果我做一个 for 循环迭代 table_num 的最小值到最大值我可以完成这个任务,但我可以' t似乎这样做,任何帮助表示赞赏。

这是我想出来的

for (x < min(table_num) to max(table_num)) 

var df(x)= spark.sql("select * from df1 where state = x")
df(x).collect()

但我不认为该声明是正确的。

所以本质上我需要的是看起来像这样的 df

+-----------------------------+
| col1  col2  ...   table_num |
+-----------------------------+
| x      y             1      |
| a      b             1      |
+-----------------------------+


+------------------------------+
| col1   col2  ...   table_num |
+------------------------------+
| xx      xy             2     |
| aa      bb             2     |
+------------------------------+

+-------------------------------+
| col1    col2  ...   table_num |
+-------------------------------+
| xxy      yyy             3    |
| aaa      bbb             3    |
+-------------------------------+

...等等...

(how I would like the Dataframes split)

【问题讨论】:

将一个数据帧拆分为多个的最终结果是什么?您会在某处写入数据吗? 【参考方案1】:

在 Spark Arrays 中几乎可以是数据类型。当制作为 vars 时,您可以动态地添加和删除其中的元素。下面我将把表 nums 隔离到它们自己的数组中,这样我就可以轻松地遍历它们。隔离后,我通过一个 while 循环将每个表作为唯一元素添加到 DF Holder Array。要查询数组的元素,请使用 DFHolderArray(n-1) 其中 n 是您要查询的位置,0 是第一个元素。

//This will go and turn the distinct row nums in a queriable (this is 100% a word) array
val tableIDArray = inputDF.selectExpr("table_num").distinct.rdd.map(x=>x.mkString.toInt).collect

//Build the iterator
var iterator = 1  

//holders for DF and transformation step
var tempDF = spark.sql("select 'foo' as bar")
var interimDF = tempDF

//This will be an array for dataframes
var DFHolderArray : Array[org.apache.spark.sql.DataFrame] = Array(tempDF) 

//loop while the you have note reached end of array
while(iterator<=tableIDArray.length) 
  //Call the table that is stored in that location of the array
  tempDF = spark.sql("select * from df1 where state = '" + tableIDArray(iterator-1) + "'")
  //Fluff
  interimDF = tempDF.withColumn("User_Name", lit("Stack_Overflow"))
  //If logic to overwrite or append the DF
  DFHolderArray = if (iterator==1) 
    Array(interimDF)
   else 
    DFHolderArray ++ Array(interimDF)
  
  iterator = iterator + 1


//To query the data
DFHolderArray(0).show(10,false)
DFHolderArray(1).show(10,false)
DFHolderArray(2).show(10,false)
//....

【讨论】:

【参考方案2】:

方法是收集所有唯一键并构建相应的数据框。我给它添加了一些功能性的味道。

样本数据集:

  name,year,country,id
  Bayern Munich,2014,Germany,7747
  Bayern Munich,2014,Germany,7747
  Bayern Munich,2014,Germany,7746
  Borussia Dortmund,2014,Germany,7746
  Borussia Mönchengladbach,2014,Germany,7746
  Schalke 04,2014,Germany,7746
  Schalke 04,2014,Germany,7753
  Lazio,2014,Germany,7753

代码:

  val df = spark.read.format(source = "csv")
    .option("header", true)
    .option("delimiter", ",")
    .option("inferSchema", true)
    .load("groupby.dat")

  import spark.implicits._

  //collect data for each key into a data frame
  val uniqueIds = df.select("id").distinct().map(x => x.mkString.toInt).collect()
  // List buffer to hold separate data frames
  var dataframeList: ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()
  println(uniqueIds.toList)

  // filter data
  uniqueIds.foreach(x => 
    val tempDF = df.filter(col("id") === x)
    dataframeList += tempDF
  )

  //show individual data frames
  for (tempDF1 <- dataframeList) 
    tempDF1.show()
  

【讨论】:

【参考方案3】:

一种方法是将write DataFrame 划分为Parquet files 和read 将它们重新划分为Map,如下所示:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("a", "b", 1), ("c", "d", 1), ("e", "f", 1), 
  ("g", "h", 2), ("i", "j", 2)
).toDF("c1", "c2", "table_num")

val filePath = "/path/to/parquet/files"

df.write.partitionBy("table_num").parquet(filePath)

val tableNumList = df.select("table_num").distinct.map(_.getAs[Int](0)).collect
// tableNumList: Array[Int] = Array(1, 2)

val dfMap = ( for  n <- tableNumList  yield
    (n, spark.read.parquet(s"$filePath/table_num=$n").withColumn("table_num", lit(n)))
  ).toMap

Map 访问各个数据帧:

dfMap(1).show
// +---+---+---------+
// | c1| c2|table_num|
// +---+---+---------+
// |  a|  b|        1|
// |  c|  d|        1|
// |  e|  f|        1|
// +---+---+---------+

dfMap(2).show
// +---+---+---------+
// | c1| c2|table_num|
// +---+---+---------+
// |  g|  h|        2|
// |  i|  j|        2|
// +---+---+---------+

【讨论】:

以上是关于如何使用循环中迭代的变量在 for 循环中创建数据框的主要内容,如果未能解决你的问题,请参考以下文章

如何使用for循环将返回值分配给变量

在 Python 的 for 循环中使用多个变量

如何获取在 for 循环中创建的变量名

在循环内创建的变量在 C 中的迭代期间更改值

如何使用for循环或条件在pandas数据框的子集中创建多个回归模型(statsmodel)?

在 for 循环的每次迭代中保存变量并稍后加载它们