从 PySpark 中的 RDD 中的数据中查找最小和最大日期

Posted

技术标签:

【中文标题】从 PySpark 中的 RDD 中的数据中查找最小和最大日期【英文标题】:find the minimum and maximum date from the data in a RDD in PySpark 【发布时间】:2015-11-25 04:26:49 【问题描述】:

我将SparkIpython 一起使用,并且有一个RDD,其中包含打印时这种格式的数据:

print rdd1.collect()

[u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00',....]

每个数据都是一个datetimestamp,我想在这个RDD 中找到最小值和最大值。我该怎么做?

【问题讨论】:

【参考方案1】:

例如,您可以使用 aggregate 函数(有关其工作原理的说明,请参阅:What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?)

from datetime import datetime    

rdd  = sc.parallelize([
    u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00'])

def seq_op(acc, x):
    """ Given a tuple (min-so-far, max-so-far) and a date string
    return a tuple (min-including-current, max-including-current)
    """
    d = datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
    return (min(d, acc[0]), max(d, acc[1]))

def comb_op(acc1, acc2):
    """ Given a pair of tuples (min-so-far, max-so-far)
    return a tuple (min-of-mins, max-of-maxs)
    """
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))

# (initial-min <- max-date, initial-max <- min-date)
rdd.aggregate((datetime.max, datetime.min), seq_op, comb_op)

## (datetime.datetime(2010, 12, 8, 0, 0), datetime.datetime(2012, 5, 13, 0, 0))

DataFrames:

from pyspark.sql import Row
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max

row = Row("ts")
df = rdd.map(row).toDF()

df.withColumn("ts", unix_timestamp("ts")).agg(
    from_unixtime(min("ts")).alias("min_ts"), 
    from_unixtime(max("ts")).alias("max_ts")
).show()

## +-------------------+-------------------+
## |             min_ts|             max_ts|
## +-------------------+-------------------+
## |2010-12-08 00:00:00|2012-05-13 00:00:00|
## +-------------------+-------------------+

【讨论】:

如果您能提及一些关于代码如何工作(尤其是两个函数)的cmets,以便我能够正确理解它,是否有可能?【参考方案2】:

如果您的 RDD 包含日期时间对象,那么简单地使用有什么问题

rdd1.min()
rdd1.max()

见documentation

这个例子适合我

rdd = sc.parallelize([u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00'])
from datetime import datetime
rddT = rdd.map(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S")).cache()
print rddT.min()
print rddT.max()

【讨论】:

【参考方案3】:

如果您使用的是 Dataframes,那么您什么都不需要:

import pyspark.sql.functions as F 
#Anohter imports, session, attributions etc
# This brings min and max(considering you need only min and max values)
df.select(F.min('datetime_column_name'),F.max('datetime_column_name')).show()

就是这样!

【讨论】:

这很简单,对我有用!谢谢

以上是关于从 PySpark 中的 RDD 中的数据中查找最小和最大日期的主要内容,如果未能解决你的问题,请参考以下文章

使用 map/filter 在 Pyspark 中的 RDD 中查找最大元素

如何从 Pyspark 中的 RDD 中过滤

从 Pyspark 中的 RDD 中提取字典

Pyspark中的平均向量与查找表

将 RDD 行拆分到 Pyspark 中的不同列

PySpark|RDD编程基础