如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?
Posted
技术标签:
【中文标题】如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?【英文标题】:How to pass the bucketizer in same ML pipeline with custom functions in PySpark? 【发布时间】:2018-07-23 08:27:38 【问题描述】:这里是示例代码。目标是删除列标题中的特殊字符,并对标题名称包含“bag”的任何列进行分桶。
data = pd.DataFrame(
'ball_column': [0, 1, 2, 3],
'keep_column': [7, 8, 9, 10],
'hall_column': [14, 15, 16, 17],
'bag_this_1': [21, 31, 41, 51],
'bag_this_2': [21, 31, 41, 51]
)
df = spark.createDataFrame(data)
df.show()
+-----------+-----------+-----------+----------+----------+
|ball_column|keep_column|hall_column|bag_this_1|bag_this_2|
+-----------+-----------+-----------+----------+----------+
| 0| 7| 14| 21| 21|
| 1| 8| 15| 31| 31|
| 2| 9| 16| 41| 41|
| 3| 10| 17| 51| 51|
+-----------+-----------+-----------+----------+----------+
第一个类编辑列名:它从列标题中删除所有特殊字符并仅返回标题中的字母和数字。
class EditColumnNameWithReplacement(Transformer):
def __init__(self, existing, new):
super().__init__()
self.existing = existing
self.new = new
def _transform(self, df: DataFrame) -> DataFrame:
for (x, y) in zip(self.existing, self.new):
df = df.withColumnRenamed(x, y)
return df.select(*self.new)
## Capture 'bigInt' columns, and drop the rest
bigint_list = [
name for name, types in df.dtypes if types == 'bigint' or types == 'double'
]
edited_columns = [''.join(y for y in x if y.isalnum()) for x in bigint_list]
reformattedColumns = EditColumnNameWithReplacement(
existing=bigint_list, new=edited_columns)
model = Pipeline(stages=[reformattedColumns]).fit(df).transform(df)
接下来,对列列表进行分桶。它选择包含单词bag
的标题并将值打包。
spike_cols = [col for col in model.columns if "bag" in col]
bagging = [
Bucketizer(
splits=[-float("inf"), 10, 100, float("inf")],
inputCol=x,
outputCol=x + "bucketed") for x in spike_cols
]
model_1 = Pipeline(stages=bagging).fit(model).transform(model)
model_1.show()
如何在单个管道中添加两个函数(reformattedColumns
和 bagging
)而不是创建 2 个单独的管道来执行任务? ?
【问题讨论】:
【参考方案1】:您需要更改一些小事情。
由于您没有安装第一个管道并对其进行转换,因此您不能使用以下内容:
spike_cols = [col for col in model.columns if "bag" in col]
------------- <- This
改为使用edited_columns
来引用这些列:
spike_cols = [col for col in edited_columns if "bag" in col]
其次,您只需将阶段合并到一个列表中:
stages_ = [reformattedColumns] + bagging
Pipeline(stages=stages_).fit(df).transform(df).show()
// +--------+--------+----------+----------+----------+----------------+----------------+
// |bagthis1|bagthis2|ballcolumn|hallcolumn|keepcolumn|bagthis1bucketed|bagthis2bucketed|
// +--------+--------+----------+----------+----------+----------------+----------------+
// | 21| 21| 0| 14| 7| 1.0| 1.0|
// | 31| 31| 1| 15| 8| 1.0| 1.0|
// | 41| 41| 2| 16| 9| 1.0| 1.0|
// | 51| 51| 3| 17| 10| 1.0| 1.0|
// +--------+--------+----------+----------+----------+----------------+----------------+
整个代码:
import pandas as pd
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession, DataFrame
data = pd.DataFrame(
'ball_column': [0, 1, 2, 3],
'keep_column': [7, 8, 9, 10],
'hall_column': [14, 15, 16, 17],
'bag_this_1': [21, 31, 41, 51],
'bag_this_2': [21, 31, 41, 51]
)
df = spark.createDataFrame(data)
df.show()
class EditColumnNameWithReplacement(Transformer):
def __init__(self, existing, new):
super().__init__()
self.existing = existing
self.new = new
def _transform(self, df: DataFrame) -> DataFrame:
for (x, y) in zip(self.existing, self.new):
df = df.withColumnRenamed(x, y)
return df.select(*self.new)
## Capture 'bigInt' columns, and drop the rest
bigint_list = [name for name, types in df.dtypes if types == 'bigint' or types == 'double']
edited_columns = [''.join(y for y in x if y.isalnum()) for x in bigint_list]
spike_cols = [col for col in edited_columns if "bag" in col]
reformattedColumns = EditColumnNameWithReplacement(
existing=bigint_list, new=edited_columns)
bagging = [
Bucketizer(
splits=[-float("inf"), 10, 100, float("inf")],
inputCol=x,
outputCol=x + "bucketed") for x in spike_cols
]
stages_ = [reformattedColumns] + bagging
Pipeline(stages=stages_).fit(df).transform(df).show()
【讨论】:
谢谢!这很完美。一个附带的问题,有没有办法使用替换的桶装机?也就是上面的bagging
函数桶在列上没有创建更多的列inputCol
和outputCol
以上是关于如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?的主要内容,如果未能解决你的问题,请参考以下文章