Add produce_with_spectral_coherence() - professional production with spectral analysis

This commit is contained in:
Administrator
2026-04-13 16:17:02 -03:00
parent dac7ec2a5a
commit c6a1705026

View File

@@ -192,6 +192,8 @@ TIMEOUTS = {
"analyze_all_bpm": 600.0, # 10 minutes for analyzing 800+ samples "analyze_all_bpm": 600.0, # 10 minutes for analyzing 800+ samples
"select_bpm_coherent_pool": 20.0, "select_bpm_coherent_pool": 20.0,
"warp_clip_to_bpm": 30.0, "warp_clip_to_bpm": 30.0,
# Spectral Coherence Production
"produce_with_spectral_coherence": 300.0,
} }
@@ -6914,6 +6916,282 @@ def get_production_progress(ctx: Context) -> str:
return _err(f"Error getting production progress: {str(e)}") return _err(f"Error getting production progress: {str(e)}")
@mcp.tool()
def produce_with_spectral_coherence(ctx: Context,
bpm: int = 100,
key: str = "Am",
style: str = "standard",
coherence_threshold: float = 0.90,
max_samples_per_role: int = 12,
auto_record: bool = True) -> str:
"""
Genera una cancion profesional con seleccion espectral coherente.
Usa los 511 samples analizados para crear una produccion donde TODOS
los samples son espectralmente coherentes (mismo timbre, energia compatible).
Args:
bpm: Tempo del proyecto (default 100)
key: Tonalidad (default Am)
style: Estilo de produccion (standard, minimal, trap, perreo)
coherence_threshold: Minimo score de coherencia (0.0-1.0, default 0.90 profesional)
max_samples_per_role: Cuantos samples usar por rol (default 12)
auto_record: Grabar a Arrangement View automaticamente
Returns:
JSON con detalles de la produccion, coherencia por rol, y samples usados.
"""
import sqlite3
import numpy as np
import pickle
from pathlib import Path
DB_PATH = r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton\sample_metadata.db"
LIBRARY_PATH = r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton"
try:
# Conectar a base de datos con features espectrales
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Verificar que hay datos
cursor.execute("SELECT COUNT(*) FROM samples")
total_samples = cursor.fetchone()[0]
if total_samples == 0:
return _err("Database vacia. Ejecutar analisis de libreria primero.")
logger.info(f"[SPECTRAL] {total_samples} samples disponibles en base de datos")
# Mapeo de roles a categorias
ROLE_CATEGORIES = {
"kick": ["kick", "kicks", "8. KICKS", "kicks"],
"snare": ["snare", "snares", "9. SNARE", "snares"],
"hihat": ["hi-hat", "hi_hat", "hihats", "hat", "hats"],
"perc": ["perc", "percs", "perc loop", "10. PERCS", "PERC"],
"bass": ["bass", "basses", "Bass", "BASS", "reese"],
"drumloop": ["drumloop", "drumloops", "4. DRUM LOOPS", "LATINOS - DRUM LOOPS"],
"oneshot": ["oneshot", "oneshots", "3. ONE SHOTS", "LATINOS - ONE SHOTS", "20 One Shots"],
"fx": ["fx", "FX", "5. FX", "transicion"],
"vocal": ["vocal", "vocals", "11. VOCALS", "20 Vocals Phrases"],
"pad": ["pad", "pads", "PAD"],
"lead": ["lead", "leads", "LEAD"]
}
def get_samples_for_role(role, min_coherence=0.85):
"""Selecciona samples coherentes para un rol."""
categories = ROLE_CATEGORIES.get(role, [role])
# Buscar samples de las categorias del rol
samples = []
for cat in categories:
cursor.execute("""
SELECT s.path, s.bpm, s.key, s.duration, s.rms,
s.spectral_centroid, s.spectral_rolloff, s.zero_crossing_rate,
s.mfcc_1, s.mfcc_2, s.mfcc_3, s.mfcc_4, s.mfcc_5,
s.mfcc_6, s.mfcc_7, s.mfcc_8, s.mfcc_9, s.mfcc_10,
s.mfcc_11, s.mfcc_12, s.mfcc_13,
sb.embedding, sb.spectral_features
FROM samples s
JOIN samples_bpm sb ON s.path = sb.path
WHERE s.category LIKE ?
AND s.duration > 0
ORDER BY s.duration DESC
""", (f"%{cat}%",))
for row in cursor.fetchall():
samples.append({
'path': row[0],
'bpm': row[1] or bpm,
'key': row[2] or key,
'duration': row[3],
'rms': row[4] or -20,
'spectral_centroid': row[5] or 2000,
'spectral_rolloff': row[6] or 4000,
'zcr': row[7] or 0.1,
'mfccs': list(row[8:21]),
'embedding': row[21],
'spectral_features': row[22]
})
if len(samples) < 2:
logger.warning(f"[SPECTRAL] Pocos samples para rol {role}: {len(samples)}")
return samples[:max_samples_per_role]
# Calcular coherencia entre pares y seleccionar los mas coherentes
selected = [samples[0]] # Empezar con el primero
for candidate in samples[1:]:
if len(selected) >= max_samples_per_role:
break
# Calcular coherencia promedio con los ya seleccionados
coherence_scores = []
for selected_sample in selected:
score = calculate_coherence(candidate, selected_sample)
coherence_scores.append(score)
avg_coherence = np.mean(coherence_scores) if coherence_scores else 0
if avg_coherence >= min_coherence:
selected.append(candidate)
logger.debug(f"[SPECTRAL] {role}: {candidate['path'][:30]}... coherencia={avg_coherence:.3f}")
logger.info(f"[SPECTRAL] Rol {role}: {len(selected)} samples seleccionados (coherencia >= {min_coherence})")
return selected
def calculate_coherence(s1, s2):
"""Calcula coherencia entre dos samples usando features pre-calculadas."""
scores = []
# 1. Similitud de timbre (MFCC) - 40%
mfcc_sim = cosine_similarity(s1['mfccs'], s2['mfccs'])
scores.append(mfcc_sim * 0.40)
# 2. Compatibilidad espectral - 30%
centroid_diff = abs(s1['spectral_centroid'] - s2['spectral_centroid']) / max(s1['spectral_centroid'], 1)
centroid_sim = max(0, 1 - centroid_diff)
scores.append(centroid_sim * 0.30)
# 3. Balance de energia - 20%
rms_diff = abs(s1['rms'] - s2['rms']) / 60 # Normalizar
rms_sim = max(0, 1 - rms_diff)
scores.append(rms_sim * 0.20)
# 4. ZCR compatibilidad - 10%
zcr_sim = 1 - min(1, abs(s1['zcr'] - s2['zcr']) * 10)
scores.append(zcr_sim * 0.10)
return sum(scores)
def cosine_similarity(v1, v2):
"""Calcula similitud coseno entre dos vectores."""
try:
v1_arr = np.array(v1)
v2_arr = np.array(v2)
dot = np.dot(v1_arr, v2_arr)
norm = np.linalg.norm(v1_arr) * np.linalg.norm(v2_arr)
return float(dot / norm) if norm > 0 else 0.0
except:
return 0.0
# Seleccionar samples coherentes por rol
logger.info("[SPECTRAL] Iniciando seleccion coherente...")
selected_kits = {}
coherence_scores = {}
for role in ["kick", "snare", "hihat", "perc", "bass", "drumloop", "oneshot", "fx"]:
samples = get_samples_for_role(role, min_coherence=coherence_threshold)
selected_kits[role] = samples
# Calcular score promedio de coherencia para este rol
if len(samples) >= 2:
pairwise_scores = []
for i in range(len(samples)):
for j in range(i+1, len(samples)):
score = calculate_coherence(samples[i], samples[j])
pairwise_scores.append(score)
avg_coherence = np.mean(pairwise_scores) if pairwise_scores else 0
else:
avg_coherence = 0.85 # Default si solo hay 1 sample
coherence_scores[role] = round(avg_coherence, 3)
# Reporte de coherencia
overall_coherence = np.mean(list(coherence_scores.values()))
logger.info(f"[SPECTRAL] Coherencia general: {overall_coherence:.3f}")
# Ahora crear la produccion con los samples seleccionados
tracks_created = []
samples_loaded = []
# Crear tracks y cargar samples coherentes
for role_idx, (role, samples) in enumerate(selected_kits.items()):
if not samples:
continue
# Crear track
track_result = _send_to_ableton(
"create_audio_track",
{"index": -1},
timeout=TIMEOUTS["create_audio_track"]
)
if track_result.get("status") != "success":
continue
track_index = track_result["result"]["track_index"]
# Renombrar track
_send_to_ableton(
"set_track_name",
{"track_index": track_index, "name": f"{role.title()} Spectral"},
timeout=10.0
)
# Cargar samples coherentes en slots
for slot_idx, sample in enumerate(samples[:8]): # Max 8 slots
sample_path = os.path.join(LIBRARY_PATH, sample['path'])
if os.path.exists(sample_path):
load_result = _send_to_ableton(
"load_sample_to_clip",
{"track_index": track_index, "clip_index": slot_idx, "sample_path": sample_path},
timeout=TIMEOUTS["load_sample_to_clip"]
)
if load_result.get("status") == "success":
samples_loaded.append({
"role": role,
"track": track_index,
"slot": slot_idx,
"path": sample['path'],
"bpm": sample['bpm'],
"key": sample['key'],
"duration": sample['duration']
})
tracks_created.append({
"role": role,
"track_index": track_index,
"samples_count": len([s for s in samples_loaded if s['role'] == role])
})
conn.close()
# Disparar clips para escuchar
for track_info in tracks_created:
if track_info['samples_count'] > 0:
_send_to_ableton(
"fire_clip",
{"track_index": track_info['track_index'], "clip_index": 0},
timeout=10.0
)
# Iniciar playback
_send_to_ableton("start_playback", {}, timeout=10.0)
return _ok({
"status": "success",
"message": "Produccion profesional con coherencia espectral creada",
"total_samples_analyzed": total_samples,
"samples_used": len(samples_loaded),
"tracks_created": len(tracks_created),
"coherence_threshold": coherence_threshold,
"coherence_scores_by_role": coherence_scores,
"overall_coherence": round(overall_coherence, 3),
"is_professional": overall_coherence >= 0.90,
"tracks": tracks_created,
"samples": samples_loaded[:20], # Primeros 20 para preview
"project_bpm": bpm,
"project_key": key,
"style": style
})
except Exception as e:
logger.error(f"[SPECTRAL] Error: {str(e)}")
return _err(f"Error en produccion espectral: {str(e)}")
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# MAIN # MAIN
# ------------------------------------------------------------------ # ------------------------------------------------------------------