如何根据数据集中的行长过滤 RDD。?
Posted
技术标签:
【中文标题】如何根据数据集中的行长过滤 RDD。?【英文标题】:How can I filter an RDD based on length of lines in the dataset.? 【发布时间】:2019-04-15 23:33:57 【问题描述】:我想过滤掉从数据集创建的 RDD,基于 行长使用:Pyspark shell
我的数据文件是这样的
> fzDTn342L3Q djjohnnykey 599 Music 185 1005 3.67 3 1 KDrJSNIGNDQ MacjQFNVLlQ oZ6f2vaH858 fYSjMDNa4S8 JUatPzf_eSc QfBFl7kU35c rG-rQ-YGdSA kOq6sFmoUr0 IRj1IABVBis AVsZ0VH3eN4 r1pS_4qouUc YgaNW1KRgK4 ZlGdVR7mBy4 nKFLE3DX4OQ EtQjN6CQeCc afe-0VY4YiI ekV5NseEdy8 IQs6CrER5fY jTLcoIxMI-E yfvW1ITcMpM
>
> kOq6sFmoUr0 djjohnnykey 599 Music 113 992 0 0 1 MacjQFNVLlQ fYSjMDNa4S8 4vso1y_-cvk 8BwAX6YBx3E QeUQyf8H7vM jmc21-Nhewg hZUU2-UBaGk SaLaotssH0w PUlcrBaYpwI tjIK2xop4L0 BNlL15OYnFY _pzP7OLInjk 4daGJ6TMcp4 _8jM9R-1yRk KDrJSNIGNDQ oZ6f2vaH858 JUatPzf_eSc QfBFl7kU35c rG-rQ-YGdSA fzDTn342L3Q
这里的第 4 列是类别。数据文件中的一些行 不包含此字段,因此长度较短。这促使 我根据这个标准过滤掉数据集并进一步形成 具有类别的数据集上的 RDD。
我已尝试从数据集创建初始 RDD。
>>> data="/Users/sk/Documents/BigData/0222/0.txt"
>>> input = sc.textFile(data)
现在我按制表符拆分并保存在 RDDS 行中
>>> lines = input.map(lambda x: (str(x.split('\t'))))
之后我想过滤掉长度小于3的行。
>>> data="/Users/sk/Documents/BigData/0222/1.txt"
>>> input = sc.textFile(data)
>>> lines = input.map(lambda x: (str(x.split('\t'))))
>>> lines.count()
3169
>>> newinput=input.filter(lambda x: len(x)>3)
>>> newinput.count()
3169
在此之后,它不会改变我的 rdd 中的任何内容。任何人都可以 请帮忙。
【问题讨论】:
您可能需要先收集()然后再计数()。像newinput=input.filter(lambda x: len(x)>3).collect()
& 然后newinput.count()
你能上传文本文件吗?
数据集可以在这里找到drive.google.com/file/d/0ByJLBTmJojjzR2x0MzVpc2Z6enM/view
@St1id3r 我尝试了您的解决方案,但出现以下错误:>>> newinput=input.filter(lambda x: len(x)>3).collect() >>> newinput.count() Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: count() takes exactly one argument (0 given)
【参考方案1】:
与您的解决方案有关的几件事。鉴于这是 Python,不确定是否建议使用 RDD(您可能想重新考虑一下)。使用 Dataframe 会更容易且性能更好。
>>> x = spark.read.option("sep","\t").csv("/data/youtubedata.txt")
>>> x.count()
4100
>>> from pyspark.sql.functions import length
>>> from pyspark.sql.functions import col, size
>>> x.filter(length(col("_c3")) > 3).count()
4066
>>> x.filter(x._c3.isNull()).count()
34
>>> x.filter(x._c3.isNotNull()).count()
4066
更新:更新计数。
【讨论】:
我尝试了您的解决方案。但是>>> input = sc.textFile(data) >>> input.count() 3169
的行数仍然没有变化
将我的答案更新到您的测试数据。此外,如果您查看非空计数,这似乎会增加计数。让我知道这是否适合您。以上是关于如何根据数据集中的行长过滤 RDD。?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Java 和 Spark SQL 打印数据集中的行内容?