编写一个python函数在pyspark数据帧中自动执行数据标记
Posted
技术标签:
【中文标题】编写一个python函数在pyspark数据帧中自动执行数据标记【英文标题】:write a python function to execute data labelling automatically in pyspark dataframe 【发布时间】:2021-09-12 13:25:44 【问题描述】:我有一个名为ARRIVAL DELAY
的列,根据它的值,我必须编写一个python 函数来自动执行数据标记。
例如。低于 5 的值被认为是早(0),5 到 20 被认为是准时(1),高于 20 被认为是晚(2)。
但该函数必须提前、准时和晚确定每个类别的范围(例如,使用 bin 大小)
目前我的代码是硬编码的。
from pyspark.sql.functions import when,col,lit
flightsDf = flightsDf.withColumn("multiClassArrDelay", when((flightsDf.ARRIVAL_DELAY < 5), lit(0)).when((flightsDf.ARRIVAL_DELAY >= 5) & (flightsDf.ARRIVAL_DELAY <= 20), lit(1)) \
.otherwise(lit(2)))
【问题讨论】:
请阅读ML标签的description。 【参考方案1】:根据您的数据和您的要求,有几个选项可以计算箱之间的边界。
使用最小值和最大值
最简单的选择是获取列的min 和max 值并将此范围分成三部分:
#create some test data first
import numpy as np
data=np.concatenate((np.random.normal(10, 5, 1000),np.random.normal(25, 5, 1000)))
data=data[np.where(data >= 0)]
flightsDf=spark.createDataFrame(map(lambda d: [float(d)], data), schema=['ARRIVAL_DELAY'])
from pyspark.sql import functions as F
min,max=flightsDf.select(F.min('ARRIVAL_DELAY'), F.max('ARRIVAL_DELAY')).first()
boundaries=[(max-min)/3, (max-min)/3*2]
boundaries
现在包含两个值,将最小值和最大值之间的范围分成三个长度相等的范围。现在可以使用这些值代替硬编码值:
flightsDf.withColumn('multiClassArrDelay',
F.when(F.col('ARRIVAL_DELAY') < boundaries[0], F.lit(0))
.otherwise(F.when(F.col('ARRIVAL_DELAY') < boundaries[1], F.lit(1))
.otherwise(F.lit(2))))
当计算我们得到的每个 bin 中的元素时(对于上面提到的测试数据)
+------------------+-----+
|multiClassArrDelay|count|
+------------------+-----+
| 0| 808|
| 1| 937|
| 2| 223|
+------------------+-----+
使用元素数量相等的 bin
对于某些用例,如果每个 bin 包含(大约)相同数量的项目可能会有所帮助。可以使用percentile_approx 代替min
和max
来计算bin 的边界:
boundaries=flightsDf.select(F.percentile_approx('ARRIVAL_DELAY', [1/3,2/3])).first()[0]
通过这种方法,我们可以在每个 bin 中获得大致相同数量的项目:
+------------------+-----+
|multiClassArrDelay|count|
+------------------+-----+
| 0| 655|
| 1| 656|
| 2| 657|
+------------------+-----+
使用符合您业务需求的垃圾箱
在运行机器学习任务时,最好将数据分组到在数据来源域中有意义的 bin 中。延误 30 分钟的短途航班通常与延误相同的洲际航班不同。
【讨论】:
以上是关于编写一个python函数在pyspark数据帧中自动执行数据标记的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列
当 ID 匹配时,在其他 Pyspark 数据帧中按列划分 Pyspark 数据帧列