pyspark lag function on one column based on the value in another column

Posted: 2019-04-11 12:36:53

Question:

I want to be able to create a lag value based on the value in one of the columns.
In the given data, Qdf is the Question dataframe and Adf is the Answer dataframe. I have added an extra Explanation column (which I don't actually need in the final data).
# Assumes a pyspark shell (Python 2.7) where a SparkSession named `spark` is already available
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql import SQLContext
ID = ['A' for i in range(0,10)]+ ['B' for i in range(0,10)]
Day = range(1,11)+range(1,11)
Delay = [2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 3]
Despatched = [2, 3, 1, 4, 6, 2, 6, 5, 3, 6, 3, 1, 2, 4, 1, 2, 3, 3, 6, 1]
Delivered = [0, 0, 2, 3, 1, 0, 10, 0, 0, 13, 0, 0, 3, 1, 0, 6, 0, 0, 6, 3]
Explanation = ["-", "-", "-", "-", "-", "-", "10 (4+6)", "-", "-", "13 (2+6+5)", "-", "-", "-", "-", "-", "6 (2+4)", "-", "-", "6 (1+2+3)", "-"]
QSchema = StructType([StructField("ID", StringType()),StructField("Day", IntegerType()),StructField("Delay", IntegerType()),StructField("Despatched", IntegerType())])
Qdata = map(list, zip(*[ID,Day,Delay,Despatched]))
Qdf = spark.createDataFrame(Qdata,schema=QSchema)
Qdf.show()
+---+---+-----+----------+
| ID|Day|Delay|Despatched|
+---+---+-----+----------+
| A| 1| 2| 2|
| A| 2| 2| 3|
| A| 3| 2| 1|
| A| 4| 3| 4|
| A| 5| 2| 6|
| A| 6| 4| 2|
| A| 7| 3| 6|
| A| 8| 2| 5|
| A| 9| 2| 3|
| A| 10| 2| 6|
| B| 1| 2| 3|
| B| 2| 2| 1|
| B| 3| 3| 2|
| B| 4| 2| 4|
| B| 5| 4| 1|
| B| 6| 3| 2|
| B| 7| 2| 3|
| B| 8| 2| 3|
| B| 9| 2| 6|
| B| 10| 3| 1|
+---+---+-----+----------+
The despatched quantity should be recorded as delivered after the delay period. Ideally, it would be great if I could apply a lag function on the Despatched column based on the Delay. The Answer dataset looks like this:
Adata = map(list, zip(*[ID,Day,Delay,Despatched,Delivered,Explanation]))
ASchema = StructType([StructField("ID", StringType()),StructField("Day", IntegerType()),StructField("Delay", IntegerType()),StructField("Despatched", IntegerType()),StructField("Delivered", IntegerType()),StructField("Explanation", StringType())])
Adf = spark.createDataFrame(Adata,schema=ASchema)
Adf.show()
+---+---+-----+----------+---------+-----------+
| ID|Day|Delay|Despatched|Delivered|Explanation|
+---+---+-----+----------+---------+-----------+
| A| 1| 2| 2| 0| -|
| A| 2| 2| 3| 0| -|
| A| 3| 2| 1| 2| -|
| A| 4| 3| 4| 3| -|
| A| 5| 2| 6| 1| -|
| A| 6| 4| 2| 0| -|
| A| 7| 3| 6| 10| 10 (4+6)|
| A| 8| 2| 5| 0| -|
| A| 9| 2| 3| 0| -|
| A| 10| 2| 6| 13| 13 (2+6+5)|
| B| 1| 2| 3| 0| -|
| B| 2| 2| 1| 0| -|
| B| 3| 3| 2| 3| -|
| B| 4| 2| 4| 1| -|
| B| 5| 4| 1| 0| -|
| B| 6| 3| 2| 6| 6 (2+4)|
| B| 7| 2| 3| 0| -|
| B| 8| 2| 3| 0| -|
| B| 9| 2| 6| 6| 6 (1+2+3)|
| B| 10| 3| 1| 3| -|
+---+---+-----+----------+---------+-----------+
I have tried the following code to get a constant lag of 2:
Qdf1=Qdf.withColumn('Delivered_lag',func.lag(Qdf['Despatched'],2).over(Window.partitionBy("ID").orderBy("Day")))
However, when I try to lag one column by the amount in another column, I get an error:
Qdf1=Qdf.withColumn('Delivered_lag',func.lag(Qdf['Despatched'],Qdf['Delay']).over(Window.partitionBy("ID").orderBy("Day")))
TypeError: 'Column' object is not callable
How can I overcome this problem? I am using PySpark version 2.3.1 and Python version 2.7.13.
Comments:

- Could you please add a code snippet of where you use lag()?
- @cronoik Added the code snippet and the libraries used.

Answer 1:

The lag function takes a fixed value as the count argument, but what you can do is build a loop with when and otherwise to get what you want:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
ID = ['A' for i in range(0,10)]+ ['B' for i in range(0,10)]
#I had to modify this line as I'm working with python3
Day = list(range(1,11))+list(range(1,11))
Delay = [2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 2, 2, 3, 2, 4, 3, 2, 2, 2, 3]
Despatched = [2, 3, 1, 4, 6, 2, 6, 5, 3, 6, 3, 1, 2, 4, 1, 2, 3, 3, 6, 1]
Delivered = [0, 0, 2, 3, 1, 0, 10, 0, 0, 13, 0, 0, 3, 1, 0, 6, 0, 0, 6, 3]
Explanation = ["-", "-", "-", "-", "-", "-", "10 (4+6)", "-", "-", "13 (2+6+5)", "-", "-", "-", "-", "-", "6 (2+4)", "-", "-", "6 (1+2+3)", "-"]
QSchema = T.StructType([T.StructField("ID", T.StringType()),T.StructField("Day", T.IntegerType()),T.StructField("Delay", T.IntegerType()),T.StructField("Despatched", T.IntegerType())])
Qdata = map(list, zip(*[ID,Day,Delay,Despatched]))
Qdf = spark.createDataFrame(Qdata,schema=QSchema)
#until here it was basically your code
#First we add an empty Delivered_lag column to the Qdf
#That allows us to use the same functionality for all iterations of the following loop
Qdf = Qdf.withColumn('Delivered_lag', F.lit(None).cast(T.IntegerType()))
#Now we loop over the distinct values of Qdf.delay and run the lag function for each value
#otherwise is necessary to keep the previously calculated values
for delay in Qdf.select('delay').distinct().collect():
    Qdf = Qdf.withColumn('Delivered_lag',
                         F.when(Qdf['Delay'] == delay.delay,
                                F.lag(Qdf['Despatched'], delay.delay)
                                 .over(Window.partitionBy("ID").orderBy("Day")))
                          .otherwise(Qdf['Delivered_lag']))
Qdf.show()
Output:
+---+---+-----+----------+-------------+
| ID|Day|Delay|Despatched|Delivered_lag|
+---+---+-----+----------+-------------+
| B| 1| 2| 3| null|
| B| 2| 2| 1| null|
| B| 3| 3| 2| null|
| B| 4| 2| 4| 1|
| B| 5| 4| 1| 3|
| B| 6| 3| 2| 2|
| B| 7| 2| 3| 1|
| B| 8| 2| 3| 2|
| B| 9| 2| 6| 3|
| B| 10| 3| 1| 3|
| A| 1| 2| 2| null|
| A| 2| 2| 3| null|
| A| 3| 2| 1| 2|
| A| 4| 3| 4| 2|
| A| 5| 2| 6| 1|
| A| 6| 4| 2| 3|
| A| 7| 3| 6| 4|
| A| 8| 2| 5| 2|
| A| 9| 2| 3| 6|
| A| 10| 2| 6| 5|
+---+---+-----+----------+-------------+
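A loop-free alternative worth noting: the Delivered column in Adf actually sums every despatch arriving on the same day (e.g. 10 = 4+6 on day 7 for A), so instead of a variable lag one can compute each row's arrival day and aggregate. A minimal sketch, assuming the Qdf and imports from above (ArrivalDay and a_ID are helper names introduced here, not part of the original data):

# Sketch: shift each despatch forward to the day it arrives, then sum arrivals per (ID, day)
arrivals = (Qdf
            .withColumn('ArrivalDay', F.col('Day') + F.col('Delay'))
            .groupBy('ID', 'ArrivalDay')
            .agg(F.sum('Despatched').alias('Delivered'))
            .withColumnRenamed('ID', 'a_ID'))

# Join the aggregated arrivals back onto the original rows; days with no
# arrivals come back as null, which we replace with 0 to match Adf
result = (Qdf.join(arrivals,
                   (Qdf['ID'] == arrivals['a_ID']) & (Qdf['Day'] == arrivals['ArrivalDay']),
                   'left')
          .drop('a_ID', 'ArrivalDay')
          .fillna(0, subset=['Delivered'])
          .orderBy('ID', 'Day'))
result.show()

Unlike the per-row lag, this join also covers days on which multiple shipments land at once (days 7 and 10 for A), which a single lag cannot, since lag reads exactly one earlier row.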
Comments:

- I was hoping for a solution more efficient than a loop, but this works. Thanks!
- This didn't come up before: ***.com/questions/45961164/…