Files
BagheeraView/duplicatecache.py
Ignacio Serantes 07afab6ca3 v0.9.19
2026-04-08 15:47:29 +02:00

894 lines
36 KiB
Python

"""
Duplicate Cache and Detection Module for Bagheera.
This module provides the core logic for detecting duplicate images using
perceptual hashing (dHash) and managing a persistent cache of these hashes
and their relationships using LMDB.
Classes:
DuplicateCache: Manages the LMDB database for hashes and exceptions.
DuplicateDetector: Background thread that performs the duplicate analysis.
"""
import os
import logging
import struct
import time
import collections
import shutil
import lmdb
from pathlib import Path
import PIL.Image
from PySide6.QtCore import (
QObject, QThread, Signal, QMutex, QSemaphore, QReadWriteLock,
QMutexLocker, QReadLocker, QWriteLocker, QRunnable
)
import imagehash # For perceptual hashing
from constants import (
DUPLICATE_CACHE_PATH, DUPLICATE_HASH_DB_NAME,
DUPLICATE_EXCEPTIONS_DB_NAME, DUPLICATE_PENDING_DB_NAME,
MAX_DHASH_DISTANCE, UITexts
)
logger = logging.getLogger(__name__)

# Record type emitted for every detected pair of similar images.
DuplicateResult = collections.namedtuple(
    'DuplicateResult',
    'path1 path2 hash_value is_exception similarity timestamp',
)
class BKTree:
    """Burkhard-Keller tree for nearest-neighbour lookups under a metric.

    Every node is a ``(value, children)`` tuple where ``children`` maps a
    distance to the child node stored at exactly that distance from
    ``value``. ``distance_func(a, b)`` supplies the metric (the detector
    uses Hamming distance between image hashes).
    """

    def __init__(self, distance_func):
        self.distance_func = distance_func
        self.tree = None  # Root node, or None while the tree is empty

    def add(self, item):
        """Insert ``item``; a value at distance 0 from a stored one is ignored."""
        if self.tree is None:
            self.tree = (item, {})
            return
        measure = self.distance_func
        pivot, branches = self.tree
        while True:
            gap = measure(item, pivot)
            if gap == 0:
                # Already present (or indistinguishable under the metric).
                return
            child = branches.get(gap)
            if child is None:
                branches[gap] = (item, {})
                return
            pivot, branches = child

    def query(self, item, max_dist):
        """Return ``[(value, distance), ...]`` for values within ``max_dist``."""
        matches = []
        if self.tree is None:
            return matches
        measure = self.distance_func
        stack = [self.tree]
        while stack:
            pivot, branches = stack.pop()
            gap = measure(item, pivot)
            if gap <= max_dist:
                matches.append((pivot, gap))
            # Triangle inequality: only children whose edge distance lies in
            # [gap - max_dist, gap + max_dist] can contain matches.
            lowest = max(0, gap - max_dist)
            highest = gap + max_dist
            stack.extend(
                branches[edge]
                for edge in range(lowest, highest + 1)
                if edge in branches
            )
        return matches
class HashWorker(QRunnable):
    """Thread-pool task that computes the perceptual hash of one image.

    The hex string of the dHash is stored in ``result_dict[path]`` under
    ``mutex``. ``semaphore`` is released exactly once per worker, whatever
    the outcome, so the thread that started the batch can count completed
    workers.

    Args:
        path: Image file to hash.
        detector: Object whose ``_is_running`` flag is checked before work
            starts; hashing is skipped when it is false.
        result_dict: Shared dict receiving ``path -> hash hex string``.
        mutex: QMutex guarding ``result_dict``.
        semaphore: QSemaphore released once when this worker finishes.
    """

    def __init__(self, path, detector, result_dict, mutex, semaphore):
        super().__init__()
        self.path = path
        self.detector = detector
        self.result_dict = result_dict
        self.mutex = mutex
        self.semaphore = semaphore

    def run(self):
        """Hash the image and publish the result; failures are only logged."""
        try:
            if not self.detector._is_running:
                return
            # imagehash requires a PIL/Pillow image object.
            with PIL.Image.open(self.path) as pil_img:
                # Using dHash from imagehash library as default
                digest = str(imagehash.dhash(pil_img))
            with QMutexLocker(self.mutex):
                self.result_dict[self.path] = digest
        except Exception as e:
            # Best effort: an unreadable image simply yields no hash.
            logger.warning("HashWorker failed for %s: %s", self.path, e)
        finally:
            # Always hand one permit back, otherwise the waiting detector
            # would keep polling for a worker that has already exited.
            self.semaphore.release()
class DuplicateCache(QObject):
    """
    Persistent LMDB cache for perceptual hashes and duplicate relationships.

    One LMDB environment at DUPLICATE_CACHE_PATH holds three named databases:

    * hash DB:       ``"<dev>-<inode hex>"`` -> ``"hash|mtime|path"``
    * exceptions DB: canonical pair key -> ``"path1|path2|similarity|timestamp"``
    * pending DB:    same key/value layout as the exceptions DB

    Files are identified by (device_id, inode) so entries survive renames
    within a filesystem. All LMDB transactions run under ``_db_lock``; a
    small in-memory dict guarded by ``_hash_cache_lock`` mirrors hash
    lookups.
    """

    def __init__(self):
        super().__init__()
        self._lmdb_env = None
        self._hash_db = None
        self._exceptions_db = None
        self._pending_db = None
        self._db_lock = QMutex()  # Protects LMDB transactions
        # In-memory cache for hashes: (dev, inode) -> (hash_value, mtime, path)
        self._hash_cache = {}
        self._hash_cache_lock = QReadWriteLock()
        self.lmdb_open()

    # ------------------------------------------------------------------
    # Environment lifecycle
    # ------------------------------------------------------------------
    def lmdb_open(self):
        """Create the cache directory and open the environment and its DBs.

        On failure the error is logged and ``_lmdb_env`` is left as None,
        which turns every other method into a no-op.
        """
        cache_dir = Path(DUPLICATE_CACHE_PATH)
        cache_dir.mkdir(parents=True, exist_ok=True)
        try:
            self._lmdb_env = lmdb.open(
                DUPLICATE_CACHE_PATH,
                map_size=10 * 1024 * 1024 * 1024,  # 10GB default
                max_dbs=3,  # For hashes, exceptions and pending
                readonly=False,
                create=True
            )
            self._hash_db = self._lmdb_env.open_db(DUPLICATE_HASH_DB_NAME)
            self._exceptions_db = self._lmdb_env.open_db(
                DUPLICATE_EXCEPTIONS_DB_NAME)
            self._pending_db = self._lmdb_env.open_db(DUPLICATE_PENDING_DB_NAME)
            logger.info(f"Duplicate LMDB cache opened: {DUPLICATE_CACHE_PATH}")
        except Exception as e:
            logger.error(f"Failed to open duplicate LMDB cache: {e}")
            self._lmdb_env = None

    def lmdb_close(self):
        """Close the environment (if open) and forget all DB handles."""
        if self._lmdb_env:
            self._lmdb_env.close()
            self._lmdb_env = None
            self._hash_db = None
            self._exceptions_db = None
            self._pending_db = None

    def get_hash_stats(self):
        """Returns (count, size_bytes) for the hash database.

        ``size_bytes`` is the size of the environment's ``data.mdb`` file.
        """
        if not self._lmdb_env:
            return 0, 0
        count = 0
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                count = txn.stat(db=self._hash_db)['entries']
        size = 0
        disk_path = os.path.join(DUPLICATE_CACHE_PATH, "data.mdb")
        if os.path.exists(disk_path):
            size = os.path.getsize(disk_path)
        return count, size

    def clear_hashes(self):
        """Clears all hashes from the database by recreating the environment.

        The whole cache directory is removed, so the exceptions and pending
        databases stored in the same environment are wiped as well.
        """
        with QWriteLocker(self._hash_cache_lock):
            self._hash_cache.clear()
        self.lmdb_close()
        try:
            if os.path.exists(DUPLICATE_CACHE_PATH):
                shutil.rmtree(DUPLICATE_CACHE_PATH)
            self.lmdb_open()
            logger.info("Duplicate hash cache cleared.")
        except Exception as e:
            logger.error(f"Error clearing duplicate LMDB: {e}")

    def __del__(self):
        self.lmdb_close()

    # ------------------------------------------------------------------
    # Key helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _get_inode_info(path):
        """Return ``(st_dev, packed inode bytes)`` or ``(0, None)`` on error."""
        try:
            stat_info = os.stat(path)
            return stat_info.st_dev, struct.pack('Q', stat_info.st_ino)
        except OSError:
            return 0, None

    def _get_lmdb_key(self, dev_id, inode_key_bytes):
        """Build the hash-DB key ``b"<dev>-<inode hex>"``."""
        return f"{dev_id}-{inode_key_bytes.hex()}".encode('utf-8')

    def _get_pair_lmdb_key_from_ids(self, dev1, inode1, dev2, inode2):
        """Build the order-independent key used by the pair databases."""
        # Ensure canonical order for exception keys
        key_parts = sorted([f"{dev1}-{inode1.hex()}", f"{dev2}-{inode2.hex()}"])
        return f"{key_parts[0]}-{key_parts[1]}".encode('utf-8')

    def _get_pair_lmdb_key(self, path1, path2):
        """Pair key for two paths, or None when either cannot be stat'ed."""
        dev1, inode1 = self._get_inode_info(path1)
        dev2, inode2 = self._get_inode_info(path2)
        if not inode1 or not inode2:
            return None
        return self._get_pair_lmdb_key_from_ids(dev1, inode1, dev2, inode2)

    @staticmethod
    def _encode_pair_value(path1, path2, similarity, timestamp):
        """Serialise a pair record as ``path1|path2|similarity|timestamp``."""
        ts = timestamp if timestamp is not None else int(time.time())
        sim_str = similarity if similarity is not None else ''
        return f"{path1}|{path2}|{sim_str}|{ts}".encode('utf-8')

    # ------------------------------------------------------------------
    # Hash database
    # ------------------------------------------------------------------
    def get_hash_and_path(self, dev_id, inode_key_bytes):
        """Retrieves hash, mtime and path for a given (dev_id, inode_key_bytes).

        Returns:
            ``(hash_value, mtime, path)``; ``(None, 0, None)`` when nothing
            is stored. Legacy ``hash|path`` records report ``mtime == 0.0``
            so that callers comparing mtimes re-hash the file.
        """
        cache_key = (dev_id, inode_key_bytes)
        # Check in-memory cache first
        with QReadLocker(self._hash_cache_lock):
            cached_data = self._hash_cache.get(cache_key)
        if cached_data:
            return cached_data
        # Check LMDB
        if not self._lmdb_env:
            return None, 0, None
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes)
                value_bytes = txn.get(lmdb_key, db=self._hash_db)
                if not value_bytes:
                    return None, 0, None
                # Handle format "hash_value_str|mtime|path_str" or old "hash|path"
                parts = value_bytes.decode('utf-8').split('|', 2)
                if len(parts) == 3:
                    hash_str, mtime_str, path_str = parts
                    try:
                        mtime = float(mtime_str)
                    except ValueError:
                        # Old "hash|path" record whose path contains '|'.
                        path_str = f"{mtime_str}|{path_str}"
                        mtime = 0.0
                elif len(parts) == 2:
                    hash_str, path_str = parts
                    mtime = 0.0  # Force re-hash
                else:
                    return None, 0, None
                record = (hash_str, mtime, path_str)
                with QWriteLocker(self._hash_cache_lock):
                    self._hash_cache[cache_key] = record
                return record

    def get_hash_for_path(self, path, current_mtime, dev_id=None,
                          inode_key_bytes=None):
        """Return the cached hash for ``path`` if it is still up to date.

        The file identity is looked up with ``os.stat`` unless both
        ``dev_id`` and ``inode_key_bytes`` are supplied. None is returned
        when there is no entry or the stored mtime differs from
        ``current_mtime``.
        """
        if dev_id is None or inode_key_bytes is None:
            dev_id, inode_key_bytes = self._get_inode_info(path)
        if not inode_key_bytes:
            return None
        hash_value, cached_mtime, _ = self.get_hash_and_path(
            dev_id, inode_key_bytes)
        # Return hash only if mtime matches (with small float tolerance)
        if hash_value and abs(cached_mtime - current_mtime) < 0.001:
            return hash_value
        return None

    def add_hash_for_path(self,
                          path, hash_value, mtime, dev_id=None,
                          inode_key_bytes=None):
        """Store ``hash|mtime|path`` for the file and mirror it in memory.

        Returns False when the file cannot be identified or the
        environment is not open, True otherwise.
        """
        if dev_id is None or inode_key_bytes is None:
            dev_id, inode_key_bytes = self._get_inode_info(path)
        if not inode_key_bytes or not self._lmdb_env:
            return False
        value_str = f"{hash_value}|{mtime}|{path}"
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes)
                txn.put(lmdb_key, value_str.encode('utf-8'), db=self._hash_db)
                with QWriteLocker(self._hash_cache_lock):
                    self._hash_cache[(dev_id, inode_key_bytes)] = (
                        hash_value, mtime, path)
        return True

    def remove_hash_for_path(self, path, clear_relationships=True):
        """
        Removes the hash entry for a path.

        Args:
            path: File path; it must still exist because the entry is
                located through the file's (device, inode).
            clear_relationships: If True, also wipes all entries in pending
                and exceptions DBs involving this file.

        Returns:
            False when the file cannot be stat'ed or the environment is
            closed, True otherwise.
        """
        dev_id, inode_key_bytes = self._get_inode_info(path)
        if not inode_key_bytes or not self._lmdb_env:
            return False
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes)
                txn.delete(lmdb_key, db=self._hash_db)
                with QWriteLocker(self._hash_cache_lock):
                    self._hash_cache.pop((dev_id, inode_key_bytes), None)
                if clear_relationships:
                    # Reuse the open transaction: the removal is atomic and
                    # the non-recursive _db_lock is not taken a second time.
                    for pair_db in (self._exceptions_db, self._pending_db):
                        self._remove_pair_entries_for_path(
                            dev_id, inode_key_bytes, pair_db, txn=txn)
        return True

    # ------------------------------------------------------------------
    # Pair databases (exceptions / pending)
    # ------------------------------------------------------------------
    def mark_as_exception(self,
                          path1, path2, is_exception=True, similarity=None,
                          timestamp=None):
        """Add (or, with ``is_exception=False``, remove) an exception pair.

        Returns False when the environment is closed or either file cannot
        be stat'ed, True otherwise.
        """
        if not self._lmdb_env:
            return False
        exception_key = self._get_pair_lmdb_key(path1, path2)
        if not exception_key:
            return False
        # Store paths in value to make exception recovery independent of hash DB
        value = self._encode_pair_value(path1, path2, similarity, timestamp)
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                if is_exception:
                    txn.put(exception_key, value, db=self._exceptions_db)
                else:
                    txn.delete(exception_key, db=self._exceptions_db)
        return True

    def is_exception(self, path1, path2):
        """Return True if the two files are stored as an exception pair."""
        if not self._lmdb_env:
            return False
        exception_key = self._get_pair_lmdb_key(path1, path2)
        if not exception_key:
            return False
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                return txn.get(exception_key, db=self._exceptions_db) is not None

    def _remove_pair_entries_for_path(self,
                                      target_dev, target_inode, db_handle,
                                      txn=None):
        """Removes all entries involving a specific (dev, inode) pair from a
        pair-based DB.

        When ``txn`` is given the caller already holds ``_db_lock`` and a
        write transaction; otherwise both are acquired here.
        """
        if not self._lmdb_env or db_handle is None:
            return
        if txn is not None:
            self._delete_pairs_in_txn(txn, target_dev, target_inode, db_handle)
            return
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as own_txn:
                self._delete_pairs_in_txn(
                    own_txn, target_dev, target_inode, db_handle)

    @staticmethod
    def _delete_pairs_in_txn(txn, target_dev, target_inode, db_handle):
        """Delete every pair key that mentions (target_dev, target_inode)."""
        target = (target_dev, target_inode.hex())
        doomed = []
        for key_bytes, _ in txn.cursor(db=db_handle):
            # Key layout: "<dev1>-<inode1 hex>-<dev2>-<inode2 hex>"
            parts = key_bytes.decode('utf-8', errors='replace').split('-')
            if len(parts) < 4:
                continue
            try:
                first = (int(parts[0]), parts[1])
                second = (int(parts[2]), parts[3])
            except ValueError:
                continue  # Not a key this class wrote; leave it alone
            if target in (first, second):
                doomed.append(key_bytes)
        # Delete after the scan so the cursor is never invalidated mid-walk.
        for key in doomed:
            txn.delete(key, db=db_handle)

    def mark_as_pending(self,
                        path1, path2, is_pending=True, similarity=None,
                        timestamp=None):
        """Marks a pair as pending review (or clears it when not pending)."""
        if not self._lmdb_env or self._pending_db is None:
            return False
        key = self._get_pair_lmdb_key(path1, path2)
        if not key:
            return False
        # Store paths in value to allow reconstruction without scanning
        value = self._encode_pair_value(path1, path2, similarity, timestamp)
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                if is_pending:
                    txn.put(key, value, db=self._pending_db)
                elif txn.get(key, db=self._pending_db):
                    # Check if it exists before deleting to avoid errors
                    txn.delete(key, db=self._pending_db)
        return True

    def mark_as_pending_batch(self, pairs_data):
        """
        Marks multiple pairs as pending review in a single transaction.

        pairs_data: list of (path1, path2, similarity, timestamp). Pairs
        whose files cannot be stat'ed are skipped.
        """
        if not self._lmdb_env or self._pending_db is None or not pairs_data:
            return False
        # Resolve keys (os.stat calls) before taking the DB lock so the
        # write transaction stays short.
        records = []
        for p1, p2, similarity, timestamp in pairs_data:
            key = self._get_pair_lmdb_key(p1, p2)
            if key:
                records.append(
                    (key, self._encode_pair_value(p1, p2, similarity, timestamp)))
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                for key, value in records:
                    txn.put(key, value, db=self._pending_db)
        return True

    def get_all_exceptions_set(self):
        """Returns a set of canonical pairs (frozenset) marked as exceptions.

        Pairs are built from the paths stored in each value.
        """
        exceptions = set()
        if not self._lmdb_env or self._exceptions_db is None:
            return exceptions
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for _, value_bytes in txn.cursor(db=self._exceptions_db):
                    try:
                        parts = value_bytes.decode('utf-8').split('|')
                    except UnicodeDecodeError:
                        continue
                    if len(parts) >= 2:
                        exceptions.add(frozenset((parts[0], parts[1])))
        return exceptions

    def get_all_pending_duplicates(self):
        """Retrieves all pending duplicate pairs from the database.

        Entries that cannot be parsed, or whose files no longer exist, are
        deleted from the pending DB as a side effect.
        """
        results = []
        if not self._lmdb_env or self._pending_db is None:
            return results
        keys_to_delete = []
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for key, value_bytes in txn.cursor(db=self._pending_db):
                    try:
                        parts = value_bytes.decode('utf-8').split('|')
                        p1, p2 = parts[0], parts[1]
                        sim = int(parts[2]) \
                            if len(parts) > 2 and parts[2] else None
                        ts = int(parts[3]) if len(parts) > 3 else 0
                    except Exception:
                        keys_to_delete.append(key)
                        continue
                    if os.path.exists(p1) and os.path.exists(p2):
                        results.append(
                            DuplicateResult(p1, p2, None, False, sim, ts))
                    else:
                        keys_to_delete.append(key)
            # Cleanup runs while _db_lock is still held, like every other
            # write transaction of this class.
            if keys_to_delete:
                try:
                    with self._lmdb_env.begin(write=True) as txn:
                        for k in keys_to_delete:
                            txn.delete(k, db=self._pending_db)
                    logger.info(f"Cleaned up {len(keys_to_delete)} invalid "
                                "pending duplicates (files deleted externally)")
                except Exception as e:
                    logger.error(
                        f"Error cleaning up pending duplicates from DB: {e}")
        return results

    def _legacy_pair_paths(self, txn, key_bytes):
        """Resolve both paths of a pair key through the hash DB.

        Used for exception records that do not carry the paths in their
        value. Returns ``(None, None)`` when either side is unknown.
        """
        kp = key_bytes.decode('utf-8').split('-')
        if len(kp) != 4:
            return None, None
        v1 = txn.get(f"{kp[0]}-{kp[1]}".encode('utf-8'), db=self._hash_db)
        v2 = txn.get(f"{kp[2]}-{kp[3]}".encode('utf-8'), db=self._hash_db)
        if not v1 or not v2:
            return None, None
        # Format is hash|mtime|path|dist... path is always index 2
        return (v1.decode('utf-8').split('|')[2],
                v2.decode('utf-8').split('|')[2])

    def get_all_exceptions(self):
        """Retrieves all duplicate pairs marked as exceptions from the database.

        Only pairs whose two files still exist are returned. Records that
        cannot be parsed are skipped.
        """
        results = []
        if not self._lmdb_env or self._exceptions_db is None:
            return results
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for key_bytes, value_bytes in txn.cursor(db=self._exceptions_db):
                    try:
                        p1 = p2 = sim = None
                        ts = 0
                        val_str = value_bytes.decode('utf-8')
                        if '|' in val_str:
                            # New format: paths are stored in the value
                            parts = val_str.split('|')
                            p1, p2 = parts[0], parts[1]
                            if len(parts) > 2 and parts[2]:
                                sim = int(parts[2])
                            if len(parts) > 3:
                                ts = int(parts[3])
                            elif os.path.exists(p1):
                                ts = int(os.path.getmtime(p1))
                        if not p1 or not p2:
                            # Legacy format fallback: lookup paths in hash db
                            p1, p2 = self._legacy_pair_paths(txn, key_bytes)
                        if p1 and p2 \
                                and os.path.exists(p1) and os.path.exists(p2):
                            results.append(
                                DuplicateResult(p1, p2, None, True, sim, ts))
                    except Exception as e:
                        # Best effort: one unreadable record must not hide
                        # the remaining exceptions.
                        logger.debug(f"Skipping unreadable exception entry: {e}")
                        continue
        return results

    # ------------------------------------------------------------------
    # Maintenance and bulk access
    # ------------------------------------------------------------------
    def _evict_cached_keys(self, lmdb_keys):
        """Drop in-memory cache entries matching the given hash-DB keys."""
        with QWriteLocker(self._hash_cache_lock):
            for raw_key in lmdb_keys:
                try:
                    dev_str, inode_hex = raw_key.decode('utf-8').split('-', 1)
                    cache_key = (int(dev_str), bytes.fromhex(inode_hex))
                except ValueError:
                    continue  # Malformed key cannot be in the cache
                self._hash_cache.pop(cache_key, None)

    def clean_stale_hashes(self):
        """
        Removes hash entries from the database for files that no longer exist
        on disk. Undecodable entries are removed as well.

        Returns:
            Number of entries deleted.
        """
        if not self._lmdb_env or self._hash_db is None:
            return 0
        keys_to_delete = []
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for key, value_bytes in txn.cursor(db=self._hash_db):
                    try:
                        # value_bytes is "hash|mtime|path|last_dist"
                        parts = value_bytes.decode('utf-8').split('|')
                        if len(parts) >= 3 and not os.path.exists(parts[2]):
                            keys_to_delete.append(key)
                    except Exception:
                        keys_to_delete.append(key)  # Corrupted entry
            if keys_to_delete:
                with self._lmdb_env.begin(write=True) as txn:
                    for k in keys_to_delete:
                        txn.delete(k, db=self._hash_db)
                # Keep the in-memory mirror consistent with the database.
                self._evict_cached_keys(keys_to_delete)
                logger.info(f"Cleaned up {len(keys_to_delete)} stale hash "
                            "entries (files deleted externally)")
        return len(keys_to_delete)

    def get_all_hashes_with_paths(self):
        """Retrieves all hashes from the database along with their associated
        paths and inode info.

        Returns:
            defaultdict mapping ``hash_value`` to a list of
            ``(path, dev_id, inode_key_bytes)`` tuples. Records with a
            malformed key or fewer than three value fields are skipped.
        """
        # hash_value -> [(path, dev_id, inode_key_bytes)]
        all_hashes = collections.defaultdict(list)
        if not self._lmdb_env:
            return all_hashes
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for key_bytes, value_bytes in txn.cursor(db=self._hash_db):
                    try:
                        # key_bytes is like "dev_id-inode_hex"
                        key_parts = key_bytes.decode('utf-8').split('-')
                        dev_id = int(key_parts[0])
                        inode_key_bytes = bytes.fromhex(key_parts[1])
                        # value_bytes is "hash|mtime|path|last_dist"
                        parts_val = value_bytes.decode('utf-8').split('|')
                    except (ValueError, IndexError):
                        continue  # Malformed record
                    if len(parts_val) < 3:
                        continue
                    all_hashes[parts_val[0]].append(
                        (parts_val[2], dev_id, inode_key_bytes))
        return all_hashes

    def rename_entry(self, old_path, new_path):
        """
        Updates the cache entry for a file that has been renamed or moved.

        If ``old_path`` can no longer be stat'ed (the usual situation after
        a rename) the entry is looked up under the identity of
        ``new_path``, because a rename inside one filesystem keeps the
        (dev, inode). If both paths resolve to different identities, the
        old entry (and its relationships) is removed and the hash is
        stored again under the new identity. Stored paths in the pending
        and exceptions DBs are rewritten in either case.

        Returns:
            True if a hash entry was found and updated, False otherwise.
        """
        if not self._lmdb_env:
            return False
        new_dev, new_inode_key_bytes = self._get_inode_info(new_path)
        if not new_inode_key_bytes:
            return False
        old_dev, old_inode_key_bytes = self._get_inode_info(old_path)
        if not old_inode_key_bytes:
            # The old name is gone; an in-place rename keeps the inode.
            old_dev, old_inode_key_bytes = new_dev, new_inode_key_bytes
        hash_value, mtime, _ = self.get_hash_and_path(
            old_dev, old_inode_key_bytes)
        if not hash_value:
            return False
        if (old_dev, old_inode_key_bytes) != (new_dev, new_inode_key_bytes):
            # Identity changed: drop the entry stored under the old one.
            self.remove_hash_for_path(old_path)
        self.add_hash_for_path(
            new_path, hash_value, mtime, new_dev, new_inode_key_bytes)
        for pair_db in (self._pending_db, self._exceptions_db):
            self._update_pair_paths(old_path, new_path, pair_db)
        return True

    def _update_pair_paths(self, old_path, new_path, db_handle):
        """Updates stored paths in a pair-based DB value when a file is renamed.

        Values look like ``path1|path2|similarity|timestamp``; only the two
        path fields are compared and replaced, any remaining fields are
        written back unchanged.
        """
        if not self._lmdb_env or db_handle is None:
            return
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                rewrites = []
                for key, value_bytes in txn.cursor(db=db_handle):
                    try:
                        fields = value_bytes.decode('utf-8').split('|')
                    except UnicodeDecodeError:
                        continue
                    if len(fields) < 2 or old_path not in fields[:2]:
                        continue
                    for idx in (0, 1):
                        if fields[idx] == old_path:
                            fields[idx] = new_path
                    rewrites.append((key, '|'.join(fields).encode('utf-8')))
                # Write after the scan so the cursor is not disturbed.
                for key, value in rewrites:
                    txn.put(key, value, db=db_handle)
class DuplicateDetector(QThread):
    """
    Worker thread for detecting duplicate images using perceptual hashing.

    Signals:
        progress_update(current, total, message): progress on a scale of
            ``2 * len(paths_to_scan)``; the first half covers hashing, the
            second half indexing and comparison.
        duplicates_found(list): list of DuplicateResult.
        detection_finished(): emitted when ``run`` ends, also after a stop.
    """
    progress_update = Signal(int, int, str)  # current, total, message
    duplicates_found = Signal(list)  # List of DuplicateResult
    detection_finished = Signal()

    def __init__(self,
                 paths_to_scan, duplicate_cache, pool_manager,
                 method="histogram_hashing", threshold=90, force_full=False):
        """
        Args:
            paths_to_scan: Sequence of image paths to analyse.
            duplicate_cache: DuplicateCache used for hashes, pending pairs
                and exceptions.
            pool_manager: Object whose ``get_pool()`` returns the thread
                pool that runs the HashWorker tasks.
            method: Label that is only logged by ``run``.
            threshold: Similarity percentage (50-100).
            force_full: When True, cached pending pairs are ignored and
                every hash is compared again.
        """
        super().__init__()
        self.paths_to_scan = paths_to_scan
        self.duplicate_cache = duplicate_cache
        self.pool_manager = pool_manager
        self.method = method
        self.threshold = threshold  # Similarity percentage (50-100)
        self.force_full = force_full
        self._is_running = True

    def stop(self):
        """Ask the detection loop to stop and block until the thread ends."""
        self._is_running = False
        self.wait()

    def run(self):
        """Hash the files, compare the hashes and emit the found pairs."""
        total_files = len(self.paths_to_scan)
        progress_total = total_files * 2
        hashing_msg = UITexts.DUPLICATE_MSG_HASHING.format(filename="...")
        analyzing_msg = UITexts.DUPLICATE_MSG_ANALYZING.format(filename="...")
        found_duplicates = []
        # To store frozenset((path1, path2)) for uniqueness
        unique_duplicate_pairs = set()
        last_update_time = 0
        pool = self.pool_manager.get_pool()

        # 1. Load existing pending duplicates from cache to avoid
        # recalculation (unless force_full)
        if not self.force_full:
            # Set membership keeps this filter linear in the number of files.
            scan_set = set(self.paths_to_scan)
            for p in self.duplicate_cache.get_all_pending_duplicates():
                if p.path1 not in scan_set or p.path2 not in scan_set:
                    continue
                if p.similarity is None or p.similarity >= self.threshold:
                    found_duplicates.append(p)
                    unique_duplicate_pairs.add(frozenset((p.path1, p.path2)))

        # Convert similarity threshold (percentage) to Hamming distance
        distance_threshold = int(
            MAX_DHASH_DISTANCE * (100 - self.threshold) / 100)
        logger.info(
            f"Duplicate detection: Method={self.method}, "
            f"Similarity Threshold={self.threshold}%, Hamming "
            f"Distance Threshold={distance_threshold}")

        # 2. Phase 1: Hash Collection (Parallelized)
        path_to_hash = {}
        dirty_hashes_objs = set()
        dirty_paths = set()
        paths_to_hash_parallel = []
        processed_initial = 0

        # Phase 1 part A: reuse cached hashes whose mtime still matches.
        for path in self.paths_to_scan:
            if not self._is_running:
                break
            try:
                stat_info = os.stat(path)
            except OSError:
                continue  # Unreadable or vanished file: skip it
            mtime = stat_info.st_mtime
            dev, inode = stat_info.st_dev, struct.pack('Q', stat_info.st_ino)
            processed_initial += 1
            cached_h = self.duplicate_cache.get_hash_for_path(
                path, mtime, dev, inode)
            if cached_h:
                path_to_hash[path] = (cached_h, dev, inode)
            else:
                dirty_paths.add(path)
                paths_to_hash_parallel.append((path, mtime, dev, inode))
            if time.perf_counter() - last_update_time > 0.05:
                # Scale this part to 0-50% of the total bar
                progress = int((processed_initial / total_files) * total_files)
                self.progress_update.emit(progress, progress_total, hashing_msg)
                last_update_time = time.perf_counter()

        if paths_to_hash_parallel and self._is_running:
            batch_size = pool.maxThreadCount() * 2
            results_mutex = QMutex()
            new_hashes = {}
            sem = QSemaphore(0)
            # Phase 1 part B: Parallel hashing for new/changed files
            processed_hashing = total_files - len(paths_to_hash_parallel)
            for start in range(0, len(paths_to_hash_parallel), batch_size):
                if not self._is_running:
                    break
                current_batch = paths_to_hash_parallel[start:start + batch_size]
                for p_data in current_batch:
                    pool.start(HashWorker(
                        p_data[0], self, new_hashes, results_mutex, sem))
                # Wait for one permit per started worker, polling the stop
                # flag every 100 ms.
                for _ in current_batch:
                    while not sem.tryAcquire(1, 100):
                        if not self._is_running:
                            break
                    if not self._is_running:
                        break
                    processed_hashing += 1
                    if time.perf_counter() - last_update_time > 0.03:
                        self.progress_update.emit(
                            processed_hashing, progress_total, hashing_msg)
                        last_update_time = time.perf_counter()
            for p, mtime, dev, inode in paths_to_hash_parallel:
                h = new_hashes.get(p)
                if h:
                    path_to_hash[p] = (h, dev, inode)
                    dirty_hashes_objs.add(imagehash.hex_to_hash(h))
                    self.duplicate_cache.add_hash_for_path(
                        p, h, mtime, dev, inode)

        if not self._is_running:
            self.detection_finished.emit()
            return

        # 3. Phase 2: Comparison (Optimized with BK-Tree)
        hash_map = collections.defaultdict(list)
        bk_tree = BKTree(lambda a, b: a - b)
        path_items = list(path_to_hash.items())
        total_items = len(path_items)
        for i, (p, (h_str, dev, inode)) in enumerate(path_items):
            if not self._is_running:
                break
            # Sub-phase: Indexing hashes into the BK-Tree for comparison
            if time.perf_counter() - last_update_time > 0.05 \
                    or i == 0 or i == total_items - 1:
                # Scale Indexing to 50% - 75% range of the total bar
                indexing_progress = int((i / total_items) * (total_files / 2))
                self.progress_update.emit(
                    total_files + indexing_progress, progress_total,
                    analyzing_msg)
                last_update_time = time.perf_counter()
            h_obj = imagehash.hex_to_hash(h_str)
            if h_obj not in hash_map:
                bk_tree.add(h_obj)
            hash_map[h_obj].append((p, dev, inode))
            if self.force_full or p in dirty_paths:
                dirty_hashes_objs.add(h_obj)

        # Optimization: Only query the tree for hashes associated with new
        # or modified files. This finds pairs (Dirty, Clean) and
        # (Dirty, Dirty). (Clean, Clean) were handled in previous runs.
        if self.force_full:
            hashes_to_query = list(hash_map.keys())
        else:
            hashes_to_query = list(dirty_hashes_objs)
        total_queries = len(hashes_to_query)
        pending_db_updates = []

        # Pre-load exceptions into memory to avoid thousands of DB lookups
        self.progress_update.emit(total_files, progress_total, analyzing_msg)
        exceptions_set = self.duplicate_cache.get_all_exceptions_set()

        if total_queries == 0:
            # Nothing new to analyze, jump to end of detection phase
            self.progress_update.emit(
                progress_total, progress_total,
                UITexts.DUPLICATE_MSG_ANALYZING.format(filename="... (OK)"))

        for i, h1 in enumerate(hashes_to_query):
            if not self._is_running:
                break
            items1 = hash_map[h1]
            # Scale Comparison to 75% - 100% range
            comparison_progress = int(
                ((i + 1) / total_queries) * (total_files / 2))
            comparison_position = int(total_files * 1.5 + comparison_progress)
            # Update progress more frequently during analysis phase
            if time.perf_counter() - last_update_time > 0.05 \
                    or i == 0 or i == total_queries - 1:
                self.progress_update.emit(
                    comparison_position, progress_total, analyzing_msg)
                last_update_time = time.perf_counter()

            # Query tree for similar hashes
            for h2, distance in bk_tree.query(h1, distance_threshold):
                items2 = hash_map[h2]
                for p1, dev1, ino1 in items1:
                    for p2, dev2, ino2 in items2:
                        if not self._is_running:
                            break
                        if (dev1, ino1) == (dev2, ino2):
                            continue  # Same file (e.g. hard link)
                        # Optimization: Skip pair if BOTH were already verified
                        if not self.force_full \
                                and p1 not in dirty_paths \
                                and p2 not in dirty_paths:
                            continue
                        canonical = frozenset((p1, p2))
                        if canonical in unique_duplicate_pairs \
                                or canonical in exceptions_set:
                            continue
                        sim = int((1.0 - (distance / MAX_DHASH_DISTANCE)) * 100)
                        ts = int(time.time())
                        found_duplicates.append(
                            DuplicateResult(p1, p2, str(h1), False, sim, ts))
                        unique_duplicate_pairs.add(canonical)
                        # Frequent UI heartbeat for large duplicate groups
                        if time.perf_counter() - last_update_time > 0.05:
                            self.progress_update.emit(
                                comparison_position, progress_total,
                                analyzing_msg)
                            last_update_time = time.perf_counter()
                        # Collect for batch update to improve performance
                        pending_db_updates.append((p1, p2, sim, ts))
                        # Periodically flush pending updates to DB
                        if len(pending_db_updates) >= 50:
                            self.duplicate_cache.mark_as_pending_batch(
                                pending_db_updates)
                            pending_db_updates = []

        # Final flush of remaining updates
        if pending_db_updates:
            self.duplicate_cache.mark_as_pending_batch(pending_db_updates)
        self.duplicates_found.emit(found_duplicates)
        self.detection_finished.emit()