Files
ableton-mcp-ai/mcp_server/engines/abstract_analyzer.py
OpenCode Agent 5ce8187c65 feat: Implement senior audio injection with 5 fallback methods
- Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain
- Method 1: track.insert_arrangement_clip() [Live 12+]
- Method 2: track.create_audio_clip() [Live 11+]
- Method 3: arrangement_clips.add_new_clip() [Live 12+]
- Method 4: Session->duplicate_clip_to_arrangement [Legacy]
- Method 5: Session->Recording [Universal]

- Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow
- Update skills documentation
- Verified: 3 clips created at positions [0, 4, 8] in Arrangement View

Closes: Audio injection in Arrangement View
2026-04-12 14:02:32 -03:00

1473 lines
49 KiB
Python

"""
Abstract Analyzer - Sistema abstracto de extracción de features de audio.
Este módulo proporciona una arquitectura flexible para extraer características
espectrales de samples de audio, con múltiples implementaciones:
- LibrosaExtractor: Análisis completo usando librosa
- DatabaseExtractor: Lookups rápidos desde SQLite
- HybridExtractor: Combina ambos enfoques (cache + análisis)
Uso:
from engines.abstract_analyzer import HybridExtractor
extractor = HybridExtractor()
features = extractor.get_or_analyze("path/to/sample.wav")
# O usar extractores individuales
from engines.abstract_analyzer import LibrosaExtractor
librosa_ext = LibrosaExtractor()
bpm = librosa_ext.extract_bpm("path/to/sample.wav")
"""
import os
import json
import sqlite3
import hashlib
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Union
from datetime import datetime
logger = logging.getLogger("AbstractAnalyzer")
# Paths por defecto
DEFAULT_LIBRARY_PATH = Path(r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton")
DEFAULT_DB_PATH = DEFAULT_LIBRARY_PATH / ".sample_metadata.db"
@dataclass
class SampleFeatures:
"""
Dataclass que encapsula todas las features extraídas de un sample.
Attributes:
path: Ruta absoluta al archivo de audio
bpm: Tempo detectado en beats por minuto
key: Tonalidad musical detectada (ej: "Am", "C")
duration: Duración en segundos
rms: Root Mean Square (energía promedio) en dB
spectral_centroid: Centroide espectral (brillo) en Hz
spectral_rolloff: Frecuencia de rolloff espectral en Hz
zero_crossing_rate: Tasa de cruce por cero (noisiness)
mfccs: Lista de 13 coeficientes MFCC (timbre)
sample_rate: Frecuencia de muestreo en Hz
channels: Número de canales (1=mono, 2=stereo)
analyzed_at: Timestamp del análisis
source: Fuente de los datos ('librosa', 'database', 'cache')
"""
path: str
bpm: Optional[float] = None
key: Optional[str] = None
duration: Optional[float] = None
rms: Optional[float] = None
spectral_centroid: Optional[float] = None
spectral_rolloff: Optional[float] = None
zero_crossing_rate: Optional[float] = None
mfccs: Optional[List[float]] = field(default_factory=list)
sample_rate: Optional[int] = None
channels: Optional[int] = None
analyzed_at: Optional[str] = None
source: str = "unknown"
def to_dict(self) -> Dict[str, Any]:
"""Convierte a diccionario para serialización."""
return {
"path": self.path,
"bpm": self.bpm,
"key": self.key,
"duration": self.duration,
"rms": self.rms,
"spectral_centroid": self.spectral_centroid,
"spectral_rolloff": self.spectral_rolloff,
"zero_crossing_rate": self.zero_crossing_rate,
"mfccs": self.mfccs,
"sample_rate": self.sample_rate,
"channels": self.channels,
"analyzed_at": self.analyzed_at or datetime.now().isoformat(),
"source": self.source
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "SampleFeatures":
"""Crea instancia desde diccionario."""
return cls(
path=data.get("path", ""),
bpm=data.get("bpm"),
key=data.get("key"),
duration=data.get("duration"),
rms=data.get("rms"),
spectral_centroid=data.get("spectral_centroid"),
spectral_rolloff=data.get("spectral_rolloff"),
zero_crossing_rate=data.get("zero_crossing_rate"),
mfccs=data.get("mfccs", []),
sample_rate=data.get("sample_rate"),
channels=data.get("channels"),
analyzed_at=data.get("analyzed_at"),
source=data.get("source", "unknown")
)
def is_complete(self) -> bool:
"""Verifica si todas las features principales están presentes."""
return all([
self.bpm is not None,
self.key is not None,
self.duration is not None,
self.rms is not None,
self.spectral_centroid is not None,
len(self.mfccs) == 13
])
class FeatureExtractor(ABC):
"""
Abstract Base Class para extractores de features de audio.
Define la interfaz común que todos los extractores deben implementar.
Las subclases concretas deben implementar todos los métodos abstractos.
Example:
class MyExtractor(FeatureExtractor):
def extract_bpm(self, audio_path: str) -> Optional[float]:
# Implementación específica
return 128.0
"""
@abstractmethod
def extract_bpm(self, audio_path: str) -> Optional[float]:
"""
Extrae el BPM (tempo) de un archivo de audio.
Args:
audio_path: Ruta al archivo de audio
Returns:
Tempo en BPM o None si no se puede detectar
"""
pass
@abstractmethod
def extract_key(self, audio_path: str) -> Optional[str]:
"""
Detecta la tonalidad musical del audio.
Args:
audio_path: Ruta al archivo de audio
Returns:
Tonalidad en formato string (ej: "Am", "C", "F#m") o None
"""
pass
@abstractmethod
def extract_duration(self, audio_path: str) -> Optional[float]:
"""
Obtiene la duración del audio en segundos.
Args:
audio_path: Ruta al archivo de audio
Returns:
Duración en segundos o None
"""
pass
@abstractmethod
def extract_rms(self, audio_path: str) -> Optional[float]:
"""
Calcula el RMS (Root Mean Square) - energía promedio del audio.
Args:
audio_path: Ruta al archivo de audio
Returns:
RMS en dB o None
"""
pass
@abstractmethod
def extract_spectral_centroid(self, audio_path: str) -> Optional[float]:
"""
Calcula el centroide espectral (brillo del sonido).
Args:
audio_path: Ruta al archivo de audio
Returns:
Centroide espectral en Hz o None
"""
pass
@abstractmethod
def extract_spectral_rolloff(self, audio_path: str) -> Optional[float]:
"""
Calcula la frecuencia de rolloff espectral.
Args:
audio_path: Ruta al archivo de audio
Returns:
Frecuencia de rolloff en Hz o None
"""
pass
@abstractmethod
def extract_zero_crossing_rate(self, audio_path: str) -> Optional[float]:
"""
Calcula la tasa de cruce por cero (noisiness).
Args:
audio_path: Ruta al archivo de audio
Returns:
ZCR como float o None
"""
pass
@abstractmethod
def extract_mfccs(self, audio_path: str) -> Optional[List[float]]:
"""
Extrae los coeficientes MFCC (Mel-Frequency Cepstral Coefficients).
Args:
audio_path: Ruta al archivo de audio
Returns:
Lista de 13 coeficientes MFCC o None
"""
pass
@abstractmethod
def extract_all_features(self, audio_path: str) -> SampleFeatures:
"""
Extrae todas las features disponibles en una sola operación.
Args:
audio_path: Ruta al archivo de audio
Returns:
Objeto SampleFeatures con todas las características
"""
pass
def _check_file_exists(self, audio_path: str) -> bool:
"""Helper para verificar que el archivo existe."""
if not os.path.exists(audio_path):
logger.error("Archivo no encontrado: %s", audio_path)
return False
return True
def _get_file_hash(self, audio_path: str) -> str:
"""Genera un hash único para el archivo (para cache)."""
stat = os.stat(audio_path)
content = f"{audio_path}:{stat.st_size}:{stat.st_mtime}"
return hashlib.md5(content.encode()).hexdigest()
class LibrosaExtractor(FeatureExtractor):
"""
Implementación de FeatureExtractor usando librosa + numpy.
Realiza análisis completo de audio extrayendo todas las características
espectrales. Usa lazy loading para importar librosa solo cuando se necesita.
Attributes:
sample_rate: Sample rate objetivo (None = mantener original)
hop_length: Hop length para análisis de features
n_mfcc: Número de coeficientes MFCC a extraer (default 13)
"""
def __init__(self, sample_rate: Optional[int] = None, hop_length: int = 512, n_mfcc: int = 13):
"""
Inicializa el extractor de Librosa.
Args:
sample_rate: Sample rate objetivo (None = mantener original)
hop_length: Hop length para análisis (default 512)
n_mfcc: Número de coeficientes MFCC (default 13)
"""
self.sample_rate = sample_rate
self.hop_length = hop_length
self.n_mfcc = n_mfcc
self._librosa_available = None
def _check_librosa(self) -> bool:
"""Verifica si librosa está disponible (lazy loading)."""
if self._librosa_available is None:
try:
import librosa
import numpy as np
self._librosa_available = True
except ImportError:
logger.warning("librosa no está disponible. Algunas features no se extraerán.")
self._librosa_available = False
return self._librosa_available
def _load_audio(self, audio_path: str) -> Tuple[Optional[Any], Optional[int]]:
"""
Carga el audio usando librosa.
Returns:
Tuple de (audio_data, sample_rate) o (None, None) si falla
"""
if not self._check_librosa():
return None, None
try:
import librosa
y, sr = librosa.load(audio_path, sr=self.sample_rate, mono=True)
return y, sr
except Exception as e:
logger.error("Error cargando audio %s: %s", audio_path, e)
return None, None
def extract_bpm(self, audio_path: str) -> Optional[float]:
"""
Detecta el BPM usando librosa.beat.beat_track.
Args:
audio_path: Ruta al archivo de audio
Returns:
BPM detectado o None
"""
if not self._check_file_exists(audio_path):
return None
if not self._check_librosa():
return None
try:
import librosa
import numpy as np
y, sr = self._load_audio(audio_path)
if y is None:
return None
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
bpm = float(tempo) if isinstance(tempo, (int, float, np.number)) else float(tempo[0])
logger.debug("BPM extraído de %s: %.1f", audio_path, bpm)
return bpm
except Exception as e:
logger.error("Error extrayendo BPM de %s: %s", audio_path, e)
return None
def extract_key(self, audio_path: str) -> Optional[str]:
"""
Detecta la tonalidad usando chromagrama.
Args:
audio_path: Ruta al archivo de audio
Returns:
Tonalidad detectada (ej: "Am", "C") o None
"""
if not self._check_file_exists(audio_path):
return None
if not self._check_librosa():
return None
try:
import librosa
import numpy as np
y, sr = self._load_audio(audio_path)
if y is None:
return None
# Usar chroma_cqt para mejor detección de pitch
chromagram = librosa.feature.chroma_cqt(y=y, sr=sr)
chroma_avg = np.sum(chromagram, axis=1)
# Notas musicales
notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
key_index = np.argmax(chroma_avg)
key = notes[key_index]
# Heurística simple para detectar mayor/menor
# Compara intensidad del tercer grado menor vs mayor
minor_third_idx = (key_index + 3) % 12
major_third_idx = (key_index + 4) % 12
if chroma_avg[minor_third_idx] > chroma_avg[major_third_idx]:
key += 'm' # Menor
logger.debug("Key extraída de %s: %s", audio_path, key)
return key
except Exception as e:
logger.error("Error extrayendo key de %s: %s", audio_path, e)
return None
def extract_duration(self, audio_path: str) -> Optional[float]:
"""
Obtiene la duración del audio.
Args:
audio_path: Ruta al archivo de audio
Returns:
Duración en segundos o None
"""
if not self._check_file_exists(audio_path):
return None
if not self._check_librosa():
return None
try:
import librosa
y, sr = self._load_audio(audio_path)
if y is None:
return None
duration = librosa.get_duration(y=y, sr=sr)
return float(duration)
except Exception as e:
logger.error("Error extrayendo duración de %s: %s", audio_path, e)
return None
def extract_rms(self, audio_path: str) -> Optional[float]:
"""
Calcula el RMS (energía promedio) del audio.
Args:
audio_path: Ruta al archivo de audio
Returns:
RMS en dB o None
"""
if not self._check_file_exists(audio_path):
return None
if not self._check_librosa():
return None
try:
import librosa
import numpy as np
y, sr = self._load_audio(audio_path)
if y is None:
return None
rms = np.mean(librosa.feature.rms(y=y))
rms_db = 20 * np.log10(rms + 1e-10) # Convertir a dB
return float(rms_db)
except Exception as e:
logger.error("Error extrayendo RMS de %s: %s", audio_path, e)
return None
def extract_spectral_centroid(self, audio_path: str) -> Optional[float]:
"""
Calcula el centroide espectral (brillo promedio).
Args:
audio_path: Ruta al archivo de audio
Returns:
Centroide espectral en Hz o None
"""
if not self._check_file_exists(audio_path):
return None
if not self._check_librosa():
return None
try:
import librosa
import numpy as np
y, sr = self._load_audio(audio_path)
if y is None:
return None
centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
mean_centroid = float(np.mean(centroid))
return mean_centroid
except Exception as e:
logger.error("Error extrayendo spectral centroid de %s: %s", audio_path, e)
return None
def extract_spectral_rolloff(self, audio_path: str) -> Optional[float]:
"""
Calcula la frecuencia de rolloff espectral.
Args:
audio_path: Ruta al archivo de audio
Returns:
Frecuencia de rolloff en Hz o None
"""
if not self._check_file_exists(audio_path):
return None
if not self._check_librosa():
return None
try:
import librosa
import numpy as np
y, sr = self._load_audio(audio_path)
if y is None:
return None
rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
mean_rolloff = float(np.mean(rolloff))
return mean_rolloff
except Exception as e:
logger.error("Error extrayendo spectral rolloff de %s: %s", audio_path, e)
return None
def extract_zero_crossing_rate(self, audio_path: str) -> Optional[float]:
"""
Calcula la tasa de cruce por cero.
Args:
audio_path: Ruta al archivo de audio
Returns:
ZCR como float o None
"""
if not self._check_file_exists(audio_path):
return None
if not self._check_librosa():
return None
try:
import librosa
import numpy as np
y, sr = self._load_audio(audio_path)
if y is None:
return None
zcr = librosa.feature.zero_crossing_rate(y)
mean_zcr = float(np.mean(zcr))
return mean_zcr
except Exception as e:
logger.error("Error extrayendo ZCR de %s: %s", audio_path, e)
return None
def extract_mfccs(self, audio_path: str) -> Optional[List[float]]:
"""
Extrae los coeficientes MFCC.
Args:
audio_path: Ruta al archivo de audio
Returns:
Lista de 13 coeficientes MFCC o None
"""
if not self._check_file_exists(audio_path):
return None
if not self._check_librosa():
return None
try:
import librosa
import numpy as np
y, sr = self._load_audio(audio_path)
if y is None:
return None
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=self.n_mfcc)
mfccs_mean = [float(np.mean(coef)) for coef in mfccs]
return mfccs_mean
except Exception as e:
logger.error("Error extrayendo MFCCs de %s: %s", audio_path, e)
return None
def extract_all_features(self, audio_path: str) -> SampleFeatures:
"""
Extrae todas las features en una sola operación eficiente.
Args:
audio_path: Ruta al archivo de audio
Returns:
Objeto SampleFeatures completo
"""
if not self._check_file_exists(audio_path):
return SampleFeatures(path=audio_path, source="error")
if not self._check_librosa():
logger.error("librosa no disponible, no se pueden extraer features")
return SampleFeatures(path=audio_path, source="error")
try:
import librosa
import numpy as np
# Cargar audio una sola vez
y, sr = self._load_audio(audio_path)
if y is None:
return SampleFeatures(path=audio_path, source="error")
# Extraer todas las features de una vez
# 1. Duración
duration = librosa.get_duration(y=y, sr=sr)
# 2. BPM
try:
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
bpm = float(tempo) if isinstance(tempo, (int, float, np.number)) else float(tempo[0])
except:
bpm = None
# 3. Key
try:
chromagram = librosa.feature.chroma_cqt(y=y, sr=sr)
chroma_avg = np.sum(chromagram, axis=1)
notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
key_index = np.argmax(chroma_avg)
key = notes[key_index]
minor_third_idx = (key_index + 3) % 12
major_third_idx = (key_index + 4) % 12
if chroma_avg[minor_third_idx] > chroma_avg[major_third_idx]:
key += 'm'
except:
key = None
# 4. RMS
rms = float(np.mean(librosa.feature.rms(y=y)))
rms_db = 20 * np.log10(rms + 1e-10)
# 5. Spectral Centroid
centroid = librosa.feature.spectral_centroid(y=y, sr=sr)
spectral_centroid = float(np.mean(centroid))
# 6. Spectral Rolloff
rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)
spectral_rolloff = float(np.mean(rolloff))
# 7. Zero Crossing Rate
zcr = librosa.feature.zero_crossing_rate(y)
zero_crossing_rate = float(np.mean(zcr))
# 8. MFCCs
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=self.n_mfcc)
mfccs_mean = [float(np.mean(coef)) for coef in mfccs]
# 9. Detectar canales originales
try:
y_orig, _ = librosa.load(audio_path, sr=None, mono=False)
channels = y_orig.shape[0] if len(y_orig.shape) > 1 else 1
except:
channels = 1
return SampleFeatures(
path=audio_path,
bpm=bpm,
key=key,
duration=float(duration),
rms=float(rms_db),
spectral_centroid=spectral_centroid,
spectral_rolloff=spectral_rolloff,
zero_crossing_rate=zero_crossing_rate,
mfccs=mfccs_mean,
sample_rate=sr,
channels=channels,
analyzed_at=datetime.now().isoformat(),
source="librosa"
)
except Exception as e:
logger.error("Error extrayendo todas las features de %s: %s", audio_path, e)
return SampleFeatures(path=audio_path, source="error")
class SampleMetadataStore:
"""
Almacén de metadatos de samples usando SQLite.
Proporciona lookups rápidos de features pre-calculadas sin necesidad
de re-analizar los archivos de audio.
Attributes:
db_path: Ruta al archivo SQLite de la base de datos
"""
def __init__(self, db_path: Optional[Union[str, Path]] = None):
"""
Inicializa el store de metadatos.
Args:
db_path: Ruta a la base de datos SQLite (default: .sample_metadata.db en librería)
"""
if db_path is None:
self.db_path = DEFAULT_DB_PATH
else:
self.db_path = Path(db_path)
self._init_db()
def _init_db(self) -> None:
"""Inicializa el schema de la base de datos si no existe."""
try:
conn = sqlite3.connect(str(self.db_path))
cursor = conn.cursor()
# Tabla principal de samples
cursor.execute('''
CREATE TABLE IF NOT EXISTS samples (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT UNIQUE NOT NULL,
file_hash TEXT,
bpm REAL,
key TEXT,
duration REAL,
rms REAL,
spectral_centroid REAL,
spectral_rolloff REAL,
zero_crossing_rate REAL,
mfccs TEXT, -- JSON array
sample_rate INTEGER,
channels INTEGER,
analyzed_at TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Índices para búsquedas rápidas
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_samples_path ON samples(path)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_samples_key ON samples(key)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_samples_bpm ON samples(bpm)
''')
conn.commit()
conn.close()
logger.debug("Base de datos inicializada: %s", self.db_path)
except sqlite3.Error as e:
logger.error("Error inicializando base de datos: %s", e)
def get(self, sample_path: str) -> Optional[SampleFeatures]:
"""
Recupera las features de un sample desde la base de datos.
Args:
sample_path: Ruta al archivo de audio
Returns:
SampleFeatures si existe en la DB, None en caso contrario
"""
try:
conn = sqlite3.connect(str(self.db_path))
cursor = conn.cursor()
cursor.execute('''
SELECT path, bpm, key, duration, rms, spectral_centroid,
spectral_rolloff, zero_crossing_rate, mfccs,
sample_rate, channels, analyzed_at
FROM samples WHERE path = ?
''', (sample_path,))
row = cursor.fetchone()
conn.close()
if row:
mfccs = json.loads(row[8]) if row[8] else []
return SampleFeatures(
path=row[0],
bpm=row[1],
key=row[2],
duration=row[3],
rms=row[4],
spectral_centroid=row[5],
spectral_rolloff=row[6],
zero_crossing_rate=row[7],
mfccs=mfccs,
sample_rate=row[9],
channels=row[10],
analyzed_at=row[11],
source="database"
)
return None
except sqlite3.Error as e:
logger.error("Error leyendo de base de datos: %s", e)
return None
def save(self, features: SampleFeatures) -> bool:
"""
Guarda o actualiza las features de un sample en la base de datos.
Args:
features: Objeto SampleFeatures a guardar
Returns:
True si se guardó correctamente, False en caso contrario
"""
try:
conn = sqlite3.connect(str(self.db_path))
cursor = conn.cursor()
# Generar hash del archivo
file_hash = ""
if os.path.exists(features.path):
stat = os.stat(features.path)
file_hash = hashlib.md5(f"{features.path}:{stat.st_size}:{stat.st_mtime}".encode()).hexdigest()
mfccs_json = json.dumps(features.mfccs) if features.mfccs else "[]"
cursor.execute('''
INSERT OR REPLACE INTO samples
(path, file_hash, bpm, key, duration, rms, spectral_centroid,
spectral_rolloff, zero_crossing_rate, mfccs, sample_rate,
channels, analyzed_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (
features.path,
file_hash,
features.bpm,
features.key,
features.duration,
features.rms,
features.spectral_centroid,
features.spectral_rolloff,
features.zero_crossing_rate,
mfccs_json,
features.sample_rate,
features.channels,
features.analyzed_at or datetime.now().isoformat()
))
conn.commit()
conn.close()
logger.debug("Features guardadas en DB para: %s", features.path)
return True
except sqlite3.Error as e:
logger.error("Error guardando en base de datos: %s", e)
return False
def exists(self, sample_path: str) -> bool:
"""
Verifica si un sample existe en la base de datos.
Args:
sample_path: Ruta al archivo de audio
Returns:
True si existe en la DB, False en caso contrario
"""
try:
conn = sqlite3.connect(str(self.db_path))
cursor = conn.cursor()
cursor.execute('SELECT 1 FROM samples WHERE path = ?', (sample_path,))
result = cursor.fetchone() is not None
conn.close()
return result
except sqlite3.Error as e:
logger.error("Error consultando base de datos: %s", e)
return False
def delete(self, sample_path: str) -> bool:
"""
Elimina un sample de la base de datos.
Args:
sample_path: Ruta al archivo de audio
Returns:
True si se eliminó, False en caso contrario
"""
try:
conn = sqlite3.connect(str(self.db_path))
cursor = conn.cursor()
cursor.execute('DELETE FROM samples WHERE path = ?', (sample_path,))
conn.commit()
conn.close()
return True
except sqlite3.Error as e:
logger.error("Error eliminando de base de datos: %s", e)
return False
def get_all(self, limit: Optional[int] = None) -> List[SampleFeatures]:
"""
Recupera todas las features almacenadas.
Args:
limit: Límite de resultados (opcional)
Returns:
Lista de SampleFeatures
"""
try:
conn = sqlite3.connect(str(self.db_path))
cursor = conn.cursor()
query = '''
SELECT path, bpm, key, duration, rms, spectral_centroid,
spectral_rolloff, zero_crossing_rate, mfccs,
sample_rate, channels, analyzed_at
FROM samples ORDER BY updated_at DESC
'''
if limit:
query += f' LIMIT {limit}'
cursor.execute(query)
rows = cursor.fetchall()
conn.close()
results = []
for row in rows:
mfccs = json.loads(row[8]) if row[8] else []
results.append(SampleFeatures(
path=row[0],
bpm=row[1],
key=row[2],
duration=row[3],
rms=row[4],
spectral_centroid=row[5],
spectral_rolloff=row[6],
zero_crossing_rate=row[7],
mfccs=mfccs,
sample_rate=row[9],
channels=row[10],
analyzed_at=row[11],
source="database"
))
return results
except sqlite3.Error as e:
logger.error("Error leyendo de base de datos: %s", e)
return []
def count(self) -> int:
"""
Retorna el número total de samples almacenados.
Returns:
Número de samples en la base de datos
"""
try:
conn = sqlite3.connect(str(self.db_path))
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM samples')
count = cursor.fetchone()[0]
conn.close()
return count
except sqlite3.Error as e:
logger.error("Error contando registros: %s", e)
return 0
class DatabaseExtractor(FeatureExtractor):
"""
Implementación de FeatureExtractor que usa SampleMetadataStore.
Proporciona lookups rápidos desde SQLite sin necesidad de numpy/librosa.
Este extractor no realiza análisis de audio, solo recupera datos cacheados.
Attributes:
store: Instancia de SampleMetadataStore para acceso a datos
"""
def __init__(self, db_path: Optional[Union[str, Path]] = None):
"""
Inicializa el extractor de base de datos.
Args:
db_path: Ruta a la base de datos SQLite (opcional)
"""
self.store = SampleMetadataStore(db_path)
def _get_features(self, audio_path: str) -> Optional[SampleFeatures]:
"""Helper para obtener features desde la DB."""
return self.store.get(audio_path)
def extract_bpm(self, audio_path: str) -> Optional[float]:
"""Recupera BPM desde la base de datos."""
if not self._check_file_exists(audio_path):
return None
features = self._get_features(audio_path)
return features.bpm if features else None
def extract_key(self, audio_path: str) -> Optional[str]:
"""Recupera key desde la base de datos."""
if not self._check_file_exists(audio_path):
return None
features = self._get_features(audio_path)
return features.key if features else None
def extract_duration(self, audio_path: str) -> Optional[float]:
"""Recupera duración desde la base de datos."""
if not self._check_file_exists(audio_path):
return None
features = self._get_features(audio_path)
return features.duration if features else None
def extract_rms(self, audio_path: str) -> Optional[float]:
"""Recupera RMS desde la base de datos."""
if not self._check_file_exists(audio_path):
return None
features = self._get_features(audio_path)
return features.rms if features else None
def extract_spectral_centroid(self, audio_path: str) -> Optional[float]:
"""Recupera spectral centroid desde la base de datos."""
if not self._check_file_exists(audio_path):
return None
features = self._get_features(audio_path)
return features.spectral_centroid if features else None
def extract_spectral_rolloff(self, audio_path: str) -> Optional[float]:
"""Recupera spectral rolloff desde la base de datos."""
if not self._check_file_exists(audio_path):
return None
features = self._get_features(audio_path)
return features.spectral_rolloff if features else None
def extract_zero_crossing_rate(self, audio_path: str) -> Optional[float]:
"""Recupera ZCR desde la base de datos."""
if not self._check_file_exists(audio_path):
return None
features = self._get_features(audio_path)
return features.zero_crossing_rate if features else None
def extract_mfccs(self, audio_path: str) -> Optional[List[float]]:
"""Recupera MFCCs desde la base de datos."""
if not self._check_file_exists(audio_path):
return None
features = self._get_features(audio_path)
return features.mfccs if features else None
def extract_all_features(self, audio_path: str) -> SampleFeatures:
"""
Recupera todas las features desde la base de datos.
Si no existen en la DB, retorna un SampleFeatures vacío con source="not_found".
"""
if not self._check_file_exists(audio_path):
return SampleFeatures(path=audio_path, source="error")
features = self._get_features(audio_path)
if features:
return features
return SampleFeatures(path=audio_path, source="not_found")
def is_cached(self, audio_path: str) -> bool:
"""
Verifica si un sample tiene features cacheadas.
Args:
audio_path: Ruta al archivo de audio
Returns:
True si existe en la base de datos
"""
return self.store.exists(audio_path)
class HybridExtractor(FeatureExtractor):
"""
Extractor híbrido que combina DatabaseExtractor + LibrosaExtractor.
Estrategia:
1. Primero intenta recuperar de la base de datos (rápido)
2. Si no existe, usa LibrosaExtractor para analizar
3. Guarda automáticamente los resultados en la base de datos
Esta clase es el punto de entrada recomendado para la mayoría de casos de uso.
Attributes:
db_extractor: Instancia de DatabaseExtractor para lookups rápidos
librosa_extractor: Instancia de LibrosaExtractor para análisis
"""
def __init__(self,
db_path: Optional[Union[str, Path]] = None,
sample_rate: Optional[int] = None,
n_mfcc: int = 13):
"""
Inicializa el extractor híbrido.
Args:
db_path: Ruta a la base de datos SQLite (opcional)
sample_rate: Sample rate para LibrosaExtractor (opcional)
n_mfcc: Número de coeficientes MFCC (default 13)
"""
self.db_extractor = DatabaseExtractor(db_path)
self.librosa_extractor = LibrosaExtractor(sample_rate=sample_rate, n_mfcc=n_mfcc)
self.store = self.db_extractor.store # Referencia directa para conveniencia
def extract_bpm(self, audio_path: str) -> Optional[float]:
"""
Extrae BPM (desde DB si existe, sino con librosa y guarda).
Args:
audio_path: Ruta al archivo de audio
Returns:
BPM detectado o None
"""
# Intentar desde DB primero
bpm = self.db_extractor.extract_bpm(audio_path)
if bpm is not None:
return bpm
# Analizar con librosa
bpm = self.librosa_extractor.extract_bpm(audio_path)
if bpm is not None:
# Guardar análisis completo para evitar re-análisis futuro
features = self.librosa_extractor.extract_all_features(audio_path)
self.store.save(features)
return bpm
def extract_key(self, audio_path: str) -> Optional[str]:
"""Extrae key (con estrategia híbrida)."""
key = self.db_extractor.extract_key(audio_path)
if key is not None:
return key
key = self.librosa_extractor.extract_key(audio_path)
if key is not None:
features = self.librosa_extractor.extract_all_features(audio_path)
self.store.save(features)
return key
def extract_duration(self, audio_path: str) -> Optional[float]:
"""Extrae duración (con estrategia híbrida)."""
duration = self.db_extractor.extract_duration(audio_path)
if duration is not None:
return duration
duration = self.librosa_extractor.extract_duration(audio_path)
if duration is not None:
features = self.librosa_extractor.extract_all_features(audio_path)
self.store.save(features)
return duration
def extract_rms(self, audio_path: str) -> Optional[float]:
"""Extrae RMS (con estrategia híbrida)."""
rms = self.db_extractor.extract_rms(audio_path)
if rms is not None:
return rms
rms = self.librosa_extractor.extract_rms(audio_path)
if rms is not None:
features = self.librosa_extractor.extract_all_features(audio_path)
self.store.save(features)
return rms
def extract_spectral_centroid(self, audio_path: str) -> Optional[float]:
"""Extrae spectral centroid (con estrategia híbrida)."""
centroid = self.db_extractor.extract_spectral_centroid(audio_path)
if centroid is not None:
return centroid
centroid = self.librosa_extractor.extract_spectral_centroid(audio_path)
if centroid is not None:
features = self.librosa_extractor.extract_all_features(audio_path)
self.store.save(features)
return centroid
def extract_spectral_rolloff(self, audio_path: str) -> Optional[float]:
"""Extrae spectral rolloff (con estrategia híbrida)."""
rolloff = self.db_extractor.extract_spectral_rolloff(audio_path)
if rolloff is not None:
return rolloff
rolloff = self.librosa_extractor.extract_spectral_rolloff(audio_path)
if rolloff is not None:
features = self.librosa_extractor.extract_all_features(audio_path)
self.store.save(features)
return rolloff
def extract_zero_crossing_rate(self, audio_path: str) -> Optional[float]:
"""Extrae ZCR (con estrategia híbrida)."""
zcr = self.db_extractor.extract_zero_crossing_rate(audio_path)
if zcr is not None:
return zcr
zcr = self.librosa_extractor.extract_zero_crossing_rate(audio_path)
if zcr is not None:
features = self.librosa_extractor.extract_all_features(audio_path)
self.store.save(features)
return zcr
def extract_mfccs(self, audio_path: str) -> Optional[List[float]]:
"""Extrae MFCCs (con estrategia híbrida)."""
mfccs = self.db_extractor.extract_mfccs(audio_path)
if mfccs is not None and len(mfccs) > 0:
return mfccs
mfccs = self.librosa_extractor.extract_mfccs(audio_path)
if mfccs is not None:
features = self.librosa_extractor.extract_all_features(audio_path)
self.store.save(features)
return mfccs
def extract_all_features(self, audio_path: str) -> SampleFeatures:
"""
Extrae todas las features usando la estrategia híbrida.
Args:
audio_path: Ruta al archivo de audio
Returns:
Objeto SampleFeatures completo
"""
# Intentar desde DB primero
features = self.db_extractor.extract_all_features(audio_path)
if features.source != "not_found" and features.source != "error":
logger.debug("Features recuperadas de DB para: %s", audio_path)
return features
# Analizar con librosa
features = self.librosa_extractor.extract_all_features(audio_path)
if features.source != "error":
# Guardar en DB para futuras consultas
self.store.save(features)
logger.debug("Features analizadas y guardadas para: %s", audio_path)
return features
def get_or_analyze(self, sample_path: str) -> SampleFeatures:
"""
Método de conveniencia: obtiene features o las analiza si no existen.
Este es el método recomendado para uso general. Es equivalente
a `extract_all_features()` pero con un nombre más explícito.
Args:
sample_path: Ruta al archivo de audio
Returns:
Objeto SampleFeatures completo
Example:
extractor = HybridExtractor()
features = extractor.get_or_analyze("path/to/kick.wav")
print(f"BPM: {features.bpm}, Key: {features.key}")
"""
return self.extract_all_features(sample_path)
def preload_library(self, library_path: Optional[Union[str, Path]] = None,
extensions: Tuple[str, ...] = ('.wav', '.mp3', '.aif', '.aiff')) -> int:
"""
Pre-carga una librería completa analizando todos los samples nuevos.
Args:
library_path: Ruta a la librería (default: reggaeton/)
extensions: Extensiones de audio a buscar
Returns:
Número de nuevos samples analizados y guardados
"""
if library_path is None:
library_path = DEFAULT_LIBRARY_PATH
library_path = Path(library_path)
if not library_path.exists():
logger.error("Librería no encontrada: %s", library_path)
return 0
# Buscar todos los samples
samples = []
for ext in extensions:
samples.extend(library_path.rglob(f"*{ext}"))
logger.info("Pre-cargando librería: %d samples encontrados", len(samples))
analyzed_count = 0
for sample_path in samples:
abs_path = str(sample_path.resolve())
# Saltar si ya existe en DB
if self.store.exists(abs_path):
continue
# Analizar y guardar
features = self.librosa_extractor.extract_all_features(abs_path)
if features.source != "error":
self.store.save(features)
analyzed_count += 1
logger.info("Pre-carga completa: %d nuevos samples analizados", analyzed_count)
return analyzed_count
def get_stats(self) -> Dict[str, Any]:
"""
Retorna estadísticas del extractor híbrido.
Returns:
Diccionario con estadísticas de la base de datos
"""
return {
"total_cached": self.store.count(),
"db_path": str(self.store.db_path),
"librosa_available": self.librosa_extractor._check_librosa()
}
# Funciones de conveniencia para uso directo
_default_hybrid: Optional[HybridExtractor] = None
def get_hybrid_extractor(db_path: Optional[str] = None) -> HybridExtractor:
"""
Obtiene una instancia global del HybridExtractor.
Args:
db_path: Ruta opcional a la base de datos
Returns:
Instancia de HybridExtractor
"""
global _default_hybrid
if _default_hybrid is None:
_default_hybrid = HybridExtractor(db_path)
return _default_hybrid
def quick_analyze(audio_path: str) -> Optional[SampleFeatures]:
"""
Analiza un sample rápidamente usando el extractor híbrido global.
Args:
audio_path: Ruta al archivo de audio
Returns:
SampleFeatures o None si falla
"""
extractor = get_hybrid_extractor()
features = extractor.get_or_analyze(audio_path)
if features.source == "error":
return None
return features
def create_extractor(store=None, verbose=False):
"""
Create a hybrid extractor with optional metadata store.
This is a convenience function used by sample_selector and other
engines that need a configured extractor instance.
Args:
store: Optional SampleMetadataStore instance
verbose: Whether to enable verbose logging
Returns:
HybridExtractor instance
"""
if verbose:
logging.getLogger("AbstractAnalyzer").setLevel(logging.DEBUG)
if store is not None:
# Create with the provided store
db_extractor = DatabaseExtractor(store)
hybrid = HybridExtractor()
# Replace the default db_extractor with our configured one
hybrid.db_extractor = db_extractor
hybrid.store = store
return hybrid
else:
# Create default hybrid extractor
return HybridExtractor()
if __name__ == "__main__":
# Test del módulo
logging.basicConfig(level=logging.INFO)
print("=" * 70)
print("Abstract Analyzer - Test")
print("=" * 70)
# Test 1: LibrosaExtractor
print("\n1. Probando LibrosaExtractor...")
librosa_ext = LibrosaExtractor()
print(f" Librosa disponible: {librosa_ext._check_librosa()}")
# Test 2: DatabaseExtractor
print("\n2. Probando DatabaseExtractor...")
db_ext = DatabaseExtractor()
print(f" DB path: {db_ext.store.db_path}")
print(f" Samples en DB: {db_ext.store.count()}")
# Test 3: HybridExtractor
print("\n3. Probando HybridExtractor...")
hybrid = HybridExtractor()
stats = hybrid.get_stats()
print(f" Total cached: {stats['total_cached']}")
print(f" Librosa available: {stats['librosa_available']}")
# Test 4: Análisis real (si hay samples disponibles)
print("\n4. Buscando samples para analizar...")
if DEFAULT_LIBRARY_PATH.exists():
samples = list(DEFAULT_LIBRARY_PATH.rglob("*.wav"))[:5]
if samples:
test_sample = str(samples[0].resolve())
print(f" Sample de prueba: {os.path.basename(test_sample)}")
features = hybrid.get_or_analyze(test_sample)
print(f" Source: {features.source}")
print(f" BPM: {features.bpm}")
print(f" Key: {features.key}")
print(f" Duration: {features.duration:.2f}s")
print(f" MFCCs: {len(features.mfccs)} coeficientes")
print("\n" + "=" * 70)
print("Test completado!")
print("=" * 70)