识别运行在PySpark DF色谱柱上的干净方法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了识别运行在PySpark DF色谱柱上的干净方法相关的知识,希望对你有一定的参考价值。

给出以下形式的PySpark DataFrame:

+----+---------+
 time, message
+----+---------+
| t1 | [m1]    |
| t3 | [m1, m2]|
| t4 | [m1, m2]|
| t6 | [m2]    |
| t7 | [m3]    |
| t8 | [m3]    |
| t9 | [m1]    |
  ...   ...
+----+---------+

我想将其重构为:

+--------+-------+-----+
 message | start | end |
+--------+-------+-----+
   m1    |  t1   |  t4 |
   m2    |  t3   |  t6 |
   m3    |  t7   |  t8 |
   m1    |  t9   |  t9 |

(即,将message列视为一个序列,并为每个消息标识“运行”的开始和结束),

是否有一种干净的方法可以在Spark中进行此转换?当前,我将其作为6 GB TSV进行转储,并必须对其进行处理。

如果熊猫有干净的方法进行聚合,我愿意toPandas-将此问题累积在驱动程序上。

答案

[找到了一种合理的方法,如果您可以在应用窗口操作时进行分区(可以在任何实际数据集上使用,我可以在其中导出此问题的方法上进行分区),就可以很好地进行扩展。

将其分解为大块以便于解释。

设置:

rows = [
    ['t01',['m1']],
    ['t03',['m1','m2']],
    ['t04',['m2']],
    ['t06',['m3']],
    ['t07',['m3','m1']],
    ['t08',['m1']],
    ['t11',['m2']],
    ['t13',['m2','m4']],
    ['t15',['m2']],
    ['t20',['m4']],
    ['t21',[]],
    ['t22',['m1','m4']],
]

pdf = pd.DataFrame(rows,columns=['time', 'messages'])
schema = StructType([
    StructField("time", StringType(), True),
    StructField("messages", ArrayType(StringType()), True)
])
df = spark.createDataFrame(pdf,schema=schema)

使用延迟并生成消息数组的差异以标识运行的开始和结束:

w = Window().partitionBy().orderBy('time')
df2 = df.withColumn('messages_lag_1', lag('messages', 1).over(w))\
        .withColumn('end_time', lag('time', 1).over(w))\
        .withColumnRenamed('time', 'start_time')\
        .withColumn('messages_lag_1',          # or you can dropna and miss some
            coalesce(                          # cargoculted from
                col('messages_lag_1'),         # https://stackoverflow.com/a/57198009
                from_json(lit('[]'), ArrayType(StringType()))
            )
        )\
        .withColumn('message_run_starts', array_except('messages', 'messages_lag_1'))\
        .withColumn('message_run_ends', array_except('messages_lag_1', 'messages'))\
        .drop(*['messages', 'messages_lag_1']) # ^ only on Spark > 2.4

+----------+--------+------------------+----------------+
|start_time|end_time|message_run_starts|message_run_ends|
+----------+--------+------------------+----------------+
|       t01|    null|              [m1]|              []|
|       t03|     t01|              [m2]|              []|
|       t04|     t03|                []|            [m1]|
|       t06|     t04|              [m3]|            [m2]|
|       t07|     t06|              [m1]|              []|
|       t08|     t07|                []|            [m3]|
|       t11|     t08|              [m2]|            [m1]|
|       t13|     t11|              [m4]|              []|
|       t15|     t13|                []|            [m4]|
|       t20|     t15|              [m4]|            [m2]|
|       t21|     t20|                []|            [m4]|
|       t22|     t21|          [m1, m4]|              []|
+----------+--------+------------------+----------------+

按时间和消息分组,并对开始和结束表都应用排名。联接,如果为空,则将start_time复制到end_time

w_start = Window().partitionBy('message_run_starts').orderBy(col('start_time'))
df3 = df2.withColumn('message_run_starts', explode('message_run_starts')).drop('message_run_ends', 'end_time')
df3 = df3.withColumn('start_row_id',rank().over(w_start))

w_end = Window().partitionBy('message_run_ends').orderBy(col('end_time'))
df4 = df2.withColumn('message_run_ends', explode('message_run_ends')).drop('message_run_starts', 'start_time')
df4 = df4.withColumn('end_row_id',rank().over(w_end))

df_combined = df3\
    .join(df4, (df3.message_run_starts == df4.message_run_ends) & (df3.start_row_id == df4.end_row_id), how='full')\
        .drop(*['message_run_ends','start_row_id','end_row_id'])\
        .withColumn('end_time',coalesce(col('end_time'),col('start_time'))) 

df_combined.show()

+----------+------------------+--------+
|start_time|message_run_starts|end_time|
+----------+------------------+--------+
|       t01|                m1|     t03|
|       t07|                m1|     t08|
|       t22|                m1|     t22|
|       t03|                m2|     t04|
|       t11|                m2|     t15|
|       t06|                m3|     t07|
|       t13|                m4|     t13|
|       t20|                m4|     t20|
|       t22|                m4|     t22|
+----------+------------------+--------+

以上是关于识别运行在PySpark DF色谱柱上的干净方法的主要内容,如果未能解决你的问题,请参考以下文章

大型数据框上的 Pyspark groupBy

检查列 pyspark df 的值是不是存在于其他列 pyspark df

pyspark 数据帧上的向量操作

数据框上的 Pyspark UDF 列

如何根据数据类型识别列并在pyspark中转换它们?

Pyspark 中数组元素上的 UDF 还添加了静态元素