wip on persisting and loading of watched directory metadata, emit the event to register on autowatch service

pull/53/head
Zlatin Balevsky 2020-03-26 06:21:41 +00:00
parent 244ce43794
commit dfc62b943f
3 changed files with 54 additions and 27 deletions

View File

@ -390,11 +390,6 @@ public class Core {
i2pAcceptor, hostCache, trustService, searchManager, uploadManager, fileManager, connectionEstablisher, i2pAcceptor, hostCache, trustService, searchManager, uploadManager, fileManager, connectionEstablisher,
certificateManager, chatServer) certificateManager, chatServer)
log.info("initializing directory watcher")
directoryWatcher = new DirectoryWatcher(eventBus, fileManager, home, props)
eventBus.register(DirectoryWatchedEvent.class, directoryWatcher)
eventBus.register(WatchedDirectoryConvertedEvent.class, directoryWatcher)
eventBus.register(DirectoryUnsharedEvent.class, directoryWatcher)
log.info("initializing hasher service") log.info("initializing hasher service")
hasherService = new HasherService(new FileHasher(), eventBus, fileManager, props) hasherService = new HasherService(new FileHasher(), eventBus, fileManager, props)
@ -427,6 +422,13 @@ public class Core {
register(WatchedDirectoryConvertedEvent.class, watchedDirectoryManager) register(WatchedDirectoryConvertedEvent.class, watchedDirectoryManager)
} }
log.info("initializing directory watcher")
directoryWatcher = new DirectoryWatcher(eventBus, fileManager, home, watchedDirectoryManager)
eventBus.with {
register(DirectoryWatchedEvent.class, directoryWatcher)
register(WatchedDirectoryConvertedEvent.class, directoryWatcher)
register(DirectoryUnsharedEvent.class, directoryWatcher)
}
} }
public void startServices() { public void startServices() {

View File

@ -16,6 +16,7 @@ import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings import com.muwire.core.MuWireSettings
import com.muwire.core.SharedFile import com.muwire.core.SharedFile
import com.muwire.core.files.directories.WatchedDirectoryConvertedEvent import com.muwire.core.files.directories.WatchedDirectoryConvertedEvent
import com.muwire.core.files.directories.WatchedDirectoryManager
import groovy.util.logging.Log import groovy.util.logging.Log
import net.i2p.util.SystemVersion import net.i2p.util.SystemVersion
@ -34,20 +35,20 @@ class DirectoryWatcher {
} }
private final File home private final File home
private final MuWireSettings muOptions
private final EventBus eventBus private final EventBus eventBus
private final FileManager fileManager private final FileManager fileManager
private final WatchedDirectoryManager watchedDirectoryManager
private final Thread watcherThread, publisherThread private final Thread watcherThread, publisherThread
private final Map<File, Long> waitingFiles = new ConcurrentHashMap<>() private final Map<File, Long> waitingFiles = new ConcurrentHashMap<>()
private final Map<File, WatchKey> watchedDirectories = new ConcurrentHashMap<>() private final Map<File, WatchKey> watchedDirectories = new ConcurrentHashMap<>()
private WatchService watchService private WatchService watchService
private volatile boolean shutdown private volatile boolean shutdown
DirectoryWatcher(EventBus eventBus, FileManager fileManager, File home, MuWireSettings muOptions) { DirectoryWatcher(EventBus eventBus, FileManager fileManager, File home, WatchedDirectoryManager watchedDirectoryManager) {
this.home = home this.home = home
this.muOptions = muOptions
this.eventBus = eventBus this.eventBus = eventBus
this.fileManager = fileManager this.fileManager = fileManager
this.watchedDirectoryManager = watchedDirectoryManager
this.watcherThread = new Thread({watch() } as Runnable, "directory-watcher") this.watcherThread = new Thread({watch() } as Runnable, "directory-watcher")
watcherThread.setDaemon(true) watcherThread.setDaemon(true)
this.publisherThread = new Thread({publish()} as Runnable, "watched-files-publisher") this.publisherThread = new Thread({publish()} as Runnable, "watched-files-publisher")
@ -72,26 +73,13 @@ class DirectoryWatcher {
Path path = canonical.toPath() Path path = canonical.toPath()
WatchKey wk = path.register(watchService, kinds) WatchKey wk = path.register(watchService, kinds)
watchedDirectories.put(canonical, wk) watchedDirectories.put(canonical, wk)
if (muOptions.watchedDirectories.add(canonical.toString()))
saveMuSettings()
} }
void onDirectoryUnsharedEvent(DirectoryUnsharedEvent e) { void onDirectoryUnsharedEvent(DirectoryUnsharedEvent e) {
WatchKey wk = watchedDirectories.remove(e.directory) WatchKey wk = watchedDirectories.remove(e.directory)
wk?.cancel() wk?.cancel()
if (muOptions.watchedDirectories.remove(e.directory.toString()))
saveMuSettings()
} }
private void saveMuSettings() {
File muSettingsFile = new File(home, "MuWire.properties")
muSettingsFile.withPrintWriter("UTF-8", {
muOptions.write(it)
})
}
private void watch() { private void watch() {
try { try {
while(!shutdown) { while(!shutdown) {
@ -134,7 +122,7 @@ class DirectoryWatcher {
SharedFile sf = fileManager.fileToSharedFile.get(f) SharedFile sf = fileManager.fileToSharedFile.get(f)
if (sf != null) if (sf != null)
eventBus.publish(new FileUnsharedEvent(unsharedFile : sf, deleted : true)) eventBus.publish(new FileUnsharedEvent(unsharedFile : sf, deleted : true))
else if (muOptions.watchedDirectories.contains(f.toString())) else if (watchedDirectoryManager.isWatched(f))
eventBus.publish(new DirectoryUnsharedEvent(directory : f, deleted : true)) eventBus.publish(new DirectoryUnsharedEvent(directory : f, deleted : true))
else else
log.fine("Entry was not relevant"); log.fine("Entry was not relevant");

View File

@ -1,18 +1,26 @@
package com.muwire.core.files.directories package com.muwire.core.files.directories
import java.nio.file.Files
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory import java.util.concurrent.ThreadFactory
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.files.DirectoryWatchedEvent
import com.muwire.core.files.FileManager import com.muwire.core.files.FileManager
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
class WatchedDirectoryManager { class WatchedDirectoryManager {
private final File home private final File home
private final EventBus eventBus private final EventBus eventBus
private final FileManager fileManager private final FileManager fileManager
private final Map<File, WatchedDirectory> watchedDirs = new ConcurrentHashMap<>()
private final ExecutorService diskIO = Executors.newSingleThreadExecutor({r -> private final ExecutorService diskIO = Executors.newSingleThreadExecutor({r ->
Thread t = new Thread(r, "disk-io") Thread t = new Thread(r, "disk-io")
t.setDaemon(true) t.setDaemon(true)
@ -25,25 +33,54 @@ class WatchedDirectoryManager {
WatchedDirectoryManager(File home, EventBus eventBus, FileManager fileManager) { WatchedDirectoryManager(File home, EventBus eventBus, FileManager fileManager) {
this.home = new File(home, "directories") this.home = new File(home, "directories")
this.home.mkdir()
this.eventBus = eventBus this.eventBus = eventBus
this.fileManager = fileManager this.fileManager = fileManager
} }
public boolean isWatched(File f) {
watchedDirs.containsKey(f)
}
public void shutdown() { public void shutdown() {
diskIO.shutdown() diskIO.shutdown()
timer.cancel() timer.cancel()
} }
void onWatchedDirectoryConfigurationEvent(WatchedDirectoryConfigurationEvent e) { void onWatchedDirectoryConfigurationEvent(WatchedDirectoryConfigurationEvent e) {
if (!converting) { if (converting) {
// update state def newDir = new WatchedDirectory(e.directory)
// conversion is always autowatch really
newDir.autoWatch = e.autoWatch
persist(newDir)
} else {
// TODO: update state and stuff
} }
// always persist
} }
void onWatchedDirectoryConvertedEvent(WatchedDirectoryConvertedEvent e) { void onWatchedDirectoryConvertedEvent(WatchedDirectoryConvertedEvent e) {
converting = false converting = false
// load diskIO.submit({
def slurper = new JsonSlurper()
Files.walk(home.toPath()).filter({
it.getFileName().toString().endsWith(".json")
}).
forEach {
def parsed = slurper.parse(it.toFile())
WatchedDirectory wd = WatchedDirectory.fromJson(parsed)
watchedDirs.put(wd.directory, wd)
if (wd.autoWatch)
eventBus.publish(new DirectoryWatchedEvent(directory : wd.directory))
}
} as Runnable)
}
private void persist(WatchedDirectory dir) {
diskIO.submit({
def json = JsonOutput.toJson(dir.toJson())
def targetFile = new File(home, dir.getEncodedName() + ".json")
targetFile.text = json
} as Runnable)
} }
} }