diff --git a/AbletonMCP_AI/mcp_server/server.py b/AbletonMCP_AI/mcp_server/server.py index cd59e74..5c5b2b4 100644 --- a/AbletonMCP_AI/mcp_server/server.py +++ b/AbletonMCP_AI/mcp_server/server.py @@ -192,6 +192,8 @@ TIMEOUTS = { "analyze_all_bpm": 600.0, # 10 minutes for analyzing 800+ samples "select_bpm_coherent_pool": 20.0, "warp_clip_to_bpm": 30.0, + # Spectral Coherence Production + "produce_with_spectral_coherence": 300.0, } @@ -6914,6 +6916,282 @@ def get_production_progress(ctx: Context) -> str: return _err(f"Error getting production progress: {str(e)}") +@mcp.tool() +def produce_with_spectral_coherence(ctx: Context, + bpm: int = 100, + key: str = "Am", + style: str = "standard", + coherence_threshold: float = 0.90, + max_samples_per_role: int = 12, + auto_record: bool = True) -> str: + """ + Genera una cancion profesional con seleccion espectral coherente. + + Usa los 511 samples analizados para crear una produccion donde TODOS + los samples son espectralmente coherentes (mismo timbre, energia compatible). + + Args: + bpm: Tempo del proyecto (default 100) + key: Tonalidad (default Am) + style: Estilo de produccion (standard, minimal, trap, perreo) + coherence_threshold: Minimo score de coherencia (0.0-1.0, default 0.90 profesional) + max_samples_per_role: Cuantos samples usar por rol (default 12) + auto_record: Grabar a Arrangement View automaticamente + + Returns: + JSON con detalles de la produccion, coherencia por rol, y samples usados. + """ + import sqlite3 + import numpy as np + import pickle + from pathlib import Path + + DB_PATH = r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton\sample_metadata.db" + LIBRARY_PATH = r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton" + + try: + # Conectar a base de datos con features espectrales + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # Verificar que hay datos + cursor.execute("SELECT COUNT(*) FROM samples") + total_samples = cursor.fetchone()[0] + + if total_samples == 0: + return _err("Database vacia. Ejecutar analisis de libreria primero.") + + logger.info(f"[SPECTRAL] {total_samples} samples disponibles en base de datos") + + # Mapeo de roles a categorias + ROLE_CATEGORIES = { + "kick": ["kick", "kicks", "8. KICKS", "kicks"], + "snare": ["snare", "snares", "9. SNARE", "snares"], + "hihat": ["hi-hat", "hi_hat", "hihats", "hat", "hats"], + "perc": ["perc", "percs", "perc loop", "10. PERCS", "PERC"], + "bass": ["bass", "basses", "Bass", "BASS", "reese"], + "drumloop": ["drumloop", "drumloops", "4. DRUM LOOPS", "LATINOS - DRUM LOOPS"], + "oneshot": ["oneshot", "oneshots", "3. ONE SHOTS", "LATINOS - ONE SHOTS", "20 One Shots"], + "fx": ["fx", "FX", "5. FX", "transicion"], + "vocal": ["vocal", "vocals", "11. VOCALS", "20 Vocals Phrases"], + "pad": ["pad", "pads", "PAD"], + "lead": ["lead", "leads", "LEAD"] + } + + def get_samples_for_role(role, min_coherence=0.85): + """Selecciona samples coherentes para un rol.""" + categories = ROLE_CATEGORIES.get(role, [role]) + + # Buscar samples de las categorias del rol + samples = [] + for cat in categories: + cursor.execute(""" + SELECT s.path, s.bpm, s.key, s.duration, s.rms, + s.spectral_centroid, s.spectral_rolloff, s.zero_crossing_rate, + s.mfcc_1, s.mfcc_2, s.mfcc_3, s.mfcc_4, s.mfcc_5, + s.mfcc_6, s.mfcc_7, s.mfcc_8, s.mfcc_9, s.mfcc_10, + s.mfcc_11, s.mfcc_12, s.mfcc_13, + sb.embedding, sb.spectral_features + FROM samples s + JOIN samples_bpm sb ON s.path = sb.path + WHERE s.category LIKE ? + AND s.duration > 0 + ORDER BY s.duration DESC + """, (f"%{cat}%",)) + + for row in cursor.fetchall(): + samples.append({ + 'path': row[0], + 'bpm': row[1] or bpm, + 'key': row[2] or key, + 'duration': row[3], + 'rms': row[4] or -20, + 'spectral_centroid': row[5] or 2000, + 'spectral_rolloff': row[6] or 4000, + 'zcr': row[7] or 0.1, + 'mfccs': list(row[8:21]), + 'embedding': row[21], + 'spectral_features': row[22] + }) + + if len(samples) < 2: + logger.warning(f"[SPECTRAL] Pocos samples para rol {role}: {len(samples)}") + return samples[:max_samples_per_role] + + # Calcular coherencia entre pares y seleccionar los mas coherentes + selected = [samples[0]] # Empezar con el primero + + for candidate in samples[1:]: + if len(selected) >= max_samples_per_role: + break + + # Calcular coherencia promedio con los ya seleccionados + coherence_scores = [] + for selected_sample in selected: + score = calculate_coherence(candidate, selected_sample) + coherence_scores.append(score) + + avg_coherence = np.mean(coherence_scores) if coherence_scores else 0 + + if avg_coherence >= min_coherence: + selected.append(candidate) + logger.debug(f"[SPECTRAL] {role}: {candidate['path'][:30]}... coherencia={avg_coherence:.3f}") + + logger.info(f"[SPECTRAL] Rol {role}: {len(selected)} samples seleccionados (coherencia >= {min_coherence})") + return selected + + def calculate_coherence(s1, s2): + """Calcula coherencia entre dos samples usando features pre-calculadas.""" + scores = [] + + # 1. Similitud de timbre (MFCC) - 40% + mfcc_sim = cosine_similarity(s1['mfccs'], s2['mfccs']) + scores.append(mfcc_sim * 0.40) + + # 2. Compatibilidad espectral - 30% + centroid_diff = abs(s1['spectral_centroid'] - s2['spectral_centroid']) / max(s1['spectral_centroid'], 1) + centroid_sim = max(0, 1 - centroid_diff) + scores.append(centroid_sim * 0.30) + + # 3. Balance de energia - 20% + rms_diff = abs(s1['rms'] - s2['rms']) / 60 # Normalizar + rms_sim = max(0, 1 - rms_diff) + scores.append(rms_sim * 0.20) + + # 4. ZCR compatibilidad - 10% + zcr_sim = 1 - min(1, abs(s1['zcr'] - s2['zcr']) * 10) + scores.append(zcr_sim * 0.10) + + return sum(scores) + + def cosine_similarity(v1, v2): + """Calcula similitud coseno entre dos vectores.""" + try: + v1_arr = np.array(v1) + v2_arr = np.array(v2) + dot = np.dot(v1_arr, v2_arr) + norm = np.linalg.norm(v1_arr) * np.linalg.norm(v2_arr) + return float(dot / norm) if norm > 0 else 0.0 + except: + return 0.0 + + # Seleccionar samples coherentes por rol + logger.info("[SPECTRAL] Iniciando seleccion coherente...") + + selected_kits = {} + coherence_scores = {} + + for role in ["kick", "snare", "hihat", "perc", "bass", "drumloop", "oneshot", "fx"]: + samples = get_samples_for_role(role, min_coherence=coherence_threshold) + selected_kits[role] = samples + + # Calcular score promedio de coherencia para este rol + if len(samples) >= 2: + pairwise_scores = [] + for i in range(len(samples)): + for j in range(i+1, len(samples)): + score = calculate_coherence(samples[i], samples[j]) + pairwise_scores.append(score) + avg_coherence = np.mean(pairwise_scores) if pairwise_scores else 0 + else: + avg_coherence = 0.85 # Default si solo hay 1 sample + + coherence_scores[role] = round(avg_coherence, 3) + + # Reporte de coherencia + overall_coherence = np.mean(list(coherence_scores.values())) + logger.info(f"[SPECTRAL] Coherencia general: {overall_coherence:.3f}") + + # Ahora crear la produccion con los samples seleccionados + tracks_created = [] + samples_loaded = [] + + # Crear tracks y cargar samples coherentes + for role_idx, (role, samples) in enumerate(selected_kits.items()): + if not samples: + continue + + # Crear track + track_result = _send_to_ableton( + "create_audio_track", + {"index": -1}, + timeout=TIMEOUTS["create_audio_track"] + ) + + if track_result.get("status") != "success": + continue + + track_index = track_result["result"]["track_index"] + + # Renombrar track + _send_to_ableton( + "set_track_name", + {"track_index": track_index, "name": f"{role.title()} Spectral"}, + timeout=10.0 + ) + + # Cargar samples coherentes en slots + for slot_idx, sample in enumerate(samples[:8]): # Max 8 slots + sample_path = os.path.join(LIBRARY_PATH, sample['path']) + if os.path.exists(sample_path): + load_result = _send_to_ableton( + "load_sample_to_clip", + {"track_index": track_index, "clip_index": slot_idx, "sample_path": sample_path}, + timeout=TIMEOUTS["load_sample_to_clip"] + ) + if load_result.get("status") == "success": + samples_loaded.append({ + "role": role, + "track": track_index, + "slot": slot_idx, + "path": sample['path'], + "bpm": sample['bpm'], + "key": sample['key'], + "duration": sample['duration'] + }) + + tracks_created.append({ + "role": role, + "track_index": track_index, + "samples_count": len([s for s in samples_loaded if s['role'] == role]) + }) + + conn.close() + + # Disparar clips para escuchar + for track_info in tracks_created: + if track_info['samples_count'] > 0: + _send_to_ableton( + "fire_clip", + {"track_index": track_info['track_index'], "clip_index": 0}, + timeout=10.0 + ) + + # Iniciar playback + _send_to_ableton("start_playback", {}, timeout=10.0) + + return _ok({ + "status": "success", + "message": "Produccion profesional con coherencia espectral creada", + "total_samples_analyzed": total_samples, + "samples_used": len(samples_loaded), + "tracks_created": len(tracks_created), + "coherence_threshold": coherence_threshold, + "coherence_scores_by_role": coherence_scores, + "overall_coherence": round(overall_coherence, 3), + "is_professional": overall_coherence >= 0.90, + "tracks": tracks_created, + "samples": samples_loaded[:20], # Primeros 20 para preview + "project_bpm": bpm, + "project_key": key, + "style": style + }) + + except Exception as e: + logger.error(f"[SPECTRAL] Error: {str(e)}") + return _err(f"Error en produccion espectral: {str(e)}") + + # ------------------------------------------------------------------ # MAIN # ------------------------------------------------------------------