diff --git a/AbletonMCP_AI/docs/SYSTEM_SCORE_RENDER.md b/AbletonMCP_AI/docs/SYSTEM_SCORE_RENDER.md new file mode 100644 index 0000000..e1cf743 --- /dev/null +++ b/AbletonMCP_AI/docs/SYSTEM_SCORE_RENDER.md @@ -0,0 +1,90 @@ +# System: Score → Render Pipeline (Sprint 9) + +Effective: 2026-04-14 +Primary Workflow: **Compose-then-Render** +Target View: **Session View** + +## Overview + +The Score → Render pipeline introduces a decoupled architecture where musical composition is separated from Ableton Live execution. This allows for: +1. **Incremental Composition**: Build a song piece-by-piece in a JSON score. +2. **Offline Generation**: Use AI agents (OpenAI/Anthropic) to generate scores without needing Ableton open. +3. **Batch Rendering**: Render 50+ unique songs sequentially from JSON files. +4. **Deterministic Deployment**: Entire song structures are injected into Session View in one atomic call. + +--- + +## Core Components + +### 1. SongScore (`score_engine.py`) +A pure Python data model representing a song. No Ableton dependencies. +- **Meta**: Title, Tempo, Key, Gap Bars. +- **Structure**: Ordered list of sections (Intro, Chorus, etc.) with durations. +- **Tracks**: List of track definitions (Audio or MIDI). +- **Clips**: Clips mapped to specific sections. +- **Mixer**: Volume, Pan, EQ/Compressor presets, Return Sends. + +### 2. ScoreRenderer (`score_renderer.py`) +Translates `SongScore` into TCP commands for Ableton Live. +- **Mapping**: Sections → Scenes | Tracks → Tracks | Clips → Clip Slots. +- **Sample Selection**: Resolver for `"auto"` samples based on BPM proximity. +- **MIDI Resolution**: Resolves pattern names (e.g., `dembow_standard`) into explicit MIDI notes before sending. +- **Mixer Application**: Configures devices (EQ Eight, Compressor) and sends. + +### 3. AI Loop (`ai_loop.py`) +An autonomous production script compatible with Anthropic/OpenRouter/Local LLMs. +- Queries AI for valid `SongScore` JSON. +- Validates and saves to `mcp_server/scores/`. +- Optionally renders immediately to Ableton. + +--- + +## Technical Mapping (Session View) + +The system is strictly Session-View only to avoid Arrangement complexity and allow clip-based performance. + +| SongScore Element | Ableton Element | Command Used | +|-------------------|-----------------|--------------| +| `SectionDef` | **Scene** | `create_scene`, `set_scene_name` | +| `TrackDef` | **Track** | `create_audio_track`, `create_midi_track` | +| `ClipDef` (Audio) | **Clip Slot** | `load_sample_to_clip` | +| `ClipDef` (MIDI) | **Clip Slot** | `create_clip`, `add_notes_to_clip` | +| `MixerDef` | **Devices** | `configure_eq`, `configure_compressor`, `set_track_send` | + +--- + +## Available Tools (MCP) + +### Composer Tools +- `new_score`: Initialize active score. +- `compose_structure`: Define sections and durations. +- `compose_audio_track`: Add audio tracks with sample references. +- `compose_midi_track`: Add MIDI tracks with instruments. +- `compose_pattern`: Apply predefined MIDI patterns (dembow, bass, etc.). +- `compose_mixer`: Set levels and FX presets. +- `compose_from_template`: Create full score from "reggaeton_basic", etc. + +### Management & Rendering +- `save_score` / `load_score`: Persist JSON to `mcp_server/scores/`. +- `list_scores`: List all saved canciones. +- `render_score`: Inject active score into Ableton. +- `render_score_from_file`: Render a specific JSON file. +- `render_all_scores`: Sequentially render everything in the scores folder. + +--- + +## MIDI Patterns Reference + +The following patterns can be used in `compose_midi_track` or `compose_pattern`: + +- **Drums**: `dembow_minimal`, `dembow_standard`, `dembow_double`. +- **Bass**: `bass_sub`, `bass_pluck`, `bass_octaves`, `bass_sustained`. +- **Harmony**: `chords_verse`, `chords_chorus`. +- **Melody**: `melody_simple`. + +## Best Practices for AI Agents + +1. **Always start with a Template**: Use `compose_from_template` first, then modify. +2. **Use "auto" samples**: Let the renderer pick the best file matching the BPM. +3. **Validate before Render**: Use `compose_validate` to catch ID mismatches. +4. **Iterate in JSON**: It's faster to tweak the JSON score via compose tools than to re-render everything. diff --git a/AbletonMCP_AI/mcp_server/ai_loop.py b/AbletonMCP_AI/mcp_server/ai_loop.py new file mode 100644 index 0000000..153e70f --- /dev/null +++ b/AbletonMCP_AI/mcp_server/ai_loop.py @@ -0,0 +1,377 @@ +""" +ai_loop.py — Autonomous music production loop using an Anthropic-compatible AI. + +The loop: + 1. Calls an Anthropic-compatible endpoint to generate a SongScore JSON + 2. Validates and saves the score to scores/ + 3. Optionally renders it into Ableton Live + +Configuration (environment variables OR command-line args): + AI_BASE_URL → API base URL (default: https://api.anthropic.com) + AI_API_KEY → API key (required) + AI_MODEL → model name (default: GLM-5-Turbo) + AI_MAX_TOKENS → max output tokens (default: 4096) + RENDER_AFTER → "1" to auto-render each score in Ableton (default: 0) + LOOP_COUNT → how many songs to produce (default: 10, 0 = infinite) + LOOP_DELAY → seconds between generations (default: 5) + LIB_ROOT → path to libreria/reggaeton (auto-detected) + +Usage examples: + # OpenRouter with Claude Haiku + AI_BASE_URL=https://openrouter.ai/api/v1 AI_API_KEY=sk-xxx python ai_loop.py + + # Local LM Studio (Anthropic-compatible) + AI_BASE_URL=http://localhost:1234/v1 AI_API_KEY=sk-any python ai_loop.py --count 5 + + # Real Anthropic + auto-render + AI_API_KEY=sk-ant-xxx RENDER_AFTER=1 python ai_loop.py +""" + +import argparse +import json +import logging +import os +import sys +import time +from datetime import datetime +from pathlib import Path + +_THIS_DIR = Path(__file__).resolve().parent +_PROJ_DIR = _THIS_DIR.parent +_BASE_DIR = _PROJ_DIR.parent + +for _p in (str(_THIS_DIR), str(_PROJ_DIR)): + if _p not in sys.path: + sys.path.insert(0, _p) + +from score_engine import SongScore, SCORES_DIR +from score_renderer import ScoreRenderer + +logging.basicConfig( + level = logging.INFO, + format = "%(asctime)s [ai_loop] %(levelname)s: %(message)s", +) +log = logging.getLogger("ai_loop") + +_DEFAULT_LIB_ROOT = str(_BASE_DIR / "libreria" / "reggaeton") + +SYSTEM_PROMPT = """\ +You are a professional reggaeton and Latin urban music producer AI. +Your ONLY job is to output a valid SongScore JSON object for each request. +Do NOT include any explanation, markdown code fences, or commentary. +Output ONLY raw JSON that starts with { and ends with }. + +SongScore schema: +{ + "meta": { + "title": "", + "tempo": <85-105>, + "key": "", + "genre": "reggaeton", + "time_signature": "4/4", + "gap_bars": <1.0-4.0> + }, + "structure": [ + { "name": "
", "duration_bars": }, + ... + ], + "tracks": [ + { + "id": "", + "name": "", + "type": "", + + "clips": [ + { "section": "
", "sample": "kick/auto", "loop": true } + ], + + "instrument": "", + + "mixer": { "volume": <0-1>, "pan": <-1 to 1>, "eq_preset": "" } + } + ] +} + +Available sample subfolders — use EXACTLY these values in the "sample" field: + "kick/auto" -> Kick drums + "snare/auto" -> Snares + "hi-hat (para percs normalmente)/auto" -> Hi-hat / percussion + "drumloops/auto" -> Drum loops + "perc loop/auto" -> Percussion loops + "bass/auto" -> Bass samples + "fx/auto" -> FX/transitions + +IMPORTANT: "auto" is a keyword that means "pick the best sample automatically". +Do NOT write "subfolder/auto" literally — that is an instruction, not a valid path. + +Available MIDI patterns: + dembow_minimal dembow_standard dembow_double + bass_sub bass_pluck bass_octaves bass_sustained + chords_verse chords_chorus melody_simple + +Available EQ presets: kick snare bass synth master +compression_preset is accepted but currently ignored (reserved for future use). + +Rules: +- Every track MUST have at least one clip. +- Every clip MUST reference a valid section name from the structure array. +- Always include at minimum: kick, snare or drum_loop, dembow, bass tracks. +- Vary everything: title, tempo, key, gap_bars, structure length (40-90 total bars). +- Use realistic reggaeton/latin structures (Intro, Verse, Pre-Chorus, Chorus, Bridge, Outro). +- Mix audio and MIDI tracks creatively. +- Section names MUST be unique. Use numbered suffixes: "Intro", "Verse A", "Pre-Chorus", "Chorus A", "Verse B", "Chorus B", "Bridge", "Outro". NEVER repeat a section name. +- Do NOT include "start_bar" in sections. The engine calculates it automatically from duration_bars and gap_bars. +- Output ONLY the JSON object. Nothing else. +""" + +USER_PROMPT_TEMPLATE = """\ +Generate song number {index} of {total}. +Make it unique. Use creative Spanish/English titles. +Output only the SongScore JSON. +""" + + +def _build_client(base_url: str, api_key: str): + try: + import anthropic + except ImportError: + log.error("anthropic package not installed. Run: pip install anthropic") + sys.exit(1) + + kwargs = {"api_key": api_key} + if base_url and "anthropic.com" not in base_url: + kwargs["base_url"] = base_url + + return anthropic.Anthropic(**kwargs) + + +def _generate_score(client, model: str, max_tokens: int, + index: int, total: int) -> str: + user_prompt = USER_PROMPT_TEMPLATE.format(index=index, total=total) + + message = client.messages.create( + model = model, + max_tokens = max_tokens, + system = SYSTEM_PROMPT, + messages = [{"role": "user", "content": user_prompt}], + ) + + content = message.content + if isinstance(content, list): + text_blocks = [b.text for b in content if hasattr(b, "text")] + return "\n".join(text_blocks).strip() + return str(content).strip() + + +def _fix_brackets(text: str) -> str: + """Fix common LLM bracket mistakes: } where ] is needed, missing }, etc.""" + import re + # GLM-5-Turbo sometimes closes "structure": [...] with } instead of ] + # Pattern: },\n "tracks" -> ],\n "tracks" + text = re.sub(r'\},(\s*\n\s*)"tracks"', r'],\1"tracks"', text, count=1) + # Also: }\n] (array of objects closed with } then ]) -> }\n] + text = re.sub(r'\}\s*\]', '}\n]', text) + # Trailing comma before closing bracket + text = re.sub(r',(\s*\})', r'\1', text) + text = re.sub(r',(\s*\])', r'\1', text) + return text + + +def _parse_score(raw: str, index: int) -> SongScore: + import re + + raw = raw.strip() + if raw.startswith("```"): + lines = raw.split("\n") + raw = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]) + + start = raw.find("{") + end = raw.rfind("}") + 1 + if start < 0 or end <= start: + raise ValueError("No JSON object found in AI response") + raw = raw[start:end] + + # Attempt 1: direct parse + try: + data = json.loads(raw) + return SongScore.from_dict(data) + except json.JSONDecodeError: + pass + + # Attempt 2: fix common bracket errors from LLMs + fixed = _fix_brackets(raw) + try: + data = json.loads(fixed) + log.info("JSON bracket fix succeeded on attempt 2") + return SongScore.from_dict(data) + except json.JSONDecodeError: + pass + + # Attempt 3: remove // comments + trailing commas + bracket fix + cleaned = re.sub(r'//.*$', '', fixed, flags=re.MULTILINE) + cleaned = re.sub(r',(\s*\})', r'\1', cleaned) + cleaned = re.sub(r',(\s*\])', r'\1', cleaned) + + try: + data = json.loads(cleaned) + log.info("JSON cleaned successfully on attempt 3") + return SongScore.from_dict(data) + except json.JSONDecodeError as exc: + # Attempt 4: brute-force close unclosed brackets + open_b = cleaned.count('{') - cleaned.count('}') + open_br = cleaned.count('[') - cleaned.count(']') + if open_b > 0 or open_br > 0: + repaired = cleaned.rstrip().rstrip(',') + repaired += ']' * max(0, open_br) + repaired += '}' * max(0, open_b) + try: + data = json.loads(repaired) + log.info("JSON repaired (bracket closure) on attempt 4") + return SongScore.from_dict(data) + except json.JSONDecodeError as exc4: + pass + + raise ValueError( + "JSON parse failed after all attempts: %s\nLast output:\n%s" + % (exc, cleaned[:800]) + ) + + +def run_loop( + base_url: str, + api_key: str, + model: str, + max_tokens: int, + count: int, + delay: float, + render: bool, + lib_root: str, + output_prefix: str = "ai_song", + dry_run: bool = False, +): + client = _build_client(base_url, api_key) + renderer = ScoreRenderer(lib_root) if (render and not dry_run) else None + total = count if count > 0 else "inf" + + log.info("Starting AI production loop — model=%s count=%s render=%s", + model, total, render) + log.info("Scores will be saved to: %s", SCORES_DIR) + if render: + log.info("Library root: %s", lib_root) + if dry_run: + log.info("DRY RUN — Ableton will NOT be touched") + + produced = 0 + iteration = 0 + + while True: + iteration += 1 + if count > 0 and produced >= count: + break + + log.info("Generating song %d / %s", iteration, total) + + try: + raw_json = _generate_score(client, model, max_tokens, iteration, count or 999) + log.debug("Raw AI output:\n%s", raw_json[:500]) + + score = _parse_score(raw_json, iteration) + + warnings = score.validate() + if warnings: + log.warning("Validation warnings: %s", warnings) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = "%s_%03d_%s.json" % (output_prefix, iteration, timestamp) + saved_path = SCORES_DIR / filename + score.save(saved_path) + log.info("Saved: %s (%d tracks, %.0f bars)", + filename, len(score.tracks), score.total_bars()) + + if renderer: + log.info("Rendering into Ableton...") + result = renderer.render(score, clear_first=True) + if result.get("success"): + log.info("Rendered OK tracks=%d clips=%d bars=%.0f", + len(result["tracks_created"]), + result["clips_created"], + score.total_bars()) + else: + log.warning("Render completed with errors:") + for err in result.get("errors", []): + log.warning(" - %s", err) + + produced += 1 + + except KeyboardInterrupt: + log.info("Loop interrupted by user. %d songs produced.", produced) + break + except json.JSONDecodeError as exc: + log.error("JSON parse error on iteration %d: %s", iteration, exc) + except Exception as exc: + log.exception("Unexpected error on iteration %d: %s", iteration, exc) + + if count == 0 or produced < count: + if delay > 0: + log.info("Waiting %.0fs before next generation...", delay) + time.sleep(delay) + + log.info("Loop complete. %d songs produced and saved to %s", produced, SCORES_DIR) + + +def main(): + parser = argparse.ArgumentParser( + description="Autonomous AI music production loop (Anthropic-compatible)", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument("--base-url", default=os.environ.get("AI_BASE_URL", "https://api.anthropic.com")) + parser.add_argument("--api-key", default=os.environ.get("AI_API_KEY", "")) + parser.add_argument("--model", default=os.environ.get("AI_MODEL", "GLM-5-Turbo")) + parser.add_argument("--max-tokens",default=int(os.environ.get("AI_MAX_TOKENS", "4096")), type=int) + parser.add_argument("--count", default=int(os.environ.get("LOOP_COUNT", "10")), type=int, + help="Songs to produce (0 = infinite)") + parser.add_argument("--delay", default=float(os.environ.get("LOOP_DELAY", "5")), type=float, + help="Seconds between generations") + parser.add_argument("--render", action="store_true", + default=os.environ.get("RENDER_AFTER", "0") == "1", + help="Render each score into Ableton immediately") + parser.add_argument("--lib-root", default=os.environ.get("LIB_ROOT", _DEFAULT_LIB_ROOT)) + parser.add_argument("--prefix", default="ai_song", + help="Filename prefix for saved scores") + parser.add_argument("--dry-run", action="store_true", + help="Generate + validate + save but do NOT call Ableton") + parser.add_argument("--list", action="store_true", + help="List saved scores and exit") + + args = parser.parse_args() + + if args.list: + scores = sorted(SCORES_DIR.glob("*.json")) + if not scores: + print("No scores saved yet.") + else: + for f in scores: + size = f.stat().st_size + print(" %s (%d bytes)" % (f.name, size)) + return + + if not args.api_key: + parser.error("API key required. Set --api-key or AI_API_KEY env variable.") + + run_loop( + base_url = args.base_url, + api_key = args.api_key, + model = args.model, + max_tokens = args.max_tokens, + count = args.count, + delay = args.delay, + render = args.render, + lib_root = args.lib_root, + output_prefix = args.prefix, + dry_run = args.dry_run, + ) + + +if __name__ == "__main__": + main() diff --git a/AbletonMCP_AI/mcp_server/score_engine.py b/AbletonMCP_AI/mcp_server/score_engine.py new file mode 100644 index 0000000..3b2ad86 --- /dev/null +++ b/AbletonMCP_AI/mcp_server/score_engine.py @@ -0,0 +1,780 @@ +""" +score_engine.py — SongScore data model, templates and in-memory singleton. + +Pure Python — zero dependencies on Ableton, MCP, or any audio library. +This module is designed to be importable from anywhere: server.py, ai_loop.py, +test scripts, etc. + +SongScore JSON schema: +{ + "meta": { "title", "tempo", "key", "genre", "time_signature", "gap_bars", "version" }, + "structure": [ { "name", "start_bar", "duration_bars" } ], + "tracks": [ + { + "id", "name", "type", # type = "audio" | "midi" + "instrument", # only for MIDI tracks (e.g. "Wavetable") + "clips": [ + { + "section", # section name → resolves start_bar automatically + "start_bar", # OR explicit start position (in bars) + "duration_bars", + "sample", # audio only e.g. "kick/auto" or "kick/kick1.wav" + "pattern", # MIDI only e.g. "dembow_standard" + "notes", # MIDI only explicit note list (overrides pattern) + "loop", "warp" # audio flags + } + ], + "mixer": { "volume","pan","eq_preset","compression_preset","send_reverb","send_delay" } + } + ] +} +""" + +import json +import os +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +# Scores directory (created automatically) +SCORES_DIR = Path(__file__).parent / "scores" +SCORES_DIR.mkdir(exist_ok=True) + +# Valid MIDI pattern names (used by sanitization) +_VALID_PATTERNS_SET = { + "dembow_minimal", "dembow_standard", "dembow_double", + "bass_sub", "bass_pluck", "bass_octaves", "bass_sustained", + "chords_verse", "chords_chorus", "melody_simple", +} + +# In-memory singleton (one active score per MCP server process) +_current_score: Optional["SongScore"] = None + + +# ================================================================== +# Data classes +# ================================================================== + +class MixerDef: + __slots__ = ("volume", "pan", "eq_preset", "compression_preset", + "send_reverb", "send_delay") + + def __init__(self, volume: float = 0.75, pan: float = 0.0, + eq_preset: str = None, compression_preset: str = None, + send_reverb: float = 0.0, send_delay: float = 0.0): + self.volume = float(volume) + self.pan = float(pan) + self.eq_preset = eq_preset + self.compression_preset = compression_preset + self.send_reverb = float(send_reverb) + self.send_delay = float(send_delay) + + def to_dict(self) -> Dict: + d: Dict[str, Any] = {"volume": self.volume, "pan": self.pan} + if self.eq_preset: + d["eq_preset"] = self.eq_preset + if self.compression_preset: + d["compression_preset"] = self.compression_preset + if self.send_reverb: + d["send_reverb"] = self.send_reverb + if self.send_delay: + d["send_delay"] = self.send_delay + return d + + @classmethod + def from_dict(cls, d: Dict) -> "MixerDef": + return cls( + volume=d.get("volume", 0.75), + pan=d.get("pan", 0.0), + eq_preset=d.get("eq_preset"), + compression_preset=d.get("compression_preset"), + send_reverb=d.get("send_reverb", 0.0), + send_delay=d.get("send_delay", 0.0), + ) + + +class ClipDef: + """Represents a single clip inside a track.""" + + def __init__(self, start_bar: float = 0.0, duration_bars: float = 4.0, + clip_type: str = "audio", sample: str = None, + pattern: str = None, notes: List[Dict] = None, + loop: bool = True, warp: bool = True, section: str = None, + name: str = None): + self.start_bar = float(start_bar) + self.duration_bars = float(duration_bars) + self.clip_type = clip_type # "audio" | "midi" + self.sample = sample # relative ref or "/abs/path.wav" + self.pattern = pattern # e.g. "dembow_standard" + self.notes = notes or [] # explicit MIDI notes + self.loop = bool(loop) + self.warp = bool(warp) + self.section = section # section name (informational) + self.name = name + + def to_dict(self) -> Dict: + d: Dict[str, Any] = { + "start_bar": self.start_bar, + "duration_bars": self.duration_bars, + } + if self.section: + d["section"] = self.section + if self.name: + d["name"] = self.name + if self.sample: + d["sample"] = self.sample + d["loop"] = self.loop + d["warp"] = self.warp + if self.pattern: + d["pattern"] = self.pattern + if self.notes: + d["notes"] = self.notes + return d + + @classmethod + def from_raw(cls, raw: Dict, structure: List[Dict] = None) -> "ClipDef": + """Build ClipDef from a raw dict, resolving section → start_bar if needed.""" + start_bar = raw.get("start_bar") + duration_bars = raw.get("duration_bars") + section_name = raw.get("section") + + if start_bar is None and section_name and structure: + for sec in structure: + if sec["name"] == section_name: + start_bar = sec["start_bar"] + if duration_bars is None: + duration_bars = sec["duration_bars"] + break + + if start_bar is None: + start_bar = 0.0 + if duration_bars is None: + duration_bars = 4.0 + + # Infer clip type from keys + clip_type = "audio" if raw.get("sample") else "midi" + + return cls( + start_bar = start_bar, + duration_bars = duration_bars, + clip_type = clip_type, + sample = raw.get("sample"), + pattern = raw.get("pattern"), + notes = raw.get("notes", []), + loop = raw.get("loop", True), + warp = raw.get("warp", True), + section = section_name, + name = raw.get("name"), + ) + + +class TrackDef: + """Represents a single track with all its clips.""" + + def __init__(self, track_id: str, name: str, track_type: str, + instrument: str = None, + clips: List[ClipDef] = None, + mixer: MixerDef = None): + self.id = track_id + self.name = name + self.type = track_type # "audio" | "midi" + self.instrument = instrument # "Wavetable", "Operator", etc. + self.clips = clips or [] + self.mixer = mixer or MixerDef() + + def to_dict(self) -> Dict: + d: Dict[str, Any] = { + "id": self.id, + "name": self.name, + "type": self.type, + "clips": [c.to_dict() for c in self.clips], + "mixer": self.mixer.to_dict(), + } + if self.instrument: + d["instrument"] = self.instrument + return d + + @classmethod + def from_raw(cls, raw: Dict, structure: List[Dict] = None) -> "TrackDef": + track_type = raw.get("type", "audio") + + # ── Phase 1: Auto-correct track type from ORIGINAL clip data (before coercion) ── + raw_clips = raw.get("clips", []) + orig_has_sample = any(c.get("sample") for c in raw_clips) + orig_has_pattern = any(c.get("pattern") for c in raw_clips) + orig_has_notes = any(c.get("notes") for c in raw_clips) + orig_has_midi = orig_has_pattern or orig_has_notes + + if track_type == "midi" and orig_has_sample and not orig_has_midi: + track_type = "audio" + elif track_type == "midi" and orig_has_sample and orig_has_midi: + all_samples_not_patterns = all( + c.get("sample") and c.get("sample").replace("/auto", "").replace("/", "_") + not in _VALID_PATTERNS_SET + for c in raw_clips if c.get("sample") + ) + sample_count = sum(1 for c in raw_clips if c.get("sample")) + midi_count = sum(1 for c in raw_clips if c.get("pattern") or c.get("notes")) + if sample_count > midi_count: + track_type = "audio" + elif track_type == "audio" and orig_has_midi and not orig_has_sample: + track_type = "midi" + elif track_type == "audio" and orig_has_sample and not orig_has_midi: + all_samples_are_patterns = all( + c.get("sample", "").replace("/auto", "").replace("/", "_") + in _VALID_PATTERNS_SET + for c in raw_clips if c.get("sample") + ) + if all_samples_are_patterns: + track_type = "midi" + + # ── Phase 2: Build clips with corrected track type ── + clips = [ClipDef.from_raw(c, structure) for c in raw_clips] + + for clip in clips: + if track_type == "midi": + clip.clip_type = "midi" + if not clip.pattern and not clip.notes: + if clip.sample: + from score_renderer import _sanitize_pattern_name + clip.pattern = _sanitize_pattern_name(clip.sample) + else: + clip.pattern = "dembow_standard" + clip.sample = None + elif clip.sample and (clip.pattern or clip.notes): + clip.sample = None + else: + clip.clip_type = "audio" + if clip.pattern and not clip.sample: + from score_renderer import _sanitize_sample_ref + clip.sample = _sanitize_sample_ref(clip.pattern) + clip.pattern = None + elif clip.pattern and clip.sample: + clip.pattern = None + + # Ensure MIDI tracks have an instrument + instrument = raw.get("instrument") + if track_type == "midi" and not instrument: + if any(c.pattern and ("chord" in c.pattern or "melody" in c.pattern) for c in clips): + instrument = "Wavetable" + else: + instrument = "Operator" + + mixer = MixerDef.from_dict(raw.get("mixer", {})) + + return cls( + track_id = raw.get("id", raw.get("name", "Track")), + name = raw.get("name", "Track"), + track_type = track_type, + instrument = instrument, + clips = clips, + mixer = mixer, + ) + + +class SectionDef: + """A named temporal section of the song.""" + + def __init__(self, name: str, start_bar: float, duration_bars: float): + self.name = name + self.start_bar = float(start_bar) + self.duration_bars = float(duration_bars) + + def to_dict(self) -> Dict: + return { + "name": self.name, + "start_bar": self.start_bar, + "duration_bars": self.duration_bars, + } + + +# ================================================================== +# SongScore — main model +# ================================================================== + +class SongScore: + """Complete musical score — pure data, no Ableton dependencies. + + Build using the builder API (set_structure, add_track, add_clip, etc.) + or load from a dict/JSON/template. + """ + + SCHEMA_VERSION = "1.0" + + def __init__(self, title: str = "Untitled", tempo: float = 95.0, + key: str = "Am", genre: str = "reggaeton", + time_signature: str = "4/4", gap_bars: float = 2.0): + self.meta: Dict[str, Any] = { + "title": title, + "tempo": float(tempo), + "key": key, + "genre": genre, + "time_signature": time_signature, + "gap_bars": float(gap_bars), + "version": self.SCHEMA_VERSION, + "created_at": datetime.now().isoformat(), + } + self.structure: List[SectionDef] = [] + self.tracks: List[TrackDef] = [] + + # ------------------------------------------------------------------ + # Builder API + # ------------------------------------------------------------------ + + def set_structure(self, sections: List[Dict]) -> "SongScore": + """Set the temporal structure. Calculates start_bar using meta['gap_bars'].""" + gap = float(self.meta.get("gap_bars", 2.0)) + current_bar = 0.0 + self.structure = [] + + for sec in sections: + name = sec.get("name", "Section") + duration = float(sec.get("duration_bars", 8)) + # Explicit start_bar overrides auto-calculation + start = float(sec.get("start_bar", current_bar)) + self.structure.append(SectionDef(name, start, duration)) + current_bar = start + duration + gap + + return self + + def add_track(self, track: TrackDef) -> "SongScore": + """Add or replace a track by ID.""" + for i, t in enumerate(self.tracks): + if t.id == track.id: + self.tracks[i] = track + return self + self.tracks.append(track) + return self + + def add_clip_to_track(self, track_id: str, clip_raw: Dict) -> "SongScore": + """Add a clip to an existing track. clip_raw may use 'section' keyword.""" + track = self.get_track(track_id) + if track is None: + raise KeyError("Track '%s' not found. Create it first." % track_id) + clip = ClipDef.from_raw(clip_raw, self.get_structure_dict()) + track.clips.append(clip) + return self + + def set_mixer(self, track_id: str, **kwargs) -> "SongScore": + """Update mixer settings for a track.""" + track = self.get_track(track_id) + if track is None: + raise KeyError("Track '%s' not found." % track_id) + for k, v in kwargs.items(): + if hasattr(track.mixer, k): + setattr(track.mixer, k, v) + return self + + # ------------------------------------------------------------------ + # Query helpers + # ------------------------------------------------------------------ + + def get_track(self, track_id: str) -> Optional[TrackDef]: + for t in self.tracks: + if t.id == track_id: + return t + return None + + def get_section(self, name: str) -> Optional[SectionDef]: + for s in self.structure: + if s.name == name: + return s + return None + + def get_structure_dict(self) -> List[Dict]: + return [s.to_dict() for s in self.structure] + + def total_bars(self) -> float: + if not self.structure: + return 0.0 + last = self.structure[-1] + return last.start_bar + last.duration_bars + + # ------------------------------------------------------------------ + # Validation + # ------------------------------------------------------------------ + + def validate(self) -> List[str]: + """Return a list of warning strings. Empty list = valid.""" + warnings: List[str] = [] + + if not self.structure: + warnings.append("No structure defined — call set_structure() first.") + if not self.tracks: + warnings.append("No tracks defined.") + + seen_names = set() + for s in self.structure: + if s.name in seen_names: + warnings.append( + "Duplicate section name '%s' — clips may map to wrong scene." % s.name + ) + seen_names.add(s.name) + + section_names = {s.name for s in self.structure} + for track in self.tracks: + if not track.clips: + warnings.append("Track '%s' has no clips." % track.id) + continue + for clip in track.clips: + if clip.section and clip.section not in section_names: + warnings.append( + "Track '%s': clip section '%s' not in structure." + % (track.id, clip.section) + ) + if track.type == "audio" and not clip.sample: + warnings.append( + "Track '%s': audio clip has no sample defined." % track.id + ) + if track.type == "midi" and not clip.pattern and not clip.notes: + warnings.append( + "Track '%s': MIDI clip has no pattern or notes." % track.id + ) + + return warnings + + # ------------------------------------------------------------------ + # Serialization + # ------------------------------------------------------------------ + + def to_dict(self) -> Dict: + return { + "meta": self.meta, + "structure": [s.to_dict() for s in self.structure], + "tracks": [t.to_dict() for t in self.tracks], + } + + def to_json(self, indent: int = 2) -> str: + return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False) + + @classmethod + def from_dict(cls, d: Dict) -> "SongScore": + meta = d.get("meta", {}) + score = cls( + title = meta.get("title", "Untitled"), + tempo = meta.get("tempo", 95), + key = meta.get("key", "Am"), + genre = meta.get("genre", "reggaeton"), + time_signature = meta.get("time_signature", "4/4"), + gap_bars = meta.get("gap_bars", 2.0), + ) + # Preserve all meta fields + score.meta.update(meta) + + # Structure — ignore start_bar from JSON, calculate automatically + gap = float(score.meta.get("gap_bars", 2.0)) + current_bar = 0.0 + seen_names = set() + + for sec in d.get("structure", []): + name = sec["name"] + duration = sec.get("duration_bars", 8) + + # Auto-deduplicate section names + base_name = name + counter = 2 + while name in seen_names: + name = "%s %d" % (base_name, counter) + counter += 1 + seen_names.add(name) + + score.structure.append(SectionDef(name, current_bar, duration)) + current_bar += duration + gap + + # Tracks (clips resolved against structure) + struct_dict = score.get_structure_dict() + for raw in d.get("tracks", []): + score.tracks.append(TrackDef.from_raw(raw, struct_dict)) + + return score + + @classmethod + def from_json(cls, json_str: str) -> "SongScore": + return cls.from_dict(json.loads(json_str)) + + def save(self, path: Path) -> Path: + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(self.to_json(), encoding="utf-8") + return path + + @classmethod + def load(cls, path: Path) -> "SongScore": + return cls.from_json(Path(path).read_text(encoding="utf-8")) + + # ------------------------------------------------------------------ + # Templates + # ------------------------------------------------------------------ + + @classmethod + def from_template(cls, template_name: str, **meta_overrides) -> "SongScore": + """Create a complete SongScore from a named template. + + meta_overrides: tempo, key, gap_bars, title, etc. + Available templates: reggaeton_basic, reggaeton_13scenes, minimal_loop + """ + templates = _get_templates() + if template_name not in templates: + raise ValueError( + "Template '%s' not found. Available: %s" + % (template_name, sorted(templates.keys())) + ) + + tmpl = templates[template_name] + meta = {**tmpl["meta"], **meta_overrides} + + score = cls( + title = meta.get("title", template_name.replace("_", " ").title()), + tempo = meta.get("tempo", 95), + key = meta.get("key", "Am"), + genre = meta.get("genre", "reggaeton"), + time_signature = meta.get("time_signature", "4/4"), + gap_bars = meta.get("gap_bars", 2.0), + ) + + score.set_structure(tmpl["structure"]) + struct_dict = score.get_structure_dict() + + for raw in tmpl["tracks"]: + score.tracks.append(TrackDef.from_raw(raw, struct_dict)) + + return score + + def list_templates(self) -> List[str]: + return sorted(_get_templates().keys()) + + +# ================================================================== +# Singleton helpers (used by server.py) +# ================================================================== + +def get_current_score() -> Optional[SongScore]: + return _current_score + + +def set_current_score(score: Optional[SongScore]) -> None: + global _current_score + _current_score = score + + +def require_score() -> SongScore: + if _current_score is None: + raise RuntimeError("No active score. Call new_score() or load_score() first.") + return _current_score + + +# ================================================================== +# Templates +# ================================================================== + +def _get_templates() -> Dict[str, Dict]: + """Return all built-in templates.""" + # Clips that reference 'section' get start_bar resolved automatically + return { + + # ────────────────────────────────────────────────────────────── + "reggaeton_basic": { + "meta": {"tempo": 95, "key": "Am", "genre": "reggaeton", "gap_bars": 2.0}, + "structure": [ + {"name": "Intro", "duration_bars": 4}, + {"name": "Verse", "duration_bars": 8}, + {"name": "Chorus", "duration_bars": 8}, + {"name": "Verse 2", "duration_bars": 8}, + {"name": "Chorus 2", "duration_bars": 8}, + {"name": "Bridge", "duration_bars": 4}, + {"name": "Outro", "duration_bars": 4}, + ], + "tracks": [ + { + "id": "drum_loop", "name": "Drum Loop", "type": "audio", + "clips": [ + {"section": "Verse", "sample": "drumloops/auto", "loop": True}, + {"section": "Chorus", "sample": "drumloops/auto", "loop": True}, + {"section": "Verse 2", "sample": "drumloops/auto", "loop": True}, + {"section": "Chorus 2", "sample": "drumloops/auto", "loop": True}, + ], + "mixer": {"volume": 0.95}, + }, + { + "id": "kick", "name": "Kick", "type": "audio", + "clips": [ + {"section": "Verse", "sample": "kick/auto"}, + {"section": "Chorus", "sample": "kick/auto"}, + {"section": "Verse 2", "sample": "kick/auto"}, + {"section": "Chorus 2", "sample": "kick/auto"}, + ], + "mixer": {"volume": 0.85, "eq_preset": "kick", + "compression_preset": "kick_punch"}, + }, + { + "id": "snare", "name": "Snare", "type": "audio", + "clips": [ + {"section": "Verse", "sample": "snare/auto"}, + {"section": "Chorus", "sample": "snare/auto"}, + {"section": "Verse 2", "sample": "snare/auto"}, + {"section": "Chorus 2", "sample": "snare/auto"}, + ], + "mixer": {"volume": 0.82, "eq_preset": "snare"}, + }, + { + "id": "perc", "name": "Perc", "type": "audio", + "clips": [ + {"section": "Verse", "sample": "perc loop/auto", "loop": True}, + {"section": "Chorus", "sample": "perc loop/auto", "loop": True}, + {"section": "Verse 2", "sample": "perc loop/auto", "loop": True}, + {"section": "Chorus 2", "sample": "perc loop/auto", "loop": True}, + ], + "mixer": {"volume": 0.65}, + }, + { + "id": "dembow", "name": "Dembow", "type": "midi", + "instrument": "Wavetable", + "clips": [ + {"section": "Intro", "pattern": "dembow_minimal"}, + {"section": "Verse", "pattern": "dembow_standard"}, + {"section": "Chorus", "pattern": "dembow_double"}, + {"section": "Verse 2", "pattern": "dembow_standard"}, + {"section": "Chorus 2", "pattern": "dembow_double"}, + ], + "mixer": {"volume": 0.80}, + }, + { + "id": "bass", "name": "Sub Bass", "type": "midi", + "instrument": "Operator", + "clips": [ + {"section": "Verse", "pattern": "bass_pluck"}, + {"section": "Chorus", "pattern": "bass_octaves"}, + {"section": "Verse 2", "pattern": "bass_pluck"}, + {"section": "Chorus 2", "pattern": "bass_octaves"}, + ], + "mixer": {"volume": 0.70}, + }, + { + "id": "chords", "name": "Chords", "type": "midi", + "instrument": "Wavetable", + "clips": [ + {"section": "Verse", "pattern": "chords_verse"}, + {"section": "Chorus", "pattern": "chords_chorus"}, + {"section": "Verse 2", "pattern": "chords_verse"}, + {"section": "Chorus 2", "pattern": "chords_chorus"}, + ], + "mixer": {"volume": 0.68}, + }, + ], + }, + + # ────────────────────────────────────────────────────────────── + "reggaeton_13scenes": { + "meta": {"tempo": 95, "key": "Am", "genre": "reggaeton", "gap_bars": 2.0}, + "structure": [ + {"name": "Intro Suave", "duration_bars": 4}, + {"name": "Build Up", "duration_bars": 4}, + {"name": "Intro Full", "duration_bars": 4}, + {"name": "Verse A", "duration_bars": 8}, + {"name": "Pre-Chorus", "duration_bars": 4}, + {"name": "Chorus A", "duration_bars": 8}, + {"name": "Verse B", "duration_bars": 8}, + {"name": "Pre-Chorus 2", "duration_bars": 4}, + {"name": "Chorus B", "duration_bars": 8}, + {"name": "Bridge", "duration_bars": 4}, + {"name": "Breakdown", "duration_bars": 4}, + {"name": "Final Chorus", "duration_bars": 8}, + {"name": "Outro", "duration_bars": 4}, + ], + "tracks": [ + { + "id": "kick", "name": "Kick", "type": "audio", + "clips": [ + {"section": "Intro Full", "sample": "kick/auto"}, + {"section": "Verse A", "sample": "kick/auto"}, + {"section": "Pre-Chorus", "sample": "kick/auto"}, + {"section": "Chorus A", "sample": "kick/auto"}, + {"section": "Verse B", "sample": "kick/auto"}, + {"section": "Pre-Chorus 2", "sample": "kick/auto"}, + {"section": "Chorus B", "sample": "kick/auto"}, + {"section": "Final Chorus", "sample": "kick/auto"}, + ], + "mixer": {"volume": 0.85, "eq_preset": "kick", + "compression_preset": "kick_punch"}, + }, + { + "id": "snare", "name": "Snare", "type": "audio", + "clips": [ + {"section": "Verse A", "sample": "snare/auto"}, + {"section": "Chorus A", "sample": "snare/auto"}, + {"section": "Verse B", "sample": "snare/auto"}, + {"section": "Chorus B", "sample": "snare/auto"}, + {"section": "Final Chorus", "sample": "snare/auto"}, + ], + "mixer": {"volume": 0.82, "eq_preset": "snare"}, + }, + { + "id": "drum_loop", "name": "Drum Loop", "type": "audio", + "clips": [ + {"section": "Verse A", "sample": "drumloops/auto", "loop": True}, + {"section": "Chorus A", "sample": "drumloops/auto", "loop": True}, + {"section": "Verse B", "sample": "drumloops/auto", "loop": True}, + {"section": "Chorus B", "sample": "drumloops/auto", "loop": True}, + {"section": "Final Chorus", "sample": "drumloops/auto", "loop": True}, + ], + "mixer": {"volume": 0.90}, + }, + { + "id": "dembow", "name": "Dembow", "type": "midi", + "instrument": "Wavetable", + "clips": [ + {"section": "Build Up", "pattern": "dembow_minimal"}, + {"section": "Intro Full", "pattern": "dembow_minimal"}, + {"section": "Verse A", "pattern": "dembow_standard"}, + {"section": "Pre-Chorus", "pattern": "dembow_standard"}, + {"section": "Chorus A", "pattern": "dembow_double"}, + {"section": "Verse B", "pattern": "dembow_standard"}, + {"section": "Pre-Chorus 2", "pattern": "dembow_standard"}, + {"section": "Chorus B", "pattern": "dembow_double"}, + {"section": "Final Chorus", "pattern": "dembow_double"}, + ], + "mixer": {"volume": 0.80}, + }, + { + "id": "bass", "name": "Sub Bass", "type": "midi", + "instrument": "Operator", + "clips": [ + {"section": "Verse A", "pattern": "bass_pluck"}, + {"section": "Chorus A", "pattern": "bass_octaves"}, + {"section": "Verse B", "pattern": "bass_pluck"}, + {"section": "Chorus B", "pattern": "bass_octaves"}, + {"section": "Final Chorus", "pattern": "bass_octaves"}, + ], + "mixer": {"volume": 0.70}, + }, + ], + }, + + # ────────────────────────────────────────────────────────────── + "minimal_loop": { + "meta": {"tempo": 100, "key": "C", "genre": "reggaeton", "gap_bars": 0.0}, + "structure": [ + {"name": "Loop", "duration_bars": 8}, + ], + "tracks": [ + { + "id": "drum", "name": "Drums", "type": "audio", + "clips": [{"section": "Loop", "sample": "drumloops/auto", "loop": True}], + "mixer": {"volume": 0.95}, + }, + { + "id": "bass", "name": "Bass", "type": "midi", + "instrument": "Operator", + "clips": [{"section": "Loop", "pattern": "bass_sub"}], + "mixer": {"volume": 0.75}, + }, + { + "id": "dembow", "name": "Dembow", "type": "midi", + "instrument": "Wavetable", + "clips": [{"section": "Loop", "pattern": "dembow_standard"}], + "mixer": {"volume": 0.80}, + }, + ], + }, + + } diff --git a/AbletonMCP_AI/mcp_server/score_renderer.py b/AbletonMCP_AI/mcp_server/score_renderer.py new file mode 100644 index 0000000..f258596 --- /dev/null +++ b/AbletonMCP_AI/mcp_server/score_renderer.py @@ -0,0 +1,774 @@ +""" +score_renderer.py — Translates a SongScore into Ableton Live SESSION VIEW operations via TCP. + +Architecture: + - Each SectionDef in score.structure → one Ableton Scene + - Each TrackDef in score.tracks → one Ableton track + - Each ClipDef in a track → clip slot at (track_index, scene_index) + +Session View mapping: + section "Verse" → scene index 1 + section "Chorus" → scene index 2 + ... + +Clip placement (Session View only): + - MIDI tracks: create_clip + add_notes_to_clip + - Audio tracks: load_sample_to_clip (loads .wav into a clip slot) + +Pattern generators (all computed on server side — no Ableton logic needed): + MIDI drums: dembow_minimal, dembow_standard, dembow_double + MIDI bass: bass_sub, bass_pluck, bass_octaves, bass_sustained + MIDI harmony: chords_verse, chords_chorus, melody_simple +""" + +import json +import os +import socket +from pathlib import Path +from typing import Dict, List, Optional + +from score_engine import SongScore, TrackDef, ClipDef + +# ------------------------------------------------------------------ +# Ableton TCP transport (self-contained — no FastMCP dependency) +# ------------------------------------------------------------------ + +ABLETON_HOST = "127.0.0.1" +ABLETON_PORT = 9877 +_TERMINATOR = b"\n" + + +def _send(cmd_type: str, params: dict, timeout: float = 30.0) -> dict: + """Send a command to Ableton via TCP and return the parsed response.""" + sock = None + try: + sock = socket.create_connection((ABLETON_HOST, ABLETON_PORT), timeout=timeout) + sock.settimeout(timeout) + msg = json.dumps({"type": cmd_type, "params": params}) + "\n" + sock.sendall(msg.encode("utf-8")) + + buf = b"" + while True: + chunk = sock.recv(65536) + if not chunk: + break + buf += chunk + if _TERMINATOR in buf: + raw, _, _ = buf.partition(_TERMINATOR) + return json.loads(raw.decode("utf-8")) + + return {"status": "error", "message": "No response terminator received"} + except socket.timeout: + return {"status": "error", "message": "Timeout after %.0fs on '%s'" % (timeout, cmd_type)} + except ConnectionRefusedError: + return {"status": "error", + "message": "Cannot connect to Ableton on %s:%d" % (ABLETON_HOST, ABLETON_PORT)} + except Exception as exc: + return {"status": "error", "message": str(exc)} + finally: + if sock: + try: + sock.close() + except Exception: + pass + + +# ------------------------------------------------------------------ +# Sample resolution — "kick/auto" or "kick/kick_01.wav" → absolute path +# ------------------------------------------------------------------ + +# Keyword mapping: invented filenames → correct folder/auto paths +_SAMPLE_KEYWORD_MAP = { + "kick": "kick/auto", + "snare": "snare/auto", + "hihat": "hi-hat (para percs normalmente)/auto", + "hi-hat": "hi-hat (para percs normalmente)/auto", + "hat": "hi-hat (para percs normalmente)/auto", + "drumloop": "drumloops/auto", + "drum": "drumloops/auto", + "perc": "perc loop/auto", + "bass": "bass/auto", + "fx": "fx/auto", + "transition": "fx/auto", + "transicion": "fx/auto", + "riser": "fx/auto", + "impact": "fx/auto", + "oneshot": "oneshots/auto", +} + +# Valid MIDI pattern names +_VALID_PATTERNS = { + "dembow_minimal", "dembow_standard", "dembow_double", + "bass_sub", "bass_pluck", "bass_octaves", "bass_sustained", + "chords_verse", "chords_chorus", "melody_simple", +} + + +def _sanitize_sample_ref(sample_ref: str) -> str: + """Map invented filenames to correct folder/auto paths. + + Handles cases where LLMs generate names like: + "kick 1.wav" → "kick/auto" + "snare 3.wav" → "snare/auto" + "hi-hat 1.wav" → "hi-hat (para percs normalmente)/auto" + """ + if not sample_ref: + return sample_ref + if "/" in sample_ref: + return sample_ref # already has folder structure + + # Already an "auto" path without folder + if sample_ref.lower() == "auto": + return "kick/auto" + + # Strip common suffixes: .wav, .mp3, .aif, numbers, spaces + name = sample_ref.lower() + name = os.path.splitext(name)[0] # remove .wav etc + # Remove trailing numbers: "kick 1" → "kick", "bass_sub 2" → "bass_sub" + name = name.strip() + name_parts = name.rsplit(None, 1) + if len(name_parts) == 2 and name_parts[1].isdigit(): + name = name_parts[0] + + # Keyword match + for keyword, path in _SAMPLE_KEYWORD_MAP.items(): + if keyword in name.lower(): + return path + + return sample_ref # no match, return as-is + + +def _sanitize_pattern_name(pattern: str) -> str: + """Map invented pattern names to valid pattern names.""" + if not pattern: + return "dembow_standard" + if pattern in _VALID_PATTERNS: + return pattern + + pat = pattern.lower().strip() + # Remove file extensions + pat = os.path.splitext(pat)[0] + # Remove trailing numbers + parts = pat.rsplit(None, 1) + if len(parts) == 2 and parts[1].isdigit(): + pat = parts[0] + + # Keyword matching + if "dembow" in pat: + return "dembow_standard" + if "bass" in pat: + if "sub" in pat: + return "bass_sub" + if "pluck" in pat: + return "bass_pluck" + if "octave" in pat: + return "bass_octaves" + return "bass_sub" + if "chord" in pat: + if "chorus" in pat: + return "chords_chorus" + return "chords_verse" + if "melody" in pat or "lead" in pat: + return "melody_simple" + if "snare" in pat or "perc" in pat or "hat" in pat or "hihat" in pat: + return "dembow_standard" + + return "dembow_standard" # default fallback + + +def _resolve_sample(sample_ref: str, lib_root: str, tempo: float = 95.0) -> Optional[str]: + """Resolve a sample reference to an absolute filesystem path. + + Formats accepted: + "kick/auto" → best WAV from /kick/ + "kick/kick 1.wav" → exact file /kick/kick 1.wav + "kick 1.wav" → sanitized to "kick/auto" + "/C:/absolute/path.wav" → passthrough + """ + if not sample_ref: + return None + + # Sanitize invented filenames + sample_ref = _sanitize_sample_ref(sample_ref) + + # Already absolute + if os.path.isabs(sample_ref): + return sample_ref if os.path.isfile(sample_ref) else None + + parts = sample_ref.replace("\\", "/").split("/") + + if parts[-1].lower() == "auto": + folder = os.path.join(lib_root, *parts[:-1]) + return _pick_best(folder, tempo) + else: + # Exact relative path + path = os.path.join(lib_root, *parts) + if os.path.isfile(path): + return path + # Fallback: auto-select from the folder + folder = os.path.join(lib_root, *parts[:-1]) if len(parts) > 1 else lib_root + best = _pick_best(folder, tempo) + if best: + return best + # Last resort: try keyword mapping on the whole ref + sanitized = _sanitize_sample_ref(sample_ref) + if sanitized != sample_ref: + return _resolve_sample(sanitized, lib_root, tempo) + return None + + +def _pick_best(folder: str, tempo: float = 95.0) -> Optional[str]: + """Pick the best audio file from a folder. + + Strategy: + 1. Prefer files whose name contains a BPM number close to project tempo. + 2. If no BPM info available, return the first file alphabetically. + """ + if not os.path.isdir(folder): + return None + + files = sorted([ + os.path.join(folder, f) + for f in os.listdir(folder) + if f.lower().endswith((".wav", ".aif", ".aiff", ".mp3")) + ]) + + if not files: + return None + + def bpm_score(fpath: str) -> float: + fname = os.path.basename(fpath).replace("-", " ").replace("_", " ") + for tok in fname.split(): + try: + bpm = float(tok) + if 60 < bpm < 220: + return abs(bpm - tempo) + except ValueError: + pass + return 999.0 + + scores = [(bpm_score(f), f) for f in files] + best = min(scores, key=lambda x: x[0]) + + return best[1] if best[0] < 15.0 else files[0] + + +# ------------------------------------------------------------------ +# MIDI pattern generators — pure Python, no Ableton communication +# ------------------------------------------------------------------ + +_KEY_ROOTS: Dict[str, int] = { + "C": 48, "C#": 49, "Db": 49, + "D": 50, "D#": 51, "Eb": 51, + "E": 52, + "F": 53, "F#": 54, "Gb": 54, + "G": 55, "G#": 56, "Ab": 56, + "A": 57, "A#": 58, "Bb": 58, + "B": 59, + # Minor keys + "Am": 45, "Dm": 38, "Em": 40, "Bm": 47, + "F#m": 54, "C#m": 49, "Gm": 43, "Fm": 41, +} + + +def _root(key: str) -> int: + return _KEY_ROOTS.get(key, 45) # Default Am root + + +def _gen_dembow(bars: int, variation: str, key: str) -> List[Dict]: + """Dembow drum pattern on MIDI note 36 (kick).""" + bpb = 4 + total = bars * bpb + notes = [] + patterns = { + "minimal": [0.0, 2.5], + "standard": [0.0, 1.5, 2.0, 2.5, 3.0, 3.5], + "double": [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5], + } + pos_list = patterns.get(variation, patterns["standard"]) + + for bar in range(bars): + for pos in pos_list: + start = bar * bpb + pos + if start >= total: + continue + vel = 110 if pos == 0.0 else (90 if pos in (2.0, 3.0) else 75) + notes.append({"pitch": 36, "start_time": start, "duration": 0.22, "velocity": vel}) + + return notes + + +def _gen_bass(bars: int, style: str, key: str) -> List[Dict]: + """Sub-bass MIDI patterns.""" + root = _root(key) + bpb = 4 + notes = [] + + for bar in range(bars): + b = bar * bpb + if style == "bass_sub": + notes += [ + {"pitch": root - 12, "start_time": b, "duration": 0.5, "velocity": 110}, + {"pitch": root - 12, "start_time": b + 2.0, "duration": 0.5, "velocity": 100}, + ] + elif style == "bass_octaves": + notes += [ + {"pitch": root - 12, "start_time": b, "duration": 0.5, "velocity": 110}, + {"pitch": root, "start_time": b + 2.0, "duration": 0.5, "velocity": 90}, + {"pitch": root - 12, "start_time": b + 3.0, "duration": 0.25, "velocity": 80}, + ] + elif style == "bass_pluck": + notes += [ + {"pitch": root - 12, "start_time": b, "duration": 0.25, "velocity": 110}, + {"pitch": root - 12, "start_time": b + 1.5, "duration": 0.25, "velocity": 85}, + {"pitch": root - 7, "start_time": b + 2.0, "duration": 0.25, "velocity": 90}, + {"pitch": root - 12, "start_time": b + 3.0, "duration": 0.25, "velocity": 80}, + ] + elif style == "bass_sustained": + notes.append( + {"pitch": root - 12, "start_time": b, "duration": float(bpb) - 0.25, "velocity": 100} + ) + else: + notes.append({"pitch": root - 12, "start_time": b, "duration": 0.5, "velocity": 100}) + + return notes + + +def _gen_chords(bars: int, style: str, key: str) -> List[Dict]: + """Chord voicing patterns.""" + root = _root(key) + bpb = 4 + notes = [] + + PROG_VERSE = [(0, 3, 7), (-5, -2, 2), (-3, 0, 4), (-7, -4, 0)] + PROG_CHORUS = [(0, 3, 7), (-3, 0, 4), (5, 8, 12), (0, 3, 7)] + prog = PROG_VERSE if "verse" in style else PROG_CHORUS + + for bar in range(bars): + chord_intervals = prog[bar % len(prog)] + start = float(bar * bpb) + for interval in chord_intervals: + notes.append({ + "pitch": root + interval, + "start_time": start, + "duration": float(bpb) - 0.25, + "velocity": 72, + }) + + return notes + + +def _gen_melody_simple(bars: int, key: str) -> List[Dict]: + """Simple pentatonic melodic line.""" + root = _root(key) + scale = [0, 3, 5, 7, 10, 12] + bpb = 4 + notes = [] + rhythm = [0.0, 0.75, 1.5, 2.0, 2.75, 3.0, 3.5] + + for bar in range(bars): + b = bar * bpb + for i, pos in enumerate(rhythm): + pitch = root + scale[(bar * 3 + i) % len(scale)] + 12 + notes.append({"pitch": pitch, "start_time": b + pos, "duration": 0.5, "velocity": 85}) + + return notes + + +# Registry: pattern_name → generator(bars, key) → List[Dict] +PATTERN_GENERATORS: Dict = { + "dembow_minimal": lambda bars, key: _gen_dembow(bars, "minimal", key), + "dembow_standard": lambda bars, key: _gen_dembow(bars, "standard", key), + "dembow_double": lambda bars, key: _gen_dembow(bars, "double", key), + "bass_sub": lambda bars, key: _gen_bass(bars, "bass_sub", key), + "bass_pluck": lambda bars, key: _gen_bass(bars, "bass_pluck", key), + "bass_octaves": lambda bars, key: _gen_bass(bars, "bass_octaves", key), + "bass_sustained": lambda bars, key: _gen_bass(bars, "bass_sustained", key), + "chords_verse": lambda bars, key: _gen_chords(bars, "chords_verse", key), + "chords_chorus": lambda bars, key: _gen_chords(bars, "chords_chorus", key), + "melody_simple": lambda bars, key: _gen_melody_simple(bars, key), +} + + +# ------------------------------------------------------------------ +# ScoreRenderer — SESSION VIEW +# ------------------------------------------------------------------ + +class ScoreRenderer: + """Renders a SongScore into Ableton Live Session View via TCP. + + Mapping: + SectionDef → Ableton Scene (one scene per section) + TrackDef → Ableton Track (one track per track definition) + ClipDef → Clip Slot at (track_index, scene_index) + + Usage: + renderer = ScoreRenderer(lib_root="C:\\...\\libreria\\reggaeton") + result = renderer.render(score, clear_first=True) + """ + + def __init__(self, lib_root: str): + self.lib_root = str(lib_root) + + # ---------------------------------------------------------------- + # Public entry point + # ---------------------------------------------------------------- + + def render(self, score: SongScore, clear_first: bool = True) -> dict: + """Render score into Ableton Live Session View. + + Returns: + { + "title": str, + "scenes_created": int, + "tracks_created": list, + "clips_created": int, + "errors": list[str], + "success": bool, + } + """ + result: Dict = { + "title": score.meta.get("title", ""), + "scenes_created": 0, + "tracks_created": [], + "clips_created": 0, + "errors": [], + } + + # Validate score first (no Ableton needed) + warnings = score.validate() + if warnings: + result["errors"].extend(["[VALIDATION] " + w for w in warnings]) + + # 0. Clear project. Ableton leaves 1 track (minimum). We accept this + # and offset our track creation accordingly. + if clear_first: + _send("clear_project", {}, timeout=30.0) + + # 1. Set meta (tempo, signature) + self._set_meta(score.meta, result) + + # 2. Create scenes (one per section in score.structure) + section_scene_map = self._create_scenes(score.structure, result) + + # 3. Create tracks + track_index_map = self._create_tracks(score.tracks, result) + + # 4. Place clips into clip slots + self._place_clips(score, track_index_map, section_scene_map, result) + + # 6. Apply mixer settings + self._apply_mixer(score.tracks, track_index_map, result) + + result["success"] = len(result["errors"]) == 0 + result["section_map"] = section_scene_map + return result + + # ---------------------------------------------------------------- + # Meta + # ---------------------------------------------------------------- + + def _set_meta(self, meta: dict, result: dict) -> None: + tempo = meta.get("tempo", 95) + resp = _send("set_tempo", {"tempo": tempo}, timeout=10.0) + if resp.get("status") != "success": + result["errors"].append("set_tempo failed: " + resp.get("message", "?")) + + sig = meta.get("time_signature", "4/4").split("/") + if len(sig) == 2: + _send("set_signature", + {"numerator": int(sig[0]), "denominator": int(sig[1])}, + timeout=10.0) + + # ---------------------------------------------------------------- + # Scenes — one per section + # ---------------------------------------------------------------- + + def _create_scenes(self, structure, result: dict) -> Dict[str, int]: + """Create one scene per section. Returns {section_name: scene_index}.""" + section_scene_map: Dict[str, int] = {} + + for i, section in enumerate(structure): + # Ableton starts with at least 1 empty scene; + # create additional scenes as needed + if i == 0: + scene_idx = 0 # reuse the default first scene + else: + resp = _send("create_scene", {"index": -1}, timeout=15.0) + if resp.get("status") != "success": + result["errors"].append( + "create_scene failed for '%s': %s" + % (section.name, resp.get("message", "?")) + ) + scene_idx = i # fallback: assume sequential + else: + scene_idx = resp.get("result", {}).get("scene_index", i) + + # Name the scene + _send("set_scene_name", + {"scene_index": scene_idx, "name": section.name}, + timeout=10.0) + + section_scene_map[section.name] = scene_idx + result["scenes_created"] += 1 + + return section_scene_map + + # ---------------------------------------------------------------- + # Tracks + # ---------------------------------------------------------------- + + def _create_tracks(self, tracks: List[TrackDef], result: dict) -> Dict[str, int]: + """Create all tracks and return {track_id: ableton_track_index}. + + IMPORTANT: Ableton groups tracks by type. All audio tracks come first, + then all MIDI tracks. After clear_project, 1 leftover track remains. + + Strategy: + 1. Snapshot track count before creation + 2. Create all tracks (Ableton auto-groups audio before MIDI) + 3. Snapshot after creation + 4. New tracks are identified by comparing before/after + 5. Map our tracks to the new ones by type order + """ + if not tracks: + return {} + + # Count pre-existing tracks by type + resp = _send("get_tracks", {}, timeout=10.0) + pre_audio = 0 + pre_midi = 0 + if resp.get("status") == "success": + for t in resp.get("result", {}).get("tracks", []): + if isinstance(t, dict): + if t.get("is_audio"): + pre_audio += 1 + elif t.get("is_midi"): + pre_midi += 1 + + # Create all tracks + for track in tracks: + if track.type == "audio": + _send("create_audio_track", {"index": -1}, timeout=15.0) + else: + _send("create_midi_track", {"index": -1}, timeout=15.0) + + # Now count tracks AFTER creation to find the new ones + resp = _send("get_tracks", {}, timeout=10.0) + ableton_tracks = resp.get("result", {}).get("tracks", []) if resp.get("status") == "success" else [] + + new_audio = [t for t in ableton_tracks if isinstance(t, dict) and t.get("is_audio")] + new_midi = [t for t in ableton_tracks if isinstance(t, dict) and t.get("is_midi")] + + # Separate our tracks by type + our_audio = [t for t in tracks if t.type == "audio"] + our_midi = [t for t in tracks if t.type == "midi"] + + track_index_map: Dict[str, int] = {} + + # Map audio tracks: our_audio[i] → new_audio[pre_audio + i] + for i, our_t in enumerate(our_audio): + ableton_idx = pre_audio + i + if ableton_idx < len(new_audio): + a_idx = new_audio[ableton_idx].get("index", ableton_idx) + else: + a_idx = ableton_idx # fallback + + _send("set_track_name", {"track_index": a_idx, "name": our_t.name}) + _send("set_track_volume", {"track_index": a_idx, "volume": our_t.mixer.volume}) + if our_t.mixer.pan != 0.0: + _send("set_track_pan", {"track_index": a_idx, "pan": our_t.mixer.pan}) + + track_index_map[our_t.id] = a_idx + result["tracks_created"].append({ + "id": our_t.id, "name": our_t.name, + "index": a_idx, "type": "audio", + }) + + # Map MIDI tracks: our_midi[i] → new_midi[pre_midi + i] + for i, our_t in enumerate(our_midi): + ableton_idx = pre_midi + i + if ableton_idx < len(new_midi): + m_idx = new_midi[ableton_idx].get("index", ableton_idx) + else: + m_idx = ableton_idx + len(new_audio) # fallback: after all audio + + _send("set_track_name", {"track_index": m_idx, "name": our_t.name}) + _send("set_track_volume", {"track_index": m_idx, "volume": our_t.mixer.volume}) + if our_t.mixer.pan != 0.0: + _send("set_track_pan", {"track_index": m_idx, "pan": our_t.mixer.pan}) + + if our_t.instrument: + _send("insert_device", + {"track_index": m_idx, + "device_name": our_t.instrument, + "device_type": "instrument"}, + timeout=30.0) + + track_index_map[our_t.id] = m_idx + result["tracks_created"].append({ + "id": our_t.id, "name": our_t.name, + "index": m_idx, "type": "midi", + }) + + return track_index_map + + # ---------------------------------------------------------------- + # Clip placement — Session View clip slots + # ---------------------------------------------------------------- + + def _place_clips(self, score: SongScore, track_index_map: Dict[str, int], + section_scene_map: Dict[str, int], result: dict) -> None: + key = score.meta.get("key", "Am") + tempo = score.meta.get("tempo", 95.0) + + for track in score.tracks: + if track.id not in track_index_map: + continue + t_idx = track_index_map[track.id] + + for clip in track.clips: + # Resolve scene index from section name + section_name = clip.section + if section_name and section_name in section_scene_map: + scene_idx = section_scene_map[section_name] + elif clip.start_bar is not None: + # Fallback: treat start_bar as a scene index approximation + scene_idx = 0 + else: + scene_idx = 0 + + clip_label = "%s_%s" % (section_name or "clip", track.id) + + if track.type == "audio": + self._place_audio_clip(t_idx, scene_idx, clip, clip_label, tempo, result) + else: + self._place_midi_clip(t_idx, scene_idx, clip, clip_label, key, result) + + def _place_audio_clip(self, track_idx: int, scene_idx: int, + clip: ClipDef, label: str, + tempo: float, result: dict) -> None: + """Load an audio sample into a Session View clip slot.""" + sample_path = _resolve_sample(clip.sample, self.lib_root, tempo) + if not sample_path: + result["errors"].append( + "Clip '%s': sample '%s' not found (lib_root=%s)" + % (label, clip.sample, self.lib_root) + ) + return + + resp = _send("load_sample_direct", { + "track_index": track_idx, + "slot_index": scene_idx, + "file_path": sample_path, + "warp": clip.warp, + }, timeout=30.0) + + if resp.get("status") == "success" or resp.get("loaded"): + result["clips_created"] += 1 + else: + resp2 = _send("load_sample_to_clip", { + "track_index": track_idx, + "clip_index": scene_idx, + "sample_path": sample_path, + }, timeout=30.0) + + if resp2.get("status") == "success" or resp2.get("loaded"): + result["clips_created"] += 1 + else: + result["errors"].append( + "Audio clip '%s' failed: primary=%s fallback=%s path=%s" + % (label, resp.get("error", resp.get("message", "?")), + resp2.get("error", resp2.get("message", "?")), sample_path) + ) + + def _place_midi_clip(self, track_idx: int, scene_idx: int, + clip: ClipDef, label: str, + key: str, result: dict) -> None: + """Create a MIDI clip in Session View and fill it with notes.""" + length_beats = clip.duration_bars * 4 # assume 4/4 + + # 1. Create the clip slot + resp = _send("create_clip", { + "track_index": track_idx, + "clip_index": scene_idx, + "length": length_beats, + }, timeout=20.0) + + if resp.get("status") != "success": + result["errors"].append( + "MIDI clip create '%s' failed: %s" % (label, resp.get("message", "?")) + ) + return + + # 2. Resolve notes + if clip.notes: + notes = clip.notes + elif clip.pattern: + gen = PATTERN_GENERATORS.get(clip.pattern) + if gen: + notes = gen(int(clip.duration_bars), key) + else: + result["errors"].append( + "Unknown pattern '%s' on clip '%s'" % (clip.pattern, label) + ) + notes = [] + else: + notes = [] + + # 3. Add notes + if notes: + resp = _send("add_notes_to_clip", { + "track_index": track_idx, + "clip_index": scene_idx, + "notes": notes, + }, timeout=20.0) + + if resp.get("status") != "success": + result["errors"].append( + "add_notes '%s' failed: %s" % (label, resp.get("message", "?")) + ) + + result["clips_created"] += 1 + + # ---------------------------------------------------------------- + # Mixer / Effects + # ---------------------------------------------------------------- + + def _apply_mixer(self, tracks: List[TrackDef], track_index_map: Dict[str, int], + result: dict) -> None: + for track in tracks: + if track.id not in track_index_map: + continue + t_idx = track_index_map[track.id] + mx = track.mixer + + if mx.eq_preset: + resp = _send("configure_eq", + {"track_index": t_idx, "preset": mx.eq_preset}, + timeout=15.0) + if resp.get("status") != "success": + result["errors"].append( + "EQ preset '%s' on '%s' failed: %s" + % (mx.eq_preset, track.id, resp.get("message", "?")) + ) + + # Compression presets are stored but not applied (configure_compressor not available) + + if mx.send_reverb > 0: + _send("set_track_send", + {"track_index": t_idx, "send_index": 0, "amount": mx.send_reverb}) + if mx.send_delay > 0: + _send("set_track_send", + {"track_index": t_idx, "send_index": 1, "amount": mx.send_delay}) + + +# ------------------------------------------------------------------ +# Convenience: render a score file directly +# ------------------------------------------------------------------ + +def render_file(json_path: str, lib_root: str, clear_first: bool = True) -> dict: + """Load a SongScore JSON from disk and render it into Ableton Session View.""" + score = SongScore.load(json_path) + renderer = ScoreRenderer(lib_root) + return renderer.render(score, clear_first=clear_first) diff --git a/AbletonMCP_AI/mcp_server/scores/README.md b/AbletonMCP_AI/mcp_server/scores/README.md new file mode 100644 index 0000000..e1eab22 --- /dev/null +++ b/AbletonMCP_AI/mcp_server/scores/README.md @@ -0,0 +1,3 @@ +# This directory stores SongScore JSON files. +# Each file represents a complete song ready to be rendered into Ableton Live. +# Use the MCP tools: save_score / load_score / list_scores / render_score_from_file diff --git a/AbletonMCP_AI/mcp_server/server.py b/AbletonMCP_AI/mcp_server/server.py index 922d77c..87e3077 100644 --- a/AbletonMCP_AI/mcp_server/server.py +++ b/AbletonMCP_AI/mcp_server/server.py @@ -1,4 +1,4 @@ -""" +""" AbletonMCP_AI MCP Server - Clean FastMCP server for Ableton Live 12. Communicates with the Ableton Remote Script via TCP socket on port 9877. """ @@ -7286,6 +7286,633 @@ def produce_with_spectral_coherence(ctx: Context, return _err(f"SPECTRAL OUTER: type={type(e).__name__} msg={str(e)!r}\n{tb[:1500]}") +# ================================================================== +# SPRINT 9 — SCORE → RENDER PIPELINE +# Compose a SongScore JSON incrementally, then inject it into Ableton +# in one atomic render_score() call. +# ================================================================== + +# Lazy imports so server still starts if score_engine is missing +def _import_score_engine(): + try: + import score_engine as _se + return _se + except ImportError: + return None + +def _import_score_renderer(): + try: + import score_renderer as _sr + return _sr + except ImportError: + return None + +_REGGAETON_LIB = str(PROJECT_DIR.parent / "libreria" / "reggaeton") + + +@mcp.tool() +def new_score( + ctx: Context, + title: str = "Untitled", + tempo: float = 95.0, + key: str = "Am", + genre: str = "reggaeton", + time_signature: str = "4/4", + gap_bars: float = 2.0, +) -> str: + """Create a fresh SongScore in memory and make it the active score. + + This clears any previous in-memory score. + + Args: + title: Song title + tempo: BPM (80-160) + key: Musical key (Am, C, Dm, F, G, etc.) + genre: Genre tag (for documentation) + time_signature: e.g. "4/4" + gap_bars: Bars of silence automatically inserted between sections + + Returns: + Score summary including title, tempo, key. + """ + se = _import_score_engine() + if not se: + return _err("score_engine module not found in mcp_server/") + score = se.SongScore(title=title, tempo=tempo, key=key, genre=genre, + time_signature=time_signature, gap_bars=gap_bars) + se.set_current_score(score) + return _ok({"created": True, "meta": score.meta, + "instructions": "Score created. Use compose_structure() next."}) + + +@mcp.tool() +def get_score(ctx: Context) -> str: + """Return the complete active SongScore as JSON. + + Use this to inspect the score before rendering, or to extract the JSON + for external storage / batch generation. + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + score = se.get_current_score() + if score is None: + return _err("No active score. Call new_score() or load_score() first.") + return _ok({"score": score.to_dict(), + "warnings": score.validate(), + "total_bars": score.total_bars()}) + + +@mcp.tool() +def compose_structure(ctx: Context, sections: list) -> str: + """Define the temporal structure of the active score. + + Calculates start_bar automatically using the score's gap_bars setting. + + Args: + sections: List of section dicts, each containing: + - name (str): Section name, e.g. "Intro", "Chorus" + - duration_bars (int): Length of the section in bars + - start_bar (float, optional): Override auto-calculated position + + Example sections: + [ + {"name": "Intro", "duration_bars": 4}, + {"name": "Verse", "duration_bars": 8}, + {"name": "Chorus", "duration_bars": 8}, + {"name": "Bridge", "duration_bars": 4}, + {"name": "Outro", "duration_bars": 4} + ] + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + score = se.require_score() + score.set_structure(sections) + struct = score.get_structure_dict() + return _ok({"structure_set": True, "sections": len(struct), + "structure": struct, "total_bars": score.total_bars()}) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def compose_audio_track( + ctx: Context, + track_id: str, + name: str, + clips: list, + mixer: dict = None, +) -> str: + """Add an audio track to the active score. + + Args: + track_id: Unique identifier (e.g. "kick", "drum_loop") + name: Display name in Ableton (e.g. "Kick") + clips: List of clip dicts. Each clip must have: + - section (str): Which section this clip belongs to + - sample (str): Sample reference, e.g. "kick/auto" or exact path + - loop (bool, optional, default True) + - warp (bool, optional, default True) + mixer: Optional dict with volume (0-1), pan, eq_preset, + compression_preset, send_reverb, send_delay + + Sample reference format: + "kick/auto" → auto-selects best kick sample + "drumloops/auto" → auto-selects best drum loop + "kick/kick_01.wav" → exact file within libreria/reggaeton/kick/ + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + score = se.require_score() + struct = score.get_structure_dict() + track = se.TrackDef( + track_id = track_id, + name = name, + track_type = "audio", + clips = [se.ClipDef.from_raw(c, struct) for c in (clips or [])], + mixer = se.MixerDef.from_dict(mixer or {}), + ) + score.add_track(track) + return _ok({"track_added": True, "id": track_id, "clips": len(track.clips)}) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def compose_midi_track( + ctx: Context, + track_id: str, + name: str, + instrument: str, + clips: list, + mixer: dict = None, +) -> str: + """Add a MIDI track to the active score. + + Args: + track_id: Unique identifier (e.g. "dembow", "bass") + name: Display name (e.g. "Dembow", "Sub Bass") + instrument: Live instrument to load: "Wavetable" or "Operator" + clips: List of clip dicts, each with: + - section (str): Section name + - pattern (str): MIDI pattern name (see below) + OR + - notes (list): Explicit MIDI notes + mixer: Optional mixer settings dict + + Available patterns: + dembow_minimal, dembow_standard, dembow_double + bass_sub, bass_pluck, bass_octaves, bass_sustained + chords_verse, chords_chorus + melody_simple + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + score = se.require_score() + struct = score.get_structure_dict() + track = se.TrackDef( + track_id = track_id, + name = name, + track_type = "midi", + instrument = instrument, + clips = [se.ClipDef.from_raw(c, struct) for c in (clips or [])], + mixer = se.MixerDef.from_dict(mixer or {}), + ) + score.add_track(track) + return _ok({"track_added": True, "id": track_id, "instrument": instrument, + "clips": len(track.clips)}) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def compose_pattern( + ctx: Context, + track_id: str, + section: str, + pattern: str, +) -> str: + """Add a MIDI pattern clip to an existing track in the active score. + + Args: + track_id: ID of an existing MIDI track (must already be in score) + section: Section name where the clip will be placed + pattern: Pattern name (dembow_standard, bass_pluck, chords_verse, etc.) + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + score = se.require_score() + score.add_clip_to_track(track_id, {"section": section, "pattern": pattern}) + return _ok({"clip_added": True, "track": track_id, "section": section, "pattern": pattern}) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def compose_notes( + ctx: Context, + track_id: str, + section: str, + notes: list, +) -> str: + """Add explicit MIDI notes to an existing track for a specific section. + + Args: + track_id: ID of an existing MIDI track + section: Section name + notes: List of note dicts: [{pitch, start_time, duration, velocity}, ...] + - pitch: MIDI note number (0-127) + - start_time: position in beats (relative to clip start) + - duration: note length in beats + - velocity: 0-127 + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + score = se.require_score() + score.add_clip_to_track(track_id, {"section": section, "notes": notes}) + return _ok({"notes_added": True, "track": track_id, "section": section, + "note_count": len(notes)}) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def compose_mixer( + ctx: Context, + track_id: str, + volume: float = None, + pan: float = None, + eq_preset: str = None, + compression_preset: str = None, + send_reverb: float = None, + send_delay: float = None, +) -> str: + """Update mixer settings for a track in the active score. + + Args: + track_id: Track ID + volume: 0.0 - 1.0 + pan: -1.0 (left) to 1.0 (right) + eq_preset: kick, snare, bass, synth, master, kick_sub, etc. + compression_preset: kick_punch, bass_glue, buss_glue, parallel_drum, etc. + send_reverb: 0.0 - 1.0 (Reverb return send level) + send_delay: 0.0 - 1.0 (Delay return send level) + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + score = se.require_score() + kwargs = {k: v for k, v in { + "volume": volume, "pan": pan, "eq_preset": eq_preset, + "compression_preset": compression_preset, + "send_reverb": send_reverb, "send_delay": send_delay, + }.items() if v is not None} + score.set_mixer(track_id, **kwargs) + return _ok({"mixer_updated": True, "track": track_id, "settings": kwargs}) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def compose_from_template( + ctx: Context, + template_name: str, + title: str = None, + tempo: float = None, + key: str = None, + gap_bars: float = None, +) -> str: + """Create a complete SongScore from a predefined template and make it active. + + Available templates: + reggaeton_basic — Intro/Verse/Chorus/Bridge/Outro with full track set + reggaeton_13scenes — 13-section professional reggaeton structure + minimal_loop — Single 8-bar loop with drums + bass + + Args: + template_name: Template identifier (see above) + title: Override title (optional) + tempo: Override BPM (optional) + key: Override key (optional, e.g. "Dm", "F") + gap_bars: Override gap between sections (optional) + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + overrides = {} + if title is not None: overrides["title"] = title + if tempo is not None: overrides["tempo"] = tempo + if key is not None: overrides["key"] = key + if gap_bars is not None: overrides["gap_bars"] = gap_bars + + score = se.SongScore.from_template(template_name, **overrides) + se.set_current_score(score) + return _ok({ + "template": template_name, + "created": True, + "meta": score.meta, + "sections": len(score.structure), + "tracks": len(score.tracks), + "total_bars": score.total_bars(), + "structure": score.get_structure_dict(), + "warnings": score.validate(), + }) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def compose_validate(ctx: Context) -> str: + """Validate the active SongScore without touching Ableton. + + Checks structure completeness, track/clip consistency, sample references. + + Returns: + List of warnings (empty = all good). + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + score = se.require_score() + warnings = score.validate() + return _ok({ + "valid": len(warnings) == 0, + "warnings": warnings, + "sections": len(score.structure), + "tracks": len(score.tracks), + "total_bars": score.total_bars(), + }) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def save_score(ctx: Context, filename: str = None) -> str: + """Save the active SongScore to disk as a JSON file. + + Args: + filename: File name (without path). If omitted, auto-generated from title + timestamp. + Extension .json is added automatically if missing. + File is saved to: AbletonMCP_AI/mcp_server/scores/ + + Returns: + Absolute path of the saved file. + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + score = se.require_score() + if not filename: + ts = __import__("datetime").datetime.now().strftime("%Y%m%d_%H%M%S") + safe = "".join(c if c.isalnum() or c in "_- " else "_" + for c in score.meta.get("title", "untitled")) + safe = safe.replace(" ", "_").strip("_")[:40] + filename = "%s_%s.json" % (safe, ts) + if not filename.endswith(".json"): + filename += ".json" + path = se.SCORES_DIR / filename + score.save(path) + return _ok({"saved": True, "filename": filename, "path": str(path), + "size_bytes": path.stat().st_size}) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def load_score(ctx: Context, filename: str) -> str: + """Load a SongScore from disk and make it the active score. + + Args: + filename: File name in scores/ directory (e.g. "perreo_eterno.json"). + Use list_scores() to see available files. + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + if not filename.endswith(".json"): + filename += ".json" + path = se.SCORES_DIR / filename + if not path.exists(): + return _err("File not found: %s. Use list_scores() to see available scores." % filename) + score = se.SongScore.load(path) + se.set_current_score(score) + return _ok({ + "loaded": True, + "filename": filename, + "meta": score.meta, + "sections": len(score.structure), + "tracks": len(score.tracks), + "total_bars": score.total_bars(), + "warnings": score.validate(), + }) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def list_scores(ctx: Context) -> str: + """List all SongScore JSON files saved in mcp_server/scores/. + + Returns file names, sizes, and any readable metadata. + """ + se = _import_score_engine() + if not se: + return _err("score_engine not available") + try: + files = sorted(se.SCORES_DIR.glob("*.json")) + entries = [] + for f in files: + entry = {"filename": f.name, "size_bytes": f.stat().st_size} + try: + data = __import__("json").loads(f.read_text(encoding="utf-8")) + m = data.get("meta", {}) + entry.update({ + "title": m.get("title", "?"), + "tempo": m.get("tempo"), + "key": m.get("key"), + "tracks": len(data.get("tracks", [])), + }) + except Exception: + pass + entries.append(entry) + return _ok({"count": len(entries), "scores": entries, + "directory": str(se.SCORES_DIR)}) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def render_score( + ctx: Context, + clear_first: bool = True, + score_json: str = None, +) -> str: + """Render the active SongScore into Ableton Live. + + Translates the score into Ableton operations: creates tracks, places clips + in Arrangement View at the correct positions, applies mixer settings. + + Args: + clear_first: Remove all existing tracks/clips before rendering (default True). + score_json: Optional. Pass a raw JSON string to render directly without + making it the active score. If omitted, uses the active score. + + Returns: + Summary: tracks created, clips placed, errors. + """ + se = _import_score_engine() + sr = _import_score_renderer() + if not se or not sr: + return _err("score_engine or score_renderer not available") + try: + if score_json: + score = se.SongScore.from_json(score_json) + else: + score = se.require_score() + + renderer = sr.ScoreRenderer(_REGGAETON_LIB) + result = renderer.render(score, clear_first=clear_first) + return _ok(result) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def render_score_from_file( + ctx: Context, + filename: str, + clear_first: bool = True, +) -> str: + """Load a SongScore JSON from disk and render it into Ableton Live. + + Ideal for batch workflows: save 50 scores with ai_loop.py, then render + them one by one. + + Args: + filename: File name in scores/ (e.g. "song_001.json"). Use list_scores(). + clear_first: Remove existing tracks before rendering (default True). + """ + se = _import_score_engine() + sr = _import_score_renderer() + if not se or not sr: + return _err("score_engine or score_renderer not available") + try: + if not filename.endswith(".json"): + filename += ".json" + path = se.SCORES_DIR / filename + if not path.exists(): + return _err("File not found: %s" % filename) + + score = se.SongScore.load(path) + se.set_current_score(score) # Also make it active + + renderer = sr.ScoreRenderer(_REGGAETON_LIB) + result = renderer.render(score, clear_first=clear_first) + result["filename"] = filename + return _ok(result) + except Exception as exc: + return _err(str(exc)) + + +@mcp.tool() +def render_all_scores( + ctx: Context, + clear_between: bool = True, + delay_seconds: float = 3.0, + limit: int = 0, +) -> str: + """Render all SongScore JSON files from scores/ sequentially into Ableton. + + Designed for batch autonomous production. Run this after ai_loop.py has + generated a batch of scores. + + Args: + clear_between: Clear Ableton project between each score (default True). + delay_seconds: Wait between renders (give Ableton time to process). + limit: Maximum number of scores to render (0 = all). + + Returns: + Summary of all render results. + """ + import time as _time + + se = _import_score_engine() + sr = _import_score_renderer() + if not se or not sr: + return _err("score_engine or score_renderer not available") + try: + files = sorted(se.SCORES_DIR.glob("*.json")) + if limit > 0: + files = files[:limit] + if not files: + return _ok({"rendered": 0, "message": "No score files found in scores/"}) + + renderer = sr.ScoreRenderer(_REGGAETON_LIB) + results = [] + errors = 0 + + for i, f in enumerate(files): + logger.info("[render_all] %d/%d: %s", i + 1, len(files), f.name) + try: + score = se.SongScore.load(f) + result = renderer.render(score, clear_first=clear_between) + result["filename"] = f.name + results.append(result) + if not result.get("success"): + errors += 1 + except Exception as exc: + results.append({"filename": f.name, "success": False, "error": str(exc)}) + errors += 1 + + if delay_seconds > 0 and i < len(files) - 1: + _time.sleep(delay_seconds) + + return _ok({ + "total": len(files), + "success": len(files) - errors, + "errors": errors, + "results": results, + }) + except Exception as exc: + return _err(str(exc)) + + +# Also register timeouts for the new tools +TIMEOUTS.update({ + "new_score": 5.0, + "get_score": 5.0, + "compose_structure": 5.0, + "compose_audio_track": 5.0, + "compose_midi_track": 5.0, + "compose_pattern": 5.0, + "compose_notes": 5.0, + "compose_mixer": 5.0, + "compose_from_template": 5.0, + "compose_validate": 5.0, + "save_score": 5.0, + "load_score": 5.0, + "list_scores": 5.0, + "render_score": 300.0, + "render_score_from_file":300.0, + "render_all_scores": 1800.0, +}) + + # ------------------------------------------------------------------ # MAIN # ------------------------------------------------------------------