Checkpoint: Score→Render pipeline working with GLM-5-Turbo

- score_engine.py: 3-phase track type auto-correction (detects pattern
  names in sample field, converts audio→midi when all clips are patterns)
- score_renderer.py: Track creation with Ableton audio/MIDI grouping,
  load_sample_direct with fallback, pre/post snapshot for correct index
  mapping despite leftover tracks from clear_project
- ai_loop.py: Rewritten with GLM-5-Turbo as default, 4-attempt JSON
  parser with bracket fix, clean SYSTEM_PROMPT with exact sample paths
- server.py: Score→Render MCP tools (compose_from_template, render_score,
  etc.)
- SYSTEM_SCORE_RENDER.md: Architecture documentation

Test results:
- Template render: 29 clips, 0 errors (reggaeton_basic)
- GLM-5-Turbo render: 64 clips, 0 errors (Luna de Miel en el Block)
- All track types correctly mapped (audio/MIDI)
- Instruments loaded on MIDI tracks (Wavetable/Operator)
- Audio samples resolved from libreria/reggaeton/ correctly
Administrator
2026-04-14 15:52:23 -03:00
parent febb411c3f
commit 96ecf86812
6 changed files with 2652 additions and 1 deletions


@@ -0,0 +1,90 @@
# System: Score → Render Pipeline (Sprint 9)
Effective: 2026-04-14
Primary Workflow: **Compose-then-Render**
Target View: **Session View**
## Overview
The Score → Render pipeline introduces a decoupled architecture where musical composition is separated from Ableton Live execution. This allows for:
1. **Incremental Composition**: Build a song piece-by-piece in a JSON score.
2. **Offline Generation**: Use AI agents (any Anthropic-compatible endpoint: Anthropic, OpenRouter, or a local LLM) to generate scores without needing Ableton open.
3. **Batch Rendering**: Render 50+ unique songs sequentially from JSON files.
4. **Deterministic Deployment**: Entire song structures are injected into Session View in one atomic call.
---
## Core Components
### 1. SongScore (`score_engine.py`)
A pure Python data model representing a song. No Ableton dependencies.
- **Meta**: Title, Tempo, Key, Gap Bars.
- **Structure**: Ordered list of sections (Intro, Chorus, etc.) with durations.
- **Tracks**: List of track definitions (Audio or MIDI).
- **Clips**: Clips mapped to specific sections.
- **Mixer**: Volume, Pan, EQ/Compressor presets, Return Sends.
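The bullets above map directly onto top-level JSON keys. A minimal score illustrating that shape (field names come from the schema; the concrete values here are invented for illustration):

```python
import json

# Minimal SongScore document; titles, sections and samples are illustrative only.
minimal_score = json.loads("""
{
  "meta": {"title": "Demo", "tempo": 95, "key": "Am",
           "genre": "reggaeton", "time_signature": "4/4", "gap_bars": 2.0},
  "structure": [
    {"name": "Intro", "duration_bars": 4},
    {"name": "Chorus", "duration_bars": 8}
  ],
  "tracks": [
    {"id": "kick", "name": "Kick", "type": "audio",
     "clips": [{"section": "Chorus", "sample": "kick/auto", "loop": true}],
     "mixer": {"volume": 0.85, "pan": 0.0}}
  ]
}
""")

# The three top-level keys described above are all present.
assert {"meta", "structure", "tracks"} <= minimal_score.keys()
```

Note that sections carry only `name` and `duration_bars` — start positions are computed by the engine, as described in the Best Practices section.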
### 2. ScoreRenderer (`score_renderer.py`)
Translates `SongScore` into TCP commands for Ableton Live.
- **Mapping**: Sections → Scenes | Tracks → Tracks | Clips → Clip Slots.
- **Sample Selection**: Resolver for `"auto"` samples based on BPM proximity.
- **MIDI Resolution**: Resolves pattern names (e.g., `dembow_standard`) into explicit MIDI notes before sending.
- **Mixer Application**: Configures devices (EQ Eight, Compressor) and sends.
### 3. AI Loop (`ai_loop.py`)
An autonomous production script compatible with Anthropic/OpenRouter/Local LLMs.
- Queries AI for valid `SongScore` JSON.
- Validates and saves to `mcp_server/scores/`.
- Optionally renders immediately to Ableton.
---
## Technical Mapping (Session View)
The system is strictly Session-View only to avoid Arrangement complexity and allow clip-based performance.
| SongScore Element | Ableton Element | Command Used |
|-------------------|-----------------|--------------|
| `SectionDef` | **Scene** | `create_scene`, `set_scene_name` |
| `TrackDef` | **Track** | `create_audio_track`, `create_midi_track` |
| `ClipDef` (Audio) | **Clip Slot** | `load_sample_to_clip` |
| `ClipDef` (MIDI) | **Clip Slot** | `create_clip`, `add_notes_to_clip` |
| `MixerDef` | **Devices** | `configure_eq`, `configure_compressor`, `set_track_send` |
---
## Available Tools (MCP)
### Composer Tools
- `new_score`: Initialize active score.
- `compose_structure`: Define sections and durations.
- `compose_audio_track`: Add audio tracks with sample references.
- `compose_midi_track`: Add MIDI tracks with instruments.
- `compose_pattern`: Apply predefined MIDI patterns (dembow, bass, etc.).
- `compose_mixer`: Set levels and FX presets.
- `compose_from_template`: Create full score from "reggaeton_basic", etc.
### Management & Rendering
- `save_score` / `load_score`: Persist JSON to `mcp_server/scores/`.
- `list_scores`: List all saved songs.
- `render_score`: Inject active score into Ableton.
- `render_score_from_file`: Render a specific JSON file.
- `render_all_scores`: Sequentially render everything in the scores folder.
---
## MIDI Patterns Reference
The following patterns can be used in `compose_midi_track` or `compose_pattern`:
- **Drums**: `dembow_minimal`, `dembow_standard`, `dembow_double`.
- **Bass**: `bass_sub`, `bass_pluck`, `bass_octaves`, `bass_sustained`.
- **Harmony**: `chords_verse`, `chords_chorus`.
- **Melody**: `melody_simple`.
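A pattern name goes in a clip's `"pattern"` field, whereas audio clips use `"sample"`; the engine infers a clip's type from which field is present. A standalone sketch of that rule, mirroring `ClipDef.from_raw` in `score_engine.py`:

```python
def infer_clip_type(clip: dict) -> str:
    """Mirror of the engine's inference rule: a clip carrying a "sample"
    field is audio; otherwise it is treated as MIDI (pattern or notes)."""
    return "audio" if clip.get("sample") else "midi"

# MIDI clip using a predefined pattern from the reference list above
assert infer_clip_type({"section": "Chorus", "pattern": "dembow_standard"}) == "midi"
# Audio clip using the "auto" sample resolver
assert infer_clip_type({"section": "Chorus", "sample": "kick/auto"}) == "audio"
```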
## Best Practices for AI Agents
1. **Always start with a Template**: Use `compose_from_template` first, then modify.
2. **Use "auto" samples**: Let the renderer pick the best file matching the BPM.
3. **Validate before Render**: Use `compose_validate` to catch ID mismatches.
4. **Iterate in JSON**: It's faster to tweak the JSON score via compose tools than to re-render everything.
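Tweaking the JSON is easier once you know how the engine lays sections out: `start_bar` is never taken from the file — it is recomputed from each section's `duration_bars` plus the global `gap_bars`. A sketch of that calculation, mirroring what `SongScore.from_dict` does:

```python
def layout_sections(sections, gap_bars=2.0):
    """Assign start_bar to each section: sections play back-to-back,
    separated by gap_bars of silence, starting at bar 0."""
    laid_out, current_bar = [], 0.0
    for sec in sections:
        laid_out.append({"name": sec["name"],
                         "start_bar": current_bar,
                         "duration_bars": float(sec["duration_bars"])})
        current_bar += sec["duration_bars"] + gap_bars
    return laid_out

sections = layout_sections([
    {"name": "Intro", "duration_bars": 4},
    {"name": "Verse", "duration_bars": 8},
    {"name": "Chorus", "duration_bars": 8},
])
# Each section begins gap_bars after the previous one ends: 0, 6, 16
assert [s["start_bar"] for s in sections] == [0.0, 6.0, 16.0]
```

This is why the system prompt forbids `start_bar` in AI-generated scores: the value would be ignored and recomputed anyway.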


@@ -0,0 +1,377 @@
"""
ai_loop.py — Autonomous music production loop using an Anthropic-compatible AI.
The loop:
1. Calls an Anthropic-compatible endpoint to generate a SongScore JSON
2. Validates and saves the score to scores/
3. Optionally renders it into Ableton Live
Configuration (environment variables OR command-line args):
AI_BASE_URL → API base URL (default: https://api.anthropic.com)
AI_API_KEY → API key (required)
AI_MODEL → model name (default: GLM-5-Turbo)
AI_MAX_TOKENS → max output tokens (default: 4096)
RENDER_AFTER → "1" to auto-render each score in Ableton (default: 0)
LOOP_COUNT → how many songs to produce (default: 10, 0 = infinite)
LOOP_DELAY → seconds between generations (default: 5)
LIB_ROOT → path to libreria/reggaeton (auto-detected)
Usage examples:
# OpenRouter with Claude Haiku
AI_BASE_URL=https://openrouter.ai/api/v1 AI_API_KEY=sk-xxx python ai_loop.py
# Local LM Studio (Anthropic-compatible)
AI_BASE_URL=http://localhost:1234/v1 AI_API_KEY=sk-any python ai_loop.py --count 5
# Real Anthropic + auto-render
AI_API_KEY=sk-ant-xxx RENDER_AFTER=1 python ai_loop.py
"""
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path
_THIS_DIR = Path(__file__).resolve().parent
_PROJ_DIR = _THIS_DIR.parent
_BASE_DIR = _PROJ_DIR.parent
for _p in (str(_THIS_DIR), str(_PROJ_DIR)):
if _p not in sys.path:
sys.path.insert(0, _p)
from score_engine import SongScore, SCORES_DIR
from score_renderer import ScoreRenderer
logging.basicConfig(
level = logging.INFO,
format = "%(asctime)s [ai_loop] %(levelname)s: %(message)s",
)
log = logging.getLogger("ai_loop")
_DEFAULT_LIB_ROOT = str(_BASE_DIR / "libreria" / "reggaeton")
SYSTEM_PROMPT = """\
You are a professional reggaeton and Latin urban music producer AI.
Your ONLY job is to output a valid SongScore JSON object for each request.
Do NOT include any explanation, markdown code fences, or commentary.
Output ONLY raw JSON that starts with { and ends with }.
SongScore schema:
{
"meta": {
"title": "<unique Spanish/English song title>",
"tempo": <85-105>,
"key": "<Am|Dm|Em|Fm|Gm|C|F|G|Bb>",
"genre": "reggaeton",
"time_signature": "4/4",
"gap_bars": <1.0-4.0>
},
"structure": [
{ "name": "<section name>", "duration_bars": <integer> },
...
],
"tracks": [
{
"id": "<unique_id>",
"name": "<Track Name>",
"type": "<audio|midi>",
"clips": [
{ "section": "<section name>", "sample": "kick/auto", "loop": true }
],
"instrument": "<Wavetable|Operator>",
"mixer": { "volume": <0-1>, "pan": <-1 to 1>, "eq_preset": "<optional>" }
}
]
}
Available sample subfolders — use EXACTLY these values in the "sample" field:
"kick/auto" -> Kick drums
"snare/auto" -> Snares
"hi-hat (para percs normalmente)/auto" -> Hi-hat / percussion
"drumloops/auto" -> Drum loops
"perc loop/auto" -> Percussion loops
"bass/auto" -> Bass samples
"fx/auto" -> FX/transitions
IMPORTANT: "auto" is a keyword that means "pick the best sample automatically".
Do NOT write "subfolder/auto" literally — that is an instruction, not a valid path.
Available MIDI patterns:
dembow_minimal dembow_standard dembow_double
bass_sub bass_pluck bass_octaves bass_sustained
chords_verse chords_chorus melody_simple
Available EQ presets: kick snare bass synth master
compression_preset is accepted but currently ignored (reserved for future use).
Rules:
- Every track MUST have at least one clip.
- Every clip MUST reference a valid section name from the structure array.
- Always include at minimum: kick, snare or drum_loop, dembow, bass tracks.
- Vary everything: title, tempo, key, gap_bars, structure length (40-90 total bars).
- Use realistic reggaeton/latin structures (Intro, Verse, Pre-Chorus, Chorus, Bridge, Outro).
- Mix audio and MIDI tracks creatively.
- Section names MUST be unique. Use numbered suffixes: "Intro", "Verse A", "Pre-Chorus", "Chorus A", "Verse B", "Chorus B", "Bridge", "Outro". NEVER repeat a section name.
- Do NOT include "start_bar" in sections. The engine calculates it automatically from duration_bars and gap_bars.
- Output ONLY the JSON object. Nothing else.
"""
USER_PROMPT_TEMPLATE = """\
Generate song number {index} of {total}.
Make it unique. Use creative Spanish/English titles.
Output only the SongScore JSON.
"""
def _build_client(base_url: str, api_key: str):
try:
import anthropic
except ImportError:
log.error("anthropic package not installed. Run: pip install anthropic")
sys.exit(1)
kwargs = {"api_key": api_key}
if base_url and "anthropic.com" not in base_url:
kwargs["base_url"] = base_url
return anthropic.Anthropic(**kwargs)
def _generate_score(client, model: str, max_tokens: int,
index: int, total: int) -> str:
user_prompt = USER_PROMPT_TEMPLATE.format(index=index, total=total)
message = client.messages.create(
model = model,
max_tokens = max_tokens,
system = SYSTEM_PROMPT,
messages = [{"role": "user", "content": user_prompt}],
)
content = message.content
if isinstance(content, list):
text_blocks = [b.text for b in content if hasattr(b, "text")]
return "\n".join(text_blocks).strip()
return str(content).strip()
def _fix_brackets(text: str) -> str:
"""Fix common LLM bracket mistakes: } where ] is needed, missing }, etc."""
import re
# GLM-5-Turbo sometimes closes "structure": [...] with } instead of ]
# Pattern: },\n "tracks" -> ],\n "tracks"
text = re.sub(r'\},(\s*\n\s*)"tracks"', r'],\1"tracks"', text, count=1)
    # Normalize "}" followed by whitespace then "]" so the two closers
    # land on separate lines for the later cleanup passes
    text = re.sub(r'\}\s*\]', '}\n]', text)
# Trailing comma before closing bracket
text = re.sub(r',(\s*\})', r'\1', text)
text = re.sub(r',(\s*\])', r'\1', text)
return text
def _parse_score(raw: str, index: int) -> SongScore:
import re
raw = raw.strip()
if raw.startswith("```"):
lines = raw.split("\n")
raw = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:])
start = raw.find("{")
end = raw.rfind("}") + 1
if start < 0 or end <= start:
raise ValueError("No JSON object found in AI response")
raw = raw[start:end]
# Attempt 1: direct parse
try:
data = json.loads(raw)
return SongScore.from_dict(data)
except json.JSONDecodeError:
pass
# Attempt 2: fix common bracket errors from LLMs
fixed = _fix_brackets(raw)
try:
data = json.loads(fixed)
log.info("JSON bracket fix succeeded on attempt 2")
return SongScore.from_dict(data)
except json.JSONDecodeError:
pass
# Attempt 3: remove // comments + trailing commas + bracket fix
cleaned = re.sub(r'//.*$', '', fixed, flags=re.MULTILINE)
cleaned = re.sub(r',(\s*\})', r'\1', cleaned)
cleaned = re.sub(r',(\s*\])', r'\1', cleaned)
try:
data = json.loads(cleaned)
log.info("JSON cleaned successfully on attempt 3")
return SongScore.from_dict(data)
except json.JSONDecodeError as exc:
# Attempt 4: brute-force close unclosed brackets
open_b = cleaned.count('{') - cleaned.count('}')
open_br = cleaned.count('[') - cleaned.count(']')
if open_b > 0 or open_br > 0:
repaired = cleaned.rstrip().rstrip(',')
repaired += ']' * max(0, open_br)
repaired += '}' * max(0, open_b)
try:
data = json.loads(repaired)
log.info("JSON repaired (bracket closure) on attempt 4")
return SongScore.from_dict(data)
            except json.JSONDecodeError:
                pass
raise ValueError(
"JSON parse failed after all attempts: %s\nLast output:\n%s"
% (exc, cleaned[:800])
)
def run_loop(
base_url: str,
api_key: str,
model: str,
max_tokens: int,
count: int,
delay: float,
render: bool,
lib_root: str,
output_prefix: str = "ai_song",
dry_run: bool = False,
):
client = _build_client(base_url, api_key)
renderer = ScoreRenderer(lib_root) if (render and not dry_run) else None
total = count if count > 0 else "inf"
log.info("Starting AI production loop — model=%s count=%s render=%s",
model, total, render)
log.info("Scores will be saved to: %s", SCORES_DIR)
if render:
log.info("Library root: %s", lib_root)
if dry_run:
log.info("DRY RUN — Ableton will NOT be touched")
produced = 0
iteration = 0
while True:
iteration += 1
if count > 0 and produced >= count:
break
log.info("Generating song %d / %s", iteration, total)
try:
raw_json = _generate_score(client, model, max_tokens, iteration, count or 999)
log.debug("Raw AI output:\n%s", raw_json[:500])
score = _parse_score(raw_json, iteration)
warnings = score.validate()
if warnings:
log.warning("Validation warnings: %s", warnings)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = "%s_%03d_%s.json" % (output_prefix, iteration, timestamp)
saved_path = SCORES_DIR / filename
score.save(saved_path)
log.info("Saved: %s (%d tracks, %.0f bars)",
filename, len(score.tracks), score.total_bars())
if renderer:
log.info("Rendering into Ableton...")
result = renderer.render(score, clear_first=True)
if result.get("success"):
log.info("Rendered OK tracks=%d clips=%d bars=%.0f",
len(result["tracks_created"]),
result["clips_created"],
score.total_bars())
else:
log.warning("Render completed with errors:")
for err in result.get("errors", []):
log.warning(" - %s", err)
produced += 1
except KeyboardInterrupt:
log.info("Loop interrupted by user. %d songs produced.", produced)
break
        except ValueError as exc:
            # _parse_score catches JSONDecodeError internally and re-raises
            # as ValueError after all repair attempts fail
            log.error("Score parse error on iteration %d: %s", iteration, exc)
except Exception as exc:
log.exception("Unexpected error on iteration %d: %s", iteration, exc)
if count == 0 or produced < count:
if delay > 0:
log.info("Waiting %.0fs before next generation...", delay)
time.sleep(delay)
log.info("Loop complete. %d songs produced and saved to %s", produced, SCORES_DIR)
def main():
parser = argparse.ArgumentParser(
description="Autonomous AI music production loop (Anthropic-compatible)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument("--base-url", default=os.environ.get("AI_BASE_URL", "https://api.anthropic.com"))
parser.add_argument("--api-key", default=os.environ.get("AI_API_KEY", ""))
parser.add_argument("--model", default=os.environ.get("AI_MODEL", "GLM-5-Turbo"))
parser.add_argument("--max-tokens",default=int(os.environ.get("AI_MAX_TOKENS", "4096")), type=int)
parser.add_argument("--count", default=int(os.environ.get("LOOP_COUNT", "10")), type=int,
help="Songs to produce (0 = infinite)")
parser.add_argument("--delay", default=float(os.environ.get("LOOP_DELAY", "5")), type=float,
help="Seconds between generations")
parser.add_argument("--render", action="store_true",
default=os.environ.get("RENDER_AFTER", "0") == "1",
help="Render each score into Ableton immediately")
parser.add_argument("--lib-root", default=os.environ.get("LIB_ROOT", _DEFAULT_LIB_ROOT))
parser.add_argument("--prefix", default="ai_song",
help="Filename prefix for saved scores")
parser.add_argument("--dry-run", action="store_true",
help="Generate + validate + save but do NOT call Ableton")
parser.add_argument("--list", action="store_true",
help="List saved scores and exit")
args = parser.parse_args()
if args.list:
scores = sorted(SCORES_DIR.glob("*.json"))
if not scores:
print("No scores saved yet.")
else:
for f in scores:
size = f.stat().st_size
print(" %s (%d bytes)" % (f.name, size))
return
if not args.api_key:
parser.error("API key required. Set --api-key or AI_API_KEY env variable.")
run_loop(
base_url = args.base_url,
api_key = args.api_key,
model = args.model,
max_tokens = args.max_tokens,
count = args.count,
delay = args.delay,
render = args.render,
lib_root = args.lib_root,
output_prefix = args.prefix,
dry_run = args.dry_run,
)
if __name__ == "__main__":
main()


@@ -0,0 +1,780 @@
"""
score_engine.py — SongScore data model, templates and in-memory singleton.
Pure Python — zero dependencies on Ableton, MCP, or any audio library.
This module is designed to be importable from anywhere: server.py, ai_loop.py,
test scripts, etc.
SongScore JSON schema:
{
"meta": { "title", "tempo", "key", "genre", "time_signature", "gap_bars", "version" },
"structure": [ { "name", "start_bar", "duration_bars" } ],
"tracks": [
{
"id", "name", "type", # type = "audio" | "midi"
"instrument", # only for MIDI tracks (e.g. "Wavetable")
"clips": [
{
"section", # section name → resolves start_bar automatically
"start_bar", # OR explicit start position (in bars)
"duration_bars",
"sample", # audio only e.g. "kick/auto" or "kick/kick1.wav"
"pattern", # MIDI only e.g. "dembow_standard"
"notes", # MIDI only explicit note list (overrides pattern)
"loop", "warp" # audio flags
}
],
"mixer": { "volume","pan","eq_preset","compression_preset","send_reverb","send_delay" }
}
]
}
"""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Scores directory (created automatically)
SCORES_DIR = Path(__file__).parent / "scores"
SCORES_DIR.mkdir(exist_ok=True)
# Valid MIDI pattern names (used by sanitization)
_VALID_PATTERNS_SET = {
"dembow_minimal", "dembow_standard", "dembow_double",
"bass_sub", "bass_pluck", "bass_octaves", "bass_sustained",
"chords_verse", "chords_chorus", "melody_simple",
}
# In-memory singleton (one active score per MCP server process)
_current_score: Optional["SongScore"] = None
# ==================================================================
# Data classes
# ==================================================================
class MixerDef:
__slots__ = ("volume", "pan", "eq_preset", "compression_preset",
"send_reverb", "send_delay")
def __init__(self, volume: float = 0.75, pan: float = 0.0,
eq_preset: str = None, compression_preset: str = None,
send_reverb: float = 0.0, send_delay: float = 0.0):
self.volume = float(volume)
self.pan = float(pan)
self.eq_preset = eq_preset
self.compression_preset = compression_preset
self.send_reverb = float(send_reverb)
self.send_delay = float(send_delay)
def to_dict(self) -> Dict:
d: Dict[str, Any] = {"volume": self.volume, "pan": self.pan}
if self.eq_preset:
d["eq_preset"] = self.eq_preset
if self.compression_preset:
d["compression_preset"] = self.compression_preset
if self.send_reverb:
d["send_reverb"] = self.send_reverb
if self.send_delay:
d["send_delay"] = self.send_delay
return d
@classmethod
def from_dict(cls, d: Dict) -> "MixerDef":
return cls(
volume=d.get("volume", 0.75),
pan=d.get("pan", 0.0),
eq_preset=d.get("eq_preset"),
compression_preset=d.get("compression_preset"),
send_reverb=d.get("send_reverb", 0.0),
send_delay=d.get("send_delay", 0.0),
)
class ClipDef:
"""Represents a single clip inside a track."""
def __init__(self, start_bar: float = 0.0, duration_bars: float = 4.0,
clip_type: str = "audio", sample: str = None,
pattern: str = None, notes: List[Dict] = None,
loop: bool = True, warp: bool = True, section: str = None,
name: str = None):
self.start_bar = float(start_bar)
self.duration_bars = float(duration_bars)
self.clip_type = clip_type # "audio" | "midi"
self.sample = sample # relative ref or "/abs/path.wav"
self.pattern = pattern # e.g. "dembow_standard"
self.notes = notes or [] # explicit MIDI notes
self.loop = bool(loop)
self.warp = bool(warp)
self.section = section # section name (informational)
self.name = name
def to_dict(self) -> Dict:
d: Dict[str, Any] = {
"start_bar": self.start_bar,
"duration_bars": self.duration_bars,
}
if self.section:
d["section"] = self.section
if self.name:
d["name"] = self.name
if self.sample:
d["sample"] = self.sample
d["loop"] = self.loop
d["warp"] = self.warp
if self.pattern:
d["pattern"] = self.pattern
if self.notes:
d["notes"] = self.notes
return d
@classmethod
def from_raw(cls, raw: Dict, structure: List[Dict] = None) -> "ClipDef":
"""Build ClipDef from a raw dict, resolving section → start_bar if needed."""
start_bar = raw.get("start_bar")
duration_bars = raw.get("duration_bars")
section_name = raw.get("section")
if start_bar is None and section_name and structure:
for sec in structure:
if sec["name"] == section_name:
start_bar = sec["start_bar"]
if duration_bars is None:
duration_bars = sec["duration_bars"]
break
if start_bar is None:
start_bar = 0.0
if duration_bars is None:
duration_bars = 4.0
# Infer clip type from keys
clip_type = "audio" if raw.get("sample") else "midi"
return cls(
start_bar = start_bar,
duration_bars = duration_bars,
clip_type = clip_type,
sample = raw.get("sample"),
pattern = raw.get("pattern"),
notes = raw.get("notes", []),
loop = raw.get("loop", True),
warp = raw.get("warp", True),
section = section_name,
name = raw.get("name"),
)
class TrackDef:
"""Represents a single track with all its clips."""
def __init__(self, track_id: str, name: str, track_type: str,
instrument: str = None,
clips: List[ClipDef] = None,
mixer: MixerDef = None):
self.id = track_id
self.name = name
self.type = track_type # "audio" | "midi"
self.instrument = instrument # "Wavetable", "Operator", etc.
self.clips = clips or []
self.mixer = mixer or MixerDef()
def to_dict(self) -> Dict:
d: Dict[str, Any] = {
"id": self.id,
"name": self.name,
"type": self.type,
"clips": [c.to_dict() for c in self.clips],
"mixer": self.mixer.to_dict(),
}
if self.instrument:
d["instrument"] = self.instrument
return d
@classmethod
def from_raw(cls, raw: Dict, structure: List[Dict] = None) -> "TrackDef":
track_type = raw.get("type", "audio")
# ── Phase 1: Auto-correct track type from ORIGINAL clip data (before coercion) ──
raw_clips = raw.get("clips", [])
orig_has_sample = any(c.get("sample") for c in raw_clips)
orig_has_pattern = any(c.get("pattern") for c in raw_clips)
orig_has_notes = any(c.get("notes") for c in raw_clips)
orig_has_midi = orig_has_pattern or orig_has_notes
if track_type == "midi" and orig_has_sample and not orig_has_midi:
track_type = "audio"
        elif track_type == "midi" and orig_has_sample and orig_has_midi:
            # Mixed clips: majority wins
            sample_count = sum(1 for c in raw_clips if c.get("sample"))
            midi_count = sum(1 for c in raw_clips if c.get("pattern") or c.get("notes"))
            if sample_count > midi_count:
                track_type = "audio"
elif track_type == "audio" and orig_has_midi and not orig_has_sample:
track_type = "midi"
elif track_type == "audio" and orig_has_sample and not orig_has_midi:
all_samples_are_patterns = all(
c.get("sample", "").replace("/auto", "").replace("/", "_")
in _VALID_PATTERNS_SET
for c in raw_clips if c.get("sample")
)
if all_samples_are_patterns:
track_type = "midi"
# ── Phase 2: Build clips with corrected track type ──
clips = [ClipDef.from_raw(c, structure) for c in raw_clips]
for clip in clips:
if track_type == "midi":
clip.clip_type = "midi"
if not clip.pattern and not clip.notes:
if clip.sample:
from score_renderer import _sanitize_pattern_name
clip.pattern = _sanitize_pattern_name(clip.sample)
else:
clip.pattern = "dembow_standard"
clip.sample = None
elif clip.sample and (clip.pattern or clip.notes):
clip.sample = None
else:
clip.clip_type = "audio"
if clip.pattern and not clip.sample:
from score_renderer import _sanitize_sample_ref
clip.sample = _sanitize_sample_ref(clip.pattern)
clip.pattern = None
elif clip.pattern and clip.sample:
clip.pattern = None
# Ensure MIDI tracks have an instrument
instrument = raw.get("instrument")
if track_type == "midi" and not instrument:
if any(c.pattern and ("chord" in c.pattern or "melody" in c.pattern) for c in clips):
instrument = "Wavetable"
else:
instrument = "Operator"
mixer = MixerDef.from_dict(raw.get("mixer", {}))
return cls(
track_id = raw.get("id", raw.get("name", "Track")),
name = raw.get("name", "Track"),
track_type = track_type,
instrument = instrument,
clips = clips,
mixer = mixer,
)
class SectionDef:
"""A named temporal section of the song."""
def __init__(self, name: str, start_bar: float, duration_bars: float):
self.name = name
self.start_bar = float(start_bar)
self.duration_bars = float(duration_bars)
def to_dict(self) -> Dict:
return {
"name": self.name,
"start_bar": self.start_bar,
"duration_bars": self.duration_bars,
}
# ==================================================================
# SongScore — main model
# ==================================================================
class SongScore:
"""Complete musical score — pure data, no Ableton dependencies.
Build using the builder API (set_structure, add_track, add_clip, etc.)
or load from a dict/JSON/template.
"""
SCHEMA_VERSION = "1.0"
def __init__(self, title: str = "Untitled", tempo: float = 95.0,
key: str = "Am", genre: str = "reggaeton",
time_signature: str = "4/4", gap_bars: float = 2.0):
self.meta: Dict[str, Any] = {
"title": title,
"tempo": float(tempo),
"key": key,
"genre": genre,
"time_signature": time_signature,
"gap_bars": float(gap_bars),
"version": self.SCHEMA_VERSION,
"created_at": datetime.now().isoformat(),
}
self.structure: List[SectionDef] = []
self.tracks: List[TrackDef] = []
# ------------------------------------------------------------------
# Builder API
# ------------------------------------------------------------------
def set_structure(self, sections: List[Dict]) -> "SongScore":
"""Set the temporal structure. Calculates start_bar using meta['gap_bars']."""
gap = float(self.meta.get("gap_bars", 2.0))
current_bar = 0.0
self.structure = []
for sec in sections:
name = sec.get("name", "Section")
duration = float(sec.get("duration_bars", 8))
# Explicit start_bar overrides auto-calculation
start = float(sec.get("start_bar", current_bar))
self.structure.append(SectionDef(name, start, duration))
current_bar = start + duration + gap
return self
def add_track(self, track: TrackDef) -> "SongScore":
"""Add or replace a track by ID."""
for i, t in enumerate(self.tracks):
if t.id == track.id:
self.tracks[i] = track
return self
self.tracks.append(track)
return self
def add_clip_to_track(self, track_id: str, clip_raw: Dict) -> "SongScore":
"""Add a clip to an existing track. clip_raw may use 'section' keyword."""
track = self.get_track(track_id)
if track is None:
raise KeyError("Track '%s' not found. Create it first." % track_id)
clip = ClipDef.from_raw(clip_raw, self.get_structure_dict())
track.clips.append(clip)
return self
def set_mixer(self, track_id: str, **kwargs) -> "SongScore":
"""Update mixer settings for a track."""
track = self.get_track(track_id)
if track is None:
raise KeyError("Track '%s' not found." % track_id)
for k, v in kwargs.items():
if hasattr(track.mixer, k):
setattr(track.mixer, k, v)
return self
# ------------------------------------------------------------------
# Query helpers
# ------------------------------------------------------------------
def get_track(self, track_id: str) -> Optional[TrackDef]:
for t in self.tracks:
if t.id == track_id:
return t
return None
def get_section(self, name: str) -> Optional[SectionDef]:
for s in self.structure:
if s.name == name:
return s
return None
def get_structure_dict(self) -> List[Dict]:
return [s.to_dict() for s in self.structure]
def total_bars(self) -> float:
if not self.structure:
return 0.0
last = self.structure[-1]
return last.start_bar + last.duration_bars
# ------------------------------------------------------------------
# Validation
# ------------------------------------------------------------------
def validate(self) -> List[str]:
"""Return a list of warning strings. Empty list = valid."""
warnings: List[str] = []
if not self.structure:
warnings.append("No structure defined — call set_structure() first.")
if not self.tracks:
warnings.append("No tracks defined.")
seen_names = set()
for s in self.structure:
if s.name in seen_names:
warnings.append(
"Duplicate section name '%s' — clips may map to wrong scene." % s.name
)
seen_names.add(s.name)
section_names = {s.name for s in self.structure}
for track in self.tracks:
if not track.clips:
warnings.append("Track '%s' has no clips." % track.id)
continue
for clip in track.clips:
if clip.section and clip.section not in section_names:
warnings.append(
"Track '%s': clip section '%s' not in structure."
% (track.id, clip.section)
)
if track.type == "audio" and not clip.sample:
warnings.append(
"Track '%s': audio clip has no sample defined." % track.id
)
if track.type == "midi" and not clip.pattern and not clip.notes:
warnings.append(
"Track '%s': MIDI clip has no pattern or notes." % track.id
)
return warnings
# ------------------------------------------------------------------
# Serialization
# ------------------------------------------------------------------
def to_dict(self) -> Dict:
return {
"meta": self.meta,
"structure": [s.to_dict() for s in self.structure],
"tracks": [t.to_dict() for t in self.tracks],
}
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), indent=indent, ensure_ascii=False)
@classmethod
def from_dict(cls, d: Dict) -> "SongScore":
meta = d.get("meta", {})
score = cls(
title = meta.get("title", "Untitled"),
tempo = meta.get("tempo", 95),
key = meta.get("key", "Am"),
genre = meta.get("genre", "reggaeton"),
time_signature = meta.get("time_signature", "4/4"),
gap_bars = meta.get("gap_bars", 2.0),
)
# Preserve all meta fields
score.meta.update(meta)
# Structure — ignore start_bar from JSON, calculate automatically
gap = float(score.meta.get("gap_bars", 2.0))
current_bar = 0.0
seen_names = set()
for sec in d.get("structure", []):
name = sec["name"]
duration = sec.get("duration_bars", 8)
# Auto-deduplicate section names
base_name = name
counter = 2
while name in seen_names:
name = "%s %d" % (base_name, counter)
counter += 1
seen_names.add(name)
score.structure.append(SectionDef(name, current_bar, duration))
current_bar += duration + gap
# Tracks (clips resolved against structure)
struct_dict = score.get_structure_dict()
for raw in d.get("tracks", []):
score.tracks.append(TrackDef.from_raw(raw, struct_dict))
return score
@classmethod
def from_json(cls, json_str: str) -> "SongScore":
return cls.from_dict(json.loads(json_str))
def save(self, path: Path) -> Path:
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(self.to_json(), encoding="utf-8")
return path
@classmethod
def load(cls, path: Path) -> "SongScore":
return cls.from_json(Path(path).read_text(encoding="utf-8"))
# ------------------------------------------------------------------
# Templates
# ------------------------------------------------------------------
@classmethod
def from_template(cls, template_name: str, **meta_overrides) -> "SongScore":
"""Create a complete SongScore from a named template.
meta_overrides: tempo, key, gap_bars, title, etc.
Available templates: reggaeton_basic, reggaeton_13scenes, minimal_loop
"""
templates = _get_templates()
if template_name not in templates:
raise ValueError(
"Template '%s' not found. Available: %s"
% (template_name, sorted(templates.keys()))
)
tmpl = templates[template_name]
meta = {**tmpl["meta"], **meta_overrides}
score = cls(
title = meta.get("title", template_name.replace("_", " ").title()),
tempo = meta.get("tempo", 95),
key = meta.get("key", "Am"),
genre = meta.get("genre", "reggaeton"),
time_signature = meta.get("time_signature", "4/4"),
gap_bars = meta.get("gap_bars", 2.0),
)
score.set_structure(tmpl["structure"])
struct_dict = score.get_structure_dict()
for raw in tmpl["tracks"]:
score.tracks.append(TrackDef.from_raw(raw, struct_dict))
return score
    @staticmethod
    def list_templates() -> List[str]:
        return sorted(_get_templates().keys())
# ==================================================================
# Singleton helpers (used by server.py)
# ==================================================================
def get_current_score() -> Optional[SongScore]:
return _current_score
def set_current_score(score: Optional[SongScore]) -> None:
global _current_score
_current_score = score
def require_score() -> SongScore:
if _current_score is None:
raise RuntimeError("No active score. Call new_score() or load_score() first.")
return _current_score
# ==================================================================
# Templates
# ==================================================================
def _get_templates() -> Dict[str, Dict]:
"""Return all built-in templates."""
# Clips that reference 'section' get start_bar resolved automatically
return {
# ──────────────────────────────────────────────────────────────
"reggaeton_basic": {
"meta": {"tempo": 95, "key": "Am", "genre": "reggaeton", "gap_bars": 2.0},
"structure": [
{"name": "Intro", "duration_bars": 4},
{"name": "Verse", "duration_bars": 8},
{"name": "Chorus", "duration_bars": 8},
{"name": "Verse 2", "duration_bars": 8},
{"name": "Chorus 2", "duration_bars": 8},
{"name": "Bridge", "duration_bars": 4},
{"name": "Outro", "duration_bars": 4},
],
"tracks": [
{
"id": "drum_loop", "name": "Drum Loop", "type": "audio",
"clips": [
{"section": "Verse", "sample": "drumloops/auto", "loop": True},
{"section": "Chorus", "sample": "drumloops/auto", "loop": True},
{"section": "Verse 2", "sample": "drumloops/auto", "loop": True},
{"section": "Chorus 2", "sample": "drumloops/auto", "loop": True},
],
"mixer": {"volume": 0.95},
},
{
"id": "kick", "name": "Kick", "type": "audio",
"clips": [
{"section": "Verse", "sample": "kick/auto"},
{"section": "Chorus", "sample": "kick/auto"},
{"section": "Verse 2", "sample": "kick/auto"},
{"section": "Chorus 2", "sample": "kick/auto"},
],
"mixer": {"volume": 0.85, "eq_preset": "kick",
"compression_preset": "kick_punch"},
},
{
"id": "snare", "name": "Snare", "type": "audio",
"clips": [
{"section": "Verse", "sample": "snare/auto"},
{"section": "Chorus", "sample": "snare/auto"},
{"section": "Verse 2", "sample": "snare/auto"},
{"section": "Chorus 2", "sample": "snare/auto"},
],
"mixer": {"volume": 0.82, "eq_preset": "snare"},
},
{
"id": "perc", "name": "Perc", "type": "audio",
"clips": [
{"section": "Verse", "sample": "perc loop/auto", "loop": True},
{"section": "Chorus", "sample": "perc loop/auto", "loop": True},
{"section": "Verse 2", "sample": "perc loop/auto", "loop": True},
{"section": "Chorus 2", "sample": "perc loop/auto", "loop": True},
],
"mixer": {"volume": 0.65},
},
{
"id": "dembow", "name": "Dembow", "type": "midi",
"instrument": "Wavetable",
"clips": [
{"section": "Intro", "pattern": "dembow_minimal"},
{"section": "Verse", "pattern": "dembow_standard"},
{"section": "Chorus", "pattern": "dembow_double"},
{"section": "Verse 2", "pattern": "dembow_standard"},
{"section": "Chorus 2", "pattern": "dembow_double"},
],
"mixer": {"volume": 0.80},
},
{
"id": "bass", "name": "Sub Bass", "type": "midi",
"instrument": "Operator",
"clips": [
{"section": "Verse", "pattern": "bass_pluck"},
{"section": "Chorus", "pattern": "bass_octaves"},
{"section": "Verse 2", "pattern": "bass_pluck"},
{"section": "Chorus 2", "pattern": "bass_octaves"},
],
"mixer": {"volume": 0.70},
},
{
"id": "chords", "name": "Chords", "type": "midi",
"instrument": "Wavetable",
"clips": [
{"section": "Verse", "pattern": "chords_verse"},
{"section": "Chorus", "pattern": "chords_chorus"},
{"section": "Verse 2", "pattern": "chords_verse"},
{"section": "Chorus 2", "pattern": "chords_chorus"},
],
"mixer": {"volume": 0.68},
},
],
},
# ──────────────────────────────────────────────────────────────
"reggaeton_13scenes": {
"meta": {"tempo": 95, "key": "Am", "genre": "reggaeton", "gap_bars": 2.0},
"structure": [
{"name": "Intro Suave", "duration_bars": 4},
{"name": "Build Up", "duration_bars": 4},
{"name": "Intro Full", "duration_bars": 4},
{"name": "Verse A", "duration_bars": 8},
{"name": "Pre-Chorus", "duration_bars": 4},
{"name": "Chorus A", "duration_bars": 8},
{"name": "Verse B", "duration_bars": 8},
{"name": "Pre-Chorus 2", "duration_bars": 4},
{"name": "Chorus B", "duration_bars": 8},
{"name": "Bridge", "duration_bars": 4},
{"name": "Breakdown", "duration_bars": 4},
{"name": "Final Chorus", "duration_bars": 8},
{"name": "Outro", "duration_bars": 4},
],
"tracks": [
{
"id": "kick", "name": "Kick", "type": "audio",
"clips": [
{"section": "Intro Full", "sample": "kick/auto"},
{"section": "Verse A", "sample": "kick/auto"},
{"section": "Pre-Chorus", "sample": "kick/auto"},
{"section": "Chorus A", "sample": "kick/auto"},
{"section": "Verse B", "sample": "kick/auto"},
{"section": "Pre-Chorus 2", "sample": "kick/auto"},
{"section": "Chorus B", "sample": "kick/auto"},
{"section": "Final Chorus", "sample": "kick/auto"},
],
"mixer": {"volume": 0.85, "eq_preset": "kick",
"compression_preset": "kick_punch"},
},
{
"id": "snare", "name": "Snare", "type": "audio",
"clips": [
{"section": "Verse A", "sample": "snare/auto"},
{"section": "Chorus A", "sample": "snare/auto"},
{"section": "Verse B", "sample": "snare/auto"},
{"section": "Chorus B", "sample": "snare/auto"},
{"section": "Final Chorus", "sample": "snare/auto"},
],
"mixer": {"volume": 0.82, "eq_preset": "snare"},
},
{
"id": "drum_loop", "name": "Drum Loop", "type": "audio",
"clips": [
{"section": "Verse A", "sample": "drumloops/auto", "loop": True},
{"section": "Chorus A", "sample": "drumloops/auto", "loop": True},
{"section": "Verse B", "sample": "drumloops/auto", "loop": True},
{"section": "Chorus B", "sample": "drumloops/auto", "loop": True},
{"section": "Final Chorus", "sample": "drumloops/auto", "loop": True},
],
"mixer": {"volume": 0.90},
},
{
"id": "dembow", "name": "Dembow", "type": "midi",
"instrument": "Wavetable",
"clips": [
{"section": "Build Up", "pattern": "dembow_minimal"},
{"section": "Intro Full", "pattern": "dembow_minimal"},
{"section": "Verse A", "pattern": "dembow_standard"},
{"section": "Pre-Chorus", "pattern": "dembow_standard"},
{"section": "Chorus A", "pattern": "dembow_double"},
{"section": "Verse B", "pattern": "dembow_standard"},
{"section": "Pre-Chorus 2", "pattern": "dembow_standard"},
{"section": "Chorus B", "pattern": "dembow_double"},
{"section": "Final Chorus", "pattern": "dembow_double"},
],
"mixer": {"volume": 0.80},
},
{
"id": "bass", "name": "Sub Bass", "type": "midi",
"instrument": "Operator",
"clips": [
{"section": "Verse A", "pattern": "bass_pluck"},
{"section": "Chorus A", "pattern": "bass_octaves"},
{"section": "Verse B", "pattern": "bass_pluck"},
{"section": "Chorus B", "pattern": "bass_octaves"},
{"section": "Final Chorus", "pattern": "bass_octaves"},
],
"mixer": {"volume": 0.70},
},
],
},
# ──────────────────────────────────────────────────────────────
"minimal_loop": {
"meta": {"tempo": 100, "key": "C", "genre": "reggaeton", "gap_bars": 0.0},
"structure": [
{"name": "Loop", "duration_bars": 8},
],
"tracks": [
{
"id": "drum", "name": "Drums", "type": "audio",
"clips": [{"section": "Loop", "sample": "drumloops/auto", "loop": True}],
"mixer": {"volume": 0.95},
},
{
"id": "bass", "name": "Bass", "type": "midi",
"instrument": "Operator",
"clips": [{"section": "Loop", "pattern": "bass_sub"}],
"mixer": {"volume": 0.75},
},
{
"id": "dembow", "name": "Dembow", "type": "midi",
"instrument": "Wavetable",
"clips": [{"section": "Loop", "pattern": "dembow_standard"}],
"mixer": {"volume": 0.80},
},
],
},
}


@@ -0,0 +1,774 @@
"""
score_renderer.py — Translates a SongScore into Ableton Live SESSION VIEW operations via TCP.
Architecture:
- Each SectionDef in score.structure → one Ableton Scene
- Each TrackDef in score.tracks → one Ableton track
- Each ClipDef in a track → clip slot at (track_index, scene_index)
Session View mapping:
section "Verse" → scene index 1
section "Chorus" → scene index 2
...
Clip placement (Session View only):
- MIDI tracks: create_clip + add_notes_to_clip
- Audio tracks: load_sample_to_clip (loads .wav into a clip slot)
Pattern generators (all computed on server side — no Ableton logic needed):
MIDI drums: dembow_minimal, dembow_standard, dembow_double
MIDI bass: bass_sub, bass_pluck, bass_octaves, bass_sustained
MIDI harmony: chords_verse, chords_chorus, melody_simple
"""
import json
import os
import socket
from pathlib import Path
from typing import Dict, List, Optional
from score_engine import SongScore, TrackDef, ClipDef
# ------------------------------------------------------------------
# Ableton TCP transport (self-contained — no FastMCP dependency)
# ------------------------------------------------------------------
ABLETON_HOST = "127.0.0.1"
ABLETON_PORT = 9877
_TERMINATOR = b"\n"
def _send(cmd_type: str, params: dict, timeout: float = 30.0) -> dict:
"""Send a command to Ableton via TCP and return the parsed response."""
sock = None
try:
sock = socket.create_connection((ABLETON_HOST, ABLETON_PORT), timeout=timeout)
sock.settimeout(timeout)
msg = json.dumps({"type": cmd_type, "params": params}) + "\n"
sock.sendall(msg.encode("utf-8"))
buf = b""
while True:
chunk = sock.recv(65536)
if not chunk:
break
buf += chunk
if _TERMINATOR in buf:
raw, _, _ = buf.partition(_TERMINATOR)
return json.loads(raw.decode("utf-8"))
return {"status": "error", "message": "No response terminator received"}
except socket.timeout:
return {"status": "error", "message": "Timeout after %.0fs on '%s'" % (timeout, cmd_type)}
except ConnectionRefusedError:
return {"status": "error",
"message": "Cannot connect to Ableton on %s:%d" % (ABLETON_HOST, ABLETON_PORT)}
except Exception as exc:
return {"status": "error", "message": str(exc)}
finally:
if sock:
try:
sock.close()
except Exception:
pass
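The wire format `_send` speaks is one newline-terminated JSON object per command, with the response framed the same way. A minimal sketch of that framing, using a `socketpair` to stand in for the Ableton Remote Script:

```python
import json
import socket

# Loopback stand-in for the Remote Script listening on 9877.
client, server = socket.socketpair()

# Request: a single JSON object terminated by "\n".
payload = {"type": "set_tempo", "params": {"tempo": 95}}
client.sendall((json.dumps(payload) + "\n").encode("utf-8"))

# Receive loop, as in _send: accumulate until the terminator appears,
# then parse everything before the first "\n".
buf = b""
while b"\n" not in buf:
    buf += server.recv(65536)
raw, _, _ = buf.partition(b"\n")
received = json.loads(raw.decode("utf-8"))

client.close()
server.close()
```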
# ------------------------------------------------------------------
# Sample resolution — "kick/auto" or "kick/kick_01.wav" → absolute path
# ------------------------------------------------------------------
# Keyword mapping: invented filenames → correct folder/auto paths
_SAMPLE_KEYWORD_MAP = {
"kick": "kick/auto",
"snare": "snare/auto",
"hihat": "hi-hat (para percs normalmente)/auto",
"hi-hat": "hi-hat (para percs normalmente)/auto",
"hat": "hi-hat (para percs normalmente)/auto",
"drumloop": "drumloops/auto",
"drum": "drumloops/auto",
"perc": "perc loop/auto",
"bass": "bass/auto",
"fx": "fx/auto",
"transition": "fx/auto",
"transicion": "fx/auto",
"riser": "fx/auto",
"impact": "fx/auto",
"oneshot": "oneshots/auto",
}
# Valid MIDI pattern names
_VALID_PATTERNS = {
"dembow_minimal", "dembow_standard", "dembow_double",
"bass_sub", "bass_pluck", "bass_octaves", "bass_sustained",
"chords_verse", "chords_chorus", "melody_simple",
}
def _sanitize_sample_ref(sample_ref: str) -> str:
"""Map invented filenames to correct folder/auto paths.
Handles cases where LLMs generate names like:
"kick 1.wav""kick/auto"
"snare 3.wav""snare/auto"
"hi-hat 1.wav""hi-hat (para percs normalmente)/auto"
"""
if not sample_ref:
return sample_ref
if "/" in sample_ref:
return sample_ref # already has folder structure
    # Bare "auto" with no folder: default to the kick folder
    if sample_ref.lower() == "auto":
        return "kick/auto"
# Strip common suffixes: .wav, .mp3, .aif, numbers, spaces
name = sample_ref.lower()
name = os.path.splitext(name)[0] # remove .wav etc
# Remove trailing numbers: "kick 1" → "kick", "bass_sub 2" → "bass_sub"
name = name.strip()
name_parts = name.rsplit(None, 1)
if len(name_parts) == 2 and name_parts[1].isdigit():
name = name_parts[0]
# Keyword match
for keyword, path in _SAMPLE_KEYWORD_MAP.items():
if keyword in name.lower():
return path
return sample_ref # no match, return as-is
def _sanitize_pattern_name(pattern: str) -> str:
"""Map invented pattern names to valid pattern names."""
if not pattern:
return "dembow_standard"
if pattern in _VALID_PATTERNS:
return pattern
pat = pattern.lower().strip()
# Remove file extensions
pat = os.path.splitext(pat)[0]
# Remove trailing numbers
parts = pat.rsplit(None, 1)
if len(parts) == 2 and parts[1].isdigit():
pat = parts[0]
# Keyword matching
if "dembow" in pat:
return "dembow_standard"
if "bass" in pat:
if "sub" in pat:
return "bass_sub"
if "pluck" in pat:
return "bass_pluck"
if "octave" in pat:
return "bass_octaves"
return "bass_sub"
if "chord" in pat:
if "chorus" in pat:
return "chords_chorus"
return "chords_verse"
if "melody" in pat or "lead" in pat:
return "melody_simple"
if "snare" in pat or "perc" in pat or "hat" in pat or "hihat" in pat:
return "dembow_standard"
return "dembow_standard" # default fallback
def _resolve_sample(sample_ref: str, lib_root: str, tempo: float = 95.0) -> Optional[str]:
"""Resolve a sample reference to an absolute filesystem path.
Formats accepted:
"kick/auto" → best WAV from <lib_root>/kick/
"kick/kick 1.wav" → exact file <lib_root>/kick/kick 1.wav
"kick 1.wav" → sanitized to "kick/auto"
"/C:/absolute/path.wav" → passthrough
"""
if not sample_ref:
return None
# Sanitize invented filenames
sample_ref = _sanitize_sample_ref(sample_ref)
# Already absolute
if os.path.isabs(sample_ref):
return sample_ref if os.path.isfile(sample_ref) else None
parts = sample_ref.replace("\\", "/").split("/")
if parts[-1].lower() == "auto":
folder = os.path.join(lib_root, *parts[:-1])
return _pick_best(folder, tempo)
else:
# Exact relative path
path = os.path.join(lib_root, *parts)
if os.path.isfile(path):
return path
# Fallback: auto-select from the folder
folder = os.path.join(lib_root, *parts[:-1]) if len(parts) > 1 else lib_root
best = _pick_best(folder, tempo)
if best:
return best
        # Give up: sample_ref was already sanitized at the top of this
        # function, so re-sanitizing it cannot produce a new candidate.
        return None
def _pick_best(folder: str, tempo: float = 95.0) -> Optional[str]:
"""Pick the best audio file from a folder.
Strategy:
1. Prefer files whose name contains a BPM number close to project tempo.
2. If no BPM info available, return the first file alphabetically.
"""
if not os.path.isdir(folder):
return None
files = sorted([
os.path.join(folder, f)
for f in os.listdir(folder)
if f.lower().endswith((".wav", ".aif", ".aiff", ".mp3"))
])
if not files:
return None
    def bpm_score(fpath: str) -> float:
        # Strip the extension first so a trailing token like "95.wav"
        # parses as a BPM instead of failing float().
        stem = os.path.splitext(os.path.basename(fpath))[0]
        for tok in stem.replace("-", " ").replace("_", " ").split():
            try:
                bpm = float(tok)
                if 60 < bpm < 220:
                    return abs(bpm - tempo)
            except ValueError:
                pass
        return 999.0
scores = [(bpm_score(f), f) for f in files]
best = min(scores, key=lambda x: x[0])
return best[1] if best[0] < 15.0 else files[0]
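A standalone sketch of the selection heuristic: the first token of the extension-stripped filename that parses as a plausible BPM (60 to 220) scores the file by its distance to the project tempo; files without BPM info score 999 and lose to any real match within 15 BPM. The helper name and sample filenames are illustrative:

```python
import os

def bpm_distance(filename, tempo):
    # Tokenize the stem on spaces, dashes and underscores, then take the
    # first token that parses as a plausible BPM.
    stem = os.path.splitext(filename)[0].replace("-", " ").replace("_", " ")
    for tok in stem.split():
        try:
            bpm = float(tok)
        except ValueError:
            continue
        if 60 < bpm < 220:
            return abs(bpm - tempo)
    return 999.0  # no BPM info in the name

candidates = ["perc 128.wav", "perc 96.wav", "perc loop.wav"]
best = min(candidates, key=lambda f: bpm_distance(f, 95.0))
# At 95 BPM, "perc 96.wav" wins (distance 1.0)
```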
# ------------------------------------------------------------------
# MIDI pattern generators — pure Python, no Ableton communication
# ------------------------------------------------------------------
_KEY_ROOTS: Dict[str, int] = {
"C": 48, "C#": 49, "Db": 49,
"D": 50, "D#": 51, "Eb": 51,
"E": 52,
"F": 53, "F#": 54, "Gb": 54,
"G": 55, "G#": 56, "Ab": 56,
"A": 57, "A#": 58, "Bb": 58,
"B": 59,
# Minor keys
"Am": 45, "Dm": 38, "Em": 40, "Bm": 47,
"F#m": 54, "C#m": 49, "Gm": 43, "Fm": 41,
}
def _root(key: str) -> int:
return _KEY_ROOTS.get(key, 45) # Default Am root
def _gen_dembow(bars: int, variation: str, key: str) -> List[Dict]:
"""Dembow drum pattern on MIDI note 36 (kick)."""
bpb = 4
total = bars * bpb
notes = []
patterns = {
"minimal": [0.0, 2.5],
"standard": [0.0, 1.5, 2.0, 2.5, 3.0, 3.5],
"double": [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5],
}
pos_list = patterns.get(variation, patterns["standard"])
for bar in range(bars):
for pos in pos_list:
start = bar * bpb + pos
if start >= total:
continue
vel = 110 if pos == 0.0 else (90 if pos in (2.0, 3.0) else 75)
notes.append({"pitch": 36, "start_time": start, "duration": 0.22, "velocity": vel})
return notes
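Condensed form of the generator above: one kick hit (MIDI note 36) per listed beat position in each 4/4 bar, with the downbeat accented and beats 2 and 3 slightly louder than the offbeats. `gen_dembow` here takes the position list directly instead of a variation name:

```python
def gen_dembow(bars, positions):
    notes = []
    for bar in range(bars):
        for pos in positions:
            # Accent scheme from _gen_dembow: downbeat 110, beats 2/3
            # at 90, everything else at 75.
            velocity = 110 if pos == 0.0 else (90 if pos in (2.0, 3.0) else 75)
            notes.append({"pitch": 36, "start_time": bar * 4 + pos,
                          "duration": 0.22, "velocity": velocity})
    return notes

# The "standard" dembow position list: 6 hits per bar over 2 bars.
standard = gen_dembow(2, [0.0, 1.5, 2.0, 2.5, 3.0, 3.5])
```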
def _gen_bass(bars: int, style: str, key: str) -> List[Dict]:
"""Sub-bass MIDI patterns."""
root = _root(key)
bpb = 4
notes = []
for bar in range(bars):
b = bar * bpb
if style == "bass_sub":
notes += [
{"pitch": root - 12, "start_time": b, "duration": 0.5, "velocity": 110},
{"pitch": root - 12, "start_time": b + 2.0, "duration": 0.5, "velocity": 100},
]
elif style == "bass_octaves":
notes += [
{"pitch": root - 12, "start_time": b, "duration": 0.5, "velocity": 110},
{"pitch": root, "start_time": b + 2.0, "duration": 0.5, "velocity": 90},
{"pitch": root - 12, "start_time": b + 3.0, "duration": 0.25, "velocity": 80},
]
elif style == "bass_pluck":
notes += [
{"pitch": root - 12, "start_time": b, "duration": 0.25, "velocity": 110},
{"pitch": root - 12, "start_time": b + 1.5, "duration": 0.25, "velocity": 85},
{"pitch": root - 7, "start_time": b + 2.0, "duration": 0.25, "velocity": 90},
{"pitch": root - 12, "start_time": b + 3.0, "duration": 0.25, "velocity": 80},
]
elif style == "bass_sustained":
notes.append(
{"pitch": root - 12, "start_time": b, "duration": float(bpb) - 0.25, "velocity": 100}
)
else:
notes.append({"pitch": root - 12, "start_time": b, "duration": 0.5, "velocity": 100})
return notes
def _gen_chords(bars: int, style: str, key: str) -> List[Dict]:
"""Chord voicing patterns."""
root = _root(key)
bpb = 4
notes = []
PROG_VERSE = [(0, 3, 7), (-5, -2, 2), (-3, 0, 4), (-7, -4, 0)]
PROG_CHORUS = [(0, 3, 7), (-3, 0, 4), (5, 8, 12), (0, 3, 7)]
prog = PROG_VERSE if "verse" in style else PROG_CHORUS
for bar in range(bars):
chord_intervals = prog[bar % len(prog)]
start = float(bar * bpb)
for interval in chord_intervals:
notes.append({
"pitch": root + interval,
"start_time": start,
"duration": float(bpb) - 0.25,
"velocity": 72,
})
return notes
def _gen_melody_simple(bars: int, key: str) -> List[Dict]:
"""Simple pentatonic melodic line."""
root = _root(key)
scale = [0, 3, 5, 7, 10, 12]
bpb = 4
notes = []
rhythm = [0.0, 0.75, 1.5, 2.0, 2.75, 3.0, 3.5]
for bar in range(bars):
b = bar * bpb
for i, pos in enumerate(rhythm):
pitch = root + scale[(bar * 3 + i) % len(scale)] + 12
notes.append({"pitch": pitch, "start_time": b + pos, "duration": 0.5, "velocity": 85})
return notes
# Registry: pattern_name → generator(bars, key) → List[Dict]
PATTERN_GENERATORS: Dict = {
"dembow_minimal": lambda bars, key: _gen_dembow(bars, "minimal", key),
"dembow_standard": lambda bars, key: _gen_dembow(bars, "standard", key),
"dembow_double": lambda bars, key: _gen_dembow(bars, "double", key),
"bass_sub": lambda bars, key: _gen_bass(bars, "bass_sub", key),
"bass_pluck": lambda bars, key: _gen_bass(bars, "bass_pluck", key),
"bass_octaves": lambda bars, key: _gen_bass(bars, "bass_octaves", key),
"bass_sustained": lambda bars, key: _gen_bass(bars, "bass_sustained", key),
"chords_verse": lambda bars, key: _gen_chords(bars, "chords_verse", key),
"chords_chorus": lambda bars, key: _gen_chords(bars, "chords_chorus", key),
"melody_simple": lambda bars, key: _gen_melody_simple(bars, key),
}
# ------------------------------------------------------------------
# ScoreRenderer — SESSION VIEW
# ------------------------------------------------------------------
class ScoreRenderer:
"""Renders a SongScore into Ableton Live Session View via TCP.
Mapping:
SectionDef → Ableton Scene (one scene per section)
TrackDef → Ableton Track (one track per track definition)
ClipDef → Clip Slot at (track_index, scene_index)
Usage:
renderer = ScoreRenderer(lib_root="C:\\...\\libreria\\reggaeton")
result = renderer.render(score, clear_first=True)
"""
def __init__(self, lib_root: str):
self.lib_root = str(lib_root)
# ----------------------------------------------------------------
# Public entry point
# ----------------------------------------------------------------
def render(self, score: SongScore, clear_first: bool = True) -> dict:
"""Render score into Ableton Live Session View.
Returns:
{
"title": str,
"scenes_created": int,
"tracks_created": list,
"clips_created": int,
"errors": list[str],
"success": bool,
}
"""
result: Dict = {
"title": score.meta.get("title", ""),
"scenes_created": 0,
"tracks_created": [],
"clips_created": 0,
"errors": [],
}
# Validate score first (no Ableton needed)
warnings = score.validate()
if warnings:
result["errors"].extend(["[VALIDATION] " + w for w in warnings])
# 0. Clear project. Ableton leaves 1 track (minimum). We accept this
# and offset our track creation accordingly.
if clear_first:
_send("clear_project", {}, timeout=30.0)
# 1. Set meta (tempo, signature)
self._set_meta(score.meta, result)
# 2. Create scenes (one per section in score.structure)
section_scene_map = self._create_scenes(score.structure, result)
# 3. Create tracks
track_index_map = self._create_tracks(score.tracks, result)
# 4. Place clips into clip slots
self._place_clips(score, track_index_map, section_scene_map, result)
        # 5. Apply mixer settings
self._apply_mixer(score.tracks, track_index_map, result)
result["success"] = len(result["errors"]) == 0
result["section_map"] = section_scene_map
return result
# ----------------------------------------------------------------
# Meta
# ----------------------------------------------------------------
def _set_meta(self, meta: dict, result: dict) -> None:
tempo = meta.get("tempo", 95)
resp = _send("set_tempo", {"tempo": tempo}, timeout=10.0)
if resp.get("status") != "success":
result["errors"].append("set_tempo failed: " + resp.get("message", "?"))
sig = meta.get("time_signature", "4/4").split("/")
if len(sig) == 2:
_send("set_signature",
{"numerator": int(sig[0]), "denominator": int(sig[1])},
timeout=10.0)
# ----------------------------------------------------------------
# Scenes — one per section
# ----------------------------------------------------------------
def _create_scenes(self, structure, result: dict) -> Dict[str, int]:
"""Create one scene per section. Returns {section_name: scene_index}."""
section_scene_map: Dict[str, int] = {}
for i, section in enumerate(structure):
# Ableton starts with at least 1 empty scene;
# create additional scenes as needed
if i == 0:
scene_idx = 0 # reuse the default first scene
else:
resp = _send("create_scene", {"index": -1}, timeout=15.0)
if resp.get("status") != "success":
result["errors"].append(
"create_scene failed for '%s': %s"
% (section.name, resp.get("message", "?"))
)
scene_idx = i # fallback: assume sequential
else:
scene_idx = resp.get("result", {}).get("scene_index", i)
# Name the scene
_send("set_scene_name",
{"scene_index": scene_idx, "name": section.name},
timeout=10.0)
section_scene_map[section.name] = scene_idx
result["scenes_created"] += 1
return section_scene_map
# ----------------------------------------------------------------
# Tracks
# ----------------------------------------------------------------
def _create_tracks(self, tracks: List[TrackDef], result: dict) -> Dict[str, int]:
"""Create all tracks and return {track_id: ableton_track_index}.
IMPORTANT: Ableton groups tracks by type. All audio tracks come first,
then all MIDI tracks. After clear_project, 1 leftover track remains.
Strategy:
1. Snapshot track count before creation
2. Create all tracks (Ableton auto-groups audio before MIDI)
3. Snapshot after creation
4. New tracks are identified by comparing before/after
5. Map our tracks to the new ones by type order
"""
if not tracks:
return {}
# Count pre-existing tracks by type
resp = _send("get_tracks", {}, timeout=10.0)
pre_audio = 0
pre_midi = 0
if resp.get("status") == "success":
for t in resp.get("result", {}).get("tracks", []):
if isinstance(t, dict):
if t.get("is_audio"):
pre_audio += 1
elif t.get("is_midi"):
pre_midi += 1
# Create all tracks
for track in tracks:
if track.type == "audio":
_send("create_audio_track", {"index": -1}, timeout=15.0)
else:
_send("create_midi_track", {"index": -1}, timeout=15.0)
# Now count tracks AFTER creation to find the new ones
resp = _send("get_tracks", {}, timeout=10.0)
ableton_tracks = resp.get("result", {}).get("tracks", []) if resp.get("status") == "success" else []
new_audio = [t for t in ableton_tracks if isinstance(t, dict) and t.get("is_audio")]
new_midi = [t for t in ableton_tracks if isinstance(t, dict) and t.get("is_midi")]
# Separate our tracks by type
our_audio = [t for t in tracks if t.type == "audio"]
our_midi = [t for t in tracks if t.type == "midi"]
track_index_map: Dict[str, int] = {}
# Map audio tracks: our_audio[i] → new_audio[pre_audio + i]
for i, our_t in enumerate(our_audio):
ableton_idx = pre_audio + i
if ableton_idx < len(new_audio):
a_idx = new_audio[ableton_idx].get("index", ableton_idx)
else:
a_idx = ableton_idx # fallback
_send("set_track_name", {"track_index": a_idx, "name": our_t.name})
_send("set_track_volume", {"track_index": a_idx, "volume": our_t.mixer.volume})
if our_t.mixer.pan != 0.0:
_send("set_track_pan", {"track_index": a_idx, "pan": our_t.mixer.pan})
track_index_map[our_t.id] = a_idx
result["tracks_created"].append({
"id": our_t.id, "name": our_t.name,
"index": a_idx, "type": "audio",
})
# Map MIDI tracks: our_midi[i] → new_midi[pre_midi + i]
for i, our_t in enumerate(our_midi):
ableton_idx = pre_midi + i
if ableton_idx < len(new_midi):
m_idx = new_midi[ableton_idx].get("index", ableton_idx)
else:
m_idx = ableton_idx + len(new_audio) # fallback: after all audio
_send("set_track_name", {"track_index": m_idx, "name": our_t.name})
_send("set_track_volume", {"track_index": m_idx, "volume": our_t.mixer.volume})
if our_t.mixer.pan != 0.0:
_send("set_track_pan", {"track_index": m_idx, "pan": our_t.mixer.pan})
if our_t.instrument:
_send("insert_device",
{"track_index": m_idx,
"device_name": our_t.instrument,
"device_type": "instrument"},
timeout=30.0)
track_index_map[our_t.id] = m_idx
result["tracks_created"].append({
"id": our_t.id, "name": our_t.name,
"index": m_idx, "type": "midi",
})
return track_index_map
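The pre/post snapshot strategy above can be sketched standalone: Ableton groups audio tracks before MIDI tracks, and `clear_project` leaves one track behind, so the i-th track we create of a given type lands at position `pre_existing_count + i` within that type's post-creation list. `map_tracks` is a hypothetical condensation of that mapping, using plain dicts in place of TrackDefs:

```python
def map_tracks(pre_counts, post_tracks, our_tracks):
    by_type = {
        "audio": [t for t in post_tracks if t["type"] == "audio"],
        "midi":  [t for t in post_tracks if t["type"] == "midi"],
    }
    mapping, created = {}, {"audio": 0, "midi": 0}
    for our in our_tracks:
        kind = our["type"]
        # Skip tracks that existed before creation, then count off ours
        # in creation order within the type group.
        slot = pre_counts[kind] + created[kind]
        mapping[our["id"]] = by_type[kind][slot]["index"]
        created[kind] += 1
    return mapping

post = [{"type": "audio", "index": 0},   # leftover from clear_project
        {"type": "audio", "index": 1},
        {"type": "midi",  "index": 2}]
ours = [{"id": "kick", "type": "audio"}, {"id": "bass", "type": "midi"}]
mapping = map_tracks({"audio": 1, "midi": 0}, post, ours)
# "kick" maps past the leftover audio track to index 1; "bass" to index 2
```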
# ----------------------------------------------------------------
# Clip placement — Session View clip slots
# ----------------------------------------------------------------
def _place_clips(self, score: SongScore, track_index_map: Dict[str, int],
section_scene_map: Dict[str, int], result: dict) -> None:
key = score.meta.get("key", "Am")
tempo = score.meta.get("tempo", 95.0)
for track in score.tracks:
if track.id not in track_index_map:
continue
t_idx = track_index_map[track.id]
for clip in track.clips:
# Resolve scene index from section name
section_name = clip.section
if section_name and section_name in section_scene_map:
scene_idx = section_scene_map[section_name]
                else:
                    # Section missing from the scene map (clips that specify
                    # only start_bar also land here): fall back to scene 0.
                    scene_idx = 0
clip_label = "%s_%s" % (section_name or "clip", track.id)
if track.type == "audio":
self._place_audio_clip(t_idx, scene_idx, clip, clip_label, tempo, result)
else:
self._place_midi_clip(t_idx, scene_idx, clip, clip_label, key, result)
def _place_audio_clip(self, track_idx: int, scene_idx: int,
clip: ClipDef, label: str,
tempo: float, result: dict) -> None:
"""Load an audio sample into a Session View clip slot."""
sample_path = _resolve_sample(clip.sample, self.lib_root, tempo)
if not sample_path:
result["errors"].append(
"Clip '%s': sample '%s' not found (lib_root=%s)"
% (label, clip.sample, self.lib_root)
)
return
resp = _send("load_sample_direct", {
"track_index": track_idx,
"slot_index": scene_idx,
"file_path": sample_path,
"warp": clip.warp,
}, timeout=30.0)
if resp.get("status") == "success" or resp.get("loaded"):
result["clips_created"] += 1
else:
resp2 = _send("load_sample_to_clip", {
"track_index": track_idx,
"clip_index": scene_idx,
"sample_path": sample_path,
}, timeout=30.0)
if resp2.get("status") == "success" or resp2.get("loaded"):
result["clips_created"] += 1
else:
result["errors"].append(
"Audio clip '%s' failed: primary=%s fallback=%s path=%s"
% (label, resp.get("error", resp.get("message", "?")),
resp2.get("error", resp2.get("message", "?")), sample_path)
)
def _place_midi_clip(self, track_idx: int, scene_idx: int,
clip: ClipDef, label: str,
key: str, result: dict) -> None:
"""Create a MIDI clip in Session View and fill it with notes."""
length_beats = clip.duration_bars * 4 # assume 4/4
# 1. Create the clip slot
resp = _send("create_clip", {
"track_index": track_idx,
"clip_index": scene_idx,
"length": length_beats,
}, timeout=20.0)
if resp.get("status") != "success":
result["errors"].append(
"MIDI clip create '%s' failed: %s" % (label, resp.get("message", "?"))
)
return
# 2. Resolve notes
if clip.notes:
notes = clip.notes
elif clip.pattern:
gen = PATTERN_GENERATORS.get(clip.pattern)
if gen:
notes = gen(int(clip.duration_bars), key)
else:
result["errors"].append(
"Unknown pattern '%s' on clip '%s'" % (clip.pattern, label)
)
notes = []
else:
notes = []
# 3. Add notes
if notes:
resp = _send("add_notes_to_clip", {
"track_index": track_idx,
"clip_index": scene_idx,
"notes": notes,
}, timeout=20.0)
if resp.get("status") != "success":
result["errors"].append(
"add_notes '%s' failed: %s" % (label, resp.get("message", "?"))
)
result["clips_created"] += 1
# ----------------------------------------------------------------
# Mixer / Effects
# ----------------------------------------------------------------
def _apply_mixer(self, tracks: List[TrackDef], track_index_map: Dict[str, int],
result: dict) -> None:
for track in tracks:
if track.id not in track_index_map:
continue
t_idx = track_index_map[track.id]
mx = track.mixer
if mx.eq_preset:
resp = _send("configure_eq",
{"track_index": t_idx, "preset": mx.eq_preset},
timeout=15.0)
if resp.get("status") != "success":
result["errors"].append(
"EQ preset '%s' on '%s' failed: %s"
% (mx.eq_preset, track.id, resp.get("message", "?"))
)
# Compression presets are stored but not applied (configure_compressor not available)
if mx.send_reverb > 0:
_send("set_track_send",
{"track_index": t_idx, "send_index": 0, "amount": mx.send_reverb})
if mx.send_delay > 0:
_send("set_track_send",
{"track_index": t_idx, "send_index": 1, "amount": mx.send_delay})
# ------------------------------------------------------------------
# Convenience: render a score file directly
# ------------------------------------------------------------------
def render_file(json_path: str, lib_root: str, clear_first: bool = True) -> dict:
"""Load a SongScore JSON from disk and render it into Ableton Session View."""
score = SongScore.load(json_path)
renderer = ScoreRenderer(lib_root)
return renderer.render(score, clear_first=clear_first)


@@ -0,0 +1,3 @@
# This directory stores SongScore JSON files.
# Each file represents a complete song ready to be rendered into Ableton Live.
# Use the MCP tools: save_score / load_score / list_scores / render_score_from_file


@@ -1,4 +1,4 @@
"""
"""
AbletonMCP_AI MCP Server - Clean FastMCP server for Ableton Live 12.
Communicates with the Ableton Remote Script via TCP socket on port 9877.
"""
@@ -7286,6 +7286,633 @@ def produce_with_spectral_coherence(ctx: Context,
return _err(f"SPECTRAL OUTER: type={type(e).__name__} msg={str(e)!r}\n{tb[:1500]}")
# ==================================================================
# SPRINT 9 — SCORE → RENDER PIPELINE
# Compose a SongScore JSON incrementally, then inject it into Ableton
# in one atomic render_score() call.
# ==================================================================
# Lazy imports so server still starts if score_engine is missing
def _import_score_engine():
try:
import score_engine as _se
return _se
except ImportError:
return None
def _import_score_renderer():
try:
import score_renderer as _sr
return _sr
except ImportError:
return None
_REGGAETON_LIB = str(PROJECT_DIR.parent / "libreria" / "reggaeton")
@mcp.tool()
def new_score(
ctx: Context,
title: str = "Untitled",
tempo: float = 95.0,
key: str = "Am",
genre: str = "reggaeton",
time_signature: str = "4/4",
gap_bars: float = 2.0,
) -> str:
"""Create a fresh SongScore in memory and make it the active score.
This clears any previous in-memory score.
Args:
title: Song title
tempo: BPM (80-160)
key: Musical key (Am, C, Dm, F, G, etc.)
genre: Genre tag (for documentation)
time_signature: e.g. "4/4"
gap_bars: Bars of silence automatically inserted between sections
Returns:
Score summary including title, tempo, key.
"""
se = _import_score_engine()
if not se:
return _err("score_engine module not found in mcp_server/")
score = se.SongScore(title=title, tempo=tempo, key=key, genre=genre,
time_signature=time_signature, gap_bars=gap_bars)
se.set_current_score(score)
return _ok({"created": True, "meta": score.meta,
"instructions": "Score created. Use compose_structure() next."})
@mcp.tool()
def get_score(ctx: Context) -> str:
"""Return the complete active SongScore as JSON.
Use this to inspect the score before rendering, or to extract the JSON
for external storage / batch generation.
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
score = se.get_current_score()
if score is None:
return _err("No active score. Call new_score() or load_score() first.")
return _ok({"score": score.to_dict(),
"warnings": score.validate(),
"total_bars": score.total_bars()})
@mcp.tool()
def compose_structure(ctx: Context, sections: list) -> str:
"""Define the temporal structure of the active score.
Calculates start_bar automatically using the score's gap_bars setting.
Args:
sections: List of section dicts, each containing:
- name (str): Section name, e.g. "Intro", "Chorus"
- duration_bars (int): Length of the section in bars
- start_bar (float, optional): Override auto-calculated position
Example sections:
[
{"name": "Intro", "duration_bars": 4},
{"name": "Verse", "duration_bars": 8},
{"name": "Chorus", "duration_bars": 8},
{"name": "Bridge", "duration_bars": 4},
{"name": "Outro", "duration_bars": 4}
]
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
score = se.require_score()
score.set_structure(sections)
struct = score.get_structure_dict()
return _ok({"structure_set": True, "sections": len(struct),
"structure": struct, "total_bars": score.total_bars()})
except Exception as exc:
return _err(str(exc))
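The auto-placement described above can be sketched as follows. `assign_start_bars` is an illustrative stand-in for the logic inside `score_engine.set_structure`, not the actual implementation:

```python
def assign_start_bars(sections, gap_bars=2.0):
    # Walk the sections in order, placing each one right after the
    # previous section plus gap_bars of silence, unless the section
    # carries an explicit start_bar override.
    cursor = 0.0
    placed = []
    for s in sections:
        start = float(s.get("start_bar", cursor))
        placed.append({**s, "start_bar": start})
        cursor = start + s["duration_bars"] + gap_bars
    return placed

structure = assign_start_bars([
    {"name": "Intro", "duration_bars": 4},
    {"name": "Verse", "duration_bars": 8},
    {"name": "Chorus", "duration_bars": 8},
])
# With gap_bars=2.0: Intro at bar 0, Verse at bar 6, Chorus at bar 16
```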
@mcp.tool()
def compose_audio_track(
ctx: Context,
track_id: str,
name: str,
clips: list,
mixer: dict = None,
) -> str:
"""Add an audio track to the active score.
Args:
track_id: Unique identifier (e.g. "kick", "drum_loop")
name: Display name in Ableton (e.g. "Kick")
clips: List of clip dicts. Each clip must have:
- section (str): Which section this clip belongs to
- sample (str): Sample reference, e.g. "kick/auto" or exact path
- loop (bool, optional, default True)
- warp (bool, optional, default True)
mixer: Optional dict with volume (0-1), pan, eq_preset,
compression_preset, send_reverb, send_delay
Sample reference format:
"kick/auto" → auto-selects best kick sample
"drumloops/auto" → auto-selects best drum loop
"kick/kick_01.wav" → exact file within libreria/reggaeton/kick/
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
score = se.require_score()
struct = score.get_structure_dict()
track = se.TrackDef(
track_id = track_id,
name = name,
track_type = "audio",
clips = [se.ClipDef.from_raw(c, struct) for c in (clips or [])],
mixer = se.MixerDef.from_dict(mixer or {}),
)
score.add_track(track)
return _ok({"track_added": True, "id": track_id, "clips": len(track.clips)})
except Exception as exc:
return _err(str(exc))
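A minimal sketch of how a sample reference like `"kick/auto"` might be split into a category and a selector; the helper name and the exact resolution rules are assumptions, not the renderer's real code:

```python
def parse_sample_ref(ref):
    # "kick/auto" -> ("kick", "auto"); "kick/kick_01.wav" -> exact file.
    # A bare category with no slash is treated as "auto".
    category, _, selector = ref.partition("/")
    return category, (selector or "auto")

parse_sample_ref("kick/auto")        # ("kick", "auto")
parse_sample_ref("kick/kick_01.wav") # ("kick", "kick_01.wav")
```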
@mcp.tool()
def compose_midi_track(
ctx: Context,
track_id: str,
name: str,
instrument: str,
clips: list,
mixer: dict = None,
) -> str:
"""Add a MIDI track to the active score.
Args:
track_id: Unique identifier (e.g. "dembow", "bass")
name: Display name (e.g. "Dembow", "Sub Bass")
instrument: Live instrument to load: "Wavetable" or "Operator"
clips: List of clip dicts, each with:
- section (str): Section name
- pattern (str): MIDI pattern name (see below)
OR
- notes (list): Explicit MIDI notes
mixer: Optional mixer settings dict
Available patterns:
dembow_minimal, dembow_standard, dembow_double
bass_sub, bass_pluck, bass_octaves, bass_sustained
chords_verse, chords_chorus
melody_simple
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
score = se.require_score()
struct = score.get_structure_dict()
track = se.TrackDef(
track_id = track_id,
name = name,
track_type = "midi",
instrument = instrument,
clips = [se.ClipDef.from_raw(c, struct) for c in (clips or [])],
mixer = se.MixerDef.from_dict(mixer or {}),
)
score.add_track(track)
return _ok({"track_added": True, "id": track_id, "instrument": instrument,
"clips": len(track.clips)})
except Exception as exc:
return _err(str(exc))
@mcp.tool()
def compose_pattern(
ctx: Context,
track_id: str,
section: str,
pattern: str,
) -> str:
"""Add a MIDI pattern clip to an existing track in the active score.
Args:
track_id: ID of an existing MIDI track (must already be in score)
section: Section name where the clip will be placed
pattern: Pattern name (dembow_standard, bass_pluck, chords_verse, etc.)
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
score = se.require_score()
score.add_clip_to_track(track_id, {"section": section, "pattern": pattern})
return _ok({"clip_added": True, "track": track_id, "section": section, "pattern": pattern})
except Exception as exc:
return _err(str(exc))
@mcp.tool()
def compose_notes(
ctx: Context,
track_id: str,
section: str,
notes: list,
) -> str:
"""Add explicit MIDI notes to an existing track for a specific section.
Args:
track_id: ID of an existing MIDI track
section: Section name
notes: List of note dicts: [{pitch, start_time, duration, velocity}, ...]
- pitch: MIDI note number (0-127)
- start_time: position in beats (relative to clip start)
- duration: note length in beats
- velocity: 0-127
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
score = se.require_score()
score.add_clip_to_track(track_id, {"section": section, "notes": notes})
return _ok({"notes_added": True, "track": track_id, "section": section,
"note_count": len(notes)})
except Exception as exc:
return _err(str(exc))
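Note lists in the format `compose_notes` expects can be built with a small helper like the one below (`make_notes` is illustrative, not part of score_engine):

```python
def make_notes(pitches, step=1.0, duration=0.5, velocity=100):
    # One note per pitch, spaced `step` beats apart from clip start.
    return [
        {"pitch": p, "start_time": i * step,
         "duration": duration, "velocity": velocity}
        for i, p in enumerate(pitches)
    ]

# Four-beat root/fifth bass figure in A minor: A1 = MIDI 33, E2 = MIDI 40
bass = make_notes([33, 33, 40, 33])
```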
@mcp.tool()
def compose_mixer(
ctx: Context,
track_id: str,
volume: float = None,
pan: float = None,
eq_preset: str = None,
compression_preset: str = None,
send_reverb: float = None,
send_delay: float = None,
) -> str:
"""Update mixer settings for a track in the active score.
Args:
track_id: Track ID
volume: 0.0 - 1.0
pan: -1.0 (left) to 1.0 (right)
eq_preset: kick, snare, bass, synth, master, kick_sub, etc.
compression_preset: kick_punch, bass_glue, buss_glue, parallel_drum, etc.
send_reverb: 0.0 - 1.0 (Reverb return send level)
send_delay: 0.0 - 1.0 (Delay return send level)
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
score = se.require_score()
kwargs = {k: v for k, v in {
"volume": volume, "pan": pan, "eq_preset": eq_preset,
"compression_preset": compression_preset,
"send_reverb": send_reverb, "send_delay": send_delay,
}.items() if v is not None}
score.set_mixer(track_id, **kwargs)
return _ok({"mixer_updated": True, "track": track_id, "settings": kwargs})
except Exception as exc:
return _err(str(exc))
@mcp.tool()
def compose_from_template(
ctx: Context,
template_name: str,
title: str = None,
tempo: float = None,
key: str = None,
gap_bars: float = None,
) -> str:
"""Create a complete SongScore from a predefined template and make it active.
Available templates:
reggaeton_basic — Intro/Verse/Chorus/Bridge/Outro with full track set
reggaeton_13scenes — 13-section professional reggaeton structure
minimal_loop — Single 8-bar loop with drums + bass
Args:
template_name: Template identifier (see above)
title: Override title (optional)
tempo: Override BPM (optional)
key: Override key (optional, e.g. "Dm", "F")
gap_bars: Override gap between sections (optional)
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
overrides = {}
if title is not None: overrides["title"] = title
if tempo is not None: overrides["tempo"] = tempo
if key is not None: overrides["key"] = key
if gap_bars is not None: overrides["gap_bars"] = gap_bars
score = se.SongScore.from_template(template_name, **overrides)
se.set_current_score(score)
return _ok({
"template": template_name,
"created": True,
"meta": score.meta,
"sections": len(score.structure),
"tracks": len(score.tracks),
"total_bars": score.total_bars(),
"structure": score.get_structure_dict(),
"warnings": score.validate(),
})
except Exception as exc:
return _err(str(exc))
@mcp.tool()
def compose_validate(ctx: Context) -> str:
"""Validate the active SongScore without touching Ableton.
Checks structure completeness, track/clip consistency, sample references.
Returns:
List of warnings (empty = all good).
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
score = se.require_score()
warnings = score.validate()
return _ok({
"valid": len(warnings) == 0,
"warnings": warnings,
"sections": len(score.structure),
"tracks": len(score.tracks),
"total_bars": score.total_bars(),
})
except Exception as exc:
return _err(str(exc))
@mcp.tool()
def save_score(ctx: Context, filename: str = None) -> str:
"""Save the active SongScore to disk as a JSON file.
Args:
filename: File name (without path). If omitted, auto-generated from title + timestamp.
Extension .json is added automatically if missing.
File is saved to: AbletonMCP_AI/mcp_server/scores/
Returns:
Absolute path of the saved file.
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
score = se.require_score()
if not filename:
ts = __import__("datetime").datetime.now().strftime("%Y%m%d_%H%M%S")
safe = "".join(c if c.isalnum() or c in "_- " else "_"
for c in score.meta.get("title", "untitled"))
safe = safe.replace(" ", "_").strip("_")[:40]
filename = "%s_%s.json" % (safe, ts)
if not filename.endswith(".json"):
filename += ".json"
path = se.SCORES_DIR / filename
score.save(path)
return _ok({"saved": True, "filename": filename, "path": str(path),
"size_bytes": path.stat().st_size})
except Exception as exc:
return _err(str(exc))
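The auto-naming logic above reduces to a small sanitizer, extracted here for clarity (the timestamp suffix is omitted):

```python
def safe_filename(title, max_len=40):
    # Keep alphanumerics, underscore, hyphen, and space; replace
    # everything else with "_", then swap spaces for underscores.
    safe = "".join(c if c.isalnum() or c in "_- " else "_" for c in title)
    return safe.replace(" ", "_").strip("_")[:max_len]

safe_filename("Luna de Miel en el Block!")  # "Luna_de_Miel_en_el_Block"
```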
@mcp.tool()
def load_score(ctx: Context, filename: str) -> str:
"""Load a SongScore from disk and make it the active score.
Args:
filename: File name in scores/ directory (e.g. "perreo_eterno.json").
Use list_scores() to see available files.
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
if not filename.endswith(".json"):
filename += ".json"
path = se.SCORES_DIR / filename
if not path.exists():
return _err("File not found: %s. Use list_scores() to see available scores." % filename)
score = se.SongScore.load(path)
se.set_current_score(score)
return _ok({
"loaded": True,
"filename": filename,
"meta": score.meta,
"sections": len(score.structure),
"tracks": len(score.tracks),
"total_bars": score.total_bars(),
"warnings": score.validate(),
})
except Exception as exc:
return _err(str(exc))
@mcp.tool()
def list_scores(ctx: Context) -> str:
"""List all SongScore JSON files saved in mcp_server/scores/.
Returns file names, sizes, and any readable metadata.
"""
se = _import_score_engine()
if not se:
return _err("score_engine not available")
try:
files = sorted(se.SCORES_DIR.glob("*.json"))
entries = []
for f in files:
entry = {"filename": f.name, "size_bytes": f.stat().st_size}
try:
data = __import__("json").loads(f.read_text(encoding="utf-8"))
m = data.get("meta", {})
entry.update({
"title": m.get("title", "?"),
"tempo": m.get("tempo"),
"key": m.get("key"),
"tracks": len(data.get("tracks", [])),
})
except Exception:
pass
entries.append(entry)
return _ok({"count": len(entries), "scores": entries,
"directory": str(se.SCORES_DIR)})
except Exception as exc:
return _err(str(exc))
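The per-file metadata read performed by `list_scores` can be isolated as a standalone function; field names follow the SongScore JSON schema used throughout this file:

```python
import json
from pathlib import Path

def score_summary(path):
    # Pull only the displayable metadata out of a SongScore JSON file.
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    m = data.get("meta", {})
    return {
        "title": m.get("title", "?"),
        "tempo": m.get("tempo"),
        "key": m.get("key"),
        "tracks": len(data.get("tracks", [])),
    }
```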
@mcp.tool()
def render_score(
ctx: Context,
clear_first: bool = True,
score_json: str = None,
) -> str:
"""Render the active SongScore into Ableton Live.
    """Render the active SongScore into Ableton Live.
    Translates the score into Ableton operations: creates tracks, places clips
    in Session View at the correct positions, applies mixer settings.
Args:
clear_first: Remove all existing tracks/clips before rendering (default True).
score_json: Optional. Pass a raw JSON string to render directly without
making it the active score. If omitted, uses the active score.
Returns:
Summary: tracks created, clips placed, errors.
"""
se = _import_score_engine()
sr = _import_score_renderer()
if not se or not sr:
return _err("score_engine or score_renderer not available")
try:
if score_json:
score = se.SongScore.from_json(score_json)
else:
score = se.require_score()
renderer = sr.ScoreRenderer(_REGGAETON_LIB)
result = renderer.render(score, clear_first=clear_first)
return _ok(result)
except Exception as exc:
return _err(str(exc))
@mcp.tool()
def render_score_from_file(
ctx: Context,
filename: str,
clear_first: bool = True,
) -> str:
"""Load a SongScore JSON from disk and render it into Ableton Live.
Ideal for batch workflows: save 50 scores with ai_loop.py, then render
them one by one.
Args:
filename: File name in scores/ (e.g. "song_001.json"). Use list_scores().
clear_first: Remove existing tracks before rendering (default True).
"""
se = _import_score_engine()
sr = _import_score_renderer()
if not se or not sr:
return _err("score_engine or score_renderer not available")
try:
if not filename.endswith(".json"):
filename += ".json"
path = se.SCORES_DIR / filename
if not path.exists():
return _err("File not found: %s" % filename)
score = se.SongScore.load(path)
se.set_current_score(score) # Also make it active
renderer = sr.ScoreRenderer(_REGGAETON_LIB)
result = renderer.render(score, clear_first=clear_first)
result["filename"] = filename
return _ok(result)
except Exception as exc:
return _err(str(exc))
@mcp.tool()
def render_all_scores(
ctx: Context,
clear_between: bool = True,
delay_seconds: float = 3.0,
limit: int = 0,
) -> str:
"""Render all SongScore JSON files from scores/ sequentially into Ableton.
Designed for batch autonomous production. Run this after ai_loop.py has
generated a batch of scores.
Args:
clear_between: Clear Ableton project between each score (default True).
delay_seconds: Wait between renders (give Ableton time to process).
limit: Maximum number of scores to render (0 = all).
Returns:
Summary of all render results.
"""
import time as _time
se = _import_score_engine()
sr = _import_score_renderer()
if not se or not sr:
return _err("score_engine or score_renderer not available")
try:
files = sorted(se.SCORES_DIR.glob("*.json"))
if limit > 0:
files = files[:limit]
if not files:
return _ok({"rendered": 0, "message": "No score files found in scores/"})
renderer = sr.ScoreRenderer(_REGGAETON_LIB)
results = []
errors = 0
for i, f in enumerate(files):
logger.info("[render_all] %d/%d: %s", i + 1, len(files), f.name)
try:
score = se.SongScore.load(f)
result = renderer.render(score, clear_first=clear_between)
result["filename"] = f.name
results.append(result)
if not result.get("success"):
errors += 1
except Exception as exc:
results.append({"filename": f.name, "success": False, "error": str(exc)})
errors += 1
if delay_seconds > 0 and i < len(files) - 1:
_time.sleep(delay_seconds)
return _ok({
"total": len(files),
"success": len(files) - errors,
"errors": errors,
"results": results,
})
except Exception as exc:
return _err(str(exc))
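The render-tally-pause loop above can be sketched as a standalone batch driver. `render_one` is any callable returning a dict with a boolean `success` key, standing in for the real `ScoreRenderer.render`:

```python
import time
from pathlib import Path

def batch_render(files, render_one, delay_seconds=3.0, limit=0):
    # Render each file in order, tally errors, and pause between
    # renders so the host (Ableton) has time to settle.
    if limit > 0:
        files = files[:limit]
    results, errors = [], 0
    for i, f in enumerate(files):
        try:
            result = render_one(f)
        except Exception as exc:
            result = {"success": False, "error": str(exc)}
        result["filename"] = Path(f).name
        results.append(result)
        if not result.get("success"):
            errors += 1
        if delay_seconds > 0 and i < len(files) - 1:
            time.sleep(delay_seconds)
    return {"total": len(files), "success": len(files) - errors,
            "errors": errors, "results": results}
```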
# Also register timeouts for the new tools
TIMEOUTS.update({
"new_score": 5.0,
"get_score": 5.0,
"compose_structure": 5.0,
"compose_audio_track": 5.0,
"compose_midi_track": 5.0,
"compose_pattern": 5.0,
"compose_notes": 5.0,
"compose_mixer": 5.0,
"compose_from_template": 5.0,
"compose_validate": 5.0,
"save_score": 5.0,
"load_score": 5.0,
"list_scores": 5.0,
"render_score": 300.0,
"render_score_from_file":300.0,
"render_all_scores": 1800.0,
})
# ------------------------------------------------------------------
# MAIN
# ------------------------------------------------------------------