"""
diversity_memory.py - Cross-generation sample diversity memory.

Persists, across generations, which sample families and which exact sample
paths were used, so that callers can down-weight repeated choices.

Features:
- JSON persistence (written atomically).
- Automatic TTL: entries not used for MAX_GENERATIONS_TTL generations expire.
- Cumulative penalty based on how often a family / path has been used.
- Thread-safety through an internal re-entrant lock.
"""

import json
import logging
import os
import threading
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime

logger = logging.getLogger("DiversityMemory")

# =============================================================================
# CONFIGURATION
# =============================================================================

DIVERSITY_MEMORY_FILE = "diversity_memory.json"
# Families and paths expire after this many generations without being used.
MAX_GENERATIONS_TTL = 10
# Only these roles are tracked and penalized.
CRITICAL_ROLES = {'kick', 'clap', 'hat', 'hat_closed', 'hat_open',
                  'bass_loop', 'vocal_loop', 'top_loop'}

# Cumulative penalty table: family use count -> score multiplier.
#   0 uses  -> 1.0 (no penalty)
#   1 use   -> 0.7 (light penalty)
#   2 uses  -> 0.5 (medium penalty)
#   3+ uses -> 0.3 (strong penalty)
PENALTY_FORMULA = {0: 1.0, 1: 0.7, 2: 0.5, 3: 0.3}
MAX_PENALTY = 0.3

# Keywords used for family detection. Matching is plain substring search, and
# dict order decides which family wins when several keywords match.
FAMILY_KEYWORDS = {
    # Drums, by machine type
    '808': ['808', 'tr808', 'tr-808', 'eight-oh-eight'],
    '909': ['909', 'tr909', 'tr-909', 'nine-oh-nine'],
    '707': ['707', 'tr707'],
    '606': ['606', 'tr606'],
    'acoustic': ['acoustic', 'real', 'live', 'studio', 'analog_real'],
    'vinyl': ['vinyl', 'vin', 'recorded', 'sampled_drum'],
    'digital': ['digital', 'digi', 'synthetic', 'synth', 'electronic'],
    'analog': ['analog', 'analogue', 'moog', 'oberheim', 'sequential'],
    # Bass, by type
    'reese': ['reese', 'reese_bass'],
    'acid': ['acid', '303', 'tb303', 'bassline'],
    'sub': ['sub', 'subby', 'sub_bass'],
    'growl': ['growl', 'wobble', 'dubstep'],
    # Vocals, by style
    'vocal_chop': ['chop', 'chopped', 'stutter'],
    'vocal_phrase': ['phrase', 'hook', 'shout'],
    'vocal_verse': ['verse', 'acapella', 'acappella'],
    # Loops, by texture
    'percu_shaker': ['shaker', 'shake'],
    'percu_conga': ['conga', 'bongo', 'latin'],
    'percu_tribal': ['tribal', 'ethnic', 'world'],
}


# =============================================================================
# INTERNAL HELPERS
# =============================================================================

def _first_family_in(text: str) -> Optional[str]:
    """Return the first family (in FAMILY_KEYWORDS order) with a keyword contained in *text*."""
    for family, keywords in FAMILY_KEYWORDS.items():
        if any(kw in text for kw in keywords):
            return family
    return None


def _detect_family_from(sample_path: str, sample_name: str) -> Optional[str]:
    """
    Detect a sample's family from its file name and path.

    The three sources are tried in priority order and the first hit wins:
    1. the file name, 2. the parent directory name, 3. the full path.
    Matching is case-insensitive substring search, so short keywords can
    match inside unrelated words.

    Returns:
        The family name, or None when no keyword matches.
    """
    # Tolerate None / empty values instead of raising on .lower().
    path_text = sample_path or ""
    name_text = sample_name or ""
    # e.g. "808_Kicks/kick_warm.wav" -> parent directory "808_kicks" -> family "808"
    parent_dir = Path(path_text).parent.name.lower() if path_text else ""

    for candidate in (name_text.lower(), parent_dir, path_text.lower()):
        family = _first_family_in(candidate)
        if family is not None:
            return family
    return None


def _parse_counts(raw: Any) -> Dict[str, int]:
    """
    Validate a JSON object of ``name -> integer`` and return it as a dict.

    Raises:
        TypeError / ValueError: if *raw* is not a mapping or a value cannot be
            converted to int. The caller treats that as a corrupt file.
    """
    if not isinstance(raw, dict):
        raise TypeError(f"expected a JSON object, got {type(raw).__name__}")
    return {str(key): int(value) for key, value in raw.items()}


# =============================================================================
# DATA STRUCTURE
# =============================================================================

class DiversityMemory:
    """Diversity memory with JSON persistence, guarded by a re-entrant lock."""

    def __init__(self, project_dir: Optional[Path] = None):
        """
        Initialize the memory and load any previously persisted state.

        Args:
            project_dir: Directory in which the JSON file is stored. When None,
                the first existing directory among a few known locations is
                used; if none exists, the file is placed relative to the
                current working directory.
        """
        self._lock = threading.RLock()

        if project_dir is None:
            candidates = [
                Path(__file__).parent.parent,  # MCP_Server/../
                Path.home() / "Documents" / "AbletonMCP_AI",
                Path(os.getcwd()),
            ]
            for candidate in candidates:
                if candidate.exists() and candidate.is_dir():
                    project_dir = candidate
                    break

        # Path(...) lets callers pass a plain string as well as a Path.
        self._file_path = (
            Path(project_dir) / DIVERSITY_MEMORY_FILE
            if project_dir
            else Path(DIVERSITY_MEMORY_FILE)
        )

        # In-memory state: use counters ...
        self._used_families: Dict[str, int] = defaultdict(int)
        self._used_paths: Dict[str, int] = defaultdict(int)
        # ... and the generation index at which each entry was last used (for TTL).
        self._family_last_used: Dict[str, int] = {}
        self._path_last_used: Dict[str, int] = {}
        self._generation_count: int = 0
        self._last_updated: str = datetime.now().isoformat()

        self._load()

    def _load(self) -> None:
        """
        Load the memory from the JSON file, if it exists.

        A missing file leaves the memory empty. An unreadable or malformed file
        is logged and the memory is reset to defaults; state is only replaced
        once the whole file has been parsed successfully.
        """
        with self._lock:
            if not self._file_path.exists():
                logger.debug(f"Archivo {self._file_path} no existe, iniciando memoria vacía")
                return

            try:
                with open(self._file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if not isinstance(data, dict):
                    raise TypeError("top-level JSON value must be an object")

                generation_count = int(data.get('generation_count', 0))
                families = _parse_counts(data.get('used_families', {}))
                paths = _parse_counts(data.get('used_paths', {}))
                # These keys are absent in files written by older versions.
                family_seen = _parse_counts(data.get('family_last_used', {}))
                path_seen = _parse_counts(data.get('path_last_used', {}))
                last_updated = data.get('last_updated', datetime.now().isoformat())
            except (OSError, ValueError, TypeError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                logger.warning(f"Error cargando diversity_memory.json: {e}")
                self._reset_data()
                return

            self._used_families = defaultdict(int, families)
            self._used_paths = defaultdict(int, paths)
            # Entries without a recorded last-use generation (legacy files) are
            # treated as used in the current generation, so they get a full TTL.
            self._family_last_used = {
                name: family_seen.get(name, generation_count) for name in families
            }
            self._path_last_used = {
                name: path_seen.get(name, generation_count) for name in paths
            }
            self._generation_count = generation_count
            self._last_updated = last_updated

            logger.debug(f"DiversityMemory cargada desde {self._file_path}")
            logger.debug(f" - Familias usadas: {len(self._used_families)}")
            logger.debug(f" - Paths usados: {len(self._used_paths)}")
            logger.debug(f" - Generación #{self._generation_count}")

    def _save(self) -> None:
        """
        Persist the memory to the JSON file.

        The data is written to a sibling temporary file and then moved over the
        target with os.replace, so an interrupted write cannot leave a
        truncated file behind. Failures are logged, not raised.
        """
        with self._lock:
            timestamp = datetime.now().isoformat()
            data = {
                'used_families': dict(self._used_families),
                'used_paths': dict(self._used_paths),
                'family_last_used': dict(self._family_last_used),
                'path_last_used': dict(self._path_last_used),
                'generation_count': self._generation_count,
                'last_updated': timestamp,
                'version': '1.0'
            }
            tmp_path = self._file_path.with_name(self._file_path.name + '.tmp')

            try:
                # Create the target directory if it does not exist yet.
                self._file_path.parent.mkdir(parents=True, exist_ok=True)
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                os.replace(tmp_path, self._file_path)
            except (OSError, TypeError, ValueError) as e:
                logger.error(f"Error guardando diversity_memory.json: {e}")
                try:
                    tmp_path.unlink()
                except OSError:
                    # Best-effort cleanup; the temp file may never have been created.
                    pass
            else:
                self._last_updated = timestamp
                logger.debug(f"DiversityMemory guardada en {self._file_path}")

    def _reset_data(self) -> None:
        """Reset all in-memory state to its initial values (does not touch the file)."""
        self._used_families.clear()
        self._used_paths.clear()
        self._family_last_used.clear()
        self._path_last_used.clear()
        self._generation_count = 0
        self._last_updated = datetime.now().isoformat()

    def record_sample_usage(self, role: str, sample_path: str, sample_name: str) -> None:
        """
        Record that a sample was used in the current generation.

        Roles outside CRITICAL_ROLES are ignored. The change is kept in memory
        only; it is written to disk by record_generation_complete().

        Args:
            role: Role of the sample (e.g. 'kick', 'clap').
            sample_path: Full path to the sample file.
            sample_name: File name of the sample.
        """
        if role not in CRITICAL_ROLES:
            return  # Only critical roles are tracked

        with self._lock:
            family = self._detect_family(sample_path, sample_name)
            if family:
                self._used_families[family] += 1
                self._family_last_used[family] = self._generation_count
                logger.debug(f"Registrada familia '{family}' para rol '{role}' (usos: {self._used_families[family]})")

            # The exact path is always recorded, even without a detected family.
            self._used_paths[sample_path] += 1
            self._path_last_used[sample_path] = self._generation_count

    def _expire_stale(self, counts: Dict[str, int], last_used: Dict[str, int]) -> List[str]:
        """
        Drop entries whose last use is older than the TTL and return their names.

        An entry expires once more than MAX_GENERATIONS_TTL generations have
        completed since the generation in which it was last used, i.e. after
        MAX_GENERATIONS_TTL full generations without any use.
        """
        expired = [
            name for name in counts
            if self._generation_count - last_used.get(name, self._generation_count)
            > MAX_GENERATIONS_TTL
        ]
        for name in expired:
            del counts[name]
            last_used.pop(name, None)
        return expired

    def record_generation_complete(self) -> None:
        """
        Mark the end of a generation, apply the TTL and persist the memory.

        Expiry is based on age (generations since last use), not on how many
        times an entry was used: heavily used families keep their penalty until
        they have gone unused for MAX_GENERATIONS_TTL generations. Exact paths
        expire by the same rule, which also keeps the stored data bounded.
        """
        with self._lock:
            self._generation_count += 1

            for family in self._expire_stale(self._used_families, self._family_last_used):
                logger.debug(f"Familia '{family}' expirada después de {MAX_GENERATIONS_TTL} generaciones")
            self._expire_stale(self._used_paths, self._path_last_used)

            # Persist after every generation.
            self._save()

            logger.info(f"Generación #{self._generation_count} completada. "
                        f"Familias activas: {len(self._used_families)}")

    def get_penalty_for_sample(self, role: str, sample_path: str, sample_name: str) -> float:
        """
        Compute the score multiplier for a specific sample.

        The result is the family penalty (PENALTY_FORMULA, capped at
        MAX_PENALTY) multiplied by an extra penalty for re-using the exact
        same path (0.8 after one use, 0.5 after two or more).

        Returns:
            A factor to multiply the original score by: 1.0 means no penalty.
            Roles outside CRITICAL_ROLES always get 1.0.
        """
        if role not in CRITICAL_ROLES:
            return 1.0  # No penalty for non-critical roles

        with self._lock:
            family = self._detect_family(sample_path, sample_name)
            # .get() avoids inserting keys into the defaultdicts on lookup.
            family_uses = self._used_families.get(family, 0) if family else 0
            path_uses = self._used_paths.get(sample_path, 0)

        # Family penalty (cumulative); counts beyond the table use the cap.
        if family_uses <= 0:
            family_penalty = 1.0
        else:
            family_penalty = PENALTY_FORMULA.get(family_uses, MAX_PENALTY)

        # Extra penalty for the exact same file (avoid literal repetition).
        if path_uses >= 2:
            path_penalty = 0.5
        elif path_uses == 1:
            path_penalty = 0.8
        else:
            path_penalty = 1.0

        total_penalty = family_penalty * path_penalty

        if total_penalty < 1.0:
            logger.debug(f"Penalización para '{sample_name}': {total_penalty:.2f} "
                         f"(familia: {family_penalty:.2f} [{family_uses} usos], "
                         f"path: {path_penalty:.2f} [{path_uses} usos])")

        return total_penalty

    def _detect_family(self, sample_path: str, sample_name: str) -> Optional[str]:
        """
        Detect the family of a sample from its path and name.

        Strategies, in priority order:
        1. Keywords in the file name
        2. Keywords in the parent directory name
        3. Keywords anywhere in the full path

        Returns:
            The family name, or None if nothing matches.
        """
        return _detect_family_from(sample_path, sample_name)

    def get_stats(self) -> Dict[str, Any]:
        """
        Return a snapshot of the memory's state.

        Returns:
            Dict with:
            - used_families: dict of family -> use count
            - total_families: int
            - used_paths: dict of path -> use count
            - total_paths: int
            - generation_count: int
            - critical_roles: list of tracked roles
            - file_location: absolute path of the JSON file, or None if it
              does not exist yet
            - max_generations_ttl: int
            - penalty_formula: copy of the penalty table
        """
        with self._lock:
            return {
                'used_families': dict(self._used_families),
                'total_families': len(self._used_families),
                'used_paths': dict(self._used_paths),
                'total_paths': len(self._used_paths),
                'generation_count': self._generation_count,
                'critical_roles': list(CRITICAL_ROLES),
                'file_location': str(self._file_path.absolute()) if self._file_path.exists() else None,
                'max_generations_ttl': MAX_GENERATIONS_TTL,
                # Copy so callers cannot mutate the module-level table.
                'penalty_formula': dict(PENALTY_FORMULA),
            }

    def reset(self) -> None:
        """Clear the whole memory and persist the empty state."""
        with self._lock:
            self._reset_data()
            self._save()
            logger.info("DiversityMemory reseteada completamente")


# =============================================================================
# GLOBAL INSTANCE
# =============================================================================

# Lazily created singleton. Creation is guarded by its own lock; the instance
# protects its state with its internal lock.
_diversity_memory: Optional[DiversityMemory] = None
_diversity_memory_lock = threading.Lock()


def get_diversity_memory(project_dir: Optional[Path] = None) -> DiversityMemory:
    """
    Return the global DiversityMemory instance, creating it on first use.

    Args:
        project_dir: Only honoured by the call that creates the instance;
            later calls return the existing instance and ignore it.
    """
    global _diversity_memory
    if _diversity_memory is None:
        with _diversity_memory_lock:
            # Re-check under the lock: another thread may have created it meanwhile.
            if _diversity_memory is None:
                _diversity_memory = DiversityMemory(project_dir)
    return _diversity_memory


def reset_diversity_memory() -> None:
    """API: Clear the diversity memory."""
    get_diversity_memory().reset()


def get_diversity_memory_stats() -> Dict[str, Any]:
    """API: Return the memory statistics."""
    return get_diversity_memory().get_stats()


def record_sample_usage(role: str, sample_path: str, sample_name: str) -> None:
    """API: Record the use of a sample."""
    get_diversity_memory().record_sample_usage(role, sample_path, sample_name)


def record_generation_complete() -> None:
    """API: Mark the end of a generation and apply the TTL."""
    get_diversity_memory().record_generation_complete()


def get_penalty_for_sample(role: str, sample_path: str, sample_name: str) -> float:
    """API: Return the penalty factor for a sample."""
    return get_diversity_memory().get_penalty_for_sample(role, sample_path, sample_name)


# =============================================================================
# HELPERS FOR EXTERNAL DETECTION
# =============================================================================

def detect_sample_family(sample_path: str, sample_name: str) -> Optional[str]:
    """
    Detect the family of a sample (public function).

    Uses the same logic as DiversityMemory. Detection is a pure string match,
    so this does not create the global instance or touch the JSON file.
    """
    return _detect_family_from(sample_path, sample_name)


def get_known_families() -> Dict[str, List[str]]:
    """Return the known sample families with their keywords (an independent copy)."""
    return {family: list(keywords) for family, keywords in FAMILY_KEYWORDS.items()}