记一次调优过程—Spark读取OBS文件入ES
Posted 追求无悔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了记一次调优过程—Spark读取OBS文件入ES相关的知识,希望对你有一定的参考价值。
需求描述:
读取OBS中的文件(文件格式为snappy压缩格式),解析数据,并将数据插入ES的过程。
数据规模:
单个目录下的文件个数最大上限为500个,每个文件压缩后为115MB,压缩比为5.5,每条数据大小为600B-1KB。
目录格式:/data/xxx/userId/yyyyMMdd
存入ES对应索引:xxx-userId-yyyyMMdd
实现过程
1. 初始化
3. 存入ES
具体插入操作:
public void batchPutJson(String index, List<String> jsonList) throws EsOperateException
try
RestHighLevelClient highClient = getHighLevelClient();
BulkRequest bulkRequest = new BulkRequest();
if (jsonList == null)
return;
int num = 0;
for (String data: jsonList)
num++;
if (num % BATCH_SIZE <= 0)
highClient.bulk(bulkRequest, RequestOptions.DEFAULT);
bulkRequest = new BulkRequest();
else
bulkRequest.add(new IndexRequest(index.toLowerCase(Locale.ROOT)).source(data, XContentType.JSON));
// 如果刚好bulkRequest没有请求,则不需要执行插入操作
if (bulkRequest.numberOfActions() > 0)
// 避免异常:ActionRequestValidationException: Validation Failed: 1: no requests added
highClient.bulk(bulkRequest, RequestOptions.DEFAULT);
catch (IOException | EsConnectException ex)
log.error("batch put json failed: ", ex);
throw new EsOperateException("batch put exception: " + ex.getMessage());
测试过程
1. ES模板事先已创建好
副本数为1,分片数为5,刷新间隔为10s
2. 索引为插入数据时自动创建
3. OBS文件
文件个数6个,单个大小115MB,数据量大约为666万条
4. 测试结果
耗时为整个任务的耗时,任务包含其他初始化过程和任务停止过程
- 不设置Repartition,1个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入1w条,总耗时14min;
- 不设置Repartition,1个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入4w条,总耗时17min;
- 不设置Repartition,2个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入1w条,总耗时10min;
- 不设置Repartition,5个Executor,ExecutorCPU为1U,Executor内存为1G,ES单次插入2w条,总耗时6.5min;
优化过程
看着这个耗时时间,不禁想到,如果按照最大规格,那不是得等死。。。
开始了优化之路1. 任务耗时分析
发现任务的最大耗时之处在于这一句:dataset.toJSON().toJavaRDD() .mapPartitions(new ColdToHotFunction(this.owner, date)) .foreach((VoidFunction<String>) str -> log.info("data to es result is: ", str));
都知道,文件读取结果dataset,真正有结果是在这一句,所以这一句包含两个步骤,1是读取文件,2是存入ES,那到底是哪一步耗时呢?
通过进入任务详情查看,还是不太确定耗时大的真正原因。2. 尝试
认为是读文件和数据转化过程慢:
将Repartition设置为6,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,最后总耗时还是需要6min左右,并没有实质性提高。
Executor数量 * Executor CPU >= Repartition时才能保证单次任务可以充分执行。
既然不是文件读取,那就是插入ES慢?(枉我一开始这么信任ES)
增加插入耗时打印:
emmm... 原来就是你啊,每次插入2w条,这么慢的嘛!!!
3. 优化
ES服务端已经没法改变了,那只能从业务侧进行优化了
我已经采用了批量插入,还有什么优化策略呢???
最终通过以下两点:
- 一开始刷新间隔时间为10s,改为刷新间隔60s,因为没有必要刷新那么频繁;
- 原来是在插入数据的时候自动创建的索引,改为插入数据前,先手动创建索引,且设置副本数为0,在整个过程结束之后,再将副本数更新为1。因为副本数会导致每次插入数据会插多遍,很影响速率。
优化结果检查
经过上面的优化后,再测试一下性能:
- Repartition设为10,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,总耗时4min;
- Repartition设为6,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,总耗时3.6min;
此种方式,在数据量越大的时候,效果应该就越明显了。
出于公司资源有限的情况考虑,Executor数量 * Executor CPU >= Repartition时能最大发挥多线程优势,所以采用了第二种方式。
心得
此次调优过程看了很多优化方面的博客,因为任务所做的事情简单,没有涉及到太多的可优化点,但是感觉还有很大的优化空间。
作为一个小菜鸡,能通过自己慢慢摸索,测试完成效率提升4倍,也是很开心的事情!!!
如果大佬们有好的建议,希望留言,我好继续进行优化,感谢!!!
以上是关于记一次调优过程—Spark读取OBS文件入ES的主要内容,如果未能解决你的问题,请参考以下文章
17 记一次 spark 读取大数据表 OOM OutOfMemoryError: GC overhead limit exceeded
17 记一次 spark 读取大数据表 OOM OutOfMemoryError: GC overhead limit exceeded