求教: Spark的dataframe 怎么改列的名字,比如列名 SUM(_c1) 改成c1

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了求教: Spark的dataframe 怎么改列的名字,比如列名 SUM(_c1) 改成c1相关的知识,希望对你有一定的参考价值。

做了一个groupBy().agg()之后,
df.groupBy("keyy").agg("_c1" : "sum", "_c2" : "sum")

数据就变成了这样的格式。
newdf :
[Row(keyy=u'aaa', SUM(_c1)=0.123,SUM(_c2)=0.567),
Row(keyy=u'bbb', SUM(_c1)=0.234, SUM(_c2)=0.567)]

因为接下来还要对后面两列进行操作,发现sql 或者直接选取列都因为这奇葩(?)的列名,都不能运行。有什么办法能把这个dataframe的列名改成正常的名字, 比如c1之类的?只要能让后面的调用,麻烦点的方法都可以,不用考虑perforamance.........
newdf里, SUM(_c1) SUM(_c2) 不一定一个在前一个在后,有时候可能是【keyy ,SUM(_c2),SUM(_c1) 】
多谢多谢!

参考技术A 试试这样:

df.groupBy("key").agg(sum($"quantity") as "c1", avg($"quantity") as "c2")
参考技术B 看这问题人挺多的,我来提供一下解决方案.
DF.sum("value").withColumnRenamed("sum(value)","sumType").show();
参考技术C 兄弟 你可以这样:
from pyspark.sql import functions as F
df.groupBy("keyy").agg(F.sum('_c1').alias('c1'), F.sum('_c2').alias('c2')).collect()

Spark DataFrame - 区分缺少列的记录与坏值

这个问题与这个主题有关:Spark 2.2 Scala DataFrame select from string array, catching errors

我需要区分缺少列的记录(在我的用例中不是错误)和具有不适用于列类型的垃圾值的记录。

在我执行selectExpr之后,这两种情况在生成的DataFrame中显示为null。我正在寻找一种快速的方法来包含缺少列​​的记录以及良好的结果,同时将具有垃圾值的记录丢弃到坏桶中。坏的将包括像空字符串作为值的int字段,或“abc”。

例如,假设我有这样的DataFrame:Col A - string,Col B - int,Col C - string,

A    B    C
"x", "",  ""  - Error, bad value for B
"",  null,""  - Good, missing value for B
"",  "a", ""  - Bad, bad value for B
"x", "1", "x" - Good, normal case

-----编辑-----

显示创建数据帧的代码。数据以json形式出现,所有字段都被引用,因此它最初认为一切都是字符串。我需要输入几个字段为int,boolean等。有关详细信息,请参阅顶部的链接。

 val cols = dfLower.columns
 val typedCols = cols.map( c => getTypeStmtForCol(c, qradarType) )
 val result = dfLower.selectExpr(typedCols: _*)

 // This puts both records with missing columns and bad values in bad.
 // Need way to distinguish between those 2 cases.     
 val bad = dfLower.where(typedCols.map(expr(_).isNull).reduce(_ || _))
 val good = result.na.drop()

----编辑2 ----

我想我可能有个主意。如果我可以计算每个记录之前和之后的空值数,那么只有那些在select之后具有更多空值的记录才会出错。不确定如何实现......

答案

快速和脏的类型,但创建一个测试您的条件的udf并根据条件的结果返回状态。

def checkIntData=udf((columnData: String) => {
  var status = "GOOD"
  try{
    columnData.toInt
  } catch {
    case ex: Exception => {
      if(columnData == null) {
        // Do nothing. This is fine
      } else if(columnData.length == 0) {
        status = "ERROR"
      } else {
        status = "BAD"
      }
    }
  }
  status
})

val seqData = Seq(("x","","","0"),("",null,"","3"),("","a","","z"),("x","1","x",""))
val df = seqData.toDF("col1","col2","col3","col4")
val colsToCheck = df.select("col2","col4").columns
var newdf = df
// Iterate over the columns you want to check inside the dataframe. Each checked column will add a new status column to newdf
colsToCheck.map{column =>
  newdf = newdf.withColumn(column+"Status", checkIntData(newdf(column)))
}
newdf.show()

这将返回以下内容:

+----+----+----+----+----------+----------+
|col1|col2|col3|col4|col2Status|col4Status|
+----+----+----+----+----------+----------+
|   x|    |    |   0|     ERROR|      GOOD|
|    |null|    |   3|      GOOD|      GOOD|
|    |   a|    |    |       BAD|     ERROR|
|   x|   1|   x|   z|      GOOD|       BAD|
+----+----+----+----+----------+----------+

然后,您可以通过基于状态列进行过滤来创建错误存储桶。

第1列到第3列来自您的示例。我添加了第4列,以展示如何将其应用于多个列而无需编写.withColumn()一百次。我通过创建列colsToCheck数组然后迭代以将udf应用于所有选定列来实现此目的。

注意!因为我可能会因为这样做而大喊大叫,所以我想让你知道使用try / catch作为流控制被认为是反模式(又称糟糕的编程)。 Read more to find out why.

以上是关于求教: Spark的dataframe 怎么改列的名字,比如列名 SUM(_c1) 改成c1的主要内容,如果未能解决你的问题,请参考以下文章

查看 Spark Dataframe 列的内容

按日期排序 Spark DataFrame 列的数组

spark 统计Dataframe 列的中空值比例

Spark DataFrame列的合并与拆分

Spark Hive:通过另一个 DataFrame 的列的值过滤一个 DataFrame 的行

计算Spark DataFrame中每列的内核密度