diff --git a/core/src/main/groovy/com/muwire/core/files/DirectoryWatcher.groovy b/core/src/main/groovy/com/muwire/core/files/DirectoryWatcher.groovy index 51081fce..d2b162ef 100644 --- a/core/src/main/groovy/com/muwire/core/files/DirectoryWatcher.groovy +++ b/core/src/main/groovy/com/muwire/core/files/DirectoryWatcher.groovy @@ -8,6 +8,7 @@ import java.nio.file.StandardWatchEventKinds import java.nio.file.WatchEvent import java.nio.file.WatchKey import java.nio.file.WatchService +import java.util.concurrent.ConcurrentHashMap import com.muwire.core.EventBus @@ -16,8 +17,11 @@ import groovy.util.logging.Log @Log class DirectoryWatcher { + private static final long WAIT_TIME = 1000 + private final EventBus eventBus - private final Thread watcherThread + private final Thread watcherThread, publisherThread + private final Map waitingFiles = new ConcurrentHashMap<>() private WatchService watchService private volatile boolean shutdown @@ -25,16 +29,20 @@ class DirectoryWatcher { this.eventBus = eventBus this.watcherThread = new Thread({watch() } as Runnable, "directory-watcher") watcherThread.setDaemon(true) + this.publisherThread = new Thread({publish()} as Runnable, "watched-files-publisher") + publisherThread.setDaemon(true) } void start() { watchService = FileSystems.getDefault().newWatchService() watcherThread.start() + publisherThread.start() } void stop() { shutdown = true watcherThread.interrupt() + publisherThread.interrupt() watchService.close() } @@ -67,11 +75,33 @@ class DirectoryWatcher { private void processModified(Path parent, Path path) { File f = join(parent, path) - eventBus.publish(new FileSharedEvent(file : f)) + waitingFiles.put(f, System.currentTimeMillis()) } private static File join(Path parent, Path path) { File parentFile = parent.toFile().getCanonicalFile() new File(parentFile, path.toFile().getName()) } + + private void publish() { + try { + while(!shutdown) { + Thread.sleep(WAIT_TIME) + long now = System.currentTimeMillis() + def published = [] + waitingFiles.each { file, timestamp -> + if (now - timestamp > WAIT_TIME) { + eventBus.publish new FileSharedEvent(file : file) + published << file + } + } + published.each { + waitingFiles.remove(it) + } + } + } catch (InterruptedException e) { + if (!shutdown) + throw e + } + } }