与纯 python 替代方案相比,Pyspark 代码性能不够

Posted

技术标签:

【中文标题】与纯 python 替代方案相比,Pyspark 代码性能不够【英文标题】:Pyspark code is not performant enough when compared to pure python alternative 【发布时间】:2017-06-18 17:04:49 【问题描述】:

我将下面粘贴在 python 中的现有代码转换为 pyspark。

Python 代码:

import json
import csv


def main():
    # create a simple JSON array
    with open('paytm_tweets_data_1495614657.json') as str:

        tweetsList = []
        # change the JSON string into a JSON object
        jsonObject = json.load(str)

        #print(jsonObject)

        # # print the keys and values
        for i in range(len(jsonObject)):
            tweetsList.insert(i,jsonObject[i]["text"])

        #print(tweetsList)
    displaySentiment(tweetsList)



def displaySentiment(tweetsList):
    aDict = 

    from sentiment import sentiment_score

    for i in range(len(tweetsList)):
        aDict[tweetsList[i]] = sentiment_score(tweetsList[i])
    print (aDict)


    with open('PaytmtweetSentiment.csv', 'w') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"])
        writer.writeheader()
        writer = csv.writer(csv_file)
        for key, value in aDict.items():
            writer.writerow([key, value])


if __name__ == '__main__':
    main()

转换后的 Pyspark 代码:

import json
import csv
import os
from pyspark import SparkContext, SparkConf
from pyspark.python.pyspark.shell import spark

os.environ['PYSPARK_PYTHON'] = "/usr/local/bin/python3"


def main():
    path = "/Users/i322865/DeepInsights/bitbucket-code/ai-engine/twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json"
    peopleDF = spark.read.json(path).rdd
    df = peopleDF.map(lambda row: row['text'])
    displaySentiment(df.collect())



def displaySentiment(tweetsList):
    from sentiment import sentiment_score

    aDict = sentiment_score(tweetsList)

    #
    with open('paytmtweetSentiment.csv', 'w') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"])
        writer.writeheader()
        writer = csv.writer(csv_file)
        for i in range(len(tweetsList)):
            writer.writerow([tweetsList[i], aDict[i]])
            print([tweetsList[i], aDict[i]])


if __name__ == '__main__':
    conf = SparkConf().setAppName("Test").setMaster("local")
    sc = SparkContext.getOrCreate(conf=conf)
    main()

我运行了这两个程序,但没有看到任何显着的性能改进。我错过了什么?请问您能发表一些想法吗?

另外,我也应该使用“减少”吗?我目前只使用“地图”。

【问题讨论】:

这种类型的问题不适合该网站,但事实上,这仍然是一个糟糕的代码,说实话,当然没有冒犯! Pyspark 不是一种编程语言。另一方面,Python 是。 @eliasah 抱歉,修改了问题。感谢您的快速反馈。 调用df.collect() 两次当然性能较差。完全调用它会使 Spark 几乎无用 @cricket_007 谢谢。对打印声明感到抱歉。说我是否消除/删除了它(现在检查问题)。请你能告诉我我还缺少什么吗?我也应该使用“减少”吗?我目前只使用“地图”。 我不知道您期望发生什么,但是将 df.collect() 传递给 displaySentiment 不会给您带来任何好处。您在这里加速的只是读取 JSON 数据,无论如何,这些数据都受磁盘 IO 的约束。您需要将 dataframe 传递到 displaySentiment,而不是 python 列表 【参考方案1】:

如果您想在 PySpark 中并行处理某些内容,请不要 collect() 回到 Python 列表

def calc_sentiment(tweetsDf):  # You should pass a dataframe
    from sentiment import sentiment_score

    # Add a new column over the Tweets for the sentiment
    return tweetsDf.withColumn('sentiment_score', sentiment_score(tweetsDf.text))

显然,sentiment_score 也需要更改为接受和返回 PySpark Column

然后,你会有这样的东西

def main():
    path = "..../twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json"
    twitterDf = spark.read.json(path)

    # Don't call collect, only sample the Dataframe
    sentimentDf = calc_sentiment(twitterDf)
    sentimentDf.show(5)

    # TODO: Write sentimentDf to a CSV
    sentimentDf.write.csv(....)

【讨论】:

感谢您的建议。只是一个附加问题,而不是 .json 文件,如果我传递流数据,是否需要在上面建议的代码中进行任何更改 只要您可以将流放入 Dataframe 对象,那么我不知道【参考方案2】:

除了其他人指出的收集问题外,您的 PySpark 实现可能会更慢,这仅仅是因为 Spark 不适用于您当前的用例。

从根本上说,Spark 旨在加速对大型分布式数据集(多台机器)的操作,而不是本地并行化。为此,它使用开销结构和流程。

对于单个/小型数据集,这种开销很容易占据主导地位并减慢您的解决方案。 This article 讨论 Hadoop 的使用,非常相似。您可能已经尝试过multiprocessing?

如果您确定 Spark 适合您,那么发布一个新问题可能会有所帮助,详细说明您的 Spark 设置、如何衡量性能以及您的数据集。

【讨论】:

【参考方案3】:

我认为你没有看到任何加速是完全有道理的。您首先创建一个 RDD(以便分发数据),然后收集它们以运行您的第二个功能,即分析功能。实际上,您通过将所有数据收集到继续应用您的 displaySentiment() 函数的驱动程序机器来破坏您的第一个函数所做的事情。所以你实际上做的是在驱动机器上运行程序,这只是一台机器。因此没有加速。

【讨论】:

以上是关于与纯 python 替代方案相比,Pyspark 代码性能不够的主要内容,如果未能解决你的问题,请参考以下文章

在 python 中使用 pandas,numpy 是不是有 pyspark.ml.feature StringIndexer 的替代方法?

在实现 INotifyPropertyChanged 时,[CallerMemberName] 与替代方案相比是不是慢?

在实现 INotifyPropertyChanged 时,[CallerMemberName] 与替代方案相比是不是慢?

YMatrix + PLPython替代Spark实现车联网算法

与 Python+Numba LLVM/JIT 编译的代码相比,Julia 的性能

调度 pyspark 笔记本