Add duplicate_clip command and attempt at produce_with_spectral_coherence (WIP - has track_index error)

This commit is contained in:
Administrator
2026-04-13 17:00:22 -03:00
parent c6a1705026
commit 379aeb4227

View File

@@ -6941,22 +6941,63 @@ def produce_with_spectral_coherence(ctx: Context,
Returns:
JSON con detalles de la produccion, coherencia por rol, y samples usados.
"""
import sqlite3
import numpy as np
import pickle
from pathlib import Path
DB_PATH = r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton\sample_metadata.db"
LIBRARY_PATH = r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton"
try:
# PRUEBA SIMPLE - Crear un solo track
logger.info("[SPECTRAL] PRUEBA: Creando track simple...")
track_result = _send_to_ableton("create_audio_track", {"index": -1}, timeout=30.0)
logger.info(f"[SPECTRAL] Track result: {track_result}")
if track_result.get("status") != "success":
return _err(f"Error creando track: {track_result.get('message')}")
# Debug: ver estructura completa
logger.info(f"[SPECTRAL] track_result type: {type(track_result)}")
logger.info(f"[SPECTRAL] track_result: {track_result}")
# La respuesta está doble-anidada
outer_result = _ableton_result(track_result)
logger.info(f"[SPECTRAL] outer_result type: {type(outer_result)}")
logger.info(f"[SPECTRAL] outer_result: {outer_result}")
if isinstance(outer_result, dict):
ableton_result = _ableton_result(outer_result)
logger.info(f"[SPECTRAL] ableton_result type: {type(ableton_result)}")
logger.info(f"[SPECTRAL] ableton_result: {ableton_result}")
track_index = ableton_result.get("index") if isinstance(ableton_result, dict) else None
else:
track_index = None
logger.info(f"[SPECTRAL] Track index: {track_index}")
if track_index is None:
return _err("No se obtuvo track_index")
# Renombrar track
_send_to_ableton("set_track_name", {"track_index": track_index, "name": "Test Spectral"}, timeout=10.0)
return _ok({
"status": "success",
"message": "Track de prueba creado",
"track_index": track_index,
"ableton_result": ableton_result
})
except Exception as e:
import traceback
logger.error(f"[SPECTRAL] Error: {str(e)}")
logger.error(f"[SPECTRAL] Traceback: {traceback.format_exc()}")
return _err(f"Error: {str(e)}")
# Conectar a base de datos con features espectrales
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
logger.info("[SPECTRAL] DB conectada")
# Verificar que hay datos
cursor.execute("SELECT COUNT(*) FROM samples")
total_samples = cursor.fetchone()[0]
logger.info(f"[SPECTRAL] {total_samples} samples en DB")
if total_samples == 0:
return _err("Database vacia. Ejecutar analisis de libreria primero.")
@@ -6980,65 +7021,73 @@ def produce_with_spectral_coherence(ctx: Context,
def get_samples_for_role(role, min_coherence=0.85):
"""Selecciona samples coherentes para un rol."""
categories = ROLE_CATEGORIES.get(role, [role])
# Buscar samples de las categorias del rol
samples = []
for cat in categories:
cursor.execute("""
SELECT s.path, s.bpm, s.key, s.duration, s.rms,
s.spectral_centroid, s.spectral_rolloff, s.zero_crossing_rate,
s.mfcc_1, s.mfcc_2, s.mfcc_3, s.mfcc_4, s.mfcc_5,
s.mfcc_6, s.mfcc_7, s.mfcc_8, s.mfcc_9, s.mfcc_10,
s.mfcc_11, s.mfcc_12, s.mfcc_13,
sb.embedding, sb.spectral_features
FROM samples s
JOIN samples_bpm sb ON s.path = sb.path
WHERE s.category LIKE ?
AND s.duration > 0
ORDER BY s.duration DESC
""", (f"%{cat}%",))
try:
categories = ROLE_CATEGORIES.get(role, [role])
for row in cursor.fetchall():
samples.append({
'path': row[0],
'bpm': row[1] or bpm,
'key': row[2] or key,
'duration': row[3],
'rms': row[4] or -20,
'spectral_centroid': row[5] or 2000,
'spectral_rolloff': row[6] or 4000,
'zcr': row[7] or 0.1,
'mfccs': list(row[8:21]),
'embedding': row[21],
'spectral_features': row[22]
})
if len(samples) < 2:
logger.warning(f"[SPECTRAL] Pocos samples para rol {role}: {len(samples)}")
return samples[:max_samples_per_role]
# Calcular coherencia entre pares y seleccionar los mas coherentes
selected = [samples[0]] # Empezar con el primero
for candidate in samples[1:]:
if len(selected) >= max_samples_per_role:
break
# Buscar samples de las categorias del rol
samples = []
for cat in categories:
cursor.execute("""
SELECT s.path, s.bpm, s.key, s.duration, s.rms,
s.spectral_centroid, s.spectral_rolloff, s.zero_crossing_rate,
s.mfcc_1, s.mfcc_2, s.mfcc_3, s.mfcc_4, s.mfcc_5,
s.mfcc_6, s.mfcc_7, s.mfcc_8, s.mfcc_9, s.mfcc_10,
s.mfcc_11, s.mfcc_12, s.mfcc_13,
sb.embedding, sb.spectral_features, sc.category
FROM samples s
JOIN samples_bpm sb ON s.path = sb.path
JOIN sample_categories sc ON s.path = sc.path
WHERE sc.category LIKE ?
AND s.duration > 0
ORDER BY s.duration DESC
""", (f"%{cat}%",))
for row in cursor.fetchall():
samples.append({
'path': row[0],
'bpm': row[1] or bpm,
'key': row[2] or key,
'duration': row[3],
'rms': row[4] or -20,
'spectral_centroid': row[5] or 2000,
'spectral_rolloff': row[6] or 4000,
'zcr': row[7] or 0.1,
'mfccs': list(row[8:21]),
'embedding': row[21],
'spectral_features': row[22]
})
# Calcular coherencia promedio con los ya seleccionados
coherence_scores = []
for selected_sample in selected:
score = calculate_coherence(candidate, selected_sample)
coherence_scores.append(score)
if len(samples) < 2:
logger.warning(f"[SPECTRAL] Pocos samples para rol {role}: {len(samples)}")
return samples[:max_samples_per_role]
avg_coherence = np.mean(coherence_scores) if coherence_scores else 0
# Calcular coherencia entre pares y seleccionar los mas coherentes
selected = [samples[0]] # Empezar con el primero
if avg_coherence >= min_coherence:
selected.append(candidate)
logger.debug(f"[SPECTRAL] {role}: {candidate['path'][:30]}... coherencia={avg_coherence:.3f}")
logger.info(f"[SPECTRAL] Rol {role}: {len(selected)} samples seleccionados (coherencia >= {min_coherence})")
return selected
for candidate in samples[1:]:
if len(selected) >= max_samples_per_role:
break
# Calcular coherencia promedio con los ya seleccionados
coherence_scores = []
for selected_sample in selected:
score = calculate_coherence(candidate, selected_sample)
coherence_scores.append(score)
avg_coherence = np.mean(coherence_scores) if coherence_scores else 0
if avg_coherence >= min_coherence:
selected.append(candidate)
logger.debug(f"[SPECTRAL] {role}: {candidate['path'][:30]}... coherencia={avg_coherence:.3f}")
logger.info(f"[SPECTRAL] Rol {role}: {len(selected)} samples seleccionados (coherencia >= {min_coherence})")
return selected
except Exception as inner_err:
logger.error(f"[SPECTRAL] Error en get_samples_for_role para {role}: {inner_err}")
import traceback
logger.error(f"[SPECTRAL] Traceback: {traceback.format_exc()}")
return []
def calculate_coherence(s1, s2):
"""Calcula coherencia entre dos samples usando features pre-calculadas."""
@@ -7081,6 +7130,7 @@ def produce_with_spectral_coherence(ctx: Context,
selected_kits = {}
coherence_scores = {}
logger.info("[SPECTRAL] Procesando roles...")
for role in ["kick", "snare", "hihat", "perc", "bass", "drumloop", "oneshot", "fx"]:
samples = get_samples_for_role(role, min_coherence=coherence_threshold)
selected_kits[role] = samples
@@ -7101,71 +7151,109 @@ def produce_with_spectral_coherence(ctx: Context,
# Reporte de coherencia
overall_coherence = np.mean(list(coherence_scores.values()))
logger.info(f"[SPECTRAL] Coherencia general: {overall_coherence:.3f}")
logger.info(f"[SPECTRAL] selected_kits tiene {len(selected_kits)} roles")
# Ahora crear la produccion con los samples seleccionados
tracks_created = []
samples_loaded = []
logger.info("[SPECTRAL] Iniciando creacion de tracks...")
# Crear tracks y cargar samples coherentes
for role_idx, (role, samples) in enumerate(selected_kits.items()):
if not samples:
try:
if not samples:
continue
# Crear track
track_result = _send_to_ableton(
"create_audio_track",
{"index": -1},
timeout=TIMEOUTS["create_audio_track"]
)
if track_result.get("status") != "success":
logger.warning(f"[SPECTRAL] Fallo crear track para {role}: {track_result}")
continue
# Extraer resultado anidado de Ableton
ableton_result = _ableton_result(track_result)
track_index = ableton_result.get("index")
if track_index is None:
logger.warning(f"[SPECTRAL] No se pudo obtener track_index para rol {role}, result: {ableton_result}")
continue
# Renombrar track
_send_to_ableton(
"set_track_name",
{"track_index": track_index, "name": f"{role.title()} Spectral"},
timeout=10.0
)
# Cargar samples coherentes en slots
for slot_idx, sample in enumerate(samples[:8]): # Max 8 slots
try:
sample_path = os.path.join(LIBRARY_PATH, sample['path'])
if os.path.exists(sample_path):
load_result = _send_to_ableton(
"load_sample_to_clip",
{"track_index": track_index, "clip_index": slot_idx, "sample_path": sample_path},
timeout=TIMEOUTS["load_sample_to_clip"]
)
if load_result.get("status") == "success":
samples_loaded.append({
"role": role,
"track": track_index,
"slot": slot_idx,
"path": sample['path'],
"bpm": sample['bpm'],
"key": sample['key'],
"duration": sample['duration']
})
except Exception as slot_err:
logger.error(f"[SPECTRAL] Error cargando slot {slot_idx} para {role}: {slot_err}")
continue
# Contar samples para este rol
count = len([s for s in samples_loaded if s.get('role') == role])
track_info = {"role": role, "track_index": track_index, "samples_count": count}
tracks_created.append(track_info)
logger.info(f"[SPECTRAL] Track creado para {role}: index={track_index}, samples={count}")
except Exception as role_err:
logger.error(f"[SPECTRAL] Error procesando rol {role}: {role_err}")
import traceback
logger.error(f"[SPECTRAL] Traceback: {traceback.format_exc()}")
continue
# Crear track
track_result = _send_to_ableton(
"create_audio_track",
{"index": -1},
timeout=TIMEOUTS["create_audio_track"]
)
if track_result.get("status") != "success":
continue
track_index = track_result["result"]["track_index"]
# Renombrar track
_send_to_ableton(
"set_track_name",
{"track_index": track_index, "name": f"{role.title()} Spectral"},
timeout=10.0
)
# Cargar samples coherentes en slots
for slot_idx, sample in enumerate(samples[:8]): # Max 8 slots
sample_path = os.path.join(LIBRARY_PATH, sample['path'])
if os.path.exists(sample_path):
load_result = _send_to_ableton(
"load_sample_to_clip",
{"track_index": track_index, "clip_index": slot_idx, "sample_path": sample_path},
timeout=TIMEOUTS["load_sample_to_clip"]
)
if load_result.get("status") == "success":
samples_loaded.append({
"role": role,
"track": track_index,
"slot": slot_idx,
"path": sample['path'],
"bpm": sample['bpm'],
"key": sample['key'],
"duration": sample['duration']
})
tracks_created.append({
"role": role,
"track_index": track_index,
"samples_count": len([s for s in samples_loaded if s['role'] == role])
})
conn.close()
# Disparar clips para escuchar
for track_info in tracks_created:
if track_info['samples_count'] > 0:
_send_to_ableton(
"fire_clip",
{"track_index": track_info['track_index'], "clip_index": 0},
timeout=10.0
)
logger.info(f"[SPECTRAL] tracks_created: {len(tracks_created)} tracks")
for i, track_info in enumerate(tracks_created):
logger.info(f"[SPECTRAL] Track {i}: {type(track_info)} - {track_info}")
try:
for idx, track_info in enumerate(tracks_created):
logger.info(f"[SPECTRAL] Procesando track {idx}: {type(track_info)}")
if not isinstance(track_info, dict):
logger.warning(f"[SPECTRAL] track_info no es dict: {type(track_info)}")
continue
logger.info(f"[SPECTRAL] Keys: {list(track_info.keys())}")
if 'track_index' not in track_info:
logger.warning(f"[SPECTRAL] track_info sin track_index: {track_info}")
continue
if track_info.get('samples_count', 0) > 0:
ti = track_info['track_index']
_send_to_ableton(
"fire_clip",
{"track_index": ti, "clip_index": 0},
timeout=10.0
)
except Exception as fire_err:
logger.error(f"[SPECTRAL] Error en fire_clip loop: {fire_err}")
import traceback
logger.error(f"[SPECTRAL] Traceback: {traceback.format_exc()}")
# Iniciar playback
_send_to_ableton("start_playback", {}, timeout=10.0)
@@ -7188,7 +7276,9 @@ def produce_with_spectral_coherence(ctx: Context,
})
except Exception as e:
import traceback
logger.error(f"[SPECTRAL] Error: {str(e)}")
logger.error(f"[SPECTRAL] Traceback: {traceback.format_exc()}")
return _err(f"Error en produccion espectral: {str(e)}")