Spark UDF to retrieve the previous non-null value
Input dataset

Dataset<Row> inputDS = spark.read().format("avro").load("hdfs://namenode:8020/..");
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188264901 | 0002019000000| 0 | 0 |Value | 5 |
|1554188264901 | 0002019000000| 0 | 0 |SetPoint | 7 |
|1554188276412 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188276412 | 0002019000000| 0 | 0 |SetPoint | 10 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
|1554188639407 | 0002019000000| 0 | 0 |Voltage | 3 |
+---------------+---------------+----------------+-------+--------------+--------+
Intermediate dataset

inputDS.createOrReplaceTempView("abc");
Dataset<Row> intermediateDS =
spark.sql("select time,thingId,controller,module,variableName,value,count(time) over (partition by time) as time_count from abc")
.filter("time_count=1").drop("time_count");
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
|1554188639407 | 0002019000000| 0 | 0 |Voltage | 3 |
+---------------+---------------+----------------+-------+--------------+--------+
The intermediate dataset keeps only the rows whose time value occurs exactly once, as shown above.
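The `count(time) over (partition by time) ... filter("time_count=1")` step above can be sketched in plain Java: count how often each timestamp occurs, then keep only the rows whose timestamp is unique. The class and method names here are illustrative, not part of the Spark API.

```java
import java.util.*;
import java.util.stream.*;

public class TimeCountFilter {
    // Keep only timestamps that occur exactly once, mirroring
    // count(time) over (partition by time) == 1
    static List<Long> uniqueTimes(List<Long> times) {
        Map<Long, Long> counts = times.stream()
            .collect(Collectors.groupingBy(t -> t, Collectors.counting()));
        return times.stream()
            .filter(t -> counts.get(t) == 1L)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "time" column of the input dataset above
        List<Long> times = Arrays.asList(
            1554188264901L, 1554188264901L,
            1554188276412L, 1554188276412L,
            1554188639406L, 1554188639407L);
        System.out.println(uniqueTimes(times)); // [1554188639406, 1554188639407]
    }
}
```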
Required output dataset
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
|1554188639406 | 0002019000000| 0 | 0 |Voltage | 9 | // last non null value for the set (thingId, controller, module) and variableName='Voltage'
|1554188639407 | 0002019000000| 0 | 0 |Voltage | 3 |
|1554188639407 | 0002019000000| 0 | 0 |SetPoint | 10 | // last non null value for the set (thingId, controller, module) and variableName='SetPoint'
+---------------+---------------+----------------+-------+--------------+--------+
To get the required output, I tried a UDF as shown below:
UDF1<String, String> getValue = new UDF1<String, String>() {
    @Override
    public String call(String t1) {
        // Look up the "other" variable for the one in the current row
        String variableName = "";
        if (t1.equals("SetPoint")) {
            variableName = "Voltage";
        } else {
            variableName = "SetPoint";
        }
        String value = String.valueOf(spark.sql(
            "SELECT LAST(value) OVER (ORDER BY time DESC) AS value FROM abc"
            + " WHERE variableName='" + variableName + "' LIMIT 1"));
        return value;
    }
};

spark.udf().register("getLastvalue_udf", getValue, DataTypes.StringType);
intermediateDS = intermediateDS.withColumn("Last_Value", callUDF("getLastvalue_udf", col("variableName")));
But the UDF just returned [value: String]; spark.sql() does not work inside a UDF.
1.) How can I get the required output from the UDF above, or is there any other workaround you can suggest?
2.) Is it possible to call Spark SQL inside a map function? Thanks.
Answer

The lag function handles this case: it returns the value from a previous row of the table.

The code is as follows:
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;

WindowSpec lagWindow = Window.partitionBy("thingId", "controller", "module", "variableName").orderBy("time");
DS = DS.withColumn("value", when(col("value").equalTo(""), lag("value", 1).over(lagWindow)).otherwise(col("value")));
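Within one partition ordered by time, the window expression above can be sketched in plain Java as follows. Note that lag("value", 1) reads the *original* previous row, so a run of consecutive empty values is only partially filled; in Spark, last("value", true) over a running window would forward-fill completely. The class and method names here are illustrative.

```java
import java.util.*;

public class LagFill {
    // Per-partition sketch of:
    // when(col("value").equalTo(""), lag("value", 1).over(w)).otherwise(col("value"))
    static List<String> fill(List<String> values) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            String v = values.get(i);
            // lag looks at the original previous element, not the filled one
            out.add(v.isEmpty() && i > 0 ? values.get(i - 1) : v);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fill(Arrays.asList("9", "", "3")));      // [9, 9, 3]
        System.out.println(fill(Arrays.asList("", "5", "", "")));   // [, 5, 5, ]
    }
}
```

The second example shows the limitation: the last empty slot stays empty because its original predecessor was also empty.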