Clean way to identify runs on a PySpark DataFrame column
Given a PySpark DataFrame of the form:
+------+----------+
| time | message  |
+------+----------+
| t1   | [m1]     |
| t3   | [m1, m2] |
| t4   | [m1, m2] |
| t6   | [m2]     |
| t7   | [m3]     |
| t8   | [m3]     |
| t9   | [m1]     |
| ...  | ...      |
+------+----------+
I would like to restructure it as:
+---------+-------+-----+
| message | start | end |
+---------+-------+-----+
| m1      | t1    | t4  |
| m2      | t3    | t6  |
| m3      | t7    | t8  |
| m1      | t9    | t9  |
+---------+-------+-----+
(That is, treat the message column as a sequence and, for each message, identify the start and end of every "run".)

Is there a clean way to do this transformation in Spark? Currently I dump this as a 6 GB TSV and process it imperatively. If pandas has a clean way to do this aggregation, I'm open to toPandas-ing it and accumulating the problem on the driver.
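For reference, the driver-side pandas version can be fairly compact. A minimal sketch, assuming the frame fits in memory after toPandas(); pos and run are scratch columns introduced here, and explode needs pandas >= 0.25:

import pandas as pd

pdf = pd.DataFrame(
    [['t1', ['m1']], ['t3', ['m1', 'm2']], ['t4', ['m1', 'm2']],
     ['t6', ['m2']], ['t7', ['m3']], ['t8', ['m3']], ['t9', ['m1']]],
    columns=['time', 'message'],
)
pdf['pos'] = range(len(pdf))                       # row order stands in for time order
long = pdf.explode('message').reset_index(drop=True)  # one row per (time, message) pair
# Rows of the same message belong to one run iff their positions are
# consecutive, in which case pos minus the within-message counter is constant.
long['run'] = long['pos'] - long.groupby('message').cumcount()
runs = (long.groupby(['message', 'run'])['time']
            .agg(start='first', end='last')
            .droplevel('run')
            .reset_index())
print(runs)  # m1: t1-t4, m1: t9-t9, m2: t3-t6, m3: t7-t8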
Answer
Found a reasonable way that scales well, provided you can partition before applying the window operations (on any real dataset where this problem shows up, I can derive something to partition on).

Breaking it into chunks for ease of explanation.

Setup:
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

rows = [
    ['t01', ['m1']],
    ['t03', ['m1', 'm2']],
    ['t04', ['m2']],
    ['t06', ['m3']],
    ['t07', ['m3', 'm1']],
    ['t08', ['m1']],
    ['t11', ['m2']],
    ['t13', ['m2', 'm4']],
    ['t15', ['m2']],
    ['t20', ['m4']],
    ['t21', []],
    ['t22', ['m1', 'm4']],
]
pdf = pd.DataFrame(rows, columns=['time', 'messages'])

schema = StructType([
    StructField('time', StringType(), True),
    StructField('messages', ArrayType(StringType()), True),
])
df = spark.createDataFrame(pdf, schema=schema)
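A quick look at the frame before the window tricks is a cheap way to confirm the explicit schema took (output elided):

df.show(truncate=False)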
Use lag and take set differences of the message arrays to identify run starts and ends:
from pyspark.sql import Window
from pyspark.sql.functions import (
    lag, coalesce, col, lit, from_json, array_except, explode, rank,
)

w = Window().partitionBy().orderBy('time')
df2 = df.withColumn('messages_lag_1', lag('messages', 1).over(w))\
        .withColumn('end_time', lag('time', 1).over(w))\
        .withColumnRenamed('time', 'start_time')\
        .withColumn('messages_lag_1',          # or you can dropna and miss some
                    coalesce(                   # cargoculted from
                        col('messages_lag_1'),  # https://stackoverflow.com/a/57198009
                        from_json(lit('[]'), ArrayType(StringType()))
                    ))\
        .withColumn('message_run_starts', array_except('messages', 'messages_lag_1'))\
        .withColumn('message_run_ends', array_except('messages_lag_1', 'messages'))\
        .drop(*['messages', 'messages_lag_1'])  # array_except requires Spark >= 2.4
+----------+--------+------------------+----------------+
|start_time|end_time|message_run_starts|message_run_ends|
+----------+--------+------------------+----------------+
| t01| null| [m1]| []|
| t03| t01| [m2]| []|
| t04| t03| []| [m1]|
| t06| t04| [m3]| [m2]|
| t07| t06| [m1]| []|
| t08| t07| []| [m3]|
| t11| t08| [m2]| [m1]|
| t13| t11| [m4]| []|
| t15| t13| []| [m4]|
| t20| t15| [m4]| [m2]|
| t21| t20| []| [m4]|
| t22| t21| [m1, m4]| []|
+----------+--------+------------------+----------------+
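One scaling note before moving on: Window().partitionBy() with no key, as above, moves every row into a single partition (Spark logs a warning to that effect), which is why the partitioning caveat at the top matters. If the real data carries a key within which runs are confined, say a hypothetical session_id column, the same window distributes cleanly and nothing else in the pipeline changes:

# Sketch only: 'session_id' is an assumed grouping key that bounds each run.
w = Window().partitionBy('session_id').orderBy('time')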
Group by message and apply a rank over time to both the starts and the ends tables. Join, and copy start_time into end_time where it is null:
w_start = Window().partitionBy('message_run_starts').orderBy(col('start_time'))
df3 = df2.withColumn('message_run_starts', explode('message_run_starts'))\
         .drop('message_run_ends', 'end_time')
df3 = df3.withColumn('start_row_id', rank().over(w_start))

w_end = Window().partitionBy('message_run_ends').orderBy(col('end_time'))
df4 = df2.withColumn('message_run_ends', explode('message_run_ends'))\
         .drop('message_run_starts', 'start_time')
df4 = df4.withColumn('end_row_id', rank().over(w_end))

df_combined = df3\
    .join(df4, (df3.message_run_starts == df4.message_run_ends) &
               (df3.start_row_id == df4.end_row_id), how='full')\
    .drop(*['message_run_ends', 'start_row_id', 'end_row_id'])\
    .withColumn('end_time', coalesce(col('end_time'), col('start_time')))

df_combined.show()
+----------+------------------+--------+
|start_time|message_run_starts|end_time|
+----------+------------------+--------+
| t01| m1| t03|
| t07| m1| t08|
| t22| m1| t22|
| t03| m2| t04|
| t11| m2| t15|
| t06| m3| t07|
| t13| m4| t13|
| t20| m4| t20|
| t22| m4| t22|
+----------+------------------+--------+
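The columns come out as start_time, message_run_starts, end_time rather than the question's message, start, end layout. A last cosmetic pass (the rename and ordering here are my own choice, not part of the original answer) lines the result up with the target shape:

result = df_combined\
    .withColumnRenamed('message_run_starts', 'message')\
    .select('message', 'start_time', 'end_time')\
    .orderBy('message', 'start_time')
result.show()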