# NOTE(review): the lines below are paste residue from a repository web view
# (a commit message plus file statistics); kept verbatim as a comment so the
# module can parse:
#   - Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain
#     Method 1: track.insert_arrangement_clip() [Live 12+]
#     Method 2: track.create_audio_clip() [Live 11+]
#     Method 3: arrangement_clips.add_new_clip() [Live 12+]
#     Method 4: Session->duplicate_clip_to_arrangement [Legacy]
#     Method 5: Session->Recording [Universal]
#   - Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow
#   - Update skills documentation
#   - Verified: 3 clips created at positions [0, 4, 8] in Arrangement View
#   Closes: Audio injection in Arrangement View
#   (640 lines, 22 KiB, Python)
"""
|
|
LibreriaAnalyzer - Análisis espectral de samples de audio
|
|
|
|
Escanea recursivamente la librería de samples y extrae features espectrales
|
|
usando librosa (con fallback a scipy si no está disponible).
|
|
|
|
Uso:
|
|
from engines.libreria_analyzer import LibreriaAnalyzer
|
|
|
|
analyzer = LibreriaAnalyzer()
|
|
analyzer.analyze_all() # Analiza toda la librería
|
|
|
|
# O consultar features de un sample específico
|
|
features = analyzer.get_features("C:/.../kick_808.wav")
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
|
|
# Audio analysis libraries
|
|
try:
|
|
import numpy as np
|
|
import librosa
|
|
import librosa.feature
|
|
LIBROSA_AVAILABLE = True
|
|
except ImportError:
|
|
LIBROSA_AVAILABLE = False
|
|
try:
|
|
import numpy as np
|
|
from scipy.io import wavfile
|
|
from scipy import signal
|
|
SCIPY_AVAILABLE = True
|
|
except ImportError:
|
|
SCIPY_AVAILABLE = False
|
|
np = None
|
|
|
|
|
|
class LibreriaAnalyzer:
    """
    Spectral analyzer for the sample library.

    Extracts audio features for every sample found and keeps them in a
    cache so the same file is never analyzed twice.
    """

    # Audio file extensions the scanner will pick up.
    SUPPORTED_EXTENSIONS = {'.wav', '.mp3', '.aif', '.aiff', '.flac'}

    # Feature-cache file name and maximum age before a re-scan is forced.
    CACHE_FILENAME = '.features_cache.json'
    CACHE_MAX_AGE_DAYS = 7

    # Folder-name -> sample-role lookup table. Insertion order matters:
    # _detect_role also walks this dict for substring matches.
    ROLE_MAPPING = {
        'kick': 'kick',
        'snare': 'snare',
        'bass': 'bass',
        'fx': 'fx',
        'drumloops': 'drum_loop',
        'drumloop': 'drum_loop',
        'hi-hat': 'hat_closed',
        'hihat': 'hat_closed',
        'hat': 'hat_closed',
        'oneshots': 'oneshot',
        'oneshot': 'oneshot',
        'perc loop': 'perc_loop',
        'perc_loop': 'perc_loop',
        'reggaeton 3': 'synth',
        'sentimientolatino2025': 'multi',
        'sounds presets': 'preset',
        'extra': 'extra',
        'flp': 'project',
    }
def __init__(self, library_path: str = None, verbose: bool = True):
|
|
"""
|
|
Inicializa el analizador.
|
|
|
|
Args:
|
|
library_path: Ruta base de la librería. Por defecto: libreria/reggaeton/
|
|
verbose: Si True, muestra progreso del análisis
|
|
"""
|
|
if library_path is None:
|
|
# Default path según la estructura del proyecto
|
|
base_path = Path("C:/ProgramData/Ableton/Live 12 Suite/Resources/MIDI Remote Scripts")
|
|
self.library_path = base_path / "libreria" / "reggaeton"
|
|
else:
|
|
self.library_path = Path(library_path)
|
|
|
|
self.verbose = verbose
|
|
self.features: Dict[str, Dict[str, Any]] = {}
|
|
self.cache_path = self.library_path / self.CACHE_FILENAME
|
|
|
|
# Verificar disponibilidad de librerías
|
|
if not LIBROSA_AVAILABLE and not SCIPY_AVAILABLE:
|
|
raise ImportError(
|
|
"Se requiere librosa o scipy para análisis de audio. "
|
|
"Instala: pip install librosa numpy"
|
|
)
|
|
|
|
# Cargar caché existente si está disponible
|
|
self._load_cache()
|
|
|
|
def _load_cache(self) -> bool:
|
|
"""
|
|
Carga el caché de features si existe y es reciente.
|
|
|
|
Returns:
|
|
True si se cargó el caché, False en caso contrario
|
|
"""
|
|
if not self.cache_path.exists():
|
|
return False
|
|
|
|
try:
|
|
# Verificar edad del caché
|
|
cache_age = datetime.now() - datetime.fromtimestamp(
|
|
self.cache_path.stat().st_mtime
|
|
)
|
|
|
|
if cache_age > timedelta(days=self.CACHE_MAX_AGE_DAYS):
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Caché expirado ({cache_age.days} días). Re-analizando...")
|
|
return False
|
|
|
|
# Cargar caché
|
|
with open(self.cache_path, 'r', encoding='utf-8') as f:
|
|
cache_data = json.load(f)
|
|
|
|
self.features = cache_data.get('samples', {})
|
|
|
|
if self.verbose:
|
|
total = cache_data.get('total_samples', len(self.features))
|
|
scan_date = cache_data.get('scan_date', 'unknown')
|
|
print(f"[LibreriaAnalyzer] Caché cargado: {total} samples (desde {scan_date})")
|
|
|
|
return True
|
|
|
|
except (json.JSONDecodeError, IOError, KeyError) as e:
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Error cargando caché: {e}")
|
|
return False
|
|
|
|
def _save_cache(self) -> None:
|
|
"""Guarda las features actuales en el caché."""
|
|
cache_data = {
|
|
"version": "1.0",
|
|
"total_samples": len(self.features),
|
|
"scan_date": datetime.now().isoformat(),
|
|
"library_path": str(self.library_path),
|
|
"samples": self.features
|
|
}
|
|
|
|
try:
|
|
with open(self.cache_path, 'w', encoding='utf-8') as f:
|
|
json.dump(cache_data, f, indent=2, ensure_ascii=False)
|
|
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Caché guardado: {len(self.features)} samples")
|
|
except IOError as e:
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Error guardando caché: {e}")
|
|
|
|
def _detect_role(self, file_path: Path) -> str:
|
|
"""
|
|
Detecta el rol del sample basado en la carpeta contenedora.
|
|
|
|
Args:
|
|
file_path: Ruta al archivo de audio
|
|
|
|
Returns:
|
|
Rol detectado (kick, snare, bass, etc.)
|
|
"""
|
|
# Obtener partes del path en minúsculas
|
|
path_parts = [p.lower() for p in file_path.parts]
|
|
|
|
# Buscar coincidencias en el mapeo
|
|
for part in path_parts:
|
|
# Remover caracteres especiales para matching
|
|
clean_part = part.replace(' ', '_').replace('-', '_').replace('(', '').replace(')', '')
|
|
|
|
if part in self.ROLE_MAPPING:
|
|
return self.ROLE_MAPPING[part]
|
|
if clean_part in self.ROLE_MAPPING:
|
|
return self.ROLE_MAPPING[clean_part]
|
|
|
|
# Buscar substrings
|
|
for key, role in self.ROLE_MAPPING.items():
|
|
if key in part or key in clean_part:
|
|
return role
|
|
|
|
return "unknown"
|
|
|
|
def _get_pack_name(self, file_path: Path) -> str:
|
|
"""
|
|
Obtiene el nombre del pack/carpeta padre del sample.
|
|
|
|
Args:
|
|
file_path: Ruta al archivo de audio
|
|
|
|
Returns:
|
|
Nombre del pack/carpeta
|
|
"""
|
|
# El pack es el directorio padre inmediato
|
|
parent = file_path.parent.name
|
|
return parent if parent else "root"
|
|
|
|
def _extract_features_librosa(self, file_path: Path) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Extrae features de audio usando librosa.
|
|
|
|
Args:
|
|
file_path: Ruta al archivo de audio
|
|
|
|
Returns:
|
|
Diccionario con features o None si hay error
|
|
"""
|
|
try:
|
|
# Cargar audio
|
|
y, sr = librosa.load(str(file_path), sr=None, mono=True)
|
|
|
|
# Duración
|
|
duration = librosa.get_duration(y=y, sr=sr)
|
|
|
|
# RMS (energía)
|
|
rms = float(np.mean(librosa.feature.rms(y=y)))
|
|
rms_db = 20 * np.log10(rms + 1e-10) # Convertir a dB
|
|
|
|
# Spectral Centroid (brillo)
|
|
spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr)))
|
|
|
|
# Spectral Rolloff
|
|
spectral_rolloff = float(np.mean(librosa.feature.spectral_rolloff(y=y, sr=sr)))
|
|
|
|
# Zero Crossing Rate
|
|
zcr = float(np.mean(librosa.feature.zero_crossing_rate(y)))
|
|
|
|
# MFCCs (13 coeficientes)
|
|
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
|
|
mfccs_mean = [float(np.mean(coef)) for coef in mfccs]
|
|
|
|
# Onset Strength (qué tan rítmico es)
|
|
onset_env = librosa.onset.onset_strength(y=y, sr=sr)
|
|
onset_strength = float(np.mean(onset_env))
|
|
|
|
# BPM detection
|
|
try:
|
|
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
|
|
bpm = float(tempo) if isinstance(tempo, (int, float, np.number)) else float(tempo[0])
|
|
except:
|
|
bpm = 0.0
|
|
|
|
# Key detection via chromagram
|
|
try:
|
|
chromagram = librosa.feature.chroma_cqt(y=y, sr=sr)
|
|
# Sumar a lo largo del tiempo para obtener el perfil de pitch
|
|
chroma_avg = np.sum(chromagram, axis=1)
|
|
# Notas musicales
|
|
notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
|
|
# Encontrar la nota dominante
|
|
key_index = np.argmax(chroma_avg)
|
|
key = notes[key_index]
|
|
|
|
# Detectar si es mayor o menor (heurística simple)
|
|
# Si el tercer grado está presente, es menor
|
|
minor_third_idx = (key_index + 3) % 12
|
|
if chroma_avg[minor_third_idx] > chroma_avg[(key_index + 4) % 12]:
|
|
key += 'm'
|
|
except:
|
|
key = ""
|
|
|
|
# Determinar canales (asumimos mono después de librosa.load con mono=True)
|
|
# Para saber si era stereo originalmente, tendríamos que cargar de nuevo
|
|
try:
|
|
y_orig, _ = librosa.load(str(file_path), sr=None, mono=False)
|
|
channels = y_orig.shape[0] if len(y_orig.shape) > 1 else 1
|
|
except:
|
|
channels = 1
|
|
|
|
return {
|
|
"rms": round(rms_db, 2),
|
|
"spectral_centroid": round(spectral_centroid, 2),
|
|
"spectral_rolloff": round(spectral_rolloff, 2),
|
|
"zero_crossing_rate": round(zcr, 4),
|
|
"mfccs": [round(m, 4) for m in mfccs_mean],
|
|
"onset_strength": round(onset_strength, 4),
|
|
"duration": round(duration, 3),
|
|
"sample_rate": sr,
|
|
"channels": channels,
|
|
"bpm": round(bpm, 1) if bpm > 0 else 0,
|
|
"key": key
|
|
}
|
|
|
|
except Exception as e:
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Error analizando {file_path}: {e}")
|
|
return None
|
|
|
|
def _extract_features_scipy(self, file_path: Path) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Extrae features básicas usando scipy (fallback cuando librosa no está).
|
|
|
|
Solo soporta archivos WAV.
|
|
|
|
Args:
|
|
file_path: Ruta al archivo de audio
|
|
|
|
Returns:
|
|
Diccionario con features básicas o None si hay error
|
|
"""
|
|
try:
|
|
# scipy solo soporta WAV nativamente
|
|
if file_path.suffix.lower() not in {'.wav'}:
|
|
return None
|
|
|
|
# Cargar audio
|
|
sr, data = wavfile.read(str(file_path))
|
|
|
|
# Convertir a float y mono si es necesario
|
|
if data.ndim > 1:
|
|
channels = data.shape[1]
|
|
data = np.mean(data, axis=1) # Convertir a mono
|
|
else:
|
|
channels = 1
|
|
|
|
# Normalizar a float [-1, 1]
|
|
if data.dtype == np.int16:
|
|
data = data.astype(np.float32) / 32768.0
|
|
elif data.dtype == np.int32:
|
|
data = data.astype(np.float32) / 2147483648.0
|
|
else:
|
|
data = data.astype(np.float32)
|
|
|
|
# Duración
|
|
duration = len(data) / sr
|
|
|
|
# RMS
|
|
rms = np.sqrt(np.mean(data ** 2))
|
|
rms_db = 20 * np.log10(rms + 1e-10)
|
|
|
|
# Spectral Centroid usando FFT
|
|
fft = np.fft.fft(data)
|
|
freqs = np.fft.fftfreq(len(data), 1/sr)
|
|
magnitude = np.abs(fft)
|
|
|
|
# Solo frecuencias positivas
|
|
positive_freqs = freqs[:len(freqs)//2]
|
|
positive_magnitude = magnitude[:len(magnitude)//2]
|
|
|
|
spectral_centroid = np.sum(positive_freqs * positive_magnitude) / np.sum(positive_magnitude)
|
|
|
|
# Zero Crossing Rate
|
|
zcr = np.mean(np.diff(np.sign(data)) != 0)
|
|
|
|
# No podemos hacer análisis avanzado sin librosa
|
|
return {
|
|
"rms": round(rms_db, 2),
|
|
"spectral_centroid": round(float(spectral_centroid), 2),
|
|
"spectral_rolloff": 0.0, # No disponible sin librosa
|
|
"zero_crossing_rate": round(float(zcr), 4),
|
|
"mfccs": [], # No disponible sin librosa
|
|
"onset_strength": 0.0, # No disponible sin librosa
|
|
"duration": round(duration, 3),
|
|
"sample_rate": sr,
|
|
"channels": channels,
|
|
"bpm": 0, # No disponible sin librosa
|
|
"key": "" # No disponible sin librosa
|
|
}
|
|
|
|
except Exception as e:
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Error (scipy) analizando {file_path}: {e}")
|
|
return None
|
|
|
|
def _extract_features(self, file_path: Path) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Extrae features de un archivo de audio.
|
|
|
|
Usa librosa si está disponible, de lo contrario usa scipy.
|
|
|
|
Args:
|
|
file_path: Ruta al archivo de audio
|
|
|
|
Returns:
|
|
Diccionario con features o None si hay error
|
|
"""
|
|
if LIBROSA_AVAILABLE:
|
|
return self._extract_features_librosa(file_path)
|
|
elif SCIPY_AVAILABLE:
|
|
return self._extract_features_scipy(file_path)
|
|
else:
|
|
return None
|
|
|
|
def _scan_samples(self) -> List[Path]:
|
|
"""
|
|
Escanea recursivamente la librería buscando samples de audio.
|
|
|
|
Returns:
|
|
Lista de rutas a archivos de audio encontrados
|
|
"""
|
|
samples = []
|
|
|
|
if not self.library_path.exists():
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Librería no encontrada: {self.library_path}")
|
|
return samples
|
|
|
|
for ext in self.SUPPORTED_EXTENSIONS:
|
|
samples.extend(self.library_path.rglob(f"*{ext}"))
|
|
|
|
return samples
|
|
|
|
def analyze_sample(self, file_path: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Analiza un sample individual y extrae sus features.
|
|
|
|
Args:
|
|
file_path: Ruta al archivo de audio
|
|
|
|
Returns:
|
|
Diccionario con todas las features del sample
|
|
"""
|
|
path = Path(file_path)
|
|
|
|
if not path.exists():
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Archivo no encontrado: {file_path}")
|
|
return None
|
|
|
|
if path.suffix.lower() not in self.SUPPORTED_EXTENSIONS:
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Formato no soportado: {path.suffix}")
|
|
return None
|
|
|
|
# Extraer features de audio
|
|
audio_features = self._extract_features(path)
|
|
|
|
if audio_features is None:
|
|
return None
|
|
|
|
# Construir el objeto completo de features
|
|
abs_path = str(path.resolve())
|
|
role = self._detect_role(path)
|
|
pack = self._get_pack_name(path)
|
|
|
|
features = {
|
|
"name": path.name,
|
|
"pack": pack,
|
|
"role": role,
|
|
**audio_features
|
|
}
|
|
|
|
# Guardar en caché interno
|
|
self.features[abs_path] = features
|
|
|
|
return features
|
|
|
|
def analyze_all(self, force_reanalyze: bool = False) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Analiza todos los samples de la librería.
|
|
|
|
Args:
|
|
force_reanalyze: Si True, re-analiza incluso si hay caché
|
|
|
|
Returns:
|
|
Diccionario con todas las features indexadas por path
|
|
"""
|
|
# Verificar si ya tenemos caché válido
|
|
if not force_reanalyze and self.features:
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Usando caché existente con {len(self.features)} samples")
|
|
return self.features
|
|
|
|
# Escanear samples
|
|
samples = self._scan_samples()
|
|
|
|
if not samples:
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] No se encontraron samples en {self.library_path}")
|
|
return {}
|
|
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Encontrados {len(samples)} samples para analizar")
|
|
|
|
# Analizar cada sample
|
|
total = len(samples)
|
|
analyzed = 0
|
|
failed = 0
|
|
|
|
for i, sample_path in enumerate(samples, 1):
|
|
abs_path = str(sample_path.resolve())
|
|
|
|
# Verificar si ya está en caché y no es force_reanalyze
|
|
if not force_reanalyze and abs_path in self.features:
|
|
continue
|
|
|
|
# Analizar sample
|
|
features = self.analyze_sample(abs_path)
|
|
|
|
if features:
|
|
analyzed += 1
|
|
else:
|
|
failed += 1
|
|
|
|
# Mostrar progreso
|
|
if self.verbose and i % 10 == 0:
|
|
pct = (i / total) * 100
|
|
print(f"[LibreriaAnalyzer] Progreso: {i}/{total} ({pct:.1f}%) - OK: {analyzed}, Fallos: {failed}")
|
|
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Análisis completo: {analyzed} analizados, {failed} fallidos")
|
|
|
|
# Guardar caché
|
|
self._save_cache()
|
|
|
|
return self.features
|
|
|
|
def get_features(self, sample_path: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Obtiene las features de un sample específico.
|
|
|
|
Si el sample no está en caché, lo analiza.
|
|
|
|
Args:
|
|
sample_path: Ruta al archivo de audio
|
|
|
|
Returns:
|
|
Diccionario con features o None si no se puede analizar
|
|
"""
|
|
abs_path = str(Path(sample_path).resolve())
|
|
|
|
# Verificar si está en caché
|
|
if abs_path in self.features:
|
|
return self.features[abs_path]
|
|
|
|
# Analizar si no está en caché
|
|
return self.analyze_sample(sample_path)
|
|
|
|
def get_all_features(self) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Obtiene todas las features cargadas/analizadas.
|
|
|
|
Returns:
|
|
Diccionario con todas las features
|
|
"""
|
|
return self.features
|
|
|
|
def clear_cache(self) -> None:
|
|
"""Elimina el archivo de caché y limpia las features en memoria."""
|
|
self.features = {}
|
|
if self.cache_path.exists():
|
|
try:
|
|
self.cache_path.unlink()
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Caché eliminado: {self.cache_path}")
|
|
except IOError as e:
|
|
if self.verbose:
|
|
print(f"[LibreriaAnalyzer] Error eliminando caché: {e}")
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""
|
|
Obtiene estadísticas de la librería analizada.
|
|
|
|
Returns:
|
|
Diccionario con estadísticas
|
|
"""
|
|
if not self.features:
|
|
return {
|
|
"total_samples": 0,
|
|
"by_role": {},
|
|
"avg_duration": 0,
|
|
"avg_rms": 0
|
|
}
|
|
|
|
# Contar por rol
|
|
by_role = {}
|
|
total_duration = 0
|
|
total_rms = 0
|
|
|
|
for path, features in self.features.items():
|
|
role = features.get("role", "unknown")
|
|
by_role[role] = by_role.get(role, 0) + 1
|
|
|
|
total_duration += features.get("duration", 0)
|
|
total_rms += features.get("rms", 0)
|
|
|
|
total = len(self.features)
|
|
|
|
return {
|
|
"total_samples": total,
|
|
"by_role": by_role,
|
|
"avg_duration": round(total_duration / total, 3) if total > 0 else 0,
|
|
"avg_rms": round(total_rms / total, 2) if total > 0 else 0
|
|
}
|
|
|
|
|
|
# Convenience function for direct use
def analyze_library(library_path: str = None, verbose: bool = True) -> LibreriaAnalyzer:
    """
    Analyze the whole library and return the configured analyzer.

    Args:
        library_path: Library path (default: libreria/reggaeton/).
        verbose: Show progress.

    Returns:
        A LibreriaAnalyzer instance with every feature loaded.
    """
    instance = LibreriaAnalyzer(library_path=library_path, verbose=verbose)
    instance.analyze_all()
    return instance
if __name__ == "__main__":
|
|
# Test básico
|
|
print("[LibreriaAnalyzer] Test de inicialización...")
|
|
|
|
try:
|
|
analyzer = LibreriaAnalyzer(verbose=True)
|
|
print(f"Librería: {analyzer.library_path}")
|
|
print(f"Caché: {analyzer.cache_path}")
|
|
print(f"Librosa disponible: {LIBROSA_AVAILABLE}")
|
|
print(f"Scipy disponible: {SCIPY_AVAILABLE}")
|
|
|
|
# Intentar cargar/analizar
|
|
features = analyzer.analyze_all()
|
|
print(f"\nTotal samples en caché: {len(features)}")
|
|
|
|
# Mostrar estadísticas
|
|
stats = analyzer.get_stats()
|
|
print(f"\nEstadísticas: {json.dumps(stats, indent=2)}")
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|