204 lines
8.6 KiB
Python
204 lines
8.6 KiB
Python
import os
|
|
try:
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
HAVE_WATCHDOG = True
|
|
except ImportError:
|
|
HAVE_WATCHDOG = False
|
|
from PySide6.QtCore import QObject, Signal, QTimer
|
|
from constants import IMAGE_EXTENSIONS
|
|
|
|
|
|
class FileSystemWatcher(QObject):
|
|
"""
|
|
Monitors file system events (created, deleted, modified) for specified directories.
|
|
Emits signals to notify the main application thread of changes.
|
|
"""
|
|
file_created = Signal(str)
|
|
file_deleted = Signal(str)
|
|
file_modified = Signal(str)
|
|
_file_modified_from_handler = Signal(str) # Internal signal from handler thread
|
|
file_moved = Signal(str, str)
|
|
monitoring_status_changed = Signal(bool) # Nuevo: Señal para el estado de monitoreo
|
|
directory_moved = Signal(str, str)
|
|
directory_modified = Signal(str) # For changes that might not be specific files
|
|
|
|
_modified_events_queue = {} # {path: QTimer}
|
|
|
|
def __init__(self, parent=None):
|
|
super().__init__(parent)
|
|
self._watched_directories = set()
|
|
|
|
if HAVE_WATCHDOG:
|
|
self._observer = Observer()
|
|
self._event_handler = self._Handler(self)
|
|
self._observer.start()
|
|
else:
|
|
self._observer = None # Keep observer as None if watchdog is not available
|
|
|
|
# Debounce timer for modified events to avoid multiple signals for a single save
|
|
self._debounce_interval = 500 # milliseconds
|
|
|
|
# Connect the internal signal to the debouncing slot
|
|
if HAVE_WATCHDOG:
|
|
self._file_modified_from_handler.connect(self._on_file_modified_debounced)
|
|
|
|
def _on_file_modified_debounced(self, path):
|
|
"""Slot to handle modified events from the watchdog thread, debounced in the
|
|
main thread."""
|
|
# Debounce timer for modified events to avoid multiple signals for a single save
|
|
if path in self._modified_events_queue:
|
|
self._modified_events_queue[path].stop()
|
|
else:
|
|
# Ensure timer lives in the main thread (parent is self)
|
|
timer = QTimer(self)
|
|
timer.setSingleShot(True)
|
|
timer.setInterval(self._debounce_interval)
|
|
timer.timeout.connect(lambda p=path: self._emit_modified_after_debounce(p))
|
|
self._modified_events_queue[path] = timer
|
|
self._modified_events_queue[path].start()
|
|
|
|
def _emit_modified_after_debounce(self, path):
|
|
"""Emits the file_modified signal after the debounce period."""
|
|
self.file_modified.emit(path)
|
|
if path in self._modified_events_queue:
|
|
# Safely delete the QTimer object when done
|
|
self._modified_events_queue[path].deleteLater()
|
|
del self._modified_events_queue[path]
|
|
|
|
def add_path(self, path):
|
|
"""Adds a directory to be monitored."""
|
|
if not HAVE_WATCHDOG or self._observer is None:
|
|
return
|
|
|
|
# Normalize and expand path to ensure consistent comparison
|
|
abs_path = os.path.normpath(os.path.abspath(os.path.expanduser(path)))
|
|
|
|
# 1. Check if path is already covered by an existing watch (exact or parent)
|
|
for watched in self._watched_directories:
|
|
if abs_path == watched:
|
|
return
|
|
parent_prefix = watched if watched.endswith(os.sep) else watched + os.sep
|
|
if abs_path.startswith(parent_prefix):
|
|
return # Path is a subdirectory of an already watched directory
|
|
|
|
old_monitoring_state = bool(self._watched_directories)
|
|
|
|
# 2. Check if this new path covers existing watches (is a parent of them)
|
|
# If so, consolidate them into this single parent watch
|
|
child_prefix = abs_path if abs_path.endswith(os.sep) else abs_path + os.sep
|
|
covered_children = [w for w in self._watched_directories
|
|
if w.startswith(child_prefix)]
|
|
|
|
try:
|
|
if covered_children:
|
|
self._observer.unschedule_all()
|
|
for child in covered_children:
|
|
self._watched_directories.remove(child)
|
|
self._watched_directories.add(abs_path)
|
|
for p in self._watched_directories:
|
|
self._observer.schedule(self._event_handler, p, recursive=True)
|
|
print(f"Consolidated monitoring at parent: {abs_path}")
|
|
else:
|
|
self._observer.schedule(self._event_handler, abs_path, recursive=True)
|
|
self._watched_directories.add(abs_path)
|
|
print(f"Monitoring: {abs_path}")
|
|
except Exception as e:
|
|
print(f"Error scheduling watchdog for {abs_path}: {e}")
|
|
return
|
|
|
|
if not old_monitoring_state and self._watched_directories:
|
|
self.monitoring_status_changed.emit(True)
|
|
|
|
def remove_path(self, path):
|
|
"""Removes a directory from monitoring."""
|
|
if not HAVE_WATCHDOG or self._observer is None:
|
|
return
|
|
abs_path = os.path.normpath(os.path.abspath(os.path.expanduser(path)))
|
|
if abs_path in self._watched_directories:
|
|
old_monitoring_state = bool(self._watched_directories)
|
|
self._observer.unschedule_all() # Simpler to unschedule all and re-add
|
|
self._watched_directories.remove(abs_path)
|
|
for p in list(self._watched_directories): # Iterate over a copy
|
|
self._observer.schedule(self._event_handler, p, recursive=True)
|
|
print(f"Stopped monitoring: {abs_path}")
|
|
if HAVE_WATCHDOG and old_monitoring_state and not self._watched_directories:
|
|
self.monitoring_status_changed.emit(False)
|
|
|
|
def clear_paths(self):
|
|
"""Clears all monitored paths."""
|
|
if not HAVE_WATCHDOG or not self._observer:
|
|
return
|
|
|
|
old_monitoring_state = bool(self._watched_directories)
|
|
self._observer.unschedule_all()
|
|
self._watched_directories.clear()
|
|
print("Cleared all monitored paths.")
|
|
if old_monitoring_state:
|
|
self.monitoring_status_changed.emit(False)
|
|
|
|
def stop(self):
|
|
"""Stops the file system observer."""
|
|
if HAVE_WATCHDOG and self._observer:
|
|
self._observer.stop()
|
|
self._observer.join()
|
|
|
|
for timer in self._modified_events_queue.values():
|
|
timer.stop()
|
|
|
|
if HAVE_WATCHDOG:
|
|
print("FileSystemWatcher stopped.")
|
|
|
|
if HAVE_WATCHDOG:
|
|
class _Handler(FileSystemEventHandler):
|
|
# Signal to communicate to main thread
|
|
file_modified_from_thread = Signal(str)
|
|
"""Custom event handler for watchdog events."""
|
|
def __init__(self, watcher):
|
|
super().__init__()
|
|
self.watcher = watcher
|
|
|
|
def on_created(self, event):
|
|
if event.is_directory:
|
|
self.watcher.directory_modified.emit(event.src_path)
|
|
return
|
|
if self._is_image_file(event.src_path):
|
|
self.watcher.file_created.emit(event.src_path)
|
|
|
|
def on_deleted(self, event):
|
|
if event.is_directory:
|
|
self.watcher.directory_modified.emit(event.src_path)
|
|
return
|
|
if self._is_image_file(event.src_path):
|
|
self.watcher.file_deleted.emit(event.src_path)
|
|
|
|
def on_moved(self, event):
|
|
if event.is_directory:
|
|
self.watcher.directory_moved.emit(event.src_path, event.dest_path)
|
|
self.watcher.directory_modified.emit(event.src_path)
|
|
self.watcher.directory_modified.emit(event.dest_path)
|
|
return
|
|
self.watcher.file_moved.emit(event.src_path, event.dest_path)
|
|
|
|
def on_closed(self, event):
|
|
if event.is_directory:
|
|
self.watcher.directory_modified.emit(event.src_path)
|
|
return
|
|
if self._is_image_file(event.src_path):
|
|
self.watcher.file_modified.emit(event.src_path)
|
|
|
|
def on_modified(self, event):
|
|
if event.is_directory:
|
|
self.watcher.directory_modified.emit(event.src_path)
|
|
return
|
|
if self._is_image_file(event.src_path):
|
|
self.watcher._file_modified_from_handler.emit(event.src_path)
|
|
|
|
def _emit_modified(self, path):
|
|
self.watcher.file_modified.emit(path)
|
|
if path in self.watcher._modified_events_queue:
|
|
del self.watcher._modified_events_queue[path]
|
|
|
|
def _is_image_file(self, path):
|
|
return os.path.splitext(path)[1].lower() in IMAGE_EXTENSIONS
|