Spark / Scala - RDD填充最后一个非空值
Posted
技术标签:
【中文标题】Spark / Scala - RDD填充最后一个非空值【英文标题】:Spark/Scala - RDD fill with last non null value 【发布时间】:2021-10-25 02:13:26 【问题描述】:我有一个如下所示的 rdd:
timestamp,user_id,search_id
[2021-08-14 14:38:31,user_a,null]
[2021-08-14 14:42:01,user_a,ABC]
[2021-08-14 14:55:12,user_a,null]
[2021-08-14 14:56:19,user_a,null]
[2021-08-14 15:01:36,user_a,null]
[2021-08-14 15:02:22,user_a,null]
[2021-08-15 07:38:07,user_b,XYZ]
[2021-08-15 07:39:59,user_b,null]
我想通过用 user_id 分组的最新非空值(如果有)填充“search_id”中的空值,将没有 search_id 的事件与以前的 search_ids 关联。
因此,我的输出将如下所示:
timestamp,user_id,search_id
[2021-08-14 14:38:31,user_a,null]
[2021-08-14 14:42:01,user_a,ABC]
[2021-08-14 14:55:12,user_a,ABC]
[2021-08-14 14:56:19,user_a,ABC]
[2021-08-14 15:01:36,user_a,ABC]
[2021-08-14 15:02:22,user_a,ABC]
[2021-08-15 07:38:07,user_b,XYZ]
[2021-08-15 07:39:59,user_b,XYZ]
我找到了使用 org.apache.spark.sql.functions.last
和此处的窗口的 spark 数据帧的解决方案 --> Spark Window function last not null value 但我的上下文目前不允许我将 rdd 转换为数据帧,所以我想知道是否有任何你知道如何做到这一点。
【问题讨论】:
“我的上下文不允许我将 rdd 转换为数据帧”是什么意思? 抱歉,应该指定:由于遗留问题,我需要在仍在使用 spark 【参考方案1】:我猜 groupBy 用户 (https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/rdd/RDD.html#groupBy(scala.Function1,%20scala.reflect.ClassTag) ) 然后 flatMapWith (不要忘记对分组项目进行排序,因为 groupBy 不保留顺序)这将修复您的搜索 ID。所有这一切都假设您每个用户没有太多项目
【讨论】:
【参考方案2】:完成此操作的一种方法是知道我们需要调用 lag() 函数的最大次数。 试试这个。
输入:
val df1=spark.sql("""
select timestamp'2021-08-14 14:38:31' timestamp, 'user_a' user_id, 'null' search_id union all
select '2021-08-14 14:42:01' , 'user_a', 'ABC' union all
select '2021-08-14 14:55:12' , 'user_a', 'null' union all
select '2021-08-14 14:56:19' , 'user_a', 'null' union all
select '2021-08-14 15:01:36' , 'user_a', 'null' union all
select '2021-08-14 15:02:22' , 'user_a', 'null' union all
select '2021-08-15 07:38:07' , 'user_b', 'XYZ' union all
select '2021-08-15 07:39:59' , 'user_b', 'null'
""")
df1.orderBy("timestamp").show(false)
df1.printSchema
df1.createOrReplaceTempView("df1")
+-------------------+-------+---------+
|timestamp |user_id|search_id|
+-------------------+-------+---------+
|2021-08-14 14:38:31|user_a |null |
|2021-08-14 14:42:01|user_a |ABC |
|2021-08-14 14:55:12|user_a |null |
|2021-08-14 14:56:19|user_a |null |
|2021-08-14 15:01:36|user_a |null |
|2021-08-14 15:02:22|user_a |null |
|2021-08-15 07:38:07|user_b |XYZ |
|2021-08-15 07:39:59|user_b |null |
+-------------------+-------+---------+
现在计算最大次数
val max_count = spark.sql(" select max(c) from (select count(*) c from df1 group by user_id)").as[Long].first
max_count: Long = 6
创建一个可变数据框,以便我们可以循环并将其分配给相同的 df。
var df2=df1
for( i <- 1 to max_count.toInt )
df2=df2.withColumn("search_id",expr(""" case when search_id <> 'null' then search_id
else lag(search_id) over(partition by user_id order by timestamp) end """))
df2.orderBy("timestamp").show(false)
+-------------------+-------+---------+
|timestamp |user_id|search_id|
+-------------------+-------+---------+
|2021-08-14 14:38:31|user_a |null |
|2021-08-14 14:42:01|user_a |ABC |
|2021-08-14 14:55:12|user_a |ABC |
|2021-08-14 14:56:19|user_a |ABC |
|2021-08-14 15:01:36|user_a |ABC |
|2021-08-14 15:02:22|user_a |ABC |
|2021-08-15 07:38:07|user_b |XYZ |
|2021-08-15 07:39:59|user_b |XYZ |
+-------------------+-------+---------+
【讨论】:
以上是关于Spark / Scala - RDD填充最后一个非空值的主要内容,如果未能解决你的问题,请参考以下文章
Spark:scala - 如何将集合从 RDD 转换为另一个 RDD
Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行