将 PySpark 命令转换为自定义函数
Posted
技术标签:
【中文标题】将 PySpark 命令转换为自定义函数【英文标题】:Converting PySpark Commands into a Custom Function 【发布时间】:2018-08-09 23:40:20 【问题描述】:我想知道是否可以将一系列 PySpark 命令打包到一个函数中,以便这样的函数获取一个数据帧并将它们应用于数据帧。我们在 Python 中做的事情。
例如,我有以下数据框:
sevents_df.show(5)
+-------+--------+-------------+----------------+------------+-----+
|Counter|Duration|StartTime |TypeEnumeration |Floor_Number|Value|
+-------+--------+-------------+----------------+------------+-----+
| 1.0| 5460|1503067077370|UC_001 | NaN| NaN|
| 1.0| 322|1503067090480|UC_008 | NaN| NaN|
| 1.0| 990|1503067099300|UC_001 | NaN| NaN|
| 1.0| 5040|1503067396060|UC_001 | NaN| NaN|
| 1.0| 6090|1503067402150|UC_001 | NaN| NaN|
+-------+--------+-------------+----------------+------------+-----+
第一步。我要做的第一件事是过滤掉类型。我只是保留UC_001
。
sevents_filter = sevents_df.filter(sevents_df['TypeEnumeration'].isin(['UC_001']) == True)
第 2 步。删除一些列:
columns_to_drop = ['Comments', 'Floor_Number', 'Value']
sevents_clean = sevents_filter.drop(*columns_to_drop)
第 3 步。将StartTime
转换为日期
def convert_to_seconds(x):
return x/1000
udf_myFunction = udf(convert_to_seconds, IntegerType())
sevents2 = sevents2.withColumn("StartTime", udf_myFunction("StartTime"))
sevents4 = sevents2.withColumn('epoch',
f.date_format(sevents2.StartTime.cast(dataType=t.TimestampType()),"yyyy-MM-dd"))
我想把这三个步骤放在一个函数中,比如:
some udf pySpark_function(dataframe):
step 1
step 2
step 3
我想这样做的原因是因为如果我有N
数据帧,我无法想象编写这些步骤N
次。
一种解决方案是将这些N
帧连接成一帧,然后将这一巨型帧通过这些步骤一次。有没有其他方法可以一次通过一帧?
【问题讨论】:
您还可以查看构建自定义管道变压器 - 请参阅 this post。 【参考方案1】:UDF
用于处理数据框列中的值,不能用于处理整个数据框。相反,创建一个接受数据帧并返回已处理数据帧的普通方法。
def process_df(df):
df = df.filter(df['TypeEnumeration'] == 'UC_001')
columns_to_drop = ['Comments', 'Floor_Number', 'Value']
df = df.drop(*columns_to_drop)
df = df.withColumn('epoch', f.date_format((df.StartTime / 1000).cast(t.TimestampType()), "yyyy-MM-dd"))
return df
然后简单地遍历所有数据帧并使用上述方法。
注意:我对代码做了一些简化。不需要isin
,因为您只使用单个值进行过滤,并且不需要UDF
除以1000。如果可能,最好使用内置的Spark 函数而不是自定义UDF
,它更快。
【讨论】:
以上是关于将 PySpark 命令转换为自定义函数的主要内容,如果未能解决你的问题,请参考以下文章