将 PySpark 命令转换为自定义函数

Posted

技术标签:

【中文标题】将 PySpark 命令转换为自定义函数【英文标题】:Converting PySpark Commands into a Custom Function 【发布时间】:2018-08-09 23:40:20 【问题描述】:

我想知道是否可以将一系列 PySpark 命令打包到一个函数中,以便这样的函数获取一个数据帧并将它们应用于数据帧。我们在 Python 中做的事情。

例如,我有以下数据框:

sevents_df.show(5)

+-------+--------+-------------+----------------+------------+-----+
|Counter|Duration|StartTime    |TypeEnumeration |Floor_Number|Value|
+-------+--------+-------------+----------------+------------+-----+
|    1.0|    5460|1503067077370|UC_001          |         NaN|  NaN|
|    1.0|     322|1503067090480|UC_008          |         NaN|  NaN|
|    1.0|     990|1503067099300|UC_001          |         NaN|  NaN|
|    1.0|    5040|1503067396060|UC_001          |         NaN|  NaN|
|    1.0|    6090|1503067402150|UC_001          |         NaN|  NaN|
+-------+--------+-------------+----------------+------------+-----+

第一步。我要做的第一件事是过滤掉类型。我只是保留UC_001

sevents_filter = sevents_df.filter(sevents_df['TypeEnumeration'].isin(['UC_001']) == True)

第 2 步。删除一些列:

columns_to_drop = ['Comments', 'Floor_Number', 'Value']
sevents_clean = sevents_filter.drop(*columns_to_drop)

第 3 步。将StartTime 转换为日期

def convert_to_seconds(x):
    return x/1000

udf_myFunction = udf(convert_to_seconds, IntegerType())
sevents2 = sevents2.withColumn("StartTime", udf_myFunction("StartTime"))
sevents4 = sevents2.withColumn('epoch',
                               f.date_format(sevents2.StartTime.cast(dataType=t.TimestampType()),"yyyy-MM-dd"))

我想把这三个步骤放在一个函数中,比如:

some udf pySpark_function(dataframe):
    step 1
    step 2
    step 3

我想这样做的原因是因为如果我有N 数据帧,我无法想象编写这些步骤N 次。

一种解决方案是将这些N 帧连接成一帧,然后将这一巨型帧通过这些步骤一次。有没有其他方法可以一次通过一帧?

【问题讨论】:

您还可以查看构建自定义管道变压器 - 请参阅 this post。 【参考方案1】:

UDF 用于处理数据框列中的值,不能用于处理整个数据框。相反,创建一个接受数据帧并返回已处理数据帧的普通方法。

def process_df(df):
    df = df.filter(df['TypeEnumeration'] == 'UC_001')

    columns_to_drop = ['Comments', 'Floor_Number', 'Value']
    df = df.drop(*columns_to_drop)

    df = df.withColumn('epoch', f.date_format((df.StartTime / 1000).cast(t.TimestampType()), "yyyy-MM-dd"))

    return df

然后简单地遍历所有数据帧并使用上述方法。

注意:我对代码做了一些简化。不需要isin,因为您只使用单个值进行过滤,并且不需要UDF 除以1000。如果可能,最好使用内置的Spark 函数而不是自定义UDF,它更快。

【讨论】:

以上是关于将 PySpark 命令转换为自定义函数的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 使用自定义函数

为啥我不能将其转换为自定义的 React 钩子?

pyspark 为自定义模块返回名为错误的无模块

我们可以在 javascript 中将通用对象转换为自定义对象类型吗?

Swift之深入解析如何将代码添加为自定义LLDB命令

如何使方法 JSON 可序列化以在自定义 Pyspark 转换器中使用