利用pandas_udf加速机器学习任务

Posted 山顶夕景

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用pandas_udf加速机器学习任务相关的知识,希望对你有一定的参考价值。

note

  • pandas udf和python udf区别:前者向量化是在不同partition上处理
  • @pandas_udf使用panda API来处理分布式数据集,而toPandas()将分布式数据集转换为本地数据,然后使用pandas进行处理,如果Pyspark的dataframe非常大,直接使用toPandas()很容易导致OOM。

文章目录

一、Pyspark中的udf

1.1 udf的简单介绍

  • 在java分布式系统中执行python程序是挺耗性能的(如下图Pyspark多进程框架,数据在JVM和Python中进行传输,有额外的序列化和调用开销),apache arrow项目由此发起,以加速大数据分析项目运行速度。
  • apache arrow是一种内存中的列式数据格式,用于spark中JVM和python进程之间的数据高效传输。在调用Arrow之前,需要将spark配置选项设置为true:spark.conf.set("spark.sql.execution.arrow.enabled", "true"),但在spark3.0后的版本中需要改为spark.sql.execution.arrow.pyspark.enabled

  • udf自定义函数,可让我们在使用pyspark进行业务分析时高效自定义功能,一般分为两种:
    • event level:是对一条事件or数据进行计算
    • aggregation function: 对某个aggregation key的自定义聚合计算。如对pyspark中df使用collection_list或者collect_set把需要聚合的信息变成一个list后,通过event level的udf实现。
      • ex:计算用户多次登陆时间的最大值(如下代码)。
      • 上面栗子的缺点:如果主键是热点,即聚合出的元素很多,容易OOM,可只对聚合出的list先进行裁剪,如按照时间排序,保留最后topk的事件。
@udf(SomeType())
def find_max(lis):
  return max(lis)

SparkDataFrame.groupBy("userId"). \\
     agg( 
        find_max(fn.collect_list('log_duration'))
    ).show()
  • 为什么 RDD filter() 方法那么慢呢?原因是 lambda 函数不能直接应用于驻留在 JVM 内存中的 DataFrame。
    • 内部实际发生的是 Spark 在集群节点上的 Spark 执行程序旁边启动 Python 工作线程。在执行时,Spark 工作器将 lambda 函数发送给这些 Python 工作器。接下来,Spark worker 开始序列化他们的 RDD 分区,并通过套接字将它们通过管道传输到 Python worker,lambda 函数在每行上进行评估。对于结果行,整个序列化/反序列化过程在再次发生,以便实际的 filter() 可以应用于结果集。
    • 因为数据来回复制过多,在分布式 Java 系统中执行 Python 函数在执行时间方面非常昂贵。这个底层的探索:只要避免Python UDF,PySpark 程序将大约与基于 Scala 的 Spark 程序一样快。如果无法避免 UDF,至少应该尝试使它们尽可能高效。

1.2 udf的写法

  • 写代码创建函数
  • 写一个单元测试
  • 保证测试通过,并且计算结果与业务实际需求相符
  • 将函数register到pyspark,注意必须声明函数返回值的数据类型(参考下表进行pyspark和python数据类型的对照)

1.3 udf的使用场景

在详细介绍对应的pandas_udf用法前先通过一张图看下在不同场合适合使用哪种udf:

上图源自《Data Analysis with Python and PySpark》。

具体而言:
(1)注册函数到spark:

from fractions import Fraction
from typing import Tuple, Optional

Frac = Tuple[int, int]


def py_reduce_fraction(frac: Frac) -> Optional[Frac]:
    """Reduce a fraction represented as a 2-tuple of integers."""
    num, denom = frac
    if denom:
        answer = Fraction(num, denom)
        return answer.numerator, answer.denominator
    return None


assert py_reduce_fraction((3, 6)) == (1, 2)
assert py_reduce_fraction((1, 0)) is None


def py_fraction_to_float(frac: Frac) -> Optional[float]:
    """Transforms a fraction represented as a 2-tuple of integers into a float."""
    num, denom = frac
    if denom:
        return num / denom
    return None


assert py_fraction_to_float((2, 8)) == 0.25
assert py_fraction_to_float((10, 0)) is None


SparkFrac = T.ArrayType(T.LongType())
reduce_fraction = F.udf(py_reduce_fraction, SparkFrac)

frac_df = frac_df.withColumn(
    "reduced_fraction", reduce_fraction(F.col("fraction"))
)
print("=====================udf test2:\\n")
frac_df.show(5, False)
# +--------+----------------+
# |fraction|reduced_fraction|
# +--------+----------------+
# |[0, 1]  |[0, 1]          |
# |[0, 2]  |[0, 1]          |
# |[0, 3]  |[0, 1]          |
# |[0, 4]  |[0, 1]          |
# |[0, 5]  |[0, 1]          |
# +--------+----------------+
# only showing top 5 rows

(2)也可以使用decorator语法糖:

@F.udf(T.DoubleType())
def fraction_to_float(frac: Frac) -> Optional[float]:
    """Transforms a fraction represented as a 2-tuple of integers into a float."""
    num, denom = frac
    if denom:
        return num / denom
    return None


frac_df = frac_df.withColumn(
    "fraction_float", fraction_to_float(F.col("reduced_fraction"))
)
print("================udf test 3:\\n")
frac_df.select("reduced_fraction", "fraction_float").distinct().show(5, False)
# +----------------+-------------------+
# |reduced_fraction|fraction_float     |
# +----------------+-------------------+
# |[3, 50]         |0.06               |
# |[3, 67]         |0.04477611940298507|
# |[7, 76]         |0.09210526315789473|
# |[9, 23]         |0.391304347826087  |
# |[9, 25]         |0.36               |
# +----------------+-------------------+
# only showing top 5 rows
assert fraction_to_float.func((1, 2)) == 0.5

二、pandas_udf三大用法


第一个SCALAR和pandas中的transform类似,第二个GROUPED_MAP是最灵活的。

(1)Scalar向量化标量操作

  • 可以与select和withColumn等函数一起使用。python 函数应该以pandas.series作为输入,并返回一个长度相同的pandas.series
  • 在内部,spark 将通过将列拆分为batch,并将每个batch的函数作为数据的子集调用,然后将结果连接在一起,来执行 padas UDF。
  • Pandas_UDF是在PySpark 2.3版本中新增的API,Spark经过Arrow传输数据,使用Pandas处理数据。Pandas_UDF使用关键字pandas_udf做为装饰器或声明一个函数进行定义, Pandas_UDF包括Scalar(标量映射)和Grouped Map(分组映射)等类型。栗子:
from pyspark.sql.functions import pandas_udf,PandasUDFType
from pyspark.sql.types import IntegerType,StringType
slen=pandas_udf(lambda s:s.str.len(),IntegerType())
@pandas_udf(StringType())
def to_upper(s):
   return s.str.upper()

@pandas_udf(IntegerType(),PandasUDFType.SCALAR)
def add_one(x):
   return x+1

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
df.withColumn('slen(name)',slen("name")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+

(2)Grouped Map

Grouped Map和后面的Grouped Aggregate都适合pyspark的split-apply-combine计算模式:

类似在pandaspandas.groupby().apply,pyspark中使用pandas_udf可以加速大数据的处理逻辑。如下面的例子:

from pyspark.sql.functions import pandas_udf,PandasUDFType

df3 = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])

df3.groupby("key").apply(g).show()
+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  a|       0.0|      21.0|   21.0|  -21.0|
|  b|       6.5|      -1.5|    5.0|    8.0|
+---+----------+----------+-------+-------+

当然也不是一定要先对某个字段groupby操作,比如在直接导入torch训练好的模型参数(下面对最简单的linear线性模型举例),对一个很大的pyspark中dataframe进行使用pandas_udf预测:

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
import torch
from torch import nn

class Linear(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Linear, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):  # 前向传播
        out = self.linear(x)  # 输入x,输出out
        return out


conf = SparkConf() \\
  .setAppName("dataframe") \\
  .set("spark.sql.execution.arrow.pyspark.enabled", "true")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

@pandas_udf("uid long, aid long, score float", PandasUDFType.GROUPED_MAP)
def age_predict(df):
    linear = Linear(2, 1)
    linear.load_state_dict(torch.load('linear.pth'))
    linear.eval()
    df['score'] = linear(torch.from_numpy(df.values).type(torch.float32)).detach().numpy()
    return df.loc[:, ['uid', 'aid', 'score']]

df = spark.read.format("json").load("hdfs:///tmp/predict.json").repartition(2)

#此处的F.spark_partition_id()即为我的文件分区数量
res = df.groupby(F.spark_partition_id()).apply(age_predict)

(3)Grouped Aggregate

Grouped aggregate Panda UDF类似于Spark聚合函数。Grouped aggregate Panda UDF常常与groupBy().agg()pyspark.sql.window一起使用。它定义了来自一个或多个的聚合。级数到标量值,其中每个pandas.Series表示组或窗口中的一列。

【栗子】求每个id的平均值分数。

from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))
@pandas_udf("double", PandasUDFType.GROUPED_AGG)  
def mean_udf(v):
     return v.mean()
df.groupby("id").agg(mean_udf(df['v'])).show() 

三、案例介绍

3.1 进行基础的数据计算

根据用户的活动结束时间和活动持续时间计算活动开始时间,其中通过pandas_udf对df中的每一行进行处理,返回处理结果。

import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
 
spark = SparkSession.builder.appName("demo3").config("spark.some.config.option", "some-value").getOrCreate()
df3 = spark.createDataFrame(
    [(18862669710, '/未知类型', 'IM传文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
     (18862669710, '/未知类型', 'IM传文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
     (18862228190, '/移动终端', '移动终端应用', '移动腾讯视频', 292.0, '2018-03-08 21:45:45', 178111558212, 1781115582),
     (18862669710, '/未知类型', '访问网站', '搜索引擎', 52.0, '2018-03-08 21:45:46', 178111558222, 1781115582)],
    ('online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class'))
 
 
def compute(x):
    x['end_time'] = pd.to_datetime(x['datetime'], errors='coerce', format='%Y-%m-%d')
    x['end_time_convert_seconds'] = pd.to_timedelta(x['end_time']).dt.total_seconds().astype(int)
    x['start_time'] = pd.to_datetime(x['end_time_convert_seconds'] - x['access_seconds'], unit='s')
    x = x.sort_values(by=['start_time'], ascending=True)
    result = x[['online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class','start_time', 'end_time']]
    return result
 
 
schema = Struct

以上是关于利用pandas_udf加速机器学习任务的主要内容,如果未能解决你的问题,请参考以下文章

机器学习与流体动力学:谷歌AI利用「ML+TPU」实现流体模拟数量级加速

穷!深度学习中如何更好地利用显存资源?

使用新的物理模拟引擎加速强化学习

科研一对一 | UVA+哥大 | 机器学习深度学习:数据中心中任务失败的预测

人机共生时代,分布式机器学习是如何加速的?

机器学习实战:GNN(图神经网络)加速器的FPGA解决方案