如何评估作为列值的表达式?

Posted

技术标签:

【中文标题】如何评估作为列值的表达式?【英文标题】:How to evaluate expressions that are the column values? 【发布时间】:2018-04-24 10:16:19 【问题描述】:

我有一个包含数百万行的大数据框,如下所示:

A    B    C    Eqn
12   3    4    A+B
32   8    9    B*C
56   12   2    A+B*C

如何评估Eqn 列中的表达式?

【问题讨论】:

【参考方案1】:

您可以创建一个自定义 UDF 来评估这些算术函数

def evalUDF = udf((a:Int, b:Int, c:Int, eqn:String) => 
 val eqnParts = eqn
    .replace("A", a.toString)
    .replace("B", b.toString)
    .replace("C", c.toString)
    .split("""\b""")
    .toList

  val (sum, _) = eqnParts.tail.foldLeft((eqnParts.head.toInt, ""))
    case ((runningTotal, "+"), num) => (runningTotal + num.toInt, "") 
    case ((runningTotal, "-"), num) => (runningTotal - num.toInt, "") 
    case ((runningTotal, "*"), num) => (runningTotal * num.toInt, "") 
    case ((runningTotal, _), op) => (runningTotal, op)
  

  sum
)

evalDf
  .withColumn("eval", evalUDF('A, 'B, 'C, 'Eqn))
  .show()

输出:

+---+---+---+-----+----+
|  A|  B|  C|  Eqn|eval|
+---+---+---+-----+----+
| 12|  3|  4|  A+B|  15|
| 32|  8|  9|  B*C|  72|
| 56| 12|  2|A+B*C| 136|
+---+---+---+-----+----+

正如你所看到的,这很有效,但非常脆弱(空格、未知运算符等会破坏代码)并且不遵守操作顺序(否则最后应该是 92)

所以你可以自己写所有的东西,或者找到一些已经这样做的库(比如https://gist.github.com/daixque/1610753)?

也许性能开销会很大(尤其是你开始使用递归解析器),但至少你可以在数据帧上执行它而不是先收集它

【讨论】:

如果公式可能比此处显示的更复杂,我建议使用 Shutting Yard 算法 (en.wikipedia.org/wiki/Shunting-yard_algorithm),但我同意 UDF 是不可避免的方法。【参考方案2】:

我认为执行 DataFrame 中的 SQL 的唯一方法是先到 select("Eqn").collect,然后在源数据集上迭代地执行 SQL。

由于 SQL 位于 DataFrame 中,它只不过是对将在 Spark 执行器上执行的分布式计算的描述,因此您无法在处理执行器上的 SQL 时提交 Spark 作业。在执行管道中为时已晚。您应该重新使用驱动程序,以便能够提交新的 Spark 作业,例如执行 SQL。

使用驱动程序上的 SQL,您可以为每个 SQL 获取相应的行,然后只需 withColumn 即可执行 SQL(及其行)。

我认为编写它比开发一个有效的 Spark 应用程序更容易,但我就是这样做的。

【讨论】:

【参考方案3】:

我迟到了,但万一有人在找

使用变量的通用数学表达式解释器 无法硬编码到 UDF 中的复杂/未知表达式(已接受的答案)

那么你可以使用javax.script.ScriptEngineManager

import javax.script.SimpleBindings;
import javax.script.ScriptEngineManager
import java.util.Map
import java.util.HashMap


def calculateFunction = (mathExpression: String, A : Double, B : Double, C : Double ) => 
    val vars: Map[String, Object] = new HashMap[String, Object]();
    vars.put("A",A.asInstanceOf[Object])
    vars.put("B",B.asInstanceOf[Object])
    vars.put("C",C.asInstanceOf[Object])
    val engine = new ScriptEngineManager().getEngineByExtension("js");
    val result = engine.eval(mathExpression, new SimpleBindings(vars));
    result.asInstanceOf[Double]


val calculateUDF = spark.udf.register("calculateFunction",calculateFunction)

注意:这将处理通用表达式并且很健壮,但它的性能比公认的答案差很多并且内存很重 .

【讨论】:

以上是关于如何评估作为列值的表达式?的主要内容,如果未能解决你的问题,请参考以下文章

在 IntelliJ 中评估返回值的表达式

正则表达式在sql中查找列值的长度

MySQL优化COUNT()查询

优化特定类型的查询

如何理解“临时对象被销毁作为评估完整表达式的最后一步”?有人可以通过一些简单的例子说清楚吗?

评估作为字符串给出的表达式