Spark:在scala中的数据帧上使用动态过滤器进行聚合
Posted
技术标签:
【中文标题】Spark:在scala中的数据帧上使用动态过滤器进行聚合【英文标题】:Spark: Aggregation with dynamic filter on a dataframe in scala 【发布时间】:2019-05-30 01:48:27 【问题描述】:我有一个类似的数据框
scala> testDf.show()
+------+--------+---------+------------+----------------------------------------+
| id| item| value| value_name| condition|
+------+--------+---------+------------+----------------------------------------+
| 11| 3210| 0| OFF| value==0|
| 12| 3210| 1| OFF| value==0|
| 13| 3210| 0| OFF| value==0|
| 14| 3210| 0| OFF| value==0|
| 15| 3210| 1| OFF| value==0|
| 16| 5440| 5| ON| value>0 && value<10|
| 17| 5440| 0| ON| value>0 && value<10|
| 18| 5440| 6| ON| value>0 && value<10|
| 19| 5440| 7| ON| value>0 && value<10|
| 20| 5440| 0| ON| value>0 && value<10|
| 21| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 22| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 23| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 24| 7780| C| TYPE| Set("A","B").contains(value.toString)|
| 25| 7780| C| TYPE| Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+
scala> testDf.printSchema
root
|-- id: string (nullable = true)
|-- item: string (nullable = true)
|-- value: string (nullable = true)
|-- value_name: string (nullable = true)
|-- condition: string (nullable = true)
我想删除一些带有“条件”列的行。 但我有麻烦了。
我尝试了以下测试代码。 但它似乎无法正常工作。
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.Row
import scala.collection.mutable
val encoder = RowEncoder(testDf.schema);
testDf.flatMap(row =>
val result = new mutable.MutableList[Row];
val setting_value = row.getAs[String]("setting_value").toInt
val condition = row.getAs[String]("condition").toBoolean
if (condition)
result+=row;
;
result;
)(encoder).show();
这是错误。
19/05/30 02:04:31 ERROR TaskSetManager: Task 0 in stage 267.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 267.0 failed 4 times, most recent failure: Lost task 0.3 in stage 267.0 (TID 3763, .compute.internal, executor 1): java.lang.IllegalArgumentException: For input string: "setting_value==0"
at scala.collection.immutable.StringLike$class.parseBoolean(StringLike.scala:291)
at scala.collection.immutable.StringLike$class.toBoolean(StringLike.scala:261)
at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:29)
at $anonfun$1.apply(<console>:40)
at $anonfun$1.apply(<console>:37)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我想保留与条件列的值匹配的行。 这是想要的结果。
+------+--------+---------+------------+----------------------------------------+
| id| item| value| value_name| condition|
+------+--------+---------+------------+----------------------------------------+
| 11| 3210| 0| OFF| value==0|
| 13| 3210| 0| OFF| value==0|
| 14| 3210| 0| OFF| value==0|
| 16| 5440| 5| ON| value>0 && value<10|
| 18| 5440| 6| ON| value>0 && value<10|
| 19| 5440| 7| ON| value>0 && value<10|
| 21| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 22| 7780| A| TYPE| Set("A","B").contains(value.toString)|
| 23| 7780| A| TYPE| Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+
如果你有一个好主意,请帮助我。 谢谢。
【问题讨论】:
【参考方案1】:这是使用带有 UDF 函数的 scala reflection API 的一种方法。 udf 处理 int 和 string 值的两种情况:
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
val tb = currentMirror.mkToolBox()
val df = Seq(("0","value==0"),
("1", "value==0"),
("6", """value>0 && value<10"""),
("7", """value>0 && value<10"""),
("0", """value>0 && value<10"""),
("A", """Set("A","B").contains(value.toString)"""),
("C", """Set("A","B").contains(value.toString)""")).toDF("value", "condition")
def isAllDigits(x: String) = x.forall(Character.isDigit)
val evalExpressionUDF = udf((value: String, expr: String) =>
val result = isAllDigits(value) match
case true => tb.eval(tb.parse(expr.replace("value", s"""$value.toInt""")))
case false => tb.eval(tb.parse(expr.replace("value", s""""$value"""")))
result.asInstanceOf[Boolean]
)
df.withColumn("eval", evalExpressionUDF($"value", $"condition"))
.where($"eval" === true)
.show(false)
evalExpressionUDF
的案例:
mkToolBox
执行字符串代码
string:将字符串值括在""
中,然后用双引号字符串替换表达式并执行字符串代码
输出:
+-----+-------------------------------------+----+
|value| condition |eval|
+-----+-------------------------------------+----+
|0 |value==0 |true|
|6 |value>0 && value<10 |true|
|7 |value>0 && value<10 |true|
|A |Set("A","B").contains(value.toString)|true|
+-----+-------------------------------------+----+
PS:我知道上述解决方案的性能可能很差,因为它会调用反射,尽管我不知道有替代方案。
【讨论】:
非常感谢。您的回答很有帮助。但我在下面得到一个错误。 @ 987654327@ 错误:`org.apache.spark.SparkException:任务不可序列化原因:java.io.NotSerializableException:scala.tools.reflect.ToolBoxFactory$ToolBoxImpl 序列化堆栈:-对象不可序列化(类:scala.tools.reflect .ToolBoxFactory$ToolBoxImpl,值:scala.tools.reflect.ToolBoxFactory$ToolBoxImpl@7341617c) ` 您好,您运行的是什么 Spark 版本?还有什么环境spark-shell
,databricks
?
在多节点的spark集群环境中,似乎出现了错误。有没有办法解决这个问题?
我使用 spark-shell。我的火花版本是 2.4.0。 Scala 版本是 2.11.12。
我只是在本地模式下运行它,我得到了同样的错误。【参考方案2】:
在上述情况下,Spark 正在尝试将 String 值转换为布尔值。它不评估表达式本身。 表达式评估必须由用户使用外部库或自定义代码完成。 我能想到的最接近的(虽然不是确切的场景)是How to evaluate a math expression given in string form?。
【讨论】:
我尝试使用 scala.tools.reflect.ToolBox 中的 eval 函数。但它不起作用......以上是关于Spark:在scala中的数据帧上使用动态过滤器进行聚合的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 spark-scala 在 spark 数据帧上执行枢轴?