""" Abstract Analyzer - Sistema abstracto de extracción de features de audio. Este módulo proporciona una arquitectura flexible para extraer características espectrales de samples de audio, con múltiples implementaciones: - LibrosaExtractor: Análisis completo usando librosa - DatabaseExtractor: Lookups rápidos desde SQLite - HybridExtractor: Combina ambos enfoques (cache + análisis) Uso: from engines.abstract_analyzer import HybridExtractor extractor = HybridExtractor() features = extractor.get_or_analyze("path/to/sample.wav") # O usar extractores individuales from engines.abstract_analyzer import LibrosaExtractor librosa_ext = LibrosaExtractor() bpm = librosa_ext.extract_bpm("path/to/sample.wav") """ import os import json import sqlite3 import hashlib import logging from abc import ABC, abstractmethod from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional, Any, Tuple, Union from datetime import datetime logger = logging.getLogger("AbstractAnalyzer") # Paths por defecto DEFAULT_LIBRARY_PATH = Path(r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton") DEFAULT_DB_PATH = DEFAULT_LIBRARY_PATH / ".sample_metadata.db" @dataclass class SampleFeatures: """ Dataclass que encapsula todas las features extraídas de un sample. Attributes: path: Ruta absoluta al archivo de audio bpm: Tempo detectado en beats por minuto key: Tonalidad musical detectada (ej: "Am", "C") duration: Duración en segundos rms: Root Mean Square (energía promedio) en dB spectral_centroid: Centroide espectral (brillo) en Hz spectral_rolloff: Frecuencia de rolloff espectral en Hz zero_crossing_rate: Tasa de cruce por cero (noisiness) mfccs: Lista de 13 coeficientes MFCC (timbre) sample_rate: Frecuencia de muestreo en Hz channels: Número de canales (1=mono, 2=stereo) analyzed_at: Timestamp del análisis source: Fuente de los datos ('librosa', 'database', 'cache') """ path: str bpm: Optional[float] = None key: Optional[str] = None duration: Optional[float] = None rms: Optional[float] = None spectral_centroid: Optional[float] = None spectral_rolloff: Optional[float] = None zero_crossing_rate: Optional[float] = None mfccs: Optional[List[float]] = field(default_factory=list) sample_rate: Optional[int] = None channels: Optional[int] = None analyzed_at: Optional[str] = None source: str = "unknown" def to_dict(self) -> Dict[str, Any]: """Convierte a diccionario para serialización.""" return { "path": self.path, "bpm": self.bpm, "key": self.key, "duration": self.duration, "rms": self.rms, "spectral_centroid": self.spectral_centroid, "spectral_rolloff": self.spectral_rolloff, "zero_crossing_rate": self.zero_crossing_rate, "mfccs": self.mfccs, "sample_rate": self.sample_rate, "channels": self.channels, "analyzed_at": self.analyzed_at or datetime.now().isoformat(), "source": self.source } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "SampleFeatures": """Crea instancia desde diccionario.""" return cls( path=data.get("path", ""), bpm=data.get("bpm"), key=data.get("key"), duration=data.get("duration"), rms=data.get("rms"), spectral_centroid=data.get("spectral_centroid"), spectral_rolloff=data.get("spectral_rolloff"), zero_crossing_rate=data.get("zero_crossing_rate"), mfccs=data.get("mfccs", []), sample_rate=data.get("sample_rate"), channels=data.get("channels"), analyzed_at=data.get("analyzed_at"), source=data.get("source", "unknown") ) def is_complete(self) -> bool: """Verifica si todas las features principales están presentes.""" return all([ self.bpm is not None, self.key is not None, self.duration is not None, self.rms is not None, self.spectral_centroid is not None, len(self.mfccs) == 13 ]) class FeatureExtractor(ABC): """ Abstract Base Class para extractores de features de audio. Define la interfaz común que todos los extractores deben implementar. Las subclases concretas deben implementar todos los métodos abstractos. Example: class MyExtractor(FeatureExtractor): def extract_bpm(self, audio_path: str) -> Optional[float]: # Implementación específica return 128.0 """ @abstractmethod def extract_bpm(self, audio_path: str) -> Optional[float]: """ Extrae el BPM (tempo) de un archivo de audio. Args: audio_path: Ruta al archivo de audio Returns: Tempo en BPM o None si no se puede detectar """ pass @abstractmethod def extract_key(self, audio_path: str) -> Optional[str]: """ Detecta la tonalidad musical del audio. Args: audio_path: Ruta al archivo de audio Returns: Tonalidad en formato string (ej: "Am", "C", "F#m") o None """ pass @abstractmethod def extract_duration(self, audio_path: str) -> Optional[float]: """ Obtiene la duración del audio en segundos. Args: audio_path: Ruta al archivo de audio Returns: Duración en segundos o None """ pass @abstractmethod def extract_rms(self, audio_path: str) -> Optional[float]: """ Calcula el RMS (Root Mean Square) - energía promedio del audio. Args: audio_path: Ruta al archivo de audio Returns: RMS en dB o None """ pass @abstractmethod def extract_spectral_centroid(self, audio_path: str) -> Optional[float]: """ Calcula el centroide espectral (brillo del sonido). Args: audio_path: Ruta al archivo de audio Returns: Centroide espectral en Hz o None """ pass @abstractmethod def extract_spectral_rolloff(self, audio_path: str) -> Optional[float]: """ Calcula la frecuencia de rolloff espectral. Args: audio_path: Ruta al archivo de audio Returns: Frecuencia de rolloff en Hz o None """ pass @abstractmethod def extract_zero_crossing_rate(self, audio_path: str) -> Optional[float]: """ Calcula la tasa de cruce por cero (noisiness). Args: audio_path: Ruta al archivo de audio Returns: ZCR como float o None """ pass @abstractmethod def extract_mfccs(self, audio_path: str) -> Optional[List[float]]: """ Extrae los coeficientes MFCC (Mel-Frequency Cepstral Coefficients). Args: audio_path: Ruta al archivo de audio Returns: Lista de 13 coeficientes MFCC o None """ pass @abstractmethod def extract_all_features(self, audio_path: str) -> SampleFeatures: """ Extrae todas las features disponibles en una sola operación. Args: audio_path: Ruta al archivo de audio Returns: Objeto SampleFeatures con todas las características """ pass def _check_file_exists(self, audio_path: str) -> bool: """Helper para verificar que el archivo existe.""" if not os.path.exists(audio_path): logger.error("Archivo no encontrado: %s", audio_path) return False return True def _get_file_hash(self, audio_path: str) -> str: """Genera un hash único para el archivo (para cache).""" stat = os.stat(audio_path) content = f"{audio_path}:{stat.st_size}:{stat.st_mtime}" return hashlib.md5(content.encode()).hexdigest() class LibrosaExtractor(FeatureExtractor): """ Implementación de FeatureExtractor usando librosa + numpy. Realiza análisis completo de audio extrayendo todas las características espectrales. Usa lazy loading para importar librosa solo cuando se necesita. Attributes: sample_rate: Sample rate objetivo (None = mantener original) hop_length: Hop length para análisis de features n_mfcc: Número de coeficientes MFCC a extraer (default 13) """ def __init__(self, sample_rate: Optional[int] = None, hop_length: int = 512, n_mfcc: int = 13): """ Inicializa el extractor de Librosa. Args: sample_rate: Sample rate objetivo (None = mantener original) hop_length: Hop length para análisis (default 512) n_mfcc: Número de coeficientes MFCC (default 13) """ self.sample_rate = sample_rate self.hop_length = hop_length self.n_mfcc = n_mfcc self._librosa_available = None def _check_librosa(self) -> bool: """Verifica si librosa está disponible (lazy loading).""" if self._librosa_available is None: try: import librosa import numpy as np self._librosa_available = True except ImportError: logger.warning("librosa no está disponible. Algunas features no se extraerán.") self._librosa_available = False return self._librosa_available def _load_audio(self, audio_path: str) -> Tuple[Optional[Any], Optional[int]]: """ Carga el audio usando librosa. Returns: Tuple de (audio_data, sample_rate) o (None, None) si falla """ if not self._check_librosa(): return None, None try: import librosa y, sr = librosa.load(audio_path, sr=self.sample_rate, mono=True) return y, sr except Exception as e: logger.error("Error cargando audio %s: %s", audio_path, e) return None, None def extract_bpm(self, audio_path: str) -> Optional[float]: """ Detecta el BPM usando librosa.beat.beat_track. Args: audio_path: Ruta al archivo de audio Returns: BPM detectado o None """ if not self._check_file_exists(audio_path): return None if not self._check_librosa(): return None try: import librosa import numpy as np y, sr = self._load_audio(audio_path) if y is None: return None tempo, _ = librosa.beat.beat_track(y=y, sr=sr) bpm = float(tempo) if isinstance(tempo, (int, float, np.number)) else float(tempo[0]) logger.debug("BPM extraído de %s: %.1f", audio_path, bpm) return bpm except Exception as e: logger.error("Error extrayendo BPM de %s: %s", audio_path, e) return None def extract_key(self, audio_path: str) -> Optional[str]: """ Detecta la tonalidad usando chromagrama. Args: audio_path: Ruta al archivo de audio Returns: Tonalidad detectada (ej: "Am", "C") o None """ if not self._check_file_exists(audio_path): return None if not self._check_librosa(): return None try: import librosa import numpy as np y, sr = self._load_audio(audio_path) if y is None: return None # Usar chroma_cqt para mejor detección de pitch chromagram = librosa.feature.chroma_cqt(y=y, sr=sr) chroma_avg = np.sum(chromagram, axis=1) # Notas musicales notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'] key_index = np.argmax(chroma_avg) key = notes[key_index] # Heurística simple para detectar mayor/menor # Compara intensidad del tercer grado menor vs mayor minor_third_idx = (key_index + 3) % 12 major_third_idx = (key_index + 4) % 12 if chroma_avg[minor_third_idx] > chroma_avg[major_third_idx]: key += 'm' # Menor logger.debug("Key extraída de %s: %s", audio_path, key) return key except Exception as e: logger.error("Error extrayendo key de %s: %s", audio_path, e) return None def extract_duration(self, audio_path: str) -> Optional[float]: """ Obtiene la duración del audio. Args: audio_path: Ruta al archivo de audio Returns: Duración en segundos o None """ if not self._check_file_exists(audio_path): return None if not self._check_librosa(): return None try: import librosa y, sr = self._load_audio(audio_path) if y is None: return None duration = librosa.get_duration(y=y, sr=sr) return float(duration) except Exception as e: logger.error("Error extrayendo duración de %s: %s", audio_path, e) return None def extract_rms(self, audio_path: str) -> Optional[float]: """ Calcula el RMS (energía promedio) del audio. Args: audio_path: Ruta al archivo de audio Returns: RMS en dB o None """ if not self._check_file_exists(audio_path): return None if not self._check_librosa(): return None try: import librosa import numpy as np y, sr = self._load_audio(audio_path) if y is None: return None rms = np.mean(librosa.feature.rms(y=y)) rms_db = 20 * np.log10(rms + 1e-10) # Convertir a dB return float(rms_db) except Exception as e: logger.error("Error extrayendo RMS de %s: %s", audio_path, e) return None def extract_spectral_centroid(self, audio_path: str) -> Optional[float]: """ Calcula el centroide espectral (brillo promedio). Args: audio_path: Ruta al archivo de audio Returns: Centroide espectral en Hz o None """ if not self._check_file_exists(audio_path): return None if not self._check_librosa(): return None try: import librosa import numpy as np y, sr = self._load_audio(audio_path) if y is None: return None centroid = librosa.feature.spectral_centroid(y=y, sr=sr) mean_centroid = float(np.mean(centroid)) return mean_centroid except Exception as e: logger.error("Error extrayendo spectral centroid de %s: %s", audio_path, e) return None def extract_spectral_rolloff(self, audio_path: str) -> Optional[float]: """ Calcula la frecuencia de rolloff espectral. Args: audio_path: Ruta al archivo de audio Returns: Frecuencia de rolloff en Hz o None """ if not self._check_file_exists(audio_path): return None if not self._check_librosa(): return None try: import librosa import numpy as np y, sr = self._load_audio(audio_path) if y is None: return None rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr) mean_rolloff = float(np.mean(rolloff)) return mean_rolloff except Exception as e: logger.error("Error extrayendo spectral rolloff de %s: %s", audio_path, e) return None def extract_zero_crossing_rate(self, audio_path: str) -> Optional[float]: """ Calcula la tasa de cruce por cero. Args: audio_path: Ruta al archivo de audio Returns: ZCR como float o None """ if not self._check_file_exists(audio_path): return None if not self._check_librosa(): return None try: import librosa import numpy as np y, sr = self._load_audio(audio_path) if y is None: return None zcr = librosa.feature.zero_crossing_rate(y) mean_zcr = float(np.mean(zcr)) return mean_zcr except Exception as e: logger.error("Error extrayendo ZCR de %s: %s", audio_path, e) return None def extract_mfccs(self, audio_path: str) -> Optional[List[float]]: """ Extrae los coeficientes MFCC. Args: audio_path: Ruta al archivo de audio Returns: Lista de 13 coeficientes MFCC o None """ if not self._check_file_exists(audio_path): return None if not self._check_librosa(): return None try: import librosa import numpy as np y, sr = self._load_audio(audio_path) if y is None: return None mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=self.n_mfcc) mfccs_mean = [float(np.mean(coef)) for coef in mfccs] return mfccs_mean except Exception as e: logger.error("Error extrayendo MFCCs de %s: %s", audio_path, e) return None def extract_all_features(self, audio_path: str) -> SampleFeatures: """ Extrae todas las features en una sola operación eficiente. Args: audio_path: Ruta al archivo de audio Returns: Objeto SampleFeatures completo """ if not self._check_file_exists(audio_path): return SampleFeatures(path=audio_path, source="error") if not self._check_librosa(): logger.error("librosa no disponible, no se pueden extraer features") return SampleFeatures(path=audio_path, source="error") try: import librosa import numpy as np # Cargar audio una sola vez y, sr = self._load_audio(audio_path) if y is None: return SampleFeatures(path=audio_path, source="error") # Extraer todas las features de una vez # 1. Duración duration = librosa.get_duration(y=y, sr=sr) # 2. BPM try: tempo, _ = librosa.beat.beat_track(y=y, sr=sr) bpm = float(tempo) if isinstance(tempo, (int, float, np.number)) else float(tempo[0]) except: bpm = None # 3. Key try: chromagram = librosa.feature.chroma_cqt(y=y, sr=sr) chroma_avg = np.sum(chromagram, axis=1) notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'] key_index = np.argmax(chroma_avg) key = notes[key_index] minor_third_idx = (key_index + 3) % 12 major_third_idx = (key_index + 4) % 12 if chroma_avg[minor_third_idx] > chroma_avg[major_third_idx]: key += 'm' except: key = None # 4. RMS rms = float(np.mean(librosa.feature.rms(y=y))) rms_db = 20 * np.log10(rms + 1e-10) # 5. Spectral Centroid centroid = librosa.feature.spectral_centroid(y=y, sr=sr) spectral_centroid = float(np.mean(centroid)) # 6. Spectral Rolloff rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr) spectral_rolloff = float(np.mean(rolloff)) # 7. Zero Crossing Rate zcr = librosa.feature.zero_crossing_rate(y) zero_crossing_rate = float(np.mean(zcr)) # 8. MFCCs mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=self.n_mfcc) mfccs_mean = [float(np.mean(coef)) for coef in mfccs] # 9. Detectar canales originales try: y_orig, _ = librosa.load(audio_path, sr=None, mono=False) channels = y_orig.shape[0] if len(y_orig.shape) > 1 else 1 except: channels = 1 return SampleFeatures( path=audio_path, bpm=bpm, key=key, duration=float(duration), rms=float(rms_db), spectral_centroid=spectral_centroid, spectral_rolloff=spectral_rolloff, zero_crossing_rate=zero_crossing_rate, mfccs=mfccs_mean, sample_rate=sr, channels=channels, analyzed_at=datetime.now().isoformat(), source="librosa" ) except Exception as e: logger.error("Error extrayendo todas las features de %s: %s", audio_path, e) return SampleFeatures(path=audio_path, source="error") class SampleMetadataStore: """ Almacén de metadatos de samples usando SQLite. Proporciona lookups rápidos de features pre-calculadas sin necesidad de re-analizar los archivos de audio. Attributes: db_path: Ruta al archivo SQLite de la base de datos """ def __init__(self, db_path: Optional[Union[str, Path]] = None): """ Inicializa el store de metadatos. Args: db_path: Ruta a la base de datos SQLite (default: .sample_metadata.db en librería) """ if db_path is None: self.db_path = DEFAULT_DB_PATH else: self.db_path = Path(db_path) self._init_db() def _init_db(self) -> None: """Inicializa el schema de la base de datos si no existe.""" try: conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() # Tabla principal de samples cursor.execute(''' CREATE TABLE IF NOT EXISTS samples ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT UNIQUE NOT NULL, file_hash TEXT, bpm REAL, key TEXT, duration REAL, rms REAL, spectral_centroid REAL, spectral_rolloff REAL, zero_crossing_rate REAL, mfccs TEXT, -- JSON array sample_rate INTEGER, channels INTEGER, analyzed_at TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') # Índices para búsquedas rápidas cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_samples_path ON samples(path) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_samples_key ON samples(key) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_samples_bpm ON samples(bpm) ''') conn.commit() conn.close() logger.debug("Base de datos inicializada: %s", self.db_path) except sqlite3.Error as e: logger.error("Error inicializando base de datos: %s", e) def get(self, sample_path: str) -> Optional[SampleFeatures]: """ Recupera las features de un sample desde la base de datos. Args: sample_path: Ruta al archivo de audio Returns: SampleFeatures si existe en la DB, None en caso contrario """ try: conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() cursor.execute(''' SELECT path, bpm, key, duration, rms, spectral_centroid, spectral_rolloff, zero_crossing_rate, mfccs, sample_rate, channels, analyzed_at FROM samples WHERE path = ? ''', (sample_path,)) row = cursor.fetchone() conn.close() if row: mfccs = json.loads(row[8]) if row[8] else [] return SampleFeatures( path=row[0], bpm=row[1], key=row[2], duration=row[3], rms=row[4], spectral_centroid=row[5], spectral_rolloff=row[6], zero_crossing_rate=row[7], mfccs=mfccs, sample_rate=row[9], channels=row[10], analyzed_at=row[11], source="database" ) return None except sqlite3.Error as e: logger.error("Error leyendo de base de datos: %s", e) return None def save(self, features: SampleFeatures) -> bool: """ Guarda o actualiza las features de un sample en la base de datos. Args: features: Objeto SampleFeatures a guardar Returns: True si se guardó correctamente, False en caso contrario """ try: conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() # Generar hash del archivo file_hash = "" if os.path.exists(features.path): stat = os.stat(features.path) file_hash = hashlib.md5(f"{features.path}:{stat.st_size}:{stat.st_mtime}".encode()).hexdigest() mfccs_json = json.dumps(features.mfccs) if features.mfccs else "[]" cursor.execute(''' INSERT OR REPLACE INTO samples (path, file_hash, bpm, key, duration, rms, spectral_centroid, spectral_rolloff, zero_crossing_rate, mfccs, sample_rate, channels, analyzed_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', ( features.path, file_hash, features.bpm, features.key, features.duration, features.rms, features.spectral_centroid, features.spectral_rolloff, features.zero_crossing_rate, mfccs_json, features.sample_rate, features.channels, features.analyzed_at or datetime.now().isoformat() )) conn.commit() conn.close() logger.debug("Features guardadas en DB para: %s", features.path) return True except sqlite3.Error as e: logger.error("Error guardando en base de datos: %s", e) return False def exists(self, sample_path: str) -> bool: """ Verifica si un sample existe en la base de datos. Args: sample_path: Ruta al archivo de audio Returns: True si existe en la DB, False en caso contrario """ try: conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() cursor.execute('SELECT 1 FROM samples WHERE path = ?', (sample_path,)) result = cursor.fetchone() is not None conn.close() return result except sqlite3.Error as e: logger.error("Error consultando base de datos: %s", e) return False def delete(self, sample_path: str) -> bool: """ Elimina un sample de la base de datos. Args: sample_path: Ruta al archivo de audio Returns: True si se eliminó, False en caso contrario """ try: conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() cursor.execute('DELETE FROM samples WHERE path = ?', (sample_path,)) conn.commit() conn.close() return True except sqlite3.Error as e: logger.error("Error eliminando de base de datos: %s", e) return False def get_all(self, limit: Optional[int] = None) -> List[SampleFeatures]: """ Recupera todas las features almacenadas. Args: limit: Límite de resultados (opcional) Returns: Lista de SampleFeatures """ try: conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() query = ''' SELECT path, bpm, key, duration, rms, spectral_centroid, spectral_rolloff, zero_crossing_rate, mfccs, sample_rate, channels, analyzed_at FROM samples ORDER BY updated_at DESC ''' if limit: query += f' LIMIT {limit}' cursor.execute(query) rows = cursor.fetchall() conn.close() results = [] for row in rows: mfccs = json.loads(row[8]) if row[8] else [] results.append(SampleFeatures( path=row[0], bpm=row[1], key=row[2], duration=row[3], rms=row[4], spectral_centroid=row[5], spectral_rolloff=row[6], zero_crossing_rate=row[7], mfccs=mfccs, sample_rate=row[9], channels=row[10], analyzed_at=row[11], source="database" )) return results except sqlite3.Error as e: logger.error("Error leyendo de base de datos: %s", e) return [] def count(self) -> int: """ Retorna el número total de samples almacenados. Returns: Número de samples en la base de datos """ try: conn = sqlite3.connect(str(self.db_path)) cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM samples') count = cursor.fetchone()[0] conn.close() return count except sqlite3.Error as e: logger.error("Error contando registros: %s", e) return 0 class DatabaseExtractor(FeatureExtractor): """ Implementación de FeatureExtractor que usa SampleMetadataStore. Proporciona lookups rápidos desde SQLite sin necesidad de numpy/librosa. Este extractor no realiza análisis de audio, solo recupera datos cacheados. Attributes: store: Instancia de SampleMetadataStore para acceso a datos """ def __init__(self, db_path: Optional[Union[str, Path]] = None): """ Inicializa el extractor de base de datos. Args: db_path: Ruta a la base de datos SQLite (opcional) """ self.store = SampleMetadataStore(db_path) def _get_features(self, audio_path: str) -> Optional[SampleFeatures]: """Helper para obtener features desde la DB.""" return self.store.get(audio_path) def extract_bpm(self, audio_path: str) -> Optional[float]: """Recupera BPM desde la base de datos.""" if not self._check_file_exists(audio_path): return None features = self._get_features(audio_path) return features.bpm if features else None def extract_key(self, audio_path: str) -> Optional[str]: """Recupera key desde la base de datos.""" if not self._check_file_exists(audio_path): return None features = self._get_features(audio_path) return features.key if features else None def extract_duration(self, audio_path: str) -> Optional[float]: """Recupera duración desde la base de datos.""" if not self._check_file_exists(audio_path): return None features = self._get_features(audio_path) return features.duration if features else None def extract_rms(self, audio_path: str) -> Optional[float]: """Recupera RMS desde la base de datos.""" if not self._check_file_exists(audio_path): return None features = self._get_features(audio_path) return features.rms if features else None def extract_spectral_centroid(self, audio_path: str) -> Optional[float]: """Recupera spectral centroid desde la base de datos.""" if not self._check_file_exists(audio_path): return None features = self._get_features(audio_path) return features.spectral_centroid if features else None def extract_spectral_rolloff(self, audio_path: str) -> Optional[float]: """Recupera spectral rolloff desde la base de datos.""" if not self._check_file_exists(audio_path): return None features = self._get_features(audio_path) return features.spectral_rolloff if features else None def extract_zero_crossing_rate(self, audio_path: str) -> Optional[float]: """Recupera ZCR desde la base de datos.""" if not self._check_file_exists(audio_path): return None features = self._get_features(audio_path) return features.zero_crossing_rate if features else None def extract_mfccs(self, audio_path: str) -> Optional[List[float]]: """Recupera MFCCs desde la base de datos.""" if not self._check_file_exists(audio_path): return None features = self._get_features(audio_path) return features.mfccs if features else None def extract_all_features(self, audio_path: str) -> SampleFeatures: """ Recupera todas las features desde la base de datos. Si no existen en la DB, retorna un SampleFeatures vacío con source="not_found". """ if not self._check_file_exists(audio_path): return SampleFeatures(path=audio_path, source="error") features = self._get_features(audio_path) if features: return features return SampleFeatures(path=audio_path, source="not_found") def is_cached(self, audio_path: str) -> bool: """ Verifica si un sample tiene features cacheadas. Args: audio_path: Ruta al archivo de audio Returns: True si existe en la base de datos """ return self.store.exists(audio_path) class HybridExtractor(FeatureExtractor): """ Extractor híbrido que combina DatabaseExtractor + LibrosaExtractor. Estrategia: 1. Primero intenta recuperar de la base de datos (rápido) 2. Si no existe, usa LibrosaExtractor para analizar 3. Guarda automáticamente los resultados en la base de datos Esta clase es el punto de entrada recomendado para la mayoría de casos de uso. Attributes: db_extractor: Instancia de DatabaseExtractor para lookups rápidos librosa_extractor: Instancia de LibrosaExtractor para análisis """ def __init__(self, db_path: Optional[Union[str, Path]] = None, sample_rate: Optional[int] = None, n_mfcc: int = 13): """ Inicializa el extractor híbrido. Args: db_path: Ruta a la base de datos SQLite (opcional) sample_rate: Sample rate para LibrosaExtractor (opcional) n_mfcc: Número de coeficientes MFCC (default 13) """ self.db_extractor = DatabaseExtractor(db_path) self.librosa_extractor = LibrosaExtractor(sample_rate=sample_rate, n_mfcc=n_mfcc) self.store = self.db_extractor.store # Referencia directa para conveniencia def extract_bpm(self, audio_path: str) -> Optional[float]: """ Extrae BPM (desde DB si existe, sino con librosa y guarda). Args: audio_path: Ruta al archivo de audio Returns: BPM detectado o None """ # Intentar desde DB primero bpm = self.db_extractor.extract_bpm(audio_path) if bpm is not None: return bpm # Analizar con librosa bpm = self.librosa_extractor.extract_bpm(audio_path) if bpm is not None: # Guardar análisis completo para evitar re-análisis futuro features = self.librosa_extractor.extract_all_features(audio_path) self.store.save(features) return bpm def extract_key(self, audio_path: str) -> Optional[str]: """Extrae key (con estrategia híbrida).""" key = self.db_extractor.extract_key(audio_path) if key is not None: return key key = self.librosa_extractor.extract_key(audio_path) if key is not None: features = self.librosa_extractor.extract_all_features(audio_path) self.store.save(features) return key def extract_duration(self, audio_path: str) -> Optional[float]: """Extrae duración (con estrategia híbrida).""" duration = self.db_extractor.extract_duration(audio_path) if duration is not None: return duration duration = self.librosa_extractor.extract_duration(audio_path) if duration is not None: features = self.librosa_extractor.extract_all_features(audio_path) self.store.save(features) return duration def extract_rms(self, audio_path: str) -> Optional[float]: """Extrae RMS (con estrategia híbrida).""" rms = self.db_extractor.extract_rms(audio_path) if rms is not None: return rms rms = self.librosa_extractor.extract_rms(audio_path) if rms is not None: features = self.librosa_extractor.extract_all_features(audio_path) self.store.save(features) return rms def extract_spectral_centroid(self, audio_path: str) -> Optional[float]: """Extrae spectral centroid (con estrategia híbrida).""" centroid = self.db_extractor.extract_spectral_centroid(audio_path) if centroid is not None: return centroid centroid = self.librosa_extractor.extract_spectral_centroid(audio_path) if centroid is not None: features = self.librosa_extractor.extract_all_features(audio_path) self.store.save(features) return centroid def extract_spectral_rolloff(self, audio_path: str) -> Optional[float]: """Extrae spectral rolloff (con estrategia híbrida).""" rolloff = self.db_extractor.extract_spectral_rolloff(audio_path) if rolloff is not None: return rolloff rolloff = self.librosa_extractor.extract_spectral_rolloff(audio_path) if rolloff is not None: features = self.librosa_extractor.extract_all_features(audio_path) self.store.save(features) return rolloff def extract_zero_crossing_rate(self, audio_path: str) -> Optional[float]: """Extrae ZCR (con estrategia híbrida).""" zcr = self.db_extractor.extract_zero_crossing_rate(audio_path) if zcr is not None: return zcr zcr = self.librosa_extractor.extract_zero_crossing_rate(audio_path) if zcr is not None: features = self.librosa_extractor.extract_all_features(audio_path) self.store.save(features) return zcr def extract_mfccs(self, audio_path: str) -> Optional[List[float]]: """Extrae MFCCs (con estrategia híbrida).""" mfccs = self.db_extractor.extract_mfccs(audio_path) if mfccs is not None and len(mfccs) > 0: return mfccs mfccs = self.librosa_extractor.extract_mfccs(audio_path) if mfccs is not None: features = self.librosa_extractor.extract_all_features(audio_path) self.store.save(features) return mfccs def extract_all_features(self, audio_path: str) -> SampleFeatures: """ Extrae todas las features usando la estrategia híbrida. Args: audio_path: Ruta al archivo de audio Returns: Objeto SampleFeatures completo """ # Intentar desde DB primero features = self.db_extractor.extract_all_features(audio_path) if features.source != "not_found" and features.source != "error": logger.debug("Features recuperadas de DB para: %s", audio_path) return features # Analizar con librosa features = self.librosa_extractor.extract_all_features(audio_path) if features.source != "error": # Guardar en DB para futuras consultas self.store.save(features) logger.debug("Features analizadas y guardadas para: %s", audio_path) return features def get_or_analyze(self, sample_path: str) -> SampleFeatures: """ Método de conveniencia: obtiene features o las analiza si no existen. Este es el método recomendado para uso general. Es equivalente a `extract_all_features()` pero con un nombre más explícito. Args: sample_path: Ruta al archivo de audio Returns: Objeto SampleFeatures completo Example: extractor = HybridExtractor() features = extractor.get_or_analyze("path/to/kick.wav") print(f"BPM: {features.bpm}, Key: {features.key}") """ return self.extract_all_features(sample_path) def preload_library(self, library_path: Optional[Union[str, Path]] = None, extensions: Tuple[str, ...] = ('.wav', '.mp3', '.aif', '.aiff')) -> int: """ Pre-carga una librería completa analizando todos los samples nuevos. Args: library_path: Ruta a la librería (default: reggaeton/) extensions: Extensiones de audio a buscar Returns: Número de nuevos samples analizados y guardados """ if library_path is None: library_path = DEFAULT_LIBRARY_PATH library_path = Path(library_path) if not library_path.exists(): logger.error("Librería no encontrada: %s", library_path) return 0 # Buscar todos los samples samples = [] for ext in extensions: samples.extend(library_path.rglob(f"*{ext}")) logger.info("Pre-cargando librería: %d samples encontrados", len(samples)) analyzed_count = 0 for sample_path in samples: abs_path = str(sample_path.resolve()) # Saltar si ya existe en DB if self.store.exists(abs_path): continue # Analizar y guardar features = self.librosa_extractor.extract_all_features(abs_path) if features.source != "error": self.store.save(features) analyzed_count += 1 logger.info("Pre-carga completa: %d nuevos samples analizados", analyzed_count) return analyzed_count def get_stats(self) -> Dict[str, Any]: """ Retorna estadísticas del extractor híbrido. Returns: Diccionario con estadísticas de la base de datos """ return { "total_cached": self.store.count(), "db_path": str(self.store.db_path), "librosa_available": self.librosa_extractor._check_librosa() } # Funciones de conveniencia para uso directo _default_hybrid: Optional[HybridExtractor] = None def get_hybrid_extractor(db_path: Optional[str] = None) -> HybridExtractor: """ Obtiene una instancia global del HybridExtractor. Args: db_path: Ruta opcional a la base de datos Returns: Instancia de HybridExtractor """ global _default_hybrid if _default_hybrid is None: _default_hybrid = HybridExtractor(db_path) return _default_hybrid def quick_analyze(audio_path: str) -> Optional[SampleFeatures]: """ Analiza un sample rápidamente usando el extractor híbrido global. Args: audio_path: Ruta al archivo de audio Returns: SampleFeatures o None si falla """ extractor = get_hybrid_extractor() features = extractor.get_or_analyze(audio_path) if features.source == "error": return None return features def create_extractor(store=None, verbose=False): """ Create a hybrid extractor with optional metadata store. This is a convenience function used by sample_selector and other engines that need a configured extractor instance. Args: store: Optional SampleMetadataStore instance verbose: Whether to enable verbose logging Returns: HybridExtractor instance """ if verbose: logging.getLogger("AbstractAnalyzer").setLevel(logging.DEBUG) if store is not None: # Create with the provided store db_extractor = DatabaseExtractor(store) hybrid = HybridExtractor() # Replace the default db_extractor with our configured one hybrid.db_extractor = db_extractor hybrid.store = store return hybrid else: # Create default hybrid extractor return HybridExtractor() if __name__ == "__main__": # Test del módulo logging.basicConfig(level=logging.INFO) print("=" * 70) print("Abstract Analyzer - Test") print("=" * 70) # Test 1: LibrosaExtractor print("\n1. Probando LibrosaExtractor...") librosa_ext = LibrosaExtractor() print(f" Librosa disponible: {librosa_ext._check_librosa()}") # Test 2: DatabaseExtractor print("\n2. Probando DatabaseExtractor...") db_ext = DatabaseExtractor() print(f" DB path: {db_ext.store.db_path}") print(f" Samples en DB: {db_ext.store.count()}") # Test 3: HybridExtractor print("\n3. Probando HybridExtractor...") hybrid = HybridExtractor() stats = hybrid.get_stats() print(f" Total cached: {stats['total_cached']}") print(f" Librosa available: {stats['librosa_available']}") # Test 4: Análisis real (si hay samples disponibles) print("\n4. Buscando samples para analizar...") if DEFAULT_LIBRARY_PATH.exists(): samples = list(DEFAULT_LIBRARY_PATH.rglob("*.wav"))[:5] if samples: test_sample = str(samples[0].resolve()) print(f" Sample de prueba: {os.path.basename(test_sample)}") features = hybrid.get_or_analyze(test_sample) print(f" Source: {features.source}") print(f" BPM: {features.bpm}") print(f" Key: {features.key}") print(f" Duration: {features.duration:.2f}s") print(f" MFCCs: {len(features.mfccs)} coeficientes") print("\n" + "=" * 70) print("Test completado!") print("=" * 70)