Java:监视目录以移动大文件
Posted
技术标签:
【中文标题】Java:监视目录以移动大文件【英文标题】:Java: Watching a directory to move large files 【发布时间】:2011-03-23 02:44:21 【问题描述】:我一直在编写一个监视目录的程序,当在其中创建文件时,它会更改名称并将它们移动到新目录。在我的第一个实现中,我使用了 Java 的 Watch Service API,它在我测试 1kb 文件时运行良好。出现的问题是,实际上创建的文件大小在 50-300mb 之间。发生这种情况时,观察者 API 会立即找到该文件,但无法移动它,因为它仍在写入中。我尝试将观察者置于一个循环中(在可以移动文件之前会产生异常),但这似乎效率很低。
由于这不起作用,我尝试使用一个计时器,该计时器每 10 秒检查一次文件夹,然后尽可能移动文件。这是我最终采用的方法。
问题:在没有进行异常检查或不断比较大小的情况下,是否会在文件写入完成时发出信号?我喜欢对每个文件只使用一次 Watcher API 的想法,而不是不断地使用计时器检查(并遇到异常)。
非常感谢所有回复!
nt
【问题讨论】:
I tried putting the watcher in a loop (which generated exceptions until the file could be moved) but this seemed pretty inefficient.
是的,这是一个糟糕的解决方案。管理控制流不例外。
可悲的是@ntmp,从我到目前为止的测试来看,寻找异常是判断操作系统仍在“写入”或“复制”文件的最佳方式。但我同意@Sean Patrick Floyd 的观点,这是一种让它发挥作用的糟糕方法。我个人希望检查是 java.io.File API 的一部分。不知道为什么没有。将留给 JVM 人员来实现并使我们的开发人员更容易......
“检查异常”方法甚至不适用于 UNIX,因为 UNIX 文件系统不会锁定正在写入的文件。在 UNIX 上,java 会愉快地移动部分写入的文件,从而导致数据损坏。
【参考方案1】:
写入另一个文件作为原始文件已完成的指示。 如果完成创建文件 'fileorg.done' 并检查,ig 'fileorg.dat' 正在增长 仅适用于“fileorg.done”。
使用巧妙的命名约定,您应该不会有任何问题。
【讨论】:
【参考方案2】:两种解决方案:
第一个是the answer by stacker的细微变化:
对不完整的文件使用唯一的前缀。类似于myhugefile.zip.inc
而不是myhugefile.zip
。上传/创建完成后重命名文件。从手表中排除 .inc 文件。
第二种是使用同一驱动器上的不同文件夹来创建/上传/写入文件,并在准备好后将它们移动到监视文件夹。如果它们在同一个驱动器上,移动应该是一个原子操作(我猜是依赖于文件系统)。
无论哪种方式,创建文件的客户端都必须做一些额外的工作。
【讨论】:
问题是我对创建文件的客户端几乎没有控制权。我无法添加唯一前缀。我也可以指定写入文件的文件夹,但我不能告诉客户端在完成写入后将它们移动到另一个文件夹。 @ntmp 你有没有解决这个问题,请分享给我,因为我也面临同样的问题【参考方案3】:我推测 java.io.File.canWrite() 会告诉你文件何时完成写入。
【讨论】:
我尝试了一个快速测试,一个线程写入文件,而另一个线程检查 canWrite() 方法,但它总是返回 true。 实际上我相信它只是检查操作系统以查看您是否有写入权限。从安全的角度来看,您可能有权限,但从等待它完成写入的角度来看,您可能没有权限。【参考方案4】:这是一个非常有趣的讨论,当然这是一个生死攸关的用例:等待一个新文件被创建,然后以某种方式对该文件作出反应。这里的竞争条件很有趣,因为这里的高级要求当然是获取一个事件,然后实际获得(至少)文件的读锁。对于大文件或只是创建大量文件,这可能需要整个工作线程池,它们只是定期尝试锁定新创建的文件,并且当它们成功时,实际完成工作。但我确信 NT 意识到,必须谨慎地执行此操作以使其可扩展,因为它最终是一种轮询方法,而可扩展性和轮询并不是两个可以很好地结合在一起的词。
【讨论】:
【参考方案5】:我今天遇到了同样的问题。我的用例在实际导入文件之前有一点延迟并不是什么大问题,我仍然想使用 NIO2 API。我选择的解决方案是等到文件没有被修改 10 秒后再对其执行任何操作。
实现的重要部分如下。程序一直等待,直到等待时间到期或发生新事件。每次修改文件时都会重置过期时间。如果一个文件在等待时间到期之前被删除,它将从列表中删除。我使用poll方法,超时时间为预期过期时间,即(lastmodified+waitTime)-currentTime
private final Map<Path, Long> expirationTimes = newHashMap();
private Long newFileWait = 10000L;
public void run()
for(;;)
//Retrieves and removes next watch key, waiting if none are present.
WatchKey k = watchService.take();
for(;;)
long currentTime = new DateTime().getMillis();
if(k!=null)
handleWatchEvents(k);
handleExpiredWaitTimes(currentTime);
// If there are no files left stop polling and block on .take()
if(expirationTimes.isEmpty())
break;
long minExpiration = min(expirationTimes.values());
long timeout = minExpiration-currentTime;
logger.debug("timeout: "+timeout);
k = watchService.poll(timeout, TimeUnit.MILLISECONDS);
private void handleExpiredWaitTimes(Long currentTime)
// Start import for files for which the expirationtime has passed
for(Entry<Path, Long> entry : expirationTimes.entrySet())
if(entry.getValue()<=currentTime)
logger.debug("expired "+entry);
// do something with the file
expirationTimes.remove(entry.getKey());
private void handleWatchEvents(WatchKey k)
List<WatchEvent<?>> events = k.pollEvents();
for (WatchEvent<?> event : events)
handleWatchEvent(event, keys.get(k));
// reset watch key to allow the key to be reported again by the watch service
k.reset();
private void handleWatchEvent(WatchEvent<?> event, Path dir) throws IOException
Kind<?> kind = event.kind();
WatchEvent<Path> ev = cast(event);
Path name = ev.context();
Path child = dir.resolve(name);
if (kind == ENTRY_MODIFY || kind == ENTRY_CREATE)
// Update modified time
FileTime lastModified = Attributes.readBasicFileAttributes(child, NOFOLLOW_LINKS).lastModifiedTime();
expirationTimes.put(name, lastModified.toMillis()+newFileWait);
if (kind == ENTRY_DELETE)
expirationTimes.remove(child);
【讨论】:
此线程中的最佳答案 - 现在是 2013 年,他们已经在 Java 中修复了这个问题,还是仍然需要使用这样的代码? 方法 handleExpiredWaitTimes 在迭代时删除条目,所以应该使用迭代器。【参考方案6】:当我实现一个文件系统观察器来传输上传的文件时,我不得不处理类似的情况。我为解决这个问题而实施的解决方案包括以下内容:
1-首先,维护一个未处理文件的Map(只要文件还在被复制,文件系统就会产生Modify_Event,如果flag为false,你可以忽略它们)。
2- 在你的文件处理器中,你从列表中取出一个文件并检查它是否被文件系统锁定,如果是,你会得到一个异常,只需捕获这个异常并将你的线程置于等待状态(即 10 秒)然后重试,直到锁被释放。处理完文件后,您可以将标志更改为 true 或将其从地图中移除。
如果在等待时间段内传输同一文件的多个版本,此解决方案将效率不高。
干杯, 拉姆齐
【讨论】:
【参考方案7】:虽然在 SO 完成复制时不可能通过 Watcher Service API 通知,但所有选项似乎都在“变通”(包括这个!)。
如上所述,
1) 在 UNIX 上不能选择移动或复制;
2) 如果你有写权限,File.canWrite 总是返回 true,即使文件仍在被复制;
3) 等待超时或新事件发生是一种选择,但如果系统过载但复制未完成怎么办?如果超时值很大,程序会等待很长时间。
4) 如果您只是在使用文件而不是创建文件,则无法选择写入另一个文件来“标记”复制完成。
另一种方法是使用以下代码:
boolean locked = true;
while (locked)
RandomAccessFile raf = null;
try
raf = new RandomAccessFile(file, "r"); // it will throw FileNotFoundException. It's not needed to use 'rw' because if the file is delete while copying, 'w' option will create an empty file.
raf.seek(file.length()); // just to make sure everything was copied, goes to the last byte
locked = false;
catch (IOException e)
locked = file.exists();
if (locked)
System.out.println("File locked: '" + file.getAbsolutePath() + "'");
Thread.sleep(1000); // waits some time
else
System.out.println("File was deleted while copying: '" + file.getAbsolutePath() + "'");
finally
if (raf!=null)
raf.close();
【讨论】:
【参考方案8】:根据文件写入完成后您需要移动文件的紧迫程度,您还可以检查稳定的最后修改时间戳,并仅移动处于静止状态的文件。您需要它稳定的时间量可能取决于实现,但我认为具有最后修改时间戳且 15 秒未更改的东西应该足够稳定,可以移动。
【讨论】:
【参考方案9】:我知道这是一个老问题,但也许它可以帮助某人。
我遇到了同样的问题,所以我做了以下事情:
if (kind == ENTRY_CREATE)
System.out.println("Creating file: " + child);
boolean isGrowing = false;
Long initialWeight = new Long(0);
Long finalWeight = new Long(0);
do
initialWeight = child.toFile().length();
Thread.sleep(1000);
finalWeight = child.toFile().length();
isGrowing = initialWeight < finalWeight;
while(isGrowing);
System.out.println("Finished creating file!");
当文件被创建时,它会变得越来越大。所以我所做的就是比较相隔一秒的重量。应用程序将处于循环中,直到两个权重相同。
【讨论】:
不确定这是否适用于 Win7,因为在复制文件时,Win7 会在硬盘中分配所有必要的空间,然后用文件的字节“填充”它。【参考方案10】:看起来 Apache Camel 通过尝试重命名文件 (java.io.File.renameTo) 来处理文件未完成上传问题。如果重命名失败,没有读锁,但继续尝试。重命名成功后,他们将其重命名,然后继续进行预期的处理。
请参阅下面的 operations.renameFile。以下是 Apache Camel 源代码的链接:GenericFileRenameExclusiveReadLockStrategy.java 和 FileUtil.java
public boolean acquireExclusiveReadLock( ... ) throws Exception
LOG.trace("Waiting for exclusive read lock to file: ", file);
// the trick is to try to rename the file, if we can rename then we have exclusive read
// since its a Generic file we cannot use java.nio to get a RW lock
String newName = file.getFileName() + ".camelExclusiveReadLock";
// make a copy as result and change its file name
GenericFile<T> newFile = file.copyFrom(file);
newFile.changeFileName(newName);
StopWatch watch = new StopWatch();
boolean exclusive = false;
while (!exclusive)
// timeout check
if (timeout > 0)
long delta = watch.taken();
if (delta > timeout)
CamelLogger.log(LOG, readLockLoggingLevel,
"Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
// we could not get the lock within the timeout period, so return false
return false;
exclusive = operations.renameFile(file.getAbsoluteFilePath(), newFile.getAbsoluteFilePath());
if (exclusive)
LOG.trace("Acquired exclusive read lock to file: ", file);
// rename it back so we can read it
operations.renameFile(newFile.getAbsoluteFilePath(), file.getAbsoluteFilePath());
else
boolean interrupted = sleep();
if (interrupted)
// we were interrupted while sleeping, we are likely being shutdown so return false
return false;
return true;
【讨论】:
【参考方案11】:对于 linux 中的大文件,文件以 .filepart 的扩展名复制。您只需要使用 commons api 检查扩展并注册 ENTRY_CREATE 事件。我用我的 .csv 文件(1GB)对此进行了测试并添加它工作
public void run()
try
WatchKey key = myWatcher.take();
while (key != null)
for (WatchEvent event : key.pollEvents())
if (FilenameUtils.isExtension(event.context().toString(), "filepart"))
System.out.println("Inside the PartFile " + event.context().toString());
else
System.out.println("Full file Copied " + event.context().toString());
//Do what ever you want to do with this files.
key.reset();
key = myWatcher.take();
catch (InterruptedException e)
e.printStackTrace();
【讨论】:
【参考方案12】:如果您无法控制写入过程,请记录所有ENTRY_CREATED
事件并观察是否存在模式。
在我的例子中,文件是通过 WebDav (Apache) 创建的,并且创建了许多临时文件,但同时也为同一文件触发了 两个 ENTRY_CREATED
事件。第二个ENTRY_CREATED
事件表示复制过程完成。
这是我的示例ENTRY_CREATED
事件。打印绝对文件路径(您的日志可能会有所不同,具体取决于写入文件的应用程序):
[info] application - /var/www/webdav/.davfs.tmp39dee1 was created
[info] application - /var/www/webdav/document.docx was created
[info] application - /var/www/webdav/.davfs.tmp054fe9 was created
[info] application - /var/www/webdav/document.docx was created
[info] application - /var/www/webdav/.DAV/__db.document.docx was created
如您所见,我为 document.docx 收到了两个 ENTRY_CREATED
事件。在第二个事件之后,我知道文件已完成。就我而言,临时文件显然被忽略了。
【讨论】:
【参考方案13】:所以,我遇到了同样的问题,并为我提供了以下解决方案。 较早的尝试失败 - 尝试监控每个文件的“lastModifiedTime”统计信息,但我注意到大文件的大小增长可能会暂停一段时间。(大小不会连续变化)
基本思路 - 为每个事件创建一个触发器文件(在临时目录中),其名称格式如下 -
OriginalFileName_lastModifiedTime_numberOfTries
这个文件是空的,所有的剧本都在名字里。原始文件只会在经过特定持续时间的间隔后才会考虑,而不会更改其“最后修改时间”统计信息。 (注意 - 因为它是一个文件统计,所以没有开销 -> O(1))
注意 - 此触发器文件由不同的服务处理(例如“FileTrigger”)。
优势 -
-
不休眠或等待保持系统。
减轻文件观察器的工作以监控其他事件
FileWatcher 的代码 -
val triggerFileName: String = triggerFileTempDir + orifinalFileName + "_" + Files.getLastModifiedTime(Paths.get(event.getFile.getName.getPath)).toMillis + "_0"
// creates trigger file in temporary directory
val triggerFile: File = new File(triggerFileName)
val isCreated: Boolean = triggerFile.createNewFile()
if (isCreated)
println("Trigger created: " + triggerFileName)
else
println("Error in creating trigger file: " + triggerFileName)
FileTrigger 代码(间隔时间为 5 分钟的 cron 作业) -
val actualPath : String = "Original file directory here"
val tempPath : String = "Trigger file directory here"
val folder : File = new File(tempPath)
val listOfFiles = folder.listFiles()
for (i <- listOfFiles)
// ActualFileName_LastModifiedTime_NumberOfTries
val triggerFileName: String = i.getName
val triggerFilePath: String = i.toString
// extracting file info from trigger file name
val fileInfo: Array[String] = triggerFileName.split("_", 3)
// 0 -> Original file name, 1 -> last modified time, 2 -> number of tries
val actualFileName: String = fileInfo(0)
val actualFilePath: String = actualPath + actualFileName
val modifiedTime: Long = fileInfo(1).toLong
val numberOfTries: Int = fileStats(2).toInt
val currentModifiedTime: Long = Files.getLastModifiedTime(Paths.get(actualFilePath)).toMillis
val differenceInModifiedTimes: Long = currentModifiedTime - modifiedTime
// checks if file has been copied completely(4 intervals of 5 mins each with no modification)
if (differenceInModifiedTimes == 0 && numberOfTries == 3)
FileUtils.deleteQuietly(new File(triggerFilePath))
println("Trigger file deleted. Original file completed : " + actualFilePath)
else
var newTriggerFileName: String = null
if (differenceInModifiedTimes == 0)
// updates numberOfTries by 1
newTriggerFileName = actualFileName + "_" + modifiedTime + "_" + (numberOfTries + 1)
else
// updates modified timestamp and resets numberOfTries to 0
newTriggerFileName = actualFileName + "_" + currentModifiedTime + "_" + 0
// renames trigger file
new File(triggerFilePath).renameTo(new File(tempPath + newTriggerFileName))
println("Trigger file renamed: " + triggerFileName + " -> " + newTriggerFileName)
【讨论】:
以上是关于Java:监视目录以移动大文件的主要内容,如果未能解决你的问题,请参考以下文章