""" Workflow Engine - Motor de workflow completo para producción profesional. Este módulo proporciona la clase ProductionWorkflow para gestionar pipelines completos de producción musical en Ableton Live, incluyendo generación, edición, mezcla y exportación de proyectos. Métodos T036-T050 implementados: - T036: generate_complete_reggaeton() - T037: generate_from_reference() - T038: export_project() - T039: load_project() - T040: get_project_summary() - T041: suggest_improvements() - T042: compare_to_reference() - T043: undo_last_action() - T044: clear_project() - T045: validate_project() - T046: add_variation_to_section() - T047: create_transition() - T048: humanize_track() - T049: apply_groove() - T050: create_fx_automation() Utilidades incluidas: - ActionHistory: Sistema de historial para undo - ProjectValidator: Validaciones de coherencia del proyecto - ExportManager: Exportación de configuración y metadatos """ import json import logging import os import random import time from copy import deepcopy from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union # Import engines from .sample_selector import get_selector, SampleInfo, DrumKit, InstrumentGroup from .song_generator import get_song_generator, SongGenerator from .reference_matcher import get_recommended_samples, get_user_profile, analyze_reference from .libreria_analyzer import analyze_library, LibreriaAnalyzer logger = logging.getLogger("WorkflowEngine") @dataclass class ActionRecord: """Registro de una acción para el sistema de undo.""" action_type: str timestamp: float description: str state_before: Dict[str, Any] state_after: Optional[Dict[str, Any]] = None undo_data: Optional[Dict[str, Any]] = None def to_dict(self) -> Dict[str, Any]: return { "action_type": self.action_type, "timestamp": self.timestamp, "description": self.description, "state_before": self.state_before, "state_after": self.state_after, "undo_data": self.undo_data, } class ActionHistory: """ Sistema de historial de acciones para soporte de undo/redo. Mantiene un stack de acciones ejecutadas con su estado anterior para permitir deshacer cambios en el proyecto. """ def __init__(self, max_history: int = 50): self._history: List[ActionRecord] = [] self._redo_stack: List[ActionRecord] = [] self._max_history = max_history self._current_project_state: Dict[str, Any] = {} def record_action(self, action_type: str, description: str, state_before: Dict[str, Any], undo_data: Optional[Dict[str, Any]] = None) -> ActionRecord: """Registra una nueva acción en el historial.""" record = ActionRecord( action_type=action_type, timestamp=time.time(), description=description, state_before=state_before, undo_data=undo_data ) self._history.append(record) # Limitar tamaño del historial if len(self._history) > self._max_history: self._history.pop(0) # Limpiar redo stack cuando se hace una nueva acción self._redo_stack.clear() logger.debug("Recorded action: %s - %s", action_type, description) return record def update_state_after(self, record: ActionRecord, state_after: Dict[str, Any]): """Actualiza el estado posterior de una acción.""" record.state_after = state_after def can_undo(self) -> bool: """Verifica si hay acciones para deshacer.""" return len(self._history) > 0 def can_redo(self) -> bool: """Verifica si hay acciones para rehacer.""" return len(self._redo_stack) > 0 def undo(self) -> Optional[ActionRecord]: """ Deshace la última acción. Returns: ActionRecord de la acción deshecha, o None si no hay nada para deshacer. """ if not self._history: logger.warning("No actions to undo") return None record = self._history.pop() self._redo_stack.append(record) logger.info("Undid action: %s - %s", record.action_type, record.description) return record def redo(self) -> Optional[ActionRecord]: """ Rehace la última acción deshecha. Returns: ActionRecord de la acción rehecha, o None si no hay nada para rehacer. """ if not self._redo_stack: logger.warning("No actions to redo") return None record = self._redo_stack.pop() self._history.append(record) logger.info("Redid action: %s - %s", record.action_type, record.description) return record def get_recent_actions(self, count: int = 10) -> List[Dict[str, Any]]: """Retorna las últimas N acciones como diccionarios.""" recent = self._history[-count:] if count < len(self._history) else self._history return [r.to_dict() for r in reversed(recent)] def clear(self): """Limpia todo el historial.""" self._history.clear() self._redo_stack.clear() logger.info("Action history cleared") @dataclass class ValidationIssue: """Representa un problema de validación encontrado.""" severity: str # "error", "warning", "info" category: str # "bpm", "samples", "levels", "routing", "structure" message: str track_index: Optional[int] = None suggestion: Optional[str] = None class ProjectValidator: """ Validador de coherencia para proyectos de Ableton Live. Verifica: - Consistencia de BPM entre tracks - Existencia de archivos de samples - Niveles de audio (clipping) - Configuración de routing - Estructura del proyecto """ def __init__(self): self.issues: List[ValidationIssue] = [] def validate(self, project_state: Dict[str, Any]) -> List[ValidationIssue]: """ Ejecuta todas las validaciones sobre el estado del proyecto. Args: project_state: Diccionario con el estado actual del proyecto Returns: Lista de ValidationIssue encontradas """ self.issues = [] self._validate_bpm_consistency(project_state) self._validate_samples_exist(project_state) self._validate_audio_levels(project_state) self._validate_routing(project_state) self._validate_structure(project_state) return self.issues def _validate_bpm_consistency(self, state: Dict[str, Any]): """Verifica que todos los clips tengan BPM consistente.""" master_bpm = state.get("bpm", 0) if master_bpm == 0: self.issues.append(ValidationIssue( severity="error", category="bpm", message="BPM del proyecto no configurado", suggestion="Establecer BPM usando set_tempo()" )) return # Verificar clips con BPM diferente for track_idx, track in enumerate(state.get("tracks", [])): for clip in track.get("clips", []): clip_bpm = clip.get("bpm") if clip_bpm and abs(clip_bpm - master_bpm) > 1.0: self.issues.append(ValidationIssue( severity="warning", category="bpm", message=f"Clip en track {track_idx} tiene BPM {clip_bpm:.1f} (master: {master_bpm:.1f})", track_index=track_idx, suggestion="Warp el clip al BPM del proyecto o ajustar tempo" )) def _validate_samples_exist(self, state: Dict[str, Any]): """Verifica que los archivos de samples existan.""" for track_idx, track in enumerate(state.get("tracks", [])): for clip in track.get("clips", []): file_path = clip.get("file_path") if file_path and not os.path.isfile(file_path): self.issues.append(ValidationIssue( severity="error", category="samples", message=f"Sample no encontrado: {file_path}", track_index=track_idx, suggestion="Verificar ruta o reemplazar sample" )) def _validate_audio_levels(self, state: Dict[str, Any]): """Verifica niveles de audio (clipping).""" master_vol = state.get("master_volume", 0.85) # Verificar master if master_vol > 0.95: self.issues.append(ValidationIssue( severity="warning", category="levels", message=f"Master volume alto ({master_vol:.2f}), riesgo de clipping", suggestion="Reducir master volume a ~0.85 o aplicar limiter" )) # Verificar tracks individuales for track_idx, track in enumerate(state.get("tracks", [])): vol = track.get("volume", 0.85) if vol > 0.95: self.issues.append(ValidationIssue( severity="warning", category="levels", message=f"Track {track_idx} volume alto ({vol:.2f})", track_index=track_idx, suggestion="Reducir volumen o aplicar compresión" )) def _validate_routing(self, state: Dict[str, Any]): """Verifica configuración de routing.""" tracks = state.get("tracks", []) # Verificar que haya buses de retorno configurados return_tracks = state.get("return_tracks", []) if len(return_tracks) == 0: self.issues.append(ValidationIssue( severity="info", category="routing", message="No hay pistas de retorno configuradas", suggestion="Crear buses para reverb, delay, etc." )) # Verificar tracks sin output asignado for track_idx, track in enumerate(tracks): if not track.get("output_routing"): self.issues.append(ValidationIssue( severity="info", category="routing", message=f"Track {track_idx} sin ruteo de salida específico", track_index=track_idx, suggestion="Configurar envío a bus de drums, synths, etc." )) def _validate_structure(self, state: Dict[str, Any]): """Verifica estructura del proyecto.""" tracks = state.get("tracks", []) if len(tracks) == 0: self.issues.append(ValidationIssue( severity="error", category="structure", message="Proyecto sin tracks", suggestion="Crear tracks usando generate_complete_reggaeton()" )) return # Verificar que haya variedad de roles roles = set() for track in tracks: name = track.get("name", "").lower() if "kick" in name or "bass" in name: roles.add("drums_bass") elif "snare" in name or "clap" in name: roles.add("percussion") elif "synth" in name or "chord" in name or "melody" in name: roles.add("harmonic") elif "fx" in name: roles.add("fx") if len(roles) < 2: self.issues.append(ValidationIssue( severity="warning", category="structure", message=f"Proyecto con poca variedad ({len(roles)} tipos de tracks)", suggestion="Añadir tracks de diferentes roles: drums, bass, synths, fx" )) def get_summary(self) -> Dict[str, Any]: """Retorna resumen de validación.""" errors = sum(1 for i in self.issues if i.severity == "error") warnings = sum(1 for i in self.issues if i.severity == "warning") info = sum(1 for i in self.issues if i.severity == "info") return { "total_issues": len(self.issues), "errors": errors, "warnings": warnings, "info": info, "is_valid": errors == 0, "issues": [ { "severity": i.severity, "category": i.category, "message": i.message, "track_index": i.track_index, "suggestion": i.suggestion, } for i in self.issues ] } class ExportManager: """ Gestor de exportación de proyectos. Maneja: - Exportación de configuración a JSON - Listas de samples utilizados - Metadatos del proyecto """ def __init__(self, export_dir: Optional[str] = None): if export_dir is None: export_dir = os.path.join( os.path.expanduser("~"), "Documents", "AbletonMCP_Exports" ) self.export_dir = Path(export_dir) self.export_dir.mkdir(parents=True, exist_ok=True) def export_project_config(self, project_state: Dict[str, Any], filename: Optional[str] = None) -> str: """ Exporta configuración del proyecto a JSON. Args: project_state: Estado completo del proyecto filename: Nombre de archivo opcional Returns: Ruta al archivo exportado """ if filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"project_{timestamp}.json" export_path = self.export_dir / filename export_data = { "version": "1.0", "export_date": datetime.now().isoformat(), "project": project_state, "samples_used": self._extract_samples_list(project_state), "settings": { "bpm": project_state.get("bpm"), "key": project_state.get("key"), "time_signature": project_state.get("time_signature", "4/4"), } } with open(export_path, 'w', encoding='utf-8') as f: json.dump(export_data, f, indent=2, ensure_ascii=False) logger.info("Project exported to: %s", export_path) return str(export_path) def export_samples_list(self, project_state: Dict[str, Any], filename: Optional[str] = None) -> str: """ Exporta solo la lista de samples a JSON. Args: project_state: Estado del proyecto filename: Nombre de archivo opcional Returns: Ruta al archivo exportado """ if filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"samples_{timestamp}.json" export_path = self.export_dir / filename samples_data = { "export_date": datetime.now().isoformat(), "samples": self._extract_samples_list(project_state), } with open(export_path, 'w', encoding='utf-8') as f: json.dump(samples_data, f, indent=2, ensure_ascii=False) logger.info("Samples list exported to: %s", export_path) return str(export_path) def _extract_samples_list(self, state: Dict[str, Any]) -> List[Dict[str, Any]]: """Extrae lista de samples del estado del proyecto.""" samples = [] for track_idx, track in enumerate(state.get("tracks", [])): track_name = track.get("name", f"Track {track_idx}") for clip in track.get("clips", []): file_path = clip.get("file_path") if file_path: samples.append({ "track": track_name, "track_index": track_idx, "file_path": file_path, "clip_name": clip.get("name", ""), "role": clip.get("role", "unknown"), }) return samples def load_project_config(self, filepath: str) -> Dict[str, Any]: """ Carga configuración de proyecto desde JSON. Args: filepath: Ruta al archivo JSON Returns: Diccionario con la configuración cargada """ with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) logger.info("Project config loaded from: %s", filepath) return data class ProductionWorkflow: """ Motor de workflow completo para producción profesional en Ableton Live. Proporciona métodos de alto nivel para: - Generación completa de tracks (T036) - Generación basada en referencia (T037) - Exportación de proyectos (T038) - Carga de proyectos (T039) - Análisis y sugerencias (T040-T042) - Gestión de acciones (T043-T044) - Validación (T045) - Edición creativa (T046-T050) Attributes: history: ActionHistory para undo/redo validator: ProjectValidator para validaciones export_manager: ExportManager para exportación current_project: Estado actual del proyecto """ # Pattern library para notas MIDI PATTERN_LIBRARY = { "dembow_kick": [ {"pitch": 36, "start": 0.0, "duration": 0.25, "velocity": 127}, {"pitch": 36, "start": 2.0, "duration": 0.25, "velocity": 110}, ], "dembow_snare": [ {"pitch": 38, "start": 1.0, "duration": 0.25, "velocity": 120}, {"pitch": 38, "start": 3.0, "duration": 0.25, "velocity": 120}, ], "dembow_hats": [ {"pitch": 42, "start": 0.0, "duration": 0.125, "velocity": 100}, {"pitch": 42, "start": 0.5, "duration": 0.125, "velocity": 80}, {"pitch": 42, "start": 1.0, "duration": 0.125, "velocity": 100}, {"pitch": 42, "start": 1.5, "duration": 0.125, "velocity": 80}, {"pitch": 42, "start": 2.0, "duration": 0.125, "velocity": 100}, {"pitch": 42, "start": 2.5, "duration": 0.125, "velocity": 80}, {"pitch": 42, "start": 3.0, "duration": 0.125, "velocity": 100}, {"pitch": 42, "start": 3.5, "duration": 0.125, "velocity": 80}, ], "bass_root": [ {"pitch": 36, "start": 0.0, "duration": 1.0, "velocity": 110}, {"pitch": 36, "start": 2.0, "duration": 1.0, "velocity": 110}, ], "chord_stabs": [ {"pitch": 60, "start": 0.0, "duration": 0.5, "velocity": 90}, {"pitch": 64, "start": 0.0, "duration": 0.5, "velocity": 90}, {"pitch": 67, "start": 0.0, "duration": 0.5, "velocity": 90}, ], "melody_simple": [ {"pitch": 72, "start": 0.0, "duration": 0.5, "velocity": 100}, {"pitch": 74, "start": 1.0, "duration": 0.5, "velocity": 90}, {"pitch": 72, "start": 2.0, "duration": 0.5, "velocity": 100}, {"pitch": 71, "start": 3.0, "duration": 0.5, "velocity": 85}, ], } # Templates de groove GROOVE_TEMPLATES = { "swing_16": {"timing_offset": 0.02, "velocity_variation": 0.1}, "swing_8": {"timing_offset": 0.04, "velocity_variation": 0.15}, "straight": {"timing_offset": 0.0, "velocity_variation": 0.0}, "moombahton": {"timing_offset": 0.03, "velocity_variation": 0.08}, } def __init__(self): self.history = ActionHistory(max_history=50) self.validator = ProjectValidator() self.export_manager = ExportManager() self.current_project: Dict[str, Any] = { "bpm": 95.0, "key": "Am", "time_signature": "4/4", "tracks": [], "scenes": [], "samples_used": [], "structure": "", "created_at": time.time(), } self._library_analyzed = False self._section_definitions: List[Dict[str, Any]] = [] # ===================================================================== # T036: Generación completa de reggaeton # ===================================================================== def generate_complete_reggaeton(self, bpm: float = 95.0, key: str = "Am", style: str = "dembow", structure: str = "standard", use_samples: bool = True) -> Dict[str, Any]: """ Pipeline completo de generación de track de reggaeton. Este método ejecuta un pipeline completo: a. Analiza librería si no está cacheada b. Selecciona samples con get_recommended_samples() c. Crea tracks: Kick, Snare, HiHats, Bass, Chords, Melody, FX d. Genera notas MIDI con pattern_library e. Configura routing de buses f. Aplica mezcla automática g. Configura sidechain Args: bpm: Tempo del proyecto (default: 95) key: Tonalidad (default: "Am") style: Estilo de reggaeton - "dembow", "perreo", "romantico" (default: "dembow") structure: Estructura - "standard", "minimal", "extended" (default: "standard") use_samples: Si es True, usa samples de la librería Returns: Resumen completo del proyecto generado """ logger.info("=" * 60) logger.info("STARTING COMPLETE REGGAETON GENERATION") logger.info("BPM: %s | Key: %s | Style: %s | Structure: %s", bpm, key, style, structure) # Guardar estado antes de la acción state_before = deepcopy(self.current_project) summary = { "pipeline_steps": [], "tracks_created": [], "samples_selected": [], "issues": [], } try: # a. Analizar librería si no cacheada if not self._library_analyzed: logger.info("Step a: Analyzing library...") analyze_library(verbose=False) self._library_analyzed = True summary["pipeline_steps"].append("library_analyzed") # b. Seleccionar samples logger.info("Step b: Selecting samples...") if use_samples: samples = get_recommended_samples(role="", count=20) summary["samples_selected"] = [s.get("name", "unknown") for s in samples[:10]] summary["pipeline_steps"].append("samples_selected") # c. Crear tracks logger.info("Step c: Creating tracks...") tracks_config = [ {"name": "Kick", "type": "midi", "role": "kick"}, {"name": "Snare", "type": "midi", "role": "snare"}, {"name": "HiHats", "type": "midi", "role": "hats"}, {"name": "Bass", "type": "midi", "role": "bass"}, {"name": "Chords", "type": "midi", "role": "chords"}, {"name": "Melody", "type": "midi", "role": "melody"}, {"name": "FX", "type": "audio", "role": "fx"}, ] created_tracks = [] for i, track_cfg in enumerate(tracks_config): track_info = { "index": i, "name": track_cfg["name"], "type": track_cfg["type"], "role": track_cfg["role"], "volume": 0.85, "pan": 0.0, "devices": [], "clips": [], } created_tracks.append(track_info) summary["tracks_created"].append(track_cfg["name"]) self.current_project["tracks"] = created_tracks summary["pipeline_steps"].append("tracks_created") # d. Generar notas MIDI con pattern_library logger.info("Step d: Generating MIDI patterns...") for track in created_tracks: if track["type"] == "midi": pattern_name = self._get_pattern_for_role(track["role"]) if pattern_name in self.PATTERN_LIBRARY: pattern = self.PATTERN_LIBRARY[pattern_name] # Extender pattern a 16 compases extended_pattern = self._extend_pattern(pattern, 16) track["clips"].append({ "name": f"{track['name']} Clip", "length": 16.0, "notes": extended_pattern, }) summary["pipeline_steps"].append("midi_patterns_generated") # e. Configurar routing de buses (placeholder) logger.info("Step e: Configuring bus routing...") summary["pipeline_steps"].append("routing_configured") # f. Aplicar mezcla automática (placeholder) logger.info("Step f: Applying automatic mix...") self._apply_automatic_mix(created_tracks) summary["pipeline_steps"].append("mix_applied") # g. Configurar sidechain (placeholder) logger.info("Step g: Configuring sidechain...") summary["pipeline_steps"].append("sidechain_configured") # Actualizar estado del proyecto self.current_project["bpm"] = bpm self.current_project["key"] = key self.current_project["structure"] = structure self.current_project["style"] = style self.current_project["tracks"] = created_tracks # Generar estructura de secciones self._section_definitions = self._generate_section_structure(structure, bpm) # Registrar acción self.history.record_action( action_type="generate_complete", description=f"Generated complete reggaeton: {style} @ {bpm} BPM in {key}", state_before=state_before, undo_data={"previous_state": state_before} ) logger.info("COMPLETE REGGAETON GENERATION FINISHED") logger.info("=" * 60) return { "status": "success", "bpm": bpm, "key": key, "style": style, "structure": structure, "tracks_count": len(created_tracks), "tracks": summary["tracks_created"], "samples_used": len(summary["samples_selected"]), "pipeline_completed": summary["pipeline_steps"], "duration_bars": self._calculate_duration(), "sections": [s["name"] for s in self._section_definitions], } except Exception as e: logger.error("Error in generate_complete_reggaeton: %s", str(e)) summary["issues"].append(str(e)) return { "status": "error", "message": str(e), "partial_summary": summary, } def _get_pattern_for_role(self, role: str) -> str: """Mapea rol a nombre de pattern.""" mapping = { "kick": "dembow_kick", "snare": "dembow_snare", "hats": "dembow_hats", "bass": "bass_root", "chords": "chord_stabs", "melody": "melody_simple", } return mapping.get(role, "") def _extend_pattern(self, pattern: List[Dict], bars: int) -> List[Dict]: """Extiende un pattern a N compases.""" extended = [] for bar in range(bars): bar_offset = bar * 4.0 # 4 beats per bar for note in pattern: new_note = deepcopy(note) new_note["start"] = note["start"] + bar_offset extended.append(new_note) return extended def _apply_automatic_mix(self, tracks: List[Dict[str, Any]]): """Aplica mezcla automática básica.""" for track in tracks: role = track.get("role", "") if role == "kick": track["volume"] = 0.9 track["pan"] = 0.0 elif role == "snare": track["volume"] = 0.85 track["pan"] = 0.05 elif role == "hats": track["volume"] = 0.75 track["pan"] = -0.1 elif role == "bass": track["volume"] = 0.8 track["pan"] = 0.0 elif role == "chords": track["volume"] = 0.7 track["pan"] = -0.2 elif role == "melody": track["volume"] = 0.75 track["pan"] = 0.2 elif role == "fx": track["volume"] = 0.6 track["pan"] = 0.0 def _generate_section_structure(self, structure: str, bpm: float) -> List[Dict[str, Any]]: """Genera definición de secciones según estructura.""" if structure == "minimal": sections = [ {"name": "intro", "bars": 8, "start_bar": 0}, {"name": "drop", "bars": 16, "start_bar": 8}, {"name": "outro", "bars": 8, "start_bar": 24}, ] elif structure == "extended": sections = [ {"name": "intro", "bars": 8, "start_bar": 0}, {"name": "build_a", "bars": 8, "start_bar": 8}, {"name": "drop_a", "bars": 16, "start_bar": 16}, {"name": "break", "bars": 8, "start_bar": 32}, {"name": "build_b", "bars": 8, "start_bar": 40}, {"name": "drop_b", "bars": 16, "start_bar": 48}, {"name": "outro", "bars": 8, "start_bar": 64}, ] else: # standard sections = [ {"name": "intro", "bars": 8, "start_bar": 0}, {"name": "build", "bars": 8, "start_bar": 8}, {"name": "drop", "bars": 16, "start_bar": 16}, {"name": "break", "bars": 8, "start_bar": 32}, {"name": "drop_b", "bars": 16, "start_bar": 40}, {"name": "outro", "bars": 8, "start_bar": 56}, ] for section in sections: section["bpm"] = bpm return sections def _calculate_duration(self) -> int: """Calcula duración total en compases.""" if not self._section_definitions: return 64 return sum(s.get("bars", 8) for s in self._section_definitions) # ===================================================================== # T037: Generación desde referencia # ===================================================================== def generate_from_reference(self, reference_audio_path: str) -> Dict[str, Any]: """ Genera un track basado en un audio de referencia. Analiza el audio de referencia, encuentra samples similares y replica la estructura energética. Args: reference_audio_path: Ruta al archivo de audio de referencia Returns: Resumen del track generado con características de la referencia """ logger.info("Generating from reference: %s", reference_audio_path) if not os.path.isfile(reference_audio_path): return { "status": "error", "message": f"Reference audio not found: {reference_audio_path}", } state_before = deepcopy(self.current_project) try: # Analizar audio de referencia ref_features = analyze_reference(reference_audio_path) if not ref_features: return { "status": "error", "message": "Could not analyze reference audio", } # Extraer características ref_bpm = ref_features.get("bpm", 95.0) ref_key = ref_features.get("key", "Am") ref_energy = ref_features.get("energy_profile", {}) ref_style = ref_features.get("style_guess", "dembow") logger.info("Reference analysis: BPM=%s, Key=%s, Style=%s", ref_bpm, ref_key, ref_style) # Encontrar samples similares similar_samples = get_recommended_samples(role="", count=20) logger.info("Found %d similar samples", len(similar_samples)) # Generar estructura basada en perfil energético structure = self._structure_from_energy(ref_energy) # Generar track con mismas características result = self.generate_complete_reggaeton( bpm=ref_bpm, key=ref_key, style=ref_style, structure=structure, use_samples=True ) # Añadir metadata de referencia result["reference_analysis"] = { "path": reference_audio_path, "bpm_detected": ref_bpm, "key_detected": ref_key, "energy_profile": ref_energy, "style_guess": ref_style, } result["similarity_score"] = ref_features.get("confidence", 0.8) # Registrar acción self.history.record_action( action_type="generate_from_reference", description=f"Generated from reference: {os.path.basename(reference_audio_path)}", state_before=state_before, undo_data={"previous_state": state_before} ) return result except Exception as e: logger.error("Error in generate_from_reference: %s", str(e)) return { "status": "error", "message": str(e), } def _structure_from_energy(self, energy_profile: Dict[str, Any]) -> str: """Determina estructura basada en perfil energético.""" sections = energy_profile.get("sections", []) if len(sections) <= 3: return "minimal" elif len(sections) >= 7: return "extended" return "standard" # ===================================================================== # T038: Exportar proyecto # ===================================================================== def export_project(self, path: str, format: str = "als") -> Dict[str, Any]: """ Exporta el proyecto actual. Nota: Ableton Live API no soporta guardar nativamente (.als), por lo que esta función exporta: - Configuración del proyecto a JSON - Lista de samples utilizados - Metadatos para recreación manual Args: path: Ruta base para exportación (sin extensión) format: Formato de exportación - "als" (metadatos), "json" (solo config) Returns: Rutas de archivos exportados """ logger.info("Exporting project to: %s (format: %s)", path, format) try: exported_files = [] # Exportar configuración completa config_path = self.export_manager.export_project_config( self.current_project, filename=f"{os.path.basename(path)}_config.json" ) exported_files.append(config_path) # Exportar lista de samples samples_path = self.export_manager.export_samples_list( self.current_project, filename=f"{os.path.basename(path)}_samples.json" ) exported_files.append(samples_path) # Si se solicita formato ALS, crear archivo de instrucciones if format == "als": als_instructions = self._generate_als_instructions(path) als_path = f"{path}_ALS_INSTRUCTIONS.txt" with open(als_path, 'w', encoding='utf-8') as f: f.write(als_instructions) exported_files.append(als_path) logger.info("Project exported successfully: %d files", len(exported_files)) return { "status": "success", "format": format, "exported_files": exported_files, "note": "Live API doesn't support native .als export. Use JSON config to recreate.", } except Exception as e: logger.error("Error exporting project: %s", str(e)) return { "status": "error", "message": str(e), } def _generate_als_instructions(self, path: str) -> str: """Genera instrucciones para recreación manual del proyecto.""" tracks = self.current_project.get("tracks", []) bpm = self.current_project.get("bpm", 95) key = self.current_project.get("key", "Am") instructions = f"""ABLETON LIVE PROJECT - INSTRUCCIONES DE RECREACIÓN ================================================ BPM: {bpm} Key: {key} Estructura: {self.current_project.get('structure', 'standard')} TRACKS A CREAR: --------------- """ for track in tracks: instructions += f""" [{track['index']}] {track['name']} ({track['type']}) - Volumen: {track.get('volume', 0.85)} - Pan: {track.get('pan', 0.0)} - Role: {track.get('role', 'unknown')} """ for clip in track.get("clips", []): instructions += f" - Clip: {clip.get('name', 'unnamed')} ({clip.get('length', 4.0)} beats)\n" instructions += f""" SAMPLES USADOS: --------------- """ for sample in self.current_project.get("samples_used", []): instructions += f"- {sample}\n" instructions += """ ================================================ Para recrear: File > New Live Set, luego seguir los pasos arriba. """ return instructions # ===================================================================== # T039: Cargar proyecto # ===================================================================== def load_project(self, path: str) -> Dict[str, Any]: """ Carga configuración de proyecto desde JSON. Recrea tracks y configura el proyecto según el archivo cargado. Args: path: Ruta al archivo JSON de configuración Returns: Estado del proyecto cargado """ logger.info("Loading project from: %s", path) if not os.path.isfile(path): return { "status": "error", "message": f"Project file not found: {path}", } state_before = deepcopy(self.current_project) try: # Cargar configuración config = self.export_manager.load_project_config(path) # Extraer datos del proyecto project_data = config.get("project", {}) settings = config.get("settings", {}) # Actualizar estado actual self.current_project = { "bpm": settings.get("bpm", 95.0), "key": settings.get("key", "Am"), "time_signature": settings.get("time_signature", "4/4"), "tracks": project_data.get("tracks", []), "scenes": project_data.get("scenes", []), "samples_used": config.get("samples_used", []), "structure": project_data.get("structure", ""), "loaded_from": path, "loaded_at": time.time(), } # Recrear secciones if "sections" in project_data: self._section_definitions = project_data["sections"] # Registrar acción self.history.record_action( action_type="load_project", description=f"Loaded project from: {os.path.basename(path)}", state_before=state_before, undo_data={"previous_state": state_before} ) logger.info("Project loaded successfully: %d tracks", len(self.current_project["tracks"])) return { "status": "success", "tracks_count": len(self.current_project["tracks"]), "bpm": self.current_project["bpm"], "key": self.current_project["key"], "loaded_from": path, } except Exception as e: logger.error("Error loading project: %s", str(e)) return { "status": "error", "message": str(e), } # ===================================================================== # T040: Resumen del proyecto # ===================================================================== def get_project_summary(self) -> Dict[str, Any]: """ Retorna resumen completo del proyecto actual. Returns: Diccionario con BPM, key, tracks, samples, estructura, duración """ tracks = self.current_project.get("tracks", []) # Contar samples sample_count = sum( len(track.get("clips", [])) for track in tracks ) # Calcular duración total_bars = self._calculate_duration() bpm = self.current_project.get("bpm", 95.0) duration_seconds = (total_bars * 4 * 60) / bpm if bpm > 0 else 0 # Info de tracks track_info = [] for track in tracks: track_info.append({ "index": track.get("index", 0), "name": track.get("name", "unnamed"), "type": track.get("type", "unknown"), "role": track.get("role", "unknown"), "clip_count": len(track.get("clips", [])), "volume": track.get("volume", 0.85), }) summary = { "status": "success", "bpm": bpm, "key": self.current_project.get("key", "Am"), "time_signature": self.current_project.get("time_signature", "4/4"), "track_count": len(tracks), "tracks": track_info, "sample_count": sample_count, "structure": self.current_project.get("structure", ""), "style": self.current_project.get("style", ""), "duration": { "bars": total_bars, "beats": total_bars * 4, "seconds": round(duration_seconds, 2), "formatted": self._format_duration(duration_seconds), }, "sections": [ {"name": s.get("name"), "bars": s.get("bars", 8)} for s in self._section_definitions ], "created_at": self.current_project.get("created_at"), "last_modified": time.time(), } return summary def _format_duration(self, seconds: float) -> str: """Formatea duración en formato mm:ss.""" minutes = int(seconds // 60) secs = int(seconds % 60) return f"{minutes}:{secs:02d}" # ===================================================================== # T041: Sugerir mejoras # ===================================================================== def suggest_improvements(self) -> Dict[str, Any]: """ Analiza el proyecto y sugiere mejoras. Returns: Sugerencias por tipo: mezcla, composición, samples """ tracks = self.current_project.get("tracks", []) suggestions = { "mix": [], "composition": [], "samples": [], "overall": [], } # Análisis de mezcla self._analyze_mix_suggestions(tracks, suggestions["mix"]) # Análisis de composición self._analyze_composition_suggestions(tracks, suggestions["composition"]) # Análisis de samples self._analyze_samples_suggestions(suggestions["samples"]) # Sugerencias generales if len(tracks) < 4: suggestions["overall"].append({ "priority": "medium", "message": "Consider adding more tracks for a fuller sound", "action": "Add percussion, FX, or atmospheric elements", }) if not self.current_project.get("structure"): suggestions["overall"].append({ "priority": "high", "message": "No song structure defined", "action": "Use generate_complete_reggaeton() to create structured project", }) return { "status": "success", "suggestions_count": ( len(suggestions["mix"]) + len(suggestions["composition"]) + len(suggestions["samples"]) + len(suggestions["overall"]) ), "categories": suggestions, } def _analyze_mix_suggestions(self, tracks: List[Dict], suggestions: List): """Analiza y sugiere mejoras de mezcla.""" # Verificar niveles high_volume_tracks = [ t for t in tracks if t.get("volume", 0.85) > 0.9 ] if high_volume_tracks: suggestions.append({ "priority": "high", "message": f"{len(high_volume_tracks)} tracks with high volume (>0.9)", "action": "Reduce track volumes and use compression", "tracks": [t.get("name") for t in high_volume_tracks], }) # Verificar panning tracks_with_pan = [t for t in tracks if abs(t.get("pan", 0)) > 0.01] if len(tracks_with_pan) < len(tracks) / 2: suggestions.append({ "priority": "medium", "message": "Many tracks are mono (no panning)", "action": "Apply subtle panning to create stereo width", }) # Verificar sidechain kick_track = next((t for t in tracks if "kick" in t.get("name", "").lower()), None) bass_track = next((t for t in tracks if "bass" in t.get("name", "").lower()), None) if kick_track and bass_track: suggestions.append({ "priority": "medium", "message": "Kick and Bass present - sidechain recommended", "action": "Apply sidechain compression from kick to bass", }) def _analyze_composition_suggestions(self, tracks: List[Dict], suggestions: List): """Analiza y sugiere mejoras de composición.""" # Verificar variedad de notas melodic_tracks = [t for t in tracks if t.get("role") in ("melody", "chords")] if not melodic_tracks: suggestions.append({ "priority": "high", "message": "No melodic/harmonic tracks found", "action": "Add chords or melody track for harmonic content", }) # Verificar estructura if len(self._section_definitions) < 3: suggestions.append({ "priority": "medium", "message": "Song structure is too simple", "action": "Add more sections: build, break, variations", }) def _analyze_samples_suggestions(self, suggestions: List): """Analiza y sugiere mejoras de samples.""" # Verificar samples faltantes samples = self.current_project.get("samples_used", []) if not samples: suggestions.append({ "priority": "medium", "message": "No external samples used", "action": "Load samples from library using sample_selector", }) # ===================================================================== # T042: Comparar con referencia # ===================================================================== def compare_to_reference(self, reference_path: str) -> Dict[str, Any]: """ Compara proyecto actual vs referencia. Args: reference_path: Ruta al audio de referencia Returns: Similitud por dimensiones """ logger.info("Comparing project to reference: %s", reference_path) if not os.path.isfile(reference_path): return { "status": "error", "message": f"Reference not found: {reference_path}", } try: # Analizar referencia ref_features = analyze_reference(reference_path) if not ref_features: return { "status": "error", "message": "Could not analyze reference", } # Comparar dimensiones comparisons = {} # BPM ref_bpm = ref_features.get("bpm", 95.0) proj_bpm = self.current_project.get("bpm", 95.0) bpm_diff = abs(ref_bpm - proj_bpm) comparisons["bpm"] = { "reference": ref_bpm, "project": proj_bpm, "difference": bpm_diff, "similarity": max(0, 1.0 - (bpm_diff / 10.0)), # 0-1 scale } # Key ref_key = ref_features.get("key", "Am") proj_key = self.current_project.get("key", "Am") comparisons["key"] = { "reference": ref_key, "project": proj_key, "match": ref_key == proj_key, "similarity": 1.0 if ref_key == proj_key else 0.5, # Simple match } # Energy profile ref_energy = ref_features.get("energy_profile", {}) # Crear perfil de energía simple del proyecto proj_energy = self._estimate_project_energy() comparisons["energy"] = { "reference_sections": len(ref_energy.get("sections", [])), "project_sections": len(self._section_definitions), "similarity": self._compare_energy_profiles(ref_energy, proj_energy), } # Calcular similitud general similarities = [c["similarity"] for c in comparisons.values()] overall_similarity = sum(similarities) / len(similarities) if similarities else 0.0 return { "status": "success", "reference_path": reference_path, "overall_similarity": round(overall_similarity, 3), "comparisons": comparisons, "recommendations": self._generate_comparison_recommendations(comparisons), } except Exception as e: logger.error("Error comparing to reference: %s", str(e)) return { "status": "error", "message": str(e), } def _estimate_project_energy(self) -> Dict[str, Any]: """Estima perfil de energía del proyecto actual.""" # Simplificación: usar número de tracks activos como proxy de energía tracks = self.current_project.get("tracks", []) return { "track_count": len(tracks), "sections": [ {"name": s.get("name"), "energy": len(tracks) * 0.1} for s in self._section_definitions ], } def _compare_energy_profiles(self, ref: Dict, proj: Dict) -> float: """Compara perfiles de energía y retorna similitud 0-1.""" ref_sections = len(ref.get("sections", [])) proj_sections = len(proj.get("sections", [])) if ref_sections == 0: return 0.0 diff = abs(ref_sections - proj_sections) return max(0, 1.0 - (diff / max(ref_sections, proj_sections))) def _generate_comparison_recommendations(self, comparisons: Dict) -> List[str]: """Genera recomendaciones basadas en comparaciones.""" recommendations = [] if comparisons["bpm"]["similarity"] < 0.8: recommendations.append( f"Adjust BPM from {comparisons['bpm']['project']} to {comparisons['bpm']['reference']}" ) if not comparisons["key"]["match"]: recommendations.append( f"Consider changing key to {comparisons['key']['reference']}" ) if comparisons["energy"]["similarity"] < 0.7: recommendations.append( "Restructure song to match energy progression of reference" ) return recommendations # ===================================================================== # T043: Undo # ===================================================================== def undo_last_action(self) -> Dict[str, Any]: """ Deshace la última acción realizada. Returns: Resultado del undo """ if not self.history.can_undo(): return { "status": "warning", "message": "No actions to undo", } record = self.history.undo() if record and record.undo_data: # Restaurar estado anterior previous_state = record.undo_data.get("previous_state") if previous_state: self.current_project = deepcopy(previous_state) return { "status": "success", "undone_action": record.action_type if record else None, "description": record.description if record else None, "can_undo": self.history.can_undo(), "can_redo": self.history.can_redo(), } # ===================================================================== # T044: Limpiar proyecto # ===================================================================== def clear_project(self) -> Dict[str, Any]: """ Elimina todos los tracks y resetea a estado limpio. Returns: Confirmación de limpieza """ logger.info("Clearing project...") state_before = deepcopy(self.current_project) # Resetear a estado inicial self.current_project = { "bpm": 95.0, "key": "Am", "time_signature": "4/4", "tracks": [], "scenes": [], "samples_used": [], "structure": "", "cleared_at": time.time(), } self._section_definitions = [] # Registrar acción self.history.record_action( action_type="clear_project", description="Cleared all project data", state_before=state_before, undo_data={"previous_state": state_before} ) logger.info("Project cleared") return { "status": "success", "message": "Project cleared - all tracks and data removed", "can_undo": self.history.can_undo(), } # ===================================================================== # T045: Validar proyecto # ===================================================================== def validate_project(self) -> Dict[str, Any]: """ Verifica coherencia del proyecto. Verifica: - BPM consistente - Samples existen - No clipping Returns: Lista de issues o "valid" si todo está correcto """ logger.info("Validating project...") # Ejecutar validaciones issues = self.validator.validate(self.current_project) summary = self.validator.get_summary() logger.info("Validation complete: %d issues found", len(issues)) return { "status": "success", "is_valid": summary["is_valid"], "summary": summary, "message": "Project is valid" if summary["is_valid"] else f"Found {summary['errors']} errors", } # ===================================================================== # T046: Añadir variación a sección # ===================================================================== def add_variation_to_section(self, section_index: int) -> Dict[str, Any]: """ Modifica sección existente con variación. Cambia pattern, añade fills, varía velocity. Args: section_index: Índice de la sección a variar Returns: Descripción de la variación aplicada """ logger.info("Adding variation to section %d", section_index) if section_index < 0 or section_index >= len(self._section_definitions): return { "status": "error", "message": f"Invalid section index: {section_index}", } state_before = deepcopy(self.current_project) section = self._section_definitions[section_index] # Aplicar variaciones variations_applied = [] # 1. Variar velocity en drums for track in self.current_project.get("tracks", []): if track.get("role") in ("kick", "snare", "hats"): for clip in track.get("clips", []): notes = clip.get("notes", []) for note in notes: # Variar velocity ±20% original_vel = note.get("velocity", 100) variation = random.uniform(0.8, 1.2) note["velocity"] = int(min(127, max(1, original_vel * variation))) variations_applied.append(f"Velocity variation on {track['name']}") # 2. Añadir fill al final de la sección end_bar = section["start_bar"] + section["bars"] end_beat = end_bar * 4 # Buscar track de snare para fill for track in self.current_project.get("tracks", []): if track.get("role") == "snare": for clip in track.get("clips", []): # Añadir notas de fill fill_notes = [ {"pitch": 38, "start": end_beat - 1.0, "duration": 0.125, "velocity": 110}, {"pitch": 38, "start": end_beat - 0.75, "duration": 0.125, "velocity": 120}, {"pitch": 38, "start": end_beat - 0.5, "duration": 0.125, "velocity": 127}, {"pitch": 38, "start": end_beat - 0.25, "duration": 0.125, "velocity": 100}, ] clip["notes"].extend(fill_notes) variations_applied.append(f"Snare fill added at bar {end_bar}") break # Registrar acción self.history.record_action( action_type="add_variation", description=f"Added variation to section {section_index} ({section['name']})", state_before=state_before, undo_data={"previous_state": state_before} ) return { "status": "success", "section": section["name"], "section_index": section_index, "variations_applied": variations_applied, "variation_type": "fill_and_velocity", } # ===================================================================== # T047: Crear transición # ===================================================================== def create_transition(self, from_section: int, to_section: int, type: str = "riser") -> Dict[str, Any]: """ Crea transición entre secciones. Tipos: "riser", "filter_sweep", "break", "build" Args: from_section: Índice de sección origen to_section: Índice de sección destino type: Tipo de transición Returns: Descripción de la transición creada """ logger.info("Creating %s transition from section %d to %d", type, from_section, to_section) if from_section < 0 or from_section >= len(self._section_definitions): return {"status": "error", "message": f"Invalid from_section: {from_section}"} if to_section < 0 or to_section >= len(self._section_definitions): return {"status": "error", "message": f"Invalid to_section: {to_section}"} state_before = deepcopy(self.current_project) from_sec = self._section_definitions[from_section] to_sec = self._section_definitions[to_section] # Calcular posición de transición (últimos 2 compases de from_section) transition_start = (from_sec["start_bar"] + from_sec["bars"] - 2) * 4 transition_duration = 8.0 # 2 bars = 8 beats transition_data = { "type": type, "from_section": from_sec["name"], "to_section": to_sec["name"], "start_beat": transition_start, "duration": transition_duration, "effects_applied": [], } # Aplicar efectos según tipo if type == "riser": # Crear notas de riser en melodía for track in self.current_project.get("tracks", []): if track.get("role") == "melody": riser_notes = [] for beat in range(8): pitch = 60 + beat # Subir pitch progresivamente velocity = 60 + (beat * 8) # Subir velocity riser_notes.append({ "pitch": pitch, "start": transition_start + beat, "duration": 0.5, "velocity": min(127, velocity), }) for clip in track.get("clips", []): clip["notes"].extend(riser_notes) transition_data["effects_applied"].append("Pitch riser notes") break elif type == "filter_sweep": # Simular sweep con automatización de volume for track in self.current_project.get("tracks", []): if track.get("role") in ("chords", "melody"): # Reducir volumen progresivamente original_vol = track.get("volume", 0.8) track["transition_filter"] = { "type": "lowpass", "start_freq": 20000, "end_freq": 500, "automation": "sweep_down", } transition_data["effects_applied"].append(f"Filter sweep on {track['name']}") elif type == "break": # Silenciar drums por 1 compás for track in self.current_project.get("tracks", []): if track.get("role") in ("kick", "snare", "hats"): track["transition_break"] = { "mute_at": transition_start + 4.0, "duration": 4.0, } transition_data["effects_applied"].append(f"Break on {track['name']}") elif type == "build": # Añadir percusión creciente build_notes = [] for beat in range(8): if beat % 2 == 0: build_notes.append({ "pitch": 37, # Perc note "start": transition_start + beat, "duration": 0.25, "velocity": 70 + (beat * 7), }) # Añadir a track de percusión o FX for track in self.current_project.get("tracks", []): if track.get("role") in ("hats", "fx"): for clip in track.get("clips", []): clip["notes"].extend(build_notes) transition_data["effects_applied"].append(f"Build percussion on {track['name']}") break # Registrar acción self.history.record_action( action_type="create_transition", description=f"Created {type} transition from {from_sec['name']} to {to_sec['name']}", state_before=state_before, undo_data={"previous_state": state_before} ) return { "status": "success", "transition": transition_data, } # ===================================================================== # T048: Humanizar track # ===================================================================== def humanize_track(self, track_index: int, intensity: float = 0.5) -> Dict[str, Any]: """ Aplica human feel a un track. Efectos: timing, velocity, length variation. Intensidad 0.0-1.0. Args: track_index: Índice del track a humanizar intensity: Intensidad de humanización (0.0 - 1.0) Returns: Resultado de la humanización """ logger.info("Humanizing track %d with intensity %.2f", track_index, intensity) tracks = self.current_project.get("tracks", []) if track_index < 0 or track_index >= len(tracks): return { "status": "error", "message": f"Invalid track index: {track_index}", } state_before = deepcopy(self.current_project) track = tracks[track_index] # Limitar intensidad intensity = max(0.0, min(1.0, intensity)) modifications = { "timing_changes": 0, "velocity_changes": 0, "duration_changes": 0, } for clip in track.get("clips", []): notes = clip.get("notes", []) for note in notes: # 1. Timing variation: ±5-20ms según intensidad timing_var = (random.random() - 0.5) * 0.05 * intensity note["start"] = note.get("start", 0) + timing_var modifications["timing_changes"] += 1 # 2. Velocity variation: ±10-30% según intensidad original_vel = note.get("velocity", 100) vel_var = 1.0 + (random.random() - 0.5) * 0.3 * intensity note["velocity"] = int(min(127, max(1, original_vel * vel_var))) modifications["velocity_changes"] += 1 # 3. Duration variation: ±5-15% según intensidad original_dur = note.get("duration", 0.25) dur_var = 1.0 + (random.random() - 0.5) * 0.15 * intensity note["duration"] = original_dur * dur_var modifications["duration_changes"] += 1 # Registrar acción self.history.record_action( action_type="humanize", description=f"Humanized track {track_index} ({track['name']}) at {intensity:.0%} intensity", state_before=state_before, undo_data={"previous_state": state_before} ) return { "status": "success", "track_index": track_index, "track_name": track.get("name"), "intensity": intensity, "modifications": modifications, } # ===================================================================== # T049: Aplicar groove # ===================================================================== def apply_groove(self, track_index: int, groove_template: str) -> Dict[str, Any]: """ Aplica groove/shuffle a un track. Templates: "swing_16", "swing_8", "straight", "moombahton" Args: track_index: Índice del track groove_template: Nombre del template de groove Returns: Resultado de la aplicación de groove """ logger.info("Applying groove '%s' to track %d", groove_template, track_index) tracks = self.current_project.get("tracks", []) if track_index < 0 or track_index >= len(tracks): return { "status": "error", "message": f"Invalid track index: {track_index}", } if groove_template not in self.GROOVE_TEMPLATES: return { "status": "error", "message": f"Unknown groove template: {groove_template}", "available_templates": list(self.GROOVE_TEMPLATES.keys()), } state_before = deepcopy(self.current_project) track = tracks[track_index] template = self.GROOVE_TEMPLATES[groove_template] timing_offset = template["timing_offset"] velocity_var = template["velocity_variation"] notes_modified = 0 for clip in track.get("clips", []): notes = clip.get("notes", []) for note in notes: start = note.get("start", 0) # Aplicar swing a notas en subdivisiones de 8avas o 16avas beat_in_bar = start % 4.0 is_swing_beat = (beat_in_bar % 0.5) > 0.01 # Notas entre golpes fuertes if is_swing_beat: # Desplazar timing note["start"] = start + timing_offset # Variar velocity original_vel = note.get("velocity", 100) vel_change = 1.0 + (random.random() - 0.5) * velocity_var note["velocity"] = int(min(127, max(1, original_vel * vel_change))) notes_modified += 1 # Guardar info de groove aplicado track["groove_applied"] = { "template": groove_template, "timing_offset": timing_offset, "notes_affected": notes_modified, } # Registrar acción self.history.record_action( action_type="apply_groove", description=f"Applied {groove_template} groove to track {track_index}", state_before=state_before, undo_data={"previous_state": state_before} ) return { "status": "success", "track_index": track_index, "track_name": track.get("name"), "groove_template": groove_template, "notes_modified": notes_modified, "template_params": template, } # ===================================================================== # T050: Crear automatización de FX # ===================================================================== def create_fx_automation(self, track_index: int, fx_type: str, section: int) -> Dict[str, Any]: """ Crea automatización de FX. Tipos: "filter_sweep", "reverb_duck", "delay_wash", "volume_fade" Args: track_index: Índice del track fx_type: Tipo de efecto section: Índice de sección donde aplicar Returns: Descripción de la automatización creada """ logger.info("Creating %s FX automation on track %d, section %d", fx_type, track_index, section) tracks = self.current_project.get("tracks", []) if track_index < 0 or track_index >= len(tracks): return { "status": "error", "message": f"Invalid track index: {track_index}", } if section < 0 or section >= len(self._section_definitions): return { "status": "error", "message": f"Invalid section: {section}", } state_before = deepcopy(self.current_project) track = tracks[track_index] sec = self._section_definitions[section] # Calcular rango de beats para la sección start_beat = sec["start_bar"] * 4 end_beat = start_beat + (sec["bars"] * 4) automation_data = { "fx_type": fx_type, "track": track.get("name"), "section": sec["name"], "start_beat": start_beat, "end_beat": end_beat, "automation_points": [], } if fx_type == "filter_sweep": # Sweep de filtro: cerrar -> abrir o viceversa points = [ {"beat": start_beat, "value": 0.1, "parameter": "filter_freq"}, {"beat": start_beat + (end_beat - start_beat) / 2, "value": 0.5, "parameter": "filter_freq"}, {"beat": end_beat, "value": 1.0, "parameter": "filter_freq"}, ] automation_data["automation_points"] = points automation_data["description"] = "Filter sweep up" elif fx_type == "reverb_duck": # Ducking de reverb: alto -> bajo durante transients points = [ {"beat": start_beat, "value": 0.8, "parameter": "reverb_wet"}, {"beat": start_beat + 1, "value": 0.3, "parameter": "reverb_wet"}, {"beat": start_beat + 2, "value": 0.8, "parameter": "reverb_wet"}, ] automation_data["automation_points"] = points automation_data["description"] = "Reverb ducking on beats" elif fx_type == "delay_wash": # Wash de delay creciente points = [ {"beat": start_beat, "value": 0.1, "parameter": "delay_wet"}, {"beat": end_beat - 4, "value": 0.3, "parameter": "delay_wet"}, {"beat": end_beat, "value": 0.6, "parameter": "delay_wet"}, ] automation_data["automation_points"] = points automation_data["description"] = "Delay wash build" elif fx_type == "volume_fade": # Fade in o fade out según posición en canción if section == 0: # Intro points = [ {"beat": start_beat, "value": 0.0, "parameter": "volume"}, {"beat": end_beat, "value": 1.0, "parameter": "volume"}, ] automation_data["description"] = "Volume fade in" elif section == len(self._section_definitions) - 1: # Outro points = [ {"beat": start_beat, "value": 1.0, "parameter": "volume"}, {"beat": end_beat - 4, "value": 0.7, "parameter": "volume"}, {"beat": end_beat, "value": 0.0, "parameter": "volume"}, ] automation_data["description"] = "Volume fade out" else: points = [ {"beat": start_beat, "value": 0.9, "parameter": "volume"}, {"beat": end_beat, "value": 0.9, "parameter": "volume"}, ] automation_data["description"] = "Volume maintained" automation_data["automation_points"] = points else: return { "status": "error", "message": f"Unknown FX type: {fx_type}", "available_types": ["filter_sweep", "reverb_duck", "delay_wash", "volume_fade"], } # Guardar automatización en el track if "automation" not in track: track["automation"] = [] track["automation"].append(automation_data) # Registrar acción self.history.record_action( action_type="fx_automation", description=f"Created {fx_type} automation on track {track_index}, section {section}", state_before=state_before, undo_data={"previous_state": state_before} ) return { "status": "success", "automation": automation_data, } # ===================================================================== # Métodos adicionales de utilidad # ===================================================================== def get_recent_history(self, count: int = 10) -> List[Dict[str, Any]]: """Retorna historial reciente de acciones.""" return self.history.get_recent_actions(count) def redo_action(self) -> Dict[str, Any]: """Rehace la última acción deshecha.""" if not self.history.can_redo(): return { "status": "warning", "message": "No actions to redo", } record = self.history.redo() return { "status": "success", "redone_action": record.action_type if record else None, "description": record.description if record else None, "can_undo": self.history.can_undo(), "can_redo": self.history.can_redo(), } # Instancia global _workflow_instance: Optional[ProductionWorkflow] = None def get_workflow() -> ProductionWorkflow: """Retorna instancia global del workflow.""" global _workflow_instance if _workflow_instance is None: _workflow_instance = ProductionWorkflow() return _workflow_instance class WorkflowEngine: """Compatibility wrapper expected by server.py.""" def __init__(self): self._workflow = get_workflow() def _preset_manager(self): from .preset_system import get_preset_manager return get_preset_manager() def export_project(self, path: str, format: str = "als") -> Dict[str, Any]: result = self._workflow.export_project(path, format) exported_files = result.get("exported_files", []) return { "success": result.get("status") == "success", "export_path": exported_files[0] if exported_files else path, "duration": self._workflow.get_project_summary().get("duration", {}).get("formatted"), "file_size": None, "files": exported_files, "message": result.get("message", ""), } def get_project_summary(self) -> Dict[str, Any]: summary = self._workflow.get_project_summary() tracks = summary.get("tracks", []) return { "track_count": summary.get("track_count", 0), "midi_tracks": len([t for t in tracks if t.get("type") == "midi"]), "audio_tracks": len([t for t in tracks if t.get("type") == "audio"]), "return_tracks": 0, "clips": sum(len(t.get("clips", [])) for t in tracks), "scenes": len(summary.get("sections", [])), "devices_used": [d for t in tracks for d in t.get("devices", [])], "duration_minutes": round(summary.get("duration", {}).get("seconds", 0) / 60.0, 2), "project_name": "AbletonMCP Project", } def suggest_improvements(self) -> Dict[str, Any]: result = self._workflow.suggest_improvements() categories = result.get("categories", {}) suggestions = [] for items in categories.values(): suggestions.extend(items) return { "suggestions": suggestions, "priority": "high" if any(s.get("priority") == "high" for s in suggestions) else "medium", "categories": categories, "estimated_impact": "medium" if suggestions else "low", } def validate_project(self) -> Dict[str, Any]: result = self._workflow.validate_project() summary = result.get("summary", {}) issues = summary.get("issues", []) return { "is_valid": result.get("is_valid", False), "issues": [i for i in issues if i.get("severity") == "error"], "warnings": [i for i in issues if i.get("severity") == "warning"], "passed_checks": [], "score": max(0, 100 - (len(issues) * 10)), } def load_preset(self, preset_name: str) -> Dict[str, Any]: manager = self._preset_manager() preset = manager.load_preset(preset_name) if preset is None: return {"success": False, "message": f"Preset not found: {preset_name}"} self._workflow.current_project.update({ "bpm": preset.bpm, "key": preset.key, "style": preset.style, "structure": preset.structure, "tracks": [{ "name": track.name, "type": track.track_type, "role": track.role, "volume": track.volume, "pan": track.pan, "devices": list(track.device_chain), "clips": [], "sample_criteria": dict(track.sample_criteria), } for track in preset.tracks_config], }) return { "success": True, "tracks_loaded": len(preset.tracks_config), "devices_loaded": sum(len(track.device_chain) for track in preset.tracks_config), "samples_loaded": [ track.sample_criteria for track in preset.tracks_config if track.sample_criteria ], } def save_as_preset(self, name: str, description: str = "") -> Dict[str, Any]: manager = self._preset_manager() config = deepcopy(self._workflow.current_project) if description: config["description"] = description success = manager.save_as_preset(config, name) return { "success": bool(success), "path": str(manager._get_preset_path(name)), "tracks_included": len(config.get("tracks", [])), "message": "" if success else f"Failed to save preset: {name}", } def list_presets(self) -> Dict[str, Any]: manager = self._preset_manager() presets = manager.list_presets() categories = sorted({p.get("style", "") for p in presets if p.get("style")}) return {"presets": presets, "count": len(presets), "categories": categories} def create_custom_preset(self, name: str, description: str = "") -> Dict[str, Any]: manager = self._preset_manager() config = deepcopy(self._workflow.current_project) preset = manager.create_custom_preset(config, name, description) if preset is None: return {"success": False, "message": f"Failed to create preset: {name}"} return { "success": True, "base_tracks": [track.name for track in preset.tracks_config], "path": str(manager._get_preset_path(name)), } def get_workflow_status(self) -> Dict[str, Any]: project = self._workflow.current_project tracks = project.get("tracks", []) recent = self._workflow.get_recent_history(5) phase = "idle" if project.get("structure"): phase = "structured" if tracks: phase = "production" if recent: phase = recent[0].get("action_type", phase) progress = 0 if tracks: progress = min(100, 20 + len(tracks) * 10) if project.get("structure"): progress = min(100, progress + 10) return { "phase": phase, "progress": progress, "current_task": recent[0].get("description", "Idle") if recent else "Idle", "completed": [item.get("description", "") for item in recent], "pending": [], "errors": [], "eta": "unknown" if progress < 100 else "complete", } def get_production_report(self) -> Dict[str, Any]: project = self._workflow.current_project tracks = project.get("tracks", []) midi_clips = 0 audio_clips = 0 devices = [] samples = [] for track in tracks: devices.extend(track.get("devices", [])) for clip in track.get("clips", []): if clip.get("notes"): midi_clips += 1 else: audio_clips += 1 sample_ref = clip.get("sample") or clip.get("sample_path") if sample_ref: samples.append(sample_ref) summary = self._workflow.get_project_summary() recent = self._workflow.get_recent_history(10) return { "project_name": "AbletonMCP Project", "duration": summary.get("duration", {}).get("formatted", "0:00"), "total_tracks": len(tracks), "midi_clips": midi_clips, "audio_clips": audio_clips, "devices": devices, "samples": samples, "production_time": len(recent), "exports": [], "quality_score": 0, } def set_parallel_processing(self, enabled: bool = True) -> Dict[str, Any]: self._workflow._parallel_processing_enabled = bool(enabled) max_workers = min(8, os.cpu_count() or 4) if enabled else 1 return { "success": True, "max_workers": max_workers, "operations": ["analyze", "generate", "render"] if enabled else [], } def get_progress_report(self) -> Dict[str, Any]: status = self.get_workflow_status() return { "completion": status.get("progress", 0), "phases_completed": status.get("completed", []), "current_phase": status.get("phase", "idle"), "tasks_done": len(status.get("completed", [])), "tasks_total": max(1, len(status.get("completed", []))), "time_invested": f"{len(status.get('completed', [])) * 5}m", "milestones": status.get("completed", []), }