scala spark 使用 expr 在列内赋值
Posted
技术标签:
【中文标题】scala spark 使用 expr 在列内赋值【英文标题】:scala spark use expr to value inside a column 【发布时间】:2019-02-12 10:29:36 【问题描述】:我需要向具有布尔值的数据框添加一个新列,评估数据框内的列。例如,我有一个数据框
+----+----+----+----+----+-----------+----------------+
|colA|colB|colC|colD|colE|colPRODRTCE| colCOND|
+----+----+----+----+----+-----------+----------------+
| 1| 1| 1| 1| 3| 39|colA=1 && colB>0|
| 1| 1| 1| 1| 3| 45| colD=1|
| 1| 1| 1| 1| 3| 447|colA>8 && colC=1|
+----+----+----+----+----+-----------+----------------+
在我的新专栏中,我需要评估 colCOND 的表达式是真还是假。
如果你有这样的东西很容易:
val df = List(
(1,1,1,1,3),
(2,2,3,4,4)
).toDF("colA", "colB", "colC", "colD", "colE")
val myExpression = "colA<colC"
import org.apache.spark.sql.functions.expr
df.withColumn("colRESULT",expr(myExpression)).show()
+----+----+----+----+----+---------+
|colA|colB|colC|colD|colE|colRESULT|
+----+----+----+----+----+---------+
| 1| 1| 1| 1| 3| false|
| 2| 2| 3| 4| 4| true|
+----+----+----+----+----+---------+
但我必须在每一行中计算一个不同的表达式,它在 colCOND 列内。
我想创建一个包含所有列的 UDF 函数,但我的真实数据框有很多列。我该怎么做?
谢谢大家
【问题讨论】:
你有解决办法吗?我在这里面临完全相同的问题。 @omnisius - 请看我的回答。谢谢你。 我打算用 Python 试试,非常感谢! 【参考方案1】:如果&&改成AND,可以试试
package spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
object DataFrameLogicWithColumn extends App
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
val sourceDF = Seq((1,1,1,1,3,39,"colA=1 AND colB>0"),
(1,1,1,1,3,45,"colD=1"),
(1,1,1,1,3,447,"colA>8 AND colC=1")
).toDF("colA", "colB", "colC", "colD", "colE", "colPRODRTCE", "colCOND").persist(MEMORY_AND_DISK)
val exprs = sourceDF.select('colCOND).distinct().as[String].collect()
val d1 = exprs.map(i =>
val df = sourceDF.filter('colCOND.equalTo(i))
df.withColumn("colRESULT", expr(i))
)
val resultDF = d1.reduce(_ union _)
resultDF.show(false)
// +----+----+----+----+----+-----------+-----------------+---------+
// |colA|colB|colC|colD|colE|colPRODRTCE|colCOND |colRESULT|
// +----+----+----+----+----+-----------+-----------------+---------+
// |1 |1 |1 |1 |3 |39 |colA=1 AND colB>0|true |
// |1 |1 |1 |1 |3 |447 |colA>8 AND colC=1|false |
// |1 |1 |1 |1 |3 |45 |colD=1 |true |
// +----+----+----+----+----+-----------+-----------------+---------+
sourceDF.unpersist()
可以试试DataSet
case class c1 (colA: Int, colB: Int, colC: Int, colD: Int, colE: Int, colPRODRTCE: Int, colCOND: String)
case class cRes (colA: Int, colB: Int, colC: Int, colD: Int, colE: Int, colPRODRTCE: Int, colCOND: String, colResult: Boolean)
val sourceData = Seq(c1(1,1,1,1,3,39,"colA=1 AND colB>0"),
c1(1,1,1,1,3,45,"colD=1"),
c1(1,1,1,1,3,447,"colA>8 AND colC=1")
).toDS()
def f2(a: c1): Boolean=
// we need parse value with colCOUND
a.colCOND match
case "colA=1 AND colB>0" => (a.colA == 1 && a.colB > 0) == true
case _ => false
val res2 = sourceData
.map(i => cRes(i.colA, i.colB, i.colC, i.colD, i.colE, i.colPRODRTCE, i.colCOND,
f2(i)))
【讨论】:
很好,但是当我们对大量数据使用 collect 时,它会降低性能。你能告诉我任何其他的解决方案吗 v1 - 尝试 persist(), unpersist() v2 - 尝试数据集以上是关于scala spark 使用 expr 在列内赋值的主要内容,如果未能解决你的问题,请参考以下文章