运维大数据之某银行flume性能调优及数据重复问题解决
Posted 中亦安图
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了运维大数据之某银行flume性能调优及数据重复问题解决相关的知识,希望对你有一定的参考价值。
且看中亦科技
如何帮助客户玩转运维大数据
案例概述
某大型国有银行总行客户在flume源码基础上定制开发实现交易实时监控,投产版本出现系统log无法打开和采集数据重复的情况。双十一前夕,中亦大数据产品团队工程师赴客户现场查看分析问题,提出初步的改进建议并测试实现,有效的保障系统在双十一期间平稳运行,提高数据采集的准确性和完整性。
问题描述
flume系统采集主机每天产生2000-3000个日志文件,文件保留周期为30天,监控总文件数在8w左右,系统log偶尔无法正常打出,且采集的数据有重复。
问题分析及建议
1
架构设计需要优化
flume的设计主要用于文件的采集和传输,在flume中添加大量数据处理的逻辑违背了它的设计初衷;数据的处理我们建议放到spark中,它为我们提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)数据集和数据源(批量数据或实时的流数据)的大数据;
2
为保障双十一期间系统平稳运行,避免现有结构发生大的改动,针对系统数据重复和系统log日志无法正常打出的情况我们作出如下分析和建议:
1):系统log日志偶尔无法正常打出可能跟PollableSource的process调度周期有关,建议添加时间log,查看process实际处理时间,并结合该时间配置最佳的idleTimeOut;还可能因为source和filegroups配置不合理,建议在模拟生产环境下,测试不同source和filegroup配置时的资源消耗及运行情况,找出最合理的配置方式;
代码示例:
TaildirSource.java
public Status process() {
Status status = Status.READY;
try {
//获取轮询开始时间戳
processStartTime = System.currentTimeMillis();
//输出开始时间戳到日志
logger.info("processStartTime:"+switchToDate(processStartTime));
existingInodes.clear();
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
if (tf.needTail()) {
tailFileProcess(tf, true);
}
}
closeTailFiles();
try {
TimeUnit.MILLISECONDS.sleep(retryInterval);
} catch (InterruptedException e) {
logger.info("Interrupted while sleeping");
}
} catch (Throwable t) {
logger.error("Unable to tail files", t);
status = Status.BACKOFF;
}
//获取轮询结束时间戳
processEndTime = System.currentTimeMillis();
//输出结束时间戳到日志
logger.info("processEndTime:"+switchToDate(processEndTime));
//输出时间间隔到日志
infoProTimeInterval(processStartTime,processEndTime);
return status;
}
//轮询时间戳转换为日期
public String switchToDate(long currentTime){
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MMdd-HHmm ss");
Date date = new Date(currentTime);
String dateStr = formatter.format(date);
return dateStr;
}
//计算时间差并输出到日志
public void infoProTimeInterval(long sTime,long eTime){
long interval = (eTime - sTime);
logger.info("processInterval:"+interval+"ms");
}
2):数据重复可能是由于过期不更新文件导致,建议在Flume首次启动时增加时间过滤并作为配置项写入配置文件,排除过期不更新的文件。
代码示例:
TaildirSourceConfigurationConstants.java
public static final String EXPIRED_TIME_OUT_ENABLED = "expiredTimeOutEnabled";
public static final boolean DEFAULT_EXPIRED_TIME_OUT_ENABLED = false;
public static final String EXPIRED_TIME_OUT = "expiredTimeOut";
public static final String DEFAULT_EXPIRED_TIME_OUT = "3650:0:0:0";
TaildirSource.java
expiredTimeOutEnabled = context.getBoolean(EXPIRED_TIME_OUT_ENABLED, DEFAULT_EXPIRED_TIME_OUT_ENABLED);
if (expiredTimeOutEnabled) {
String expiredTimeOutString = context.getString(EXPIRED_TIME_OUT, DEFAULT_EXPIRED_TIME_OUT);
String[] timeArray = expiredTimeOutString.split(":");
if (timeArray.length != 4) {
logger.error("expiredTimeOutString is inValid:" + expiredTimeOutString);
} else {
Integer d = Integer.parseInt(timeArray[0]) * 24 * 60 * 60 * 1000;
Integer H = Integer.parseInt(timeArray[1]) * 60 * 60 * 1000;
Integer m = Integer.parseInt(timeArray[2]) * 60 * 1000;
Integer s = Integer.parseInt(timeArray[3]) * 1000;
expiredTimeOut = d + H + m + s;
}
}
TaildirMatcher.java
private List<File> getMatchingFilesNoCache(long now) {
List<File> result = Lists.newArrayList();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
for (Path entry : stream) {
long entryLastModifiedTime = Files.getLastModifiedTime(entry).toMillis();
if (!expiredMatchExcluded || entryLastModifiedTime + expiredTimeout > now ) {
result.add(entry.toFile());
}
}
} catch (IOException e) {
logger.error("I/O exception occurred while listing parent directory. "+ "Files already matched will be returned. " + parentDir.toPath(), e);
}
return result;
通过对现场环境的具体测试分析,中亦科技大数据工程师给出了如下建议:
1)batchSize由100调为1000时系统资源消耗差别不大,当日志产生速度较快的情况下建议适当增大batchSize,如设为1000或更大;
2)通过对单个process时间间隔(processInterval)的监控,依据记录的processInterval值,建议idleTimeout时间设为process -Interval稍大的值;
增加source个数可以增大Soure读取数据的能力,建议根据生产环境实际情况配置多个source;适当增加filegroup的个数也可以减轻系统的压力,降低资源消耗,可根据实际需要增加filegroup的个数。
问题解决情况
1
针对性能问题,客户参考中亦大数据产品团队工程师的配置建议,对生产环境batchSize、idleTimeout等配置参数作了调整,并增加了source的个数,通过监控运行结果表明系统log打不出的问题不再发生。
2
针对数据重复问题,客户采纳中亦大数据产品团队工程师的建议,在flume首次启动时增加时间过滤并作为配置项写入配置文件,排除过期不更新的文件,大大降低了数据重复问题发生的概率,同时也提高了source读取文件的能力。
咨询问题回复
问题咨询一:cachePatternMatching的配置是否会对系统性能产生影响?
TaildirSource之所以能做到实时地采集文件变化的内容是因为每隔几秒它就遍历一次所要采集的文件,然后和内存中缓存的文件信息作对比,如果有变化则采集更新后的内容。现在假设内存中没有缓存匹配文件列表和这些文件对应的信息,那么对于包含成千上万个文件的目录,每次都要在遍历文件后开启线程写入磁盘既耗费资源又影响效率。而cachePatternMatching设置正是控制reader在getMatchingFiles时是否在内存里缓存匹配文件的列表及信息,可见该配置项的设置对系统的性能有很大的影响。cachePatternMatching设置为true,系统不仅可以缓存匹配文件的列表,还可以缓存文件被消耗的顺序,大大提高了系统的性能。
当然,任何事物都不是绝对的,cachePatternMatching的最终设置还应该视具体应用场景而定:当日志产生速度快但文件切换不快时我们建议设置为true;当日志产生速度不快而文件切换较快时可设置为false;默认建议设置为true,这样只有当文件新增或删除时系统才会调用遍历文件夹。
问题咨询二:文件内容改变时TaildirSource中getMatching -Files方法中时间判断的逻辑会走吗?
当配置cachePatternMatching为true,如果没有新增或者删除文件,仅文件内容改变时不会走那段时间判断的逻辑,将直接返回缓存的文件列表;如果有文件新增或者删除,则系统调用,重新获取匹配的文件列表;当配置cachePatternMatching为false,或最近一次check文件目录的时间小于该文件目录最近一次modified的时间时系统也会调用时间判断,重新获取匹配的文件列表。
问题咨询三:process调度周期可以调整吗,如果可以,要怎么调整?
process调度周期可以调整。通过修改updateTailFiles的相关逻辑即可实现,也可以通过查看process的执行时间对相关配置项作出调整,如idleTimeout。具体做法是在process开始和结束的时候分别取系统时间,通过log打出,然后分析大数量时的执行情况并对配置项作出调整。我们团队在研发过程中就曾这样做过,当时查看log发现文件有频繁打开和关闭的情况,判断可能是idletimeout设置时间较短,而process执行时间过长导致的。为了验证判断,我们依次递增延长idletimout的时间,并通过log查看文件是否频繁打开关闭,结果表明判断基本正确。
问题咨询四:文件更名频繁有可能造成跟踪文件遗漏或数据重复,这种问题该如何解决?
这个问题需要针对实际运行的时间情况来判断。例如,process的执行时间远小于日志文件轮转的时间,那么不会存在文件遗漏或者顺序错误的情况;相反,如果process的执行时间过长,在process执行期间,频繁发生同一组日志文件的轮转,则有可能引起监控的问题,即在process开始获取的文件列表,和文件inode,到process执行中途的时候,已经和文件系统中实际的文件列表以及文件inode不相匹配了。因此还是建议log查看一下process的实际执行时间是多久,时间越久,越大的概率产生该问题。至于解决方法,我们可以在每次process拿到文件后先判断inode是否是已经存在的,如果inode存在我们认为文件已经存在,直接把它从缓存列表中移除。
问题咨询五:实例化TaiFile之后,openFlie之前,文件突然更名,此时打开的文件是更名之前还是更名之后的?
实例化TailFile的构造函数里,已经open了file,所以只要文件不被关闭,文件句柄是指向inode对应的文件的。但是如果process执行时间过长,在process开始的时候,虽然TailFile Open了,但是排在后面的文件当处理该文件的时候,文件可能已经被重命名,甚至已经从列表中移除了。但是因为文件句柄是hold住的,所以文件的inode不变,但是文件名可能已经发生了变化。这个时候,意味着TailFile里文件名和inode是不匹配的。如果不关闭,沿用之前的句柄,则处理最初的文件;如果关闭,并通过原文件名打开,则实际上打开的TailFile是新的文件,inode会发生变化。
问题咨询六:日志文件种类和数量过多且存在于多个目录下,单个source和filegroup读取监控文件的压力过大,这个问题如何解决?
增加source个数(使用TaildirSource时可增加filegroup个数)可以增大soure的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个source 以保证soure有足够的能力获取到新产生的数据。batchSize参数决定source一次批量运输到channel的event条数,适当调大这个参数可提高soure搬运event到channel时的性能。另外适当增加filegroup的个数也可以减轻系统的压力,降低资源消耗,因此我们可以根据生产环境的数据情况对source种类、数量和filegroup数量加以调整。
中亦科技
更多大数据运维的实战案例,且听我们下回分解。
北京中亦安图科技股份有限公司
长按识别左侧二维码,关注“中亦科技”,了解更多内容!
以上是关于运维大数据之某银行flume性能调优及数据重复问题解决的主要内容,如果未能解决你的问题,请参考以下文章
HBase写入性能改造(续)--MemStoreflushcompact参数调优及压缩卡的使用
优化技术专题「系统性能调优实战」终极关注应用系统性能调优及原理剖析(下册)