如何使用 mapreduce 和 pyspark 查找某一年某一天的频率

Posted

技术标签:

【中文标题】如何使用 mapreduce 和 pyspark 查找某一年某一天的频率【英文标题】:How to find frequencies of a days of a certain year using mapreduce and pyspark 【发布时间】:2016-05-13 14:57:50 【问题描述】:

我有一个文本文件 (61Gb),每行包含一个表示日期的字符串,例如2010 年 12 月 16 日星期四 18:53:32 +0000

在单核上迭代文件耗时太长,因此我想使用 Pyspark 和 Mapreduce 技术快速查找某年某天的行频。

我认为是一个好的开始:

import dateutil.parser
text_file = sc.textFile('dates.txt')
date_freqs = text_file.map(lambda line: dateutil.parser.parse(line)) \
        .map(lambda date: date + 1) \
        .reduceByKey(lambda a, b: a + b)

不幸的是,我不明白如何过滤某一年并按键减少。关键是日子。

示例输出:

12 月 16 日星期四 26543

12 月 17 日 345 日星期四 等等

【问题讨论】:

【参考方案1】:

正如another answer 中提到的,dateutil.parser.parse 返回一个具有yearmonthday 属性的datetime object:

>>> dt = dateutil.parser.parse('Thu Dec 16 18:53:32 +0000 2010')
>>> dt.year
2010
>>> dt.month
12
>>> dt.day
16

从这个 RDD 开始:

>>> rdd = sc.parallelize([
...     'Thu Oct 21 5:12:38 +0000 2010',
...     'Thu Oct 21 4:12:38 +0000 2010',
...     'Wed Sep 22 15:46:40 +0000 2010',
...     'Sun Sep 4 22:28:48 +0000 2011',
...     'Sun Sep 4 21:28:48 +0000 2011'])

以下是获取所有年-月-日组合计数的方法:

>>> from operator import attrgetter
>>> counts = rdd.map(dateutil.parser.parse).map(
...     attrgetter('year', 'month', 'day')).countByValue()
>>> counts
defaultdict(<type 'int'>, (2010, 9, 22): 1, (2010, 10, 21): 2, (2011, 9, 4): 2)

要得到你想要的输出:

>>> for k, v in counts.iteritems():
...     print datetime.datetime(*k).strftime('%a %b %y'), v
...
Wed Sep 10 1
Thu Oct 10 2
Sun Sep 11 2

如果您只想计算某一年的数据,您可以在计算前过滤 RDD:

>>> counts = rdd.map(dateutil.parser.parse).map(
...    attrgetter('year', 'month', 'day')).filter(
...    lambda (y, m, d): y == 2010).countByValue()
>>> counts
defaultdict(<type 'int'>, (2010, 9, 22): 1, (2010, 10, 21): 2)

【讨论】:

【参考方案2】:

类似这样的事情可能是一个好的开始:

import dateutil.parser
text_file = sc.textFile('dates.txt')
date_freqs = text_file.map(lambda line: dateutil.parser.parse(line))
    .keyBy((_.year, _.month, _.day)) // somehow get the year, month, day to key by
    .countByKey()

【讨论】:

【参考方案3】:

我应该补充一点,dateutil 在 Python 中不是标准的。如果您的集群上没有 sudo,这可能会造成问题。作为一种解决方案,我想建议使用日期时间:

import datetime
def parse_line(d):
    f = "%a %b %d %X %Y"
    date_list = d.split()
    date = date_list[:4]
    date.append(date_list[5])
    date = ' '.join(date)
    return datetime.datetime.strptime(date, f)

counts = rdd.map(parse_line)\
    .map(attrgetter('year', 'month', 'day'))\
    .filter(lambda (y, m, d): y == 2015)\
    .countByValue()

我对使用 Parquet、行/列等更好的解决方案感兴趣。

【讨论】:

以上是关于如何使用 mapreduce 和 pyspark 查找某一年某一天的频率的主要内容,如果未能解决你的问题,请参考以下文章

如何用mapreduce分布式实现kmeans算法

spark 能执行udf 不能执行udaf,啥原因

如何使用 PySpark、SparkSQL 和 Cassandra?

如何:Pyspark 数据帧持久使用和回读

如何在 Pyspark 中使用 groupby 和数组元素?

如何使用 PySpark 测量逻辑回归的精度和召回率?