delay processing of files until after 1 second after the last MODIFY event

pull/5/head
Zlatin Balevsky 2019-06-17 23:08:16 +01:00
parent 7e2c4d48c6
commit c46f1b1ccd
1 changed files with 32 additions and 2 deletions

View File

@ -8,6 +8,7 @@ import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent import java.nio.file.WatchEvent
import java.nio.file.WatchKey import java.nio.file.WatchKey
import java.nio.file.WatchService import java.nio.file.WatchService
import java.util.concurrent.ConcurrentHashMap
import com.muwire.core.EventBus import com.muwire.core.EventBus
@ -16,8 +17,11 @@ import groovy.util.logging.Log
@Log @Log
class DirectoryWatcher { class DirectoryWatcher {
private static final long WAIT_TIME = 1000
private final EventBus eventBus private final EventBus eventBus
private final Thread watcherThread private final Thread watcherThread, publisherThread
private final Map<File, Long> waitingFiles = new ConcurrentHashMap<>()
private WatchService watchService private WatchService watchService
private volatile boolean shutdown private volatile boolean shutdown
@ -25,16 +29,20 @@ class DirectoryWatcher {
this.eventBus = eventBus this.eventBus = eventBus
this.watcherThread = new Thread({watch() } as Runnable, "directory-watcher") this.watcherThread = new Thread({watch() } as Runnable, "directory-watcher")
watcherThread.setDaemon(true) watcherThread.setDaemon(true)
this.publisherThread = new Thread({publish()} as Runnable, "watched-files-publisher")
publisherThread.setDaemon(true)
} }
void start() { void start() {
watchService = FileSystems.getDefault().newWatchService() watchService = FileSystems.getDefault().newWatchService()
watcherThread.start() watcherThread.start()
publisherThread.start()
} }
void stop() { void stop() {
shutdown = true shutdown = true
watcherThread.interrupt() watcherThread.interrupt()
publisherThread.interrupt()
watchService.close() watchService.close()
} }
@ -67,11 +75,33 @@ class DirectoryWatcher {
private void processModified(Path parent, Path path) { private void processModified(Path parent, Path path) {
File f = join(parent, path) File f = join(parent, path)
eventBus.publish(new FileSharedEvent(file : f)) waitingFiles.put(f, System.currentTimeMillis())
} }
private static File join(Path parent, Path path) { private static File join(Path parent, Path path) {
File parentFile = parent.toFile().getCanonicalFile() File parentFile = parent.toFile().getCanonicalFile()
new File(parentFile, path.toFile().getName()) new File(parentFile, path.toFile().getName())
} }
private void publish() {
try {
while(!shutdown) {
Thread.sleep(WAIT_TIME)
long now = System.currentTimeMillis()
def published = []
waitingFiles.each { file, timestamp ->
if (now - timestamp > WAIT_TIME) {
eventBus.publish new FileSharedEvent(file : file)
published << file
}
}
published.each {
waitingFiles.remove(it)
}
}
} catch (InterruptedException e) {
if (!shutdown)
throw e
}
}
} }