Pyspark:从随机项目函数创建一个集合

Posted

技术标签:

【中文标题】Pyspark:从随机项目函数创建一个集合【英文标题】:Pyspark: Create a set from a random item function 【发布时间】:2021-11-12 23:29:54 【问题描述】:

pyspark 新手,希望获得任何关于根据给定列表中的随机选择生成一组项目的指针。这些随机选择需要附加到一个列表中,但必须是唯一的,所以在 python 实现中,我使用了一个集合来启动,在 while 语句的上下文中

import string
import random
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
   return ''.join(random.choice(chars) for _ in range(size))
my_set=set()
while len(my_set)<n+1:  #n being the number of items desired
  my_set.add(id_generator())

(id_generator 语法归功于https://***.com/a/2257449/8840174)

我想做的是利用 spark 的分布式计算并更快地完成上述操作。

在流程方面,我认为需要发生这样的事情:将集合保存在驱动程序节点上,并将函数分发给可用于执行 id_generator() 的工作人员,直到我的集合中有 n 个唯一项。 pyspark 中似乎没有用于 random.choices 的等效函数,所以也许我需要使用 UDF 装饰器在 pyspark 中注册该函数?

这是 0,1 之间的分布不是从某个项目列表中选择的随机选择。 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.rand.html

@udf
def id_generator():
  import string
  import random
  def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))
  return id_generator()

类似上面的东西?虽然我仍然不清楚集合如何/是否在 spark 上工作。

https://***.com/a/61777594/8840174

以上是正确的想法,尽管我不知道从单个项目 spark 数据框中收集值对于数百万次迭代是一个好主意。

该代码适用于直接 python,但如果可能的话,我想加快几个小时的速度。 (我需要根据各种规则/值列表生成几个随机生成的列,以从头开始创建数据集)。

*我知道 id_generator() 的大小为 6,有 2,176,782,336 种组合http://mathcentral.uregina.ca/QQ/database/QQ.09.00/churilla1.html,所以重复的机会不大,但即使没有 set() 要求,我仍在努力实现最佳实现将列表中的随机选择附加到 pyspark 中的另一个列表。

感谢您的任何意见!

编辑 这看起来很有希望Random numbers generation in PySpark

【问题讨论】:

我发现了另一个有用的关于这个的 SO 帖子,在这里分享,以防有人徘徊到这个帖子。 ***.com/questions/66135534/… 【参考方案1】:

如果 Spark 是最好的选择,这真的取决于您的用例,但是您可以在生成的数据帧上使用函数的 udf 并删除重复项。这种方法的缺点是,由于删除了重复项,因此很难达到您可能需要的确切数据点数量。

注意 1:我稍微调整了您的函数以使用 random.choices。

注意 2:如果在多个节点上运行,您可能需要确保每个节点使用不同的随机种子。

import string
import random
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

SIZE = 10 ** 6

spark = SparkSession.builder.getOrCreate()

@udf(StringType())
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choices(chars, k=size))

df = spark.range(SIZE)

df = df.withColumn('sample', id_generator()).drop('id')

print(f'Count: df.count()')
print(f'Unique count: df.dropDuplicates().count()')

df.show(5)

这给出了:

Count: 1000000                                                                  
Unique count: 999783                                                            
+------+
|sample|
+------+
|QTOVIM|
|NEH0SY|
|DJW5Q3|
|WMEKRF|
|OQ09N9|
+------+
only showing top 5 rows

【讨论】:

啊,有趣的是,spark.range() 函数将以您想要的长度创建空数据框,并且 withColumn 将迭代地为每个记录应用该随机生成器。 IE。您不需要在范围内进行任何循环。我的印象是,在循环中附加到数据帧对内存使用效率低下(在循环迭代中保存 df 的副本),并且附加到列表是 python 中的首选路线(在我的情况下,添加到集合解决了独特性方面也是如此)。但这很方便,虽然不是完全独特的,但这已经足够我接受了。谢谢!

以上是关于Pyspark:从随机项目函数创建一个集合的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 pyspark 从 python 列表中选择随机文本值?

在 PySpark 中,有没有办法使用运行时给出的 Python 类的函数来动态注册 UDF? [复制]

使用 Pyspark 训练随机森林回归模型

在 PySpark 中使用 rdd.map 解压和编码字符串

从 Pyspark 中的 RDD 中提取字典

pyspark:删除所有行中具有相同值的列