pyspark - 如何在分层随机抽样中使用(df.sampleByKey())选择每层的确切记录数
Posted
技术标签:
【中文标题】pyspark - 如何在分层随机抽样中使用(df.sampleByKey())选择每层的确切记录数【英文标题】:pyspark - how to select exact number of records per strata using (df.sampleByKey()) in stratified random sampling 【发布时间】:2020-04-09 14:36:04 【问题描述】:我有一个 spark 数据框(我正在使用 pyspark)'orders'。它有以下列
['id', 'orderdate', 'customerid', 'status']
我正在尝试使用关键列作为“状态”进行分层随机抽样。我的目标如下
>> create a new dataframe with exactly 5 random records per status
所以我选择的方法是使用 .sampleBy('strata_key',fraction_dict)。但我面临的挑战是为每个状态选择确切的分数值,这样每次我应该得到每个状态的 5 个随机记录。我遵循以下方法
1.为每个状态的总计数创建一个字典,如下所示
#Total count of records for each order 'status' in 'ORDERS' dataframe is as below
d=dict([(x['status'],x['count']) for x in orders.groupBy("status").count().collect()])
print(d)
输出:
'PENDING_PAYMENT': 15030, 'COMPLETE': 22899, 'ON_HOLD': 3798, 'PAYMENT_REVIEW': 729, 'PROCESSING': 8275, 'CLOSED': 7556, 'SUSPECTED_FRAUD': 1558,
'PENDING': 7610, 'CANCELED': 1428
2.创建了一个函数,该函数生成获取精确 N 条记录所需的小数值
#Exact number of records needed per status
N=5
#function calculates fraction
def fraction_calc(count_dict,N)
d_mod=
for i in d:
d_mod[i]=(N/d[i])
return d_mod
#creating dictionary of fractions using above function
fraction=fraction_calc(d,5)
print(fraction)
输出:
'PENDING_PAYMENT': 0.00033266799733865603, 'COMPLETE': 0.000218350146294598, 'ON_HOLD': 0.0013164823591363876, 'PAYMENT_REVIEW': 0.006858710562414266, 'PROCESSING': 0.0006042296072507553, 'CLOSED': 0.0006617257808364214, 'SUSPECTED_FRAUD': 0.003209242618741977, 'PENDING': 0.000657030223390276, 'CANCELED': 0.0035014005602240898
3.创建使用启动采样API .sampleBy()进行采样的最终数据帧
#creating final sampled dataframe
df_sample=orders.sampleBy("status",fraction)
但我仍然没有得到每个状态准确的 5 条记录。示例输出如下
#Checking count per status of resultant sample dataframe
df_sample.groupBy("status").count().show()
+---------------+-----+
| status|count|
+---------------+-----+
|PENDING_PAYMENT| 3|
| COMPLETE| 6|
| ON_HOLD| 7|
| PAYMENT_REVIEW| 4|
| PROCESSING| 6|
| CLOSED| 6|
|SUSPECTED_FRAUD| 7|
| PENDING| 9|
| CANCELED| 5|
+---------------+-----+
我应该在这里做什么来实现我的目标。
【问题讨论】:
【参考方案1】:找到了解决办法
from pyspark.sql.window import Window
from pyspark.sql.functions import rand,row_number
1。使用 rand() 内置函数生成随机数的列“键”,然后将行号分配给通过“键”按“order_status”列顺序创建的分区窗口的每个元素。代码如下
df_sample=df.withColumn("key",rand()).\
withColumn("rnk", row_number().\
over(Window.partitionBy("status").\
orderBy("key"))).\
where("rnk<=5").drop("key","rnk")
2。现在我得到每个状态正好 5 个随机记录。示例输出如下。每次 Spark 会话都会发生变化。
#Checking count per status of resultant sample dataframe
df_sample.groupBy("status").count().show()
+---------------+-----+
| status |count|
+---------------+-----+
|PENDING_PAYMENT| 5|
| COMPLETE| 5|
| ON_HOLD| 5|
| PAYMENT_REVIEW| 5|
| PROCESSING| 5|
| CLOSED| 5|
|SUSPECTED_FRAUD| 5|
| PENDING| 5|
| CANCELED| 5|
+---------------+-----+
【讨论】:
以上是关于pyspark - 如何在分层随机抽样中使用(df.sampleByKey())选择每层的确切记录数的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Python 进行随机分层抽样(不是训练/测试拆分)?