"""Semantic index over an audio sample library.

Builds a text description for every audio file under a library directory
(file name, folder names and, when an analyzer is available, spectral tags),
embeds those descriptions with a sentence-transformers model and stores the
result next to the library in ``.sample_embeddings.json``.
"""
import os
import json
import logging
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any

try:
    from sentence_transformers import SentenceTransformer
    from sklearn.metrics.pairwise import cosine_similarity
    import numpy as np
    HAS_ML = True
except ImportError:
    HAS_ML = False

# Import audio_analyzer for spectral analysis (T016)
try:
    from audio_analyzer import AudioAnalyzer, get_analyzer
    HAS_ANALYZER = True
except ImportError:
    HAS_ANALYZER = False

logger = logging.getLogger("VectorManager")
logging.basicConfig(level=logging.INFO)


class VectorManager:
    """Index of sample descriptions with semantic and substring search.

    Attributes set by ``__init__``:
        library_dir: root directory that is scanned recursively.
        index_file: JSON file (inside ``library_dir``) holding the saved index.
        model: the loaded SentenceTransformer, or None when unavailable.
        analyzer: the audio analyzer, or None when unavailable or skipped.
        metadata: one dict per sample (path, name, description,
            spectral_features).
        embeddings: one vector per entry of ``metadata`` (empty if none).
    """

    # Suffixes (compared case-insensitively) that count as audio samples.
    _AUDIO_EXTENSIONS = frozenset({'.wav', '.aif', '.aiff', '.mp3'})
    # Centroid assumed when no spectral analysis result is available.
    _DEFAULT_SPECTRAL_CENTROID = 5000.0
    # Substrings of a sample type that mark it as tonal (T020).
    _TONAL_TYPES = ('bass', 'synth', 'pad', 'lead', 'pluck', 'arp', 'chord',
                    'stab', 'vocal')

    def __init__(self, library_dir: str, skip_audio_analysis: bool = False):
        """Set up analyzer and embedding model, then load or build the index.

        Args:
            library_dir: root directory of the sample library.
            skip_audio_analysis: when True the audio analyzer is never
                initialised and default spectral features are used.
        """
        self.library_dir = Path(library_dir)
        self.index_file = self.library_dir / ".sample_embeddings.json"
        self.skip_audio_analysis = skip_audio_analysis
        self.model = None
        self.embeddings = []
        self.metadata = []

        # Initialise the audio analyzer if it is available (T016)
        self.analyzer = None
        if HAS_ANALYZER and not skip_audio_analysis:
            try:
                self.analyzer = get_analyzer()
                logger.info("✓ AudioAnalyzer inicializado para análisis espectral")
            except Exception as e:
                # Best effort: indexing still works without spectral analysis.
                logger.warning(f"No se pudo inicializar AudioAnalyzer: {e}")

        if HAS_ML:
            try:
                # Load a very lightweight model for fast embeddings
                logger.info("Loading sentence-transformers model (all-MiniLM-L6-v2)...")
                self.model = SentenceTransformer('all-MiniLM-L6-v2')
            except Exception as e:
                # Best effort: searches fall back to substring matching.
                logger.error(f"Failed to load embedding model: {e}")

        self._load_or_build_index()

    def _load_or_build_index(self) -> None:
        """Load the saved index, rebuilding it when missing or unusable.

        A saved index whose embeddings and metadata differ in length is
        treated as unusable, because search maps embedding rows to metadata
        entries by position.
        """
        if not self.index_file.exists():
            self._build_index()
            return

        logger.info("Loading existing vector index...")
        try:
            with open(self.index_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            metadata = data.get('metadata', [])
            embeddings = []
            if HAS_ML and 'embeddings' in data:
                embeddings = np.array(data['embeddings'])
                if len(embeddings) != len(metadata):
                    raise ValueError(
                        f"index has {len(embeddings)} embeddings "
                        f"for {len(metadata)} metadata entries"
                    )
            else:
                logger.warning("No embeddings found in loaded index.")
        except Exception as e:
            # Any unreadable/corrupt index is recoverable by rescanning.
            logger.error(f"Failed to load index: {e}")
            self._build_index()
            return

        self.metadata = metadata
        self.embeddings = embeddings

    def _find_audio_files(self) -> List[Path]:
        """Return all audio files below ``library_dir`` in sorted order."""
        # One traversal with a case-insensitive suffix test also catches
        # mixed-case names such as "Kick.Wav"; sorting keeps the index order
        # reproducible between runs.
        return sorted(
            p for p in self.library_dir.rglob('*')
            if p.suffix.lower() in self._AUDIO_EXTENSIONS and p.is_file()
        )

    def _build_index(self) -> None:
        """Scan the library, describe every sample and embed the descriptions.

        ``metadata`` is always rebuilt when audio files are found. Embeddings
        are generated and the index file is written only when the embedding
        model is available.
        """
        logger.info("Scanning library %s for new embeddings...", self.library_dir)
        logger.info("Audio analysis: %s",
                    'enabled' if self.analyzer else 'disabled (T016)')

        files_to_process = self._find_audio_files()
        if not files_to_process:
            logger.warning(f"No audio files found in {self.library_dir} to embed.")
            return

        texts_to_embed = []
        self.metadata = []
        total_files = len(files_to_process)

        for count, f in enumerate(files_to_process, start=1):
            # Clean up the name for better semantic understanding
            name = f.stem
            clean_name = name.replace('_', ' ').replace('-', ' ').lower()

            # Use relative path as part of the context since folders
            # represent duration and type
            try:
                parts = f.relative_to(self.library_dir).parts[:-1]
                path_context = " ".join(parts).lower()
            except ValueError:
                path_context = ""

            # T016: spectral analysis while indexing
            spectral_features = self._analyze_sample_spectral(f)

            # T018: enrich the embedded text with spectral information
            brightness_tag = self._get_brightness_tag(
                spectral_features.get('spectral_centroid',
                                      self._DEFAULT_SPECTRAL_CENTROID))
            harmonic_tag = ("harmonic=yes" if spectral_features.get('is_harmonic')
                            else "harmonic=no")
            # The feature dict stores None when no key was detected, so a
            # plain .get() default would never apply.
            key_tag = f"key={spectral_features.get('key') or 'unknown'}"

            description = (f"{clean_name} {path_context} {brightness_tag} "
                           f"{harmonic_tag} {key_tag}")
            texts_to_embed.append(description)

            # T020: add the is_tonal field
            sample_type = spectral_features.get('sample_type', 'unknown')
            spectral_features['is_tonal'] = self._is_tonal_sample(sample_type)

            self.metadata.append({
                'path': str(f),
                'name': name,
                'description': description,
                'spectral_features': spectral_features,  # T016
            })

            # Progress log every 50 files
            if count % 50 == 0:
                logger.info(f"Procesados {count}/{total_files} samples...")

        if not HAS_ML or self.model is None:
            logger.error("ML libraries not installed. Run 'pip install sentence-transformers scikit-learn numpy'")
            return

        logger.info("Generating vectors for %d samples. This might take a moment...",
                    len(texts_to_embed))
        embeddings = self.model.encode(texts_to_embed)
        self.embeddings = embeddings

        self._save_index(embeddings)
        logger.info(f"✓ Saved {len(self.metadata)} embeddings with spectral analysis to {self.index_file}")

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Convert array-like/scalar objects json cannot serialize natively.

        Only called by ``json.dump`` for values it rejects.
        NOTE(review): intended for NumPy scalars/arrays the analyzer may
        return — confirm against AudioAnalyzer's actual feature types.
        """
        for attr in ('tolist', 'item'):
            convert = getattr(value, attr, None)
            if callable(convert):
                return convert()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable")

    def _save_index(self, embeddings: Any) -> None:
        """Write metadata and embeddings to ``index_file``.

        The data is written to a temporary sibling file that then replaces
        the index, so a failure while serializing cannot leave a truncated
        index behind.
        """
        tmp_path = self.index_file.with_name(self.index_file.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump({
                    'metadata': self.metadata,
                    'embeddings': embeddings.tolist(),
                }, f, default=self._json_default)
            os.replace(tmp_path, self.index_file)
        finally:
            # Only still present when writing or replacing failed.
            if tmp_path.exists():
                tmp_path.unlink()

    @staticmethod
    def _default_spectral_features() -> Dict[str, Any]:
        """Return a fresh feature dict used when analysis is unavailable."""
        return {
            'key': None,
            'key_confidence': 0.0,
            'spectral_centroid': 5000.0,
            'rms_energy': 0.5,
            'is_harmonic': False,
            'is_percussive': True,
            'sample_type': 'unknown',
        }

    def _analyze_sample_spectral(self, file_path: Path) -> Dict[str, Any]:
        """T016: spectral analysis of one sample via the audio analyzer.

        Returns a dict with key, spectral_centroid, is_harmonic, etc. When no
        analyzer is configured, or the analysis raises, a dict of default
        values is returned instead (the failure is logged as a warning).
        """
        if not self.analyzer:
            return self._default_spectral_features()

        try:
            features = self.analyzer.analyze(str(file_path))
            return {
                'key': features.key,
                'key_confidence': features.key_confidence,
                'spectral_centroid': features.spectral_centroid,
                'spectral_rolloff': features.spectral_rolloff,
                'rms_energy': features.rms_energy,
                'is_harmonic': features.is_harmonic,
                'is_percussive': features.is_percussive,
                'sample_type': features.sample_type.value,
                'duration': features.duration,
                'bpm': features.bpm,
            }
        except Exception as e:
            # One unreadable sample must not abort indexing of the library.
            logger.warning(f"Error analizando {file_path}: {e}")
            return self._default_spectral_features()

    def _get_brightness_tag(self, spectral_centroid: float) -> str:
        """T018: build the spectral brightness tag for the text embedding.

        Thresholds: <1000 dark, <3000 warm, <6000 neutral, <10000 bright,
        otherwise harsh. ``None`` is treated as the default centroid.
        """
        if spectral_centroid is None:
            spectral_centroid = self._DEFAULT_SPECTRAL_CENTROID

        if spectral_centroid < 1000:
            return "brightness=dark"
        if spectral_centroid < 3000:
            return "brightness=warm"
        if spectral_centroid < 6000:
            return "brightness=neutral"
        if spectral_centroid < 10000:
            return "brightness=bright"
        return "brightness=harsh"

    def _is_tonal_sample(self, sample_type: str) -> bool:
        """T020: tell whether a sample type is tonal (harmonic).

        True when the lower-cased type contains any known tonal type name;
        an empty or ``None`` type is not tonal.
        """
        if not sample_type:
            return False
        lowered = sample_type.lower()
        return any(t in lowered for t in self._TONAL_TYPES)

    def get_sample_spectral_features(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Return the indexed spectral features of one sample.

        Args:
            file_path: path exactly as stored in the index (a ``Path`` is
                converted with ``str``).

        Returns:
            The stored feature dict, or None when the path is not indexed.
        """
        wanted = str(file_path)
        for meta in self.metadata:
            if meta.get('path') == wanted:
                return meta.get('spectral_features')
        return None

    def get_samples_by_key(self, key: str) -> List[Dict]:
        """Return the metadata of every sample whose detected key equals *key*."""
        return [
            meta for meta in self.metadata
            if (meta.get('spectral_features') or {}).get('key') == key
        ]

    def semantic_search(self, query: str, limit: int = 5) -> List[Dict]:
        """Return up to *limit* samples ordered by semantic relevance.

        Each result is a copy of the sample's metadata with an added
        ``'score'`` entry. Falls back to basic substring matching if ML is
        unavailable or there are no embeddings. A non-positive *limit*
        yields an empty list.
        """
        if limit <= 0:
            return []

        if not HAS_ML or self.model is None or len(self.embeddings) == 0:
            logger.warning("ML unavailable, falling back to substring search.")
            return self._fallback_search(query, limit)

        logger.info("Performing semantic search for: '%s'", query)
        query_emb = self.model.encode([query])

        # Calculate cosine similarity between query and all stored embeddings
        similarities = cosine_similarity(query_emb, self.embeddings)[0]

        # Indices of the best matches, highest similarity first
        top_indices = np.argsort(similarities)[::-1][:limit]

        results = []
        for idx in top_indices:
            meta = self.metadata[int(idx)].copy()
            meta['score'] = float(similarities[idx])
            results.append(meta)
        return results

    def _fallback_search(self, query: str, limit: int = 5) -> List[Dict]:
        """Substring search used when semantic search is unavailable.

        A hit in the sample name scores 10 and a hit in the description
        scores 5; samples without a hit are dropped. Like
        ``semantic_search``, results are metadata copies with a ``'score'``
        entry, so callers can treat both result lists the same way.
        """
        if limit <= 0:
            return []

        needle = query.lower()
        scored = []
        for m in self.metadata:
            score = 0
            if needle in str(m.get('name', '')).lower():
                score += 10
            if needle in str(m.get('description', '')).lower():
                score += 5
            if score > 0:
                scored.append((score, m))

        # Stable sort: equally scored samples keep their index order.
        scored.sort(key=lambda pair: pair[0], reverse=True)

        results = []
        for score, m in scored[:limit]:
            hit = m.copy()
            hit['score'] = score
            results.append(hit)
        return results


def _main(argv: List[str]) -> None:
    """Command line entry: index a library and optionally run one query."""
    if len(argv) < 2:
        print("Usage: python vector_manager.py <library_dir> [search_query]")
        return

    vm = VectorManager(argv[1])
    if len(argv) > 2:
        query = argv[2]
        res = vm.semantic_search(query)
        print("Search Results for", query)
        for r in res:
            print(r['score'], r['name'], r['path'])


if __name__ == "__main__":
    import sys
    _main(sys.argv)