Files
ableton-mcp-ai/mcp_server/engines/libreria_analyzer.py
OpenCode Agent 5ce8187c65 feat: Implement senior audio injection with 5 fallback methods
- Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain
- Method 1: track.insert_arrangement_clip() [Live 12+]
- Method 2: track.create_audio_clip() [Live 11+]
- Method 3: arrangement_clips.add_new_clip() [Live 12+]
- Method 4: Session->duplicate_clip_to_arrangement [Legacy]
- Method 5: Session->Recording [Universal]

- Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow
- Update skills documentation
- Verified: 3 clips created at positions [0, 4, 8] in Arrangement View

Closes: Audio injection in Arrangement View
2026-04-12 14:02:32 -03:00

640 lines
22 KiB
Python

"""
LibreriaAnalyzer - Análisis espectral de samples de audio
Escanea recursivamente la librería de samples y extrae features espectrales
usando librosa (con fallback a scipy si no está disponible).
Uso:
from engines.libreria_analyzer import LibreriaAnalyzer
analyzer = LibreriaAnalyzer()
analyzer.analyze_all() # Analiza toda la librería
# O consultar features de un sample específico
features = analyzer.get_features("C:/.../kick_808.wav")
"""
import os
import json
import time
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
# Audio analysis libraries
try:
import numpy as np
import librosa
import librosa.feature
LIBROSA_AVAILABLE = True
except ImportError:
LIBROSA_AVAILABLE = False
try:
import numpy as np
from scipy.io import wavfile
from scipy import signal
SCIPY_AVAILABLE = True
except ImportError:
SCIPY_AVAILABLE = False
np = None
class LibreriaAnalyzer:
"""
Analizador espectral de librería de samples.
Extrae features de audio para todos los samples encontrados
y los guarda en caché para evitar re-análisis.
"""
# Extensiones de audio soportadas
SUPPORTED_EXTENSIONS = {'.wav', '.mp3', '.aif', '.aiff', '.flac'}
# Caché de features
CACHE_FILENAME = '.features_cache.json'
CACHE_MAX_AGE_DAYS = 7
# Mapeo de carpetas a roles
ROLE_MAPPING = {
'kick': 'kick',
'snare': 'snare',
'bass': 'bass',
'fx': 'fx',
'drumloops': 'drum_loop',
'drumloop': 'drum_loop',
'hi-hat': 'hat_closed',
'hihat': 'hat_closed',
'hat': 'hat_closed',
'oneshots': 'oneshot',
'oneshot': 'oneshot',
'perc loop': 'perc_loop',
'perc_loop': 'perc_loop',
'reggaeton 3': 'synth',
'sentimientolatino2025': 'multi',
'sounds presets': 'preset',
'extra': 'extra',
'flp': 'project',
}
def __init__(self, library_path: str = None, verbose: bool = True):
"""
Inicializa el analizador.
Args:
library_path: Ruta base de la librería. Por defecto: libreria/reggaeton/
verbose: Si True, muestra progreso del análisis
"""
if library_path is None:
# Default path según la estructura del proyecto
base_path = Path("C:/ProgramData/Ableton/Live 12 Suite/Resources/MIDI Remote Scripts")
self.library_path = base_path / "libreria" / "reggaeton"
else:
self.library_path = Path(library_path)
self.verbose = verbose
self.features: Dict[str, Dict[str, Any]] = {}
self.cache_path = self.library_path / self.CACHE_FILENAME
# Verificar disponibilidad de librerías
if not LIBROSA_AVAILABLE and not SCIPY_AVAILABLE:
raise ImportError(
"Se requiere librosa o scipy para análisis de audio. "
"Instala: pip install librosa numpy"
)
# Cargar caché existente si está disponible
self._load_cache()
def _load_cache(self) -> bool:
"""
Carga el caché de features si existe y es reciente.
Returns:
True si se cargó el caché, False en caso contrario
"""
if not self.cache_path.exists():
return False
try:
# Verificar edad del caché
cache_age = datetime.now() - datetime.fromtimestamp(
self.cache_path.stat().st_mtime
)
if cache_age > timedelta(days=self.CACHE_MAX_AGE_DAYS):
if self.verbose:
print(f"[LibreriaAnalyzer] Caché expirado ({cache_age.days} días). Re-analizando...")
return False
# Cargar caché
with open(self.cache_path, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
self.features = cache_data.get('samples', {})
if self.verbose:
total = cache_data.get('total_samples', len(self.features))
scan_date = cache_data.get('scan_date', 'unknown')
print(f"[LibreriaAnalyzer] Caché cargado: {total} samples (desde {scan_date})")
return True
except (json.JSONDecodeError, IOError, KeyError) as e:
if self.verbose:
print(f"[LibreriaAnalyzer] Error cargando caché: {e}")
return False
def _save_cache(self) -> None:
"""Guarda las features actuales en el caché."""
cache_data = {
"version": "1.0",
"total_samples": len(self.features),
"scan_date": datetime.now().isoformat(),
"library_path": str(self.library_path),
"samples": self.features
}
try:
with open(self.cache_path, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
if self.verbose:
print(f"[LibreriaAnalyzer] Caché guardado: {len(self.features)} samples")
except IOError as e:
if self.verbose:
print(f"[LibreriaAnalyzer] Error guardando caché: {e}")
def _detect_role(self, file_path: Path) -> str:
"""
Detecta el rol del sample basado en la carpeta contenedora.
Args:
file_path: Ruta al archivo de audio
Returns:
Rol detectado (kick, snare, bass, etc.)
"""
# Obtener partes del path en minúsculas
path_parts = [p.lower() for p in file_path.parts]
# Buscar coincidencias en el mapeo
for part in path_parts:
# Remover caracteres especiales para matching
clean_part = part.replace(' ', '_').replace('-', '_').replace('(', '').replace(')', '')
if part in self.ROLE_MAPPING:
return self.ROLE_MAPPING[part]
if clean_part in self.ROLE_MAPPING:
return self.ROLE_MAPPING[clean_part]
# Buscar substrings
for key, role in self.ROLE_MAPPING.items():
if key in part or key in clean_part:
return role
return "unknown"
def _get_pack_name(self, file_path: Path) -> str:
"""
Obtiene el nombre del pack/carpeta padre del sample.
Args:
file_path: Ruta al archivo de audio
Returns:
Nombre del pack/carpeta
"""
# El pack es el directorio padre inmediato
parent = file_path.parent.name
return parent if parent else "root"
def _extract_features_librosa(self, file_path: Path) -> Optional[Dict[str, Any]]:
"""
Extrae features de audio usando librosa.
Args:
file_path: Ruta al archivo de audio
Returns:
Diccionario con features o None si hay error
"""
try:
# Cargar audio
y, sr = librosa.load(str(file_path), sr=None, mono=True)
# Duración
duration = librosa.get_duration(y=y, sr=sr)
# RMS (energía)
rms = float(np.mean(librosa.feature.rms(y=y)))
rms_db = 20 * np.log10(rms + 1e-10) # Convertir a dB
# Spectral Centroid (brillo)
spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr)))
# Spectral Rolloff
spectral_rolloff = float(np.mean(librosa.feature.spectral_rolloff(y=y, sr=sr)))
# Zero Crossing Rate
zcr = float(np.mean(librosa.feature.zero_crossing_rate(y)))
# MFCCs (13 coeficientes)
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
mfccs_mean = [float(np.mean(coef)) for coef in mfccs]
# Onset Strength (qué tan rítmico es)
onset_env = librosa.onset.onset_strength(y=y, sr=sr)
onset_strength = float(np.mean(onset_env))
# BPM detection
try:
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
bpm = float(tempo) if isinstance(tempo, (int, float, np.number)) else float(tempo[0])
except:
bpm = 0.0
# Key detection via chromagram
try:
chromagram = librosa.feature.chroma_cqt(y=y, sr=sr)
# Sumar a lo largo del tiempo para obtener el perfil de pitch
chroma_avg = np.sum(chromagram, axis=1)
# Notas musicales
notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
# Encontrar la nota dominante
key_index = np.argmax(chroma_avg)
key = notes[key_index]
# Detectar si es mayor o menor (heurística simple)
# Si el tercer grado está presente, es menor
minor_third_idx = (key_index + 3) % 12
if chroma_avg[minor_third_idx] > chroma_avg[(key_index + 4) % 12]:
key += 'm'
except:
key = ""
# Determinar canales (asumimos mono después de librosa.load con mono=True)
# Para saber si era stereo originalmente, tendríamos que cargar de nuevo
try:
y_orig, _ = librosa.load(str(file_path), sr=None, mono=False)
channels = y_orig.shape[0] if len(y_orig.shape) > 1 else 1
except:
channels = 1
return {
"rms": round(rms_db, 2),
"spectral_centroid": round(spectral_centroid, 2),
"spectral_rolloff": round(spectral_rolloff, 2),
"zero_crossing_rate": round(zcr, 4),
"mfccs": [round(m, 4) for m in mfccs_mean],
"onset_strength": round(onset_strength, 4),
"duration": round(duration, 3),
"sample_rate": sr,
"channels": channels,
"bpm": round(bpm, 1) if bpm > 0 else 0,
"key": key
}
except Exception as e:
if self.verbose:
print(f"[LibreriaAnalyzer] Error analizando {file_path}: {e}")
return None
def _extract_features_scipy(self, file_path: Path) -> Optional[Dict[str, Any]]:
"""
Extrae features básicas usando scipy (fallback cuando librosa no está).
Solo soporta archivos WAV.
Args:
file_path: Ruta al archivo de audio
Returns:
Diccionario con features básicas o None si hay error
"""
try:
# scipy solo soporta WAV nativamente
if file_path.suffix.lower() not in {'.wav'}:
return None
# Cargar audio
sr, data = wavfile.read(str(file_path))
# Convertir a float y mono si es necesario
if data.ndim > 1:
channels = data.shape[1]
data = np.mean(data, axis=1) # Convertir a mono
else:
channels = 1
# Normalizar a float [-1, 1]
if data.dtype == np.int16:
data = data.astype(np.float32) / 32768.0
elif data.dtype == np.int32:
data = data.astype(np.float32) / 2147483648.0
else:
data = data.astype(np.float32)
# Duración
duration = len(data) / sr
# RMS
rms = np.sqrt(np.mean(data ** 2))
rms_db = 20 * np.log10(rms + 1e-10)
# Spectral Centroid usando FFT
fft = np.fft.fft(data)
freqs = np.fft.fftfreq(len(data), 1/sr)
magnitude = np.abs(fft)
# Solo frecuencias positivas
positive_freqs = freqs[:len(freqs)//2]
positive_magnitude = magnitude[:len(magnitude)//2]
spectral_centroid = np.sum(positive_freqs * positive_magnitude) / np.sum(positive_magnitude)
# Zero Crossing Rate
zcr = np.mean(np.diff(np.sign(data)) != 0)
# No podemos hacer análisis avanzado sin librosa
return {
"rms": round(rms_db, 2),
"spectral_centroid": round(float(spectral_centroid), 2),
"spectral_rolloff": 0.0, # No disponible sin librosa
"zero_crossing_rate": round(float(zcr), 4),
"mfccs": [], # No disponible sin librosa
"onset_strength": 0.0, # No disponible sin librosa
"duration": round(duration, 3),
"sample_rate": sr,
"channels": channels,
"bpm": 0, # No disponible sin librosa
"key": "" # No disponible sin librosa
}
except Exception as e:
if self.verbose:
print(f"[LibreriaAnalyzer] Error (scipy) analizando {file_path}: {e}")
return None
def _extract_features(self, file_path: Path) -> Optional[Dict[str, Any]]:
"""
Extrae features de un archivo de audio.
Usa librosa si está disponible, de lo contrario usa scipy.
Args:
file_path: Ruta al archivo de audio
Returns:
Diccionario con features o None si hay error
"""
if LIBROSA_AVAILABLE:
return self._extract_features_librosa(file_path)
elif SCIPY_AVAILABLE:
return self._extract_features_scipy(file_path)
else:
return None
def _scan_samples(self) -> List[Path]:
"""
Escanea recursivamente la librería buscando samples de audio.
Returns:
Lista de rutas a archivos de audio encontrados
"""
samples = []
if not self.library_path.exists():
if self.verbose:
print(f"[LibreriaAnalyzer] Librería no encontrada: {self.library_path}")
return samples
for ext in self.SUPPORTED_EXTENSIONS:
samples.extend(self.library_path.rglob(f"*{ext}"))
return samples
def analyze_sample(self, file_path: str) -> Optional[Dict[str, Any]]:
"""
Analiza un sample individual y extrae sus features.
Args:
file_path: Ruta al archivo de audio
Returns:
Diccionario con todas las features del sample
"""
path = Path(file_path)
if not path.exists():
if self.verbose:
print(f"[LibreriaAnalyzer] Archivo no encontrado: {file_path}")
return None
if path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
if self.verbose:
print(f"[LibreriaAnalyzer] Formato no soportado: {path.suffix}")
return None
# Extraer features de audio
audio_features = self._extract_features(path)
if audio_features is None:
return None
# Construir el objeto completo de features
abs_path = str(path.resolve())
role = self._detect_role(path)
pack = self._get_pack_name(path)
features = {
"name": path.name,
"pack": pack,
"role": role,
**audio_features
}
# Guardar en caché interno
self.features[abs_path] = features
return features
def analyze_all(self, force_reanalyze: bool = False) -> Dict[str, Dict[str, Any]]:
"""
Analiza todos los samples de la librería.
Args:
force_reanalyze: Si True, re-analiza incluso si hay caché
Returns:
Diccionario con todas las features indexadas por path
"""
# Verificar si ya tenemos caché válido
if not force_reanalyze and self.features:
if self.verbose:
print(f"[LibreriaAnalyzer] Usando caché existente con {len(self.features)} samples")
return self.features
# Escanear samples
samples = self._scan_samples()
if not samples:
if self.verbose:
print(f"[LibreriaAnalyzer] No se encontraron samples en {self.library_path}")
return {}
if self.verbose:
print(f"[LibreriaAnalyzer] Encontrados {len(samples)} samples para analizar")
# Analizar cada sample
total = len(samples)
analyzed = 0
failed = 0
for i, sample_path in enumerate(samples, 1):
abs_path = str(sample_path.resolve())
# Verificar si ya está en caché y no es force_reanalyze
if not force_reanalyze and abs_path in self.features:
continue
# Analizar sample
features = self.analyze_sample(abs_path)
if features:
analyzed += 1
else:
failed += 1
# Mostrar progreso
if self.verbose and i % 10 == 0:
pct = (i / total) * 100
print(f"[LibreriaAnalyzer] Progreso: {i}/{total} ({pct:.1f}%) - OK: {analyzed}, Fallos: {failed}")
if self.verbose:
print(f"[LibreriaAnalyzer] Análisis completo: {analyzed} analizados, {failed} fallidos")
# Guardar caché
self._save_cache()
return self.features
def get_features(self, sample_path: str) -> Optional[Dict[str, Any]]:
"""
Obtiene las features de un sample específico.
Si el sample no está en caché, lo analiza.
Args:
sample_path: Ruta al archivo de audio
Returns:
Diccionario con features o None si no se puede analizar
"""
abs_path = str(Path(sample_path).resolve())
# Verificar si está en caché
if abs_path in self.features:
return self.features[abs_path]
# Analizar si no está en caché
return self.analyze_sample(sample_path)
def get_all_features(self) -> Dict[str, Dict[str, Any]]:
"""
Obtiene todas las features cargadas/analizadas.
Returns:
Diccionario con todas las features
"""
return self.features
def clear_cache(self) -> None:
"""Elimina el archivo de caché y limpia las features en memoria."""
self.features = {}
if self.cache_path.exists():
try:
self.cache_path.unlink()
if self.verbose:
print(f"[LibreriaAnalyzer] Caché eliminado: {self.cache_path}")
except IOError as e:
if self.verbose:
print(f"[LibreriaAnalyzer] Error eliminando caché: {e}")
def get_stats(self) -> Dict[str, Any]:
"""
Obtiene estadísticas de la librería analizada.
Returns:
Diccionario con estadísticas
"""
if not self.features:
return {
"total_samples": 0,
"by_role": {},
"avg_duration": 0,
"avg_rms": 0
}
# Contar por rol
by_role = {}
total_duration = 0
total_rms = 0
for path, features in self.features.items():
role = features.get("role", "unknown")
by_role[role] = by_role.get(role, 0) + 1
total_duration += features.get("duration", 0)
total_rms += features.get("rms", 0)
total = len(self.features)
return {
"total_samples": total,
"by_role": by_role,
"avg_duration": round(total_duration / total, 3) if total > 0 else 0,
"avg_rms": round(total_rms / total, 2) if total > 0 else 0
}
# Función de conveniencia para uso directo
def analyze_library(library_path: str = None, verbose: bool = True) -> LibreriaAnalyzer:
"""
Analiza toda la librería y retorna el analizador configurado.
Args:
library_path: Ruta a la librería (default: libreria/reggaeton/)
verbose: Mostrar progreso
Returns:
Instancia de LibreriaAnalyzer con todas las features cargadas
"""
analyzer = LibreriaAnalyzer(library_path=library_path, verbose=verbose)
analyzer.analyze_all()
return analyzer
if __name__ == "__main__":
# Test básico
print("[LibreriaAnalyzer] Test de inicialización...")
try:
analyzer = LibreriaAnalyzer(verbose=True)
print(f"Librería: {analyzer.library_path}")
print(f"Caché: {analyzer.cache_path}")
print(f"Librosa disponible: {LIBROSA_AVAILABLE}")
print(f"Scipy disponible: {SCIPY_AVAILABLE}")
# Intentar cargar/analizar
features = analyzer.analyze_all()
print(f"\nTotal samples en caché: {len(features)}")
# Mostrar estadísticas
stats = analyzer.get_stats()
print(f"\nEstadísticas: {json.dumps(stats, indent=2)}")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()