在 pyspark 中处理大数据的优化
Posted
技术标签:
【中文标题】在 pyspark 中处理大数据的优化【英文标题】:optimization for processing big data in pyspark 【发布时间】:2016-10-17 15:05:03 【问题描述】:不是问题->需要建议
我在 20gb+6gb=26Gb csv 文件上运行 1+3(1-master,3-slave(每个 16gb RAM)。
这就是我的操作方式
df = spark.read.csv() #20gb
df1 = spark.read.csv() #6gb
df_merged= df.join(df1,'name','left') ###merging
df_merged.persists(StorageLevel.MEMORY_AND_DISK) ##if i do MEMORY_ONLY will I gain more performance?????
print('No. of records found: ',df_merged.count()) ##just ensure persist by calling an action
df_merged.registerTempTable('table_satya')
query_list= [query1,query2,query3] ###sql query string to be fired
city_list = [city1, city2,city3...total 8 cities]
file_index=0 ###will create files based on increasing index
for query_str in query_list:
result = spark.sql(query_str) #ex: select * from table_satya where date >= '2016-01-01'
#result.persist() ###willit increase performance
for city in city_list:
df_city = result.where(result.city_name==city)
#store as csv file(pandas style single file)
df_city.collect().toPandas().to_csv('file_'+str(file_index)+'.csv',index=False)
file_index += 1
df_merged.unpersist() ###do I even need to do it or Spark can handle it internally
目前它需要很长时间。
#persist(On count())-34 mins.
#each result(on firing each sql query)-around (2*8=16min toPandas() Op)
# #for each toPandas().to_csv() - around 2 min each
#for 3 query 16*3= 48min
#total 34+48 = 82 min ###Need optimization seriously
那么任何人都可以建议我如何优化上述过程以获得更好的性能(时间和内存两者。)
我担心的原因是:我在 Python-Pandas 平台(带有序列化 pickle 数据的 64Gb 单机)上执行上述操作,并且能够在 8-12 分钟内完成。随着我的数据量似乎在增长,因此需要采用像 spark 这样的技术。
提前致谢。 :)
【问题讨论】:
任何时候序列化到磁盘都会招致巨大的 I/O 损失。 Spark 的强大之处在于只使用内存来保持速度。您需要持久化到内存中,并确保您有足够的内存来处理数据。你还想确保你已经正确配置了你的环境......透明的大页面应该关闭,swappiness 设置为 0 或 1。 您的原始数据是只有您要查找的八个城市还是更多? 这是正确的吗?result.where('city_name'==city)
。这似乎要求result.where(False)
。你的意思是像result.where("city_name='%s'" % city)
这样的东西吗?
@StevenRumbalski-不,我还有很多城市(大约 90 个)。是的 result.where(result.city_name==city)。我已经更正了。
@tadamhicks-我如何设置“透明大页面应该关闭,swappiness 设置为 0 或 1”。这样做的变量是什么。请帮忙。你的评论是指,如果我这样做 df_merged.persists(StorageLevel.MEMORY_ONLY) ,我会获得性能提升吗???
【参考方案1】:
我认为您最好的选择是缩减源数据的大小。您提到您的源数据有 90 个城市,但您只对其中 8 个城市感兴趣。过滤掉您不想要的城市,并将您想要的城市保存在单独的 csv 文件中:
import itertools
import csv
city_list = [city1, city2,city3...total 8 cities]
with open('f1.csv', 'rb') as f1, open('f2.csv', 'rb') as f2:
r1, r2 = csv.reader(f1), csv.reader(f2)
header = next(r1)
next(r2) # discard headers in second file
city_col = header.index('city_name')
city_files = []
city_writers =
try:
for city in city_list:
f = open(city+'.csv', 'wb')
city_files.append(f)
writer = csv.writer(f)
writer.writerow(header)
city_writers[city] = writer
for row in itertools.chain(r1, r2):
city_name = row[city_col]
if city_name in city_writers:
city_writers[city_name].writerow(row)
finally:
for f in city_files:
f.close()
在遍历每个城市之后,为该城市创建一个 DataFrame,然后在嵌套循环中运行您的三个查询。每个 DataFrame 在内存中的适配应该没有问题,并且查询应该运行得很快,因为它们运行在一个小得多的数据集上。
【讨论】:
以上是关于在 pyspark 中处理大数据的优化的主要内容,如果未能解决你的问题,请参考以下文章