获取 RDD 中每个键的最大值和最小值

Posted

技术标签:

【中文标题】获取 RDD 中每个键的最大值和最小值【英文标题】:Get Max & Min value for each key in the RDD 【发布时间】:2021-01-02 15:36:32 【问题描述】:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc , 10)
rdd = ssc.sparkContext.parallelize(pd_binance)
rdd.take(1)

这是结果的一小部分:

[['0.02703300', '1.30900000'],
   ['0.02703300', '0.61800000'],
   ['0.02704600', '3.90800000'],
   ['0.02704700', '4.00000000'],
   ['0.02704700', '7.44600000']

我想得到每个键的最大值和最小值,如何?

【问题讨论】:

【参考方案1】:

正如@mck 所说,您可以使用 reduceByKey,但如果您从未使用过函数式编程,理解起来可能会有点复杂。

该方法的作用是将函数应用于执行groupByKey 的结果值。让我们一步一步来分析。

>>> rdd.groupByKey().take(1)
[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f1fd90>)]

这样做我们获得了一个 RDD,每个键都有一个条目(配对 RDD 中的第一列),并且值是可迭代的。我们可以将其视为一个列表。

我们从基础 RDD 得到

[['0.02703300', '1.30900000'],
   ['0.02703300', '0.61800000'],
   ['0.02704600', '3.90800000'],
   ['0.02704700', '4.00000000'],
   ['0.02704700', '7.44600000']]

到一个分组

[('0.02704600', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2fe20>),
 ('0.02704700', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f910>), 
 ('0.02703300', <pyspark.resultiterable.ResultIterable object at 0x7fac15f2f550>)]

那么我们必须做的是对值应用所需的函数。我们可以将所需的函数传递给mapValues method(在我的例子中,我直接传递一个 lambda 函数)

>>> rdd.groupByKey().mapValues(lambda k: (max(k), min(k))).collect()
[('0.02704600', ('3.90800000', '3.90800000')), 
('0.02704700', ('7.44600000', '4.00000000')), 
('0.02703300', ('1.30900000', '0.61800000'))]

有一些注意事项:

    reducebyKey 更加整洁高效。虽然它可能会令人困惑 如果您想要最大值和最小值,请尝试按照我展示的方式同时执行(您也可以使用 reduceByKey 执行此操作)。这样,您只需执行一次即可,而不是对数据进行两次传递。 尝试使用 DataFrame (SQL) API。它更现代,并尝试为您优化计算。 reduceByKey 函数需要有点不同,因为它获取两个项而不是一个可迭代项
>>> rdd.reduceByKey(lambda a, b: (max(a,b), min(a, b))).collect()
[('0.02704600', '3.90800000'), 
('0.02704700', ('7.44600000', '4.00000000')), 
('0.02703300', ('1.30900000', '0.61800000'))]

【讨论】:

您使用的是哪个 Spark/Python 版本?因为我刚刚尝试过这个并返回了一个错误:TypeError: '>' not supported between 'float' and 'tuple'【参考方案2】:

你可以使用reduceByKey:

minimum = rdd.reduceByKey(min)
maximum = rdd.reduceByKey(max)

【讨论】:

以上是关于获取 RDD 中每个键的最大值和最小值的主要内容,如果未能解决你的问题,请参考以下文章

获取 ActiveRecord 中每个组的最小值/最大值

使用 pyspark 在单次遍历数据中查找最小值/最大值

获取Laravel中每个不同项目名称的最小值和最大值的集合项

MySQL 查询最大最小值优化

SQL:在一列中获取最小值和最大值

从Awk中的多维数组中的子数组获取最小值和最大值