- MCP Server with audio fallback, sample management - Song generator with bus routing - Reference listener and audio resampler - Vector-based sample search - Master chain with limiter and calibration - Fix: Audio fallback now works without M4L - Fix: Full song detection in sample loader Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
309 lines
10 KiB
Python
309 lines
10 KiB
Python
"""
|
|
sample_index.py - Índice y búsqueda de samples para AbletonMCP-AI
|
|
|
|
Gestiona la librería de samples locales con metadatos extraídos de los nombres.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
import re
|
|
|
|
logger = logging.getLogger("SampleIndex")
|
|
|
|
|
|
class SampleIndex:
|
|
"""Índice de samples con búsqueda y metadatos"""
|
|
|
|
# Categorías por palabras clave
|
|
CATEGORIES = {
|
|
'kick': ['kick', 'bd', 'bass drum', 'kick drum'],
|
|
'snare': ['snare', 'sd', 'snr'],
|
|
'clap': ['clap', 'clp'],
|
|
'hat': ['hat', 'hh', 'hihat', 'hi-hat', 'closed hat', 'open hat'],
|
|
'perc': ['perc', 'percussion', 'conga', 'bongo', 'shaker', 'tamb', 'timb'],
|
|
'bass': ['bass', 'bassline', 'sub', '808', ' Reese'],
|
|
'synth': ['synth', 'lead', 'pad', 'arp', 'pluck', 'stab', 'chord'],
|
|
'vocal': ['vocal', 'vox', 'voice', 'speech', 'talk'],
|
|
'fx': ['fx', 'effect', 'sweep', 'riser', 'downlifter', 'impact', 'hit'],
|
|
'loop': ['loop', 'full', 'groove'],
|
|
}
|
|
|
|
def __init__(self, base_dir: str):
|
|
"""
|
|
Inicializa el índice de samples
|
|
|
|
Args:
|
|
base_dir: Directorio base donde buscar samples
|
|
"""
|
|
self.base_dir = Path(base_dir)
|
|
self.samples: List[Dict[str, Any]] = []
|
|
self.index_file = self.base_dir / ".sample_index.json"
|
|
|
|
# Cargar o construir índice
|
|
if self.index_file.exists():
|
|
self._load_index()
|
|
else:
|
|
self._build_index()
|
|
self._save_index()
|
|
|
|
def _build_index(self):
|
|
"""Construye el índice escaneando el directorio"""
|
|
logger.info(f"Construyendo índice de samples en: {self.base_dir}")
|
|
|
|
extensions = {'.wav', '.aif', '.aiff', '.mp3', '.ogg'}
|
|
|
|
for file_path in self.base_dir.rglob('*'):
|
|
if file_path.suffix.lower() in extensions:
|
|
sample_info = self._analyze_sample(file_path)
|
|
self.samples.append(sample_info)
|
|
|
|
logger.info(f"Índice construido: {len(self.samples)} samples encontrados")
|
|
|
|
def _analyze_sample(self, file_path: Path) -> Dict[str, Any]:
|
|
"""Analiza un sample y extrae metadatos del nombre"""
|
|
name = file_path.stem
|
|
name_lower = name.lower()
|
|
|
|
# Determinar categoría
|
|
category = self._detect_category(name_lower)
|
|
|
|
# Extraer key del nombre
|
|
key = self._extract_key(name)
|
|
|
|
# Extraer BPM del nombre
|
|
bpm = self._extract_bpm(name)
|
|
|
|
return {
|
|
'name': name,
|
|
'path': str(file_path),
|
|
'category': category,
|
|
'key': key,
|
|
'bpm': bpm,
|
|
'size': file_path.stat().st_size if file_path.exists() else 0,
|
|
}
|
|
|
|
def _detect_category(self, name: str) -> str:
|
|
"""Detecta la categoría basada en palabras clave"""
|
|
for category, keywords in self.CATEGORIES.items():
|
|
for keyword in keywords:
|
|
if keyword in name:
|
|
return category
|
|
return 'unknown'
|
|
|
|
def _extract_key(self, name: str) -> Optional[str]:
|
|
"""Extrae la tonalidad del nombre del archivo"""
|
|
# Patrones comunes: "Key A", "in A", "A minor", "Am", "F#m", etc.
|
|
patterns = [
|
|
r'[_\s\-]([A-G][#b]?m?)\s*(?:minor|major)?[_\s\-]?',
|
|
r'[_\s\-]([A-G][#b]?)[_\s\-]',
|
|
r'\bin\s+([A-G][#b]?m?)\b',
|
|
r'Key\s+([A-G][#b]?m?)',
|
|
]
|
|
|
|
for pattern in patterns:
|
|
match = re.search(pattern, name, re.IGNORECASE)
|
|
if match:
|
|
key = match.group(1)
|
|
# Normalizar
|
|
key = key.replace('b', '#').replace('Db', 'C#').replace('Eb', 'D#')
|
|
key = key.replace('Gb', 'F#').replace('Ab', 'G#').replace('Bb', 'A#')
|
|
return key
|
|
|
|
return None
|
|
|
|
def _extract_bpm(self, name: str) -> Optional[int]:
|
|
"""Extrae el BPM del nombre del archivo"""
|
|
# Patrones: "128 BPM", "_128_", "128bpm", etc.
|
|
patterns = [
|
|
r'[_\s\-](\d{2,3})\s*BPM',
|
|
r'[_\s\-](\d{2,3})[_\s\-]',
|
|
r'(\d{2,3})bpm',
|
|
]
|
|
|
|
for pattern in patterns:
|
|
match = re.search(pattern, name, re.IGNORECASE)
|
|
if match:
|
|
bpm = int(match.group(1))
|
|
if 60 <= bpm <= 200: # Rango razonable
|
|
return bpm
|
|
|
|
return None
|
|
|
|
def _load_index(self):
|
|
"""Carga el índice desde archivo"""
|
|
try:
|
|
with open(self.index_file, 'r') as f:
|
|
data = json.load(f)
|
|
self.samples = data.get('samples', [])
|
|
logger.info(f"Índice cargado: {len(self.samples)} samples")
|
|
except Exception as e:
|
|
logger.error(f"Error cargando índice: {e}")
|
|
self._build_index()
|
|
|
|
def _save_index(self):
|
|
"""Guarda el índice a archivo"""
|
|
try:
|
|
with open(self.index_file, 'w') as f:
|
|
json.dump({
|
|
'samples': self.samples,
|
|
'base_dir': str(self.base_dir)
|
|
}, f, indent=2)
|
|
logger.info(f"Índice guardado en: {self.index_file}")
|
|
except Exception as e:
|
|
logger.error(f"Error guardando índice: {e}")
|
|
|
|
def search(self, query: str, category: str = "", limit: int = 10) -> List[Dict[str, Any]]:
|
|
"""
|
|
Busca samples por query y/o categoría
|
|
|
|
Args:
|
|
query: Término de búsqueda
|
|
category: Categoría específica (opcional)
|
|
limit: Número máximo de resultados
|
|
|
|
Returns:
|
|
Lista de samples que coinciden
|
|
"""
|
|
query_lower = query.lower()
|
|
results = []
|
|
|
|
for sample in self.samples:
|
|
# Filtrar por categoría si se especificó
|
|
if category and sample['category'] != category.lower():
|
|
continue
|
|
|
|
# Buscar en nombre
|
|
name = sample['name'].lower()
|
|
if query_lower in name:
|
|
# Calcular score de relevancia
|
|
score = 0
|
|
if query_lower == sample.get('category', ''):
|
|
score += 10 # Coincidencia exacta de categoría
|
|
if query_lower in name.split('_'):
|
|
score += 5 # Palabra completa
|
|
if name.startswith(query_lower):
|
|
score += 3 # Comienza con el término
|
|
|
|
results.append((score, sample))
|
|
|
|
# Ordenar por score y limitar
|
|
results.sort(key=lambda x: x[0], reverse=True)
|
|
return [sample for _, sample in results[:limit]]
|
|
|
|
def find_by_key(self, key: str, category: str = "", limit: int = 10) -> List[Dict[str, Any]]:
|
|
"""Busca samples por tonalidad"""
|
|
results = []
|
|
|
|
for sample in self.samples:
|
|
if sample.get('key') == key:
|
|
if not category or sample['category'] == category:
|
|
results.append(sample)
|
|
|
|
return results[:limit]
|
|
|
|
def find_by_bpm(self, bpm: int, tolerance: int = 5, limit: int = 10) -> List[Dict[str, Any]]:
|
|
"""Busca samples por BPM con tolerancia"""
|
|
results = []
|
|
|
|
for sample in self.samples:
|
|
sample_bpm = sample.get('bpm')
|
|
if sample_bpm and abs(sample_bpm - bpm) <= tolerance:
|
|
results.append(sample)
|
|
|
|
return results[:limit]
|
|
|
|
def get_random_sample(self, category: str = "") -> Optional[Dict[str, Any]]:
|
|
"""Obtiene un sample aleatorio, opcionalmente filtrado por categoría"""
|
|
import random
|
|
|
|
samples = self.samples
|
|
if category:
|
|
samples = [s for s in samples if s['category'] == category]
|
|
|
|
return random.choice(samples) if samples else None
|
|
|
|
def get_sample_pack(self, genre: str, key: str = "", bpm: int = 0) -> Dict[str, List[Dict]]:
|
|
"""
|
|
Obtiene un pack de samples completo para un género
|
|
|
|
Args:
|
|
genre: Género musical
|
|
key: Tonalidad preferida
|
|
bpm: BPM preferido
|
|
|
|
Returns:
|
|
Dict con samples organizados por categoría
|
|
"""
|
|
pack = {
|
|
'kick': [],
|
|
'snare': [],
|
|
'hat': [],
|
|
'clap': [],
|
|
'perc': [],
|
|
'bass': [],
|
|
'synth': [],
|
|
'fx': [],
|
|
}
|
|
|
|
# Seleccionar un sample de cada categoría
|
|
for category in pack.keys():
|
|
candidates = [s for s in self.samples if s['category'] == category]
|
|
|
|
# Filtrar por key si se especificó
|
|
if key and candidates:
|
|
key_matches = [s for s in candidates if s.get('key') == key]
|
|
if key_matches:
|
|
candidates = key_matches
|
|
|
|
# Filtrar por BPM si se especificó
|
|
if bpm and candidates:
|
|
bpm_matches = [s for s in candidates if s.get('bpm')]
|
|
if bpm_matches:
|
|
# Ordenar por cercanía al BPM objetivo
|
|
bpm_matches.sort(key=lambda s: abs(s['bpm'] - bpm))
|
|
candidates = bpm_matches[:5] # Top 5 más cercanos
|
|
|
|
# Seleccionar hasta 3 samples
|
|
import random
|
|
if candidates:
|
|
pack[category] = random.sample(candidates, min(3, len(candidates)))
|
|
|
|
return pack
|
|
|
|
def refresh(self):
|
|
"""Reconstruye el índice desde cero"""
|
|
logger.info("Refrescando índice...")
|
|
self._build_index()
|
|
self._save_index()
|
|
|
|
|
|
# Función de utilidad para testing
|
|
if __name__ == "__main__":
|
|
import sys
|
|
|
|
if len(sys.argv) < 2:
|
|
print("Uso: python sample_index.py <directorio_de_samples>")
|
|
sys.exit(1)
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
index = SampleIndex(sys.argv[1])
|
|
|
|
print(f"\nÍndice cargado: {len(index.samples)} samples")
|
|
print("\nDistribución por categoría:")
|
|
|
|
categories = {}
|
|
for sample in index.samples:
|
|
cat = sample['category']
|
|
categories[cat] = categories.get(cat, 0) + 1
|
|
|
|
for cat, count in sorted(categories.items(), key=lambda x: -x[1]):
|
|
print(f" {cat}: {count}")
|
|
|
|
# Ejemplo de búsqueda
|
|
print("\nBúsqueda 'kick':")
|
|
for s in index.search("kick", limit=5):
|
|
print(f" - {s['name']} ({s.get('key', '?')}, {s.get('bpm', '?')} BPM)")
|