Real-time file synchronization with ConcurrentLinkedQueue, FileAlterationObserver, and FileAlterationMonitor
Posted 阿啄debugIT
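Preface
Files can be synchronized in many ways, for example with shell or Python scripts. A Java implementation, however, can be more stable and easier to maintain, and it can handle monitoring, backing up, archiving, and copying larger sets of files.
The Java implementation relies mainly on three core classes: ConcurrentLinkedQueue, FileAlterationObserver, and FileAlterationMonitor.
The SynFile main class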
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SynFile {
    private static final Logger log;
    private static String srcPath;
    private static String dstPath;
    public static String oldPath;
    private static String checkPath;
    private static int MAX_FILE_COUNT;

    // Newly created files reported by the observer, waiting to be copied.
    private ConcurrentLinkedQueue<File> pendingQueue = new ConcurrentLinkedQueue<>();
    // Files that have been copied and are waiting to be archived.
    private ConcurrentLinkedQueue<File> compressQueue = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws Exception {
        SynFile test = new SynFile();
        test.watch();
        test.syncDetentionFiles();
    }

    private void syncDetentionFiles() {
        // Body shown in the syncDetentionFiles section.
    }

    private void watch() throws Exception {
        // Body shown in the watch section.
    }

    // Paths and limits are set once, when the class is loaded.
    static {
        log = LoggerFactory.getLogger(SynFile.class);
        srcPath = "/home/tmpuser/synFile/";
        dstPath = "/home/tmpuser/synFileAlter/work/";
        oldPath = "/home/tmpuser/synFileAlter/old/";
        checkPath = "/home/tmpuser/synFileAlter/bak/";
        MAX_FILE_COUNT = 20000;
    }
}
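The main class keeps the directory paths in static fields that are initialized in a static block as soon as the class is loaded. It also declares two queues: a pending queue for files that still have to be copied, and a compress queue for files waiting to be archived.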
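The hand-off between the two queues can be sketched on its own, without any file I/O. The class name QueueHandoffSketch, the method drainPending, and the file names are made up for illustration and are not part of the original program; the real code copies each file before passing it on.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Illustrative sketch only: the pending-to-compress hand-off, with the copy step left out. */
final class QueueHandoffSketch {
    // ConcurrentLinkedQueue is a thread-safe, non-blocking FIFO queue,
    // so a monitor thread and the main thread can both add to it.
    final Queue<File> pendingQueue = new ConcurrentLinkedQueue<>();
    final Queue<File> compressQueue = new ConcurrentLinkedQueue<>();

    /** Moves every pending file to the compress queue and returns them in FIFO order. */
    List<File> drainPending() {
        List<File> moved = new ArrayList<>();
        File item;
        // poll() returns null once the queue is empty, so no isEmpty() check is needed.
        while ((item = pendingQueue.poll()) != null) {
            moved.add(item);           // the real program would copy the file here
            compressQueue.add(item);
        }
        return moved;
    }

    public static void main(String[] args) {
        QueueHandoffSketch sketch = new QueueHandoffSketch();
        sketch.pendingQueue.add(new File("a_20240101.txt")); // hypothetical names
        sketch.pendingQueue.add(new File("b_20240101.txt"));
        System.out.println("moved: " + sketch.drainPending());
        System.out.println("waiting for archive: " + sketch.compressQueue.size());
    }
}
```

Looping on poll() rather than on isEmpty() followed by poll() avoids acting on a stale emptiness check when more than one thread drains the queue.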
The watch method
private void watch() throws Exception {
    // Wrap the custom filter so the observer only sees accepted files.
    FileFilter filter = FileFilterUtils.and(new IOFileFilter[] { new MyFileFilter() });
    FileAlterationObserver fileAlterationObserver = new FileAlterationObserver(srcPath, filter);
    fileAlterationObserver.addListener(new FileAlterationListenerAdaptor() {
        @Override
        public void onDirectoryCreate(File directory) {
            if (SynFile.log.isInfoEnabled()) {
                SynFile.log.info("onDirectoryCreate");
            }
            super.onDirectoryCreate(directory);
        }

        @Override
        public void onDirectoryDelete(File directory) {
            if (SynFile.log.isInfoEnabled()) {
                SynFile.log.info("onDirectoryDelete");
            }
            super.onDirectoryDelete(directory);
        }

        @Override
        public void onFileChange(File file) {
            if (SynFile.log.isInfoEnabled()) {
                SynFile.log.info("onFileChange");
            }
            super.onFileChange(file);
        }

        @Override
        public void onFileCreate(File file) {
            SynFile.log.info("onFileCreate {}", file.getAbsoluteFile());
            // Queue the new file; it is copied at the start of the next polling cycle.
            SynFile.this.pendingQueue.add(file);
            super.onFileCreate(file);
        }

        @Override
        public void onFileDelete(File file) {
            if (SynFile.log.isInfoEnabled()) {
                SynFile.log.info("onFileDelete");
            }
            super.onFileDelete(file);
        }

        @Override
        public void onStart(FileAlterationObserver observer) {
            SynFile.log.info("onStart");
            super.onStart(observer);
            // FileUtil and DateUtil are the author's own helper classes (not shown here).
            while (!SynFile.this.pendingQueue.isEmpty()) {
                File item = SynFile.this.pendingQueue.poll();
                FileUtil.copy(item, new File(SynFile.dstPath + item.getName()));
                SynFile.this.compressQueue.add(item);
                if (SynFile.this.compressQueue.size() > SynFile.MAX_FILE_COUNT) {
                    FileUtil.compress(SynFile.this.compressQueue, SynFile.oldPath + DateUtil.getCurrentDate("yyyyMMddHHmmss") + ".zip");
                }
            }
        }
    });

    // Poll the observed directory every 1000 ms on the monitor's own thread.
    FileAlterationMonitor filealterationMonitor = new FileAlterationMonitor(1000L);
    filealterationMonitor.addObserver(fileAlterationObserver);
    filealterationMonitor.start();
}
The statement FileFilter filter = FileFilterUtils.and(new IOFileFilter[] { new MyFileFilter() }); plugs in a custom filter, MyFileFilter:
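import java.io.File;
import org.apache.commons.io.filefilter.IOFileFilter;

// Accepts every file and directory, so nothing under srcPath is filtered out.
class MyFileFilter implements IOFileFilter {
    @Override
    public boolean accept(File file) {
        return true;
    }

    @Override
    public boolean accept(File dir, String name) {
        return true;
    }
}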
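new FileAlterationObserver(srcPath, filter) watches the target folder through the filter. Paired with a FileAlterationListenerAdaptor, it detects changes in the folder and fires the matching callbacks. The observer is then registered with a FileAlterationMonitor through addObserver, and start() launches the monitoring thread, which polls the folder at the interval passed to the monitor's constructor (1000 ms here).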
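The idea behind this kind of polling can be shown with a deliberately simplified model: keep the listing from the previous check, compare it with the current one, and report the differences. The sketch below is not the Commons IO implementation, and the class PollingDiffSketch, its poll method, and the file names are invented for illustration. It uses plain sets of names instead of the file system so that it runs anywhere (Java 9 or later, because of Set.of).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Simplified model of polling-based change detection; NOT the Commons IO code. */
final class PollingDiffSketch {
    // Names seen at the previous check, kept sorted for a stable event order.
    private Set<String> lastSeen;

    /** The initial listing is only recorded; it produces no events. */
    PollingDiffSketch(Set<String> initialNames) {
        this.lastSeen = new TreeSet<>(initialNames);
    }

    /** Compares the current listing with the previous one and returns the resulting events. */
    List<String> poll(Set<String> currentNames) {
        Set<String> current = new TreeSet<>(currentNames);
        List<String> events = new ArrayList<>();
        for (String name : current) {
            if (!lastSeen.contains(name)) {
                events.add("create " + name);
            }
        }
        for (String name : lastSeen) {
            if (!current.contains(name)) {
                events.add("delete " + name);
            }
        }
        lastSeen = current; // the current listing becomes the baseline for the next poll
        return events;
    }

    public static void main(String[] args) {
        PollingDiffSketch observer = new PollingDiffSketch(Set.of("a.txt", "b.txt"));
        // prints [create c.txt, delete a.txt]
        System.out.println(observer.poll(Set.of("b.txt", "c.txt")));
    }
}
```

In this model, files that are already present when the baseline is recorded never produce a create event. That is one plausible reason the program also scans srcPath explicitly in syncDetentionFiles.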
The syncDetentionFiles method
private void syncDetentionFiles() {
    log.info("sync Detention Files Start...");
    File srcDir = new File(srcPath);
    if ((!srcDir.exists()) || (!srcDir.isDirectory())) {
        log.error("srcPath : {} does not exist!", srcPath);
    } else {
        log.info("srcPath : {} exists! Scan files ...", srcPath);
        File[] srcFileArray = srcDir.listFiles();
        assert (srcFileArray != null);
        log.info("srcPath : {} contains {} files", srcPath, srcFileArray.length);
        for (File file : srcFileArray) {
            String fileName = file.getName();
            if (!fileName.contains("_")) {
                log.error("********** File: {} has no timestamp **********", fileName);
            } else {
                // The 8 characters after the last underscore are read as the file date.
                int index = fileName.lastIndexOf("_");
                String fileDate = fileName.substring(index + 1, index + 1 + 8);
                String checkDir = checkPath + fileDate + "/";
                // Copy only files that are in neither the work directory nor the dated backup directory.
                if ((!FileUtil.isFileExist(checkDir + fileName)) && (!FileUtil.isFileExist(dstPath + fileName))) {
                    log.info("File: {} is neither in dstPath {}, nor in checkDir {}", new Object[] { fileName, dstPath, checkDir });
                    FileUtil.copy(file, new File(dstPath + fileName));
                    log.info("File: {} copy to {}", fileName, dstPath + fileName);
                    this.compressQueue.add(file);
                }
            }
        }
    }
}
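This is an ordinary method. It scans the files already sitting in srcPath; any file that is found in neither dstPath nor the dated backup directory under checkPath is copied to dstPath and then added to the compress queue.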
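One fragile spot in the method above is the date extraction: substring(index + 1, index + 1 + 8) throws StringIndexOutOfBoundsException when fewer than eight characters follow the last underscore. A defensive variant might look like the sketch below. The class FileDateSketch, the method extractDate, and the sample file names are hypothetical, and the digits-only check is an added assumption about the naming scheme (yyyyMMdd), which the original code does not verify.

```java
import java.util.Optional;

/** Illustrative sketch: bounds-checked extraction of the 8-character date after the last underscore. */
final class FileDateSketch {
    private static final int DATE_LENGTH = 8; // assumed yyyyMMdd

    /** Returns the 8 digits after the last '_' in fileName, or empty if they are not there. */
    static Optional<String> extractDate(String fileName) {
        int index = fileName.lastIndexOf('_');
        int start = index + 1;
        int end = start + DATE_LENGTH;
        if (index < 0 || end > fileName.length()) {
            return Optional.empty(); // no underscore, or too few characters after it
        }
        String candidate = fileName.substring(start, end);
        for (int i = 0; i < candidate.length(); i++) {
            if (!Character.isDigit(candidate.charAt(i))) {
                return Optional.empty(); // assumption: a date consists of digits only
            }
        }
        return Optional.of(candidate);
    }

    public static void main(String[] args) {
        System.out.println(extractDate("report_20240131.txt")); // prints Optional[20240131]
        System.out.println(extractDate("report_2024"));         // prints Optional.empty
        System.out.println(extractDate("report.txt"));          // prints Optional.empty
    }
}
```

Returning an Optional lets the caller log and skip a badly named file instead of aborting the whole scan with an exception.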