From d1e6ebfc9ba956de723be1e7d9653abf95c21feb Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Wed, 15 Jun 2022 05:49:37 +0100 Subject: [PATCH] Increase wait period for watched folder notifications. Make sure file length hasn't changed during wait period. Re-publish changed files if they have already started hashing. GitHub issue #148 --- .../muwire/core/files/DirectoryWatcher.groovy | 41 ++++++++++++++----- .../com/muwire/core/files/FileHasher.groovy | 5 +++ 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/files/DirectoryWatcher.groovy b/core/src/main/groovy/com/muwire/core/files/DirectoryWatcher.groovy index 269c40df..282a591f 100644 --- a/core/src/main/groovy/com/muwire/core/files/DirectoryWatcher.groovy +++ b/core/src/main/groovy/com/muwire/core/files/DirectoryWatcher.groovy @@ -1,6 +1,7 @@ package com.muwire.core.files import java.nio.channels.FileChannel +import java.nio.channels.OverlappingFileLockException import java.nio.file.FileSystem import java.nio.file.FileSystems import java.nio.file.Files @@ -29,7 +30,7 @@ import net.i2p.util.SystemVersion @Log class DirectoryWatcher { - private static final long WAIT_TIME = 1000 + private static final long WAIT_TIME = 3000 private static final WatchEvent.Kind[] kinds static { @@ -46,7 +47,7 @@ class DirectoryWatcher { private final WatchedDirectoryManager watchedDirectoryManager private final NegativeFiles negativeFiles private final Thread watcherThread, publisherThread - private final Map waitingFiles = new ConcurrentHashMap<>() + private final Map waitingFiles = new ConcurrentHashMap<>() private final Map watchedDirectories = new ConcurrentHashMap<>() private WatchService watchService private volatile boolean shutdown @@ -135,7 +136,7 @@ class DirectoryWatcher { if (f.isDirectory()) eventBus.publish(new FileSharedEvent(file : f, fromWatch : true)) else - waitingFiles.put(f, System.currentTimeMillis()) + waitingFiles.put(f, new WaitingEntry(System.currentTimeMillis(), f.length())) } private void processModified(Path parent, Path path) { @@ -144,7 +145,7 @@ class DirectoryWatcher { if (!settings.shareHiddenFiles && f.isHidden()) return if (!negativeFiles.negativeTree.get(f)) - waitingFiles.put(f, System.currentTimeMillis()) + waitingFiles.put(f, new WaitingEntry(System.currentTimeMillis(), f.length())) } private void processDeleted(Path parent, Path path) { @@ -170,15 +171,26 @@ class DirectoryWatcher { Thread.sleep(WAIT_TIME) long now = System.currentTimeMillis() def published = [] - waitingFiles.each { file, timestamp -> - if (now - timestamp > WAIT_TIME) { + waitingFiles.each { file, waitingEntry -> + if (now - waitingEntry.timestamp > WAIT_TIME) { + final long length = file.length() + if (length != waitingEntry.length) { + log.fine("${file} length changed during wait period") + waitingEntry.length = length + return + } try (FileChannel fc = Files.newByteChannel(file.toPath(), StandardOpenOption.READ)) { - def lock = fc.tryLock(0, Long.MAX_VALUE, true) - if (lock == null) { - log.fine("Couldn't acquire read lock on $file will try again") + try { + def lock = fc.tryLock(0, Long.MAX_VALUE, true) + if (lock == null) { + log.fine("Couldn't acquire read lock on $file will try again") + return + } + lock.release() + } catch (OverlappingFileLockException ofle) { + log.fine("file $file has already started hashing") return } - lock.release() log.fine("publishing file $file") eventBus.publish new FileSharedEvent(file: file, fromWatch: true) published << file @@ -196,4 +208,13 @@ class DirectoryWatcher { throw e } } + + private static class WaitingEntry { + private final long timestamp + private volatile long length + WaitingEntry(long timestamp, long length) { + this.timestamp = timestamp + this.length = length + } + } } diff --git a/core/src/main/groovy/com/muwire/core/files/FileHasher.groovy b/core/src/main/groovy/com/muwire/core/files/FileHasher.groovy index 71ac84f7..fd763c58 100644 --- a/core/src/main/groovy/com/muwire/core/files/FileHasher.groovy +++ b/core/src/main/groovy/com/muwire/core/files/FileHasher.groovy @@ -9,6 +9,7 @@ import java.nio.MappedByteBuffer import java.nio.channels.FileChannel import java.nio.channels.FileChannel.MapMode import java.nio.channels.FileLock +import java.nio.channels.OverlappingFileLockException import java.security.MessageDigest import java.security.NoSuchAlgorithmException import java.util.logging.Level @@ -75,6 +76,10 @@ class FileHasher { buf = raf.getChannel().map(MapMode.READ_ONLY, length - lastPieceLength, lastPieceLength.toInteger()) digest.update buf output.write(digest.digest(), 0, 32) + } catch (OverlappingFileLockException ofle) { + // lock may have been re-acquired by the notifier + Thread.sleep(10) + continue } finally { raf.close() DataUtil.tryUnmap(buf)