如果 csv 文件中包含 Pyspark 中的特定关键字,则跳过该行

Posted

技术标签:

【中文标题】如果 csv 文件中包含 Pyspark 中的特定关键字,则跳过该行【英文标题】:skip lines from csv file if it contains specific keyword in Pyspark 【发布时间】:2018-11-13 07:17:56 【问题描述】:

我有一个 CSV 文件,其详细信息如下所示:

emp_id,emp_name,emp_city,emp_salary
1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000
9,ABC                   ,ROBERT  ,10000
9,XYZ                   ,ROBERTGURGAON,70000

如果它包含关键字“ROBERT”并且预期输出为:

,我想跳过这些行
+------+--------------------+-------------+----------+
|emp_id|            emp_name|     emp_city|emp_salary|
+------+--------------------+-------------+----------+
|     1|VIKRANT SINGH RAN...|     NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|     DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|     GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|     SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|     USA     |     50000|
|     6|RAJAT TYAGI      ...|     UP      |     65000|
|     7|AJAY SHARMA      ...|     NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|     SAKET   |     72000|
+------+--------------------+-------------+----------+

我可以将此文件加载到数据框中,并可以使用以下表达式对每一列进行过滤

newdf = emp_df.where(~ col("emp_city").like("ROBERT%"))

我正在寻找一些解决方案,以便在将其加载到数据框之前对其进行过滤,而无需遍历所有列来查找特定字符串。

【问题讨论】:

在创建数据框之前,您是否尝试使用lambda 函数将结果读取到rddfilter?像这样的东西:rdd.filter(lambda row : 'ROBERT' not in row).toDF(rdd.first()) 是的,它有效。谢谢 很高兴听到它有帮助 :) 并感谢您为此写出正确的答案。 【参考方案1】:

我能够使用 RDD 对其进行过滤。

textdata = sc.textFile(PATH_TO_FILE)
header=textdata.first();
textnewdata = textdata.filter(lambda x:x != header)
newRDD = textnewdata.filter(lambda row : 'ROBERT' not in row)

[u'1,VIKRANT SINGH RANA    ,NOIDA   ,10000', 
u'3,GOVIND NIMBHAL        ,DWARKA  ,92000', 
u'2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000', 
u'4,ABHIJAN SINHA         ,SAKET   ,65000', 
u'5,SUPER DEVELOPER       ,USA     ,50000', 
u'6,RAJAT TYAGI           ,UP      ,65000', 
u'7,AJAY SHARMA           ,NOIDA   ,70000', 
u'8,SIDDHARTH BASU        ,SAKET   ,72000']

newsplitRDD = newRDD.map(lambda l: l.split(","))

newDF = newsplitRDD.toDF()

>>> newDF.show();
+---+--------------------+--------+-----+
| _1|                  _2|      _3|   _4|
+---+--------------------+--------+-----+
|  1|VIKRANT SINGH RAN...|NOIDA   |10000|
|  3|GOVIND NIMBHAL   ...|DWARKA  |92000|
|  2|RAGHVENDRA KUMAR ...|GURGAON |50000|
|  4|ABHIJAN SINHA    ...|SAKET   |65000|
|  5|SUPER DEVELOPER  ...|USA     |50000|
|  6|RAJAT TYAGI      ...|UP      |65000|
|  7|AJAY SHARMA      ...|NOIDA   |70000|
|  8|SIDDHARTH BASU   ...|SAKET   |72000|
+---+--------------------+--------+-----+

【讨论】:

以上是关于如果 csv 文件中包含 Pyspark 中的特定关键字,则跳过该行的主要内容,如果未能解决你的问题,请参考以下文章

PYSPARK - 如何读取 S3 中所有子文件夹中的所有 csv 文件?

如何将存储在 HDFS 中包含行的文本文件转换为 Pyspark 中的数据框?

在 pyspark 中处理大数字的数据类型

读入火花数据框时如何从csv文件中删除列

使用 pyspark 读取多个 csv 文件

读取包含嵌入逗号的引用字段的 csv 文件