scala spark 使用 expr 在列内赋值

Posted

技术标签:

【中文标题】scala spark 使用 expr 在列内赋值【英文标题】:scala spark use expr to value inside a column 【发布时间】:2019-02-12 10:29:36 【问题描述】:

我需要向具有布尔值的数据框添加一个新列,评估数据框内的列。例如,我有一个数据框

+----+----+----+----+----+-----------+----------------+
|colA|colB|colC|colD|colE|colPRODRTCE|         colCOND|
+----+----+----+----+----+-----------+----------------+
|   1|   1|   1|   1|   3|         39|colA=1 && colB>0|
|   1|   1|   1|   1|   3|         45|          colD=1|
|   1|   1|   1|   1|   3|        447|colA>8 && colC=1|
+----+----+----+----+----+-----------+----------------+

在我的新专栏中,我需要评估 colCOND 的表达式是真还是假。

如果你有这样的东西很容易:

  val df = List(
    (1,1,1,1,3),
    (2,2,3,4,4)
  ).toDF("colA", "colB", "colC", "colD", "colE")

  val myExpression = "colA<colC"

  import org.apache.spark.sql.functions.expr

  df.withColumn("colRESULT",expr(myExpression)).show()

+----+----+----+----+----+---------+
|colA|colB|colC|colD|colE|colRESULT|
+----+----+----+----+----+---------+
|   1|   1|   1|   1|   3|    false|
|   2|   2|   3|   4|   4|     true|
+----+----+----+----+----+---------+

但我必须在每一行中计算一个不同的表达式,它在 colCOND 列内。

我想创建一个包含所有列的 UDF 函数,但我的真实数据框有很多列。我该怎么做?

谢谢大家

【问题讨论】:

你有解决办法吗?我在这里面临完全相同的问题。 @omnisius - 请看我的回答。谢谢你。 我打算用 Python 试试,非常感谢! 【参考方案1】:

如果&&改成AND,可以试试

package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

object DataFrameLogicWithColumn extends App
  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val sourceDF = Seq((1,1,1,1,3,39,"colA=1 AND colB>0"),
    (1,1,1,1,3,45,"colD=1"),
    (1,1,1,1,3,447,"colA>8 AND colC=1")
  ).toDF("colA", "colB", "colC", "colD", "colE", "colPRODRTCE", "colCOND").persist(MEMORY_AND_DISK)


  val exprs = sourceDF.select('colCOND).distinct().as[String].collect()

  val d1 = exprs.map(i => 
    val df = sourceDF.filter('colCOND.equalTo(i))
    df.withColumn("colRESULT", expr(i))
  )

  val resultDF = d1.reduce(_ union _)

  resultDF.show(false)
  //  +----+----+----+----+----+-----------+-----------------+---------+
  //  |colA|colB|colC|colD|colE|colPRODRTCE|colCOND          |colRESULT|
  //  +----+----+----+----+----+-----------+-----------------+---------+
  //  |1   |1   |1   |1   |3   |39         |colA=1 AND colB>0|true     |
  //  |1   |1   |1   |1   |3   |447        |colA>8 AND colC=1|false    |
  //  |1   |1   |1   |1   |3   |45         |colD=1           |true     |
  //  +----+----+----+----+----+-----------+-----------------+---------+

sourceDF.unpersist()    

可以试试DataSet

    case class c1 (colA: Int, colB: Int, colC: Int, colD: Int, colE: Int, colPRODRTCE: Int, colCOND: String)

    case class cRes (colA: Int, colB: Int, colC: Int, colD: Int, colE: Int, colPRODRTCE: Int, colCOND: String, colResult: Boolean)

    val sourceData = Seq(c1(1,1,1,1,3,39,"colA=1 AND colB>0"),
      c1(1,1,1,1,3,45,"colD=1"),
      c1(1,1,1,1,3,447,"colA>8 AND colC=1")
    ).toDS()

    def f2(a: c1): Boolean=
      // we need parse value with colCOUND
      a.colCOND match 
        case "colA=1 AND colB>0" => (a.colA == 1 && a.colB > 0) == true
        case _ => false
      
    

    val res2 = sourceData
      .map(i => cRes(i.colA, i.colB, i.colC, i.colD, i.colE, i.colPRODRTCE, i.colCOND,
        f2(i)))

【讨论】:

很好,但是当我们对大量数据使用 collect 时,它会降低性能。你能告诉我任何其他的解决方案吗 v1 - 尝试 persist(), unpersist() v2 - 尝试数据集

以上是关于scala spark 使用 expr 在列内赋值的主要内容,如果未能解决你的问题,请参考以下文章

在列内添加逗号分隔值

在列内检查特定字符串,android studio

扑。如何在列内创建树形图结构

在列内垂直居中 Bootstrap 3 按钮

颤振让孩子在列内匹配另一个孩子的宽度

iview 中table列 一列显示多个数据(后台返回数组显示在列内)