将新行与spark scala中数据框中的前一行数据合并

Posted

技术标签:

【中文标题】将新行与spark scala中数据框中的前一行数据合并【英文标题】:merge new rows with previous rows data in dataframe in spark scala 【发布时间】:2018-04-12 11:18:58 【问题描述】:

输入 Spark 数据帧 df (OLTP):

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|abc |4/6/2018 |  100 |
|abc |4/6/2018 |  200 |
+----+---------+------+
|abc |4/13/2018|  300 |
+----+---------+------+

预期的 DF (OLAP):

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|abc |4/6/2018 |  100 |
|abc |4/6/2018 |  200 |
+----+---------+------+
|abc |4/13/2018|   100|
+----+---------+------+
| abc|4/13/2018|   200|
+----+---------+------+
| abc|4/13/2018|   300|

+----+---------+------+

我的代码

val df = df1.union(df1) 

+----+---------+------+
|name|date     |amount|
+----+---------+------+
|abc |4/6/2018 |100   |
|abc |4/6/2018 |200   |
|abc |4/13/2018|300   |
|abc |4/6/2018 |100   |
|abc |4/6/2018 |200   |
|abc |4/13/2018|300   |
+----+---------+------+


 val w1 = org.apache.spark.sql.expressions.Window.orderBy("date")
    val ExpectedDF = df.withColumn("previousAmount",  lag("amount",1).over(w1)).withColumn("newdate", lag("date",1).over(w1))
    ExpectedDF .show(false)

+----+---------+------+--------------+---------+
|name|date     |amount|previousAmount|newdate  |
+----+---------+------+--------------+---------+
|abc |4/13/2018|300   |null          |null     |
|abc |4/13/2018|300   |300           |4/13/2018|
|abc |4/6/2018 |100   |300           |4/13/2018|
|abc |4/6/2018 |200   |100           |4/6/2018 |
|abc |4/6/2018 |100   |200           |4/6/2018 |
|abc |4/6/2018 |200   |100           |4/6/2018 |
+----+---------+------+--------------+---------+

【问题讨论】:

你能解释一下你想要做什么吗? 嗨 shankar,你好吗,实际上在 2018 年 4 月 13 日的这个问题中(如果进入 df1,则在 7 天后的每个日期)然后我们需要添加上一个日期的所有金额和名称此日期,请参阅预期 DF 请帮帮我,你帮了我很多次 如果您无法解释,我们将无法提供帮助。分享你尝试了什么? val df = df1.union(df1) 【参考方案1】:
def main(args: Array[String])
    val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)
    val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
    import ss.sqlContext.implicits._
    var df1 = sqlc.read.format("com.databricks.spark.csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .load("oldRecords.csv")
    df1.show(false)
    println("---- df1 row count ----"+df1.count())
    if(df1.count()>0)
      for (i <- 0 until (df1.count().toInt)-1) 
        var df2 = df1.unionAll(df1)//.union(df1)//df3
        //df2.show(false)
        var w1 = org.apache.spark.sql.expressions.Window.orderBy("date")
        var df3 = df2.withColumn("previousAmount",  lag("amount",1).over(w1)).withColumn("newdate", lag("date",1).over(w1))
        // df3.show(false)
        var df4 = df3.filter((df3.col("newdate").isNotNull))//(df3.col("new_date").isNotNull)
        //df4.show(false)
        var df5 = df4.select("name","amount","newdate").distinct() 
        println("-----------"+df5.show(false))
        df1 = df5.withColumnRenamed("newdate", "date")
      
    

【讨论】:

以上是关于将新行与spark scala中数据框中的前一行数据合并的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Spark/Scala RDD 合并/加入到 List 中,以便 RDD 中的每个值与每个 List 项一起获得一个新行

Scala(Spark)连接数据框中的列[重复]

为spark scala中的数据框中的每个组采样不同数量的随机行

如何将字符串中带有双引号的json文件加载到spark scala中的数据框中

如何将当前行的负值转移到数据框中的前一行?

在 scala spark 数据框中提取时间间隔