在火花数据框中使用 for 循环添加新列

Posted

技术标签:

【中文标题】在火花数据框中使用 for 循环添加新列【英文标题】:Addition of new columns using forloop in spark dataframe 【发布时间】:2019-05-19 06:01:48 【问题描述】:

我有一个动态创建的 spark 数据框。还有一个需要从数据框中选择的列列表。

我需要遍历所需的列列表并检查数据框中是否存在这些列。如果找到,则需要重命名数据框列,否则如果找不到,则创建新列并将其放置为“null”值。

我尝试过使用 forloop 然后 if 条件如下:

我的数据框有列:a.col1,a.col2,a.col3,a.col4,b.col2

val cols_needed = "a.col1,a.col2,a.col3,a.col4,b.col1,b.col2".split(",")
for(c <- cols_needed)

  if(Try(df(c)).isFailure)
  
    df.withColumn(c, lit(null))
  
  else
  
    df.withColumn(`c`,df(c))
  

我需要在循环之后使用这个数据框进行进一步的操作。

还尝试使用 for 循环和 yield,但它给了我一个数据帧数组。

如何在 forloop 和 if 条件中更新此数据框。 因为当我尝试在 forloop 之外打印架构时,它具有旧架构而不是更新架构。

任何建议都会有所帮助。

谢谢

【问题讨论】:

【参考方案1】:

API 是不可变的,因此 withColumn 不会改变现有的 val df 而是返回一个新的。例如

val newDf = cols_needed.foldLeft(df) 
  case (tmpdf, c) =>
    if (Try(df(c)).isFailure) 
      tmpdf.withColumn(c, lit(null))
     else 
      tmpdf.withColumn(`c`, df(c))
    

// now newDf contains the added columns

【讨论】:

这个解决方案就像魅力一样。尝试了很多不同的替代方案,但徒劳无功。非常感谢您的及时帮助。 我发现一篇文章可能有助于解释 spark forloop 的性能,仅供参考 medium.com/@david.mudrauskas/… 这些主题不相关,在这篇文章中,循环仅在构建 DAG 时在驱动程序中

以上是关于在火花数据框中使用 for 循环添加新列的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 数据框中循环遍历两列时将值添加到新列

将列添加到由python中的for循环计算的数据框中

创建一个for循环以分块读取大数据并创建新列

嵌套for循环熊猫数据框不会创建新列

如果 ID 存在于其他数据框中,则 Python Pandas 数据框在新列中添加“1”

如何编写一个简单的 for 循环,使用键值对根据旧列中的值填充新列?