Perform NLTK in pyspark

Posted: 2020-05-24 19:25:16

Question:

I'm new to pyspark, and I've written a program that performs NLTK processing on files in HDFS; its steps are listed below. I'm using Spark 2.3.1.

1. Fetch the files from HDFS

2. Perform lemmatization

3. Remove punctuation

4. Convert the RDD into a DataFrame

5. Run the Tokenizer

6. Remove stop words

7. Explode the column data so that each record gets its own row

8. I want to save all the file data into one file, so I merge the output with the old file

9. Write the whole merged output back to HDFS

10. Delete the old file and rename the file created by Spark to a different name

11. I do the same for the bigram and trigram files

Here is my pyspark code.

%pyspark

import os
import pyspark
import csv
import nltk
import json
import string
import re

from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import NGram
from pyspark import SparkContext, SparkConf as sc
from pyspark.sql.types import StringType

from nltk.corpus import stopwords
nltk.download('stopwords')

from pyspark.sql import SQLContext
from pyspark.sql.functions import explode,regexp_replace

import pandas
import hdfs



nltk.download('punkt')
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')


from pyspark import SparkContext, SparkConf
# conf = SparkConf().setAppName("PySpark App")
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

hdfs_dst_dir = "/user/zeppelin/achyuttest.csv/"
counter=0

# Lemmatization

def lemma(x):

    lemmatizer = WordNetLemmatizer()
    return lemmatizer.lemmatize(x)



for i in range(1,50001):
    data = sc.textFile('hdfs:///user/spark/Patentdata/ElectronicsPatents/Link\ /abstract.txt'.format(i), use_unicode=False)

    print(type(data))

    if data.isEmpty():
        continue


    else:
        lem_words = data.map(lemma)


        list_punct=list(string.punctuation)


        len_list = lem_words.collect()


        test_str = len_list[0]
        test_df = test_str.split(' ')


        data_df = data.map(lambda x: (x, )).toDF(['lem_words'])






# Perform Tokenizer

        tokenizer =  Tokenizer(inputCol="lem_words", outputCol="tokenized_data")
        outputdata = tokenizer.transform(data_df)
        outputdata = outputdata.select('tokenized_data')




    # Remove stop words

        remover = StopWordsRemover(inputCol='tokenized_data', outputCol='words_clean')
        outputdata = remover.transform(outputdata).select('words_clean')


# Explode one row into multiple rows, one per value

        result_df = outputdata.withColumn("exploded", explode("words_clean")).select("exploded")

        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))



        print("Link  ========>",i)
#Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/unigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(['words_clean'])


            result_df = old_data_df.union(result_df)

        else:
            pass

#Write DataFrame to HDFS

        result_df.coalesce(1).write.mode('append').csv(hdfs_dst_dir)

        # Get an HDFS FileSystem handle through the JVM gateway (Zeppelin provides `spark`)
        fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

    # Rename file

    #list files in the directory


        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))


    #filter name of the file starts with part-

        print("Get FileName")
        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

        print(file_name)
    #rename the file


        new_filename = "unigram.csv"

    # Remove Old file

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))



## Bigrams

        bigram = NGram(n=2, inputCol="words_clean", outputCol="bigrams")

        bigramDataFrame = bigram.transform(outputdata)




    # Explode one row into multiple rows, one per value

        result_df = bigramDataFrame.withColumn("exploded", explode("bigrams")).select("exploded")
        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))


    #Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/bigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])


            result_df = old_data_df.union(result_df)

        else:
            pass


    # Write Output in file

        result_df.coalesce(1).write.mode('append').csv('hdfs:///user/zeppelin/achyuttest.csv')

    # Rename file

    #list files in the directory

        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))

    #filter name of the file starts with part-

        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

    #rename the file

        new_filename = "bigram.csv"

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))





## TriGram

        trigram = NGram(n=3, inputCol="words_clean", outputCol="trigrams")

        trigramDataFrame = trigram.transform(outputdata)


    # Explode one row into multiple rows, one per value

        result_df = trigramDataFrame.withColumn("exploded", explode("trigrams")).select("exploded")
        result_df=result_df.select(regexp_replace('exploded',"[^a-zA-Z\\s]",""))

    #Merge with old output

        if counter>0:
            old_data = sc.textFile('hdfs:///user/zeppelin/achyuttest.csv/trigram.csv', use_unicode=False)
            old_data_df = old_data.map(lambda x: (x, )).toDF(["exploded"])


            result_df = old_data_df.union(result_df)

        else:
            pass


#Save DataFrame in HDFS
        result_df.coalesce(1).write.mode('append').csv('hdfs:///user/zeppelin/achyuttest.csv')

    # Rename file

    #list files in the directory

        list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir))

    #filter name of the file starts with part-

        file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

    #rename the file

        new_filename = "trigram.csv"

        fs.delete(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))
        fs.rename(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+file_name),spark._jvm.org.apache.hadoop.fs.Path(hdfs_dst_dir+''+new_filename))

        counter = counter+1

I'm running this code over 50K files, and Spark is taking far too long to finish the job (two days have passed and it's still running...).

I'm running HDP in a virtual machine (a single-node HDP sandbox). These are my system specs...

====> Guest OS::

    Memory: 12930 MB

    CPU: 6 CPUs

===> YARN specs::

1. Memory: 4608 MB

    Maximum container memory: 4608 MB

    Maximum container size (Vcores): 4

    Number of virtual cores: 4

===> Zeppelin Pyspark interpreter specs:: 1. spark.executor.memory: blank (which, according to the documentation, means 1g)
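
For reference, executor sizing is controlled by ordinary Spark properties, which can be set in the Zeppelin interpreter settings or via SparkConf before the context is created. Below is a minimal sketch with purely illustrative values (assumptions, not tuned recommendations); whatever is chosen has to fit inside the YARN container limits listed above.

from pyspark import SparkConf, SparkContext

# Illustrative values only: executor memory plus its overhead must stay
# below the 4608 MB maximum container size reported by YARN.
conf = (SparkConf()
        .setAppName("nltk-ngrams")
        .set("spark.executor.memory", "2g")     # per-executor heap
        .set("spark.executor.cores", "2")       # concurrent tasks per executor
        .set("spark.executor.instances", "1"))  # executors requested from YARN

sc = SparkContext.getOrCreate(conf=conf)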

So I have two questions:

1. Is my code correct?
2. What values do I need to set in YARN and the Zeppelin interpreter so that it runs quickly and efficiently?

Thanks.

Comments:

Why are you reading one file at a time — for i in range(1,50001):? Right now all of your data is processed by a single executor. Can you try using multiple executors, i.e. load all the files at once, so that more threads share the same work? If possible, also split your code into the parts that can run in parallel and the parts that must run serially.

@Srinivas I'm new to Spark. Could you please refactor my code, or point me to some good resources, so that I can use multiple threads?

Can you split the code into several functions based on what they do? Right now all of your code sits in one place.

@Srinivas What I have to achieve here is to run NLTK over 50K files and save the whole output into CSV files (there will be three: unigram.csv, bigram.csv and trigram.csv). To do that, on every iteration I read the old file, append the new content at the end, delete the old file and save the result as a new file in the same location. I can't figure out how to split the whole code into functions — could you help me with that?
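
What the commenters are suggesting can be sketched in a couple of lines: one glob read that pulls every abstract into a single RDD, whose partitions Spark then spreads across the executors (the wildcard pattern below is an assumption about the directory layout, not the exact path from the question).

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# One RDD covering every abstract.txt instead of 50,000 separate textFile() calls;
# Spark schedules the matched files as partitions across the available executors.
all_abstracts = sc.textFile("hdfs:///user/spark/Patentdata/ElectronicsPatents/*/abstract.txt")
print(all_abstracts.getNumPartitions())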

Answer 1:

I am answering my first question here.

With the old code I was creating a separate RDD for every file in the folder, so it took far too long (19 hours to process 3K files).

What I do now is read all the input files in a single RDD operation and perform every step on that one RDD. (The new code takes about 15 minutes to process the same 3K files.)


Final code

Comments are included for additional context.

Patentdetect-local.py

"""
To Run this code
Set Pyspark_python 

$ export PYSPARK_PYTHON=/usr/bin/python3
$ pip install nltk
$ python3 -m nltk.downloader punkt stopwords wordnet   # NLTK data used by this script


RUN ON Spark::

$ ./bin/spark-submit file_path/Patentdetect-local.py
"""



import pyspark
import nltk
import string
import os
import re

from pyspark import SparkContext
from nltk.stem import WordNetLemmatizer

from pyspark.ml.feature import NGram
from pyspark.sql.types import ArrayType,StructType,StructField,StringType
from pyspark.sql.functions import explode,array,split,collect_list
from pyspark.sql.window import Window
from pyspark.sql import SparkSession


sc = SparkContext.getOrCreate()

spark = SparkSession.builder.appName('Spark Example').getOrCreate()


Source_path="<path>/*/abstract.txt"

Destination_path="<path>/spark-outputs/parquet/Electronics-50/"



data=sc.textFile(Source_path)


data.persist()
lower_casetext = data.map(lambda x:x.lower())



# splitting_rdd = lower_casetext.map(lambda x:x.split(" "))
# print(splitting_rdd.collect())


# Function to perform sentence tokenization
def sent_TokenizeFunct(x):
    return nltk.sent_tokenize(x)

sentencetokenization_rdd = lower_casetext.map(sent_TokenizeFunct)

# Function to perform Word tokenization

def word_TokenizeFunct(x):
    splitted = [word for line in x for word in line.split()]
    return splitted

wordtokenization_rdd = sentencetokenization_rdd.map(word_TokenizeFunct)


# Remove Stop Words

def removeStopWordsFunct(x):
    from nltk.corpus import stopwords
    stop_words=set(stopwords.words('english'))
    filteredSentence = [w for w in x if not w in stop_words]
    return filteredSentence
stopwordRDD = wordtokenization_rdd.map(removeStopWordsFunct)


# Remove Punctuation marks

def removePunctuationsFunct(x):
    list_punct = list(string.punctuation)
    filtered = [''.join(c for c in s if c not in list_punct) for s in x]
    filtered_space = [s for s in filtered if s]  # drop tokens that became empty strings
    return filtered_space
rmvPunctRDD = stopwordRDD.map(removePunctuationsFunct)

# Perform Lemmatization

def lemma(x):

    lemmatizer = WordNetLemmatizer()

    final_rdd = [lemmatizer.lemmatize(s) for s in x]
    return final_rdd

lem_wordsRDD = rmvPunctRDD.map(lemma)

# Join tokens

# def joinTokensFunct(x):
#     joinedTokens_list = []
#     x = " ".join(x)
#     return x

# joinedTokensRDD = lem_wordsRDD.map(joinTokensFunct)


##Create DataFrame from RDD

df = lem_wordsRDD.map(lambda x: (x, )).toDF(["features"])

tokenized_df = df.withColumn("values", explode("features")).select("values")


## Write DataFrame Output

# tokenized_df.write.mode('append').csv(Destination_path)

## Change File-name

# for old_file_name in os.listdir(Destination_path):
#   src = Destination_path+old_file_name
#   dst = Destination_path+"unigram.csv"
    
#   if old_file_name.startswith("part-"):
#       os.rename(src, dst)
        # break


## For bigrams, the commented-out line below (a window-function approach) would also be enough
# # tokenized_df.select(F.concat_ws(" ",F.col("values"),F.lead("values").over(Window.orderBy(F.lit(None))))).show()



## Create final DataFrame

final_df = tokenized_df.select(collect_list("values").alias("listed_data"))

# final_df.show(truncate=False)

final_df.persist()


## Unigram

unigram = NGram(n=1, inputCol="listed_data", outputCol="unigrams")

unigramDataFrame = unigram.transform(final_df)

unigram_FinalDataFrame = unigramDataFrame.withColumn("unigram_final",explode("unigrams")).select("unigram_final")


## Write DataFrame Outputs

unigram_FinalDataFrame.write.mode('append').parquet(Destination_path)

# Change filename
for old_file_name in os.listdir(Destination_path):
    src = Destination_path+old_file_name
    dst = Destination_path+"unigram.parquet"
    
    if old_file_name.startswith("part-"):
        os.rename(src, dst)


## Bigram

bigram = NGram(n=2, inputCol="listed_data", outputCol="bigrams")

bigramDataFrame = bigram.transform(final_df)

bigram_FinalDataFrame = bigramDataFrame.withColumn("bigram_final",explode("bigrams")).select("bigram_final")


## Write DataFrame Outputs

bigram_FinalDataFrame.write.mode('append').parquet(Destination_path)

## Change filename
for old_file_name in os.listdir(Destination_path):
    src = Destination_path+old_file_name
    dst = Destination_path+"bigram.parquet"
    
    if old_file_name.startswith("part-"):
        os.rename(src, dst)
        # break

## Trigram 

trigram = NGram(n=3, inputCol="listed_data", outputCol="trigram")

trigramDataFrame = trigram.transform(final_df)

trigram_FinalDataFrame = trigramDataFrame.withColumn("trigram_final",explode("trigram")).select("trigram_final")

## Write DataFrame Outputs
trigram_FinalDataFrame.write.mode('append').parquet(Destination_path)

# Change Filename
for old_file_name in os.listdir(Destination_path):
    src = Destination_path+old_file_name
    dst = Destination_path+"trigram.parquet"
    
    if old_file_name.startswith("part-"):
        os.rename(src, dst)
        # break

final_df.unpersist()
data.unpersist()
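
As a quick sanity check, the renamed outputs can be read straight back (a minimal sketch; it assumes the renames above succeeded and reuses the spark session and Destination_path defined in the script):

# Hypothetical verification step: load one of the renamed files and inspect it
unigrams = spark.read.parquet(Destination_path + "unigram.parquet")
unigrams.show(10, truncate=False)
print("unigram rows:", unigrams.count())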
