Spark UDF 检索最后一个非空值

Posted

技术标签:

【中文标题】Spark UDF 检索最后一个非空值【英文标题】:Spark UDF to retrieve Last non null value 【发布时间】:2019-04-22 12:44:26 【问题描述】:

输入数据集

 Dataset<Row> inputDS = spark.read.format("avro").path("hdfs://namenode:8020/..")
 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188264901  |  0002019000000|        0       | 0     |Value         |    5   |
 |1554188264901  |  0002019000000|        0       | 0     |SetPoint      |    7   |
 |1554188276412  |  0002019000000|        0       | 0     |Voltage       |    9   |
 |1554188276412  |  0002019000000|        0       | 0     |SetPoint      |    10  |  
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 |1554188639407  |  0002019000000|        0       | 0     |Voltage       |    3   |
 +---------------+---------------+----------------+-------+--------------+--------+

中间数据集

 inputDS.createOrReplaceTempView("abc");
 Dataset<Row> intermediateDS<Row> =
 spark.sql("select time,thingId,controller,module,variableName,value,count(time) over (partition by time) as time_count from abc")
                                        .filter("time_count=1").drop("time_count");

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 |1554188639407  |  0002019000000|        0       | 0     |Voltage       |    3   |
 +---------------+---------------+----------------+-------+--------------+--------+

中间数据集只不过是像上面那样只出现一次的时间列。

所需的输出数据集

 +---------------+---------------+----------------+-------+--------------+--------+
 |  time         | thingId       |     controller | module| variableName |  value |
 +---------------+---------------+----------------+-------+--------------+--------+
 |1554188639406  |  0002019000000|        0       | 0     |SetPoint      |    6   |
 |1554188639406  |  0002019000000|        0       | 0     |Voltage       |    9   |  // last non null value for the set (thingId, controller, module) and variableName='Voltage'
 |1554188639407  |  0002019000000|        0       | 0     |Voltage       |    3   |  
 |1554188639407  |  0002019000000|        0       | 0     |SetPoint      |    10  |  // last non null value for the set (thingId, controller, module) and variableName='SetPoint'
 +---------------+---------------+----------------+-------+--------------+--------+

为了获得所需的输出,我尝试使用 UDF,如下所示

 spark.udf().register("getLastvalue_udf",getValue,DataType.StringType);

 intermediateDS=intermediateDS.withColumn("Last_Value",callUDF("getLastvalue_udf",col("variableName")));

 UDF1<String,String> getValue = new UDF1<String,String>()

 @Override
 public String call(String t1)

 String variableName="";

 if(t1=="SetPoint")
 variableName="Voltage";
 else
 variableName="SetPoint";
 

 String value = String.valueOf(spark.sql("SELECT LAST(value) OVER (order by time desc) as value from abc where "
  +" variableName="+ variableName +") limit 1")

 return value;
 

UDF 刚刚返回[value:String]spark.sql() 在 UDF 中不起作用。

1.) 如何从上述 UDF 获取所需的输出或建议我使用任何其他解决方法。

2.) 是否可以在 map 函数中调用 spark sql ? 谢谢。

【问题讨论】:

您的评论“集合(thingId、控制器、模块)和 variableName='?' 的最后一个非空值”你能解释一下吗? @VikasKushwaha,对于 thingId、controller、module、variableName 中的每个组合,例如 0001999000100,0,1,Voltage,“对于集合”是一个集合。我使用滞后sql函数对其进行了整理,请在下面找到答案。 【参考方案1】:

Lag函数解决了从表中的前一行返回值的情况

代码如下:

import static org.apache.spark.sql.expressions.Window;
import static org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions;

WindowSpec lagWindow = Window.partitionBy("thingId","controller","module","variableName").orderBy("time");
DS.withColumn("value",when(col("value").equalTo(""),lag("value",1).over(lagWindow)).otherwise(col("value")));

【讨论】:

以上是关于Spark UDF 检索最后一个非空值的主要内容,如果未能解决你的问题,请参考以下文章

Spark / Scala - RDD填充最后一个非空值

如何使用 Spark DataFrame 将最后一个非空值结转到后续行

计算Spark DataFrame中的非空值的数量

CustomSQLUtil 在使用 Liferay 7 进行查询检索期间获取空值

将空值设置为列表中最接近的最后一个非空值 - LINQ

如何在候选实体中检索 UDF 字段的特定值