Python Spark 作业优化

Posted

技术标签:

【中文标题】Python Spark 作业优化【英文标题】:Python Spark Job Optimization 【发布时间】:2018-09-02 17:07:32 【问题描述】:

我正在使用

在 Dataproc 集群上运行 PySpark (2.3) 3 个节点(4 个 CPU) 每个 8 GB 内存。

数据有近 130 万行 4 列,即:

Date,unique_id (Alphanumeric) , category(10 distinct values) and Prediction (0 or 1) 

P.S - 这是时间序列数据

我们正在使用 Facebook 的预测模型进行预测建模,并且由于 Prophet 只接受 Pandas 数据框作为输入,下面是我正在做的将 Spark 数据框转换为 Pandas 数据框的操作。

def prediction_func(spark_df):

    import pandas as pd 
    # Lines of code to convert spark df to pandas df 
    # Calling prophet model with the converted pandas df 
    return pandas_df 

predictions = spark_df.groupby('category').apply(prediction_func)

整个过程在 dataproc 上大约需要 1.5 小时。

我确信在应用 prediction_func 之前有更好的方法对数据进行分组和分区。

任何建议将不胜感激。

【问题讨论】:

为什么是groupby('category') @user6910411 - 因为数据均匀分布在 10 个类别中。以为表现会更好。没有? 【参考方案1】:

由于您的代码不依赖于分组变量,您应该完全删除 groupBy 并使用 scalar UDF 代替 Grouped Map。

这样您就不需要随机播放,并且可以利用数据局部性和可用资源。

您必须重新定义您的函数以获取所有必需的列并返回pandas.Series

def prediction_func(*cols: pandas.Series) -> pandas.Series:
    ...  # Combine cols into a single pandas.DataFrame and apply the model
    return ...  # Convert result to pandas.Series and return

示例用法:

from pyspark.sql.functions import PandasUDFType, pandas_udf, rand
import pandas as pd
import numpy as np

df = spark.range(100).select(rand(1), rand(2), rand(3)).toDF("x", "y", "z")

@pandas_udf("double", PandasUDFType.SCALAR)
def dummy_prediction_function(x, y, z):
    pdf  = pd.DataFrame("x": x, "y": y, "z": z)
    pdf["prediction"] = 1.0
    return pdf["prediction"]

df.withColumn("prediction", dummy_prediction_function("x", "y", "z")).show(3)
+-------------------+-------------------+--------------------+----------+       
|                  x|                  y|                   z|prediction|
+-------------------+-------------------+--------------------+----------+
|0.13385709732307427| 0.2630967864682161| 0.11641995793557336|       1.0|
| 0.5897562959687032|0.19795734254405561|   0.605595773295928|       1.0|
|0.01540012100242305|0.25419718814653214|0.006007018601722036|       1.0|
+-------------------+-------------------+--------------------+----------+
only showing top 3 rows

【讨论】:

以上是关于Python Spark 作业优化的主要内容,如果未能解决你的问题,请参考以下文章

spark-submit参数

使用火花动作在 Oozie 中的 python Spark 作业

如何将 Azure Blob 存储容器挂载为在 Python 中以独立模式在本地运行的 Spark 作业的目录?

spark学习之作业优化

spark学习之作业优化

是否可以从 Scala(spark) 调用 python 函数