如何根据数据集中的行长过滤 RDD。?

Posted

技术标签:

【中文标题】如何根据数据集中的行长过滤 RDD。?【英文标题】:How can I filter an RDD based on length of lines in the dataset.? 【发布时间】:2019-04-15 23:33:57 【问题描述】:

我想过滤掉从数据集创建的 RDD,基于 行长使用:Pyspark shell

我的数据文件是这样的

> fzDTn342L3Q   djjohnnykey 599 Music   185 1005    3.67    3   1   KDrJSNIGNDQ MacjQFNVLlQ oZ6f2vaH858 fYSjMDNa4S8 JUatPzf_eSc QfBFl7kU35c rG-rQ-YGdSA kOq6sFmoUr0 IRj1IABVBis AVsZ0VH3eN4 r1pS_4qouUc YgaNW1KRgK4 ZlGdVR7mBy4 nKFLE3DX4OQ EtQjN6CQeCc afe-0VY4YiI ekV5NseEdy8 IQs6CrER5fY jTLcoIxMI-E yfvW1ITcMpM
> 
> kOq6sFmoUr0   djjohnnykey 599 Music   113 992 0   0   1   MacjQFNVLlQ fYSjMDNa4S8 4vso1y_-cvk 8BwAX6YBx3E QeUQyf8H7vM jmc21-Nhewg hZUU2-UBaGk SaLaotssH0w PUlcrBaYpwI tjIK2xop4L0 BNlL15OYnFY _pzP7OLInjk 4daGJ6TMcp4 _8jM9R-1yRk KDrJSNIGNDQ oZ6f2vaH858 JUatPzf_eSc QfBFl7kU35c rG-rQ-YGdSA fzDTn342L3Q

这里的第 4 列是类别。数据文件中的一些行 不包含此字段,因此长度较短。这促使 我根据这个标准过滤掉数据集并进一步形成 具有类别的数据集上的 RDD。

我已尝试从数据集创建初始 RDD。

>>> data="/Users/sk/Documents/BigData/0222/0.txt"
>>> input = sc.textFile(data)

现在我按制表符拆分并保存在 RDDS 行中

>>> lines = input.map(lambda x: (str(x.split('\t'))))

之后我想过滤掉长度小于3的行。

>>> data="/Users/sk/Documents/BigData/0222/1.txt"
>>> input = sc.textFile(data)
>>> lines = input.map(lambda x: (str(x.split('\t'))))
>>> lines.count()
3169

>>> newinput=input.filter(lambda x: len(x)>3)
>>> newinput.count()
3169

在此之后,它不会改变我的 rdd 中的任何内容。任何人都可以 请帮忙。

【问题讨论】:

您可能需要先收集()然后再计数()。像newinput=input.filter(lambda x: len(x)>3).collect() & 然后newinput.count() 你能上传文本文件吗? 数据集可以在这里找到drive.google.com/file/d/0ByJLBTmJojjzR2x0MzVpc2Z6enM/view @St1id3r 我尝试了您的解决方案,但出现以下错误:>>> newinput=input.filter(lambda x: len(x)>3).collect() >>> newinput.count() Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: count() takes exactly one argument (0 given) 【参考方案1】:

与您的解决方案有关的几件事。鉴于这是 Python,不确定是否建议使用 RDD(您可能想重新考虑一下)。使用 Dataframe 会更容易且性能更好。

>>> x =  spark.read.option("sep","\t").csv("/data/youtubedata.txt")
>>> x.count()
4100
>>> from pyspark.sql.functions import length
>>> from pyspark.sql.functions import col, size
>>> x.filter(length(col("_c3")) > 3).count()
4066
>>> x.filter(x._c3.isNull()).count()
34
>>> x.filter(x._c3.isNotNull()).count()
4066

更新:更新计数。

【讨论】:

我尝试了您的解决方案。但是>>> input = sc.textFile(data) >>> input.count() 3169 的行数仍然没有变化 将我的答案更新到您的测试数据。此外,如果您查看非空计数,这似乎会增加计数。让我知道这是否适合您。

以上是关于如何根据数据集中的行长过滤 RDD。?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Java 和 Spark SQL 打印数据集中的行内容?

如何使用 mapPartitions 函数将 Rdd 转换为数据集

如何根据另一个维度的当前级别过滤一个维度?

Spark RDD数据过滤

如何从 RDD 创建 Spark 数据集

spark RDD(弹性分布式数据集)可以更新吗?