Merge new rows with previous rows' data in a DataFrame in Spark Scala

Posted: 2018-04-12 11:18:58

Question: Input Spark DataFrame df (OLTP):
+----+---------+------+
|name|date     |amount|
+----+---------+------+
|abc |4/6/2018 |100   |
|abc |4/6/2018 |200   |
|abc |4/13/2018|300   |
+----+---------+------+
Expected DF (OLAP), where the 4/6 rows are carried forward onto the new 4/13 date alongside the new 300 record:
+----+---------+------+
|name|date     |amount|
+----+---------+------+
|abc |4/6/2018 |100   |
|abc |4/6/2018 |200   |
|abc |4/13/2018|100   |
|abc |4/13/2018|200   |
|abc |4/13/2018|300   |
+----+---------+------+
My code:
val df = df1.union(df1)
+----+---------+------+
|name|date     |amount|
+----+---------+------+
|abc |4/6/2018 |100   |
|abc |4/6/2018 |200   |
|abc |4/13/2018|300   |
|abc |4/6/2018 |100   |
|abc |4/6/2018 |200   |
|abc |4/13/2018|300   |
+----+---------+------+
val w1 = org.apache.spark.sql.expressions.Window.orderBy("date")
val ExpectedDF = df
  .withColumn("previousAmount", lag("amount", 1).over(w1))
  .withColumn("newdate", lag("date", 1).over(w1))
ExpectedDF.show(false)
+----+---------+------+--------------+---------+
|name|date     |amount|previousAmount|newdate  |
+----+---------+------+--------------+---------+
|abc |4/13/2018|300   |null          |null     |
|abc |4/13/2018|300   |300           |4/13/2018|
|abc |4/6/2018 |100   |300           |4/13/2018|
|abc |4/6/2018 |200   |100           |4/6/2018 |
|abc |4/6/2018 |100   |200           |4/6/2018 |
|abc |4/6/2018 |200   |100           |4/6/2018 |
+----+---------+------+--------------+---------+
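Note that this attempt cannot reach the expected DF: the window orders the raw string column, so "4/13/2018" sorts lexicographically before "4/6/2018", and lag over the doubled-up union only pairs each row with its immediate neighbour. For comparison, here is a minimal sketch of a more direct approach, assuming all dates parse as M/d/yyyy; rollForward is a hypothetical helper, not something from the original post:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, max, to_date}

// Restamp every row from earlier batches with the latest date and
// append those copies to the original rows.
def rollForward(df: DataFrame): DataFrame = {
  // Parse the string date so 4/13/2018 compares as later than 4/6/2018.
  val withTs = df.withColumn("ts", to_date(col("date"), "M/d/yyyy"))
  val latest = withTs.agg(max("ts")).head().getDate(0)
  val latestLabel = withTs.filter(col("ts") === lit(latest))
    .select("date").head().getString(0)

  val carried = withTs
    .filter(col("ts") < lit(latest))      // rows from earlier dates only
    .withColumn("date", lit(latestLabel)) // carry them onto the new date
    .select("name", "date", "amount")

  df.select("name", "date", "amount").union(carried)
}

On the sample input, rollForward(df1) keeps the three original rows and adds the two 4/6 amounts restamped as 4/13/2018, which matches the five expected rows.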
Comments:

Can you explain what you are trying to do?

Hi shankar, how are you? Actually, in this question, when 4/13/2018 arrives (each new date lands in df1 seven days after the previous one), we need to carry all of the previous date's names and amounts forward onto the new date as well; see the expected DF. Please help me, you have helped me many times.

If you cannot explain it, we cannot help. Share what you have tried?

val df = df1.union(df1)

Answer 1:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

def main(args: Array[String]): Unit = {
  // A single SparkSession replaces the original SparkConf/SparkContext/SQLContext boilerplate.
  val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()

  var df1 = ss.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("oldRecords.csv")
  df1.show(false)
  println("---- df1 row count ---- " + df1.count())

  if (df1.count() > 0) {
    for (i <- 0 until df1.count().toInt - 1) {
      // Double up the current rows so lag() can pair each row with a copy.
      var df2 = df1.union(df1)
      // No partitionBy: the whole DataFrame is sorted as one group.
      var w1 = Window.orderBy("date")
      var df3 = df2
        .withColumn("previousAmount", lag("amount", 1).over(w1))
        .withColumn("newdate", lag("date", 1).over(w1))
      // Drop the first row of the window, which has no predecessor.
      var df4 = df3.filter(df3.col("newdate").isNotNull)
      var df5 = df4.select("name", "amount", "newdate").distinct()
      df5.show(false)
      // Feed the shifted rows back in as the input for the next pass.
      df1 = df5.withColumnRenamed("newdate", "date")
    }
  }
}
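One caveat worth flagging (an observation, not part of the original answer): Window.orderBy("date") compares the strings lexicographically, so "4/13/2018" sorts before "4/6/2018". If the window must run in chronological order, ordering on a parsed date avoids that, for example:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, to_date}

// Order the window on a parsed date (format assumed to be M/d/yyyy)
// instead of the raw string column.
val w1 = Window.orderBy(to_date(col("date"), "M/d/yyyy"))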