mirror of https://github.com/zlatinb/muwire
Increase wait period for watched folder notifications. Make sure file length hasn't changed during wait period. Re-publish changed files if they have already started hashing. GitHub issue #148
parent
9514192e4c
commit
d1e6ebfc9b
|
@ -1,6 +1,7 @@
|
||||||
package com.muwire.core.files
|
package com.muwire.core.files
|
||||||
|
|
||||||
import java.nio.channels.FileChannel
|
import java.nio.channels.FileChannel
|
||||||
|
import java.nio.channels.OverlappingFileLockException
|
||||||
import java.nio.file.FileSystem
|
import java.nio.file.FileSystem
|
||||||
import java.nio.file.FileSystems
|
import java.nio.file.FileSystems
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
|
@ -29,7 +30,7 @@ import net.i2p.util.SystemVersion
|
||||||
@Log
|
@Log
|
||||||
class DirectoryWatcher {
|
class DirectoryWatcher {
|
||||||
|
|
||||||
private static final long WAIT_TIME = 1000
|
private static final long WAIT_TIME = 3000
|
||||||
|
|
||||||
private static final WatchEvent.Kind[] kinds
|
private static final WatchEvent.Kind[] kinds
|
||||||
static {
|
static {
|
||||||
|
@ -46,7 +47,7 @@ class DirectoryWatcher {
|
||||||
private final WatchedDirectoryManager watchedDirectoryManager
|
private final WatchedDirectoryManager watchedDirectoryManager
|
||||||
private final NegativeFiles negativeFiles
|
private final NegativeFiles negativeFiles
|
||||||
private final Thread watcherThread, publisherThread
|
private final Thread watcherThread, publisherThread
|
||||||
private final Map<File, Long> waitingFiles = new ConcurrentHashMap<>()
|
private final Map<File, WaitingEntry> waitingFiles = new ConcurrentHashMap<>()
|
||||||
private final Map<File, WatchKey> watchedDirectories = new ConcurrentHashMap<>()
|
private final Map<File, WatchKey> watchedDirectories = new ConcurrentHashMap<>()
|
||||||
private WatchService watchService
|
private WatchService watchService
|
||||||
private volatile boolean shutdown
|
private volatile boolean shutdown
|
||||||
|
@ -135,7 +136,7 @@ class DirectoryWatcher {
|
||||||
if (f.isDirectory())
|
if (f.isDirectory())
|
||||||
eventBus.publish(new FileSharedEvent(file : f, fromWatch : true))
|
eventBus.publish(new FileSharedEvent(file : f, fromWatch : true))
|
||||||
else
|
else
|
||||||
waitingFiles.put(f, System.currentTimeMillis())
|
waitingFiles.put(f, new WaitingEntry(System.currentTimeMillis(), f.length()))
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processModified(Path parent, Path path) {
|
private void processModified(Path parent, Path path) {
|
||||||
|
@ -144,7 +145,7 @@ class DirectoryWatcher {
|
||||||
if (!settings.shareHiddenFiles && f.isHidden())
|
if (!settings.shareHiddenFiles && f.isHidden())
|
||||||
return
|
return
|
||||||
if (!negativeFiles.negativeTree.get(f))
|
if (!negativeFiles.negativeTree.get(f))
|
||||||
waitingFiles.put(f, System.currentTimeMillis())
|
waitingFiles.put(f, new WaitingEntry(System.currentTimeMillis(), f.length()))
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processDeleted(Path parent, Path path) {
|
private void processDeleted(Path parent, Path path) {
|
||||||
|
@ -170,15 +171,26 @@ class DirectoryWatcher {
|
||||||
Thread.sleep(WAIT_TIME)
|
Thread.sleep(WAIT_TIME)
|
||||||
long now = System.currentTimeMillis()
|
long now = System.currentTimeMillis()
|
||||||
def published = []
|
def published = []
|
||||||
waitingFiles.each { file, timestamp ->
|
waitingFiles.each { file, waitingEntry ->
|
||||||
if (now - timestamp > WAIT_TIME) {
|
if (now - waitingEntry.timestamp > WAIT_TIME) {
|
||||||
|
final long length = file.length()
|
||||||
|
if (length != waitingEntry.length) {
|
||||||
|
log.fine("${file} length changed during wait period")
|
||||||
|
waitingEntry.length = length
|
||||||
|
return
|
||||||
|
}
|
||||||
try (FileChannel fc = Files.newByteChannel(file.toPath(), StandardOpenOption.READ)) {
|
try (FileChannel fc = Files.newByteChannel(file.toPath(), StandardOpenOption.READ)) {
|
||||||
def lock = fc.tryLock(0, Long.MAX_VALUE, true)
|
try {
|
||||||
if (lock == null) {
|
def lock = fc.tryLock(0, Long.MAX_VALUE, true)
|
||||||
log.fine("Couldn't acquire read lock on $file will try again")
|
if (lock == null) {
|
||||||
|
log.fine("Couldn't acquire read lock on $file will try again")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lock.release()
|
||||||
|
} catch (OverlappingFileLockException ofle) {
|
||||||
|
log.fine("file $file has already started hashing")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lock.release()
|
|
||||||
log.fine("publishing file $file")
|
log.fine("publishing file $file")
|
||||||
eventBus.publish new FileSharedEvent(file: file, fromWatch: true)
|
eventBus.publish new FileSharedEvent(file: file, fromWatch: true)
|
||||||
published << file
|
published << file
|
||||||
|
@ -196,4 +208,13 @@ class DirectoryWatcher {
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class WaitingEntry {
|
||||||
|
private final long timestamp
|
||||||
|
private volatile long length
|
||||||
|
WaitingEntry(long timestamp, long length) {
|
||||||
|
this.timestamp = timestamp
|
||||||
|
this.length = length
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import java.nio.MappedByteBuffer
|
||||||
import java.nio.channels.FileChannel
|
import java.nio.channels.FileChannel
|
||||||
import java.nio.channels.FileChannel.MapMode
|
import java.nio.channels.FileChannel.MapMode
|
||||||
import java.nio.channels.FileLock
|
import java.nio.channels.FileLock
|
||||||
|
import java.nio.channels.OverlappingFileLockException
|
||||||
import java.security.MessageDigest
|
import java.security.MessageDigest
|
||||||
import java.security.NoSuchAlgorithmException
|
import java.security.NoSuchAlgorithmException
|
||||||
import java.util.logging.Level
|
import java.util.logging.Level
|
||||||
|
@ -75,6 +76,10 @@ class FileHasher {
|
||||||
buf = raf.getChannel().map(MapMode.READ_ONLY, length - lastPieceLength, lastPieceLength.toInteger())
|
buf = raf.getChannel().map(MapMode.READ_ONLY, length - lastPieceLength, lastPieceLength.toInteger())
|
||||||
digest.update buf
|
digest.update buf
|
||||||
output.write(digest.digest(), 0, 32)
|
output.write(digest.digest(), 0, 32)
|
||||||
|
} catch (OverlappingFileLockException ofle) {
|
||||||
|
// lock may have been re-acquired by the notifier
|
||||||
|
Thread.sleep(10)
|
||||||
|
continue
|
||||||
} finally {
|
} finally {
|
||||||
raf.close()
|
raf.close()
|
||||||
DataUtil.tryUnmap(buf)
|
DataUtil.tryUnmap(buf)
|
||||||
|
|
Loading…
Reference in New Issue