- Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain - Method 1: track.insert_arrangement_clip() [Live 12+] - Method 2: track.create_audio_clip() [Live 11+] - Method 3: arrangement_clips.add_new_clip() [Live 12+] - Method 4: Session->duplicate_clip_to_arrangement [Legacy] - Method 5: Session->Recording [Universal] - Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow - Update skills documentation - Verified: 3 clips created at positions [0, 4, 8] in Arrangement View Closes: Audio injection in Arrangement View
2261 lines
86 KiB
Python
2261 lines
86 KiB
Python
"""
|
|
Workflow Engine - Motor de workflow completo para producción profesional.
|
|
|
|
Este módulo proporciona la clase ProductionWorkflow para gestionar pipelines
|
|
completos de producción musical en Ableton Live, incluyendo generación,
|
|
edición, mezcla y exportación de proyectos.
|
|
|
|
Métodos T036-T050 implementados:
|
|
- T036: generate_complete_reggaeton()
|
|
- T037: generate_from_reference()
|
|
- T038: export_project()
|
|
- T039: load_project()
|
|
- T040: get_project_summary()
|
|
- T041: suggest_improvements()
|
|
- T042: compare_to_reference()
|
|
- T043: undo_last_action()
|
|
- T044: clear_project()
|
|
- T045: validate_project()
|
|
- T046: add_variation_to_section()
|
|
- T047: create_transition()
|
|
- T048: humanize_track()
|
|
- T049: apply_groove()
|
|
- T050: create_fx_automation()
|
|
|
|
Utilidades incluidas:
|
|
- ActionHistory: Sistema de historial para undo
|
|
- ProjectValidator: Validaciones de coherencia del proyecto
|
|
- ExportManager: Exportación de configuración y metadatos
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import time
|
|
from copy import deepcopy
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
# Import engines
|
|
from .sample_selector import get_selector, SampleInfo, DrumKit, InstrumentGroup
|
|
from .song_generator import get_song_generator, SongGenerator
|
|
from .reference_matcher import get_recommended_samples, get_user_profile, analyze_reference
|
|
from .libreria_analyzer import analyze_library, LibreriaAnalyzer
|
|
|
|
# Module-level logger used by the classes in this file.
logger = logging.getLogger("WorkflowEngine")
|
|
|
|
|
|
@dataclass
class ActionRecord:
    """Record of one executed action, kept so the action can be undone."""

    # Short identifier of the kind of action.
    action_type: str
    # Time at which the action was recorded.
    timestamp: float
    # Human-readable description of the action.
    description: str
    # Project state captured before the action ran.
    state_before: Dict[str, Any]
    # Project state after the action; None until it is filled in.
    state_after: Optional[Dict[str, Any]] = None
    # Extra data supplied by the caller for undoing the action.
    undo_data: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dictionary.

        The values are the record's own objects (no copies are made), so
        mutating a nested dictionary in the result also mutates the record.
        """
        return {
            "action_type": self.action_type,
            "timestamp": self.timestamp,
            "description": self.description,
            "state_before": self.state_before,
            "state_after": self.state_after,
            "undo_data": self.undo_data,
        }
|
|
|
|
|
|
class ActionHistory:
    """Bounded history of executed actions with undo/redo bookkeeping.

    Keeps a stack of executed actions together with the state captured
    before each one, so that changes to the project can be reverted.
    This class only moves records between the history and the redo stack;
    it does not apply any state itself.
    """

    def __init__(self, max_history: int = 50):
        """Create an empty history.

        Args:
            max_history: Maximum number of records kept. When the limit is
                exceeded the oldest record is dropped.
        """
        # Oldest record first; the most recent action is the last element.
        self._history: List[ActionRecord] = []
        self._redo_stack: List[ActionRecord] = []
        self._max_history = max_history
        # Not read or updated by any other method of this class.
        self._current_project_state: Dict[str, Any] = {}

    def record_action(self, action_type: str, description: str,
                      state_before: Dict[str, Any],
                      undo_data: Optional[Dict[str, Any]] = None) -> ActionRecord:
        """Append a new action to the history and return its record.

        Args:
            action_type: Identifier of the kind of action.
            description: Human-readable description.
            state_before: State captured before the action ran.
            undo_data: Optional extra data for undoing the action.

        Returns:
            The newly created ActionRecord, timestamped with time.time().
        """
        record = ActionRecord(
            action_type=action_type,
            timestamp=time.time(),
            description=description,
            state_before=state_before,
            undo_data=undo_data,
        )

        self._history.append(record)

        # Keep the history bounded by dropping the oldest entry.
        if len(self._history) > self._max_history:
            self._history.pop(0)

        # A new action invalidates anything that could have been redone.
        self._redo_stack.clear()

        logger.debug("Recorded action: %s - %s", action_type, description)
        return record

    def update_state_after(self, record: ActionRecord, state_after: Dict[str, Any]):
        """Store the state that resulted from an action on its record."""
        record.state_after = state_after

    def can_undo(self) -> bool:
        """Return True when there is at least one action to undo."""
        return len(self._history) > 0

    def can_redo(self) -> bool:
        """Return True when there is at least one action to redo."""
        return len(self._redo_stack) > 0

    def undo(self) -> Optional[ActionRecord]:
        """Move the most recent action to the redo stack.

        Returns:
            The ActionRecord that was undone, or None when the history is
            empty.
        """
        if not self._history:
            logger.warning("No actions to undo")
            return None

        record = self._history.pop()
        self._redo_stack.append(record)

        logger.info("Undid action: %s - %s", record.action_type, record.description)
        return record

    def redo(self) -> Optional[ActionRecord]:
        """Move the most recently undone action back onto the history.

        Returns:
            The ActionRecord that was redone, or None when there is nothing
            to redo.
        """
        if not self._redo_stack:
            logger.warning("No actions to redo")
            return None

        record = self._redo_stack.pop()
        self._history.append(record)

        logger.info("Redid action: %s - %s", record.action_type, record.description)
        return record

    def get_recent_actions(self, count: int = 10) -> List[Dict[str, Any]]:
        """Return the last ``count`` actions as dictionaries, newest first.

        Args:
            count: Maximum number of actions to return. Zero or a negative
                value yields an empty list.
        """
        # Guard explicitly: a slice of [-0:] would select the whole list and
        # a negative count would slice from the front instead of the back.
        if count <= 0:
            return []
        # Slicing past the start of the list simply returns every element.
        recent = self._history[-count:]
        return [r.to_dict() for r in reversed(recent)]

    def clear(self):
        """Remove every record from both the history and the redo stack."""
        self._history.clear()
        self._redo_stack.clear()
        logger.info("Action history cleared")
|
|
|
|
|
|
@dataclass
class ValidationIssue:
    """A single problem found while validating a project."""

    severity: str  # "error", "warning", "info"
    category: str  # "bpm", "samples", "levels", "routing", "structure"
    message: str
    # Index of the affected track, when the issue concerns one track.
    track_index: Optional[int] = None
    # Optional hint describing how to resolve the issue.
    suggestion: Optional[str] = None
|
|
|
|
|
|
class ProjectValidator:
    """Coherence validator for project state dictionaries.

    Checks:
    - BPM consistency between the project and its clips
    - Existence of sample files
    - Audio levels (clipping risk)
    - Routing configuration
    - Project structure
    """

    def __init__(self):
        # Issues collected by the most recent call to validate().
        self.issues: List[ValidationIssue] = []

    def validate(self, project_state: Dict[str, Any]) -> List[ValidationIssue]:
        """Run every validation over the project state.

        Args:
            project_state: Dictionary with the current project state.

        Returns:
            The list of ValidationIssue objects found. The same list is
            kept in ``self.issues`` and replaced on the next call.
        """
        self.issues = []

        self._validate_bpm_consistency(project_state)
        self._validate_samples_exist(project_state)
        self._validate_audio_levels(project_state)
        self._validate_routing(project_state)
        self._validate_structure(project_state)

        return self.issues

    def _validate_bpm_consistency(self, state: Dict[str, Any]):
        """Check that every clip's BPM is within 1.0 of the project BPM."""
        master_bpm = state.get("bpm")
        # Treat a missing, None or zero BPM as "not configured"; comparing
        # clips against None would raise a TypeError further down.
        if not master_bpm:
            self.issues.append(ValidationIssue(
                severity="error",
                category="bpm",
                message="BPM del proyecto no configurado",
                suggestion="Establecer BPM usando set_tempo()"
            ))
            return

        # Flag clips whose own BPM differs from the project BPM.
        for track_idx, track in enumerate(state.get("tracks", [])):
            for clip in track.get("clips", []):
                clip_bpm = clip.get("bpm")
                if clip_bpm and abs(clip_bpm - master_bpm) > 1.0:
                    self.issues.append(ValidationIssue(
                        severity="warning",
                        category="bpm",
                        message=f"Clip en track {track_idx} tiene BPM {clip_bpm:.1f} (master: {master_bpm:.1f})",
                        track_index=track_idx,
                        suggestion="Warp el clip al BPM del proyecto o ajustar tempo"
                    ))

    def _validate_samples_exist(self, state: Dict[str, Any]):
        """Check that every clip's ``file_path`` points to an existing file."""
        for track_idx, track in enumerate(state.get("tracks", [])):
            for clip in track.get("clips", []):
                file_path = clip.get("file_path")
                # Clips without a file path are skipped.
                if file_path and not os.path.isfile(file_path):
                    self.issues.append(ValidationIssue(
                        severity="error",
                        category="samples",
                        message=f"Sample no encontrado: {file_path}",
                        track_index=track_idx,
                        suggestion="Verificar ruta o reemplazar sample"
                    ))

    def _validate_audio_levels(self, state: Dict[str, Any]):
        """Warn about master or track volumes above 0.95."""
        master_vol = state.get("master_volume", 0.85)

        # Master bus.
        if master_vol > 0.95:
            self.issues.append(ValidationIssue(
                severity="warning",
                category="levels",
                message=f"Master volume alto ({master_vol:.2f}), riesgo de clipping",
                suggestion="Reducir master volume a ~0.85 o aplicar limiter"
            ))

        # Individual tracks.
        for track_idx, track in enumerate(state.get("tracks", [])):
            vol = track.get("volume", 0.85)
            if vol > 0.95:
                self.issues.append(ValidationIssue(
                    severity="warning",
                    category="levels",
                    message=f"Track {track_idx} volume alto ({vol:.2f})",
                    track_index=track_idx,
                    suggestion="Reducir volumen o aplicar compresión"
                ))

    def _validate_routing(self, state: Dict[str, Any]):
        """Report missing return tracks and tracks without output routing."""
        tracks = state.get("tracks", [])

        # Informational note when no return tracks are configured.
        return_tracks = state.get("return_tracks", [])
        if len(return_tracks) == 0:
            self.issues.append(ValidationIssue(
                severity="info",
                category="routing",
                message="No hay pistas de retorno configuradas",
                suggestion="Crear buses para reverb, delay, etc."
            ))

        # Informational note for each track with no "output_routing" value.
        for track_idx, track in enumerate(tracks):
            if not track.get("output_routing"):
                self.issues.append(ValidationIssue(
                    severity="info",
                    category="routing",
                    message=f"Track {track_idx} sin ruteo de salida específico",
                    track_index=track_idx,
                    suggestion="Configurar envío a bus de drums, synths, etc."
                ))

    def _validate_structure(self, state: Dict[str, Any]):
        """Check that the project has tracks covering at least two roles."""
        tracks = state.get("tracks", [])

        if len(tracks) == 0:
            self.issues.append(ValidationIssue(
                severity="error",
                category="structure",
                message="Proyecto sin tracks",
                suggestion="Crear tracks usando generate_complete_reggaeton()"
            ))
            return

        # Classify each track by keywords in its name; the first matching
        # group wins and unmatched names contribute no role.
        roles = set()
        for track in tracks:
            # A name stored as None is treated like an empty name.
            name = (track.get("name") or "").lower()
            if "kick" in name or "bass" in name:
                roles.add("drums_bass")
            elif "snare" in name or "clap" in name:
                roles.add("percussion")
            elif "synth" in name or "chord" in name or "melody" in name:
                roles.add("harmonic")
            elif "fx" in name:
                roles.add("fx")

        if len(roles) < 2:
            self.issues.append(ValidationIssue(
                severity="warning",
                category="structure",
                message=f"Proyecto con poca variedad ({len(roles)} tipos de tracks)",
                suggestion="Añadir tracks de diferentes roles: drums, bass, synths, fx"
            ))

    def get_summary(self) -> Dict[str, Any]:
        """Return counts per severity plus every issue as a dictionary.

        ``is_valid`` is True when no issue has severity "error".
        """
        # Count the three known severities in a single pass.
        counts = {"error": 0, "warning": 0, "info": 0}
        for issue in self.issues:
            if issue.severity in counts:
                counts[issue.severity] += 1

        return {
            "total_issues": len(self.issues),
            "errors": counts["error"],
            "warnings": counts["warning"],
            "info": counts["info"],
            "is_valid": counts["error"] == 0,
            "issues": [
                {
                    "severity": i.severity,
                    "category": i.category,
                    "message": i.message,
                    "track_index": i.track_index,
                    "suggestion": i.suggestion,
                }
                for i in self.issues
            ]
        }
|
|
|
|
|
|
class ExportManager:
    """Project export manager.

    Handles:
    - Export of the project configuration to JSON
    - Lists of the samples used
    - Project metadata
    """

    def __init__(self, export_dir: Optional[str] = None):
        """Set up the export directory, creating it when missing.

        Args:
            export_dir: Target directory. Defaults to
                ``~/Documents/AbletonMCP_Exports``.
        """
        if export_dir is None:
            export_dir = os.path.join(
                os.path.expanduser("~"),
                "Documents",
                "AbletonMCP_Exports"
            )
        self.export_dir = Path(export_dir)
        self.export_dir.mkdir(parents=True, exist_ok=True)

    def _write_json(self, payload: Dict[str, Any], filename: Optional[str],
                    default_prefix: str) -> Path:
        """Write ``payload`` as UTF-8 JSON inside the export directory.

        When ``filename`` is None a name of the form
        ``<default_prefix>_<YYYYmmdd_HHMMSS>.json`` is generated.
        """
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{default_prefix}_{timestamp}.json"

        export_path = self.export_dir / filename
        with open(export_path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        return export_path

    def export_project_config(self, project_state: Dict[str, Any],
                              filename: Optional[str] = None) -> str:
        """Export the project configuration to JSON.

        Args:
            project_state: Complete project state.
            filename: Optional file name; a timestamped name is generated
                when omitted.

        Returns:
            Path of the exported file.
        """
        export_data = {
            "version": "1.0",
            "export_date": datetime.now().isoformat(),
            "project": project_state,
            "samples_used": self._extract_samples_list(project_state),
            "settings": {
                "bpm": project_state.get("bpm"),
                "key": project_state.get("key"),
                "time_signature": project_state.get("time_signature", "4/4"),
            }
        }

        export_path = self._write_json(export_data, filename, "project")

        logger.info("Project exported to: %s", export_path)
        return str(export_path)

    def export_samples_list(self, project_state: Dict[str, Any],
                            filename: Optional[str] = None) -> str:
        """Export only the list of samples to JSON.

        Args:
            project_state: Project state.
            filename: Optional file name; a timestamped name is generated
                when omitted.

        Returns:
            Path of the exported file.
        """
        samples_data = {
            "export_date": datetime.now().isoformat(),
            "samples": self._extract_samples_list(project_state),
        }

        export_path = self._write_json(samples_data, filename, "samples")

        logger.info("Samples list exported to: %s", export_path)
        return str(export_path)

    def _extract_samples_list(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect one entry per clip that has a ``file_path``.

        Tracks without a name are labelled ``Track <index>``; clips without
        a name or role get "" and "unknown" respectively.
        """
        samples = []

        for track_idx, track in enumerate(state.get("tracks", [])):
            track_name = track.get("name", f"Track {track_idx}")

            for clip in track.get("clips", []):
                file_path = clip.get("file_path")
                if file_path:
                    samples.append({
                        "track": track_name,
                        "track_index": track_idx,
                        "file_path": file_path,
                        "clip_name": clip.get("name", ""),
                        "role": clip.get("role", "unknown"),
                    })

        return samples

    def load_project_config(self, filepath: str) -> Dict[str, Any]:
        """Load a project configuration from a JSON file.

        Args:
            filepath: Path to the JSON file.

        Returns:
            The parsed JSON content.
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)

        logger.info("Project config loaded from: %s", filepath)
        return data
|
|
|
|
|
|
class ProductionWorkflow:
    """Complete workflow engine for professional production in Ableton Live.

    Provides high-level methods for:
    - Complete track generation (T036)
    - Reference-based generation (T037)
    - Project export (T038)
    - Project loading (T039)
    - Analysis and suggestions (T040-T042)
    - Action management (T043-T044)
    - Validation (T045)
    - Creative editing (T046-T050)

    Attributes:
        history: ActionHistory used for undo/redo
        validator: ProjectValidator used for validations
        export_manager: ExportManager used for exporting
        current_project: Current project state
    """

    # Pattern library for MIDI notes. Each pattern is a list of note
    # dictionaries with "pitch", "start", "duration" and "velocity" keys.
    PATTERN_LIBRARY = {
        "dembow_kick": [
            {"pitch": 36, "start": 0.0, "duration": 0.25, "velocity": 127},
            {"pitch": 36, "start": 2.0, "duration": 0.25, "velocity": 110},
        ],
        "dembow_snare": [
            {"pitch": 38, "start": 1.0, "duration": 0.25, "velocity": 120},
            {"pitch": 38, "start": 3.0, "duration": 0.25, "velocity": 120},
        ],
        "dembow_hats": [
            {"pitch": 42, "start": 0.0, "duration": 0.125, "velocity": 100},
            {"pitch": 42, "start": 0.5, "duration": 0.125, "velocity": 80},
            {"pitch": 42, "start": 1.0, "duration": 0.125, "velocity": 100},
            {"pitch": 42, "start": 1.5, "duration": 0.125, "velocity": 80},
            {"pitch": 42, "start": 2.0, "duration": 0.125, "velocity": 100},
            {"pitch": 42, "start": 2.5, "duration": 0.125, "velocity": 80},
            {"pitch": 42, "start": 3.0, "duration": 0.125, "velocity": 100},
            {"pitch": 42, "start": 3.5, "duration": 0.125, "velocity": 80},
        ],
        "bass_root": [
            {"pitch": 36, "start": 0.0, "duration": 1.0, "velocity": 110},
            {"pitch": 36, "start": 2.0, "duration": 1.0, "velocity": 110},
        ],
        "chord_stabs": [
            {"pitch": 60, "start": 0.0, "duration": 0.5, "velocity": 90},
            {"pitch": 64, "start": 0.0, "duration": 0.5, "velocity": 90},
            {"pitch": 67, "start": 0.0, "duration": 0.5, "velocity": 90},
        ],
        "melody_simple": [
            {"pitch": 72, "start": 0.0, "duration": 0.5, "velocity": 100},
            {"pitch": 74, "start": 1.0, "duration": 0.5, "velocity": 90},
            {"pitch": 72, "start": 2.0, "duration": 0.5, "velocity": 100},
            {"pitch": 71, "start": 3.0, "duration": 0.5, "velocity": 85},
        ],
    }

    # Groove templates, keyed by name.
    GROOVE_TEMPLATES = {
        "swing_16": {"timing_offset": 0.02, "velocity_variation": 0.1},
        "swing_8": {"timing_offset": 0.04, "velocity_variation": 0.15},
        "straight": {"timing_offset": 0.0, "velocity_variation": 0.0},
        "moombahton": {"timing_offset": 0.03, "velocity_variation": 0.08},
    }
|
|
|
|
def __init__(self):
    """Create the helper objects and the default in-memory project state."""
    self.history = ActionHistory(max_history=50)
    self.validator = ProjectValidator()
    self.export_manager = ExportManager()
    # Default project: 95 BPM, A minor, 4/4, with no tracks or scenes yet.
    self.current_project: Dict[str, Any] = {
        "bpm": 95.0,
        "key": "Am",
        "time_signature": "4/4",
        "tracks": [],
        "scenes": [],
        "samples_used": [],
        "structure": "",
        "created_at": time.time(),
    }
    # Set to True after the first library analysis so it is not repeated.
    self._library_analyzed = False
    # Section layout produced by the most recent generation.
    self._section_definitions: List[Dict[str, Any]] = []
|
|
|
|
# =====================================================================
|
|
# T036: Generación completa de reggaeton
|
|
# =====================================================================
|
|
|
|
def generate_complete_reggaeton(self, bpm: float = 95.0, key: str = "Am",
                                style: str = "dembow",
                                structure: str = "standard",
                                use_samples: bool = True) -> Dict[str, Any]:
    """Run the complete reggaeton track generation pipeline.

    Pipeline steps:
        a. Analyze the library if it has not been analyzed yet
        b. Select samples with get_recommended_samples()
        c. Create tracks: Kick, Snare, HiHats, Bass, Chords, Melody, FX
        d. Generate MIDI notes from PATTERN_LIBRARY
        e. Configure bus routing (placeholder: only logged)
        f. Apply the automatic mix
        g. Configure sidechain (placeholder: only logged)

    The project state is only replaced once every step has succeeded, so
    a failure leaves ``current_project`` as it was before the call.

    Args:
        bpm: Project tempo (default: 95).
        key: Musical key (default: "Am").
        style: Reggaeton style - "dembow", "perreo", "romantico"
            (default: "dembow").
        structure: Structure - "standard", "minimal", "extended"
            (default: "standard").
        use_samples: When True, samples are selected from the library.

    Returns:
        On success, a summary dictionary with ``status == "success"``.
        On failure, a dictionary with ``status == "error"``, the error
        ``message`` and the ``partial_summary`` collected so far.
    """
    logger.info("=" * 60)
    logger.info("STARTING COMPLETE REGGAETON GENERATION")
    logger.info("BPM: %s | Key: %s | Style: %s | Structure: %s", bpm, key, style, structure)

    # Snapshot of the state before the action, stored in the history.
    state_before = deepcopy(self.current_project)

    summary = {
        "pipeline_steps": [],
        "tracks_created": [],
        "samples_selected": [],
        "issues": [],
    }

    try:
        # a. Analyze the library unless it was already analyzed.
        if not self._library_analyzed:
            logger.info("Step a: Analyzing library...")
            analyze_library(verbose=False)
            self._library_analyzed = True
            # NOTE(review): the step marker is assumed to be recorded only
            # when the analysis actually runs - confirm.
            summary["pipeline_steps"].append("library_analyzed")

        # b. Select samples.
        logger.info("Step b: Selecting samples...")
        if use_samples:
            samples = get_recommended_samples(role="", count=20)
            # presumably each sample is dict-like with a "name" key; verify
            # against get_recommended_samples().
            summary["samples_selected"] = [s.get("name", "unknown") for s in samples[:10]]
            # NOTE(review): the step marker is assumed to be recorded only
            # when samples are requested - confirm.
            summary["pipeline_steps"].append("samples_selected")

        # c. Create tracks.
        logger.info("Step c: Creating tracks...")
        tracks_config = [
            {"name": "Kick", "type": "midi", "role": "kick"},
            {"name": "Snare", "type": "midi", "role": "snare"},
            {"name": "HiHats", "type": "midi", "role": "hats"},
            {"name": "Bass", "type": "midi", "role": "bass"},
            {"name": "Chords", "type": "midi", "role": "chords"},
            {"name": "Melody", "type": "midi", "role": "melody"},
            {"name": "FX", "type": "audio", "role": "fx"},
        ]

        created_tracks = []
        for i, track_cfg in enumerate(tracks_config):
            track_info = {
                "index": i,
                "name": track_cfg["name"],
                "type": track_cfg["type"],
                "role": track_cfg["role"],
                "volume": 0.85,
                "pan": 0.0,
                "devices": [],
                "clips": [],
            }
            created_tracks.append(track_info)
            summary["tracks_created"].append(track_cfg["name"])

        summary["pipeline_steps"].append("tracks_created")

        # d. Generate MIDI notes from the pattern library.
        logger.info("Step d: Generating MIDI patterns...")
        for track in created_tracks:
            if track["type"] == "midi":
                pattern_name = self._get_pattern_for_role(track["role"])
                if pattern_name in self.PATTERN_LIBRARY:
                    pattern = self.PATTERN_LIBRARY[pattern_name]
                    # Repeat the one-bar pattern over 16 bars.
                    extended_pattern = self._extend_pattern(pattern, 16)
                    # NOTE(review): the notes span 16 bars, yet "length" is
                    # 16.0 and is reported elsewhere as beats - confirm
                    # which unit is intended.
                    track["clips"].append({
                        "name": f"{track['name']} Clip",
                        "length": 16.0,
                        "notes": extended_pattern,
                    })

        summary["pipeline_steps"].append("midi_patterns_generated")

        # e. Configure bus routing (placeholder).
        logger.info("Step e: Configuring bus routing...")
        summary["pipeline_steps"].append("routing_configured")

        # f. Apply the automatic mix.
        logger.info("Step f: Applying automatic mix...")
        self._apply_automatic_mix(created_tracks)
        summary["pipeline_steps"].append("mix_applied")

        # g. Configure sidechain (placeholder).
        logger.info("Step g: Configuring sidechain...")
        summary["pipeline_steps"].append("sidechain_configured")

        # Build the section layout before touching any state so that a
        # failure here cannot leave the project half-updated.
        sections = self._generate_section_structure(structure, bpm)

        # Commit the new project state in one place.
        self.current_project["bpm"] = bpm
        self.current_project["key"] = key
        self.current_project["structure"] = structure
        self.current_project["style"] = style
        self.current_project["tracks"] = created_tracks
        self._section_definitions = sections

        # Record the action for undo.
        self.history.record_action(
            action_type="generate_complete",
            description=f"Generated complete reggaeton: {style} @ {bpm} BPM in {key}",
            state_before=state_before,
            undo_data={"previous_state": state_before}
        )

        logger.info("COMPLETE REGGAETON GENERATION FINISHED")
        logger.info("=" * 60)

        return {
            "status": "success",
            "bpm": bpm,
            "key": key,
            "style": style,
            "structure": structure,
            "tracks_count": len(created_tracks),
            "tracks": summary["tracks_created"],
            "samples_used": len(summary["samples_selected"]),
            "pipeline_completed": summary["pipeline_steps"],
            "duration_bars": self._calculate_duration(),
            "sections": [s["name"] for s in self._section_definitions],
        }

    except Exception as e:
        # Boundary handler: report the failure to the caller as a result
        # dictionary and keep the traceback in the log.
        logger.exception("Error in generate_complete_reggaeton: %s", e)
        summary["issues"].append(str(e))
        return {
            "status": "error",
            "message": str(e),
            "partial_summary": summary,
        }
|
|
|
|
def _get_pattern_for_role(self, role: str) -> str:
|
|
"""Mapea rol a nombre de pattern."""
|
|
mapping = {
|
|
"kick": "dembow_kick",
|
|
"snare": "dembow_snare",
|
|
"hats": "dembow_hats",
|
|
"bass": "bass_root",
|
|
"chords": "chord_stabs",
|
|
"melody": "melody_simple",
|
|
}
|
|
return mapping.get(role, "")
|
|
|
|
def _extend_pattern(self, pattern: List[Dict], bars: int) -> List[Dict]:
|
|
"""Extiende un pattern a N compases."""
|
|
extended = []
|
|
for bar in range(bars):
|
|
bar_offset = bar * 4.0 # 4 beats per bar
|
|
for note in pattern:
|
|
new_note = deepcopy(note)
|
|
new_note["start"] = note["start"] + bar_offset
|
|
extended.append(new_note)
|
|
return extended
|
|
|
|
def _apply_automatic_mix(self, tracks: List[Dict[str, Any]]):
|
|
"""Aplica mezcla automática básica."""
|
|
for track in tracks:
|
|
role = track.get("role", "")
|
|
if role == "kick":
|
|
track["volume"] = 0.9
|
|
track["pan"] = 0.0
|
|
elif role == "snare":
|
|
track["volume"] = 0.85
|
|
track["pan"] = 0.05
|
|
elif role == "hats":
|
|
track["volume"] = 0.75
|
|
track["pan"] = -0.1
|
|
elif role == "bass":
|
|
track["volume"] = 0.8
|
|
track["pan"] = 0.0
|
|
elif role == "chords":
|
|
track["volume"] = 0.7
|
|
track["pan"] = -0.2
|
|
elif role == "melody":
|
|
track["volume"] = 0.75
|
|
track["pan"] = 0.2
|
|
elif role == "fx":
|
|
track["volume"] = 0.6
|
|
track["pan"] = 0.0
|
|
|
|
def _generate_section_structure(self, structure: str, bpm: float) -> List[Dict[str, Any]]:
|
|
"""Genera definición de secciones según estructura."""
|
|
if structure == "minimal":
|
|
sections = [
|
|
{"name": "intro", "bars": 8, "start_bar": 0},
|
|
{"name": "drop", "bars": 16, "start_bar": 8},
|
|
{"name": "outro", "bars": 8, "start_bar": 24},
|
|
]
|
|
elif structure == "extended":
|
|
sections = [
|
|
{"name": "intro", "bars": 8, "start_bar": 0},
|
|
{"name": "build_a", "bars": 8, "start_bar": 8},
|
|
{"name": "drop_a", "bars": 16, "start_bar": 16},
|
|
{"name": "break", "bars": 8, "start_bar": 32},
|
|
{"name": "build_b", "bars": 8, "start_bar": 40},
|
|
{"name": "drop_b", "bars": 16, "start_bar": 48},
|
|
{"name": "outro", "bars": 8, "start_bar": 64},
|
|
]
|
|
else: # standard
|
|
sections = [
|
|
{"name": "intro", "bars": 8, "start_bar": 0},
|
|
{"name": "build", "bars": 8, "start_bar": 8},
|
|
{"name": "drop", "bars": 16, "start_bar": 16},
|
|
{"name": "break", "bars": 8, "start_bar": 32},
|
|
{"name": "drop_b", "bars": 16, "start_bar": 40},
|
|
{"name": "outro", "bars": 8, "start_bar": 56},
|
|
]
|
|
|
|
for section in sections:
|
|
section["bpm"] = bpm
|
|
|
|
return sections
|
|
|
|
def _calculate_duration(self) -> int:
|
|
"""Calcula duración total en compases."""
|
|
if not self._section_definitions:
|
|
return 64
|
|
return sum(s.get("bars", 8) for s in self._section_definitions)
|
|
|
|
# =====================================================================
|
|
# T037: Generación desde referencia
|
|
# =====================================================================
|
|
|
|
def generate_from_reference(self, reference_audio_path: str) -> Dict[str, Any]:
    """Generate a track based on a reference audio file.

    Analyzes the reference, looks up recommended samples, derives a
    structure from the energy profile and then runs
    generate_complete_reggaeton() with the detected BPM, key and style.

    Args:
        reference_audio_path: Path to the reference audio file.

    Returns:
        The generation result extended with "reference_analysis" and
        "similarity_score", or a dictionary with ``status == "error"``
        when the file is missing, the analysis yields nothing or an
        exception occurs.
    """
    logger.info("Generating from reference: %s", reference_audio_path)

    if not os.path.isfile(reference_audio_path):
        return {
            "status": "error",
            "message": f"Reference audio not found: {reference_audio_path}",
        }

    state_before = deepcopy(self.current_project)

    try:
        # Analyze the reference audio.
        ref_features = analyze_reference(reference_audio_path)

        if not ref_features:
            return {
                "status": "error",
                "message": "Could not analyze reference audio",
            }

        # Extract features, falling back to defaults for missing keys.
        ref_bpm = ref_features.get("bpm", 95.0)
        ref_key = ref_features.get("key", "Am")
        ref_energy = ref_features.get("energy_profile", {})
        ref_style = ref_features.get("style_guess", "dembow")

        logger.info("Reference analysis: BPM=%s, Key=%s, Style=%s",
                    ref_bpm, ref_key, ref_style)

        # Look up recommended samples; the result is only logged here.
        similar_samples = get_recommended_samples(role="", count=20)
        logger.info("Found %d similar samples", len(similar_samples))

        # Derive the structure from the energy profile.
        structure = self._structure_from_energy(ref_energy)

        # Generate the track with the same characteristics.
        result = self.generate_complete_reggaeton(
            bpm=ref_bpm,
            key=ref_key,
            style=ref_style,
            structure=structure,
            use_samples=True
        )

        # Attach the reference metadata to the result.
        result["reference_analysis"] = {
            "path": reference_audio_path,
            "bpm_detected": ref_bpm,
            "key_detected": ref_key,
            "energy_profile": ref_energy,
            "style_guess": ref_style,
        }
        result["similarity_score"] = ref_features.get("confidence", 0.8)

        # A failed generation must not be recorded as a completed action.
        if result.get("status") != "success":
            return result

        # NOTE(review): generate_complete_reggaeton() already records its
        # own history entry, so a successful call adds two entries with
        # the same previous state - confirm this is intended.
        self.history.record_action(
            action_type="generate_from_reference",
            description=f"Generated from reference: {os.path.basename(reference_audio_path)}",
            state_before=state_before,
            undo_data={"previous_state": state_before}
        )

        return result

    except Exception as e:
        # Boundary handler: report the failure as a result dictionary and
        # keep the traceback in the log.
        logger.exception("Error in generate_from_reference: %s", e)
        return {
            "status": "error",
            "message": str(e),
        }
|
|
|
|
def _structure_from_energy(self, energy_profile: Dict[str, Any]) -> str:
|
|
"""Determina estructura basada en perfil energético."""
|
|
sections = energy_profile.get("sections", [])
|
|
if len(sections) <= 3:
|
|
return "minimal"
|
|
elif len(sections) >= 7:
|
|
return "extended"
|
|
return "standard"
|
|
|
|
# =====================================================================
|
|
# T038: Exportar proyecto
|
|
# =====================================================================
|
|
|
|
def export_project(self, path: str, format: str = "als") -> Dict[str, Any]:
    """Export the current project.

    As stated in the note returned to callers, the Live API does not
    support native .als export, so this function writes:
    - The project configuration as JSON
    - The list of samples used
    - For format "als", a text file with manual recreation instructions

    The two JSON files are written into the export manager's directory
    using the base name of ``path``; the instructions file is written
    next to ``path`` itself.

    Args:
        path: Base path for the export (without extension).
        format: Export format - "als" (adds the instructions file) or
            "json" (configuration only).

    Returns:
        A dictionary with ``status == "success"`` and the exported file
        paths, or ``status == "error"`` and the error message.
    """
    logger.info("Exporting project to: %s (format: %s)", path, format)

    try:
        exported_files = []
        base_name = os.path.basename(path)

        # Export the complete configuration.
        config_path = self.export_manager.export_project_config(
            self.current_project,
            filename=f"{base_name}_config.json"
        )
        exported_files.append(config_path)

        # Export the list of samples.
        samples_path = self.export_manager.export_samples_list(
            self.current_project,
            filename=f"{base_name}_samples.json"
        )
        exported_files.append(samples_path)

        # For the ALS format, also write the instructions file.
        if format == "als":
            als_instructions = self._generate_als_instructions(path)
            als_path = f"{path}_ALS_INSTRUCTIONS.txt"
            # Create the target directory first so the export does not
            # fail after the JSON files have already been written.
            Path(als_path).parent.mkdir(parents=True, exist_ok=True)
            with open(als_path, 'w', encoding='utf-8') as f:
                f.write(als_instructions)
            exported_files.append(als_path)

        logger.info("Project exported successfully: %d files", len(exported_files))

        return {
            "status": "success",
            "format": format,
            "exported_files": exported_files,
            "note": "Live API doesn't support native .als export. Use JSON config to recreate.",
        }

    except Exception as e:
        # Boundary handler: report the failure as a result dictionary and
        # keep the traceback in the log.
        logger.exception("Error exporting project: %s", e)
        return {
            "status": "error",
            "message": str(e),
        }
|
|
|
|
def _generate_als_instructions(self, path: str) -> str:
|
|
"""Genera instrucciones para recreación manual del proyecto."""
|
|
tracks = self.current_project.get("tracks", [])
|
|
bpm = self.current_project.get("bpm", 95)
|
|
key = self.current_project.get("key", "Am")
|
|
|
|
instructions = f"""ABLETON LIVE PROJECT - INSTRUCCIONES DE RECREACIÓN
|
|
================================================
|
|
|
|
BPM: {bpm}
|
|
Key: {key}
|
|
Estructura: {self.current_project.get('structure', 'standard')}
|
|
|
|
TRACKS A CREAR:
|
|
---------------
|
|
"""
|
|
|
|
for track in tracks:
|
|
instructions += f"""
|
|
[{track['index']}] {track['name']} ({track['type']})
|
|
- Volumen: {track.get('volume', 0.85)}
|
|
- Pan: {track.get('pan', 0.0)}
|
|
- Role: {track.get('role', 'unknown')}
|
|
"""
|
|
for clip in track.get("clips", []):
|
|
instructions += f" - Clip: {clip.get('name', 'unnamed')} ({clip.get('length', 4.0)} beats)\n"
|
|
|
|
instructions += f"""
|
|
SAMPLES USADOS:
|
|
---------------
|
|
"""
|
|
for sample in self.current_project.get("samples_used", []):
|
|
instructions += f"- {sample}\n"
|
|
|
|
instructions += """
|
|
================================================
|
|
Para recrear: File > New Live Set, luego seguir los pasos arriba.
|
|
"""
|
|
return instructions
|
|
|
|
# =====================================================================
|
|
# T039: Cargar proyecto
|
|
# =====================================================================
|
|
|
|
def load_project(self, path: str) -> Dict[str, Any]:
    """
    Load a project configuration from a JSON file.

    Replaces the in-memory project with the loaded settings, tracks, scenes
    and samples, and records the load in the action history with the
    previous project state as undo data.

    Args:
        path: Path to the JSON configuration file.

    Returns:
        On success a dict with "status", "tracks_count", "bpm", "key" and
        "loaded_from"; otherwise a dict with "status": "error" and "message".
    """
    logger.info("Loading project from: %s", path)

    if not os.path.isfile(path):
        return {
            "status": "error",
            "message": f"Project file not found: {path}",
        }

    snapshot = deepcopy(self.current_project)

    try:
        config = self.export_manager.load_project_config(path)

        project_data = config.get("project", {})
        settings = config.get("settings", {})

        # Rebuild the project state from the loaded sections of the config.
        loaded_state = {
            "bpm": settings.get("bpm", 95.0),
            "key": settings.get("key", "Am"),
            "time_signature": settings.get("time_signature", "4/4"),
            "tracks": project_data.get("tracks", []),
            "scenes": project_data.get("scenes", []),
            "samples_used": config.get("samples_used", []),
            "structure": project_data.get("structure", ""),
            "loaded_from": path,
            "loaded_at": time.time(),
        }
        self.current_project = loaded_state

        # Section definitions are only replaced when the file provides them.
        # NOTE(review): otherwise the previous project's sections stay in
        # place — confirm this is intended.
        if "sections" in project_data:
            self._section_definitions = project_data["sections"]

        self.history.record_action(
            action_type="load_project",
            description=f"Loaded project from: {os.path.basename(path)}",
            state_before=snapshot,
            undo_data={"previous_state": snapshot}
        )

        logger.info("Project loaded successfully: %d tracks",
                    len(self.current_project["tracks"]))

        current = self.current_project
        return {
            "status": "success",
            "tracks_count": len(current["tracks"]),
            "bpm": current["bpm"],
            "key": current["key"],
            "loaded_from": path,
        }

    except Exception as e:
        logger.error("Error loading project: %s", str(e))
        return {
            "status": "error",
            "message": str(e),
        }
|
|
|
|
# =====================================================================
|
|
# T040: Resumen del proyecto
|
|
# =====================================================================
|
|
|
|
def get_project_summary(self) -> Dict[str, Any]:
    """
    Build a summary of the current project.

    Returns:
        Dict with BPM, key, time signature, per-track info, the total clip
        count (reported as "sample_count"), structure, style, duration,
        section list and timestamps.
    """
    project = self.current_project
    tracks = project.get("tracks", [])

    # "sample_count" is the number of clips summed over all tracks.
    clip_total = 0
    for track in tracks:
        clip_total += len(track.get("clips", []))

    # Duration: bars are converted at 4 beats per bar; non-positive BPM gives 0.
    total_bars = self._calculate_duration()
    bpm = project.get("bpm", 95.0)
    if bpm > 0:
        duration_seconds = (total_bars * 4 * 60) / bpm
    else:
        duration_seconds = 0

    track_info = [
        {
            "index": track.get("index", 0),
            "name": track.get("name", "unnamed"),
            "type": track.get("type", "unknown"),
            "role": track.get("role", "unknown"),
            "clip_count": len(track.get("clips", [])),
            "volume": track.get("volume", 0.85),
        }
        for track in tracks
    ]

    duration = {
        "bars": total_bars,
        "beats": total_bars * 4,
        "seconds": round(duration_seconds, 2),
        "formatted": self._format_duration(duration_seconds),
    }
    sections = [
        {"name": definition.get("name"), "bars": definition.get("bars", 8)}
        for definition in self._section_definitions
    ]

    return {
        "status": "success",
        "bpm": bpm,
        "key": project.get("key", "Am"),
        "time_signature": project.get("time_signature", "4/4"),
        "track_count": len(tracks),
        "tracks": track_info,
        "sample_count": clip_total,
        "structure": project.get("structure", ""),
        "style": project.get("style", ""),
        "duration": duration,
        "sections": sections,
        "created_at": project.get("created_at"),
        "last_modified": time.time(),
    }
|
|
|
|
def _format_duration(self, seconds: float) -> str:
|
|
"""Formatea duración en formato mm:ss."""
|
|
minutes = int(seconds // 60)
|
|
secs = int(seconds % 60)
|
|
return f"{minutes}:{secs:02d}"
|
|
|
|
# =====================================================================
|
|
# T041: Sugerir mejoras
|
|
# =====================================================================
|
|
|
|
def suggest_improvements(self) -> Dict[str, Any]:
    """
    Analyse the project and collect improvement suggestions.

    Returns:
        Dict with "status", the total "suggestions_count" and the
        suggestions grouped under "categories" (mix, composition, samples,
        overall).
    """
    tracks = self.current_project.get("tracks", [])
    suggestions = {"mix": [], "composition": [], "samples": [], "overall": []}

    # Each analyser appends into the list it is handed.
    self._analyze_mix_suggestions(tracks, suggestions["mix"])
    self._analyze_composition_suggestions(tracks, suggestions["composition"])
    self._analyze_samples_suggestions(suggestions["samples"])

    # General, project-wide hints.
    overall = suggestions["overall"]
    if len(tracks) < 4:
        overall.append({
            "priority": "medium",
            "message": "Consider adding more tracks for a fuller sound",
            "action": "Add percussion, FX, or atmospheric elements",
        })

    if not self.current_project.get("structure"):
        overall.append({
            "priority": "high",
            "message": "No song structure defined",
            "action": "Use generate_complete_reggaeton() to create structured project",
        })

    total = sum(
        len(suggestions[category])
        for category in ("mix", "composition", "samples", "overall")
    )
    return {
        "status": "success",
        "suggestions_count": total,
        "categories": suggestions,
    }
|
|
|
|
def _analyze_mix_suggestions(self, tracks: List[Dict], suggestions: List):
|
|
"""Analiza y sugiere mejoras de mezcla."""
|
|
# Verificar niveles
|
|
high_volume_tracks = [
|
|
t for t in tracks
|
|
if t.get("volume", 0.85) > 0.9
|
|
]
|
|
if high_volume_tracks:
|
|
suggestions.append({
|
|
"priority": "high",
|
|
"message": f"{len(high_volume_tracks)} tracks with high volume (>0.9)",
|
|
"action": "Reduce track volumes and use compression",
|
|
"tracks": [t.get("name") for t in high_volume_tracks],
|
|
})
|
|
|
|
# Verificar panning
|
|
tracks_with_pan = [t for t in tracks if abs(t.get("pan", 0)) > 0.01]
|
|
if len(tracks_with_pan) < len(tracks) / 2:
|
|
suggestions.append({
|
|
"priority": "medium",
|
|
"message": "Many tracks are mono (no panning)",
|
|
"action": "Apply subtle panning to create stereo width",
|
|
})
|
|
|
|
# Verificar sidechain
|
|
kick_track = next((t for t in tracks if "kick" in t.get("name", "").lower()), None)
|
|
bass_track = next((t for t in tracks if "bass" in t.get("name", "").lower()), None)
|
|
if kick_track and bass_track:
|
|
suggestions.append({
|
|
"priority": "medium",
|
|
"message": "Kick and Bass present - sidechain recommended",
|
|
"action": "Apply sidechain compression from kick to bass",
|
|
})
|
|
|
|
def _analyze_composition_suggestions(self, tracks: List[Dict], suggestions: List):
|
|
"""Analiza y sugiere mejoras de composición."""
|
|
# Verificar variedad de notas
|
|
melodic_tracks = [t for t in tracks if t.get("role") in ("melody", "chords")]
|
|
if not melodic_tracks:
|
|
suggestions.append({
|
|
"priority": "high",
|
|
"message": "No melodic/harmonic tracks found",
|
|
"action": "Add chords or melody track for harmonic content",
|
|
})
|
|
|
|
# Verificar estructura
|
|
if len(self._section_definitions) < 3:
|
|
suggestions.append({
|
|
"priority": "medium",
|
|
"message": "Song structure is too simple",
|
|
"action": "Add more sections: build, break, variations",
|
|
})
|
|
|
|
def _analyze_samples_suggestions(self, suggestions: List):
|
|
"""Analiza y sugiere mejoras de samples."""
|
|
# Verificar samples faltantes
|
|
samples = self.current_project.get("samples_used", [])
|
|
if not samples:
|
|
suggestions.append({
|
|
"priority": "medium",
|
|
"message": "No external samples used",
|
|
"action": "Load samples from library using sample_selector",
|
|
})
|
|
|
|
# =====================================================================
|
|
# T042: Comparar con referencia
|
|
# =====================================================================
|
|
|
|
def compare_to_reference(self, reference_path: str) -> Dict[str, Any]:
    """
    Compare the current project against a reference file.

    Similarity is computed for BPM, key and energy (section counts) and then
    averaged.

    Args:
        reference_path: Path to the reference audio file.

    Returns:
        On success a dict with "overall_similarity", the per-dimension
        "comparisons" and "recommendations"; otherwise a dict with
        "status": "error" and "message".
    """
    logger.info("Comparing project to reference: %s", reference_path)

    if not os.path.isfile(reference_path):
        return {
            "status": "error",
            "message": f"Reference not found: {reference_path}",
        }

    try:
        ref_features = analyze_reference(reference_path)
        if not ref_features:
            return {
                "status": "error",
                "message": "Could not analyze reference",
            }

        project = self.current_project
        comparisons = {}

        # BPM: similarity falls linearly to 0 at a difference of 10 BPM.
        ref_bpm = ref_features.get("bpm", 95.0)
        proj_bpm = project.get("bpm", 95.0)
        bpm_gap = abs(ref_bpm - proj_bpm)
        comparisons["bpm"] = {
            "reference": ref_bpm,
            "project": proj_bpm,
            "difference": bpm_gap,
            "similarity": max(0, 1.0 - (bpm_gap / 10.0)),
        }

        # Key: exact match scores 1.0, anything else 0.5.
        ref_key = ref_features.get("key", "Am")
        proj_key = project.get("key", "Am")
        same_key = ref_key == proj_key
        comparisons["key"] = {
            "reference": ref_key,
            "project": proj_key,
            "match": same_key,
            "similarity": 1.0 if same_key else 0.5,
        }

        # Energy: compare the reference profile with a simple project estimate.
        ref_energy = ref_features.get("energy_profile", {})
        proj_energy = self._estimate_project_energy()
        comparisons["energy"] = {
            "reference_sections": len(ref_energy.get("sections", [])),
            "project_sections": len(self._section_definitions),
            "similarity": self._compare_energy_profiles(ref_energy, proj_energy),
        }

        # Overall similarity is the plain mean of the per-dimension scores.
        scores = [entry["similarity"] for entry in comparisons.values()]
        if scores:
            overall_similarity = sum(scores) / len(scores)
        else:
            overall_similarity = 0.0

        return {
            "status": "success",
            "reference_path": reference_path,
            "overall_similarity": round(overall_similarity, 3),
            "comparisons": comparisons,
            "recommendations": self._generate_comparison_recommendations(comparisons),
        }

    except Exception as e:
        logger.error("Error comparing to reference: %s", str(e))
        return {
            "status": "error",
            "message": str(e),
        }
|
|
|
|
def _estimate_project_energy(self) -> Dict[str, Any]:
|
|
"""Estima perfil de energía del proyecto actual."""
|
|
# Simplificación: usar número de tracks activos como proxy de energía
|
|
tracks = self.current_project.get("tracks", [])
|
|
return {
|
|
"track_count": len(tracks),
|
|
"sections": [
|
|
{"name": s.get("name"), "energy": len(tracks) * 0.1}
|
|
for s in self._section_definitions
|
|
],
|
|
}
|
|
|
|
def _compare_energy_profiles(self, ref: Dict, proj: Dict) -> float:
|
|
"""Compara perfiles de energía y retorna similitud 0-1."""
|
|
ref_sections = len(ref.get("sections", []))
|
|
proj_sections = len(proj.get("sections", []))
|
|
|
|
if ref_sections == 0:
|
|
return 0.0
|
|
|
|
diff = abs(ref_sections - proj_sections)
|
|
return max(0, 1.0 - (diff / max(ref_sections, proj_sections)))
|
|
|
|
def _generate_comparison_recommendations(self, comparisons: Dict) -> List[str]:
|
|
"""Genera recomendaciones basadas en comparaciones."""
|
|
recommendations = []
|
|
|
|
if comparisons["bpm"]["similarity"] < 0.8:
|
|
recommendations.append(
|
|
f"Adjust BPM from {comparisons['bpm']['project']} to {comparisons['bpm']['reference']}"
|
|
)
|
|
|
|
if not comparisons["key"]["match"]:
|
|
recommendations.append(
|
|
f"Consider changing key to {comparisons['key']['reference']}"
|
|
)
|
|
|
|
if comparisons["energy"]["similarity"] < 0.7:
|
|
recommendations.append(
|
|
"Restructure song to match energy progression of reference"
|
|
)
|
|
|
|
return recommendations
|
|
|
|
# =====================================================================
|
|
# T043: Undo
|
|
# =====================================================================
|
|
|
|
def undo_last_action(self) -> Dict[str, Any]:
    """
    Undo the most recent recorded action.

    The snapshot stored under ``undo_data["previous_state"]`` is restored as
    the current project. An empty snapshot is restored too; only a missing
    one (``None``) is skipped.

    Returns:
        A warning dict when there is nothing to undo; otherwise a dict with
        the undone action's type and description plus the undo/redo flags.
    """
    if not self.history.can_undo():
        return {
            "status": "warning",
            "message": "No actions to undo",
        }

    record = self.history.undo()
    if record and record.undo_data:
        previous_state = record.undo_data.get("previous_state")
        # Compare against None: an empty project dict is a valid state to
        # go back to and must not be skipped for being falsy.
        if previous_state is not None:
            self.current_project = deepcopy(previous_state)

    return {
        "status": "success",
        "undone_action": record.action_type if record else None,
        "description": record.description if record else None,
        "can_undo": self.history.can_undo(),
        "can_redo": self.history.can_redo(),
    }
|
|
|
|
# =====================================================================
|
|
# T044: Limpiar proyecto
|
|
# =====================================================================
|
|
|
|
def clear_project(self) -> Dict[str, Any]:
    """
    Reset the project to a clean state.

    Drops all tracks, scenes, samples and section definitions, restores the
    default BPM/key/time signature, and records the action with the previous
    project state as undo data.

    Returns:
        Confirmation dict including whether the action can be undone.
    """
    logger.info("Clearing project...")

    snapshot = deepcopy(self.current_project)

    # Fresh default state.
    blank_project = {
        "bpm": 95.0,
        "key": "Am",
        "time_signature": "4/4",
        "tracks": [],
        "scenes": [],
        "samples_used": [],
        "structure": "",
        "cleared_at": time.time(),
    }
    self.current_project = blank_project
    self._section_definitions = []

    self.history.record_action(
        action_type="clear_project",
        description="Cleared all project data",
        state_before=snapshot,
        undo_data={"previous_state": snapshot}
    )

    logger.info("Project cleared")

    undoable = self.history.can_undo()
    return {
        "status": "success",
        "message": "Project cleared - all tracks and data removed",
        "can_undo": undoable,
    }
|
|
|
|
# =====================================================================
|
|
# T045: Validar proyecto
|
|
# =====================================================================
|
|
|
|
def validate_project(self) -> Dict[str, Any]:
    """
    Run the project validator over the current project.

    Returns:
        Dict with "is_valid", the validator's "summary" and a message that
        reports the error count when the project is not valid.
    """
    logger.info("Validating project...")

    found_issues = self.validator.validate(self.current_project)
    summary = self.validator.get_summary()

    logger.info("Validation complete: %d issues found", len(found_issues))

    valid = summary["is_valid"]
    if valid:
        message = "Project is valid"
    else:
        message = f"Found {summary['errors']} errors"

    return {
        "status": "success",
        "is_valid": valid,
        "summary": summary,
        "message": message,
    }
|
|
|
|
# =====================================================================
|
|
# T046: Añadir variación a sección
|
|
# =====================================================================
|
|
|
|
def add_variation_to_section(self, section_index: int) -> Dict[str, Any]:
    """
    Add a variation to an existing section.

    Scales the velocity of every note on the kick, snare and hats tracks by
    a random factor in [0.8, 1.2], then appends a four-hit snare fill
    (pitch 38) over the last beat of the section to each clip of the first
    snare track. Clips without a "notes" list get one created, and tracks
    without a name are reported as "unnamed".

    Args:
        section_index: Index of the section to vary.

    Returns:
        On success a dict with the section name, index and the list of
        variations applied; an error dict for an out-of-range index.
    """
    logger.info("Adding variation to section %d", section_index)

    if section_index < 0 or section_index >= len(self._section_definitions):
        return {
            "status": "error",
            "message": f"Invalid section index: {section_index}",
        }

    state_before = deepcopy(self.current_project)
    section = self._section_definitions[section_index]

    # Resolve the fill position first, so a malformed section definition
    # fails before any note has been modified.
    end_bar = section["start_bar"] + section["bars"]
    end_beat = end_bar * 4

    tracks = self.current_project.get("tracks", [])
    variations_applied = []

    # 1. Velocity variation (±20 %) on the drum tracks.
    for track in tracks:
        if track.get("role") not in ("kick", "snare", "hats"):
            continue
        for clip in track.get("clips", []):
            for note in clip.get("notes", []):
                original_vel = note.get("velocity", 100)
                variation = random.uniform(0.8, 1.2)
                note["velocity"] = int(min(127, max(1, original_vel * variation)))
        variations_applied.append(
            f"Velocity variation on {track.get('name', 'unnamed')}"
        )

    # 2. Snare fill on the last beat of the section (first snare track only).
    fill_pattern = ((1.0, 110), (0.75, 120), (0.5, 127), (0.25, 100))
    for track in tracks:
        if track.get("role") != "snare":
            continue
        for clip in track.get("clips", []):
            # setdefault: a clip without notes gets a list rather than a KeyError.
            clip.setdefault("notes", []).extend(
                {
                    "pitch": 38,
                    "start": end_beat - offset,
                    "duration": 0.125,
                    "velocity": velocity,
                }
                for offset, velocity in fill_pattern
            )
        variations_applied.append(f"Snare fill added at bar {end_bar}")
        break

    self.history.record_action(
        action_type="add_variation",
        description=f"Added variation to section {section_index} ({section['name']})",
        state_before=state_before,
        undo_data={"previous_state": state_before}
    )

    return {
        "status": "success",
        "section": section["name"],
        "section_index": section_index,
        "variations_applied": variations_applied,
        "variation_type": "fill_and_velocity",
    }
|
|
|
|
# =====================================================================
|
|
# T047: Crear transición
|
|
# =====================================================================
|
|
|
|
def create_transition(self, from_section: int, to_section: int,
                      type: str = "riser") -> Dict[str, Any]:
    """
    Create a transition over the last two bars of ``from_section``.

    Types:
        - "riser": eight rising notes added to every clip of the first
          melody track.
        - "filter_sweep": low-pass sweep metadata on chords/melody tracks.
        - "break": mute metadata for the final bar on kick/snare/hats tracks.
        - "build": percussion hits of rising velocity on the first hats or
          fx track.

    An unknown type is rejected with an error and nothing is recorded in
    the history. Notes are copied per clip so clips never share note dicts.

    Args:
        from_section: Index of the section the transition leaves.
        to_section: Index of the section the transition enters.
        type: Transition type (see above).

    Returns:
        On success a dict with "status" and the "transition" description;
        otherwise an error dict.
    """
    logger.info("Creating %s transition from section %d to %d",
                type, from_section, to_section)

    section_count = len(self._section_definitions)
    if from_section < 0 or from_section >= section_count:
        return {"status": "error", "message": f"Invalid from_section: {from_section}"}
    if to_section < 0 or to_section >= section_count:
        return {"status": "error", "message": f"Invalid to_section: {to_section}"}

    # Reject unknown types up front instead of recording a no-op as success.
    available_types = ("riser", "filter_sweep", "break", "build")
    if type not in available_types:
        return {
            "status": "error",
            "message": f"Unknown transition type: {type}",
            "available_types": list(available_types),
        }

    state_before = deepcopy(self.current_project)

    from_sec = self._section_definitions[from_section]
    to_sec = self._section_definitions[to_section]

    # The transition occupies the last 2 bars (8 beats) of from_section.
    transition_start = (from_sec["start_bar"] + from_sec["bars"] - 2) * 4
    transition_duration = 8.0

    effects_applied = []
    transition_data = {
        "type": type,
        "from_section": from_sec["name"],
        "to_section": to_sec["name"],
        "start_beat": transition_start,
        "duration": transition_duration,
        "effects_applied": effects_applied,
    }

    tracks = self.current_project.get("tracks", [])

    def _add_notes(track, notes):
        # Copy each note per clip and tolerate clips without a "notes" list.
        for clip in track.get("clips", []):
            clip.setdefault("notes", []).extend(dict(note) for note in notes)

    if type == "riser":
        # One note per beat, pitch and velocity rising together.
        riser_notes = [
            {
                "pitch": 60 + beat,
                "start": transition_start + beat,
                "duration": 0.5,
                "velocity": min(127, 60 + (beat * 8)),
            }
            for beat in range(8)
        ]
        for track in tracks:
            if track.get("role") == "melody":
                _add_notes(track, riser_notes)
                effects_applied.append("Pitch riser notes")
                break

    elif type == "filter_sweep":
        # Only metadata is stored; no audio parameter is changed here.
        for track in tracks:
            if track.get("role") in ("chords", "melody"):
                track["transition_filter"] = {
                    "type": "lowpass",
                    "start_freq": 20000,
                    "end_freq": 500,
                    "automation": "sweep_down",
                }
                effects_applied.append(
                    f"Filter sweep on {track.get('name', 'unnamed')}"
                )

    elif type == "break":
        # Mark the drums as muted for the final bar of the transition.
        for track in tracks:
            if track.get("role") in ("kick", "snare", "hats"):
                track["transition_break"] = {
                    "mute_at": transition_start + 4.0,
                    "duration": 4.0,
                }
                effects_applied.append(f"Break on {track.get('name', 'unnamed')}")

    else:  # "build"
        # A hit on every second beat with rising velocity.
        build_notes = [
            {
                "pitch": 37,
                "start": transition_start + beat,
                "duration": 0.25,
                "velocity": 70 + (beat * 7),
            }
            for beat in range(0, 8, 2)
        ]
        for track in tracks:
            if track.get("role") in ("hats", "fx"):
                _add_notes(track, build_notes)
                effects_applied.append(
                    f"Build percussion on {track.get('name', 'unnamed')}"
                )
                break

    self.history.record_action(
        action_type="create_transition",
        description=f"Created {type} transition from {from_sec['name']} to {to_sec['name']}",
        state_before=state_before,
        undo_data={"previous_state": state_before}
    )

    return {
        "status": "success",
        "transition": transition_data,
    }
|
|
|
|
# =====================================================================
|
|
# T048: Humanizar track
|
|
# =====================================================================
|
|
|
|
def humanize_track(self, track_index: int, intensity: float = 0.5) -> Dict[str, Any]:
    """
    Apply small random variations to every note of a track.

    At intensity 1.0 each note gets a start offset within ±0.025 beats
    (never pushed below beat 0), a velocity factor within ±15 % (clamped to
    1..127) and a duration factor within ±7.5 %. Each range scales linearly
    with ``intensity``.

    Args:
        track_index: Index of the track to humanize.
        intensity: Amount of variation; clamped to 0.0 - 1.0.

    Returns:
        On success a dict with the track name, the clamped intensity and
        the per-kind modification counts; an error dict for an
        out-of-range index.
    """
    logger.info("Humanizing track %d with intensity %.2f", track_index, intensity)

    tracks = self.current_project.get("tracks", [])
    if track_index < 0 or track_index >= len(tracks):
        return {
            "status": "error",
            "message": f"Invalid track index: {track_index}",
        }

    state_before = deepcopy(self.current_project)
    track = tracks[track_index]

    intensity = max(0.0, min(1.0, intensity))

    modifications = {
        "timing_changes": 0,
        "velocity_changes": 0,
        "duration_changes": 0,
    }

    for clip in track.get("clips", []):
        for note in clip.get("notes", []):
            # 1. Timing: offset in beats; clamp so a note never starts
            #    before beat 0.
            timing_var = (random.random() - 0.5) * 0.05 * intensity
            note["start"] = max(0.0, note.get("start", 0) + timing_var)
            modifications["timing_changes"] += 1

            # 2. Velocity: multiplicative factor, result kept in MIDI range.
            original_vel = note.get("velocity", 100)
            vel_var = 1.0 + (random.random() - 0.5) * 0.3 * intensity
            note["velocity"] = int(min(127, max(1, original_vel * vel_var)))
            modifications["velocity_changes"] += 1

            # 3. Duration: multiplicative factor.
            original_dur = note.get("duration", 0.25)
            dur_var = 1.0 + (random.random() - 0.5) * 0.15 * intensity
            note["duration"] = original_dur * dur_var
            modifications["duration_changes"] += 1

    track_name = track.get("name")
    self.history.record_action(
        action_type="humanize",
        description=f"Humanized track {track_index} ({track_name}) at {intensity:.0%} intensity",
        state_before=state_before,
        undo_data={"previous_state": state_before}
    )

    return {
        "status": "success",
        "track_index": track_index,
        "track_name": track_name,
        "intensity": intensity,
        "modifications": modifications,
    }
|
|
|
|
# =====================================================================
|
|
# T049: Aplicar groove
|
|
# =====================================================================
|
|
|
|
def apply_groove(self, track_index: int, groove_template: str) -> Dict[str, Any]:
    """
    Apply a groove template to a track.

    Notes that do not sit on an eighth-note grid line (within 0.01 beats on
    either side) are shifted by the template's timing offset and get a
    random velocity change scaled by the template's velocity variation.
    On-grid notes are left untouched.

    Args:
        track_index: Index of the track.
        groove_template: Key into ``self.GROOVE_TEMPLATES``.

    Returns:
        On success a dict with the track, template name, number of notes
        modified and the template parameters; an error dict for an
        out-of-range index or an unknown template.
    """
    logger.info("Applying groove '%s' to track %d", groove_template, track_index)

    tracks = self.current_project.get("tracks", [])
    if track_index < 0 or track_index >= len(tracks):
        return {
            "status": "error",
            "message": f"Invalid track index: {track_index}",
        }

    if groove_template not in self.GROOVE_TEMPLATES:
        return {
            "status": "error",
            "message": f"Unknown groove template: {groove_template}",
            "available_templates": list(self.GROOVE_TEMPLATES.keys()),
        }

    state_before = deepcopy(self.current_project)
    track = tracks[track_index]
    template = self.GROOVE_TEMPLATES[groove_template]

    timing_offset = template["timing_offset"]
    velocity_var = template["velocity_variation"]

    grid = 0.5          # eighth-note grid, in beats
    tolerance = 0.01    # how close to a grid line still counts as on-grid
    notes_modified = 0

    for clip in track.get("clips", []):
        for note in clip.get("notes", []):
            start = note.get("start", 0)

            # Measure the distance to the NEAREST grid line. Looking only at
            # the remainder would treat a note a hair before a grid line
            # (e.g. 0.9999999) as off-grid and swing it.
            remainder = (start % 4.0) % grid
            if min(remainder, grid - remainder) <= tolerance:
                continue

            note["start"] = start + timing_offset

            original_vel = note.get("velocity", 100)
            vel_change = 1.0 + (random.random() - 0.5) * velocity_var
            note["velocity"] = int(min(127, max(1, original_vel * vel_change)))

            notes_modified += 1

    # Keep a record of the groove on the track itself.
    track["groove_applied"] = {
        "template": groove_template,
        "timing_offset": timing_offset,
        "notes_affected": notes_modified,
    }

    self.history.record_action(
        action_type="apply_groove",
        description=f"Applied {groove_template} groove to track {track_index}",
        state_before=state_before,
        undo_data={"previous_state": state_before}
    )

    return {
        "status": "success",
        "track_index": track_index,
        "track_name": track.get("name"),
        "groove_template": groove_template,
        "notes_modified": notes_modified,
        "template_params": template,
    }
|
|
|
|
# =====================================================================
|
|
# T050: Crear automatización de FX
|
|
# =====================================================================
|
|
|
|
def create_fx_automation(self, track_index: int, fx_type: str,
                         section: int) -> Dict[str, Any]:
    """
    Create an FX automation description for one track over one section.

    Types: "filter_sweep", "reverb_duck", "delay_wash", "volume_fade".
    "volume_fade" fades in on the first section, fades out on the last one
    and holds 0.9 otherwise.

    The automation is appended to the track's "automation" list and the
    action is recorded in the history. All validation, including the FX
    type, happens before the project is snapshotted.

    Args:
        track_index: Index of the track.
        fx_type: Effect type (see above).
        section: Index of the section the automation covers.

    Returns:
        On success a dict with "status" and the "automation" description;
        otherwise an error dict.
    """
    logger.info("Creating %s FX automation on track %d, section %d",
                fx_type, track_index, section)

    tracks = self.current_project.get("tracks", [])
    if track_index < 0 or track_index >= len(tracks):
        return {
            "status": "error",
            "message": f"Invalid track index: {track_index}",
        }

    if section < 0 or section >= len(self._section_definitions):
        return {
            "status": "error",
            "message": f"Invalid section: {section}",
        }

    track = tracks[track_index]
    sec = self._section_definitions[section]

    # Beat range of the section (4 beats per bar).
    start_beat = sec["start_bar"] * 4
    end_beat = start_beat + (sec["bars"] * 4)

    def _points(parameter, *beat_values):
        # Expand (beat, value) pairs into automation point dicts.
        return [
            {"beat": beat, "value": value, "parameter": parameter}
            for beat, value in beat_values
        ]

    if fx_type == "filter_sweep":
        # Filter opens from nearly closed to fully open across the section.
        midpoint = start_beat + (end_beat - start_beat) / 2
        points = _points(
            "filter_freq", (start_beat, 0.1), (midpoint, 0.5), (end_beat, 1.0)
        )
        description = "Filter sweep up"

    elif fx_type == "reverb_duck":
        # Reverb dips and recovers over the first two beats of the section.
        points = _points(
            "reverb_wet",
            (start_beat, 0.8), (start_beat + 1, 0.3), (start_beat + 2, 0.8),
        )
        description = "Reverb ducking on beats"

    elif fx_type == "delay_wash":
        # Delay level rises slowly, then faster over the final bar.
        points = _points(
            "delay_wet", (start_beat, 0.1), (end_beat - 4, 0.3), (end_beat, 0.6)
        )
        description = "Delay wash build"

    elif fx_type == "volume_fade":
        if section == 0:  # first section: fade in
            points = _points("volume", (start_beat, 0.0), (end_beat, 1.0))
            description = "Volume fade in"
        elif section == len(self._section_definitions) - 1:  # last: fade out
            points = _points(
                "volume", (start_beat, 1.0), (end_beat - 4, 0.7), (end_beat, 0.0)
            )
            description = "Volume fade out"
        else:
            points = _points("volume", (start_beat, 0.9), (end_beat, 0.9))
            description = "Volume maintained"

    else:
        return {
            "status": "error",
            "message": f"Unknown FX type: {fx_type}",
            "available_types": ["filter_sweep", "reverb_duck", "delay_wash", "volume_fade"],
        }

    # Snapshot only once the request is known to be valid, so the error
    # paths above never pay for a full deep copy of the project.
    state_before = deepcopy(self.current_project)

    automation_data = {
        "fx_type": fx_type,
        "track": track.get("name"),
        "section": sec["name"],
        "start_beat": start_beat,
        "end_beat": end_beat,
        "automation_points": points,
        "description": description,
    }

    track.setdefault("automation", []).append(automation_data)

    self.history.record_action(
        action_type="fx_automation",
        description=f"Created {fx_type} automation on track {track_index}, section {section}",
        state_before=state_before,
        undo_data={"previous_state": state_before}
    )

    return {
        "status": "success",
        "automation": automation_data,
    }
|
|
|
|
# =====================================================================
|
|
# Métodos adicionales de utilidad
|
|
# =====================================================================
|
|
|
|
def get_recent_history(self, count: int = 10) -> List[Dict[str, Any]]:
    """Return the most recent entries of the action history.

    Args:
        count: Number of entries requested; handed unchanged to
            ``self.history.get_recent_actions``.

    Returns:
        Whatever ``self.history.get_recent_actions(count)`` produces.
    """
    recent_actions = self.history.get_recent_actions(count)
    return recent_actions
|
|
|
|
def redo_action(self) -> Dict[str, Any]:
    """Re-apply the most recently undone action.

    Returns:
        A ``"warning"`` payload when the history reports nothing to redo.
        Otherwise a ``"success"`` payload carrying the redone record's
        ``action_type`` and ``description`` (both ``None`` if ``redo()``
        returned a falsy record) plus the refreshed undo/redo availability.
    """
    history = self.history

    if history.can_redo():
        redone = history.redo()
        # redo() may come back empty-handed; report None fields in that case.
        action_type = redone.action_type if redone else None
        description = redone.description if redone else None
        return {
            "status": "success",
            "redone_action": action_type,
            "description": description,
            "can_undo": history.can_undo(),
            "can_redo": history.can_redo(),
        }

    return {
        "status": "warning",
        "message": "No actions to redo",
    }
|
|
|
|
|
|
# Module-level shared instance; stays None until get_workflow() is first called.
_workflow_instance: Optional[ProductionWorkflow] = None


def get_workflow() -> ProductionWorkflow:
    """Return the module-wide workflow, creating it on the first call."""
    global _workflow_instance

    if _workflow_instance is not None:
        return _workflow_instance

    _workflow_instance = ProductionWorkflow()
    return _workflow_instance
|
|
|
|
|
|
class WorkflowEngine:
    """Compatibility wrapper expected by server.py.

    Each public method delegates to the shared workflow object (or to the
    preset manager) and reshapes the result into a flat dictionary.
    """

    def __init__(self):
        """Attach the wrapper to the workflow returned by ``get_workflow()``."""
        self._workflow = get_workflow()

    def _preset_manager(self):
        """Return the preset manager from the sibling ``preset_system`` module.

        The import lives inside the method, so ``preset_system`` is only
        loaded when a preset operation is actually requested.
        """
        from .preset_system import get_preset_manager

        return get_preset_manager()

    def export_project(self, path: str, format: str = "als") -> Dict[str, Any]:
        """Export the project through the workflow and summarise the outcome.

        Args:
            path: Destination forwarded to the workflow; also reported as
                ``export_path`` when the workflow lists no exported files.
            format: Export format forwarded unchanged. The name shadows the
                builtin but is part of the public signature.

        Returns:
            Dict with ``success``, ``export_path``, ``duration``,
            ``file_size`` (always ``None``), ``files`` and ``message``.
        """
        result = self._workflow.export_project(path, format)
        exported_files = result.get("exported_files", [])
        return {
            "success": result.get("status") == "success",
            "export_path": exported_files[0] if exported_files else path,
            "duration": self._workflow.get_project_summary().get("duration", {}).get("formatted"),
            "file_size": None,
            "files": exported_files,
            "message": result.get("message", ""),
        }

    def get_project_summary(self) -> Dict[str, Any]:
        """Condense the workflow's project summary into flat counters.

        ``return_tracks`` is always 0 and ``project_name`` is a fixed string;
        the remaining values are derived from the workflow summary.
        """
        summary = self._workflow.get_project_summary()
        tracks = summary.get("tracks", [])
        return {
            "track_count": summary.get("track_count", 0),
            "midi_tracks": len([t for t in tracks if t.get("type") == "midi"]),
            "audio_tracks": len([t for t in tracks if t.get("type") == "audio"]),
            "return_tracks": 0,
            "clips": sum(len(t.get("clips", [])) for t in tracks),
            "scenes": len(summary.get("sections", [])),
            "devices_used": [d for t in tracks for d in t.get("devices", [])],
            "duration_minutes": round(summary.get("duration", {}).get("seconds", 0) / 60.0, 2),
            "project_name": "AbletonMCP Project",
        }

    def suggest_improvements(self) -> Dict[str, Any]:
        """Flatten the workflow's per-category suggestions into one list.

        ``priority`` is ``"high"`` if any suggestion is marked high, otherwise
        ``"medium"``; ``estimated_impact`` is ``"medium"`` when there is at
        least one suggestion, otherwise ``"low"``.
        """
        result = self._workflow.suggest_improvements()
        categories = result.get("categories", {})
        suggestions = []
        for items in categories.values():
            suggestions.extend(items)
        return {
            "suggestions": suggestions,
            "priority": "high" if any(s.get("priority") == "high" for s in suggestions) else "medium",
            "categories": categories,
            "estimated_impact": "medium" if suggestions else "low",
        }

    def validate_project(self) -> Dict[str, Any]:
        """Run the workflow validation and split issues by severity.

        ``score`` starts at 100 and loses 10 points per reported issue of any
        severity, never dropping below 0. ``passed_checks`` is always empty.
        """
        result = self._workflow.validate_project()
        summary = result.get("summary", {})
        issues = summary.get("issues", [])
        return {
            "is_valid": result.get("is_valid", False),
            "issues": [i for i in issues if i.get("severity") == "error"],
            "warnings": [i for i in issues if i.get("severity") == "warning"],
            "passed_checks": [],
            "score": max(0, 100 - (len(issues) * 10)),
        }

    def load_preset(self, preset_name: str) -> Dict[str, Any]:
        """Load a preset and copy its settings into the current project.

        Args:
            preset_name: Name handed to the preset manager's ``load_preset``.

        Returns:
            A failure payload when the manager returns ``None``; otherwise a
            payload with the number of tracks and devices loaded and the
            non-empty sample criteria found on the preset's tracks.
        """
        manager = self._preset_manager()
        preset = manager.load_preset(preset_name)
        if preset is None:
            return {"success": False, "message": f"Preset not found: {preset_name}"}

        # A track may carry no device chain or sample criteria (the result
        # payload below already treats sample_criteria as optional), so falsy
        # values are normalised to empty containers instead of letting
        # list(None) / dict(None) raise TypeError.
        self._workflow.current_project.update({
            "bpm": preset.bpm,
            "key": preset.key,
            "style": preset.style,
            "structure": preset.structure,
            "tracks": [{
                "name": track.name,
                "type": track.track_type,
                "role": track.role,
                "volume": track.volume,
                "pan": track.pan,
                "devices": list(track.device_chain or []),
                "clips": [],
                "sample_criteria": dict(track.sample_criteria or {}),
            } for track in preset.tracks_config],
        })

        return {
            "success": True,
            "tracks_loaded": len(preset.tracks_config),
            "devices_loaded": sum(len(track.device_chain or []) for track in preset.tracks_config),
            "samples_loaded": [
                track.sample_criteria for track in preset.tracks_config if track.sample_criteria
            ],
        }

    def save_as_preset(self, name: str, description: str = "") -> Dict[str, Any]:
        """Save a deep copy of the current project as a preset.

        Args:
            name: Preset name passed to the manager.
            description: Stored under ``"description"`` in the copied config
                when non-empty.

        Returns:
            Dict with ``success``, ``path``, ``tracks_included`` and
            ``message``. ``path`` is reported whether or not the save
            succeeded.
        """
        manager = self._preset_manager()
        config = deepcopy(self._workflow.current_project)
        if description:
            config["description"] = description

        success = manager.save_as_preset(config, name)
        return {
            "success": bool(success),
            "path": str(manager._get_preset_path(name)),
            "tracks_included": len(config.get("tracks", [])),
            "message": "" if success else f"Failed to save preset: {name}",
        }

    def list_presets(self) -> Dict[str, Any]:
        """List presets with their count and the sorted distinct non-empty styles."""
        manager = self._preset_manager()
        presets = manager.list_presets()
        categories = sorted({p.get("style", "") for p in presets if p.get("style")})
        return {"presets": presets, "count": len(presets), "categories": categories}

    def create_custom_preset(self, name: str, description: str = "") -> Dict[str, Any]:
        """Create a custom preset from a deep copy of the current project.

        Returns:
            A failure payload when the manager returns ``None``; otherwise the
            names of the preset's tracks and the preset path.
        """
        manager = self._preset_manager()
        config = deepcopy(self._workflow.current_project)
        preset = manager.create_custom_preset(config, name, description)
        if preset is None:
            return {"success": False, "message": f"Failed to create preset: {name}"}

        return {
            "success": True,
            "base_tracks": [track.name for track in preset.tracks_config],
            "path": str(manager._get_preset_path(name)),
        }

    def get_workflow_status(self) -> Dict[str, Any]:
        """Derive a coarse phase and progress figure from the current project.

        Phase starts as ``"idle"``, becomes ``"structured"`` when the project
        has a structure, ``"production"`` when it has tracks, and is finally
        replaced by the ``action_type`` of the first recent-history entry.
        Progress is 20 plus 10 per track, plus 10 for a structure, with each
        step capped at 100. ``pending`` and ``errors`` are always empty.
        """
        project = self._workflow.current_project
        tracks = project.get("tracks", [])
        # NOTE(review): recent[0] is treated as the latest action; confirm the
        # ordering returned by get_recent_history.
        recent = self._workflow.get_recent_history(5)

        phase = "idle"
        if project.get("structure"):
            phase = "structured"
        if tracks:
            phase = "production"
        if recent:
            phase = recent[0].get("action_type", phase)

        progress = 0
        if tracks:
            progress = min(100, 20 + len(tracks) * 10)
        if project.get("structure"):
            progress = min(100, progress + 10)

        return {
            "phase": phase,
            "progress": progress,
            "current_task": recent[0].get("description", "Idle") if recent else "Idle",
            "completed": [item.get("description", "") for item in recent],
            "pending": [],
            "errors": [],
            "eta": "unknown" if progress < 100 else "complete",
        }

    def get_production_report(self) -> Dict[str, Any]:
        """Build a report of clips, devices and samples in the current project.

        A clip with a truthy ``"notes"`` entry counts as MIDI; every other
        clip counts as audio. Sample references come from a clip's
        ``"sample"`` or ``"sample_path"`` key. ``production_time`` is the
        number of recent history entries (at most 10 are requested);
        ``exports`` is always empty and ``quality_score`` always 0.
        """
        project = self._workflow.current_project
        tracks = project.get("tracks", [])
        midi_clips = 0
        audio_clips = 0
        devices = []
        samples = []

        for track in tracks:
            devices.extend(track.get("devices", []))
            for clip in track.get("clips", []):
                if clip.get("notes"):
                    midi_clips += 1
                else:
                    audio_clips += 1
                sample_ref = clip.get("sample") or clip.get("sample_path")
                if sample_ref:
                    samples.append(sample_ref)

        summary = self._workflow.get_project_summary()
        recent = self._workflow.get_recent_history(10)

        return {
            "project_name": "AbletonMCP Project",
            "duration": summary.get("duration", {}).get("formatted", "0:00"),
            "total_tracks": len(tracks),
            "midi_clips": midi_clips,
            "audio_clips": audio_clips,
            "devices": devices,
            "samples": samples,
            "production_time": len(recent),
            "exports": [],
            "quality_score": 0,
        }

    def set_parallel_processing(self, enabled: bool = True) -> Dict[str, Any]:
        """Record the parallel-processing flag on the workflow.

        Only the flag is stored here; no workers are started by this method.
        ``max_workers`` is the CPU count (4 if unknown) capped at 8 when
        enabled, and 1 when disabled.
        """
        self._workflow._parallel_processing_enabled = bool(enabled)
        max_workers = min(8, os.cpu_count() or 4) if enabled else 1
        return {
            "success": True,
            "max_workers": max_workers,
            "operations": ["analyze", "generate", "render"] if enabled else [],
        }

    def get_progress_report(self) -> Dict[str, Any]:
        """Re-express ``get_workflow_status()`` as a progress report.

        ``tasks_total`` equals the number of completed entries but is never
        below 1; ``time_invested`` is 5 minutes per completed entry,
        formatted as ``"<n>m"``.
        """
        status = self.get_workflow_status()
        return {
            "completion": status.get("progress", 0),
            "phases_completed": status.get("completed", []),
            "current_phase": status.get("phase", "idle"),
            "tasks_done": len(status.get("completed", [])),
            "tasks_total": max(1, len(status.get("completed", []))),
            "time_invested": f"{len(status.get('completed', [])) * 5}m",
            "milestones": status.get("completed", []),
        }
|