- MCP Server with audio fallback, sample management - Song generator with bus routing - Reference listener and audio resampler - Vector-based sample search - Master chain with limiter and calibration - Fix: Audio fallback now works without M4L - Fix: Full song detection in sample loader Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
382 lines
14 KiB
Python
382 lines
14 KiB
Python
"""
|
|
diversity_memory.py - Sistema de memoria de diversidad entre generaciones
|
|
|
|
Persistencia cross-generation para evitar repetición de familias de samples.
|
|
Incluye TTL automático, penalización acumulativa y thread-safety.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger("DiversityMemory")

# =============================================================================
# CONFIGURATION
# =============================================================================

# File name of the JSON store, created inside the project directory.
DIVERSITY_MEMORY_FILE = "diversity_memory.json"
# A family is forgotten once more than this many generations complete
# without the family being recorded again.
MAX_GENERATIONS_TTL = 10
# Only samples picked for these roles are tracked and penalized.
CRITICAL_ROLES = {'kick', 'clap', 'hat', 'hat_closed', 'hat_open', 'bass_loop', 'vocal_loop', 'top_loop'}

# Cumulative penalty table (score multiplier per number of recorded uses):
#   0 uses  -> 1.0 (no penalty)
#   1 use   -> 0.7 (light penalty)
#   2 uses  -> 0.5 (medium penalty)
#   3+ uses -> 0.3 (strong penalty)
PENALTY_FORMULA = {0: 1.0, 1: 0.7, 2: 0.5, 3: 0.3}
MAX_PENALTY = 0.3

# Keywords used for family detection. Dict order matters: the first family
# with a matching keyword wins.
FAMILY_KEYWORDS = {
    # Drums by machine type
    '808': ['808', 'tr808', 'tr-808', 'eight-oh-eight'],
    '909': ['909', 'tr909', 'tr-909', 'nine-oh-nine'],
    '707': ['707', 'tr707'],
    '606': ['606', 'tr606'],
    'acoustic': ['acoustic', 'real', 'live', 'studio', 'analog_real'],
    'vinyl': ['vinyl', 'vin', 'recorded', 'sampled_drum'],
    'digital': ['digital', 'digi', 'synthetic', 'synth', 'electronic'],
    'analog': ['analog', 'analogue', 'moog', 'oberheim', 'sequential'],
    # Bass by type
    'reese': ['reese', 'reese_bass'],
    'acid': ['acid', '303', 'tb303', 'bassline'],
    'sub': ['sub', 'subby', 'sub_bass'],
    'growl': ['growl', 'wobble', 'dubstep'],
    # Vocals by style
    'vocal_chop': ['chop', 'chopped', 'stutter'],
    'vocal_phrase': ['phrase', 'hook', 'shout'],
    'vocal_verse': ['verse', 'acapella', 'acappella'],
    # Loops by texture
    'percu_shaker': ['shaker', 'shake'],
    'percu_conga': ['conga', 'bongo', 'latin'],
    'percu_tribal': ['tribal', 'ethnic', 'world'],
}


# =============================================================================
# DATA STRUCTURE
# =============================================================================

class DiversityMemory:
    """Lock-protected diversity memory with JSON persistence.

    Tracks how often each sample family and each exact sample path has been
    used for a critical role, and turns those counts into a score multiplier
    so that recently used material is less likely to be picked again.
    Every public method takes the instance's re-entrant lock.
    """

    def __init__(self, project_dir: Optional[Path] = None):
        """
        Initialize the diversity memory and load any persisted state.

        Args:
            project_dir: Directory in which the JSON file is stored. A plain
                string is accepted as well. When omitted, the first existing
                directory among the module's grandparent directory,
                ``~/Documents/AbletonMCP_AI`` and the current working
                directory is used.
        """
        self._lock = threading.RLock()

        # Resolve the project directory from the known locations.
        if project_dir is None:
            candidates = [
                Path(__file__).parent.parent,  # MCP_Server/../
                Path.home() / "Documents" / "AbletonMCP_AI",
                Path(os.getcwd()),
            ]
            project_dir = next((d for d in candidates if d.is_dir()), None)

        if project_dir is not None:
            # Path() makes a str argument work with the "/" operator.
            self._file_path = Path(project_dir) / DIVERSITY_MEMORY_FILE
        else:
            self._file_path = Path(DIVERSITY_MEMORY_FILE)

        # In-memory state
        self._used_families: Dict[str, int] = defaultdict(int)
        self._used_paths: Dict[str, int] = defaultdict(int)
        # Value of the generation counter at the moment each family was last
        # recorded; this is what the TTL is measured against.
        self._family_last_used: Dict[str, int] = {}
        self._generation_count: int = 0
        self._last_updated: str = datetime.now().isoformat()

        # Load existing data
        self._load()

    @staticmethod
    def _parse_counts(raw: Any) -> Dict[str, int]:
        """Coerce a JSON object into a ``str -> int`` mapping (raises on bad data)."""
        return {str(key): int(value) for key, value in raw.items()}

    def _load(self) -> None:
        """Load the memory from the JSON file.

        A missing file leaves the memory empty. An unreadable or malformed
        file is logged and the memory is reset to its initial state.
        """
        with self._lock:
            if not self._file_path.exists():
                logger.debug(f"Archivo {self._file_path} no existe, iniciando memoria vacía")
                return

            try:
                with open(self._file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                # Parse into locals first so a failure half-way through
                # cannot leave the instance partially loaded.
                families = self._parse_counts(data.get('used_families', {}))
                paths = self._parse_counts(data.get('used_paths', {}))
                last_used = self._parse_counts(data.get('family_last_used', {}))
                generation_count = int(data.get('generation_count', 0))
                last_updated = data.get('last_updated', datetime.now().isoformat())
            except (OSError, ValueError, TypeError, AttributeError) as e:
                logger.warning(f"Error cargando diversity_memory.json: {e}")
                # Fall back to default values
                self._reset_data()
                return

            self._used_families = defaultdict(int, families)
            self._used_paths = defaultdict(int, paths)
            self._generation_count = generation_count
            self._last_updated = last_updated
            # Files written before last-use tracking existed have no
            # 'family_last_used'; such families start a fresh TTL window.
            # Values are clamped so a family's age is never negative.
            self._family_last_used = {
                family: min(last_used.get(family, generation_count), generation_count)
                for family in families
            }

            logger.debug(f"DiversityMemory cargada desde {self._file_path}")
            logger.debug(f"  - Familias usadas: {len(self._used_families)}")
            logger.debug(f"  - Paths usados: {len(self._used_paths)}")
            logger.debug(f"  - Generación #{self._generation_count}")

    def _save(self) -> None:
        """Write the memory to the JSON file.

        The data is written to a sibling temporary file which then replaces
        the real file, so an interrupted write cannot leave a truncated JSON
        document behind. Failures are logged, not raised.
        """
        with self._lock:
            timestamp = datetime.now().isoformat()
            data = {
                'used_families': dict(self._used_families),
                'used_paths': dict(self._used_paths),
                'generation_count': self._generation_count,
                'family_last_used': dict(self._family_last_used),
                'last_updated': timestamp,
                'version': '1.0'
            }

            tmp_path = self._file_path.with_name(self._file_path.name + ".tmp")
            try:
                # Create the directory if it does not exist
                self._file_path.parent.mkdir(parents=True, exist_ok=True)

                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                os.replace(tmp_path, self._file_path)

                self._last_updated = timestamp
                logger.debug(f"DiversityMemory guardada en {self._file_path}")
            except (OSError, TypeError, ValueError) as e:
                logger.error(f"Error guardando diversity_memory.json: {e}")
                # Best-effort cleanup of a half-written temporary file.
                try:
                    tmp_path.unlink()
                except OSError:
                    pass

    def _reset_data(self) -> None:
        """Reset the in-memory state to its initial values (does not save)."""
        self._used_families.clear()
        self._used_paths.clear()
        self._family_last_used.clear()
        self._generation_count = 0
        self._last_updated = datetime.now().isoformat()

    def record_sample_usage(self, role: str, sample_path: str, sample_name: str) -> None:
        """
        Record that a sample was used in the current generation.

        Roles outside ``CRITICAL_ROLES`` are ignored.

        Args:
            role: Role of the sample (e.g. 'kick', 'clap')
            sample_path: Full path to the file
            sample_name: File name
        """
        if role not in CRITICAL_ROLES:
            return  # Only critical roles are tracked

        with self._lock:
            family = self._detect_family(sample_path, sample_name)

            if family:
                self._used_families[family] += 1
                # Remember when the family was last used so the TTL restarts.
                self._family_last_used[family] = self._generation_count
                logger.debug(f"Registrada familia '{family}' para rol '{role}' (usos: {self._used_families[family]})")

            # Record the exact path whenever one was given; a missing path
            # would otherwise become a meaningless key in the JSON file.
            if sample_path:
                self._used_paths[str(sample_path)] += 1

    def record_generation_complete(self) -> None:
        """
        Mark the end of a generation, apply the TTL and persist the memory.

        A family is dropped once more than ``MAX_GENERATIONS_TTL`` generations
        have completed since it was last recorded, i.e. it keeps influencing
        the ``MAX_GENERATIONS_TTL`` generations that follow its last use.
        Use counts are not decayed in between, and path counts never expire.
        """
        with self._lock:
            self._generation_count += 1

            # Apply the TTL based on age (generations since last use), not on
            # how many times the family was used.
            expired = []
            for family in self._used_families:
                last_used = self._family_last_used.setdefault(family, self._generation_count)
                if self._generation_count - last_used > MAX_GENERATIONS_TTL:
                    expired.append(family)

            # Remove expired families
            for family in expired:
                del self._used_families[family]
                self._family_last_used.pop(family, None)
                logger.debug(f"Familia '{family}' expirada después de {MAX_GENERATIONS_TTL} generaciones")

            # Save after every generation
            self._save()

            logger.info(f"Generación #{self._generation_count} completada. "
                        f"Familias activas: {len(self._used_families)}")

    def get_penalty_for_sample(self, role: str, sample_path: str, sample_name: str) -> float:
        """
        Compute the score multiplier for a specific sample.

        The result is the family penalty multiplied by the exact-path penalty.

        Returns:
            Factor to multiply the original score by. 1.0 means no penalty;
            the family penalty bottoms out at ``MAX_PENALTY`` (0.3) and the
            path penalty at 0.5, so the combined minimum is 0.15.
        """
        if role not in CRITICAL_ROLES:
            return 1.0  # Non-critical roles are never penalized

        with self._lock:
            family = self._detect_family(sample_path, sample_name)
            family_uses = self._used_families.get(family, 0) if family else 0
            path_uses = self._used_paths.get(str(sample_path), 0) if sample_path else 0

            # Family penalty (cumulative)
            if family_uses >= 3:
                family_penalty = MAX_PENALTY
            elif family_uses > 0:
                family_penalty = PENALTY_FORMULA.get(family_uses, MAX_PENALTY)
            else:
                family_penalty = 1.0

            # Extra penalty for the exact path (avoid exact repetition)
            if path_uses >= 2:
                path_penalty = 0.5
            elif path_uses == 1:
                path_penalty = 0.8
            else:
                path_penalty = 1.0

            total_penalty = family_penalty * path_penalty

            if total_penalty < 1.0:
                logger.debug(f"Penalización para '{sample_name}': {total_penalty:.2f} "
                             f"(familia: {family_penalty:.2f} [{family_uses} usos], "
                             f"path: {path_penalty:.2f} [{path_uses} usos])")

            return total_penalty

    @staticmethod
    def _match_family(text: str) -> Optional[str]:
        """Return the first family with a keyword that is a substring of *text*."""
        if not text:
            return None
        for family, keywords in FAMILY_KEYWORDS.items():
            if any(kw in text for kw in keywords):
                return family
        return None

    def _detect_family(self, sample_path: str, sample_name: str) -> Optional[str]:
        """
        Detect the family of a sample from its path and name.

        Strategies (in priority order):
        1. Keywords in the file name
        2. Parent directory name, e.g. "808_Kicks/kick_808_warm.wav" -> "808"
        3. Full path

        Matching is a case-insensitive substring test. Empty or ``None``
        arguments are treated as empty strings.

        Returns:
            Family name, or None when nothing matches
        """
        path_text = str(sample_path) if sample_path else ""
        name_lower = str(sample_name).lower() if sample_name else ""
        parent_lower = Path(path_text).parent.name.lower() if path_text else ""

        for candidate in (name_lower, parent_lower, path_text.lower()):
            family = DiversityMemory._match_family(candidate)
            if family:
                return family

        # No match
        return None

    def get_stats(self) -> Dict[str, Any]:
        """
        Return statistics about the diversity memory.

        Returns:
            Dict with:
            - used_families: dict of families and counts
            - total_families: int
            - used_paths: dict of paths and counts
            - total_paths: int
            - generation_count: int
            - critical_roles: list of tracked roles
            - file_location: absolute path of the JSON file, or None while
              the file does not exist yet
            - max_generations_ttl: int
            - penalty_formula: copy of the penalty table
        """
        with self._lock:
            return {
                'used_families': dict(self._used_families),
                'total_families': len(self._used_families),
                'used_paths': dict(self._used_paths),
                'total_paths': len(self._used_paths),
                'generation_count': self._generation_count,
                'critical_roles': list(CRITICAL_ROLES),
                'file_location': str(self._file_path.absolute()) if self._file_path.exists() else None,
                'max_generations_ttl': MAX_GENERATIONS_TTL,
                # Copy so callers cannot mutate the module-level table.
                'penalty_formula': dict(PENALTY_FORMULA),
            }

    def reset(self) -> None:
        """Clear the whole diversity memory and persist the empty state."""
        with self._lock:
            self._reset_data()
            self._save()
            logger.info("DiversityMemory reseteada completamente")
|
|
|
|
|
|
# =============================================================================
# GLOBAL INSTANCE
# =============================================================================

# Lazily created process-wide singleton. Creation is serialized by the lock
# below; the instance protects its own state with an internal lock.
_diversity_memory: Optional["DiversityMemory"] = None
_diversity_memory_lock = threading.Lock()


def get_diversity_memory(project_dir: Optional[Path] = None) -> "DiversityMemory":
    """Return the global DiversityMemory instance, creating it on first use.

    Args:
        project_dir: Directory passed to ``DiversityMemory`` when the instance
            is created. It is ignored on later calls, because the existing
            instance is returned unchanged.
    """
    global _diversity_memory
    if _diversity_memory is None:
        with _diversity_memory_lock:
            # Re-check under the lock: another thread may have created the
            # instance between the first check and acquiring the lock.
            if _diversity_memory is None:
                _diversity_memory = DiversityMemory(project_dir)
    return _diversity_memory
|
|
|
|
|
|
def reset_diversity_memory() -> None:
    """API: Clear the diversity memory held by the global instance."""
    get_diversity_memory().reset()
|
|
|
|
|
|
def get_diversity_memory_stats() -> Dict[str, Any]:
    """API: Return the statistics reported by the global memory instance."""
    return get_diversity_memory().get_stats()
|
|
|
|
|
|
def record_sample_usage(role: str, sample_path: str, sample_name: str) -> None:
    """API: Record the use of a sample on the global memory instance."""
    get_diversity_memory().record_sample_usage(role, sample_path, sample_name)
|
|
|
|
|
|
def record_generation_complete() -> None:
    """API: Mark the end of a generation on the global memory instance."""
    get_diversity_memory().record_generation_complete()
|
|
|
|
|
|
def get_penalty_for_sample(role: str, sample_path: str, sample_name: str) -> float:
    """API: Return the penalty factor the global memory assigns to a sample."""
    return get_diversity_memory().get_penalty_for_sample(role, sample_path, sample_name)
|
|
|
|
|
|
# =============================================================================
# HELPER FOR EXTERNAL FAMILY DETECTION
# =============================================================================

def detect_sample_family(sample_path: str, sample_name: str) -> Optional[str]:
    """
    Detect the family of a sample (public function).

    Delegates to the ``_detect_family`` method of the global
    DiversityMemory instance, so it uses the same logic.
    """
    return get_diversity_memory()._detect_family(sample_path, sample_name)
|
|
|
|
|
|
# Known families, for reference
def get_known_families() -> Dict[str, List[str]]:
    """Return the known sample families with their keywords.

    Both the dict and every keyword list are fresh copies, so callers may
    modify the result without altering the module-level FAMILY_KEYWORDS table.
    """
    return {family: list(keywords) for family, keywords in FAMILY_KEYWORDS.items()}
|