与纯 python 替代方案相比,Pyspark 代码性能不够
Posted
技术标签:
【中文标题】与纯 python 替代方案相比,Pyspark 代码性能不够【英文标题】:Pyspark code is not performant enough when compared to pure python alternative 【发布时间】:2017-06-18 17:04:49 【问题描述】:我将下面粘贴在 python 中的现有代码转换为 pyspark。
Python 代码:
import json
import csv
def main():
# create a simple JSON array
with open('paytm_tweets_data_1495614657.json') as str:
tweetsList = []
# change the JSON string into a JSON object
jsonObject = json.load(str)
#print(jsonObject)
# # print the keys and values
for i in range(len(jsonObject)):
tweetsList.insert(i,jsonObject[i]["text"])
#print(tweetsList)
displaySentiment(tweetsList)
def displaySentiment(tweetsList):
aDict =
from sentiment import sentiment_score
for i in range(len(tweetsList)):
aDict[tweetsList[i]] = sentiment_score(tweetsList[i])
print (aDict)
with open('PaytmtweetSentiment.csv', 'w') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"])
writer.writeheader()
writer = csv.writer(csv_file)
for key, value in aDict.items():
writer.writerow([key, value])
if __name__ == '__main__':
main()
转换后的 Pyspark 代码:
import json
import csv
import os
from pyspark import SparkContext, SparkConf
from pyspark.python.pyspark.shell import spark
os.environ['PYSPARK_PYTHON'] = "/usr/local/bin/python3"
def main():
path = "/Users/i322865/DeepInsights/bitbucket-code/ai-engine/twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json"
peopleDF = spark.read.json(path).rdd
df = peopleDF.map(lambda row: row['text'])
displaySentiment(df.collect())
def displaySentiment(tweetsList):
from sentiment import sentiment_score
aDict = sentiment_score(tweetsList)
#
with open('paytmtweetSentiment.csv', 'w') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"])
writer.writeheader()
writer = csv.writer(csv_file)
for i in range(len(tweetsList)):
writer.writerow([tweetsList[i], aDict[i]])
print([tweetsList[i], aDict[i]])
if __name__ == '__main__':
conf = SparkConf().setAppName("Test").setMaster("local")
sc = SparkContext.getOrCreate(conf=conf)
main()
我运行了这两个程序,但没有看到任何显着的性能改进。我错过了什么?请问您能发表一些想法吗?
另外,我也应该使用“减少”吗?我目前只使用“地图”。
【问题讨论】:
这种类型的问题不适合该网站,但事实上,这仍然是一个糟糕的代码,说实话,当然没有冒犯! Pyspark 不是一种编程语言。另一方面,Python 是。 @eliasah 抱歉,修改了问题。感谢您的快速反馈。 调用df.collect()
两次当然性能较差。完全调用它会使 Spark 几乎无用
@cricket_007 谢谢。对打印声明感到抱歉。说我是否消除/删除了它(现在检查问题)。请你能告诉我我还缺少什么吗?我也应该使用“减少”吗?我目前只使用“地图”。
我不知道您期望发生什么,但是将 df.collect()
传递给 displaySentiment
不会给您带来任何好处。您在这里加速的只是读取 JSON 数据,无论如何,这些数据都受磁盘 IO 的约束。您需要将 dataframe 传递到 displaySentiment
,而不是 python 列表
【参考方案1】:
如果您想在 PySpark 中并行处理某些内容,请不要 collect()
回到 Python 列表
def calc_sentiment(tweetsDf): # You should pass a dataframe
from sentiment import sentiment_score
# Add a new column over the Tweets for the sentiment
return tweetsDf.withColumn('sentiment_score', sentiment_score(tweetsDf.text))
显然,sentiment_score
也需要更改为接受和返回 PySpark Column
然后,你会有这样的东西
def main():
path = "..../twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json"
twitterDf = spark.read.json(path)
# Don't call collect, only sample the Dataframe
sentimentDf = calc_sentiment(twitterDf)
sentimentDf.show(5)
# TODO: Write sentimentDf to a CSV
sentimentDf.write.csv(....)
【讨论】:
感谢您的建议。只是一个附加问题,而不是 .json 文件,如果我传递流数据,是否需要在上面建议的代码中进行任何更改 只要您可以将流放入 Dataframe 对象,那么我不知道【参考方案2】:除了其他人指出的收集问题外,您的 PySpark 实现可能会更慢,这仅仅是因为 Spark 不适用于您当前的用例。
从根本上说,Spark 旨在加速对大型分布式数据集(多台机器)的操作,而不是本地并行化。为此,它使用开销结构和流程。
对于单个/小型数据集,这种开销很容易占据主导地位并减慢您的解决方案。 This article 讨论 Hadoop 的使用,非常相似。您可能已经尝试过multiprocessing?
如果您确定 Spark 适合您,那么发布一个新问题可能会有所帮助,详细说明您的 Spark 设置、如何衡量性能以及您的数据集。
【讨论】:
【参考方案3】:我认为你没有看到任何加速是完全有道理的。您首先创建一个 RDD(以便分发数据),然后收集它们以运行您的第二个功能,即分析功能。实际上,您通过将所有数据收集到继续应用您的 displaySentiment() 函数的驱动程序机器来破坏您的第一个函数所做的事情。所以你实际上做的是在驱动机器上运行程序,这只是一台机器。因此没有加速。
【讨论】:
以上是关于与纯 python 替代方案相比,Pyspark 代码性能不够的主要内容,如果未能解决你的问题,请参考以下文章
在 python 中使用 pandas,numpy 是不是有 pyspark.ml.feature StringIndexer 的替代方法?
在实现 INotifyPropertyChanged 时,[CallerMemberName] 与替代方案相比是不是慢?
在实现 INotifyPropertyChanged 时,[CallerMemberName] 与替代方案相比是不是慢?
YMatrix + PLPython替代Spark实现车联网算法