Real-time file synchronization with ConcurrentLinkedQueue, FileAlterationObserver, and FileAlterationMonitor

Posted by 阿啄debugIT


Preface

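File synchronization can be done in many ways, for example with shell or Python scripts. Implementing it in Java, however, can make the job more stable and easier to maintain, and it scales to monitoring, backing up, archiving, and copying larger sets of files.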

The Java implementation mainly relies on three core classes: ConcurrentLinkedQueue from the JDK, plus FileAlterationObserver and FileAlterationMonitor from Apache Commons IO.

The SynFile main class

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SynFile {

  private static final Logger log;
  private static String srcPath;
  private static String dstPath;
  public static String oldPath;
  private static String checkPath;
  private static int MAX_FILE_COUNT;
  // Files reported by the listener that still have to be copied
  private ConcurrentLinkedQueue<File> pendingQueue = new ConcurrentLinkedQueue<>();
  // Files that have been handled and are waiting to be archived
  private ConcurrentLinkedQueue<File> compressQueue = new ConcurrentLinkedQueue<>();

  public static void main(String[] args) throws Exception {
    SynFile test = new SynFile();
    test.watch();
    test.syncDetentionFiles();
  }

  private void syncDetentionFiles() {
    // body shown in the syncDetentionFiles section
  }

  private void watch() throws Exception {
    // body shown in the watch section
  }

  // Paths and limits are initialized once, when the class is loaded
  static {
    log = LoggerFactory.getLogger(SynFile.class);
    srcPath = "/home/tmpuser/synFile/";
    dstPath = "/home/tmpuser/synFileAlter/work/";
    oldPath = "/home/tmpuser/synFileAlter/old/";
    checkPath = "/home/tmpuser/synFileAlter/bak/";
    MAX_FILE_COUNT = 20000;
  }
}

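In the main class, the directory paths are static fields that are initialized in a static block when the class loads. The class also declares two queues: a pending queue for newly detected files and a compression queue for files waiting to be archived.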
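The hand-off between the two queues can be sketched in isolation. The example below is a minimal illustration using only the JDK; the class name QueueHandOffSketch, the drain method, and the use of String items in place of File objects are assumptions made for the sketch and are not part of SynFile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch, not part of SynFile: shows the pending -> compress hand-off.
class QueueHandOffSketch {

  // Drains every item from pending, records it as processed, and queues it for compression.
  static List<String> drain(ConcurrentLinkedQueue<String> pending,
                            ConcurrentLinkedQueue<String> compress) {
    List<String> processed = new ArrayList<>();
    String item;
    // poll() returns null once the queue is empty, so no separate isEmpty() check is needed
    while ((item = pending.poll()) != null) {
      processed.add(item);  // stand-in for copying the file
      compress.add(item);   // remember the item for a later archive step
    }
    return processed;
  }

  public static void main(String[] args) {
    ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    ConcurrentLinkedQueue<String> compress = new ConcurrentLinkedQueue<>();
    pending.add("a_20240101.log");
    pending.add("b_20240102.log");
    System.out.println(drain(pending, compress)); // prints [a_20240101.log, b_20240102.log]
    System.out.println(pending.isEmpty());        // prints true
  }
}
```

ConcurrentLinkedQueue is non-blocking and thread-safe, so a listener thread can add items while another thread drains them without explicit locking.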

The watch method

private void watch() throws Exception {
    // Wrap the custom filter in an IOFileFilter chain
    FileFilter filter = FileFilterUtils.and(new IOFileFilter[] { new MyFileFilter() });
    FileAlterationObserver fileAlterationObserver = new FileAlterationObserver(srcPath, filter);
    fileAlterationObserver.addListener(new FileAlterationListenerAdaptor() {
      @Override
      public void onDirectoryCreate(File directory) {
        if (SynFile.log.isInfoEnabled()) {
          SynFile.log.info("onDirectoryCreate");
        }
        super.onDirectoryCreate(directory);
      }

      @Override
      public void onDirectoryDelete(File directory) {
        if (SynFile.log.isInfoEnabled()) {
          SynFile.log.info("onDirectoryDelete");
        }
        super.onDirectoryDelete(directory);
      }

      @Override
      public void onFileChange(File file) {
        if (SynFile.log.isInfoEnabled()) {
          SynFile.log.info("onFileChange");
        }
        super.onFileChange(file);
      }

      @Override
      public void onFileCreate(File file) {
        SynFile.log.info("onFileCreate {}", file.getAbsoluteFile());
        // Only queue the new file here; it is copied at the start of the next poll
        SynFile.this.pendingQueue.add(file);
        super.onFileCreate(file);
      }

      @Override
      public void onFileDelete(File file) {
        if (SynFile.log.isInfoEnabled()) {
          SynFile.log.info("onFileDelete");
        }
        super.onFileDelete(file);
      }

      @Override
      public void onStart(FileAlterationObserver observer) {
        SynFile.log.info("onStart");
        super.onStart(observer);
        // FileUtil and DateUtil are the author's helper classes; their source is not included in this article
        while (!SynFile.this.pendingQueue.isEmpty()) {
          File item = SynFile.this.pendingQueue.poll();
          FileUtil.copy(item, new File(SynFile.dstPath + item.getName()));
          SynFile.this.compressQueue.add(item);
        }
        // Archive the handled files once the queue grows past the limit
        if (SynFile.this.compressQueue.size() > SynFile.MAX_FILE_COUNT) {
          FileUtil.compress(SynFile.this.compressQueue, SynFile.oldPath + DateUtil.getCurrentDate("yyyyMMddHHmmss") + ".zip");
        }
      }
    });
    // Poll the source directory every 1000 ms
    FileAlterationMonitor filealterationMonitor = new FileAlterationMonitor(1000L);
    filealterationMonitor.addObserver(fileAlterationObserver);
    filealterationMonitor.start();
  }
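The watch method calls FileUtil.copy and FileUtil.compress, but the article does not include the FileUtil source. The sketch below shows one way such helpers might look using only the JDK. Everything in it is an assumption: the class name FileUtilSketch, the checked IOException (the original calls evidently do not declare one), the choice to drain the queue while compressing, and the requirement that queued file names are unique within one archive.

```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Queue;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical stand-in for the article's FileUtil, which is not shown in the source.
class FileUtilSketch {

  // Copies src to dst, creating parent directories and overwriting an existing dst.
  static void copy(File src, File dst) throws IOException {
    File parent = dst.getParentFile();
    if (parent != null) {
      Files.createDirectories(parent.toPath());
    }
    Files.copy(src.toPath(), dst.toPath(), StandardCopyOption.REPLACE_EXISTING);
  }

  // Drains the queue into one zip archive and returns the number of entries written.
  // Assumes file names are unique; a duplicate name makes putNextEntry throw.
  static int compress(Queue<File> files, String zipPath) throws IOException {
    int count = 0;
    try (OutputStream out = Files.newOutputStream(new File(zipPath).toPath());
         ZipOutputStream zip = new ZipOutputStream(out)) {
      File file;
      while ((file = files.poll()) != null) {
        if (!file.isFile()) {
          continue; // skip files that were deleted after being queued
        }
        zip.putNextEntry(new ZipEntry(file.getName()));
        Files.copy(file.toPath(), zip);
        zip.closeEntry();
        count++;
      }
    }
    return count;
  }
}
```

Draining the queue inside compress keeps the queue from growing without bound after each archive; whether the author's FileUtil.compress does the same cannot be determined from the article.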

The line FileFilter filter = FileFilterUtils.and(new IOFileFilter[] { new MyFileFilter() }); plugs in a custom filter:

import java.io.File;
import org.apache.commons.io.filefilter.IOFileFilter;

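class MyFileFilter
  implements IOFileFilter {

  // Accept every file; add conditions here to restrict what is watched
  @Override
  public boolean accept(File file) {
    return true;
  }

  @Override
  public boolean accept(File dir, String name) {
    return true;
  }
}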

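FileAlterationObserver(srcPath, filter) watches the target directory through the filter, and the FileAlterationListenerAdaptor registered on it receives the resulting events. The observer is then registered with a FileAlterationMonitor through addObserver (not through the monitor's constructor, which here only takes the polling interval), and start() launches the monitoring thread that checks the directory every 1000 ms.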
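The monitor works by polling: on every interval the observer compares the directory's current state with the state recorded at the previous check and fires events for the differences. The sketch below illustrates only that idea, with sets of file names standing in for directory listings. It is not the Commons IO implementation, and the class and method names are made up for the example.

```java
import java.util.Set;
import java.util.TreeSet;

// Conceptual sketch only: this is not how Commons IO is implemented internally.
class SnapshotDiffSketch {

  // Names present now but not at the previous poll: these would raise "create" events.
  static Set<String> created(Set<String> previous, Set<String> current) {
    Set<String> result = new TreeSet<>(current);
    result.removeAll(previous);
    return result;
  }

  // Names present at the previous poll but not now: these would raise "delete" events.
  static Set<String> deleted(Set<String> previous, Set<String> current) {
    Set<String> result = new TreeSet<>(previous);
    result.removeAll(current);
    return result;
  }

  public static void main(String[] args) {
    Set<String> before = Set.of("a.txt", "b.txt");
    Set<String> after = Set.of("b.txt", "c.txt");
    System.out.println(created(before, after)); // prints [c.txt]
    System.out.println(deleted(before, after)); // prints [a.txt]
  }
}
```

One consequence of polling is latency: a change is noticed at the next check at the earliest, so with a 1000 ms interval a new file can wait up to about a second before its event fires.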

The syncDetentionFiles method

private void syncDetentionFiles() {
    log.info("sync Detention Files Start...");
    File srcDir = new File(srcPath);
    if ((!srcDir.exists()) || (!srcDir.isDirectory())) {
      log.error("srcPath : {} does not exist!", srcPath);
    } else {
      log.info("srcPath : {} exists! Scan files ...", srcPath);
      File[] srcFileArray = srcDir.listFiles();
      assert (srcFileArray != null);
      log.info("srcPath : {} contains {} files", srcPath, Integer.valueOf(srcFileArray.length));

      for (File file : srcFileArray) {
        String fileName = file.getName();
        if (!fileName.contains("_")) {
          log.error("********** File {} has no timestamp **********", fileName);
        } else {
          // The 8 characters after the last underscore are taken as the date
          int index = fileName.lastIndexOf("_");
          String fileDate = fileName.substring(index + 1, index + 1 + 8);
          String checkDir = checkPath + fileDate + "/";
          // Copy only files that are in neither the work directory nor the dated backup directory
          if ((!FileUtil.isFileExist(checkDir + fileName)) && (!FileUtil.isFileExist(dstPath + fileName))) {
            log.info("File: {} is neither in dstPath {}, nor in checkDir {}", new Object[] { fileName, dstPath, checkDir });
            FileUtil.copy(file, new File(dstPath + fileName));
            log.info("File: {} copy to {}", fileName, dstPath + fileName);
          }
        }
        // Every scanned file is queued for later compression
        this.compressQueue.add(file);
      }
    }
  }

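This is an ordinary method that handles files already sitting in srcPath; such files do not trigger onFileCreate, because the observer only reports changes that occur after it has been initialized. For each file whose name carries a timestamp, the method checks whether the file already exists in dstPath or in the dated subdirectory of checkPath; if it is in neither, the file is copied to dstPath. Every scanned file is then added to the compression queue.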
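The date extraction in this method uses substring(index + 1, index + 1 + 8), which throws StringIndexOutOfBoundsException when fewer than eight characters follow the last underscore. A defensive variant might look like the sketch below. The class name FileDateSketch, the method extractDate, and the digits-only check (which assumes the stamp has the form yyyyMMdd) are assumptions for illustration and do not come from the original code.

```java
import java.util.Optional;

// Hypothetical helper, not part of SynFile: bounds-checked version of the date extraction.
class FileDateSketch {

  private static final int DATE_LENGTH = 8; // assumed yyyyMMdd

  // Returns the 8 digits after the last underscore, or empty if they are missing.
  static Optional<String> extractDate(String fileName) {
    int index = fileName.lastIndexOf('_');
    if (index < 0 || index + 1 + DATE_LENGTH > fileName.length()) {
      return Optional.empty(); // no underscore, or too few characters after it
    }
    String candidate = fileName.substring(index + 1, index + 1 + DATE_LENGTH);
    for (int i = 0; i < candidate.length(); i++) {
      if (!Character.isDigit(candidate.charAt(i))) {
        return Optional.empty(); // assumption: a valid stamp contains digits only
      }
    }
    return Optional.of(candidate);
  }

  public static void main(String[] args) {
    System.out.println(extractDate("report_20240131.log")); // prints Optional[20240131]
    System.out.println(extractDate("report.log"));          // prints Optional.empty
    System.out.println(extractDate("report_2024"));         // prints Optional.empty
  }
}
```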

 

 
