Files
reaper-control/src/flp_builder/builder.py

383 lines
13 KiB
Python

"""JSON->FLP builder - converts SongDefinition to a valid FL Studio FLP file.

Replicates the proven assembly logic from ``output/build_reggaeton_v15.py`` but
driven entirely by a :class:`SongDefinition` object instead of hardcoded values.

Assembly order (matches v15)::

    FLhd header + FLdt wrapper around:
    header_events + pattern_events + channel_events + arrangement_events

Usage::

    builder = FLPBuilder()
    flp_bytes = builder.build(song)
    Path("out.flp").write_bytes(flp_bytes)
"""
import struct
from pathlib import Path
from .schema import SongDefinition, PatternDef, MelodicTrack
from .skeleton import ChannelSkeletonLoader
from .arrangement import ArrangementItem, build_arrangement_section, build_track_data_template
from .events import (
EventID,
encode_text_event,
encode_word_event,
encode_data_event,
encode_notes_block,
)
from ..composer.rhythm import get_notes
# ---------------------------------------------------------------------------
# Default paths (relative to project root)
# ---------------------------------------------------------------------------
# The project root is taken to be two directory levels above the directory
# that contains this module.

# Reference FLP that supplies header events and the channel skeleton.
REF_FLP = Path(__file__).parents[2].joinpath("my space ryt", "my space ryt.flp")

# Binary template used for empty sampler channels.
CH11_TMPL = Path(__file__).parents[2].joinpath("output", "ch11_kick_template.bin")

# Directory holding the .wav sample files.
SAMPLES = Path(__file__).parents[2].joinpath("output", "samples")
# ---------------------------------------------------------------------------
# Note format conversion
# ---------------------------------------------------------------------------
def _convert_rhythm_notes(notes: list[dict]) -> list[dict]:
    """Rename the keys of rhythm.py note dicts to the names events.py uses.

    rhythm.py: ``{"pos", "len", "key", "vel"}``
    events.py: ``{"position", "length", "key", "velocity"}``

    Each input dict must provide all four source keys; any other keys are
    dropped.  A new list of new dicts is returned and the input is left
    untouched.
    """
    # (events.py key, rhythm.py key), in the order the output dict is built.
    renames = (
        ("position", "pos"),
        ("length", "len"),
        ("key", "key"),
        ("velocity", "vel"),
    )
    return [{target: note[source] for target, source in renames} for note in notes]
def _convert_melodic_notes(notes: list) -> list[dict]:
    """Turn note objects with pos/len/key/vel attributes into events.py dicts.

    MelodicNote: ``{pos, len, key, vel}``
    events.py: ``{"position", "length", "key", "velocity"}``

    Only the four attributes above are read from each note; a new list of
    new dicts is returned.
    """
    # (events.py key, note attribute), in the order the output dict is built.
    renames = (
        ("position", "pos"),
        ("length", "len"),
        ("key", "key"),
        ("velocity", "vel"),
    )
    return [
        {target: getattr(note, attr) for target, attr in renames}
        for note in notes
    ]
# ---------------------------------------------------------------------------
# FLPBuilder
# ---------------------------------------------------------------------------
class FLPBuilder:
    """Builds an FLP binary from a :class:`SongDefinition`.

    Header events, the channel skeleton and the track-data template are taken
    from a reference FLP; pattern and arrangement events are generated from
    the song definition.

    Parameters
    ----------
    ref_flp:
        Path to a reference FLP used for header events and channel skeleton.
    ch11_template:
        Path to the ch11_kick_template.bin for empty sampler channels.
    samples_dir:
        Directory containing .wav sample files.
    """

    # Offset of the first event in an FLP file: the 14-byte ``FLhd`` chunk
    # followed by the 8-byte ``FLdt`` chunk header.  This is the same layout
    # that :meth:`build` writes when it wraps the body.
    _EVENTS_OFFSET = 22

    def __init__(
        self,
        ref_flp: str | Path = REF_FLP,
        ch11_template: str | Path = CH11_TMPL,
        samples_dir: str | Path = SAMPLES,
    ):
        self._ref_flp = Path(ref_flp)
        self._ch11 = Path(ch11_template)
        self._samples = Path(samples_dir)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def build(self, song: SongDefinition) -> bytes:
        """Convert *song* to raw FLP bytes.

        Returns
        -------
        bytes
            ``FLhd`` chunk + ``FLdt`` chunk header + body, where the body is
            header events, drum pattern events, melodic pattern events,
            channel events and arrangement events, in that order.

        Raises
        ------
        ValueError
            If song validation fails or the reference FLP is malformed.
        FileNotFoundError
            If reference FLP or templates are missing.
        """
        # 1. Validate
        errors = song.validate()
        if errors:
            raise ValueError(
                "Song validation failed:\n - " + "\n - ".join(errors)
            )

        # 2. Read reference FLP; the channel count is copied into the output.
        ref_bytes = self._ref_flp.read_bytes()
        num_channels = self._read_channel_count(ref_bytes)

        # 3. Build each section
        header_bytes = self._build_header(song, ref_bytes)
        pattern_bytes = self._build_all_patterns(song)

        # 3b. Melodic tracks: map channel index -> (sample dir, sample file
        # name) for the channel loader, and emit one pattern per track with
        # IDs placed after the drum patterns.
        melodic_map: dict[int, tuple[str, str]] = {}
        melodic_parts: list[bytes] = []
        if song.melodic_tracks:
            first_melodic_id = self._first_melodic_pattern_id(song)
            for offset, mt in enumerate(song.melodic_tracks):
                sample = Path(mt.sample_path)
                melodic_map[mt.channel_index] = (str(sample.parent), sample.name)
                melodic_parts.append(
                    self._build_melodic_pattern(
                        mt, first_melodic_id + offset, song.meta.ppq
                    )
                )
        melodic_pattern_bytes = b"".join(melodic_parts)

        loader = ChannelSkeletonLoader(
            str(self._ref_flp),
            str(self._ch11),
            str(self._samples),
        )
        channel_bytes = loader.load(song.samples, melodic_map=melodic_map)

        track_data_template = build_track_data_template(ref_bytes)
        arrangement_bytes = self._build_arrangement(song, track_data_template)

        # 4. Assemble body: header + patterns + melodic_patterns + channels + arrangement
        body = b"".join(
            (
                header_bytes,
                pattern_bytes,
                melodic_pattern_bytes,
                channel_bytes,
                arrangement_bytes,
            )
        )

        # 5. Wrap with FLhd + FLdt headers (matches v15 line 317-318)
        return (
            struct.pack("<4sIhHH", b"FLhd", 6, 0, num_channels, song.meta.ppq)
            + b"FLdt"
            + struct.pack("<I", len(body))
            + body
        )

    @classmethod
    def _read_channel_count(cls, ref_bytes: bytes) -> int:
        """Return the channel count stored in the reference FLP's ``FLhd`` chunk.

        Raises
        ------
        ValueError
            If the data is too short to hold both chunk headers or the chunk
            magics are not where the fixed 22-byte layout puts them.
        """
        if (
            len(ref_bytes) < cls._EVENTS_OFFSET
            or ref_bytes[:4] != b"FLhd"
            or ref_bytes[14:18] != b"FLdt"
        ):
            raise ValueError(
                "Reference FLP is malformed: missing FLhd/FLdt chunk headers"
            )
        # u16 at offset 10 is the channel-count field of the FLhd payload.
        return struct.unpack_from("<H", ref_bytes, 10)[0]

    @staticmethod
    def _first_melodic_pattern_id(song: SongDefinition) -> int:
        """Return the 1-based pattern ID assigned to the first melodic track.

        Melodic patterns are numbered after the drum patterns.  The base is
        the larger of ``len(song.patterns)`` and the highest drum pattern ID,
        so melodic IDs cannot collide with drum IDs that are not exactly
        ``1..N``; for contiguous IDs this equals ``len(song.patterns) + 1``.
        """
        highest_id = max((pattern.id for pattern in song.patterns), default=0)
        return max(len(song.patterns), highest_id) + 1

    # ------------------------------------------------------------------
    # Header
    # ------------------------------------------------------------------
    def _build_header(self, song: SongDefinition, ref_bytes: bytes) -> bytes:
        """Extract header events from reference FLP and patch with song.meta values.

        The "header" is everything between offset 22 (after FLhd+FLdt chunk
        headers) and the first ``PatNew`` event. This includes version info,
        tempo, time-signature, etc. We patch the tempo (BPM) to match the
        song definition. If the reference has no Tempo event before the first
        pattern, the header is returned unpatched.

        This replicates v15 lines 133-141.

        Raises
        ------
        ValueError
            If the reference FLP contains no ``PatNew`` event or an event is
            truncated.
        """
        # Find first PatNew event
        first_pat = self._find_first_event(ref_bytes, EventID.PatNew)
        if first_pat is None:
            raise ValueError("No PatNew event found in reference FLP")

        # Extract header events (everything before first pattern)
        base = self._EVENTS_OFFSET
        header = bytearray(ref_bytes[base:first_pat])

        # Patch BPM — Tempo event (ID 156) is a dword, value = BPM * 1000.
        # The scan walks the immutable reference bytes directly, so nothing
        # is copied per event.
        pos = base
        while pos < first_pat:
            next_pos, start, event_id, _value, _kind = self._read_ev(ref_bytes, pos)
            if event_id == EventID.Tempo:
                # round() rather than int(): a float product that lands just
                # below the integer must not be truncated down by one.
                struct.pack_into(
                    "<I", header, start - base + 1, round(song.meta.bpm * 1000)
                )
                break
            pos = next_pos
        return bytes(header)

    # ------------------------------------------------------------------
    # Patterns
    # ------------------------------------------------------------------
    def _build_pattern_bytes(self, pattern: PatternDef, ppq: int) -> bytes:
        """Build all FLP events for one pattern.

        Sequence:

        1. ``PatNew`` (word event) — value = pattern.id - 1 (0-based)
        2. ``PatName`` (text event) — UTF-16-LE pattern name, skipped when
           the name is empty
        3. ``PatNotes`` (data event) per channel from ``get_notes()``

        Returns raw bytes for this pattern.
        """
        buf = bytearray()

        # 1. PatNew — word event, 0-based index
        buf += encode_word_event(EventID.PatNew, pattern.id - 1)

        # 2. PatName — text event (UTF-16-LE + null terminator)
        if pattern.name:
            buf += encode_text_event(EventID.PatName, pattern.name)

        # 3. Generate notes via rhythm.py dispatcher
        notes_by_channel = get_notes(
            pattern.generator,
            pattern.bars,
            pattern.velocity_mult,
            pattern.density,
        )

        # 4. Encode notes for each channel
        for ch_idx, raw_notes in notes_by_channel.items():
            converted = _convert_rhythm_notes(raw_notes)
            buf += encode_data_event(
                EventID.PatNotes,
                encode_notes_block(ch_idx, converted, ppq),
            )
        return bytes(buf)

    def _build_all_patterns(self, song: SongDefinition) -> bytes:
        """Build bytes for all patterns in *song.patterns*, in list order."""
        return b"".join(
            self._build_pattern_bytes(pattern, song.meta.ppq)
            for pattern in song.patterns
        )

    def _build_melodic_pattern(
        self, mt: MelodicTrack, pattern_id: int, ppq: int
    ) -> bytes:
        """Build FLP events for one melodic track pattern.

        Sequence:

        1. ``PatNew`` (word event) — value = pattern_id - 1 (0-based)
        2. ``PatName`` (text event) — UTF-16-LE with ``mt.role`` as name,
           skipped when the role is empty
        3. ``PatNotes`` (data event) with notes for the melodic channel

        Returns raw bytes for this melodic pattern.
        """
        buf = bytearray()

        # 1. PatNew — word event, 0-based index
        buf += encode_word_event(EventID.PatNew, pattern_id - 1)

        # 2. PatName — text event (UTF-16-LE + null terminator)
        if mt.role:
            buf += encode_text_event(EventID.PatName, mt.role)

        # 3. Convert MelodicNotes to events.py format and encode
        converted = _convert_melodic_notes(mt.notes)
        buf += encode_data_event(
            EventID.PatNotes,
            encode_notes_block(mt.channel_index, converted, ppq),
        )
        return bytes(buf)

    # ------------------------------------------------------------------
    # Arrangement
    # ------------------------------------------------------------------
    def _build_arrangement(
        self, song: SongDefinition, track_data_template: bytes
    ) -> bytes:
        """Convert *song.items* to arrangement section bytes.

        Each :class:`ArrangementItemDef` (1-based track) is converted to an
        :class:`ArrangementItem` (0-based track_index) and fed to
        :func:`build_arrangement_section`.  One extra item per melodic track
        is appended: placed at bar 0, 4 bars long, unmuted, each on its own
        track after the drum tracks.
        """
        items = [
            ArrangementItem(
                pattern_id=item.pattern,
                bar=item.bar,
                num_bars=item.bars,
                track_index=item.track - 1,  # 1-based -> 0-based
                muted=item.muted,
            )
            for item in song.items
        ]

        # Add melodic track items after drum items
        if song.melodic_tracks:
            # Same numbering that build() uses for the melodic PatNew events.
            first_melodic_id = self._first_melodic_pattern_id(song)
            # item.track is 1-based, so the highest drum track number is also
            # the 0-based index of the first track after the drum tracks.
            # NOTE(review): with no drum items the default of 1 leaves track
            # index 0 unused — confirm that is intended.
            first_free_track = max((item.track for item in song.items), default=1)
            items.extend(
                ArrangementItem(
                    pattern_id=first_melodic_id + offset,
                    bar=0,
                    num_bars=4,  # default 4 bars
                    track_index=first_free_track + offset,
                    muted=False,
                )
                for offset, _mt in enumerate(song.melodic_tracks)
            )

        return build_arrangement_section(
            items,
            track_data_template,
            ppq=song.meta.ppq,
        )

    # ------------------------------------------------------------------
    # Event parsing helpers (minimal, for header scanning)
    # ------------------------------------------------------------------
    @staticmethod
    def _read_ev(data: bytes, pos: int) -> tuple[int, int, int, int | bytes, str]:
        """Read one FLP event from *data* starting at *pos*.

        The event ID byte selects the encoding: IDs below 64 carry a 1-byte
        value, below 128 a 2-byte value, below 192 a 4-byte value (all
        little-endian, unsigned); the rest carry a 7-bit-per-byte varint size
        followed by that many payload bytes.

        Returns ``(next_pos, start, event_id, value, value_type)`` where
        *value_type* is ``"byte"``, ``"word"``, ``"dword"`` or ``"data"``.

        Raises
        ------
        ValueError
            If the event runs past the end of *data*.
        """
        start = pos
        end = len(data)
        if pos >= end:
            raise ValueError(f"Truncated FLP event at offset {start}")
        event_id = data[pos]
        pos += 1

        if event_id >= 192:
            # Data/text event: 1 byte ID + varint size + payload
            size = 0
            shift = 0
            while True:
                if pos >= end:
                    raise ValueError(f"Truncated FLP event at offset {start}")
                byte = data[pos]
                pos += 1
                size |= (byte & 0x7F) << shift
                shift += 7
                if not (byte & 0x80):
                    break
            if pos + size > end:
                raise ValueError(f"Truncated FLP event at offset {start}")
            return pos + size, start, event_id, data[pos : pos + size], "data"

        # Fixed-width events: 1 byte ID + 1, 2 or 4 byte value
        if event_id < 64:
            width, kind = 1, "byte"
        elif event_id < 128:
            width, kind = 2, "word"
        else:
            width, kind = 4, "dword"
        if pos + width > end:
            raise ValueError(f"Truncated FLP event at offset {start}")
        value = int.from_bytes(data[pos : pos + width], "little")
        return pos + width, start, event_id, value, kind

    @classmethod
    def _find_first_event(cls, data: bytes, event_id: int) -> int | None:
        """Find the byte offset of the first occurrence of *event_id*.

        Starts scanning at offset 22 (past FLhd + FLdt chunk headers).
        Returns ``None`` if the event is not found.

        Raises
        ------
        ValueError
            If an event is truncated before a match is found.
        """
        pos = cls._EVENTS_OFFSET
        while pos < len(data):
            next_pos, start, found_id, _value, _kind = cls._read_ev(data, pos)
            if found_id == event_id:
                return start
            pos = next_pos
        return None