"""
Processed files registry using repository pattern
"""
import fcntl
import logging
from pathlib import Path
from typing import Set, Optional
from datetime import datetime, timedelta

from ..config import settings


class ProcessedRegistry:
    """Registry for tracking processed files with caching and file locking.

    Entries are stored one per line in ``settings.processed_files_path``.
    Blank lines and lines starting with ``#`` are ignored when loading.
    The in-memory cache holds every entry together with its base name, so
    lookups succeed on either the stored string or just the file name.
    """

    def __init__(self) -> None:
        self.logger = logging.getLogger(__name__)
        self._cache: Set[str] = set()
        # Naive UTC timestamp of the last load from disk (None = never loaded).
        self._cache_time: Optional[datetime] = None
        # Cache lifetime in seconds, compared against total_seconds() in load().
        self._cache_ttl = 60
        self._initialized = False

    def initialize(self) -> None:
        """Load the registry from disk and mark the instance as initialized."""
        self.load()
        self._initialized = True

    def load(self) -> Set[str]:
        """Load processed files from disk with caching.

        Returns:
            A copy of the set of known entries (stored strings plus their
            base names). Read errors are logged and result in whatever was
            parsed before the failure (possibly an empty set).
        """
        now = datetime.utcnow()
        # NOTE: an empty cache is never treated as fresh, so an empty registry
        # is re-read from disk on every call.
        if (
            self._cache
            and self._cache_time
            and (now - self._cache_time).total_seconds() < self._cache_ttl
        ):
            return self._cache.copy()

        processed: Set[str] = set()
        registry_path = settings.processed_files_path

        try:
            registry_path.parent.mkdir(parents=True, exist_ok=True)
            if registry_path.exists():
                with open(registry_path, 'r', encoding='utf-8') as f:
                    for raw_line in f:
                        line = raw_line.strip()
                        if line and not line.startswith('#'):
                            processed.add(line)
                            # Also index by base name so lookups by file
                            # name alone succeed.
                            processed.add(Path(line).name)
        except Exception as e:
            self.logger.error("Error reading processed files registry: %s", e)

        self._cache = processed
        self._cache_time = now
        return processed.copy()

    def save(self, file_path: str) -> None:
        """Add file to processed registry with file locking.

        Args:
            file_path: Entry to append. Empty values are ignored. Entries
                already present in the cache are not written again.

        Raises:
            Exception: Any error raised while creating the directory,
                locking or writing is logged and re-raised.
        """
        if not file_path:
            return

        registry_path = settings.processed_files_path

        try:
            registry_path.parent.mkdir(parents=True, exist_ok=True)
            with open(registry_path, 'a', encoding='utf-8') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                try:
                    if file_path not in self._cache:
                        f.write(file_path + "\n")
                        # Push the buffered data to the file while the lock
                        # is still held; otherwise it would only be written
                        # on close, after the lock has been released.
                        f.flush()
                        self._cache.add(file_path)
                        # Mirror load(), which indexes base names as well.
                        self._cache.add(Path(file_path).name)
                        self.logger.debug(
                            "Added %s to processed registry", file_path
                        )
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        except Exception as e:
            self.logger.error("Error saving to processed files registry: %s", e)
            raise

    def is_processed(self, file_path: str) -> bool:
        """Check if file has been processed.

        Args:
            file_path: Path or file name to look up.

        Returns:
            True if ``file_path`` or its base name is in the cache.
        """
        if not self._initialized:
            self.initialize()

        return file_path in self._cache or Path(file_path).name in self._cache

    def remove(self, file_path: str) -> bool:
        """Remove file from processed registry.

        Every line equal to ``file_path`` or sharing its base name is
        dropped from the registry file, and the matching cache entries are
        discarded.

        Args:
            file_path: Path or file name to remove.

        Returns:
            False if the registry file does not exist or an error occurred
            (errors are logged, not raised); True otherwise.
        """
        registry_path = settings.processed_files_path

        try:
            if not registry_path.exists():
                return False

            target_name = Path(file_path).name

            # Read and rewrite through a single handle under one exclusive
            # lock: opening with 'w' would truncate the file before the lock
            # is taken, and reading outside the lock can lose concurrent
            # appends.
            with open(registry_path, 'r+', encoding='utf-8') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                try:
                    lines_to_keep = [
                        line
                        for line in f.readlines()
                        if line.strip() != file_path
                        and Path(line.strip()).name != target_name
                    ]
                    f.seek(0)
                    f.writelines(lines_to_keep)
                    f.truncate()
                    # Flush before unlocking so other lock holders see the
                    # rewritten content.
                    f.flush()

                    # Drop every cached entry that corresponds to a removed
                    # line (same string or same base name).
                    self._cache = {
                        entry
                        for entry in self._cache
                        if entry != file_path and Path(entry).name != target_name
                    }
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)

            return True
        except Exception as e:
            self.logger.error(
                "Error removing from processed files registry: %s", e
            )
            return False

    def clear(self) -> None:
        """Clear the entire registry.

        Deletes the registry file (if present) and empties the cache.

        Raises:
            Exception: Any error other than the file already being absent
                is logged and re-raised.
        """
        registry_path = settings.processed_files_path

        try:
            try:
                registry_path.unlink()
            except FileNotFoundError:
                # Already gone (never created, or removed concurrently).
                pass
            self._cache.clear()
            self._cache_time = None
            self.logger.info("Processed files registry cleared")
        except Exception as e:
            self.logger.error("Error clearing processed files registry: %s", e)
            raise

    def get_all(self) -> Set[str]:
        """Get all processed files.

        Returns:
            A copy of the cached entries (stored strings and base names).
        """
        if not self._initialized:
            self.initialize()
        return self._cache.copy()

    def count(self) -> int:
        """Get count of processed files.

        Returns:
            The number of cached entries. Base names are cached alongside
            stored strings, so this can exceed the number of registry lines.
        """
        if not self._initialized:
            self.initialize()
        return len(self._cache)


# Global instance
processed_registry = ProcessedRegistry()