"""
Processed files registry - Optimized version with bloom filter and better caching
"""
import fcntl
import hashlib
import logging
import time
from pathlib import Path
from typing import Iterable, Set, Optional
from datetime import datetime, timedelta

from config import settings


class BloomFilter:
    """Simple Bloom filter for fast negative membership testing.

    ``might_contain`` never returns False for an item that was ``add``-ed,
    but it may return True for items that were never added (false
    positives), so a True answer must be confirmed against an exact set.
    """

    def __init__(self, size: int = 10000, hash_count: int = 3):
        if size <= 0:
            raise ValueError("size must be positive")
        if hash_count <= 0:
            raise ValueError("hash_count must be positive")
        self.size = size
        self.hash_count = hash_count
        # One byte per position; far smaller than a list of Python ints.
        self.bit_array = bytearray(size)

    def _hashes(self, item: str) -> list[int]:
        """Return ``hash_count`` bit positions for *item*.

        Uses double hashing (position_i = h1 + i * h2 mod size) derived from a
        single MD5 digest, so any ``hash_count`` is honoured. Slicing the
        16-byte digest into 4-byte words would cap it at 4 positions.
        """
        digest = hashlib.md5(item.encode("utf-8"), usedforsecurity=False).digest()
        h1 = int.from_bytes(digest[:8], "big")
        # Force the step to be odd so it is never zero.
        h2 = int.from_bytes(digest[8:], "big") | 1
        return [(h1 + i * h2) % self.size for i in range(self.hash_count)]

    def add(self, item: str) -> None:
        """Mark *item* as present."""
        for pos in self._hashes(item):
            self.bit_array[pos] = 1

    def might_contain(self, item: str) -> bool:
        """Return False if *item* was definitely never added, True otherwise."""
        return all(self.bit_array[pos] for pos in self._hashes(item))


class ProcessedRegistry:
    """Registry for tracking processed files with caching and file locking.

    Entries live one per line in ``settings.processed_files_path``. Blank
    lines and lines starting with ``#`` are ignored. The in-memory cache holds
    every registered path *and* its basename, so a file counts as processed
    when either its full path or its basename has been registered. The cache
    is re-read from disk once it is older than ``_cache_ttl`` seconds, so
    entries appended by other processes become visible.
    """

    _BLOOM_SIZE = 10000
    _BLOOM_HASH_COUNT = 3

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._cache: Set[str] = set()
        # time.time() of the last successful full read of the registry file.
        self._cache_time: Optional[float] = None
        self._cache_ttl = 300  # 5 minutes (previously 60s)
        self._initialized = False
        self._bloom_filter = self._new_bloom_filter()
        self._write_lock = False  # Write batching; not read anywhere in this module

    # ------------------------------------------------------------------ helpers

    @classmethod
    def _new_bloom_filter(cls) -> BloomFilter:
        """Create an empty bloom filter with the registry's standard sizing."""
        return BloomFilter(size=cls._BLOOM_SIZE, hash_count=cls._BLOOM_HASH_COUNT)

    @staticmethod
    def _parse_entries(lines: Iterable[str]) -> Set[str]:
        """Return every registered path plus its basename from registry lines."""
        entries: Set[str] = set()
        for raw_line in lines:
            line = raw_line.strip()
            if line and not line.startswith('#'):
                entries.add(line)
                # Index the basename too, for basename-only lookups.
                entries.add(Path(line).name)
        return entries

    def _replace_cache(self, entries: Set[str], loaded_at: float) -> None:
        """Swap in a freshly read cache and a bloom filter built from it."""
        bloom = self._new_bloom_filter()
        for entry in entries:
            bloom.add(entry)
        self._cache = entries
        self._bloom_filter = bloom
        self._cache_time = loaded_at

    def _remember(self, file_path: str) -> None:
        """Index a newly written path and its basename, mirroring ``load``."""
        for key in (file_path, Path(file_path).name):
            self._cache.add(key)
            self._bloom_filter.add(key)

    def _ensure_fresh(self) -> None:
        """Initialize on first use; afterwards reload if the cache TTL expired."""
        if not self._initialized:
            self.initialize()
        else:
            self.load()

    # --------------------------------------------------------------- public API

    def initialize(self) -> None:
        """Load the registry from disk and mark the instance as initialized."""
        self.load()
        self._initialized = True
        self.logger.info(f"Processed registry initialized ({self.count()} files)")

    def load(self) -> Set[str]:
        """Load processed files from disk, reusing the cache while it is fresh.

        The file is read under a shared ``flock`` so a concurrent writer's
        append is never observed half-written. On a read error the error is
        logged and the previous cache is kept (not replaced by a partial
        read). The refresh is retried on the next call.

        Returns:
            The live cache set (a reference, not a copy; do not mutate it).
        """
        now = time.time()

        # An empty registry is cached too, not only a non-empty one.
        if self._cache_time is not None and now - self._cache_time < self._cache_ttl:
            return self._cache

        registry_path = settings.processed_files_path
        entries: Set[str] = set()
        try:
            registry_path.parent.mkdir(parents=True, exist_ok=True)
            if registry_path.exists():
                with open(registry_path, 'r', encoding='utf-8') as f:
                    fcntl.flock(f.fileno(), fcntl.LOCK_SH)
                    try:
                        entries = self._parse_entries(f)
                    finally:
                        fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        except Exception as e:
            self.logger.error(f"Error reading processed files registry: {e}")
            return self._cache

        self._replace_cache(entries, now)
        return self._cache

    def save(self, file_path: str) -> None:
        """Append *file_path* to the registry under an exclusive file lock.

        Empty paths and paths already in the cache (by full path or as the
        basename of a registered path) are ignored.

        Raises:
            Exception: any error opening or writing the registry file is
                logged and re-raised.
        """
        if not file_path:
            return

        # Load first so an uninitialized instance does not write duplicates.
        self._ensure_fresh()
        if file_path in self._cache:
            return

        registry_path = settings.processed_files_path
        try:
            registry_path.parent.mkdir(parents=True, exist_ok=True)
            with open(registry_path, 'a', encoding='utf-8') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                try:
                    f.write(file_path + "\n")
                    # Flush while still holding the lock. Otherwise the buffered
                    # data reaches the file only on close, after unlocking.
                    f.flush()
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        except Exception as e:
            self.logger.error(f"Error saving to processed files registry: {e}")
            raise

        # Update memory only after the write succeeded.
        self._remember(file_path)
        self.logger.debug(f"Added {file_path} to processed registry")

    def is_processed(self, file_path: str) -> bool:
        """Return True if *file_path* or its basename has been registered."""
        self._ensure_fresh()

        basename = Path(file_path).name
        bloom = self._bloom_filter
        # The cache may match on the basename alone, so the fast negative check
        # must consider both keys; checking only file_path would wrongly
        # reject a path whose basename is registered.
        if not (bloom.might_contain(file_path) or bloom.might_contain(basename)):
            return False

        return file_path in self._cache or basename in self._cache

    def save_batch(self, file_paths: list[str]) -> int:
        """Append several paths to the registry in one locked write.

        Empty paths, paths already in the cache, and repeats within the batch
        (by full path or basename, as in ``save``) are skipped. Errors are
        logged, not raised.

        Returns:
            Number of paths written; 0 if nothing was new or the write failed.
        """
        self._ensure_fresh()

        to_write: list[str] = []
        pending: Set[str] = set()
        for file_path in file_paths:
            if not file_path or file_path in self._cache or file_path in pending:
                continue
            to_write.append(file_path)
            pending.add(file_path)
            pending.add(Path(file_path).name)

        if not to_write:
            return 0

        try:
            registry_path = settings.processed_files_path
            registry_path.parent.mkdir(parents=True, exist_ok=True)
            with open(registry_path, 'a', encoding='utf-8') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                try:
                    f.writelines(path + "\n" for path in to_write)
                    f.flush()  # write out before releasing the lock
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        except Exception as e:
            self.logger.error(f"Error saving batch to processed files registry: {e}")
            return 0

        # Only mark files as processed once they are actually on disk.
        for path in to_write:
            self._remember(path)
        self.logger.debug(f"Added {len(to_write)} files to processed registry")
        return len(to_write)

    def remove(self, file_path: str) -> bool:
        """Remove every registry line matching *file_path* or its basename.

        The read-filter-rewrite runs under one exclusive lock on a single
        handle, so the file is never truncated before the lock is held.
        Appends from other processes cannot be lost in between. Comment and
        blank lines are preserved. The cache is rebuilt from the surviving
        lines.

        Returns:
            False if *file_path* is empty, the registry file does not exist,
            or an error occurred (logged); True otherwise.
        """
        if not file_path:
            return False

        registry_path = settings.processed_files_path
        target_name = Path(file_path).name

        def _matches(raw_line: str) -> bool:
            entry = raw_line.strip()
            if not entry or entry.startswith('#'):
                return False
            return entry == file_path or Path(entry).name == target_name

        try:
            if not registry_path.exists():
                return False

            with open(registry_path, 'r+', encoding='utf-8') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                try:
                    lines_to_keep = [line for line in f if not _matches(line)]
                    f.seek(0)
                    f.writelines(lines_to_keep)
                    f.truncate()
                    f.flush()
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        except Exception as e:
            self.logger.error(f"Error removing from processed files registry: {e}")
            return False

        # Rebuild from what is actually on disk. Discarding just two keys would
        # leave other full paths that share the removed basename in the cache.
        self._replace_cache(self._parse_entries(lines_to_keep), time.time())
        return True

    def clear(self) -> None:
        """Delete the registry file and reset all in-memory state.

        Raises:
            Exception: any error deleting the file is logged and re-raised.
        """
        registry_path = settings.processed_files_path
        try:
            if registry_path.exists():
                registry_path.unlink()
            self._cache.clear()
            self._cache_time = None
            self._bloom_filter = self._new_bloom_filter()
            self.logger.info("Processed files registry cleared")
        except Exception as e:
            self.logger.error(f"Error clearing processed files registry: {e}")
            raise

    def get_all(self) -> Set[str]:
        """Return a copy of the cache (registered paths plus their basenames)."""
        self._ensure_fresh()
        return self._cache.copy()

    def count(self) -> int:
        """Return the number of cache keys.

        NOTE(review): keys include basename aliases, so this can exceed the
        number of registry lines. Confirm callers expect that.
        """
        self._ensure_fresh()
        return len(self._cache)

    def get_stats(self) -> dict:
        """Return registry statistics (cache size, cache age, TTL, bloom size)."""
        return {
            "total_files": len(self._cache),
            "cache_age_seconds": time.time() - self._cache_time if self._cache_time else 0,
            "cache_ttl_seconds": self._cache_ttl,
            "bloom_filter_size": self._bloom_filter.size,
            "initialized": self._initialized
        }


# Global instance
processed_registry = ProcessedRegistry()