Files
ableton-mcp-ai/AbletonMCP_AI/MCP_Server/sample_manager.py
renato97 5b63d38945 Fix library diversity and auto-automation issues
Problem: The system was stuck using a single all_tracks folder, causing:
- No bucket sampling diversity (all files in 1 folder)
- Repetitive sample selection
- No coherence between sections
- Fades/volumes not auto-applied

Fixes:
1. Changed DEFAULT_LIBRARY from all_tracks to organized_samples
   - server.py: Updated SAMPLES_DIR
   - sample_manager.py: Updated base_dir
   - health_check.py: Added organized_samples as primary paths

2. organized_samples structure enables T013 bucket sampling:
   - loops/bass: 34 samples
   - loops/synth: 43 samples
   - loops/vocal: 24 samples
   - oneshots/kick: 20 samples
   - oneshots/perc: 35 samples
   - Each subfolder < 15 files = perfect for bucket sampling

3. Added auto-automation to generate_song():
   - Fade-in 4 bars for kick/bass/hat (intro)
   - Build curve: music tracks 0.5 -> 0.9 (32-40 bars)
   - Reverb automation: 0% -> 40% -> 0% on atmos/pad/vocal
   - apply_automation parameter (default True)

4. Each track now gets diverse samples from different subfolders:
   - Bass from loops/bass (34 options)
   - Synth from loops/synth (43 options)
   - Drums from oneshots/ (kick, perc, snare)

Coverage wheel will now track usage across 20+ subfolders instead of 1.
Diversity memory will work correctly with proper family tracking per folder.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 01:22:23 -03:00

1012 lines
34 KiB
Python

"""
sample_manager.py - Gestión completa de librería de samples
Proporciona:
- Indexación y escaneo de directorios de samples
- Clasificación automática por tipo, key, BPM
- Gestión de metadatos y tags
- Búsqueda avanzada con filtros múltiples
- Caché de índice para rendimiento
- Soporte para múltiples formatos (WAV, AIFF, MP3, OGG, FLAC)
"""
import json
import hashlib
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple, Callable
from dataclasses import dataclass, field, asdict
from datetime import datetime
from collections import defaultdict
import threading
# Optional audio analyzer: try the package-relative module first, then a plain
# top-level import. When neither can be imported the analyzer names are bound
# to None and AUDIO_ANALYSIS_AVAILABLE is False.
try:
    from .audio_analyzer import AudioAnalyzer, SampleType, analyze_sample, quick_analyze
except ImportError:
    try:
        from audio_analyzer import AudioAnalyzer, SampleType, analyze_sample, quick_analyze
    except ImportError:
        AUDIO_ANALYSIS_AVAILABLE = False
        AudioAnalyzer = SampleType = analyze_sample = quick_analyze = None
    else:
        AUDIO_ANALYSIS_AVAILABLE = True
else:
    AUDIO_ANALYSIS_AVAILABLE = True

logger = logging.getLogger("SampleManager")
@dataclass
class Sample:
    """One audio sample tracked by the library index."""

    # Identity, location and classification
    id: str
    name: str
    path: str
    category: str
    subcategory: str
    sample_type: str

    # Musical and file properties
    key: Optional[str] = None
    bpm: Optional[float] = None
    duration: float = 0.0
    sample_rate: int = 44100
    channels: int = 2
    file_size: int = 0
    format: str = "wav"

    # Additional metadata
    genres: List[str] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    mood: str = ""
    energy: float = 0.5  # 0-1

    # Audio-analysis information
    analyzed: bool = False
    analysis_version: int = 0
    spectral_centroid: float = 0.0
    rms_energy: float = 0.0
    is_harmonic: bool = False
    is_percussive: bool = False

    # Bookkeeping metadata (timestamps are ISO-8601 strings)
    date_added: str = field(default_factory=lambda: datetime.now().isoformat())
    date_modified: str = field(default_factory=lambda: datetime.now().isoformat())
    play_count: int = 0
    rating: int = 0  # 0-5

    def to_dict(self) -> Dict[str, Any]:
        """Return every field of the sample as a plain dictionary."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Sample':
        """Build a Sample from a dictionary, ignoring keys that are not fields."""
        known_fields = set(cls.__dataclass_fields__)
        kwargs = {}
        for field_name, value in data.items():
            if field_name in known_fields:
                kwargs[field_name] = value
        return cls(**kwargs)

    def get_display_name(self) -> str:
        """Return the name followed by key and BPM when they are set."""
        label = self.name
        if self.key:
            label += f" | Key: {self.key}"
        if self.bpm:
            label += f" | {self.bpm:.1f} BPM"
        return label
class SampleManager:
    """
    Main manager for the sample library.

    Features:
    - Recursive directory indexing
    - Automatic classification by type, based on keywords in the file name
    - Key and BPM detection from the file name, optionally completed by the
      audio analyzer when it could be imported
    - Advanced search with multiple filters
    - Ratings (a rating of 4 or more counts as a favorite in searches)
    - Persistent JSON cache of the index
    """

    # Main categories -> subcategories -> keywords looked up in file names
    CATEGORIES = {
        'drums': {
            'kick': ['kick', 'bd', 'bass drum', 'kickdrum'],
            'snare': ['snare', 'snr', 'sd', 'rimshot'],
            'clap': ['clap', 'clp', 'handclap'],
            'hat_closed': ['closed hat', 'chh', 'closed'],
            'hat_open': ['open hat', 'ohh', 'open'],
            'hat': ['hat', 'hihat', 'hi-hat'],
            'perc': ['perc', 'percussion', 'conga', 'bongo', 'timbale'],
            'shaker': ['shaker', 'tambourine', 'tamb'],
            'tom': ['tom', 'tomtom'],
            'cymbal': ['crash', 'ride', 'cymbal', 'china'],
        },
        'bass': {
            'sub': ['sub', 'subbass', '808'],
            'bassline': ['bassline', 'bass', 'reese'],
            'acid': ['acid', 'tb303', '303'],
        },
        'synths': {
            'lead': ['lead', 'solo', 'main'],
            'pad': ['pad', 'atmosphere', 'dron', 'ambient'],
            'pluck': ['pluck', 'arp', 'arpeggio'],
            'chord': ['chord', 'stab', 'hit'],
            'fx': ['fx', 'effect', 'sweep', 'riser', 'downlifter'],
        },
        'vocals': {
            'vocal': ['vocal', 'vox', 'voice'],
            'speech': ['speech', 'talk', 'phrase'],
            'chant': ['chant', 'shout', 'yell'],
        },
        'loops': {
            'drum_loop': ['drum loop', 'beat loop', 'groove'],
            'perc_loop': ['perc loop', 'percussion loop'],
            'bass_loop': ['bass loop', 'bassline loop'],
            'synth_loop': ['synth loop', 'lead loop'],
            'full_loop': ['full loop', 'complete loop'],
        },
        'one_shots': {
            'hit': ['hit', 'impact', 'sting'],
            'noise': ['noise', 'texture', 'grain'],
        }
    }

    # File extensions recognised as audio
    SUPPORTED_FORMATS = {'.wav', '.aif', '.aiff', '.mp3', '.ogg', '.flac', '.m4a'}

    # Supported genres with their keywords
    GENRE_KEYWORDS = {
        'house': ['house', 'deep', 'soulful', 'garage', 'classic'],
        'techno': ['techno', 'industrial', 'detroit', 'berlin', 'acid'],
        'tech-house': ['tech house', 'tech-house', 'groovy', 'bouncy'],
        'trance': ['trance', 'progressive', 'uplifting', 'psy'],
        'drum-and-bass': ['drum and bass', 'dnb', 'neuro', 'liquid', 'jungle'],
        'hip-hop': ['hip hop', 'hiphop', 'trap', 'boom bap', 'lofi'],
        'ambient': ['ambient', 'chillout', 'downtempo', 'meditation'],
        'edm': ['edm', 'electro', 'big room', 'festival'],
    }

    # Flat spellings and the sharp spelling they are normalized to
    _FLAT_TO_SHARP = {
        'Cb': 'B', 'Db': 'C#', 'Eb': 'D#', 'Fb': 'E',
        'Gb': 'F#', 'Ab': 'G#', 'Bb': 'A#',
    }

    def __init__(self, base_dir: str, cache_dir: Optional[str] = None):
        """
        Initialize the sample manager and load any previously saved index.

        Args:
            base_dir: Root directory of the sample library
            cache_dir: Directory for the cache (default: base_dir/.sample_cache)
        """
        self.base_dir = Path(base_dir)
        self.cache_dir = Path(cache_dir) if cache_dir else self.base_dir / ".sample_cache"
        self.cache_dir.mkdir(exist_ok=True)

        self.samples: Dict[str, "Sample"] = {}
        self.index_file = self.cache_dir / "sample_library.json"
        self.stats_file = self.cache_dir / "library_stats.json"

        # Audio analyzer (None when the optional module could not be imported)
        self.analyzer = AudioAnalyzer() if AUDIO_ANALYSIS_AVAILABLE else None

        # Re-entrant lock guarding self.samples and self.stats
        self._lock = threading.RLock()
        self._index_dirty = False

        # Library statistics
        self.stats = {
            'total_samples': 0,
            'total_size': 0,
            'by_category': defaultdict(int),
            'by_key': defaultdict(int),
            'by_bpm_range': defaultdict(int),
            'last_scan': None,
        }

        # Load the existing index, if any
        self._load_index()

    def _generate_id(self, file_path: str) -> str:
        """Return a 16-hex-digit ID derived from the MD5 of the path string."""
        return hashlib.md5(file_path.encode()).hexdigest()[:16]

    def _get_file_hash(self, file_path: Path) -> str:
        """Return an MD5 over the file's size and mtime (not its contents)."""
        stat = file_path.stat()
        return hashlib.md5(f"{stat.st_size}_{stat.st_mtime}".encode()).hexdigest()

    def scan_directory(self, directory: Optional[str] = None,
                       recursive: bool = True,
                       analyze_audio: bool = False,
                       progress_callback: Optional[Callable[[int, int, str], None]] = None) -> Dict[str, Any]:
        """
        Scan a directory for samples and add them to the index.

        Args:
            directory: Directory to scan (default: base_dir)
            recursive: Also scan subdirectories
            analyze_audio: Analyze the audio content (slower)
            progress_callback: Called with (processed, total, current_file_name)

        Returns:
            Scan statistics: processed, added, updated, errors, total_samples

        Raises:
            FileNotFoundError: If the directory does not exist
        """
        scan_dir = Path(directory) if directory else self.base_dir
        if not scan_dir.exists():
            raise FileNotFoundError(f"Directorio no encontrado: {scan_dir}")
        logger.info(f"Escaneando: {scan_dir}")

        # Collect every file whose extension is a supported audio format
        candidates = scan_dir.rglob('*') if recursive else scan_dir.iterdir()
        audio_files = [f for f in candidates
                       if f.is_file() and f.suffix.lower() in self.SUPPORTED_FORMATS]

        total = len(audio_files)
        processed = 0
        added = 0
        updated = 0
        errors = 0
        logger.info(f"Encontrados {total} archivos de audio")

        with self._lock:
            for file_path in audio_files:
                processed += 1
                if progress_callback:
                    progress_callback(processed, total, str(file_path.name))
                try:
                    result = self._process_file(file_path, analyze_audio)
                    if result == 'added':
                        added += 1
                    elif result == 'updated':
                        updated += 1
                except Exception as e:
                    # One unreadable file must not abort the whole scan
                    logger.error(f"Error procesando {file_path}: {e}")
                    errors += 1

            self._index_dirty = True
            self._update_stats()
            # Stamp the scan time BEFORE saving so the persisted stats carry it
            self.stats['last_scan'] = datetime.now().isoformat()
            self._save_index()

            return {
                'processed': processed,
                'added': added,
                'updated': updated,
                'errors': errors,
                'total_samples': len(self.samples),
            }

    def _is_unchanged(self, existing: "Sample", file_stat, run_analysis: bool) -> bool:
        """Tell whether an indexed sample still matches the file on disk.

        The entry is considered current when the file size is the same and the
        file has not been modified after the entry's ``date_modified`` (which
        is set when the entry is written). A pending audio analysis on an
        entry that was never analyzed always forces re-processing.
        """
        if run_analysis and not existing.analyzed:
            return False
        if existing.file_size != file_stat.st_size or not existing.date_modified:
            return False
        try:
            indexed_at = datetime.fromisoformat(existing.date_modified).timestamp()
        except (TypeError, ValueError, OverflowError, OSError):
            # Unparseable timestamp: re-process rather than trust the entry
            return False
        return file_stat.st_mtime <= indexed_at

    def _process_file(self, file_path: Path, analyze_audio: bool) -> str:
        """Index a single file. Returns 'added', 'updated', or 'unchanged'."""
        file_id = self._generate_id(str(file_path))
        file_stat = file_path.stat()
        existing = self.samples.get(file_id)
        run_analysis = bool(analyze_audio and self.analyzer)

        # Skip files that are already indexed and have not changed since
        if existing is not None and self._is_unchanged(existing, file_stat, run_analysis):
            return 'unchanged'

        # Derive what we can from the file name
        name = file_path.stem
        category, subcategory = self._classify_by_name(name)
        sample_type = self._detect_sample_type(name)
        key = self._extract_key_from_name(name)
        bpm = self._extract_bpm_from_name(name)
        genres = self._detect_genres(name)

        # Optional audio analysis
        audio_features = {}
        if run_analysis:
            try:
                audio_features = analyze_sample(str(file_path)) or {}
                # Values found in the file name win over detected ones
                if not bpm and audio_features.get('bpm'):
                    bpm = audio_features['bpm']
                if not key and audio_features.get('key'):
                    key = audio_features['key']
                if audio_features.get('sample_type'):
                    sample_type = audio_features['sample_type']
                if audio_features.get('suggested_genres'):
                    genres = sorted(set(genres + audio_features['suggested_genres']))
            except Exception as e:
                logger.warning(f"Error analizando audio {file_path}: {e}")

        # Re-indexing a changed file must not wipe what the user set by hand
        preserved = {}
        if existing is not None:
            preserved = {
                'date_added': existing.date_added,
                'rating': existing.rating,
                'play_count': existing.play_count,
                'mood': existing.mood,
                'energy': existing.energy,
            }

        self.samples[file_id] = Sample(
            id=file_id,
            name=name,
            path=str(file_path),
            category=category,
            subcategory=subcategory,
            sample_type=sample_type,
            key=key,
            bpm=bpm,
            duration=audio_features.get('duration', 0.0),
            sample_rate=audio_features.get('sample_rate', 44100),
            file_size=file_stat.st_size,
            format=file_path.suffix.lower().lstrip('.'),
            genres=genres,
            tags=self._extract_tags(name),
            # Only flag as analyzed when analysis actually produced features
            analyzed=bool(audio_features),
            spectral_centroid=audio_features.get('spectral_centroid', 0.0),
            rms_energy=audio_features.get('rms_energy', 0.0),
            is_harmonic=audio_features.get('is_harmonic', False),
            is_percussive=audio_features.get('is_percussive', False),
            date_modified=datetime.now().isoformat(),
            **preserved,
        )
        return 'added' if existing is None else 'updated'

    def _classify_by_name(self, name: str) -> Tuple[str, str]:
        """Classify a sample name into (category, subcategory).

        Multi-word keywords ("bass loop", "closed hat", ...) are tried first,
        with "_" and "-" treated as spaces, so a specific phrase is not
        shadowed by a shorter keyword of an earlier category. Otherwise the
        first keyword found, in table order, decides.
        """
        name_lower = name.lower()
        spaced = name_lower.replace('_', ' ').replace('-', ' ')

        entries = [
            (category, subcategory, keyword)
            for category, subcategories in self.CATEGORIES.items()
            for subcategory, keywords in subcategories.items()
            for keyword in keywords
        ]

        # Pass 1: phrases
        for category, subcategory, keyword in entries:
            if ' ' in keyword and keyword in spaced:
                return category, subcategory

        # Pass 2: plain substring match in table order
        for category, subcategory, keyword in entries:
            if keyword in name_lower:
                return category, subcategory

        # Fallback: anything mentioning "loop" is at least a loop
        if 'loop' in name_lower:
            return 'loops', 'unknown'
        return 'unknown', 'unknown'

    def _detect_sample_type(self, name: str) -> str:
        """Return the specific sample type derived from the name classification.

        Bass samples are prefixed ("bass_sub", "bass_bassline", "bass_acid");
        every other known category uses its subcategory as the type.
        """
        category, subcategory = self._classify_by_name(name)
        if category == 'bass':
            return f"bass_{subcategory}"
        if category in ('drums', 'synths', 'vocals', 'loops', 'one_shots'):
            return subcategory
        return 'unknown'

    def _extract_key_from_name(self, name: str) -> Optional[str]:
        """Extract the musical key from a file name.

        Returns the key with an upper-case root, flats rewritten as sharps and
        a trailing "m" for minor (e.g. "A#", "F#m"), or None when no key is found.
        """
        import re

        # Common spellings; the token is captured in group 1
        patterns = [
            r'[_\s\-]([A-G][#b]?(?:m|min|minor)?)[_\s\-]',
            r'\bin\s+([A-G][#b]?(?:m|min|minor)?)\b',
            # The lookahead keeps "Keyboard" from being read as "Key" + "B"
            r'Key[_\s]?([A-G][#b]?(?:m|min|minor)?)(?![A-Za-z])',
            r'[_\s\-]([A-G][#b]?)\s*(?:maj|major)?[_\s\-]',
        ]
        for pattern in patterns:
            match = re.search(pattern, name, re.IGNORECASE)
            if not match:
                continue
            token = re.fullmatch(r'([A-G])([#b]?)(m|min|minor)?',
                                 match.group(1), re.IGNORECASE)
            if not token:
                continue
            pitch = token.group(1).upper() + token.group(2).lower()
            if pitch.endswith('b'):
                # Normalize flats to the equivalent sharp spelling
                pitch = self._FLAT_TO_SHARP[pitch]
            return pitch + 'm' if token.group(3) else pitch
        return None

    def _extract_bpm_from_name(self, name: str) -> Optional[float]:
        """Extract the BPM from a file name, or None.

        Only numbers between 60 and 200 are accepted. Every candidate of a
        pattern is checked, so an earlier out-of-range number (e.g. a version
        "01") does not hide a valid tempo later in the name.
        """
        import re

        patterns = [
            r'[_\s\-](\d{2,3})\s*BPM',
            # Lookahead leaves the separator available to the next candidate
            r'[_\s\-](\d{2,3})(?=[_\s\-])',
            r'(\d{2,3})bpm',
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, name, re.IGNORECASE):
                bpm = int(match.group(1))
                if 60 <= bpm <= 200:
                    return float(bpm)
        return None

    def _detect_genres(self, name: str) -> List[str]:
        """Return the genres whose keywords appear in the name."""
        name_lower = name.lower()
        return [
            genre
            for genre, keywords in self.GENRE_KEYWORDS.items()
            if any(keyword in name_lower for keyword in keywords)
        ]

    def _extract_tags(self, name: str) -> List[str]:
        """Extract descriptive tags from a file name (sorted, no duplicates)."""
        import re

        name_lower = name.lower()
        # Common descriptive words used as tags
        common_tags = [
            'dry', 'wet', 'processed', 'raw', 'analog', 'digital',
            'vintage', 'modern', 'punchy', 'deep', 'bright', 'dark',
            'tight', 'loose', 'fat', 'thin', 'crisp', 'warm',
            'one shot', 'loop', 'sample', 'hit'
        ]
        tags = {tag.replace(' ', '_') for tag in common_tags if tag in name_lower}

        # Numbers of one or two digits are taken to be version numbers
        for number in re.findall(r'\d+', name):
            if len(number) <= 2:
                tags.add(f"v{number}")

        # Sorted so the saved index does not change between identical scans
        return sorted(tags)

    def search(self,
               query: str = "",
               category: str = "",
               subcategory: str = "",
               sample_type: str = "",
               key: str = "",
               bpm: Optional[float] = None,
               bpm_tolerance: int = 5,
               genres: Optional[List[str]] = None,
               tags: Optional[List[str]] = None,
               min_rating: int = 0,
               favorites_only: bool = False,
               limit: int = 50,
               sort_by: str = "name") -> List["Sample"]:
        """
        Advanced sample search with multiple filters.

        Args:
            query: Substring to look for in the name (case-insensitive)
            category: Main category
            subcategory: Subcategory
            sample_type: Specific type
            key: Musical key; a sample with the same root (A / Am) also matches
            bpm: Target BPM; samples without a BPM are not filtered out
            bpm_tolerance: BPM tolerance (+/-)
            genres: Genres, any of which may match
            tags: Tags, any of which may match
            min_rating: Minimum rating
            favorites_only: Only samples rated 4 or more
            limit: Maximum number of results
            sort_by: "name", "bpm", "rating" or "date_added"

        Returns:
            List of matching samples
        """
        query_lower = query.lower()
        wanted_key = key.lower()
        wanted_genres = [g.lower() for g in genres] if genres else []
        wanted_tags = [t.lower() for t in tags] if tags else []

        with self._lock:
            results = []
            for sample in self.samples.values():
                # Name filter
                if query and query_lower not in sample.name.lower():
                    continue

                # Category filters
                if category and sample.category != category.lower():
                    continue
                if subcategory and sample.subcategory != subcategory.lower():
                    continue
                if sample_type and sample.sample_type != sample_type.lower():
                    continue

                # Key filter: exact key, or same root in the other mode.
                # Roots are compared whole so "A" does not match "A#m".
                if key:
                    sample_key = (sample.key or "").lower()
                    if not sample_key:
                        continue
                    if (sample_key != wanted_key
                            and sample_key.rstrip('m') != wanted_key.rstrip('m')):
                        continue

                # BPM filter
                if bpm is not None and sample.bpm:
                    if abs(sample.bpm - bpm) > bpm_tolerance:
                        continue

                # Genre filter
                if wanted_genres:
                    sample_genres = [g.lower() for g in sample.genres]
                    if not any(g in sample_genres for g in wanted_genres):
                        continue

                # Tag filter
                if wanted_tags:
                    sample_tags = [t.lower() for t in sample.tags]
                    if not any(t in sample_tags for t in wanted_tags):
                        continue

                # Rating filters
                if min_rating > 0 and sample.rating < min_rating:
                    continue
                if favorites_only and sample.rating < 4:
                    continue

                results.append(sample)

            # Sort results
            if sort_by == "name":
                results.sort(key=lambda s: s.name.lower())
            elif sort_by == "bpm":
                results.sort(key=lambda s: s.bpm or 0)
            elif sort_by == "rating":
                results.sort(key=lambda s: s.rating, reverse=True)
            elif sort_by == "date_added":
                results.sort(key=lambda s: s.date_added, reverse=True)

            return results[:limit]

    def get_by_id(self, sample_id: str) -> Optional["Sample"]:
        """Return the sample with the given ID, or None."""
        with self._lock:
            return self.samples.get(sample_id)

    def get_by_path(self, file_path: str) -> Optional["Sample"]:
        """Return the sample indexed for the given path, or None."""
        sample = self.get_by_id(self._generate_id(file_path))
        if sample is None:
            # IDs are built from str(Path) while scanning, so retry with the
            # path normalized the same way (e.g. different separators).
            normalized = str(Path(file_path))
            if normalized != file_path:
                sample = self.get_by_id(self._generate_id(normalized))
        return sample

    def get_random(self, category: str = "", limit: int = 1) -> List["Sample"]:
        """Return up to *limit* random samples, optionally from one category."""
        import random

        wanted = category.lower()
        with self._lock:
            pool = [s for s in self.samples.values()
                    if not wanted or s.category == wanted]
        if not pool or limit <= 0:
            return []
        return random.sample(pool, min(limit, len(pool)))

    def get_pack_for_genre(self, genre: str, key: str = "",
                           bpm: Optional[float] = None) -> Dict[str, List["Sample"]]:
        """
        Build a pack of samples for a genre.

        Each sample type is searched with the given key, BPM and genre, keeping
        at most 5 results per type.

        Returns:
            Dict with samples grouped as kicks, snares, claps, hats,
            percussion, bass, synths and fx
        """
        # Sample types feeding each pack group. Bass samples are stored with a
        # "bass_" prefix by _detect_sample_type, so those spellings are listed.
        type_mapping = {
            'kicks': ['kick'],
            'snares': ['snare'],
            'claps': ['clap'],
            'hats': ['hat', 'hat_closed', 'hat_open'],
            'percussion': ['perc', 'shaker', 'tom', 'cymbal'],
            'bass': ['bass', 'sub', 'bassline', 'acid',
                     'bass_sub', 'bass_bassline', 'bass_acid'],
            'synths': ['lead', 'pad', 'pluck', 'chord'],
            'fx': ['fx', 'hit', 'noise'],
        }
        pack = {group: [] for group in type_mapping}
        genre_filter = [genre] if genre else None

        for group, sample_types in type_mapping.items():
            for sample_type in sample_types:
                pack[group].extend(self.search(
                    sample_type=sample_type,
                    key=key,
                    bpm=bpm,
                    genres=genre_filter,
                    limit=5
                ))
        return pack

    def update_sample(self, sample_id: str, **kwargs) -> bool:
        """
        Update metadata of a sample.

        Only rating, tags, genres, mood, energy, key, bpm and play_count can be
        changed; other keyword arguments are ignored.

        Args:
            sample_id: ID of the sample
            **kwargs: Fields to update

        Returns:
            False when the ID is unknown, True otherwise
        """
        allowed_fields = {
            'rating', 'tags', 'genres', 'mood', 'energy',
            'key', 'bpm', 'play_count'
        }
        with self._lock:
            sample = self.samples.get(sample_id)
            if sample is None:
                return False
            for attr_name, value in kwargs.items():
                if attr_name in allowed_fields and hasattr(sample, attr_name):
                    setattr(sample, attr_name, value)
            sample.date_modified = datetime.now().isoformat()
            self._index_dirty = True
            return True

    def rate_sample(self, sample_id: str, rating: int) -> bool:
        """Rate a sample; accepts 0 to 5 and returns False otherwise."""
        if 0 <= rating <= 5:
            return self.update_sample(sample_id, rating=rating)
        return False

    def increment_play_count(self, sample_id: str) -> bool:
        """Increase the play counter of a sample by one."""
        # Read and write under one lock so concurrent calls cannot lose a count
        with self._lock:
            sample = self.samples.get(sample_id)
            if sample is None:
                return False
            return self.update_sample(sample_id, play_count=sample.play_count + 1)

    def delete_sample(self, sample_id: str, delete_file: bool = False) -> bool:
        """
        Remove a sample from the index.

        Args:
            sample_id: ID of the sample
            delete_file: If True, also delete the file on disk

        Returns:
            True when the entry was removed; False when the ID is unknown or
            the file could not be deleted
        """
        with self._lock:
            sample = self.samples.get(sample_id)
            if sample is None:
                return False
            if delete_file:
                try:
                    Path(sample.path).unlink()
                except FileNotFoundError:
                    # The file is already gone; still drop the index entry
                    pass
                except OSError as e:
                    logger.error(f"Error eliminando archivo: {e}")
                    return False
            del self.samples[sample_id]
            self._index_dirty = True
            self._update_stats()
            return True

    def refresh(self, analyze_audio: bool = False) -> Dict[str, Any]:
        """Re-scan base_dir and drop entries whose file no longer exists.

        Returns:
            The scan statistics plus a 'removed' count
        """
        logger.info("Refrescando índice de samples...")

        # Re-scan (adds new files, updates changed ones)
        stats = self.scan_directory(analyze_audio=analyze_audio)

        # A scan never removes entries, so deleted files have to be detected
        # by checking each indexed path on disk.
        with self._lock:
            missing = [sample_id for sample_id, sample in self.samples.items()
                       if not Path(sample.path).exists()]
            for sample_id in missing:
                del self.samples[sample_id]
            if missing:
                self._index_dirty = True
                self._update_stats()
                self._save_index()
            stats['removed'] = len(missing)
            stats['total_samples'] = len(self.samples)
        return stats

    def get_stats(self) -> Dict[str, Any]:
        """Return library statistics as plain dictionaries."""
        with self._lock:
            return {
                'total_samples': len(self.samples),
                'total_size': sum(s.file_size for s in self.samples.values()),
                'by_category': dict(self.stats.get('by_category', {})),
                'by_key': dict(self.stats.get('by_key', {})),
                'by_bpm_range': dict(self.stats.get('by_bpm_range', {})),
                'last_scan': self.stats.get('last_scan'),
            }

    def export_library(self, output_path: str, format: str = "json") -> str:
        """
        Export the library to a file.

        Args:
            output_path: Path of the output file
            format: 'json' or 'csv'

        Returns:
            Path of the exported file

        Raises:
            ValueError: If the format is neither 'json' nor 'csv'
        """
        if format not in ("json", "csv"):
            # Previously nothing was written but a path was still returned
            raise ValueError(f"Unsupported export format: {format!r}")

        output = Path(output_path)
        with self._lock:
            if format == "json":
                data = {
                    'export_date': datetime.now().isoformat(),
                    'stats': self.get_stats(),
                    'samples': [s.to_dict() for s in self.samples.values()]
                }
                with open(output, 'w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
            else:
                import csv
                with open(output, 'w', newline='', encoding='utf-8') as f:
                    if self.samples:
                        first = next(iter(self.samples.values()))
                        writer = csv.DictWriter(f, fieldnames=first.to_dict().keys())
                        writer.writeheader()
                        for sample in self.samples.values():
                            writer.writerow(sample.to_dict())
        return str(output)

    def import_library(self, input_path: str, merge: bool = True) -> Dict[str, int]:
        """
        Import a library from a JSON file.

        Args:
            input_path: Path of the file to import
            merge: If True, merge into the existing library; otherwise the
                current index is cleared first

        Returns:
            Import statistics: added and updated counts
        """
        with open(input_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        imported_samples = data.get('samples', [])

        with self._lock:
            if not merge:
                self.samples.clear()
            added = 0
            updated = 0
            for sample_data in imported_samples:
                try:
                    sample = Sample.from_dict(sample_data)
                    if sample.id in self.samples:
                        updated += 1
                    else:
                        added += 1
                    self.samples[sample.id] = sample
                except Exception as e:
                    # Skip malformed records, keep importing the rest
                    logger.error(f"Error importando sample: {e}")
            self._index_dirty = True
            self._update_stats()
            self._save_index()
        return {'added': added, 'updated': updated}

    def _update_stats(self):
        """Recompute totals and per-category/key/BPM-range counters."""
        by_category = defaultdict(int)
        by_key = defaultdict(int)
        by_bpm_range = defaultdict(int)
        total_size = 0

        for sample in self.samples.values():
            total_size += sample.file_size
            by_category[sample.category] += 1
            if sample.key:
                by_key[sample.key] += 1
            if sample.bpm:
                if sample.bpm < 100:
                    by_bpm_range['slow (<100)'] += 1
                elif sample.bpm < 128:
                    by_bpm_range['mid (100-128)'] += 1
                elif sample.bpm < 140:
                    by_bpm_range['fast (128-140)'] += 1
                else:
                    by_bpm_range['very fast (>140)'] += 1

        self.stats['total_samples'] = len(self.samples)
        self.stats['total_size'] = total_size
        self.stats['by_category'] = by_category
        self.stats['by_key'] = by_key
        self.stats['by_bpm_range'] = by_bpm_range

    def _load_index(self):
        """Load the index from disk; problems are logged, never raised."""
        if not self.index_file.exists():
            logger.info("No existe índice previo, iniciando librería vacía")
            return
        try:
            with open(self.index_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for sample_data in data.get('samples', []):
                try:
                    sample = Sample.from_dict(sample_data)
                    self.samples[sample.id] = sample
                except Exception as e:
                    logger.warning(f"Error cargando sample: {e}")
            # Merge instead of replacing so every expected key stays present,
            # then rebuild the counters from the samples actually loaded.
            loaded_stats = data.get('stats')
            if isinstance(loaded_stats, dict):
                self.stats.update(loaded_stats)
            self._update_stats()
            logger.info(f"Índice cargado: {len(self.samples)} samples")
        except Exception as e:
            logger.error(f"Error cargando índice: {e}")

    def _save_index(self):
        """Write the index to disk if it has unsaved changes."""
        with self._lock:
            if not self._index_dirty:
                return
            try:
                data = {
                    'version': 1,
                    'saved_at': datetime.now().isoformat(),
                    'stats': self.get_stats(),
                    'samples': [s.to_dict() for s in self.samples.values()]
                }
                # Write to a temporary file first, then swap it into place so
                # a failed write cannot leave a truncated index behind
                temp_file = self.index_file.with_suffix('.tmp')
                with open(temp_file, 'w', encoding='utf-8') as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                temp_file.replace(self.index_file)
                self._index_dirty = False
                logger.info(f"Índice guardado: {len(self.samples)} samples")
            except Exception as e:
                logger.error(f"Error guardando índice: {e}")

    def save(self):
        """Force the index to be written to disk."""
        with self._lock:
            self._index_dirty = True
            self._save_index()
# Shared module-level instance
_manager: Optional[SampleManager] = None


def get_manager(base_dir: Optional[str] = None) -> SampleManager:
    """Return the shared manager, creating it on first use.

    ``base_dir`` is only used when the shared instance does not exist yet;
    when omitted, the default library location is used.
    """
    global _manager
    if _manager is not None:
        return _manager

    library_dir = base_dir
    if library_dir is None:
        # FIX: Use absolute path to avoid junction/hardlink issues
        scripts_root = Path("C:/ProgramData/Ableton/Live 12 Suite/Resources/MIDI Remote Scripts")
        library_dir = str(scripts_root / "librerias" / "organized_samples")

    _manager = SampleManager(library_dir)
    return _manager
# Convenience functions
def scan_samples(directory: str, analyze_audio: bool = False) -> Dict[str, Any]:
    """Scan a sample directory with the shared manager.

    Args:
        directory: Directory to scan
        analyze_audio: Analyze the audio content (slower)

    Returns:
        The statistics returned by the manager's scan
    """
    manager = get_manager(directory)
    # get_manager() ignores its argument once the shared instance exists, so
    # the directory is passed explicitly rather than relying on the manager's
    # own base directory.
    return manager.scan_directory(directory, analyze_audio=analyze_audio)
def find_samples(query: str = "", **kwargs) -> List[Dict[str, Any]]:
    """Search the shared library and return the matches as dictionaries."""
    matches = get_manager().search(query=query, **kwargs)
    found = []
    for sample in matches:
        found.append(sample.to_dict())
    return found
def get_sample_pack(genre: str, key: str = "", bpm: Optional[float] = None) -> Dict[str, List[Dict]]:
    """Return a genre pack from the shared library with samples as dictionaries."""
    pack = get_manager().get_pack_for_genre(genre, key, bpm)
    as_dicts = {}
    for group, samples in pack.items():
        as_dicts[group] = [sample.to_dict() for sample in samples]
    return as_dicts
# Manual testing entry point: python sample_manager.py <directory> [command]
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)

    if len(sys.argv) < 2:
        for usage_line in (
            "Uso: python sample_manager.py <directorio> [comando]",
            "\nComandos:",
            " scan - Escanear directorio",
            " stats - Mostrar estadísticas",
            " search - Buscar samples",
        ):
            print(usage_line)
        sys.exit(1)

    directory = sys.argv[1]
    # The command is optional and defaults to a scan
    command = "scan" if len(sys.argv) <= 2 else sys.argv[2]
    manager = SampleManager(directory)
    rule = "=" * 50

    if command == "scan":
        print(f"\nEscaneando: {directory}")
        print(rule)

        def show_progress(current, total, filename):
            # Redraw a single status line with the percentage and file name
            pct = (current / total) * 100
            print(f"\r[{pct:5.1f}%] {filename[:50]:<50}", end="", flush=True)

        stats = manager.scan_directory(progress_callback=show_progress)
        print("\n")
        print(f"Procesados: {stats['processed']}")
        print(f"Agregados: {stats['added']}")
        print(f"Actualizados: {stats['updated']}")
        print(f"Errores: {stats['errors']}")
        print(f"Total en librería: {stats['total_samples']}")

    elif command == "stats":
        stats = manager.get_stats()
        print("\nEstadísticas de la librería:")
        print(rule)
        print(f"Total samples: {stats['total_samples']}")
        print(f"Tamaño total: {stats['total_size'] / (1024**2):.1f} MB")
        print(f"Último escaneo: {stats['last_scan']}")
        print("\nPor categoría:")
        for cat, count in sorted(stats['by_category'].items()):
            print(f" {cat}: {count}")
        print("\nPor key:")
        for key, count in sorted(stats['by_key'].items()):
            print(f" {key}: {count}")

    elif command == "search":
        # The search text is the optional third argument
        query = "" if len(sys.argv) <= 3 else sys.argv[3]
        print(f"\nBuscando: '{query}'")
        print(rule)
        for s in manager.search(query=query, limit=20):
            print(f"\n{s.name}")
            print(f" Categoría: {s.category}/{s.subcategory}")
            print(f" Key: {s.key or 'N/A'} | BPM: {s.bpm or 'N/A'}")
            print(f" Path: {s.path}")