Files
ableton-mcp-ai/AbletonMCP_AI/MCP_Server/server_v2.py
renato97 6ec8663954 Initial commit: AbletonMCP-AI complete system
- MCP Server with audio fallback, sample management
- Song generator with bus routing
- Reference listener and audio resampler
- Vector-based sample search
- Master chain with limiter and calibration
- Fix: Audio fallback now works without M4L
- Fix: Full song detection in sample loader

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-28 22:53:10 -03:00

1367 lines
43 KiB
Python

"""
AbletonMCP AI Server v2 - Robust MCP server for music generation.

Integrates FastMCP with Ableton Live 12 over a TCP socket and with Max for Live over UDP.

To run:
    python -m AbletonMCP_AI.MCP_Server.server_v2
Or with uv:
    uv run python -m AbletonMCP_AI.MCP_Server.server_v2
"""
from mcp.server.fastmcp import FastMCP, Context
import socket
import json
import logging
import sys
from dataclasses import dataclass
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any, List, Optional
from pathlib import Path
from datetime import datetime
# Put the parent directory of this package on sys.path so the local helper
# modules can be imported by bare name.
sys.path.insert(0, str(Path(__file__).parent.parent))

# Bind both optional classes up front. The SongGenerator import below is
# disabled, so without this default the name would be undefined whenever the
# SampleIndex import succeeds, and every `SongGenerator is not None` check
# would raise NameError.
SongGenerator = None
SampleIndex = None
try:
    # from song_generator import SongGenerator, StyleConfig
    from sample_index import SampleIndex
except ImportError as e:
    # NOTE(review): stdout is presumably the MCP stdio transport channel, so
    # the diagnostic is written to stderr to avoid corrupting it - confirm.
    print(f"Error importando módulos locales: {e}", file=sys.stderr)
    SongGenerator = None
    SampleIndex = None
# Logging configuration: INFO and above go to the default stream handler and
# are appended to server_v2.log, located next to this module.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(Path(__file__).with_name('server_v2.log'), mode='a'),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("AbletonMCP-AI-v2")
# ============================================================================
# CONSTANTES Y CONFIGURACIÓN
# ============================================================================
# Endpoints: Ableton is reached over TCP, Max for Live over UDP.
DEFAULT_ABLETON_PORT = 9877
DEFAULT_MAX_PORT = 9879
MAX_HOST = "127.0.0.1"
ABLETON_HOST = "localhost"
# Directory passed to SampleIndex when the sample library is loaded.
SAMPLES_DIR = r"C:\Users\ren\embeddings\all_tracks"
# Color index assigned to a track, keyed by track type.
TRACK_COLORS = dict(
    kick=10,    # red
    snare=20,   # green
    hat=5,      # yellow
    clap=45,    # orange
    bass=30,    # blue
    synth=50,   # pink / magenta
    chords=60,  # purple
    fx=25,      # light green
    vocal=15,   # dark orange
)
# Instructions for the producer persona (AI context). This text is passed
# verbatim as the `instructions` argument of the FastMCP server, so it is
# runtime data and must not be edited casually.
PRODUCER_INSTRUCTIONS = """
Eres AbletonMCP-AI v2, un productor musical experto integrado con Ableton Live 12 y Max for Live.
Tu objetivo es crear música electrónica profesional mediante prompts en lenguaje natural.
CAPACIDADES PRINCIPALES:
1. Generar tracks completos con estructura profesional (Intro, Build, Drop, Break, Outro)
2. Crear patrones MIDI para diferentes géneros (Techno, House, Trance, Tech-House, etc.)
3. Seleccionar y cargar samples apropiados desde la librería local
4. Enviar rutas de samples a Max for Live para carga dinámica
5. Configurar BPM, tonalidad y estructura musical
6. Controlar transporte (play, stop, tempo)
7. Crear clips y escenas en Ableton
HERRAMIENTAS DISPONIBLES:
- generate_song(genre, style, bpm): Genera una canción completa
- load_sample_kit(genre): Carga un kit de samples para un género
- create_pattern(instrument, pattern_type): Crea patrones MIDI
- control_transport(action): Controla reproducción
- get_session_info(): Obtiene información de la sesión
ESTILOS SOPORTADOS:
- Techno: Industrial, Peak Time, Dub, Minimal, Acid
- House: Deep, Tech-House, Progressive, Afro, Classic 90s
- Trance: Psy, Progressive, Uplifting
- Drum & Bass: Liquid, Neuro, Jump-up, Jungle
FLUJO DE TRABAJO:
1. Analizar el prompt del usuario para extraer género, BPM, tonalidad, mood
2. Detectar samples disponibles en la librería
3. Generar patrones MIDI característicos del género
4. Enviar comandos a Ableton via socket TCP
5. Enviar rutas de samples a Max via UDP
6. Proporcionar feedback sobre lo creado
REGLAS:
- Siempre verifica la conexión con Ableton antes de ejecutar comandos
- Usa valores por defecto razonables si el usuario no especifica
- Organiza los tracks con colores consistentes
- Maneja errores gracefully y proporciona mensajes útiles
- Loggea todas las operaciones para debugging
""".strip()
# ============================================================================
# CLASES DE CONEXIÓN
# ============================================================================
@dataclass
class AbletonConnection:
    """Manage the TCP connection to Ableton Live.

    A command is sent as one UTF-8 encoded JSON object. The reply is read in
    chunks until the accumulated bytes parse as a complete JSON document.

    Attributes:
        host: Host name of the Ableton endpoint.
        port: TCP port of the Ableton endpoint.
        sock: The live socket, or None when not connected.
        connected: True while the socket is believed to be usable.
        last_error: Human-readable description of the last failure, if any.
    """
    host: str = ABLETON_HOST
    port: int = DEFAULT_ABLETON_PORT
    sock: Optional[socket.socket] = None
    connected: bool = False
    last_error: Optional[str] = None

    def _drop_socket(self) -> None:
        """Close the current socket, if any, and mark the connection as down."""
        stale, self.sock = self.sock, None
        self.connected = False
        if stale is not None:
            try:
                stale.close()
            except Exception as e:
                logger.error(f"Error desconectando: {e}")

    def connect(self, timeout: float = 5.0) -> bool:
        """Connect to the Ableton Remote Script.

        Args:
            timeout: Seconds to wait for the TCP connection to be established.

        Returns:
            True when a usable connection exists, False otherwise (the reason
            is stored in ``last_error``).
        """
        if self.connected and self.sock:
            return True
        # Release any socket left over from a previous failure so its file
        # descriptor is not leaked when a new one is created.
        self._drop_socket()
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(timeout)
            self.sock.connect((self.host, self.port))
            # settimeout(None) restores blocking mode; send_command sets its
            # own receive timeout before reading.
            self.sock.settimeout(None)
            self.connected = True
            self.last_error = None
            logger.info(f"Conectado a Ableton en {self.host}:{self.port}")
            return True
        except socket.timeout:
            self.last_error = f"Timeout conectando a {self.host}:{self.port}"
            logger.error(self.last_error)
            self._drop_socket()
            return False
        except Exception as e:
            self.last_error = f"Error conectando a Ableton: {e}"
            logger.error(self.last_error)
            self._drop_socket()
            return False

    def disconnect(self):
        """Disconnect from Ableton; a no-op when there is no socket."""
        if self.sock:
            self._drop_socket()
            logger.info("Desconectado de Ableton")

    def send_command(self, command_type: str, params: Optional[Dict[str, Any]] = None,
                     timeout: float = 15.0) -> Dict[str, Any]:
        """Send a command to Ableton and return the decoded response.

        Args:
            command_type: Value of the ``type`` field of the command.
            params: Command parameters; an empty dict is sent when omitted.
            timeout: Seconds to wait for each chunk of the response.

        Returns:
            The parsed JSON response, or a dict with ``status == "error"`` and
            a ``message`` when the exchange fails. This method does not raise.
        """
        # connect() returns immediately when already connected, and also
        # covers the inconsistent state "connected flag set but no socket".
        if not self.connect():
            return {"status": "error", "message": "No conectado a Ableton"}
        command = {
            "type": command_type,
            "params": params or {}
        }
        try:
            logger.debug(f"Enviando comando: {command_type}")
            self.sock.sendall(json.dumps(command).encode('utf-8'))
            # Receive the response
            self.sock.settimeout(timeout)
            chunks = []
            while True:
                try:
                    chunk = self.sock.recv(8192)
                except socket.timeout:
                    logger.warning("Timeout esperando respuesta")
                    break
                if not chunk:
                    # The peer closed the connection: drop the socket so the
                    # next command reconnects instead of writing to a dead one.
                    self._drop_socket()
                    break
                chunks.append(chunk)
                try:
                    return json.loads(b''.join(chunks).decode('utf-8'))
                except ValueError:
                    # Covers json.JSONDecodeError (document not complete yet)
                    # and UnicodeDecodeError (a multi-byte character split
                    # across two chunks); both mean "keep reading".
                    continue
            # Every received prefix has already failed to parse above, so any
            # buffered data at this point is an incomplete response.
            if chunks:
                return {"status": "error", "message": "Respuesta JSON incompleta"}
            return {"status": "error", "message": "No se recibió respuesta"}
        except socket.error as e:
            self.last_error = f"Error de socket: {e}"
            logger.error(self.last_error)
            self._drop_socket()
            return {"status": "error", "message": str(e)}
        except Exception as e:
            self.last_error = f"Error en comunicación: {e}"
            logger.error(self.last_error)
            self._drop_socket()
            return {"status": "error", "message": str(e)}
@dataclass
class MaxConnection:
    """Manage the UDP channel used to send messages to Max for Live."""
    host: str = MAX_HOST
    port: int = DEFAULT_MAX_PORT
    sock: Optional[socket.socket] = None

    def __post_init__(self):
        self._init_socket()

    def _init_socket(self):
        """Create the UDP socket; on failure ``sock`` is left as None."""
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            logger.info(f"Socket UDP inicializado para Max en {self.host}:{self.port}")
        except Exception as e:
            logger.error(f"Error inicializando socket UDP: {e}")
            self.sock = None

    def send_message(self, message: Dict[str, Any]) -> bool:
        """Serialize ``message`` as JSON and send it as one UDP datagram.

        Returns:
            True when the datagram was handed to the socket, False when no
            socket could be created or the send raised.
        """
        if not self.sock:
            # Lazily retry socket creation once before giving up.
            self._init_socket()
            if not self.sock:
                return False
        try:
            payload = json.dumps(message).encode('utf-8')
            destination = (self.host, self.port)
            self.sock.sendto(payload, destination)
            logger.debug(f"Mensaje enviado a Max: {message.get('type', 'unknown')}")
            return True
        except Exception as e:
            logger.error(f"Error enviando mensaje a Max: {e}")
            return False

    def send_sample_path(self, track_index: int, sample_path: str,
                         slot: int = 0) -> bool:
        """Send a ``load_sample`` message with a sample path for a track/slot."""
        return self.send_message(dict(
            type="load_sample",
            track_index=track_index,
            sample_path=sample_path,
            slot=slot,
            timestamp=datetime.now().isoformat(),
        ))

    def send_sample_kit(self, kit: Dict[str, List[Dict]]) -> bool:
        """Send a ``load_sample_kit`` message carrying the whole kit."""
        return self.send_message(dict(
            type="load_sample_kit",
            kit=kit,
            timestamp=datetime.now().isoformat(),
        ))

    def send_command(self, command: str, params: Dict[str, Any] = None) -> bool:
        """Send a generic ``command`` message; ``params`` defaults to an empty dict."""
        return self.send_message(dict(
            type="command",
            command=command,
            params=params or {},
            timestamp=datetime.now().isoformat(),
        ))
# ============================================================================
# GESTORES GLOBALES
# ============================================================================
# Module-level singletons. Each starts as None and is created lazily by the
# matching get_*() accessor defined below.
_ableton_connection: Optional[AbletonConnection] = None
_max_connection: Optional[MaxConnection] = None
# String annotations: SampleIndex / SongGenerator may be None when the local
# imports are unavailable.
_sample_index: Optional['SampleIndex'] = None
_song_generator: Optional['SongGenerator'] = None
def get_ableton_connection() -> AbletonConnection:
    """Return the shared AbletonConnection, creating it on first use."""
    global _ableton_connection
    if _ableton_connection is not None:
        return _ableton_connection
    _ableton_connection = AbletonConnection()
    return _ableton_connection
def get_max_connection() -> MaxConnection:
    """Return the shared MaxConnection, creating it on first use."""
    global _max_connection
    if _max_connection is not None:
        return _max_connection
    _max_connection = MaxConnection()
    return _max_connection
def get_sample_index() -> Optional['SampleIndex']:
    """Return the shared sample index, building it on first use.

    Returns None when the SampleIndex class is unavailable or when building
    the index raised (the error is logged).
    """
    global _sample_index
    must_build = _sample_index is None and SampleIndex is not None
    if must_build:
        try:
            _sample_index = SampleIndex(SAMPLES_DIR)
        except Exception as e:
            logger.error(f"Error cargando índice de samples: {e}")
    return _sample_index
def get_song_generator() -> Optional['SongGenerator']:
    """Return the shared song generator, creating it on first use.

    Returns:
        The generator instance, or None when the SongGenerator class is not
        available or its construction failed (the error is logged).
    """
    global _song_generator
    if _song_generator is None:
        # Look the class up dynamically: the module-level import of
        # SongGenerator may be disabled, in which case the name is not bound
        # at all and a direct reference would raise NameError.
        generator_cls = globals().get("SongGenerator")
        if generator_cls is not None:
            try:
                _song_generator = generator_cls()
            except Exception as e:
                # Mirrors get_sample_index(): log and report "unavailable".
                logger.error(f"Error inicializando generador: {e}")
    return _song_generator
# ============================================================================
# LIFESPAN DEL SERVIDOR
# ============================================================================
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
    """Manage the server life cycle.

    On startup, tries to initialise the Ableton connection, the Max UDP
    socket, the sample index and the song generator. Each step is best-effort:
    a failure is logged as a warning and startup continues. On shutdown the
    Ableton connection and the UDP socket are closed.

    Yields:
        A dict with the keys ``ableton``, ``max``, ``samples`` and
        ``generator`` holding the module-level singletons (any may be None).
    """
    # No `global` statement is needed: the singletons are only read here.
    # Declaring them global inside `finally`, after they are used in the
    # yielded dict, is a SyntaxError ("used prior to global declaration").
    try:
        logger.info("=" * 60)
        logger.info("AbletonMCP-AI Server v2 iniciando...")
        logger.info("=" * 60)
        # Try to connect to Ableton
        try:
            ableton = get_ableton_connection()
            if ableton.connect():
                logger.info("Conectado a Ableton Live")
            else:
                logger.warning("No se pudo conectar a Ableton (¿está abierto el script?)")
        except Exception as e:
            logger.warning(f"Error conectando a Ableton: {e}")
        # Initialise the Max connection
        try:
            get_max_connection()
            logger.info(f"Conexión UDP con Max lista en puerto {DEFAULT_MAX_PORT}")
        except Exception as e:
            logger.warning(f"Error inicializando conexión con Max: {e}")
        # Initialise the sample index
        try:
            sample_index = get_sample_index()
            if sample_index:
                logger.info(f"Índice de samples cargado: {len(sample_index.samples)} samples")
            else:
                logger.warning("Índice de samples no disponible")
        except Exception as e:
            logger.warning(f"Error cargando índice de samples: {e}")
        # Initialise the song generator
        try:
            song_gen = get_song_generator()
            if song_gen:
                logger.info("Generador de canciones listo")
            else:
                logger.warning("Generador de canciones no disponible")
        except Exception as e:
            logger.warning(f"Error inicializando generador: {e}")
        yield {
            "ableton": _ableton_connection,
            "max": _max_connection,
            "samples": _sample_index,
            "generator": _song_generator
        }
    finally:
        if _ableton_connection:
            logger.info("Desconectando de Ableton...")
            _ableton_connection.disconnect()
        if _max_connection and _max_connection.sock:
            logger.info("Cerrando socket UDP...")
            try:
                _max_connection.sock.close()
            except Exception as e:
                # Shutdown must still reach the final log line.
                logger.warning(f"Error cerrando socket UDP: {e}")
            finally:
                # Lets MaxConnection.send_message recreate the socket if the
                # connection object is reused after shutdown.
                _max_connection.sock = None
        logger.info("AbletonMCP-AI Server v2 detenido")
# ============================================================================
# CREAR SERVIDOR MCP
# ============================================================================
# The FastMCP server instance. The @mcp.tool() functions below register on it;
# it receives the producer instructions and the lifespan handler defined above.
mcp = FastMCP(
    "AbletonMCP-AI-v2",
    instructions=PRODUCER_INSTRUCTIONS,
    lifespan=server_lifespan
)
# ============================================================================
# HERRAMIENTAS MCP - GENERACIÓN DE CANCIONES
# ============================================================================
@mcp.tool()
def generate_song(
    ctx: Context,
    genre: str = "house",
    style: str = "",
    bpm: float = 0,
    key: str = "",
    structure: str = "standard"
) -> str:
    """
    Generate a complete song with a professional structure.

    Args:
        genre: Musical genre (techno, house, trance, tech-house, drum-and-bass)
        style: Sub-genre or specific style (e.g. "industrial", "deep", "90s", "minimal")
        bpm: Desired BPM (0 = pick automatically for the genre)
        key: Key (e.g. "Am", "F#m", "C"); empty = pick automatically
        structure: Track structure (standard, minimal, extended)

    Returns:
        A summary of the generated song, or an error message.

    Examples:
        generate_song("techno", "industrial", 138, "F#m")
        generate_song("house", "deep", 124, "Am")
        generate_song("tech-house", "groovy", 126)
    """
    try:
        generator = get_song_generator()
        if not generator:
            return "Error: Generador de canciones no disponible"

        ableton = get_ableton_connection()
        if not ableton.connect():
            return f"Error: No se pudo conectar a Ableton en {ABLETON_HOST}:{DEFAULT_ABLETON_PORT}"

        # Resolve the effective settings, then hand them to Ableton.
        config = generator.generate_config(genre, style, bpm, key, structure)
        payload = {
            "genre": genre,
            "style": style or config.get('style', ''),
            "bpm": config.get('bpm', 120),
            "key": config.get('key', ''),
            "structure": structure
        }
        response = ableton.send_command("generate_complete_song", payload)

        if response.get("status") != "success":
            return f"Error generando canción: {response.get('message', 'Error desconocido')}"
        summary = config.get("summary", "")
        return f"Canción generada exitosamente!\n{summary}"
    except Exception as e:
        logger.exception("Error en generate_song")
        return f"Error: {str(e)}"
@mcp.tool()
def load_sample_kit(
    ctx: Context,
    genre: str = "techno",
    key: str = "",
    bpm: int = 0
) -> str:
    """
    Load a complete sample kit for a given genre.

    Args:
        genre: Musical genre used to select suitable samples
        key: Preferred key for harmonic samples
        bpm: Preferred BPM for tempo-specific samples

    Returns:
        A listing of the samples sent to Max, or an error message.
    """
    try:
        sample_index = get_sample_index()
        if not sample_index:
            return "Error: Índice de samples no disponible"
        max_conn = get_max_connection()

        kit = sample_index.get_sample_pack(genre, key, bpm)
        total_samples = sum(map(len, kit.values()))
        if total_samples == 0:
            return f"No se encontraron samples para el género '{genre}'"

        if not max_conn.send_sample_kit(kit):
            return "Error enviando kit a Max for Live"

        # Build the summary: at most two sample names per category.
        lines = [f"Kit de samples para {genre} cargado:", ""]
        for category, samples in kit.items():
            if not samples:
                continue
            lines.append(f"{category.upper()}:")
            lines.extend(f" - {s['name']}" for s in samples[:2])
            remaining = len(samples) - 2
            if remaining > 0:
                lines.append(f" ... y {remaining} más")
            lines.append("")
        lines.append(f"Total: {total_samples} samples enviados a Max")
        return "\n".join(lines)
    except Exception as e:
        logger.exception("Error en load_sample_kit")
        return f"Error: {str(e)}"
@mcp.tool()
def create_pattern(
    ctx: Context,
    instrument: str,
    pattern_type: str = "standard",
    track_index: int = -1,
    clip_index: int = 0,
    length: float = 4.0,
    key: str = "Am",
    genre: str = "techno"
) -> str:
    """
    Create a MIDI pattern for a specific instrument.

    Args:
        instrument: Instrument type (kick, snare, hat, clap, bass, chords, lead, melody)
        pattern_type: Pattern type (standard, minimal, full, complex, simple)
        track_index: Track index (-1 = create a new track)
        clip_index: Clip/slot index
        length: Length in beats
        key: Key for melodic instruments
        genre: Genre that drives the pattern style

    Returns:
        Confirmation of the created pattern, or an error message.
    """
    try:
        generator = get_song_generator()
        if not generator:
            return "Error: Generador no disponible"
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        # Generate the notes BEFORE touching the Live set, so an unknown
        # instrument (or a generator failure) does not leave behind an empty
        # track and clip.
        kind = instrument.lower()
        if kind in ['kick', 'bd', 'bass drum']:
            notes = generator._create_kick_pattern(genre, pattern_type)
        elif kind in ['snare', 'sd', 'clap']:
            notes = generator._create_clap_pattern(genre, pattern_type)
        elif kind in ['hat', 'hihat', 'hh']:
            notes = generator._create_hat_pattern(genre, pattern_type)
        elif kind in ['perc', 'percussion']:
            notes = generator._create_perc_pattern(genre, pattern_type)
        elif kind == 'bass':
            notes = generator.create_bassline(key, pattern_type, length)
        elif kind in ['chords', 'chord', 'pads']:
            notes = generator.create_chord_progression(key, genre, length)
        elif kind in ['lead', 'melody', 'synth']:
            notes = generator.create_melody(key, 'minor', length, genre)
        else:
            return f"Instrumento '{instrument}' no reconocido"
        color = TRACK_COLORS.get(kind, 0)

        # Create the track if needed
        if track_index < 0:
            response = ableton.send_command("create_midi_track", {"index": -1})
            if response.get("status") == "success":
                track_index = response.get("result", {}).get("index", 0)
            else:
                return "Error creando track MIDI"

        # Create the clip
        clip_response = ableton.send_command("create_clip", {
            "track_index": track_index,
            "clip_index": clip_index,
            "length": length
        })
        if clip_response.get("status") != "success":
            return f"Error creando clip: {clip_response.get('message')}"

        # Color the track; 0 means "no color configured for this instrument".
        if color:
            ableton.send_command("set_track_color", {
                "track_index": track_index,
                "color": color
            })

        # Add the notes
        notes_response = ableton.send_command("add_notes_to_clip", {
            "track_index": track_index,
            "clip_index": clip_index,
            "notes": notes
        })
        if notes_response.get("status") == "success":
            return f"Patrón '{pattern_type}' para {instrument} creado en track {track_index}, clip {clip_index} ({len(notes)} notas)"
        else:
            return f"Error agregando notas: {notes_response.get('message')}"
    except Exception as e:
        logger.exception("Error en create_pattern")
        return f"Error: {str(e)}"
@mcp.tool()
def control_transport(
    ctx: Context,
    action: str,
    tempo: Optional[float] = None
) -> str:
    """
    Control Ableton's transport (playback and tempo).

    Args:
        action: Action to run: play, stop, continue, toggle,
            set_tempo (aliases: tempo, bpm) or get_tempo. Case-insensitive.
        tempo: BPM to set (only used by the set_tempo action)

    Returns:
        Confirmation of the action, or an error message.
    """
    try:
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        action = action.lower()

        # "toggle" is advertised to clients; resolve it to play/stop from the
        # session's current `is_playing` flag.
        if action == "toggle":
            info = ableton.send_command("get_session_info")
            if info.get("status") != "success":
                return f"Error: {info.get('message', 'Error desconocido')}"
            action = "stop" if info.get("result", {}).get("is_playing") else "play"

        # Actions that map one-to-one onto a command without parameters.
        simple_actions = {
            "play": ("start_playback", "Reproducción iniciada"),
            "stop": ("stop_playback", "Reproducción detenida"),
            "continue": ("continue_playback", "Reproducción continuada"),
        }

        if action in simple_actions:
            command, confirmation = simple_actions[action]
            response = ableton.send_command(command)
            if response.get("status") == "success":
                return confirmation
        elif action in ["set_tempo", "tempo", "bpm"]:
            if tempo is None or tempo <= 0:
                return "Error: Debes especificar un tempo válido"
            response = ableton.send_command("set_tempo", {"tempo": tempo})
            if response.get("status") == "success":
                return f"Tempo establecido a {tempo} BPM"
        elif action == "get_tempo":
            response = ableton.send_command("get_session_info")
            if response.get("status") == "success":
                return f"Tempo actual: {response.get('result', {}).get('tempo', 'desconocido')} BPM"
        else:
            return f"Acción '{action}' no reconocida. Usa: play, stop, continue, toggle, set_tempo, get_tempo"

        # Reached only when the command was sent but did not succeed.
        return f"Error: {response.get('message', 'Error desconocido')}"
    except Exception as e:
        logger.exception("Error en control_transport")
        return f"Error: {str(e)}"
@mcp.tool()
def get_session_info(ctx: Context) -> str:
    """
    Get information about the current Ableton session.

    Returns:
        A text summary of the session (tempo, playback state, track count and,
        when reported, the current song time), or an error message.
    """
    try:
        ableton = get_ableton_connection()
        if not ableton.connect():
            return f"Error: No conectado a Ableton en {ABLETON_HOST}:{DEFAULT_ABLETON_PORT}"
        response = ableton.send_command("get_session_info")
        if response.get("status") == "success":
            result = response.get("result", {})
            # The affirmative branch used to be an empty string, so a playing
            # session was reported as "Reproduciendo: " with no value.
            playing = 'Sí' if result.get('is_playing') else 'No'
            info_lines = [
                "Información de la sesión:",
                f" Tempo: {result.get('tempo', 'N/A')} BPM",
                f" Reproduciendo: {playing}",
                f" Tracks: {result.get('num_tracks', 'N/A')}",
            ]
            if 'current_song_time' in result:
                info_lines.append(f" Tiempo: {result.get('current_song_time')} beats")
            return "\n".join(info_lines)
        else:
            return f"Error: {response.get('message', 'Error desconocido')}"
    except Exception as e:
        logger.exception("Error en get_session_info")
        return f"Error: {str(e)}"
# ============================================================================
# HERRAMIENTAS MCP - GESTIÓN DE SAMPLES
# ============================================================================
@mcp.tool()
def search_samples(
    ctx: Context,
    query: str,
    category: str = "",
    limit: int = 10
) -> str:
    """
    Search the local library for samples.

    Args:
        query: Search term (e.g. "kick", "bass", "hat")
        category: Category (kick, snare, hat, bass, synth, percussion, vocal)
        limit: Maximum number of results

    Returns:
        A numbered listing of the matching samples, or an error message.
    """
    try:
        sample_index = get_sample_index()
        if not sample_index:
            return "Error: Índice de samples no disponible"

        matches = sample_index.search(query, category, limit)
        if not matches:
            return f"No se encontraron samples para '{query}'"

        report = [f"Samples encontrados para '{query}':\n"]
        for position, entry in enumerate(matches, start=1):
            report += [
                f"{position}. {entry['name']} ({entry['category']})",
                f" Path: {entry['path']}",
            ]
            # Key/BPM are shown only for samples that carry a key.
            if entry.get('key'):
                report.append(f" Key: {entry['key']}, BPM: {entry.get('bpm', 'N/A')}")
            report.append("")
        return "\n".join(report)
    except Exception as e:
        logger.exception("Error en search_samples")
        return f"Error: {str(e)}"
@mcp.tool()
def get_random_sample(
    ctx: Context,
    category: str = ""
) -> str:
    """
    Pick a random sample from the library.

    Args:
        category: Optional category used as a filter

    Returns:
        Details of the selected sample, or an error message.
    """
    try:
        sample_index = get_sample_index()
        if not sample_index:
            return "Error: Índice de samples no disponible"

        sample = sample_index.get_random_sample(category)
        if not sample:
            scope = ' en categoría ' + category if category else ''
            return f"No hay samples disponibles{scope}"

        details = [
            "Sample aleatorio seleccionado:",
            f"Nombre: {sample['name']}",
            f"Categoría: {sample['category']}",
            f"Path: {sample['path']}",
            f"Key: {sample.get('key', 'N/A')}",
            f"BPM: {sample.get('bpm', 'N/A')}",
        ]
        return "\n".join(details)
    except Exception as e:
        logger.exception("Error en get_random_sample")
        return f"Error: {str(e)}"
@mcp.tool()
def send_sample_to_max(
    ctx: Context,
    sample_path: str,
    track_index: int = 0,
    slot: int = 0
) -> str:
    """
    Send a sample path to Max for Live so it can be loaded.

    Args:
        sample_path: Full path of the audio file
        track_index: Index of the target track
        slot: Slot/clip where the sample should be loaded

    Returns:
        Confirmation that the message was sent, or an error message.
    """
    try:
        max_conn = get_max_connection()
        delivered = max_conn.send_sample_path(track_index, sample_path, slot)
        if not delivered:
            return "Error enviando sample a Max"
        return f"Sample enviado a Max: {Path(sample_path).name} -> Track {track_index}, Slot {slot}"
    except Exception as e:
        logger.exception("Error en send_sample_to_max")
        return f"Error: {str(e)}"
@mcp.tool()
def refresh_sample_index(ctx: Context) -> str:
    """
    Refresh the sample index by scanning the samples directory again.

    Returns:
        Confirmation with the number of samples found, or an error message.
    """
    global _sample_index
    try:
        if SampleIndex is None:
            return "Error: Módulo SampleIndex no disponible"
        # Build and refresh into a local first: the shared index is replaced
        # only after the refresh succeeded, so a failure keeps the previous
        # (working) index in place.
        fresh_index = SampleIndex(SAMPLES_DIR)
        fresh_index.refresh()
        _sample_index = fresh_index
        return f"Índice refrescado: {len(_sample_index.samples)} samples encontrados"
    except Exception as e:
        logger.exception("Error en refresh_sample_index")
        return f"Error: {str(e)}"
# ============================================================================
# HERRAMIENTAS MCP - CREACIÓN AVANZADA
# ============================================================================
@mcp.tool()
def create_drum_pattern(
    ctx: Context,
    track_index: int,
    clip_index: int,
    style: str = "techno",
    pattern_type: str = "full",
    length: float = 4.0
) -> str:
    """
    Create a complete drum pattern.

    Args:
        track_index: Index of the MIDI track that receives the pattern
        clip_index: Clip/slot index
        style: Style (techno, house, trance, minimal)
        pattern_type: Pattern type (full, kick-only, hats-only, minimal)
        length: Length in beats

    Returns:
        Confirmation of the created pattern, or an error message.
    """
    try:
        generator = get_song_generator()
        if not generator:
            return "Error: Generador no disponible"
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        notes = generator.create_drum_pattern(style, pattern_type, length)
        target = {"track_index": track_index, "clip_index": clip_index}

        # Step 1: create the clip that will hold the notes.
        clip_response = ableton.send_command("create_clip", {**target, "length": length})
        if clip_response.get("status") != "success":
            return f"Error creando clip: {clip_response.get('message')}"

        # Step 2: write the generated notes into it.
        notes_response = ableton.send_command("add_notes_to_clip", {**target, "notes": notes})
        if notes_response.get("status") != "success":
            return f"Error agregando notas: {notes_response.get('message')}"
        return f"Patrón de batería '{style}' creado ({len(notes)} notas)"
    except Exception as e:
        logger.exception("Error en create_drum_pattern")
        return f"Error: {str(e)}"
@mcp.tool()
def create_bassline(
    ctx: Context,
    track_index: int,
    clip_index: int,
    key: str,
    style: str = "rolling",
    length: float = 16.0
) -> str:
    """
    Create a musical bassline.

    Args:
        track_index: MIDI track index
        clip_index: Clip index
        key: Key (e.g. "Am", "F#m", "C")
        style: Style (rolling, minimal, acid, walking, offbeat)
        length: Length in beats

    Returns:
        Confirmation of the created bassline, or an error message.
    """
    try:
        generator = get_song_generator()
        if not generator:
            return "Error: Generador no disponible"
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        notes = generator.create_bassline(key, style, length)
        target = {"track_index": track_index, "clip_index": clip_index}

        # Step 1: create the clip that will hold the notes.
        clip_response = ableton.send_command("create_clip", {**target, "length": length})
        if clip_response.get("status") != "success":
            return f"Error creando clip: {clip_response.get('message')}"

        # Step 2: write the generated notes into it.
        notes_response = ableton.send_command("add_notes_to_clip", {**target, "notes": notes})
        if notes_response.get("status") != "success":
            return f"Error agregando notas: {notes_response.get('message')}"
        return f"Bassline '{style}' en {key} creado ({len(notes)} notas)"
    except Exception as e:
        logger.exception("Error en create_bassline")
        return f"Error: {str(e)}"
@mcp.tool()
def create_chord_progression(
    ctx: Context,
    track_index: int,
    clip_index: int,
    key: str,
    progression_type: str = "techno",
    length: float = 16.0
) -> str:
    """
    Create a chord progression.

    Args:
        track_index: MIDI track index
        clip_index: Clip index
        key: Key (e.g. "Am", "F#m", "C")
        progression_type: Type (techno, house, deep, minor)
        length: Length in beats (usually 16 = 4 bars)

    Returns:
        Confirmation of the created progression, or an error message.
    """
    try:
        generator = get_song_generator()
        if not generator:
            return "Error: Generador no disponible"
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        notes = generator.create_chord_progression(key, progression_type, length)
        target = {"track_index": track_index, "clip_index": clip_index}

        # Step 1: create the clip that will hold the notes.
        clip_response = ableton.send_command("create_clip", {**target, "length": length})
        if clip_response.get("status") != "success":
            return f"Error creando clip: {clip_response.get('message')}"

        # Step 2: write the generated notes into it.
        notes_response = ableton.send_command("add_notes_to_clip", {**target, "notes": notes})
        if notes_response.get("status") != "success":
            return f"Error agregando notas: {notes_response.get('message')}"
        return f"Progresión '{progression_type}' en {key} creada ({len(notes)} notas)"
    except Exception as e:
        logger.exception("Error en create_chord_progression")
        return f"Error: {str(e)}"
# ============================================================================
# HERRAMIENTAS MCP - GESTIÓN DE TRACKS Y CLIPS
# ============================================================================
@mcp.tool()
def create_midi_track(
    ctx: Context,
    name: str = "MIDI Track",
    color: int = None
) -> str:
    """
    Create a new MIDI track.

    Args:
        name: Track name
        color: Track color (0-69, optional)

    Returns:
        Confirmation with the index of the created track, or an error message.
    """
    try:
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        response = ableton.send_command("create_midi_track", {"index": -1})
        if response.get("status") != "success":
            return f"Error: {response.get('message')}"

        track_index = response.get("result", {}).get("index", 0)
        # Name the track, then color it when a color was requested. The
        # outcome of these two follow-up commands is not checked.
        ableton.send_command("set_track_name", {"track_index": track_index, "name": name})
        if color is not None:
            ableton.send_command("set_track_color", {"track_index": track_index, "color": color})
        return f"Track MIDI '{name}' creado en índice {track_index}"
    except Exception as e:
        logger.exception("Error en create_midi_track")
        return f"Error: {str(e)}"
@mcp.tool()
def create_audio_track(
    ctx: Context,
    name: str = "Audio Track",
    color: int = None
) -> str:
    """
    Create a new audio track.

    Args:
        name: Track name
        color: Track color (0-69, optional)

    Returns:
        Confirmation with the index of the created track, or an error message.
    """
    try:
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        response = ableton.send_command("create_audio_track", {"index": -1})
        if response.get("status") != "success":
            return f"Error: {response.get('message')}"

        track_index = response.get("result", {}).get("index", 0)
        # Name the track, then color it when a color was requested. The
        # outcome of these two follow-up commands is not checked.
        ableton.send_command("set_track_name", {"track_index": track_index, "name": name})
        if color is not None:
            ableton.send_command("set_track_color", {"track_index": track_index, "color": color})
        return f"Track de audio '{name}' creado en índice {track_index}"
    except Exception as e:
        logger.exception("Error en create_audio_track")
        return f"Error: {str(e)}"
@mcp.tool()
def set_track_volume(
    ctx: Context,
    track_index: int,
    volume: float
) -> str:
    """
    Set the volume of a track (0.0 - 1.0).

    Args:
        track_index: Track index
        volume: Volume between 0.0 and 1.0 (the value is forwarded as given;
            no range check is done here)

    Returns:
        Confirmation of the change, or an error message.
    """
    try:
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        payload = {"track_index": track_index, "volume": volume}
        response = ableton.send_command("set_track_volume", payload)
        if response.get("status") != "success":
            return f"Error: {response.get('message')}"
        return f"Volumen del track {track_index} ajustado a {volume:.2f}"
    except Exception as e:
        logger.exception("Error en set_track_volume")
        return f"Error: {str(e)}"
@mcp.tool()
def fire_clip(
    ctx: Context,
    track_index: int,
    clip_index: int
) -> str:
    """
    Fire (launch) a specific clip.

    Args:
        track_index: Track index
        clip_index: Clip/slot index

    Returns:
        Confirmation, or an error message.
    """
    try:
        ableton = get_ableton_connection()
        if not ableton.connect():
            return "Error: No conectado a Ableton"

        payload = {"track_index": track_index, "clip_index": clip_index}
        response = ableton.send_command("fire_clip", payload)
        if response.get("status") != "success":
            return f"Error: {response.get('message')}"
        return f"Clip en track {track_index}, slot {clip_index} disparado"
    except Exception as e:
        logger.exception("Error en fire_clip")
        return f"Error: {str(e)}"
@mcp.tool()
def fire_scene(
    ctx: Context,
    scene_index: int
) -> str:
    """
    Fire a scene, i.e. ask Ableton to launch the scene at the given index.

    Args:
        ctx: MCP request context (not used by this tool).
        scene_index: Index of the scene to fire.

    Returns:
        A confirmation message, or a message starting with "Error:" when
        the connection or the command fails.
    """
    try:
        connection = get_ableton_connection()
        if not connection.connect():
            return "Error: No conectado a Ableton"

        reply = connection.send_command("fire_scene", {"scene_index": scene_index})

        # Guard clause: report the failure first, success is the fall-through.
        if reply.get("status") != "success":
            return f"Error: {reply.get('message')}"
        return f"Scene {scene_index} disparada"
    except Exception as exc:
        logger.exception("Error en fire_scene")
        return f"Error: {str(exc)}"
# ============================================================================
# HERRAMIENTAS MCP - UTILIDADES
# ============================================================================
@mcp.tool()
def get_available_samples(ctx: Context) -> str:
    """
    Summarize the samples available in the library.

    Counts the indexed samples per value of their 'category' key and lists
    the categories from most to least populated, followed by a grand total.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        A multi-line text summary, or a message starting with "Error:" when
        the index is unavailable or reading it fails.
    """
    try:
        index = get_sample_index()
        if not index:
            return "Error: Índice de samples no disponible"

        per_category = {}
        for entry in index.samples:
            name = entry['category']
            per_category[name] = per_category.get(name, 0) + 1

        # Stable descending sort: categories with equal counts keep the order
        # in which they were first seen.
        ranked = sorted(per_category.items(), key=lambda pair: pair[1], reverse=True)

        report = ["Samples disponibles:", ""]
        report.extend(f" {name}: {total}" for name, total in ranked)
        report.append("")
        report.append(f"Total: {len(index.samples)} samples")
        return "\n".join(report)
    except Exception as exc:
        logger.exception("Error en get_available_samples")
        return f"Error: {str(exc)}"
@mcp.tool()
def test_connections(ctx: Context) -> str:
    """
    Probe the Ableton connection, the Max for Live channel and the sample index.

    Each probe runs in its own try/except, so a failure in one of them is
    reported as a line of text and does not prevent the others from running.

    Args:
        ctx: MCP request context (not used by this tool).

    Returns:
        One status line (or a few) per subsystem, joined with newlines.
    """
    report = []

    # --- Ableton ---
    try:
        ableton = get_ableton_connection()
        if not ableton.connect(timeout=3.0):
            report.append(f"Ableton: No conectado ({ableton.last_error})")
        else:
            report.append("Ableton: Conectado")
            # Simple round-trip command to confirm the link actually answers.
            session = ableton.send_command("get_session_info")
            if session.get("status") == "success":
                info = session.get('result', {})
                report.append(f" - Tempo: {info.get('tempo')} BPM")
                report.append(f" - Tracks: {info.get('num_tracks')}")
    except Exception as exc:
        report.append(f"Ableton: Error - {exc}")

    # --- Max for Live ---
    try:
        max_conn = get_max_connection()
        ping = {"type": "ping", "timestamp": datetime.now().isoformat()}
        if max_conn.send_message(ping):
            max_status = f"Max for Live: Conexión UDP lista en puerto {DEFAULT_MAX_PORT}"
        else:
            max_status = "Max for Live: Error enviando mensaje"
        report.append(max_status)
    except Exception as exc:
        report.append(f"Max for Live: Error - {exc}")

    # --- Sample index ---
    try:
        index = get_sample_index()
        if not index:
            report.append("Samples: Índice no disponible")
        else:
            report.append(f"Samples: {len(index.samples)} samples indexados")
    except Exception as exc:
        report.append(f"Samples: Error - {exc}")

    return "\n".join(report)
# ============================================================================
# MAIN
# ============================================================================
def main():
    """
    Command-line entry point for the AbletonMCP-AI server.

    Parses the command-line options, prints a start-up banner and then either
    runs the connection self-test (``--test``) or starts the MCP server on the
    selected transport.

    Options:
        --port: TCP port for the SSE transport. 0 (the default) keeps the
            port already configured on the FastMCP instance. Ignored for stdio.
        --transport: "stdio" (default) or "sse".
        --test: Run the connection checks, print the result and exit.
    """
    import argparse

    parser = argparse.ArgumentParser(description="AbletonMCP-AI Server v2")
    parser.add_argument("--port", type=int, default=0, help="Puerto para el servidor MCP (0 = auto)")
    parser.add_argument("--transport", type=str, default="stdio",
                        choices=["stdio", "sse"], help="Transporte MCP")
    parser.add_argument("--test", action="store_true", help="Probar conexiones y salir")
    args = parser.parse_args()

    # With the stdio transport, stdout is the JSON-RPC channel to the MCP
    # client, so human-readable text must not be written there. Send the
    # banner to stderr in that case; --test and SSE can keep using stdout.
    serving_over_stdio = args.transport == "stdio" and not args.test
    banner_stream = sys.stderr if serving_over_stdio else sys.stdout

    banner_lines = (
        "=" * 60,
        "AbletonMCP-AI Server v2",
        "=" * 60,
        f"Transporte: {args.transport}",
        f"Ableton: {ABLETON_HOST}:{DEFAULT_ABLETON_PORT}",
        f"Max UDP: {MAX_HOST}:{DEFAULT_MAX_PORT}",
        f"Samples: {SAMPLES_DIR}",
        "-" * 60,
    )
    for line in banner_lines:
        print(line, file=banner_stream)

    if args.test:
        print("\nProbando conexiones...")
        # Throwaway context: test_connections does not read it.
        ctx = Context(request_context={})
        result = test_connections(ctx)
        print(result)
        return

    # Honour --port, which was previously parsed but never used. It only
    # matters for the network transport; 0 leaves the existing setting alone.
    if args.transport == "sse" and args.port > 0:
        mcp.settings.port = args.port

    # Start the MCP server (blocks until shutdown).
    mcp.run(transport=args.transport)


if __name__ == "__main__":
    main()