Spark中多列的窗口聚合
Posted
技术标签:
【中文标题】Spark中多列的窗口聚合【英文标题】:Window aggregation on many columns in Spark 【发布时间】:2020-02-03 21:21:40 【问题描述】:在 Pyspark 中跨多个列进行聚合时遇到问题。有数百个布尔列显示系统的当前状态,每秒添加一行。目标是转换此数据以显示每 10 秒窗口的状态更改次数。
我计划分两步执行此操作,首先将布尔值与前一行的值进行异或运算,然后在 10 秒的窗口中进行第二次求和。这是我想出的粗略代码:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import types as T, functions as F
from datetime import datetime, timedelta
from random import random
import time
sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)
# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
[(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(10000)],
['Time', *[f"Mm+1" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])
# Generate changes
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time')
# data_window = Window.orderBy('Time')
df = df.select('Time', *[F.col(m).bitwiseXOR(F.lag(m, 1).over(data_window)).alias(m) for m in cols])
df = df.groupBy(F.window('Time', '10 seconds')) \
.agg(*[F.sum(m).alias(m) for m in cols]) \
.withColumn('start_time', F.col('window')['start']) \
.drop('window')
df.orderBy('start_time').show(20, False)
# Keep UI open
time.sleep(60*60)
使用按分钟划分的data_window
,Spark 生成 52 个阶段,每个阶段都依赖于最后一个阶段。增加num_of_cols
也会增加阶段数。在我看来,这应该是一个令人尴尬的可并行化问题。将每一行与最后一行进行比较,然后按 10 秒汇总。删除data_window
partitionBy 允许它在单个阶段运行,但它会强制单个分区上的所有数据来实现它。
为什么这些阶段相互依赖,有没有更好的方法来编写它来提高并行化?我认为可以同时在同一个窗口上进行多个聚合。最终这需要扩展到数百列,此时有什么技巧可以提高性能吗?
【问题讨论】:
考虑拥有一个包含所有期望列结构的单一窗口,聚合到一个数组作为输入和一个 UDF,其中包含用于实际处理的逻辑。 做到了@GeorgHeiler!遗憾的是,Pyspark 直到 3.0.0 才支持有界的用户定义聚合:( 【参考方案1】:根据 Georg 的有益回复,我提出以下建议:
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import types as T, functions as F
from datetime import datetime, timedelta
from random import random
import time
import pprint
sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)
@F.pandas_udf(T.ArrayType(T.IntegerType()), F.PandasUDFType.GROUPED_AGG)
def pandas_xor(v):
values = v.values
if len(values) == 1:
return values[0] * False
elif len(values) == 2:
return values[0] ^ values[1]
else:
raise RuntimeError('Too many values given to pandas_xor: '.format(values))
# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
[(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(100000)],
['Time', *[f"Mm+1" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])
df = df.select('Time', F.array(*cols).alias('data'))
# XOR
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time').rowsBetween(Window.currentRow, 1)
# data_window = Window.orderBy('Time')
df = df.select('Time', pandas_xor(df.data).over(data_window).alias('data'))
df = df.groupBy(F.window('Time', '10 seconds')) \
.agg(*[F.sum(F.element_at('data', i + 1)).alias(m) for i, m in enumerate(cols)]) \
.withColumn('start_time', F.col('window')['start']) \
.drop('window')
df.orderBy('start_time').show(20, False)
# Keep UI open
time.sleep(60*60)
按照以下说明使用 Spark 3.0.0preview2 运行它
下载 Spark 3.0.0
mkdir contrib
wget -O contrib/spark-3.0.0-preview2.tgz 'https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz'
tar -C contrib -xf contrib/spark-3.0.0-preview2.tgz
rm contrib/spark-3.0.0-preview2.tgz
在第一个 shell 中,配置环境以使用 Pyspark 3.0.0
export SPARK_HOME="$(pwd)/contrib/spark-3.0.0-preview2-bin-hadoop2.7"
export PYTHONPATH="$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip"
启动 pyspark 工作
time python3 so-example.py
在http://localhost:4040查看本地 Spark 运行的 Web UI
【讨论】:
以上是关于Spark中多列的窗口聚合的主要内容,如果未能解决你的问题,请参考以下文章