如何使用 mapreduce 和 pyspark 查找某一年某一天的频率
Posted
技术标签:
【中文标题】如何使用 mapreduce 和 pyspark 查找某一年某一天的频率【英文标题】:How to find frequencies of a days of a certain year using mapreduce and pyspark 【发布时间】:2016-05-13 14:57:50 【问题描述】:我有一个文本文件 (61Gb),每行包含一个表示日期的字符串,例如2010 年 12 月 16 日星期四 18:53:32 +0000
在单核上迭代文件耗时太长,因此我想使用 Pyspark 和 Mapreduce 技术快速查找某年某天的行频。
我认为是一个好的开始:
import dateutil.parser
text_file = sc.textFile('dates.txt')
date_freqs = text_file.map(lambda line: dateutil.parser.parse(line)) \
.map(lambda date: date + 1) \
.reduceByKey(lambda a, b: a + b)
不幸的是,我不明白如何过滤某一年并按键减少。关键是日子。
示例输出:
12 月 16 日星期四 26543
12 月 17 日 345 日星期四 等等
【问题讨论】:
【参考方案1】:正如another answer 中提到的,dateutil.parser.parse
返回一个具有year
、month
和day
属性的datetime object:
>>> dt = dateutil.parser.parse('Thu Dec 16 18:53:32 +0000 2010')
>>> dt.year
2010
>>> dt.month
12
>>> dt.day
16
从这个 RDD 开始:
>>> rdd = sc.parallelize([
... 'Thu Oct 21 5:12:38 +0000 2010',
... 'Thu Oct 21 4:12:38 +0000 2010',
... 'Wed Sep 22 15:46:40 +0000 2010',
... 'Sun Sep 4 22:28:48 +0000 2011',
... 'Sun Sep 4 21:28:48 +0000 2011'])
以下是获取所有年-月-日组合计数的方法:
>>> from operator import attrgetter
>>> counts = rdd.map(dateutil.parser.parse).map(
... attrgetter('year', 'month', 'day')).countByValue()
>>> counts
defaultdict(<type 'int'>, (2010, 9, 22): 1, (2010, 10, 21): 2, (2011, 9, 4): 2)
要得到你想要的输出:
>>> for k, v in counts.iteritems():
... print datetime.datetime(*k).strftime('%a %b %y'), v
...
Wed Sep 10 1
Thu Oct 10 2
Sun Sep 11 2
如果您只想计算某一年的数据,您可以在计算前过滤 RDD:
>>> counts = rdd.map(dateutil.parser.parse).map(
... attrgetter('year', 'month', 'day')).filter(
... lambda (y, m, d): y == 2010).countByValue()
>>> counts
defaultdict(<type 'int'>, (2010, 9, 22): 1, (2010, 10, 21): 2)
【讨论】:
【参考方案2】:类似这样的事情可能是一个好的开始:
import dateutil.parser
text_file = sc.textFile('dates.txt')
date_freqs = text_file.map(lambda line: dateutil.parser.parse(line))
.keyBy((_.year, _.month, _.day)) // somehow get the year, month, day to key by
.countByKey()
【讨论】:
【参考方案3】:我应该补充一点,dateutil 在 Python 中不是标准的。如果您的集群上没有 sudo,这可能会造成问题。作为一种解决方案,我想建议使用日期时间:
import datetime
def parse_line(d):
f = "%a %b %d %X %Y"
date_list = d.split()
date = date_list[:4]
date.append(date_list[5])
date = ' '.join(date)
return datetime.datetime.strptime(date, f)
counts = rdd.map(parse_line)\
.map(attrgetter('year', 'month', 'day'))\
.filter(lambda (y, m, d): y == 2015)\
.countByValue()
我对使用 Parquet、行/列等更好的解决方案感兴趣。
【讨论】:
以上是关于如何使用 mapreduce 和 pyspark 查找某一年某一天的频率的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 PySpark、SparkSQL 和 Cassandra?