Hybrid pipeline: RPPBuilder writes VST3/MIDI/audio skeleton, ReaScript handles built-in plugins (ReaEQ, ReaComp) via TrackFX_AddByName + TrackFX_SetParam with multi-action dispatch, adaptive API check, and builtin plugin auto-detection from PLUGIN_REGISTRY. 326 tests (298 existing + 28 new), 12/12 spec scenarios compliant.
467 lines
15 KiB
Python
467 lines
15 KiB
Python
"""ReaScript generator — writes self-contained Python ReaScripts for REAPER."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
from .commands import ReaScriptCommand
|
|
|
|
|
|
#: Known action names and their ordering in the dispatch loop.
|
|
_KNOWN_ACTIONS: frozenset[str] = frozenset([
|
|
"add_plugins",
|
|
"configure_fx_params",
|
|
"verify_fx",
|
|
"calibrate",
|
|
"render",
|
|
])
|
|
|
|
|
|
class ReaScriptGenerator:
|
|
"""Generate a self-contained Python ReaScript file."""
|
|
|
|
def generate(self, path: Path, command: ReaScriptCommand) -> None:
|
|
"""Write a self-contained ReaScript .py to path that REAPER can execute."""
|
|
code = self._build_script(command)
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
f.write(code)
|
|
|
|
# --------------------------------------------------------------------
|
|
# Hand-rolled JSON parser (~20 lines, no import json)
|
|
# ReaScript Python has no json module; we parse via string splitting.
|
|
# --------------------------------------------------------------------
|
|
JSON_PARSER_SRC = """\
|
|
def parse_json(s):
|
|
s = s.strip()
|
|
if s[0] != "{" or s[-1] != "}":
|
|
raise ValueError("not a JSON object")
|
|
s = s[1:-1].strip()
|
|
if not s:
|
|
return {}
|
|
result = {}
|
|
segments = []
|
|
depth = 0
|
|
in_string = False
|
|
escape = False
|
|
start = 0
|
|
i = 0
|
|
while i < len(s):
|
|
c = s[i]
|
|
if not in_string and c in "{[":
|
|
depth += 1
|
|
elif not in_string and c in "}]":
|
|
depth -= 1
|
|
elif c == "\\\\":
|
|
escape = not escape
|
|
elif not escape and c == '"':
|
|
in_string = not in_string
|
|
elif not in_string and depth == 0 and c == ",":
|
|
segments.append(s[start:i].strip())
|
|
start = i + 1
|
|
i += 1
|
|
if start < len(s):
|
|
segments.append(s[start:].strip())
|
|
for seg in segments:
|
|
colon_idx = -1
|
|
in_str = False
|
|
esc = False
|
|
for j, ch in enumerate(seg):
|
|
if ch == "\\\\":
|
|
esc = not esc
|
|
elif not esc and ch == '"':
|
|
in_str = not in_str
|
|
elif not in_str and ch == ":":
|
|
colon_idx = j
|
|
break
|
|
if colon_idx < 0:
|
|
continue
|
|
key = seg[:colon_idx].strip()
|
|
val = seg[colon_idx + 1:].strip()
|
|
if key.startswith('"') and key.endswith('"'):
|
|
key = key[1:-1]
|
|
else:
|
|
key = key.strip()
|
|
if val.startswith('"') and val.endswith('"'):
|
|
result[key] = val[1:-1]
|
|
elif val == "true":
|
|
result[key] = True
|
|
elif val == "false":
|
|
result[key] = False
|
|
elif val == "null":
|
|
result[key] = None
|
|
else:
|
|
try:
|
|
result[key] = int(val)
|
|
except ValueError:
|
|
try:
|
|
result[key] = float(val)
|
|
except ValueError:
|
|
result[key] = val
|
|
return result
|
|
|
|
|
|
def to_json_val(v):
|
|
if v is None:
|
|
return "null"
|
|
if isinstance(v, bool):
|
|
return "true" if v else "false"
|
|
if isinstance(v, (int, float)):
|
|
return str(v)
|
|
if isinstance(v, dict):
|
|
return "{" + ", ".join('"' + str(k) + '": ' + to_json_val(val) for k, val in v.items()) + "}"
|
|
if isinstance(v, list):
|
|
return "[" + ", ".join(to_json_val(x) for x in v) + "]"
|
|
return "\\"" + str(v) + "\\""
|
|
|
|
|
|
def write_json(path, data):
|
|
lines = []
|
|
lines.append("{")
|
|
items = list(data.items())
|
|
for i, (k, v) in enumerate(items):
|
|
lines.append(" \\"" + str(k) + "\\": " + to_json_val(v) + ("," if i < len(items) - 1 else ""))
|
|
lines.append("}")
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
f.write("\\n".join(lines) + "\\n")
|
|
"""
|
|
|
|
# --------------------------------------------------------------------
|
|
# Builder
|
|
# --------------------------------------------------------------------
|
|
|
|
def _build_script(self, command: ReaScriptCommand) -> str:
|
|
"""Build the full ReaScript source with per-action dispatch."""
|
|
# Normalize action to list (backward compat: str → [str])
|
|
raw = command.action
|
|
if isinstance(raw, str):
|
|
actions = [raw] if raw else ["calibrate"]
|
|
elif isinstance(raw, list):
|
|
actions = raw if raw else ["calibrate"]
|
|
else:
|
|
actions = ["calibrate"]
|
|
|
|
lines: list[str] = []
|
|
lines.append("# -*- coding: utf-8 -*-")
|
|
lines.append("# auto-generated by fl_control ReaScriptGenerator")
|
|
lines.append('__doc__ = "fl_control Phase 2 ReaScript"')
|
|
lines.append("")
|
|
|
|
# API check function (adaptive)
|
|
lines.append(self._api_check_src(actions))
|
|
lines.append("")
|
|
|
|
# JSON utilities (hand-rolled, no import json)
|
|
lines.append(self.JSON_PARSER_SRC)
|
|
lines.append("")
|
|
|
|
# Track-find helper (needed by add_plugins / configure_fx_params)
|
|
needs_find_track = any(a in actions for a in ("add_plugins", "configure_fx_params"))
|
|
if needs_find_track:
|
|
lines.append(self._find_track_src())
|
|
lines.append("")
|
|
|
|
# Per-action functions (emitted only if their action is present)
|
|
if "add_plugins" in actions:
|
|
lines.append(self._add_plugins_src())
|
|
lines.append("")
|
|
if "configure_fx_params" in actions:
|
|
lines.append(self._configure_fx_params_src())
|
|
lines.append("")
|
|
|
|
# verify_fx, calibrate, render are always emitted when present
|
|
if "verify_fx" in actions:
|
|
lines.append(self._verify_fx_src())
|
|
lines.append("")
|
|
if "calibrate" in actions:
|
|
lines.append(self._calibrate_src())
|
|
lines.append("")
|
|
if "render" in actions:
|
|
lines.append(self._render_src())
|
|
lines.append("")
|
|
|
|
# Main block with dispatch loop
|
|
lines.append(self._main_block_src(actions))
|
|
lines.append("")
|
|
lines.append("main()")
|
|
|
|
return "\n".join(lines)
|
|
|
|
# --------------------------------------------------------------------
|
|
# Per-action generated source blocks
|
|
# --------------------------------------------------------------------
|
|
|
|
def _find_track_src(self) -> str:
|
|
return """\
|
|
def find_track(name):
|
|
num_tracks = RPR_CountTracks(0)
|
|
for t in range(num_tracks):
|
|
track = RPR_GetTrack(0, t)
|
|
buf = chr(0) * 256
|
|
RPR_GetSetMediaTrackInfo_String(track, "P_NAME", buf, False)
|
|
track_name = buf.rstrip(chr(0)).strip()
|
|
if track_name.lower() == name.lower():
|
|
return track
|
|
return None
|
|
"""
|
|
|
|
def _add_plugins_src(self) -> str:
|
|
return """\
|
|
def add_plugins(cmd):
|
|
plugins = cmd.get("plugins_to_add", [])
|
|
results = []
|
|
for p in plugins:
|
|
track_name = p.get("track_name", "")
|
|
fx_name = p.get("fx_name", "")
|
|
track = find_track(track_name)
|
|
if track is None:
|
|
results.append({
|
|
"fx_name": fx_name,
|
|
"instance_id": -1,
|
|
"track_name": track_name,
|
|
"status": "error: track not found: " + track_name
|
|
})
|
|
continue
|
|
fx_idx = RPR_TrackFX_AddByName(track, fx_name, False, 1)
|
|
if fx_idx >= 0:
|
|
buf = chr(0) * 512
|
|
RPR_TrackFX_GetFXName(track, fx_idx, buf, 512)
|
|
returned_name = buf.rstrip(chr(0)).strip()
|
|
if returned_name.lower() == fx_name.lower():
|
|
results.append({
|
|
"fx_name": fx_name,
|
|
"instance_id": fx_idx,
|
|
"track_name": track_name,
|
|
"status": "ok"
|
|
})
|
|
else:
|
|
results.append({
|
|
"fx_name": fx_name,
|
|
"instance_id": fx_idx,
|
|
"track_name": track_name,
|
|
"status": "failed to load " + fx_name + " on track " + track_name
|
|
})
|
|
else:
|
|
results.append({
|
|
"fx_name": fx_name,
|
|
"instance_id": -1,
|
|
"track_name": track_name,
|
|
"status": "failed to load " + fx_name + " on track " + track_name
|
|
})
|
|
return results
|
|
"""
|
|
|
|
def _configure_fx_params_src(self) -> str:
|
|
return """\
|
|
def configure_fx_params(cmd):
|
|
plugins = cmd.get("plugins_to_add", [])
|
|
for p in plugins:
|
|
track_name = p.get("track_name", "")
|
|
track = find_track(track_name)
|
|
if track is None:
|
|
continue
|
|
fx_count = RPR_TrackFX_GetCount(track, False)
|
|
fx_name = p.get("fx_name", "")
|
|
target_idx = -1
|
|
for fi in range(fx_count):
|
|
buf = chr(0) * 512
|
|
RPR_TrackFX_GetFXName(track, fi, buf, 512)
|
|
returned_name = buf.rstrip(chr(0)).strip()
|
|
if returned_name.lower() == fx_name.lower():
|
|
target_idx = fi
|
|
break
|
|
if target_idx < 0:
|
|
continue
|
|
params = p.get("params", {})
|
|
for param_idx_str, value in params.items():
|
|
param_idx = int(param_idx_str)
|
|
RPR_TrackFX_SetParam(track, target_idx, param_idx, value)
|
|
"""
|
|
|
|
def _verify_fx_src(self) -> str:
|
|
return """\
|
|
def verify_fx():
|
|
fx_errors = []
|
|
tracks_verified = 0
|
|
num_tracks = RPR_CountTracks(0)
|
|
for t in range(num_tracks):
|
|
track = RPR_GetTrack(0, t)
|
|
fx_count = RPR_TrackFX_GetCount(track, False)
|
|
for fi in range(fx_count):
|
|
buf = chr(0) * 512
|
|
RPR_TrackFX_GetFXName(track, fi, buf, 512)
|
|
fx_name = buf.rstrip(chr(0)).strip()
|
|
if not fx_name:
|
|
fx_errors.append({
|
|
"track_index": t,
|
|
"fx_index": fi,
|
|
"name": "",
|
|
"expected": ""
|
|
})
|
|
tracks_verified += 1
|
|
return fx_errors, tracks_verified
|
|
"""
|
|
|
|
def _calibrate_src(self) -> str:
|
|
return """\
|
|
def calibrate(track_cal):
|
|
for cal in track_cal:
|
|
track_idx = cal.get("track_index", 0)
|
|
track = RPR_GetTrack(0, track_idx)
|
|
vol = cal.get("volume", 1.0)
|
|
pan = cal.get("pan", 0.0)
|
|
RPR_SetMediaTrackInfo_Value(track, "D_VOL", vol)
|
|
RPR_SetMediaTrackInfo_Value(track, "D_PAN", pan)
|
|
for send in cal.get("sends", []):
|
|
dest_idx = send.get("dest_track_index", -1)
|
|
level = send.get("level", 0.0)
|
|
if dest_idx >= 0:
|
|
dest_track = RPR_GetTrack(0, dest_idx)
|
|
RPR_CreateTrackSend(track, dest_track)
|
|
"""
|
|
|
|
def _render_src(self) -> str:
|
|
return """\
|
|
def do_render(render_path):
|
|
if render_path:
|
|
RPR_Main_RenderFile(0, render_path)
|
|
|
|
def measure_loudness(render_path):
|
|
lufs = None
|
|
integrated_lufs = None
|
|
short_term_lufs = None
|
|
if render_path:
|
|
try:
|
|
loudness_str = RPR_CalcMediaSrcLoudness(render_path, True)
|
|
parts = loudness_str.split(",")
|
|
for p in parts:
|
|
p = p.strip().lower()
|
|
if "integrated" in p:
|
|
try:
|
|
integrated_lufs = float(p.split(":")[1].split("lufs")[0].strip())
|
|
except (ValueError, IndexError):
|
|
pass
|
|
elif "short-term" in p or "short term" in p:
|
|
try:
|
|
short_term_lufs = float(p.split(":")[1].split("lufs")[0].strip())
|
|
except (ValueError, IndexError):
|
|
pass
|
|
lufs = integrated_lufs
|
|
except Exception:
|
|
pass
|
|
return lufs, integrated_lufs, short_term_lufs
|
|
"""
|
|
|
|
# --------------------------------------------------------------------
|
|
# API check (adaptive)
|
|
# --------------------------------------------------------------------
|
|
|
|
def _api_check_src(self, actions: list[str]) -> str:
|
|
"""Generate check_api() that requires FX APIs only when relevant actions present."""
|
|
# Base required APIs (always needed)
|
|
required_lines = [
|
|
' "Main_openProject",',
|
|
' "TrackFX_GetCount",',
|
|
' "TrackFX_GetFXName",',
|
|
' "SetMediaTrackInfo_Value",',
|
|
' "CreateTrackSend",',
|
|
' "Main_RenderFile",',
|
|
' "CalcMediaSrcLoudness",',
|
|
' "GetLastError",',
|
|
' "GetProjectName",',
|
|
]
|
|
|
|
needs_fx = any(a in actions for a in ("add_plugins", "configure_fx_params"))
|
|
if needs_fx:
|
|
required_lines.append(' "TrackFX_AddByName",')
|
|
required_lines.append(' "TrackFX_SetParam",')
|
|
|
|
required = "\n".join(required_lines)
|
|
|
|
return f"""\
|
|
def check_api():
|
|
required = [
|
|
{required}
|
|
]
|
|
missing = []
|
|
for name in required:
|
|
if RPR_GetFunctionMetadata(name) == "":
|
|
missing.append(name)
|
|
return missing
|
|
"""
|
|
|
|
# --------------------------------------------------------------------
|
|
# Main block (dispatch loop)
|
|
# --------------------------------------------------------------------
|
|
|
|
def _main_block_src(self, actions: list[str]) -> str:
|
|
"""Generate main() with dispatch loop over actions list."""
|
|
return """\
|
|
def main():
|
|
cmd_path = RPR_ResourcePath() + "scripts/fl_control_command.json"
|
|
res_path = RPR_ResourcePath() + "scripts/fl_control_result.json"
|
|
|
|
# Read command JSON
|
|
try:
|
|
with open(cmd_path, encoding="utf-8") as f:
|
|
cmd_text = f.read()
|
|
cmd = parse_json(cmd_text)
|
|
except Exception as e:
|
|
write_json(res_path, {"version": 1, "status": "error", "message": "failed to read command: " + str(e)})
|
|
return
|
|
|
|
# API availability check
|
|
missing = check_api()
|
|
if missing:
|
|
write_json(res_path, {"version": 1, "status": "error", "message": "missing API: " + ", ".join(missing)})
|
|
return
|
|
|
|
rpp_path = cmd.get("rpp_path", "")
|
|
render_path = cmd.get("render_path", "")
|
|
action = cmd.get("action", "calibrate")
|
|
# Normalize action to list
|
|
if isinstance(action, str):
|
|
action_list = [action] if action else ["calibrate"]
|
|
else:
|
|
action_list = action if action else ["calibrate"]
|
|
track_cal = cmd.get("track_calibration", [])
|
|
|
|
# 0. Open project
|
|
RPR_Main_openProject(rpp_path)
|
|
|
|
# State variables populated by actions
|
|
fx_errors = []
|
|
tracks_verified = 0
|
|
added_plugins = []
|
|
lufs = None
|
|
integrated_lufs = None
|
|
short_term_lufs = None
|
|
|
|
# 1. Dispatch loop
|
|
for action_name in action_list:
|
|
if action_name == "add_plugins":
|
|
added_plugins = add_plugins(cmd)
|
|
elif action_name == "configure_fx_params":
|
|
configure_fx_params(cmd)
|
|
elif action_name == "verify_fx":
|
|
fx_errors, tracks_verified = verify_fx()
|
|
elif action_name == "calibrate":
|
|
calibrate(track_cal)
|
|
elif action_name == "render":
|
|
do_render(render_path)
|
|
lufs, integrated_lufs, short_term_lufs = measure_loudness(render_path)
|
|
|
|
# 2. Write result JSON
|
|
result = {
|
|
"version": 1,
|
|
"status": "ok",
|
|
"message": "",
|
|
"lufs": lufs,
|
|
"integrated_lufs": integrated_lufs,
|
|
"short_term_lufs": short_term_lufs,
|
|
"fx_errors": fx_errors,
|
|
"tracks_verified": tracks_verified,
|
|
"added_plugins": added_plugins,
|
|
}
|
|
write_json(res_path, result)
|
|
"""
|