Spark中多列的窗口聚合

Posted

技术标签:

【中文标题】Spark中多列的窗口聚合【英文标题】:Window aggregation on many columns in Spark 【发布时间】:2020-02-03 21:21:40 【问题描述】:

在 Pyspark 中跨多个列进行聚合时遇到问题。有数百个布尔列显示系统的当前状态,每秒添加一行。目标是转换此数据以显示每 10 秒窗口的状态更改次数。

我计划分两步执行此操作,首先将布尔值与前一行的值进行异或运算,然后在 10 秒的窗口中进行第二次求和。这是我想出的粗略代码:

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import types as T, functions as F

from datetime import datetime, timedelta
from random import random
import time

sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)

# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(10000)],
    ['Time', *[f"Mm+1" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])

# Generate changes
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time')
# data_window = Window.orderBy('Time')
df = df.select('Time', *[F.col(m).bitwiseXOR(F.lag(m, 1).over(data_window)).alias(m) for m in cols])

df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(m).alias(m) for m in cols]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')

df.orderBy('start_time').show(20, False)

# Keep UI open
time.sleep(60*60)

使用按分钟划分的data_window,Spark 生成 52 个阶段,每个阶段都依赖于最后一个阶段。增加num_of_cols 也会增加阶段数。在我看来,这应该是一个令人尴尬的可并行化问题。将每一行与最后一行进行比较,然后按 10 秒汇总。删除data_window partitionBy 允许它在单个阶段运行,但它会强制单个分区上的所有数据来实现它。

为什么这些阶段相互依赖,有没有更好的方法来编写它来提高并行化?我认为可以同时在同一个窗口上进行多个聚合。最终这需要扩展到数百列,此时有什么技巧可以提高性能吗?

【问题讨论】:

考虑拥有一个包含所有期望列结构的单一窗口,聚合到一个数组作为输入和一个 UDF,其中包含用于实际处理的逻辑。 做到了@GeorgHeiler!遗憾的是,Pyspark 直到 3.0.0 才支持有界的用户定义聚合:( 【参考方案1】:

根据 Georg 的有益回复,我提出以下建议:

import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import types as T, functions as F

from datetime import datetime, timedelta
from random import random
import time
import pprint


sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)


@F.pandas_udf(T.ArrayType(T.IntegerType()), F.PandasUDFType.GROUPED_AGG)
def pandas_xor(v):
    values = v.values
    if len(values) == 1:
        return values[0] * False
    elif len(values) == 2:
        return values[0] ^ values[1]
    else:
        raise RuntimeError('Too many values given to pandas_xor: '.format(values))


# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(100000)],
    ['Time', *[f"Mm+1" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])

df = df.select('Time', F.array(*cols).alias('data'))

# XOR
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time').rowsBetween(Window.currentRow, 1)
# data_window = Window.orderBy('Time')
df = df.select('Time', pandas_xor(df.data).over(data_window).alias('data'))

df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(F.element_at('data', i + 1)).alias(m) for i, m in enumerate(cols)]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')

df.orderBy('start_time').show(20, False)

# Keep UI open
time.sleep(60*60)

按照以下说明使用 Spark 3.0.0preview2 运行它

    下载 Spark 3.0.0

    mkdir contrib
    wget -O contrib/spark-3.0.0-preview2.tgz 'https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz'
    tar -C contrib -xf contrib/spark-3.0.0-preview2.tgz
    rm contrib/spark-3.0.0-preview2.tgz
    

    在第一个 shell 中,配置环境以使用 Pyspark 3.0.0

    export SPARK_HOME="$(pwd)/contrib/spark-3.0.0-preview2-bin-hadoop2.7"
    export PYTHONPATH="$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip"
    

    启动 pyspark 工作

    time python3 so-example.py
    

    在http://localhost:4040查看本地 Spark 运行的 Web UI

【讨论】:

以上是关于Spark中多列的窗口聚合的主要内容,如果未能解决你的问题,请参考以下文章

Spark ML Transformer - 使用 rangeBetween 在窗口上聚合

Spark SQL:与时间窗口聚合

Spark-Streaming之window滑动窗口应用

基于两列或多列的 Spark DataFrame 聚合

在窗口上聚合(总和)以获得列列表

Apache Spark 3.2 内置支持会话窗口