Java:监视目录以移动大文件

Posted

技术标签:

【中文标题】Java:监视目录以移动大文件【英文标题】:Java: Watching a directory to move large files 【发布时间】:2011-03-23 02:44:21 【问题描述】:

我一直在编写一个监视目录的程序,当在其中创建文件时,它会更改名称并将它们移动到新目录。在我的第一个实现中,我使用了 Java 的 Watch Service API,它在我测试 1kb 文件时运行良好。出现的问题是,实际上创建的文件大小在 50-300mb 之间。发生这种情况时,观察者 API 会立即找到该文件,但无法移动它,因为它仍在写入中。我尝试将观察者置于一个循环中(在可以移动文件之前会产生异常),但这似乎效率很低。

由于这不起作用,我尝试使用一个计时器,该计时器每 10 秒检查一次文件夹,然后尽可能移动文件。这是我最终采用的方法。

问题:在没有进行异常检查或不断比较大小的情况下,是否会在文件写入完成时发出信号?我喜欢对每个文件只使用一次 Watcher API 的想法,而不是不断地使用计时器检查(并遇到异常)。

非常感谢所有回复!

nt

【问题讨论】:

I tried putting the watcher in a loop (which generated exceptions until the file could be moved) but this seemed pretty inefficient. 是的,这是一个糟糕的解决方案。管理控制流不例外。 可悲的是@ntmp,从我到目前为止的测试来看,寻找异常是判断操作系统仍在“写入”或“复制”文件的最佳方式。但我同意@Sean Patrick Floyd 的观点,这是一种让它发挥作用的糟糕方法。我个人希望检查是 java.io.File API 的一部分。不知道为什么没有。将留给 JVM 人员来实现并使我们的开发人员更容易...... “检查异常”方法甚至不适用于 UNIX,因为 UNIX 文件系统不会锁定正在写入的文件。在 UNIX 上,java 会愉快地移动部分写入的文件,从而导致数据损坏。 【参考方案1】:

写入另一个文件作为原始文件已完成的指示。 如果完成创建文件 'fileorg.done' 并检查,ig 'fileorg.dat' 正在增长 仅适用于“fileorg.done”。

使用巧妙的命名约定,您应该不会有任何问题。

【讨论】:

【参考方案2】:

两种解决方案:

第一个是the answer by stacker的细微变化:

对不完整的文件使用唯一的前缀。类似于myhugefile.zip.inc 而不是myhugefile.zip。上传/创建完成后重命名文件。从手表中排除 .inc 文件。

第二种是使用同一驱动器上的不同文件夹来创建/上传/写入文件,并在准备好后将它们移动到监视文件夹。如果它们在同一个驱动器上,移动应该是一个原子操作(我猜是依赖于文件系统)。

无论哪种方式,创建文件的客户端都必须做一些额外的工作。

【讨论】:

问题是我对创建文件的客户端几乎没有控制权。我无法添加唯一前缀。我也可以指定写入文件的文件夹,但我不能告诉客户端在完成写入后将它们移动到另一个文件夹。 @ntmp 你有没有解决这个问题,请分享给我,因为我也面临同样的问题【参考方案3】:

我推测 java.io.File.canWrite() 会告诉你文件何时完成写入。

【讨论】:

我尝试了一个快速测试,一个线程写入文件,而另一个线程检查 canWrite() 方法,但它总是返回 true。 实际上我相信它只是检查操作系统以查看您是否有写入权限。从安全的角度来看,您可能有权限,但从等待它完成写入的角度来看,您可能没有权限。【参考方案4】:

这是一个非常有趣的讨论,当然这是一个生死攸关的用例:等待一个新文件被创建,然后以某种方式对该文件作出反应。这里的竞争条件很有趣,因为这里的高级要求当然是获取一个事件,然后实际获得(至少)文件的读锁。对于大文件或只是创建大量文件,这可能需要整个工作线程池,它们只是定期尝试锁定新创建的文件,并且当它们成功时,实际完成工作。但我确信 NT 意识到,必须谨慎地执行此操作以使其可扩展,因为它最终是一种轮询方法,而可扩展性和轮询并不是两个可以很好地结合在一起的词。

【讨论】:

【参考方案5】:

我今天遇到了同样的问题。我的用例在实际导入文件之前有一点延迟并不是什么大问题,我仍然想使用 NIO2 API。我选择的解决方案是等到文件没有被修改 10 秒后再对其执行任何操作。

实现的重要部分如下。程序一直等待,直到等待时间到期或发生新事件。每次修改文件时都会重置过期时间。如果一个文件在等待时间到期之前被删除,它将从列表中删除。我使用poll方法,超时时间为预期过期时间,即(lastmodified+waitTime)-currentTime

private final Map<Path, Long> expirationTimes = newHashMap();
private Long newFileWait = 10000L;

public void run() 
    for(;;) 
        //Retrieves and removes next watch key, waiting if none are present.
        WatchKey k = watchService.take();

        for(;;) 
            long currentTime = new DateTime().getMillis();

            if(k!=null)
                handleWatchEvents(k);

            handleExpiredWaitTimes(currentTime);

            // If there are no files left stop polling and block on .take()
            if(expirationTimes.isEmpty())
                break;

            long minExpiration = min(expirationTimes.values());
            long timeout = minExpiration-currentTime;
            logger.debug("timeout: "+timeout);
            k = watchService.poll(timeout, TimeUnit.MILLISECONDS);
        
    


private void handleExpiredWaitTimes(Long currentTime) 
    // Start import for files for which the expirationtime has passed
    for(Entry<Path, Long> entry : expirationTimes.entrySet()) 
        if(entry.getValue()<=currentTime) 
            logger.debug("expired "+entry);
            // do something with the file
            expirationTimes.remove(entry.getKey());
        
    


private void handleWatchEvents(WatchKey k) 
    List<WatchEvent<?>> events = k.pollEvents();
    for (WatchEvent<?> event : events) 
        handleWatchEvent(event, keys.get(k));
    
    // reset watch key to allow the key to be reported again by the watch service
    k.reset();


private void handleWatchEvent(WatchEvent<?> event, Path dir) throws IOException 
    Kind<?> kind = event.kind();

    WatchEvent<Path> ev = cast(event);
        Path name = ev.context();
        Path child = dir.resolve(name);

    if (kind == ENTRY_MODIFY || kind == ENTRY_CREATE) 
        // Update modified time
        FileTime lastModified = Attributes.readBasicFileAttributes(child, NOFOLLOW_LINKS).lastModifiedTime();
        expirationTimes.put(name, lastModified.toMillis()+newFileWait);
    

    if (kind == ENTRY_DELETE) 
        expirationTimes.remove(child);
    

【讨论】:

此线程中的最佳答案 - 现在是 2013 年,他们已经在 J​​ava 中修复了这个问题,还是仍然需要使用这样的代码? 方法 handleExpiredWaitTimes 在迭代时删除条目,所以应该使用迭代器。【参考方案6】:

当我实现一个文件系统观察器来传输上传的文件时,我不得不处理类似的情况。我为解决这个问题而实施的解决方案包括以下内容:

1-首先,维护一个未处理文件的Map(只要文件还在被复制,文件系统就会产生Modify_Event,如果flag为false,你可以忽略它们)。

2- 在你的文件处理器中,你从列表中取出一个文件并检查它是否被文件系统锁定,如果是,你会得到一个异常,只需捕获这个异常并将你的线程置于等待状态(即 10 秒)然后重试,直到锁被释放。处理完文件后,您可以将标志更改为 true 或将其从地图中移除。

如果在等待时间段内传输同一文件的多个版本,此解决方案将效率不高。

干杯, 拉姆齐

【讨论】:

【参考方案7】:

虽然在 SO 完成复制时不可能通过 Watcher Service API 通知,但所有选项似乎都在“变通”(包括这个!)。

如上所述,

1) 在 UNIX 上不能选择移动或复制;

2) 如果你有写权限,File.canWrite 总是返回 true,即使文件仍在被复制;

3) 等待超时或新事件发生是一种选择,但如果系统过载但复制未完成怎么办?如果超时值很大,程序会等待很长时间。

4) 如果您只是在使用文件而不是创建文件,则无法选择写入另一个文件来“标记”复制完成。

另一种方法是使用以下代码:

boolean locked = true;

while (locked) 
    RandomAccessFile raf = null;
    try 
            raf = new RandomAccessFile(file, "r"); // it will throw FileNotFoundException. It's not needed to use 'rw' because if the file is delete while copying, 'w' option will create an empty file.
            raf.seek(file.length()); // just to make sure everything was copied, goes to the last byte
            locked = false;
         catch (IOException e) 
            locked = file.exists();
            if (locked) 
                System.out.println("File locked: '" + file.getAbsolutePath() + "'");
                Thread.sleep(1000); // waits some time
             else  
                System.out.println("File was deleted while copying: '" + file.getAbsolutePath() + "'");
            
     finally 
            if (raf!=null) 
                raf.close();    
            
        

【讨论】:

【参考方案8】:

根据文件写入完成后您需要移动文件的紧迫程度,您还可以检查稳定的最后修改时间戳,并仅移动处于静止状态的文件。您需要它稳定的时间量可能取决于实现,但我认为具有最后修改时间戳且 15 秒未更改的东西应该足够稳定,可以移动。

【讨论】:

【参考方案9】:

我知道这是一个老问题,但也许它可以帮助某人。

我遇到了同样的问题,所以我做了以下事情:

if (kind == ENTRY_CREATE) 
            System.out.println("Creating file: " + child);

            boolean isGrowing = false;
            Long initialWeight = new Long(0);
            Long finalWeight = new Long(0);

            do 
                initialWeight = child.toFile().length();
                Thread.sleep(1000);
                finalWeight = child.toFile().length();
                isGrowing = initialWeight < finalWeight;

             while(isGrowing);

            System.out.println("Finished creating file!");

        

当文件被创建时,它会变得越来越大。所以我所做的就是比较相隔一秒的重量。应用程序将处于循环中,直到两个权重相同。

【讨论】:

不确定这是否适用于 Win7,因为在复制文件时,Win7 会在硬盘中分配所有必要的空间,然后用文件的字节“填充”它。【参考方案10】:

看起来 Apache Camel 通过尝试重命名文件 (java.io.File.renameTo) 来处理文件未完成上传问题。如果重命名失败,没有读锁,但继续尝试。重命名成功后,他们将其重命名,然后继续进行预期的处理。

请参阅下面的 operations.renameFile。以下是 Apache Camel 源代码的链接:GenericFileRenameExclusiveReadLockStrategy.java 和 FileUtil.java

public boolean acquireExclusiveReadLock( ... ) throws Exception 
   LOG.trace("Waiting for exclusive read lock to file: ", file);

   // the trick is to try to rename the file, if we can rename then we have exclusive read
   // since its a Generic file we cannot use java.nio to get a RW lock
   String newName = file.getFileName() + ".camelExclusiveReadLock";

   // make a copy as result and change its file name
   GenericFile<T> newFile = file.copyFrom(file);
   newFile.changeFileName(newName);
   StopWatch watch = new StopWatch();

   boolean exclusive = false;
   while (!exclusive) 
        // timeout check
        if (timeout > 0) 
            long delta = watch.taken();
            if (delta > timeout) 
                CamelLogger.log(LOG, readLockLoggingLevel,
                        "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
                // we could not get the lock within the timeout period, so return false
                return false;
            
        

        exclusive = operations.renameFile(file.getAbsoluteFilePath(), newFile.getAbsoluteFilePath());
        if (exclusive) 
            LOG.trace("Acquired exclusive read lock to file: ", file);
            // rename it back so we can read it
            operations.renameFile(newFile.getAbsoluteFilePath(), file.getAbsoluteFilePath());
         else 
            boolean interrupted = sleep();
            if (interrupted) 
                // we were interrupted while sleeping, we are likely being shutdown so return false
                return false;
            
        
   

   return true;

【讨论】:

【参考方案11】:

对于 linux 中的大文件,文件以 .filepart 的扩展名复制。您只需要使用 commons api 检查扩展并注册 ENTRY_CREATE 事件。我用我的 .csv 文件(1GB)对此进行了测试并添加它工作

public void run()

    try
    
        WatchKey key = myWatcher.take();
        while (key != null)
        
            for (WatchEvent event : key.pollEvents())
            
                if (FilenameUtils.isExtension(event.context().toString(), "filepart"))
                
                    System.out.println("Inside the PartFile " + event.context().toString());
                 else
                
                    System.out.println("Full file Copied " + event.context().toString());
                    //Do what ever you want to do with this files.
                
            
            key.reset();
            key = myWatcher.take();
        
     catch (InterruptedException e)
    
        e.printStackTrace();
    

【讨论】:

【参考方案12】:

如果您无法控制写入过程,请记录所有ENTRY_CREATED 事件并观察是否存在模式

在我的例子中,文件是通过 WebDav (Apache) 创建的,并且创建了许多临时文件,但同时也为同一文件触发了 两个 ENTRY_CREATED 事件。第二个ENTRY_CREATED 事件表示复制过程完成。

这是我的示例ENTRY_CREATED 事件。打印绝对文件路径(您的日志可能会有所不同,具体取决于写入文件的应用程序):

[info] application - /var/www/webdav/.davfs.tmp39dee1 was created
[info] application - /var/www/webdav/document.docx was created
[info] application - /var/www/webdav/.davfs.tmp054fe9 was created
[info] application - /var/www/webdav/document.docx was created
[info] application - /var/www/webdav/.DAV/__db.document.docx was created 

如您所见,我为 document.docx 收到了两个 ENTRY_CREATED 事件。在第二个事件之后,我知道文件已完成。就我而言,临时文件显然被忽略了。

【讨论】:

【参考方案13】:

所以,我遇到了同样的问题,并为我提供了以下解决方案。 较早的尝试失败 - 尝试监控每个文件的“lastModifiedTime”统计信息,但我注意到大文件的大小增长可能会暂停一段时间。(大小不会连续变化)

基本思路 - 为每个事件创建一个触发器文件(在临时目录中),其名称格式如下 -

OriginalFileName_lastModifiedTime_numberOfTries

这个文件是空的,所有的剧本都在名字里。原始文件只会在经过特定持续时间的间隔后才会考虑,而不会更改其“最后修改时间”统计信息。 (注意 - 因为它是一个文件统计,所以没有开销 -> O(1))

注意 - 此触发器文件由不同的服务处理(例如“FileTrigger”)。

优势 -

    不休眠或等待保持系统。 减轻文件观察器的工作以监控其他事件

FileWatcher 的代码 -

val triggerFileName: String = triggerFileTempDir + orifinalFileName + "_" + Files.getLastModifiedTime(Paths.get(event.getFile.getName.getPath)).toMillis + "_0"

// creates trigger file in temporary directory
val triggerFile: File = new File(triggerFileName)
val isCreated: Boolean = triggerFile.createNewFile()

if (isCreated)
    println("Trigger created: " + triggerFileName)
else
    println("Error in creating trigger file: " + triggerFileName)

FileTrigger 代码(间隔时间为 5 分钟的 cron 作业) -

 val actualPath : String = "Original file directory here"
 val tempPath : String = "Trigger file directory here"
 val folder : File = new File(tempPath)    
 val listOfFiles = folder.listFiles()

for (i <- listOfFiles)


    // ActualFileName_LastModifiedTime_NumberOfTries
    val triggerFileName: String = i.getName
    val triggerFilePath: String = i.toString

    // extracting file info from trigger file name
    val fileInfo: Array[String] = triggerFileName.split("_", 3)
    // 0 -> Original file name, 1 -> last modified time, 2 -> number of tries

    val actualFileName: String = fileInfo(0)
    val actualFilePath: String = actualPath + actualFileName
    val modifiedTime: Long = fileInfo(1).toLong
    val numberOfTries: Int = fileStats(2).toInt

    val currentModifiedTime: Long = Files.getLastModifiedTime(Paths.get(actualFilePath)).toMillis
    val differenceInModifiedTimes: Long = currentModifiedTime - modifiedTime
    // checks if file has been copied completely(4 intervals of 5 mins each with no modification)
    if (differenceInModifiedTimes == 0 && numberOfTries == 3)
    
        FileUtils.deleteQuietly(new File(triggerFilePath))
        println("Trigger file deleted. Original file completed : " + actualFilePath)
    
    else
    
        var newTriggerFileName: String = null
        if (differenceInModifiedTimes == 0)
        
            // updates numberOfTries by 1
            newTriggerFileName = actualFileName + "_" + modifiedTime + "_" + (numberOfTries + 1)
        
        else
        
            // updates modified timestamp and resets numberOfTries to 0
            newTriggerFileName = actualFileName + "_" + currentModifiedTime + "_" + 0
        

        // renames trigger file
        new File(triggerFilePath).renameTo(new File(tempPath + newTriggerFileName))
        println("Trigger file renamed: " + triggerFileName + " -> " + newTriggerFileName)
        

【讨论】:

以上是关于Java:监视目录以移动大文件的主要内容,如果未能解决你的问题,请参考以下文章

使用 FileSystemWatcher 监视目录

大数据必知必会的-Linux命令

大数据必知必会的-Linux命令

Hadoop大数据技术-通过shell命令访问HDFS

如何在 Java 中创建一个新的 zip 文件并向其中添加一个大目录?

使用 Java 读取文件或流的最强大的方法(以防止 DoS 攻击)