求教: Spark的dataframe 怎么改列的名字,比如列名 SUM(_c1) 改成c1
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了求教: Spark的dataframe 怎么改列的名字,比如列名 SUM(_c1) 改成c1相关的知识,希望对你有一定的参考价值。
做了一个groupBy().agg()之后,
df.groupBy("keyy").agg("_c1" : "sum", "_c2" : "sum")
数据就变成了这样的格式。
newdf :
[Row(keyy=u'aaa', SUM(_c1)=0.123,SUM(_c2)=0.567),
Row(keyy=u'bbb', SUM(_c1)=0.234, SUM(_c2)=0.567)]
因为接下来还要对后面两列进行操作,发现sql 或者直接选取列都因为这奇葩(?)的列名,都不能运行。有什么办法能把这个dataframe的列名改成正常的名字, 比如c1之类的?只要能让后面的调用,麻烦点的方法都可以,不用考虑perforamance.........
newdf里, SUM(_c1) SUM(_c2) 不一定一个在前一个在后,有时候可能是【keyy ,SUM(_c2),SUM(_c1) 】
多谢多谢!
df.groupBy("key").agg(sum($"quantity") as "c1", avg($"quantity") as "c2") 参考技术B 看这问题人挺多的,我来提供一下解决方案.
DF.sum("value").withColumnRenamed("sum(value)","sumType").show(); 参考技术C 兄弟 你可以这样:
from pyspark.sql import functions as F
df.groupBy("keyy").agg(F.sum('_c1').alias('c1'), F.sum('_c2').alias('c2')).collect()
Spark DataFrame - 区分缺少列的记录与坏值
这个问题与这个主题有关:Spark 2.2 Scala DataFrame select from string array, catching errors
我需要区分缺少列的记录(在我的用例中不是错误)和具有不适用于列类型的垃圾值的记录。
在我执行selectExpr之后,这两种情况在生成的DataFrame中显示为null。我正在寻找一种快速的方法来包含缺少列的记录以及良好的结果,同时将具有垃圾值的记录丢弃到坏桶中。坏的将包括像空字符串作为值的int字段,或“abc”。
例如,假设我有这样的DataFrame:Col A - string,Col B - int,Col C - string,
A B C
"x", "", "" - Error, bad value for B
"", null,"" - Good, missing value for B
"", "a", "" - Bad, bad value for B
"x", "1", "x" - Good, normal case
-----编辑-----
显示创建数据帧的代码。数据以json形式出现,所有字段都被引用,因此它最初认为一切都是字符串。我需要输入几个字段为int,boolean等。有关详细信息,请参阅顶部的链接。
val cols = dfLower.columns
val typedCols = cols.map( c => getTypeStmtForCol(c, qradarType) )
val result = dfLower.selectExpr(typedCols: _*)
// This puts both records with missing columns and bad values in bad.
// Need way to distinguish between those 2 cases.
val bad = dfLower.where(typedCols.map(expr(_).isNull).reduce(_ || _))
val good = result.na.drop()
----编辑2 ----
我想我可能有个主意。如果我可以计算每个记录之前和之后的空值数,那么只有那些在select之后具有更多空值的记录才会出错。不确定如何实现......
快速和脏的类型,但创建一个测试您的条件的udf并根据条件的结果返回状态。
def checkIntData=udf((columnData: String) => {
var status = "GOOD"
try{
columnData.toInt
} catch {
case ex: Exception => {
if(columnData == null) {
// Do nothing. This is fine
} else if(columnData.length == 0) {
status = "ERROR"
} else {
status = "BAD"
}
}
}
status
})
val seqData = Seq(("x","","","0"),("",null,"","3"),("","a","","z"),("x","1","x",""))
val df = seqData.toDF("col1","col2","col3","col4")
val colsToCheck = df.select("col2","col4").columns
var newdf = df
// Iterate over the columns you want to check inside the dataframe. Each checked column will add a new status column to newdf
colsToCheck.map{column =>
newdf = newdf.withColumn(column+"Status", checkIntData(newdf(column)))
}
newdf.show()
这将返回以下内容:
+----+----+----+----+----------+----------+
|col1|col2|col3|col4|col2Status|col4Status|
+----+----+----+----+----------+----------+
| x| | | 0| ERROR| GOOD|
| |null| | 3| GOOD| GOOD|
| | a| | | BAD| ERROR|
| x| 1| x| z| GOOD| BAD|
+----+----+----+----+----------+----------+
然后,您可以通过基于状态列进行过滤来创建错误存储桶。
第1列到第3列来自您的示例。我添加了第4列,以展示如何将其应用于多个列而无需编写.withColumn()
一百次。我通过创建列colsToCheck
数组然后迭代以将udf应用于所有选定列来实现此目的。
注意!因为我可能会因为这样做而大喊大叫,所以我想让你知道使用try / catch作为流控制被认为是反模式(又称糟糕的编程)。 Read more to find out why.
以上是关于求教: Spark的dataframe 怎么改列的名字,比如列名 SUM(_c1) 改成c1的主要内容,如果未能解决你的问题,请参考以下文章