在 pyspark 中处理大数据的优化

Posted

技术标签:

【中文标题】在 pyspark 中处理大数据的优化【英文标题】:optimization for processing big data in pyspark 【发布时间】:2016-10-17 15:05:03 【问题描述】:

不是问题->需要建议

我在 20gb+6gb=26Gb csv 文件上运行 1+3(1-master,3-slave(每个 16gb RAM)。

这就是我的操作方式

df = spark.read.csv() #20gb
df1 = spark.read.csv() #6gb
df_merged= df.join(df1,'name','left') ###merging 
df_merged.persists(StorageLevel.MEMORY_AND_DISK) ##if i do MEMORY_ONLY will I gain more performance?????
print('No. of records found: ',df_merged.count())  ##just ensure persist by calling an action
df_merged.registerTempTable('table_satya')
query_list= [query1,query2,query3]  ###sql query string to be fired
city_list = [city1, city2,city3...total 8 cities]
file_index=0 ###will create files based on increasing index
for query_str in query_list:
   result = spark.sql(query_str) #ex: select * from table_satya where date >= '2016-01-01'
   #result.persist()  ###willit increase performance
   for city in city_list:
        df_city = result.where(result.city_name==city)
        #store as csv file(pandas style single file)
        df_city.collect().toPandas().to_csv('file_'+str(file_index)+'.csv',index=False)
        file_index += 1

df_merged.unpersist()  ###do I even need to do it or Spark can handle it internally

目前它需要很长时间。

#persist(On count())-34 mins.
#each result(on firing each sql query)-around (2*8=16min toPandas() Op)
#          #for each toPandas().to_csv() - around 2 min each
#for 3 query 16*3= 48min
#total 34+48 = 82 min  ###Need optimization seriously

那么任何人都可以建议我如何优化上述过程以获得更好的性能(时间和内存两者。)

我担心的原因是:我在 Python-Pandas 平台(带有序列化 pickle 数据的 64Gb 单机)上执行上述操作,并且能够在 8-12 分钟内完成。随着我的数据量似乎在增长,因此需要采用像 spark 这样的技术。

提前致谢。 :)

【问题讨论】:

任何时候序列化到磁盘都会招致巨大的 I/O 损失。 Spark 的强大之处在于只使用内存来保持速度。您需要持久化到内存中,并确保您有足够的内存来处理数据。你还想确保你已经正确配置了你的环境......透明的大页面应该关闭,swappiness 设置为 0 或 1。 您的原始数据是只有您要查找的八个城市还是更多? 这是正确的吗? result.where('city_name'==city)。这似乎要求result.where(False)。你的意思是像result.where("city_name='%s'" % city) 这样的东西吗? @StevenRumbalski-不,我还有很多城市(大约 90 个)。是的 result.where(result.city_name==city)。我已经更正了。 @tadamhicks-我如何设置“透明大页面应该关闭,swappiness 设置为 0 或 1”。这样做的变量是什么。请帮忙。你的评论是指,如果我这样做 df​​_merged.persists(StorageLevel.MEMORY_ONLY) ,我会获得性能提升吗??? 【参考方案1】:

我认为您最好的选择是缩减源数据的大小。您提到您的源数据有 90 个城市,但您只对其中 8 个城市感兴趣。过滤掉您不想要的城市,并将您想要的城市保存在单独的 csv 文件中:

import itertools
import csv

city_list = [city1, city2,city3...total 8 cities]

with open('f1.csv', 'rb') as f1, open('f2.csv', 'rb') as f2:
    r1, r2 = csv.reader(f1), csv.reader(f2)
    header = next(r1)
    next(r2) # discard headers in second file
    city_col = header.index('city_name')
    city_files = []
    city_writers = 
    try:
    for city in city_list:
            f = open(city+'.csv', 'wb')
            city_files.append(f)
            writer = csv.writer(f)
            writer.writerow(header)
            city_writers[city] = writer
        for row in itertools.chain(r1, r2):
            city_name = row[city_col]
            if city_name in city_writers:
                city_writers[city_name].writerow(row)
    finally:
        for f in city_files:
            f.close()

在遍历每个城市之后,为该城市创建一个 DataFrame,然后在嵌套循环中运行您的三个查询。每个 DataFrame 在内存中的适配应该没有问题,并且查询应该运行得很快,因为它们运行在一个小得多的数据集上。

【讨论】:

以上是关于在 pyspark 中处理大数据的优化的主要内容,如果未能解决你的问题,请参考以下文章

pyspark玩转大数据

如何在 pyspark 中读取大的 zip 文件

Python+大数据学习笔记

Pyspark基础使用

PHP-大数据量怎么处理优化

采用spring batch 处理大数据量,瓶颈在数据库吞吐量时,该如何优化?