使用 Pyspark 从单词列表的行条目创建元组并使用 RDD 计数

Posted

技术标签:

【中文标题】使用 Pyspark 从单词列表的行条目创建元组并使用 RDD 计数【英文标题】:Using Pyspark to create tuples from a line entry of list of words and count using RDD 【发布时间】:2021-02-05 07:46:31 【问题描述】:

我有一个 RDD,由 5 个单词的列表(5 个单词 n-gram)、它们的计数、页数和表单文档数组成 (ngram)\t(count)\t(page_count)\t(books_count).

我正在尝试使用 PySpark 以 (word, count) 格式获取单个单词及其计数的最终输出。

到目前为止,我已经使用过

rdd.map(lambda x: (x.split('\t')[0].lower().split(' '), x.split('\t')[1])) \
            .collect()

为了得到表格中的单词

[('word_1', 'word_2', 'word_3', 'word_4', 'word_5'], 1),
('word_6', 'word_7', 'word_8', 'word_9', 'word_10'], 3), ...] 

我认为我需要使用某种 itertools 或类似工具来获得表单中的最终输出

[('word_1', 1),
('word_2', 1),
('word_3', 1),
('word_4', 1),
('word_5', 1),
('word_6', 3),
('word_7', 3),
('word_8', 3),
('word_9', 3),
('word_10', 3),
...]

但是,我还没有遇到任何可以执行此操作的 itertools 或其他功能。 由于我将在大型数据集上使用它,因此我想避免使用用户定义的函数。

额外的奖励问题: 是否有类似于 head/tail 的 PySpark RDD 功能?我想查看顶部和底部最常用的单词。

到目前为止,我的想法是缓存未排序的 RDD,然后使用 ascending=Trueascending=False.sort() 函数,然后使用 .take(n) 函数来获取顶部和底部最频繁的单词。

【问题讨论】:

【参考方案1】:

可以使用flatMap的方法:

rdd.flatMap(lambda x: [(i, x[1]) for i in x[0]]).collect()

输出:

[('word_1', 1),
 ('word_2', 1),
 ('word_3', 1),
 ('word_4', 1),
 ('word_5', 1),
 ('word_6', 3),
 ('word_7', 3),
 ('word_8', 3),
 ('word_9', 3),
 ('word_10', 3)]

你也可以使用flatMapValues:

rdd.map(lambda x: (x[1], x[0])).flatMapValues(lambda x: x).collect()

输出:

[(1, 'word_1'),
 (1, 'word_2'),
 (1, 'word_3'),
 (1, 'word_4'),
 (1, 'word_5'),
 (3, 'word_6'),
 (3, 'word_7'),
 (3, 'word_8'),
 (3, 'word_9'),
 (3, 'word_10')]

回答您的额外问题:Pyspark RDD collect first 163 Rows

【讨论】:

非常感谢!这行得通。我完全忘记了列表理解orz

以上是关于使用 Pyspark 从单词列表的行条目创建元组并使用 RDD 计数的主要内容,如果未能解决你的问题,请参考以下文章

展平 3D NumPy 数组中的内部元组并作为浮点数保存到 CSV

从数据框条目创建元组列表

PySpark UDF 返回可变大小的元组

Pyspark 使用 ArrayWritable

Python:元组列表:比较所有元组并检索元组的元素不等于任何其他元组的元组

在元组的ndarray中查找元组并返回搜索到的元组的索引