如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?

Posted

技术标签:

【中文标题】如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?【英文标题】:How to pass the bucketizer in same ML pipeline with custom functions in PySpark? 【发布时间】:2018-07-23 08:27:38 【问题描述】:

这里是示例代码。目标是删除列标题中的特殊字符,并对标题名称包含“bag”的任何列进行分桶。

data = pd.DataFrame(
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'bag_this_1': [21, 31, 41, 51],
    'bag_this_2': [21, 31, 41, 51]
)
df = spark.createDataFrame(data)
df.show()



+-----------+-----------+-----------+----------+----------+
|ball_column|keep_column|hall_column|bag_this_1|bag_this_2|
+-----------+-----------+-----------+----------+----------+
|          0|          7|         14|        21|        21|
|          1|          8|         15|        31|        31|
|          2|          9|         16|        41|        41|
|          3|         10|         17|        51|        51|
+-----------+-----------+-----------+----------+----------+

第一个类编辑列名:它从列标题中删除所有特殊字符并仅返回标题中的字母和数字。

class EditColumnNameWithReplacement(Transformer):
    def __init__(self, existing, new):
        super().__init__()
        self.existing = existing
        self.new = new

    def _transform(self, df: DataFrame) -> DataFrame:

        for (x, y) in zip(self.existing, self.new):
            df = df.withColumnRenamed(x, y)

        return df.select(*self.new)


## Capture 'bigInt' columns, and drop the rest
bigint_list = [
    name for name, types in df.dtypes if types == 'bigint' or types == 'double'
]
edited_columns = [''.join(y for y in x if y.isalnum()) for x in bigint_list]

reformattedColumns = EditColumnNameWithReplacement(
    existing=bigint_list, new=edited_columns)

model = Pipeline(stages=[reformattedColumns]).fit(df).transform(df)

接下来,对列列表进行分桶。它选择包含单词bag 的标题并将值打包。

spike_cols = [col for col in model.columns if "bag" in col]

bagging = [
    Bucketizer(
        splits=[-float("inf"), 10, 100, float("inf")],
        inputCol=x,
        outputCol=x + "bucketed") for x in spike_cols
]

model_1 = Pipeline(stages=bagging).fit(model).transform(model)
model_1.show()

如何在单个管道中添加两个函数(reformattedColumnsbagging)而不是创建 2 个单独的管道来执行任务? ?

【问题讨论】:

【参考方案1】:

您需要更改一些小事情。

由于您没有安装第一个管道并对其进行转换,因此您不能使用以下内容:

spike_cols = [col for col in model.columns if "bag" in col]
                             ------------- <- This

改为使用edited_columns 来引用这些列:

spike_cols = [col for col in edited_columns if "bag" in col]

其次,您只需将阶段合并到一个列表中:

stages_ = [reformattedColumns] + bagging

Pipeline(stages=stages_).fit(df).transform(df).show()
// +--------+--------+----------+----------+----------+----------------+----------------+
// |bagthis1|bagthis2|ballcolumn|hallcolumn|keepcolumn|bagthis1bucketed|bagthis2bucketed|
// +--------+--------+----------+----------+----------+----------------+----------------+
// |      21|      21|         0|        14|         7|             1.0|             1.0|
// |      31|      31|         1|        15|         8|             1.0|             1.0|
// |      41|      41|         2|        16|         9|             1.0|             1.0|
// |      51|      51|         3|        17|        10|             1.0|             1.0|
// +--------+--------+----------+----------+----------+----------------+----------------+

整个代码:

import pandas as pd
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql import SparkSession, DataFrame

data = pd.DataFrame(
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'bag_this_1': [21, 31, 41, 51],
    'bag_this_2': [21, 31, 41, 51]
)

df = spark.createDataFrame(data)

df.show()

class EditColumnNameWithReplacement(Transformer):
    def __init__(self, existing, new):
        super().__init__()
        self.existing = existing
        self.new = new

    def _transform(self, df: DataFrame) -> DataFrame:
        for (x, y) in zip(self.existing, self.new):
            df = df.withColumnRenamed(x, y)

        return df.select(*self.new)

## Capture 'bigInt' columns, and drop the rest
bigint_list = [name for name, types in df.dtypes if types == 'bigint' or types == 'double']
edited_columns = [''.join(y for y in x if y.isalnum()) for x in bigint_list]
spike_cols = [col for col in edited_columns if "bag" in col]

reformattedColumns = EditColumnNameWithReplacement(
    existing=bigint_list, new=edited_columns)

bagging = [
    Bucketizer(
        splits=[-float("inf"), 10, 100, float("inf")],
        inputCol=x,
        outputCol=x + "bucketed") for x in spike_cols
    ]

stages_ = [reformattedColumns] + bagging

Pipeline(stages=stages_).fit(df).transform(df).show()

【讨论】:

谢谢!这很完美。一个附带的问题,有没有办法使用替换的桶装机?也就是上面的bagging函数桶在列上没有创建更多的列inputColoutputCol

以上是关于如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据框上的自定义函数

如何在 pyspark 操作中轻松使用我的自定义类方法?

pyspark 数据框中的自定义排序

udf(用户定义函数)如何在 pyspark 中工作?

pyspark groupby 并应用自定义函数

如何将不同的数据发送到同一个 UITableView 中的自定义单元格?