feat: SDD workflow — test sync, song generation + validation, ReaScript hybrid pipeline
- compose-test-sync: fix 3 failing tests (NOTE_TO_MIDI, DrumLoopAnalyzer mock, section name) - generate-song: CLI wrapper + RPP validator (6 structural checks) + 4 e2e tests - reascript-hybrid: ReaScriptGenerator + command protocol + CLI + 16 unit tests - 110/110 tests passing - Full SDD cycle (propose→spec→design→tasks→apply→verify) for all 3 changes
This commit is contained in:
266
src/reaper_scripting/__init__.py
Normal file
266
src/reaper_scripting/__init__.py
Normal file
@@ -0,0 +1,266 @@
|
||||
"""ReaScript generator — writes self-contained Python ReaScripts for REAPER."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from .commands import ReaScriptCommand
|
||||
|
||||
|
||||
class ReaScriptGenerator:
|
||||
"""Generate a self-contained Python ReaScript file."""
|
||||
|
||||
def generate(self, path: Path, command: ReaScriptCommand) -> None:
|
||||
"""Write a self-contained ReaScript .py to path that REAPER can execute."""
|
||||
code = self._build_script(command)
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(code)
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# Hand-rolled JSON parser (~20 lines, no import json)
|
||||
# ReaScript Python has no json module; we parse via string splitting.
|
||||
# --------------------------------------------------------------------
|
||||
JSON_PARSER_SRC = """\
|
||||
def parse_json(s):
|
||||
s = s.strip()
|
||||
if s[0] != "{" or s[-1] != "}":
|
||||
raise ValueError("not a JSON object")
|
||||
s = s[1:-1].strip()
|
||||
if not s:
|
||||
return {}
|
||||
result = {}
|
||||
segments = []
|
||||
depth = 0
|
||||
in_string = False
|
||||
escape = False
|
||||
start = 0
|
||||
i = 0
|
||||
while i < len(s):
|
||||
c = s[i]
|
||||
if not in_string and c in "{[":
|
||||
depth += 1
|
||||
elif not in_string and c in "}]":
|
||||
depth -= 1
|
||||
elif c == "\\\\":
|
||||
escape = not escape
|
||||
elif not escape and c == '"':
|
||||
in_string = not in_string
|
||||
elif not in_string and depth == 0 and c == ",":
|
||||
segments.append(s[start:i].strip())
|
||||
start = i + 1
|
||||
i += 1
|
||||
if start < len(s):
|
||||
segments.append(s[start:].strip())
|
||||
for seg in segments:
|
||||
colon_idx = -1
|
||||
in_str = False
|
||||
esc = False
|
||||
for j, ch in enumerate(seg):
|
||||
if ch == "\\\\":
|
||||
esc = not esc
|
||||
elif not esc and ch == '"':
|
||||
in_str = not in_str
|
||||
elif not in_str and ch == ":":
|
||||
colon_idx = j
|
||||
break
|
||||
if colon_idx < 0:
|
||||
continue
|
||||
key = seg[:colon_idx].strip()
|
||||
val = seg[colon_idx + 1:].strip()
|
||||
if key.startswith('"') and key.endswith('"'):
|
||||
key = key[1:-1]
|
||||
else:
|
||||
key = key.strip()
|
||||
if val.startswith('"') and val.endswith('"'):
|
||||
result[key] = val[1:-1]
|
||||
elif val == "true":
|
||||
result[key] = True
|
||||
elif val == "false":
|
||||
result[key] = False
|
||||
elif val == "null":
|
||||
result[key] = None
|
||||
else:
|
||||
try:
|
||||
result[key] = int(val)
|
||||
except ValueError:
|
||||
try:
|
||||
result[key] = float(val)
|
||||
except ValueError:
|
||||
result[key] = val
|
||||
return result
|
||||
|
||||
|
||||
def to_json_val(v):
|
||||
if v is None:
|
||||
return "null"
|
||||
if isinstance(v, bool):
|
||||
return "true" if v else "false"
|
||||
if isinstance(v, (int, float)):
|
||||
return str(v)
|
||||
if isinstance(v, dict):
|
||||
return "{" + ", ".join('"' + str(k) + '": ' + to_json_val(val) for k, val in v.items()) + "}"
|
||||
if isinstance(v, list):
|
||||
return "[" + ", ".join(to_json_val(x) for x in v) + "]"
|
||||
return "\\"" + str(v) + "\\""
|
||||
|
||||
|
||||
def write_json(path, data):
|
||||
lines = []
|
||||
lines.append("{")
|
||||
items = list(data.items())
|
||||
for i, (k, v) in enumerate(items):
|
||||
lines.append(" \\"" + str(k) + "\\": " + to_json_val(v) + ("," if i < len(items) - 1 else ""))
|
||||
lines.append("}")
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write("\\n".join(lines) + "\\n")
|
||||
"""
|
||||
|
||||
def _build_script(self, command: ReaScriptCommand) -> str:
|
||||
"""Build the full ReaScript source."""
|
||||
lines = []
|
||||
lines.append("# -*- coding: utf-8 -*-")
|
||||
lines.append("# auto-generated by fl_control ReaScriptGenerator")
|
||||
lines.append('__doc__ = "fl_control Phase 2 ReaScript"')
|
||||
lines.append("")
|
||||
|
||||
# API check function
|
||||
lines.append(self._api_check_src())
|
||||
lines.append("")
|
||||
|
||||
# JSON utilities (hand-rolled, no import json)
|
||||
lines.append(self.JSON_PARSER_SRC)
|
||||
lines.append("")
|
||||
|
||||
# Main block
|
||||
lines.append(self._main_block_src())
|
||||
lines.append("")
|
||||
lines.append("main()")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _api_check_src(self) -> str:
|
||||
return """\
|
||||
def check_api():
|
||||
required = [
|
||||
"Main_openProject",
|
||||
"TrackFX_GetCount",
|
||||
"TrackFX_GetFXName",
|
||||
"SetMediaTrackInfo_Value",
|
||||
"CreateTrackSend",
|
||||
"Main_RenderFile",
|
||||
"CalcMediaSrcLoudness",
|
||||
"GetLastError",
|
||||
"GetProjectName",
|
||||
]
|
||||
missing = []
|
||||
for name in required:
|
||||
if RPR_GetFunctionMetadata(name) == "":
|
||||
missing.append(name)
|
||||
return missing
|
||||
"""
|
||||
|
||||
def _main_block_src(self) -> str:
|
||||
return """\
|
||||
def main():
|
||||
cmd_path = RPR_ResourcePath() + "scripts/fl_control_command.json"
|
||||
res_path = RPR_ResourcePath() + "scripts/fl_control_result.json"
|
||||
|
||||
# Read command JSON
|
||||
try:
|
||||
with open(cmd_path, encoding="utf-8") as f:
|
||||
cmd_text = f.read()
|
||||
cmd = parse_json(cmd_text)
|
||||
except Exception as e:
|
||||
write_json(res_path, {"version": 1, "status": "error", "message": "failed to read command: " + str(e)})
|
||||
return
|
||||
|
||||
# API availability check
|
||||
missing = check_api()
|
||||
if missing:
|
||||
write_json(res_path, {"version": 1, "status": "error", "message": "missing API: " + ", ".join(missing)})
|
||||
return
|
||||
|
||||
rpp_path = cmd.get("rpp_path", "")
|
||||
render_path = cmd.get("render_path", "")
|
||||
action = cmd.get("action", "calibrate")
|
||||
track_cal = cmd.get("track_calibration", [])
|
||||
|
||||
# 1. Open project
|
||||
RPR_Main_openProject(rpp_path)
|
||||
|
||||
# 2. Verify FX on all tracks
|
||||
fx_errors = []
|
||||
tracks_verified = 0
|
||||
num_tracks = RPR_CountTracks(0)
|
||||
for t in range(num_tracks):
|
||||
track = RPR_GetTrack(0, t)
|
||||
fx_count = RPR_TrackFX_GetCount(track, False)
|
||||
for fi in range(fx_count):
|
||||
buf = chr(0) * 512
|
||||
RPR_TrackFX_GetFXName(track, fi, buf, 512)
|
||||
fx_name = buf.rstrip(chr(0)).strip()
|
||||
if not fx_name:
|
||||
fx_errors.append({
|
||||
"track_index": t,
|
||||
"fx_index": fi,
|
||||
"name": "",
|
||||
"expected": ""
|
||||
})
|
||||
tracks_verified += 1
|
||||
|
||||
# 3. Calibrate tracks (volume/pan/sends)
|
||||
for cal in track_cal:
|
||||
track_idx = cal.get("track_index", 0)
|
||||
track = RPR_GetTrack(0, track_idx)
|
||||
vol = cal.get("volume", 1.0)
|
||||
pan = cal.get("pan", 0.0)
|
||||
RPR_SetMediaTrackInfo_Value(track, "D_VOL", vol)
|
||||
RPR_SetMediaTrackInfo_Value(track, "D_PAN", pan)
|
||||
for send in cal.get("sends", []):
|
||||
dest_idx = send.get("dest_track_index", -1)
|
||||
level = send.get("level", 0.0)
|
||||
if dest_idx >= 0:
|
||||
dest_track = RPR_GetTrack(0, dest_idx)
|
||||
RPR_CreateTrackSend(track, dest_track)
|
||||
|
||||
# 4. Render project
|
||||
if render_path:
|
||||
RPR_Main_RenderFile(0, render_path)
|
||||
|
||||
# 5. Measure loudness
|
||||
lufs = None
|
||||
integrated_lufs = None
|
||||
short_term_lufs = None
|
||||
if render_path:
|
||||
try:
|
||||
loudness_str = RPR_CalcMediaSrcLoudness(render_path, True)
|
||||
parts = loudness_str.split(",")
|
||||
for p in parts:
|
||||
p = p.strip().lower()
|
||||
if "integrated" in p:
|
||||
try:
|
||||
integrated_lufs = float(p.split(":")[1].split("lufs")[0].strip())
|
||||
except:
|
||||
pass
|
||||
elif "short-term" in p or "short term" in p:
|
||||
try:
|
||||
short_term_lufs = float(p.split(":")[1].split("lufs")[0].strip())
|
||||
except:
|
||||
pass
|
||||
lufs = integrated_lufs
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 6. Write result JSON
|
||||
result = {
|
||||
"version": 1,
|
||||
"status": "ok",
|
||||
"message": "",
|
||||
"lufs": lufs,
|
||||
"integrated_lufs": integrated_lufs,
|
||||
"short_term_lufs": short_term_lufs,
|
||||
"fx_errors": fx_errors,
|
||||
"tracks_verified": tracks_verified,
|
||||
}
|
||||
write_json(res_path, result)
|
||||
"""
|
||||
Reference in New Issue
Block a user