如何在 Apache Spark 中获取上一行的数据
Posted
技术标签:
【中文标题】如何在 Apache Spark 中获取上一行的数据【英文标题】:How to get data of previous row in Apache Spark 【发布时间】:2017-07-10 11:53:48 【问题描述】:从 Spark 数据框中查找每个城市的上个月销售额
|City| Month |Sale|
+----+----------- +----- +
| c1| JAN-2017| 49 |
| c1| FEB-2017| 46 |
| c1| MAR-2017| 83 |
| c2| JAN-2017| 59 |
| c2| MAY-2017| 60 |
| c2| JUN-2017| 49 |
| c2| JUL-2017| 73 |
+----+-----+----+-------
需要的解决方案是
|City| Month |Sale |previous_sale|
+----+-----+-------+-------------+--------
| c1| JAN-2017| 49| NULL |
| c1| FEB-2017| 46| 49 |
| c1| MAR-2017| 83| 46 |
| c2| JAN-2017| 59| NULL |
| c2| MAY-2017| 60| 59 |
| c2| JUN-2017| 49| 60 |
| c2| JUL-2017| 73| 49 |
+----+-----+----+-------------+-----------
请帮帮我
【问题讨论】:
【参考方案1】:你可以使用lag函数来获取之前的值
如果要按月份排序,则需要转换为正确的日期。对于"JAN-2017"
到"01-01-2017"
是这样的。
import spark.implicits._
val df = spark.sparkContext.parallelize(Seq(
("c1", "JAN-2017", 49),
("c1", "FEB-2017", 46),
("c1", "MAR-2017", 83),
("c2", "JAN-2017", 59),
("c2", "MAY-2017", 60),
("c2", "JUN-2017", 49),
("c2", "JUL-2017", 73)
)).toDF("city", "month", "sales")
val window = Window.partitionBy("city").orderBy("month")
df.withColumn("previous_sale", lag($"sales", 1, null).over(window)).show
输出:
+----+--------+-----+----+
|city| month|sales| previous_sale|
+----+--------+-----+----+
| c1|FEB-2017| 46|null|
| c1|JAN-2017| 49| 46|
| c1|MAR-2017| 83| 49|
| c2|JAN-2017| 59|null|
| c2|JUL-2017| 73| 59|
| c2|JUN-2017| 49| 73|
| c2|MAY-2017| 60| 49|
+----+--------+-----+----+
您可以使用此 UDF 创建一个默认日期,如 01/月/年,即使它有不同的年份,也会使用日期排序
val fullDate = udf((value :String )=>
val months = List("JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")
val splited = value.split("-")
new Date(splited(1).toInt, months.indexOf(splited(0)) + 1, 1)
)
df.withColumn("month", fullDate($"month")).show()
希望这能帮到你!
【讨论】:
嗨 Shankar,这工作正常,但我需要不使用 Window 的代码。 希望你能帮助我。 我认为这是最好的方法,不使用Window可能不太容易获得。 我认为这是我们得到的最好的选择,我不知道为什么你的 PM 不接受,我可以给你加入的逻辑,首先创建一个新列,每个列的数字都在增加城市 c1 c2 然后与自身加入并增加 id -1 找出一个城市的最大日期和最小日期并计算它们之间的所有月份【参考方案2】:package com.incedo.pharma
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.SparkSession
//import org.apache.spark.sql.expressions.Window
import java.sql.Date
class SecondTask
def previousMonthSale()
val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
import ss.sqlContext.implicits._
val df = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", "|")
.load("taskTwoData.csv")
val df1 = df.withColumn("Timestamp", unix_timestamp(df("Date"), "MM/dd/yyyy").cast("timestamp"))
val df2 = df1.withColumn("Month",month(df1("Timestamp")))
val df3 = df2.groupBy("City", "Month").agg(sum(col("Sale")).alias("Current_Month_Total_Sale")).orderBy("City","Month")
val df4 = df3.withColumn("pre_month",df3("Month")-1)
val df5 = df4.alias("a").join(df3.alias("b"),$"a.pre_month" === $"b.Month" && $"a.City" === $"b.City","left_outer")
.select($"a.City",$"a.Month",$"a.Current_Month_Total_Sale",($"b.Current_Month_Total_Sale")
.alias("Previous_Month_Total_Sale")).na.fill(0,Seq("Previous_Month_Total_Sale"))
val df6 = df5.withColumn("Percent_Sale_Change",round(((df5("Current_Month_Total_Sale") - df5("Previous_Month_Total_Sale"))/df5("Current_Month_Total_Sale"))*100,2))
val df7 = df6.groupBy("City").max("Current_Month_Total_Sale").alias("Max_Sale").orderBy("City")
//df7.show()
val df8 = df6.join(df7, Seq("City"))
val df9 = df8.withColumn("Percent_Sale_By_Max_Sale", round(df8("Current_Month_Total_Sale")/df8("max(Current_Month_Total_Sale)"),2))
.drop("max(Current_Month_Total_Sale)")
df9.toDF().show()
object taskTwo
def main(arr: Array[String])
new SecondTask().previousMonthSale()
`enter code here`
【讨论】:
嗨 Shankar,请查看我的解决方案 嗨 Shankar,请在此 URL ***.com/questions/45883039/… 中解决问题,请指导我。以上是关于如何在 Apache Spark 中获取上一行的数据的主要内容,如果未能解决你的问题,请参考以下文章
如何获取有关当前执行程序 Apache-Spark 的元数据?
如何在 2.2.0 中获取给定 Apache Spark Dataframe 的 Cassandra cql 字符串?
Apache Spark - 如何从两个 RDD 中获取不匹配的行