refactor: migrate from FL Studio to REAPER with rpp library
Replace FL Studio binary .flp output with REAPER text-based .rpp output using the rpp Python library (Perlence/rpp). - Add core/schema.py: DAW-agnostic data types (SongDefinition, TrackDef, ClipDef, MidiNote, PluginDef) - Add reaper_builder/: RPP file generation via rpp.Element + headless render via reaper.exe CLI - Add composer/converters.py: bridge rhythm.py/melodic.py note dicts to core.schema MidiNote objects - Rewrite scripts/compose.py: real generator pipeline with --render flag - Delete src/flp_builder/, src/scanner/, mcp/, flstudio-mcp/, old scripts - Add 40 passing tests (schema, builder, converters, compose, render)
This commit is contained in:
65
src/composer/converters.py
Normal file
65
src/composer/converters.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Converters — transform generator output to MIDI notes for SongDefinition.
|
||||
|
||||
rhythm generators → MidiNote list (channel → GM pitch mapping)
|
||||
melodic generators → MidiNote list (note["key"] = pitch directly)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from src.core.schema import MidiNote
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GM drum pitch mapping — channels 10-16
|
||||
# ---------------------------------------------------------------------------
|
||||
CHANNEL_PITCH: dict[int, int] = {
|
||||
10: 39, # perc (General MIDI channel 10 = percussion)
|
||||
11: 36, # kick
|
||||
12: 38, # snare
|
||||
13: 37, # rim
|
||||
14: 50, # perc2
|
||||
15: 42, # hihat
|
||||
16: 39, # clap
|
||||
}
|
||||
|
||||
|
||||
def rhythm_to_midi(note_dict: dict[int, list[dict]]) -> list[MidiNote]:
|
||||
"""Convert rhythm generator output (channel → note list) to MidiNote list.
|
||||
|
||||
note_dict: {channel: [{"pos", "len", "key", "vel"}, ...]}
|
||||
- channel must be in CHANNEL_PITCH (10-16)
|
||||
- pitch = CHANNEL_PITCH[channel]
|
||||
- start = note["pos"]
|
||||
- duration = note["len"]
|
||||
- velocity = note["vel"]
|
||||
"""
|
||||
midi_notes: list[MidiNote] = []
|
||||
for channel, notes in note_dict.items():
|
||||
pitch = CHANNEL_PITCH.get(channel, 60)
|
||||
for note in notes:
|
||||
midi_notes.append(MidiNote(
|
||||
pitch=pitch,
|
||||
start=note["pos"],
|
||||
duration=note["len"],
|
||||
velocity=note["vel"],
|
||||
))
|
||||
return midi_notes
|
||||
|
||||
|
||||
def melodic_to_midi(note_list: list[dict]) -> list[MidiNote]:
|
||||
"""Convert melodic generator output (list of note dicts) to MidiNote list.
|
||||
|
||||
note_list: [{"pos", "len", "key", "vel"}, ...]
|
||||
- pitch = note["key"] (directly used, not mapped)
|
||||
- start = note["pos"]
|
||||
- duration = note["len"]
|
||||
- velocity = note["vel"]
|
||||
"""
|
||||
return [
|
||||
MidiNote(
|
||||
pitch=note["key"],
|
||||
start=note["pos"],
|
||||
duration=note["len"],
|
||||
velocity=note["vel"],
|
||||
)
|
||||
for note in note_list
|
||||
]
|
||||
@@ -4,6 +4,8 @@ All generators return list[dict] with format {pos, len, key, vel}.
|
||||
Designed to feed MelodicTrack notes in SongDefinition.
|
||||
"""
|
||||
|
||||
import random
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Scale definitions
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -52,6 +54,18 @@ def _clamp_vel(v: int) -> int:
|
||||
return max(1, min(127, v))
|
||||
|
||||
|
||||
def _apply_humanize(notes, humanize):
|
||||
"""Apply humanization (velocity jitter + position nudge) to note list."""
|
||||
if humanize <= 0:
|
||||
return notes
|
||||
jitter = humanize * 5
|
||||
nudge = humanize * 0.03
|
||||
for n in notes:
|
||||
n["vel"] = _clamp_vel(int(n["vel"] + random.uniform(-jitter, jitter)))
|
||||
n["pos"] = max(0, n["pos"] + random.uniform(-nudge, nudge))
|
||||
return notes
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bass: tresillo
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -61,6 +75,7 @@ def bass_tresillo(
|
||||
bars: int,
|
||||
octave: int = 3,
|
||||
velocity_mult: float = 1.0,
|
||||
humanize: float = 0.0,
|
||||
) -> list[dict]:
|
||||
"""Reggaeton tresillo bass pattern.
|
||||
|
||||
@@ -90,7 +105,7 @@ def bass_tresillo(
|
||||
vel = _clamp_vel(int(vel * velocity_mult))
|
||||
notes.append({"pos": o + pos, "len": 0.25, "key": key_note, "vel": vel})
|
||||
|
||||
return notes
|
||||
return _apply_humanize(notes, humanize)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -103,6 +118,7 @@ def lead_hook(
|
||||
octave: int = 5,
|
||||
density: float = 0.6,
|
||||
velocity_mult: float = 1.0,
|
||||
humanize: float = 0.0,
|
||||
) -> list[dict]:
|
||||
"""Simple melodic hook over 4-8 bars.
|
||||
|
||||
@@ -154,7 +170,7 @@ def lead_hook(
|
||||
else:
|
||||
pos += 0.5
|
||||
|
||||
return notes
|
||||
return _apply_humanize(notes, humanize)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -166,6 +182,7 @@ def chords_block(
|
||||
bars: int,
|
||||
octave: int = 4,
|
||||
velocity_mult: float = 1.0,
|
||||
humanize: float = 0.0,
|
||||
) -> list[dict]:
|
||||
"""Blocked chords every 2 beats (half-bar).
|
||||
|
||||
@@ -231,7 +248,7 @@ def chords_block(
|
||||
"vel": vel,
|
||||
})
|
||||
|
||||
return notes
|
||||
return _apply_humanize(notes, humanize)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -243,6 +260,7 @@ def pad_sustain(
|
||||
bars: int,
|
||||
octave: int = 4,
|
||||
velocity_mult: float = 1.0,
|
||||
humanize: float = 0.0,
|
||||
) -> list[dict]:
|
||||
"""Long sustained pad notes, one per bar.
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
"""Reggaeton rhythm generators — pure functions returning note dicts per channel."""
|
||||
|
||||
import random
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Channel constants — match SAMPLE_MAP in channel_skeleton.py
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -21,6 +23,20 @@ CH_CL = 16 # clap.wav
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _apply_groove(notes: list[dict], groove_strength: float) -> list[dict]:
|
||||
"""Apply groove timing and velocity variations to notes.
|
||||
|
||||
groove_strength: 0.0 = no effect, 1.0 = maximum groove feel.
|
||||
"""
|
||||
if groove_strength <= 0:
|
||||
return notes
|
||||
jitter = 5 + groove_strength * 10
|
||||
nudge = groove_strength * 0.02
|
||||
for n in notes:
|
||||
n["vel"] = max(1, min(127, n["vel"] + random.uniform(-jitter, jitter)))
|
||||
n["pos"] = max(0, n["pos"] + random.uniform(-nudge, nudge))
|
||||
return notes
|
||||
|
||||
def _clamp_vel(vel: int) -> int:
|
||||
"""Clamp velocity to valid MIDI range [1, 127]."""
|
||||
return max(1, min(127, vel))
|
||||
@@ -44,6 +60,7 @@ def kick_main_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Dembow kick: beat 1 (hard, vel 115) + beat 2-and (the dembow hit, vel 105).
|
||||
|
||||
@@ -55,13 +72,14 @@ def kick_main_notes(
|
||||
o = b * 4.0
|
||||
notes.append(_note(o, 0.25, _apply_vel(115, velocity_mult)))
|
||||
notes.append(_note(o + 1.5, 0.25, _apply_vel(105, velocity_mult)))
|
||||
return {CH_K: notes}
|
||||
return _apply_groove({CH_K: notes}, groove_strength)
|
||||
|
||||
|
||||
def kick_sparse_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Sparse intro/outro kick: just beat 1 per bar (vel 110).
|
||||
|
||||
@@ -71,20 +89,21 @@ def kick_sparse_notes(
|
||||
for b in range(bars):
|
||||
o = b * 4.0
|
||||
notes.append(_note(o, 0.25, _apply_vel(110, velocity_mult)))
|
||||
return {CH_K: notes}
|
||||
return _apply_groove({CH_K: notes}, groove_strength)
|
||||
|
||||
|
||||
def kick_outro_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Outro kick: dembow pattern with 0.75 baseline softness.
|
||||
|
||||
Delegates to kick_main_notes with an additional 0.75 velocity scaling.
|
||||
Returns {CH_K: [notes...]}.
|
||||
"""
|
||||
return kick_main_notes(bars, velocity_mult=velocity_mult * 0.75, density=density)
|
||||
return kick_main_notes(bars, velocity_mult=velocity_mult * 0.75, density=density, groove_strength=groove_strength)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -95,6 +114,7 @@ def snare_verse_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Reggaeton snare: beats 2, 3, 3-and, 4 per bar.
|
||||
|
||||
@@ -107,13 +127,14 @@ def snare_verse_notes(
|
||||
o = b * 4.0
|
||||
for p, v in _PATTERN:
|
||||
notes.append(_note(o + p, 0.15, _apply_vel(v, velocity_mult)))
|
||||
return {CH_S: notes}
|
||||
return _apply_groove({CH_S: notes}, groove_strength)
|
||||
|
||||
|
||||
def snare_fill_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Busier snare with 16th-note fills: adds positions 2.25 and 3.75.
|
||||
|
||||
@@ -133,20 +154,21 @@ def snare_fill_notes(
|
||||
o = b * 4.0
|
||||
for p, v in _PATTERN:
|
||||
notes.append(_note(o + p, 0.15, _apply_vel(v, velocity_mult)))
|
||||
return {CH_S: notes}
|
||||
return _apply_groove({CH_S: notes}, groove_strength)
|
||||
|
||||
|
||||
def snare_outro_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Softer outro snare (velocity_mult on top of 0.7 baseline).
|
||||
|
||||
Delegates to snare_verse_notes with an additional 0.7 velocity scaling.
|
||||
Returns {CH_S: [notes...]}.
|
||||
"""
|
||||
return snare_verse_notes(bars, velocity_mult=velocity_mult * 0.7, density=density)
|
||||
return snare_verse_notes(bars, velocity_mult=velocity_mult * 0.7, density=density, groove_strength=groove_strength)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -157,6 +179,7 @@ def hihat_16th_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""16th-note hihat with three-tier accent mapping.
|
||||
|
||||
@@ -177,13 +200,14 @@ def hihat_16th_notes(
|
||||
else: # 16th note position
|
||||
base_vel = 40
|
||||
notes.append(_note(o + beat_frac, 0.1, _apply_vel(base_vel, velocity_mult)))
|
||||
return {CH_H: notes}
|
||||
return _apply_groove({CH_H: notes}, groove_strength)
|
||||
|
||||
|
||||
def hihat_8th_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""8th-note hihat for intro/breakdown.
|
||||
|
||||
@@ -196,17 +220,14 @@ def hihat_8th_notes(
|
||||
for i in range(8):
|
||||
base_vel = 70 if i % 2 == 0 else 50
|
||||
notes.append(_note(o + i * 0.5, 0.1, _apply_vel(base_vel, velocity_mult)))
|
||||
return {CH_H: notes}
|
||||
return _apply_groove({CH_H: notes}, groove_strength)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Clap generator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def clap_24_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Classic reggaeton clap: beats 2 and 4 → positions 1.0 and 3.0 per bar.
|
||||
|
||||
@@ -218,17 +239,14 @@ def clap_24_notes(
|
||||
o = b * 4.0
|
||||
notes.append(_note(o + 1.0, 0.15, _apply_vel(120, velocity_mult)))
|
||||
notes.append(_note(o + 3.0, 0.15, _apply_vel(120, velocity_mult)))
|
||||
return {CH_CL: notes}
|
||||
return _apply_groove({CH_CL: notes}, groove_strength)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Percussion generators
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def perc_combo_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Perc1 + Perc2 offbeat combo (tumba feel).
|
||||
|
||||
@@ -244,13 +262,14 @@ def perc_combo_notes(
|
||||
p2_notes.append(_note(o + 2.75, 0.1, _apply_vel(80, velocity_mult)))
|
||||
p1_notes.append(_note(o + 1.5, 0.1, _apply_vel(70, velocity_mult)))
|
||||
p1_notes.append(_note(o + 3.5, 0.1, _apply_vel(65, velocity_mult)))
|
||||
return {CH_P1: p1_notes, CH_P2: p2_notes}
|
||||
return _apply_groove({CH_P1: p1_notes, CH_P2: p2_notes}, groove_strength)
|
||||
|
||||
|
||||
def rim_build_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Rim roll that builds intensity across bars (4-bar cycle).
|
||||
|
||||
@@ -278,7 +297,7 @@ def rim_build_notes(
|
||||
vel = _apply_vel(base_vel, velocity_mult)
|
||||
for idx in _PATTERNS[cycle]:
|
||||
notes.append(_note(o + idx * 0.25, 0.1, vel))
|
||||
return {CH_R: notes}
|
||||
return _apply_groove({CH_R: notes}, groove_strength)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -305,7 +324,8 @@ def get_notes(
|
||||
bars: int,
|
||||
velocity_mult: float = 1.0,
|
||||
density: float = 1.0,
|
||||
groove_strength: float = 0.0,
|
||||
) -> dict[int, list[dict]]:
|
||||
"""Dispatch to the named generator. Raises KeyError if not found."""
|
||||
gen = GENERATORS[generator_name]
|
||||
return gen(bars, velocity_mult, density)
|
||||
return gen(bars, velocity_mult, density, groove_strength)
|
||||
|
||||
@@ -18,7 +18,7 @@ import random
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
from ..flp_builder.schema import (
|
||||
from ..core.schema import (
|
||||
ArrangementItemDef,
|
||||
ArrangementTrack,
|
||||
PatternDef,
|
||||
|
||||
0
src/core/__init__.py
Normal file
0
src/core/__init__.py
Normal file
253
src/core/schema.py
Normal file
253
src/core/schema.py
Normal file
@@ -0,0 +1,253 @@
|
||||
"""Core schema definitions for REAPER project generation.
|
||||
|
||||
Represents the intermediate representation (SongDefinition) used to build
|
||||
REAPER .rpp files via RPPBuilder.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Key validation
|
||||
# ---------------------------------------------------------------------------
|
||||
import re
|
||||
|
||||
_KEY_RE = re.compile(r"^[A-G][b#]?m?$")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dataclasses
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class SongMeta:
|
||||
"""Song metadata — tempo, key, time signature."""
|
||||
|
||||
bpm: float # 20–999
|
||||
key: str # e.g. "Am", "Dm", "Gm"
|
||||
title: str = "" # song title
|
||||
ppq: int = 960 # ticks per quarter note (REAPER default)
|
||||
time_sig_num: int = 4 # numerator e.g. 4
|
||||
time_sig_den: int = 4 # denominator e.g. 4
|
||||
|
||||
|
||||
@dataclass
|
||||
class MidiNote:
|
||||
"""A single MIDI note event.
|
||||
|
||||
Attributes:
|
||||
pitch: MIDI note number 0–127 (60 = middle C)
|
||||
start: Start time in beats (from start of item)
|
||||
duration: Duration in beats
|
||||
velocity: 0–127
|
||||
"""
|
||||
|
||||
pitch: int
|
||||
start: float # beats
|
||||
duration: float # beats
|
||||
velocity: int = 64
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArrangementTrack:
|
||||
"""A track in the REAPER arrangement with index and display name."""
|
||||
|
||||
index: int
|
||||
name: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArrangementItemDef:
|
||||
"""An item placed in the arrangement referencing a pattern on a track.
|
||||
|
||||
Attributes:
|
||||
pattern: Pattern ID
|
||||
bar: Start position in bars (float)
|
||||
bars: Length in bars (float)
|
||||
track: Track index
|
||||
"""
|
||||
|
||||
pattern: int
|
||||
bar: float
|
||||
bars: float
|
||||
track: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class PatternDef:
|
||||
"""A pattern definition with generator and variation axes.
|
||||
|
||||
Attributes:
|
||||
id: Unique pattern ID
|
||||
name: Display name (e.g. "Kick Main")
|
||||
instrument: Sample/instrument key (e.g. "kick", "snare")
|
||||
channel: MIDI channel (11 = kick, 12 = snare, etc.)
|
||||
bars: Length in bars
|
||||
generator: Generator function name
|
||||
velocity_mult: Velocity multiplier (0.85–1.1)
|
||||
density: Note density 0.0–1.0
|
||||
"""
|
||||
|
||||
id: int
|
||||
name: str
|
||||
instrument: str
|
||||
channel: int
|
||||
bars: int
|
||||
generator: str
|
||||
velocity_mult: float = 1.0
|
||||
density: float = 1.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class ClipDef:
|
||||
"""A clip placed on a track — either audio or MIDI.
|
||||
|
||||
Attributes:
|
||||
position: Start position in beats
|
||||
length: Duration in beats
|
||||
audio_path: Absolute path to audio file (for audio clips)
|
||||
midi_notes: List of MIDI notes (for MIDI clips)
|
||||
name: Display name
|
||||
"""
|
||||
|
||||
position: float
|
||||
length: float
|
||||
name: str = ""
|
||||
audio_path: str | None = None # for audio clips
|
||||
midi_notes: list[MidiNote] = field(default_factory=list) # for MIDI clips
|
||||
|
||||
@property
|
||||
def is_midi(self) -> bool:
|
||||
return bool(self.midi_notes)
|
||||
|
||||
@property
|
||||
def is_audio(self) -> bool:
|
||||
return self.audio_path is not None
|
||||
|
||||
|
||||
@dataclass
|
||||
class PluginDef:
|
||||
"""A VST plugin instance on a track.
|
||||
|
||||
Attributes:
|
||||
name: Display name (e.g. "Serum 2")
|
||||
path: Plugin path/identifier (e.g. "VST3: Serum 2 (Xfer Records)")
|
||||
index: Chain position (0 = first)
|
||||
params: Optional dict of parameter index → value
|
||||
"""
|
||||
|
||||
name: str
|
||||
path: str
|
||||
index: int = 0
|
||||
params: dict[int, float] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TrackDef:
|
||||
"""A track in the REAPER project.
|
||||
|
||||
Attributes:
|
||||
name: Track display name
|
||||
volume: 0.0–1.0 (maps to REAPER volume fader)
|
||||
pan: -1.0 to 1.0
|
||||
color: REAPER color index (0–67), 0 = default
|
||||
clips: Audio/MIDI clips placed on this track
|
||||
plugins: VST plugins on this track
|
||||
send_reverb: Reverb send level 0.0–1.0
|
||||
send_delay: Delay send level 0.0–1.0
|
||||
"""
|
||||
|
||||
name: str
|
||||
volume: float = 0.85
|
||||
pan: float = 0.0
|
||||
color: int = 0
|
||||
clips: list[ClipDef] = field(default_factory=list)
|
||||
plugins: list[PluginDef] = field(default_factory=list)
|
||||
send_reverb: float = 0.0
|
||||
send_delay: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class SongDefinition:
|
||||
"""Complete song definition — the source of truth for one .rpp file.
|
||||
|
||||
This holds the minimal data needed by RPPBuilder to write a complete .rpp.
|
||||
|
||||
Attributes:
|
||||
meta: Song metadata (bpm, key, title, time signature)
|
||||
tracks: List of REAPER tracks (TrackDef) with clips and plugins
|
||||
patterns: Pattern definitions (PatternDef) for arrangement
|
||||
items: Arrangement items (ArrangementItemDef) referencing patterns
|
||||
progression_name: Chord progression name (e.g. "i-VII-VI-VII")
|
||||
section_template: Section template name (default "standard")
|
||||
samples: Sample file map (name → filename)
|
||||
"""
|
||||
|
||||
meta: SongMeta
|
||||
tracks: list[TrackDef] = field(default_factory=list)
|
||||
patterns: list[PatternDef] = field(default_factory=list)
|
||||
items: list[ArrangementItemDef] = field(default_factory=list)
|
||||
progression_name: str = "i-VII-VI-VII"
|
||||
section_template: str = "standard"
|
||||
samples: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Validation
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def validate(self) -> list[str]:
|
||||
"""Return list of validation errors (empty list = valid)."""
|
||||
errors: list[str] = []
|
||||
|
||||
# BPM range
|
||||
if not (20 <= self.meta.bpm <= 999):
|
||||
errors.append(f"meta.bpm must be 20–999, got {self.meta.bpm}")
|
||||
|
||||
# Key format
|
||||
if not _KEY_RE.match(self.meta.key):
|
||||
errors.append(f"meta.key must match ^[A-G][b#]?m?$, got '{self.meta.key}'")
|
||||
|
||||
# Track names unique
|
||||
names = [t.name for t in self.tracks]
|
||||
if len(names) != len(set(names)):
|
||||
errors.append("Duplicate track names found")
|
||||
|
||||
# Check for clips with neither audio_path nor midi_notes
|
||||
for ti, track in enumerate(self.tracks):
|
||||
for ci, clip in enumerate(track.clips):
|
||||
if not clip.is_audio and not clip.is_midi:
|
||||
errors.append(
|
||||
f"tracks[{ti}].clips[{ci}] has no audio_path and no midi_notes"
|
||||
)
|
||||
|
||||
return errors
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Computed helpers
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
@property
|
||||
def length_beats(self) -> float:
|
||||
"""Compute the total length in beats from all clips."""
|
||||
if not self.tracks:
|
||||
return 0.0
|
||||
max_end = 0.0
|
||||
for track in self.tracks:
|
||||
for clip in track.clips:
|
||||
end = clip.position + clip.length
|
||||
if end > max_end:
|
||||
max_end = end
|
||||
return max_end
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Serialization
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def to_json(self, indent: int = 2) -> str:
|
||||
"""Serialize to a JSON string."""
|
||||
import json
|
||||
from dataclasses import asdict
|
||||
return json.dumps(asdict(self), indent=indent, ensure_ascii=False)
|
||||
@@ -1,12 +0,0 @@
|
||||
from .writer import FLPWriter
|
||||
from .writer import FLPWriter
|
||||
from .project import FLPProject, Note, Channel, Pattern, Plugin
|
||||
|
||||
__all__ = [
|
||||
"FLPWriter",
|
||||
"FLPProject",
|
||||
"Note",
|
||||
"Channel",
|
||||
"Pattern",
|
||||
"Plugin",
|
||||
]
|
||||
@@ -1,222 +0,0 @@
|
||||
"""FL Studio arrangement/playlist encoding.
|
||||
|
||||
Encodes playlist items (ID233) and track data (ID238) into binary format
|
||||
matching FL Studio's internal structure. Extracted from the proven v15 builder
|
||||
(output/build_reggaeton_v15.py, lines 61-90).
|
||||
|
||||
Arrangement block sequence:
|
||||
ArrNew(99) → ArrName(241) → Flag36(36) → Playlist(233)
|
||||
→ TrackData(238)×N → ArrCurrent(100)
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
import struct
|
||||
|
||||
from .events import encode_byte_event, encode_data_event, encode_word_event
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
PPQ_DEFAULT: int = 96
|
||||
MAX_TRACKS_DEFAULT: int = 500
|
||||
PATTERN_BASE: int = 20480
|
||||
|
||||
# Arrangement event IDs (not yet in EventID enum — raw constants)
|
||||
EID_ARR_NEW = 99
|
||||
EID_ARR_CURRENT = 100
|
||||
EID_ARR_NAME = 241
|
||||
EID_FLAG_36 = 36
|
||||
EID_PLAYLIST = 233
|
||||
EID_TRACK_DATA = 238
|
||||
|
||||
# TrackData template size (bytes), extracted from reference FLP
|
||||
TRACK_DATA_SIZE = 66
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ArrangementItem dataclass
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class ArrangementItem:
|
||||
"""A single playlist item placed on the arrangement timeline.
|
||||
|
||||
Args:
|
||||
pattern_id: Pattern number (1-based).
|
||||
bar: Start bar (0-based, fractional allowed).
|
||||
num_bars: Length in bars (fractional allowed).
|
||||
track_index: Track row index (0-based).
|
||||
muted: Whether the item is muted in the playlist.
|
||||
"""
|
||||
|
||||
pattern_id: int # pattern number (1-based)
|
||||
bar: float # start bar (0-based)
|
||||
num_bars: float # length in bars
|
||||
track_index: int # 0-based track index
|
||||
muted: bool = False
|
||||
|
||||
def to_bytes(
|
||||
self,
|
||||
ppq: int = PPQ_DEFAULT,
|
||||
max_tracks: int = MAX_TRACKS_DEFAULT,
|
||||
) -> bytes:
|
||||
"""Encode as a 32-byte playlist item (ID233 format).
|
||||
|
||||
Encoding rules (from reverse-engineered FL Studio format):
|
||||
position = int(bar × ppq × 4) — ticks, truncated
|
||||
pattern_base = 20480 — constant
|
||||
item_index = 20480 + pattern_id
|
||||
length = int(num_bars × ppq × 4) — ticks, truncated
|
||||
track_rvidx = (max_tracks - 1) - track_index — REVERSED
|
||||
flags = 0x2040 if muted else 0x0040
|
||||
"""
|
||||
position = int(self.bar * ppq * 4)
|
||||
item_index = PATTERN_BASE + self.pattern_id
|
||||
length = int(self.num_bars * ppq * 4)
|
||||
track_rvidx = (max_tracks - 1) - self.track_index
|
||||
flags = 0x2040 if self.muted else 0x0040
|
||||
|
||||
return struct.pack(
|
||||
"<IHHIHH HH 4B ff",
|
||||
position,
|
||||
PATTERN_BASE,
|
||||
item_index,
|
||||
length,
|
||||
track_rvidx,
|
||||
0, # group
|
||||
0x0078,
|
||||
flags,
|
||||
64, 100, 128, 128,
|
||||
-1.0, -1.0,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TrackData helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def build_track_data_template(reference_flp_bytes: bytes) -> bytes:
|
||||
"""Extract the 66-byte TrackData template from a reference FLP.
|
||||
|
||||
Scans the raw FLP bytes for the first ID238 event and returns its
|
||||
66-byte payload. This template is then cloned and patched for each
|
||||
of the *max_tracks* track data entries in the arrangement section.
|
||||
|
||||
Args:
|
||||
reference_flp_bytes: Full contents of a valid .flp file.
|
||||
|
||||
Returns:
|
||||
The 66-byte track-data template.
|
||||
|
||||
Raises:
|
||||
ValueError: If no ID238 event of the expected size is found.
|
||||
"""
|
||||
pos = 22 # skip FLhd (14 bytes) + FLdt header (8 bytes)
|
||||
|
||||
while pos < len(reference_flp_bytes):
|
||||
ib = reference_flp_bytes[pos]
|
||||
pos += 1
|
||||
|
||||
if ib < 64:
|
||||
# Byte event: 1-byte value
|
||||
pos += 1
|
||||
elif ib < 128:
|
||||
# Word event: 2-byte value
|
||||
pos += 2
|
||||
elif ib < 192:
|
||||
# Dword event: 4-byte value
|
||||
pos += 4
|
||||
else:
|
||||
# Data / text event: varint length + payload
|
||||
size = 0
|
||||
shift = 0
|
||||
while True:
|
||||
b = reference_flp_bytes[pos]
|
||||
pos += 1
|
||||
size |= (b & 0x7F) << shift
|
||||
shift += 7
|
||||
if not (b & 0x80):
|
||||
break
|
||||
|
||||
if ib == EID_TRACK_DATA and size == TRACK_DATA_SIZE:
|
||||
return bytes(reference_flp_bytes[pos:pos + size])
|
||||
|
||||
pos += size
|
||||
|
||||
raise ValueError(
|
||||
f"No ID{EID_TRACK_DATA} TrackData event ({TRACK_DATA_SIZE} bytes) "
|
||||
"found in reference FLP"
|
||||
)
|
||||
|
||||
|
||||
def encode_track_data(iid: int, enabled: int, template: bytes) -> bytes:
|
||||
"""Clone *template*, patch iid at byte 0 (uint32 LE) and enabled at byte 12.
|
||||
|
||||
Args:
|
||||
iid: Internal track ID (sequential from 1).
|
||||
enabled: 0 = disabled, 1 = enabled.
|
||||
template: 66-byte template extracted by :func:`build_track_data_template`.
|
||||
|
||||
Returns:
|
||||
66-byte patched track data.
|
||||
"""
|
||||
td = bytearray(template)
|
||||
struct.pack_into("<I", td, 0, iid)
|
||||
td[12] = enabled & 0xFF
|
||||
return bytes(td)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Full arrangement section builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def build_arrangement_section(
|
||||
items: list[ArrangementItem],
|
||||
track_data_template: bytes,
|
||||
ppq: int = PPQ_DEFAULT,
|
||||
max_tracks: int = MAX_TRACKS_DEFAULT,
|
||||
) -> bytes:
|
||||
"""Build the full post-channel arrangement section bytes.
|
||||
|
||||
Produces the exact byte sequence FL Studio expects after the channel
|
||||
events:
|
||||
|
||||
ArrNew(99) → ArrName(241) → Flag36(36) → Playlist(233)
|
||||
→ TrackData(238) × *max_tracks* → ArrCurrent(100)
|
||||
|
||||
Args:
|
||||
items: Playlist items to encode.
|
||||
track_data_template: 66-byte template from :func:`build_track_data_template`.
|
||||
ppq: Pulses-per-quarter-note (default 96).
|
||||
max_tracks: Total track-data entries to write (default 500).
|
||||
|
||||
Returns:
|
||||
Complete arrangement section as raw bytes.
|
||||
"""
|
||||
result = bytearray()
|
||||
|
||||
# 1. ArrNew — word event, value = 0
|
||||
result.extend(encode_word_event(EID_ARR_NEW, 0))
|
||||
|
||||
# 2. ArrName — "Arrangement" as UTF-16-LE + null terminator
|
||||
arr_name = "Arrangement".encode("utf-16-le") + b"\x00\x00"
|
||||
result.extend(encode_data_event(EID_ARR_NAME, arr_name))
|
||||
|
||||
# 3. Flag36 — byte event, value = 0
|
||||
result.extend(encode_byte_event(EID_FLAG_36, 0))
|
||||
|
||||
# 4. Playlist — data event, concatenation of all 32-byte items
|
||||
pl_data = b"".join(item.to_bytes(ppq, max_tracks) for item in items)
|
||||
result.extend(encode_data_event(EID_PLAYLIST, pl_data))
|
||||
|
||||
# 5. TrackData × max_tracks — first track (iid=1) disabled, rest enabled
|
||||
for i in range(1, max_tracks + 1):
|
||||
enabled = 0 if i == 1 else 1
|
||||
td = encode_track_data(i, enabled, track_data_template)
|
||||
result.extend(encode_data_event(EID_TRACK_DATA, td))
|
||||
|
||||
# 6. ArrCurrent — word event, value = 0
|
||||
result.extend(encode_word_event(EID_ARR_CURRENT, 0))
|
||||
|
||||
return bytes(result)
|
||||
@@ -1,382 +0,0 @@
|
||||
"""JSON->FLP builder - converts SongDefinition to a valid FL Studio FLP file.
|
||||
|
||||
Replicates the proven assembly logic from ``output/build_reggaeton_v15.py`` but
|
||||
driven entirely by a :class:`SongDefinition` object instead of hardcoded values.
|
||||
|
||||
Assembly order (matches v15):
|
||||
FLhd header + FLdt wrapper around:
|
||||
header_events + pattern_events + channel_events + arrangement_events
|
||||
|
||||
Usage::
|
||||
|
||||
builder = FLPBuilder()
|
||||
flp_bytes = builder.build(song)
|
||||
Path("out.flp").write_bytes(flp_bytes)
|
||||
"""
|
||||
|
||||
import struct
|
||||
from pathlib import Path
|
||||
|
||||
from .schema import SongDefinition, PatternDef, MelodicTrack
|
||||
from .skeleton import ChannelSkeletonLoader
|
||||
from .arrangement import ArrangementItem, build_arrangement_section, build_track_data_template
|
||||
from .events import (
|
||||
EventID,
|
||||
encode_text_event,
|
||||
encode_word_event,
|
||||
encode_data_event,
|
||||
encode_notes_block,
|
||||
)
|
||||
from ..composer.rhythm import get_notes
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Default paths (relative to project root)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
REF_FLP = Path(__file__).parents[2] / "my space ryt" / "my space ryt.flp"
|
||||
CH11_TMPL = Path(__file__).parents[2] / "output" / "ch11_kick_template.bin"
|
||||
SAMPLES = Path(__file__).parents[2] / "output" / "samples"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Note format conversion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _convert_rhythm_notes(notes: list[dict]) -> list[dict]:
|
||||
"""Convert rhythm.py note format to events.py format.
|
||||
|
||||
rhythm.py: ``{"pos", "len", "key", "vel"}``
|
||||
events.py: ``{"position", "length", "key", "velocity"}``
|
||||
"""
|
||||
return [
|
||||
{"position": n["pos"], "length": n["len"], "key": n["key"], "velocity": n["vel"]}
|
||||
for n in notes
|
||||
]
|
||||
|
||||
|
||||
def _convert_melodic_notes(notes: list) -> list[dict]:
|
||||
"""Convert MelodicNote (pos/len/key/vel) to events.py format.
|
||||
|
||||
MelodicNote: ``{pos, len, key, vel}``
|
||||
events.py: ``{"position", "length", "key", "velocity"}``
|
||||
"""
|
||||
return [
|
||||
{"position": n.pos, "length": n.len, "key": n.key, "velocity": n.vel}
|
||||
for n in notes
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FLPBuilder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class FLPBuilder:
|
||||
"""Builds an FLP binary from a :class:`SongDefinition`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ref_flp:
|
||||
Path to a reference FLP used for header events and channel skeleton.
|
||||
ch11_template:
|
||||
Path to the ch11_kick_template.bin for empty sampler channels.
|
||||
samples_dir:
|
||||
Directory containing .wav sample files.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ref_flp: str | Path = REF_FLP,
|
||||
ch11_template: str | Path = CH11_TMPL,
|
||||
samples_dir: str | Path = SAMPLES,
|
||||
):
|
||||
self._ref_flp = Path(ref_flp)
|
||||
self._ch11 = Path(ch11_template)
|
||||
self._samples = Path(samples_dir)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def build(self, song: SongDefinition) -> bytes:
|
||||
"""Convert *song* to raw FLP bytes.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If song validation fails or the reference FLP is malformed.
|
||||
FileNotFoundError
|
||||
If reference FLP or templates are missing.
|
||||
"""
|
||||
# 1. Validate
|
||||
errors = song.validate()
|
||||
if errors:
|
||||
raise ValueError(
|
||||
"Song validation failed:\n - " + "\n - ".join(errors)
|
||||
)
|
||||
|
||||
# 2. Read reference FLP
|
||||
ref_bytes = self._ref_flp.read_bytes()
|
||||
num_channels = struct.unpack("<H", ref_bytes[10:12])[0]
|
||||
|
||||
# 3. Build each section
|
||||
header_bytes = self._build_header(song, ref_bytes)
|
||||
pattern_bytes = self._build_all_patterns(song)
|
||||
|
||||
# 3b. Build melodic map and melodic pattern bytes
|
||||
melodic_map: dict[int, tuple[str, str]] = {}
|
||||
melodic_pattern_bytes = b""
|
||||
if song.melodic_tracks:
|
||||
for mt in song.melodic_tracks:
|
||||
wav_dir = str(Path(mt.sample_path).parent)
|
||||
wav_name = Path(mt.sample_path).name
|
||||
melodic_map[mt.channel_index] = (wav_dir, wav_name)
|
||||
|
||||
# Assign pattern IDs after drum patterns (1-based)
|
||||
drum_pattern_count = len(song.patterns)
|
||||
for i, mt in enumerate(song.melodic_tracks):
|
||||
pattern_id = drum_pattern_count + i + 1
|
||||
melodic_pattern_bytes += self._build_melodic_pattern(
|
||||
mt, pattern_id, song.meta.ppq
|
||||
)
|
||||
else:
|
||||
# No melodic tracks: melodic_map stays empty, same as before
|
||||
pass
|
||||
|
||||
loader = ChannelSkeletonLoader(
|
||||
str(self._ref_flp),
|
||||
str(self._ch11),
|
||||
str(self._samples),
|
||||
)
|
||||
channel_bytes = loader.load(song.samples, melodic_map=melodic_map)
|
||||
|
||||
track_data_template = build_track_data_template(ref_bytes)
|
||||
arrangement_bytes = self._build_arrangement(song, track_data_template)
|
||||
|
||||
# 4. Assemble body: header + patterns + melodic_patterns + channels + arrangement
|
||||
body = (
|
||||
header_bytes
|
||||
+ pattern_bytes
|
||||
+ melodic_pattern_bytes
|
||||
+ channel_bytes
|
||||
+ arrangement_bytes
|
||||
)
|
||||
|
||||
# 5. Wrap with FLhd + FLdt headers (matches v15 line 317-318)
|
||||
flp = (
|
||||
struct.pack("<4sIhHH", b"FLhd", 6, 0, num_channels, song.meta.ppq)
|
||||
+ b"FLdt"
|
||||
+ struct.pack("<I", len(body))
|
||||
+ body
|
||||
)
|
||||
|
||||
return flp
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Header
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _build_header(self, song: SongDefinition, ref_bytes: bytes) -> bytes:
|
||||
"""Extract header events from reference FLP and patch with song.meta values.
|
||||
|
||||
The "header" is everything between offset 22 (after FLhd+FLdt chunk
|
||||
headers) and the first ``PatNew`` event. This includes version info,
|
||||
tempo, time-signature, etc. We patch the tempo (BPM) to match the
|
||||
song definition.
|
||||
|
||||
This replicates v15 lines 133-141.
|
||||
"""
|
||||
# Find first PatNew event
|
||||
first_pat = self._find_first_event(ref_bytes, EventID.PatNew)
|
||||
if first_pat is None:
|
||||
raise ValueError("No PatNew event found in reference FLP")
|
||||
|
||||
# Extract header events (everything before first pattern)
|
||||
header = bytearray(ref_bytes[22:first_pat])
|
||||
|
||||
# Patch BPM — Tempo event (ID 156) is a dword, value = BPM * 1000
|
||||
p = 0
|
||||
while p < len(header):
|
||||
np, _, ib, _v, _vt = self._read_ev(bytes(header), p)
|
||||
if ib == EventID.Tempo:
|
||||
struct.pack_into("<I", header, p + 1, int(song.meta.bpm * 1000))
|
||||
break
|
||||
p = np
|
||||
|
||||
return bytes(header)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Patterns
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _build_pattern_bytes(self, pattern: PatternDef, ppq: int) -> bytes:
|
||||
"""Build all FLP events for one pattern.
|
||||
|
||||
Sequence:
|
||||
1. ``PatNew`` (word event) — value = pattern.id - 1 (0-based)
|
||||
2. ``PatName`` (text event) — UTF-16-LE pattern name
|
||||
3. ``PatNotes`` (data event) per channel from ``get_notes()``
|
||||
|
||||
Returns raw bytes for this pattern.
|
||||
"""
|
||||
buf = bytearray()
|
||||
|
||||
# 1. PatNew — word event, 0-based index
|
||||
buf += encode_word_event(EventID.PatNew, pattern.id - 1)
|
||||
|
||||
# 2. PatName — text event (UTF-16-LE + null terminator)
|
||||
if pattern.name:
|
||||
buf += encode_text_event(EventID.PatName, pattern.name)
|
||||
|
||||
# 3. Generate notes via rhythm.py dispatcher
|
||||
notes_by_channel = get_notes(
|
||||
pattern.generator,
|
||||
pattern.bars,
|
||||
pattern.velocity_mult,
|
||||
pattern.density,
|
||||
)
|
||||
|
||||
# 4. Encode notes for each channel
|
||||
for ch_idx, raw_notes in notes_by_channel.items():
|
||||
converted = _convert_rhythm_notes(raw_notes)
|
||||
buf += encode_data_event(
|
||||
EventID.PatNotes,
|
||||
encode_notes_block(ch_idx, converted, ppq),
|
||||
)
|
||||
|
||||
return bytes(buf)
|
||||
|
||||
def _build_all_patterns(self, song: SongDefinition) -> bytes:
|
||||
"""Build bytes for all patterns in *song.patterns*."""
|
||||
buf = bytearray()
|
||||
for pattern in song.patterns:
|
||||
buf += self._build_pattern_bytes(pattern, song.meta.ppq)
|
||||
return bytes(buf)
|
||||
|
||||
def _build_melodic_pattern(
|
||||
self, mt: MelodicTrack, pattern_id: int, ppq: int
|
||||
) -> bytes:
|
||||
"""Build FLP events for one melodic track pattern.
|
||||
|
||||
Sequence:
|
||||
1. ``PatNew`` (word event) — value = pattern_id - 1 (0-based)
|
||||
2. ``PatName`` (text event) — UTF-16-LE with ``mt.role`` as name
|
||||
3. ``PatNotes`` (data event) with notes for the melodic channel
|
||||
|
||||
Returns raw bytes for this melodic pattern.
|
||||
"""
|
||||
buf = bytearray()
|
||||
|
||||
# 1. PatNew — word event, 0-based index
|
||||
buf += encode_word_event(EventID.PatNew, pattern_id - 1)
|
||||
|
||||
# 2. PatName — text event (UTF-16-LE + null terminator)
|
||||
if mt.role:
|
||||
buf += encode_text_event(EventID.PatName, mt.role)
|
||||
|
||||
# 3. Convert MelodicNotes to events.py format and encode
|
||||
converted = _convert_melodic_notes(mt.notes)
|
||||
buf += encode_data_event(
|
||||
EventID.PatNotes,
|
||||
encode_notes_block(mt.channel_index, converted, ppq),
|
||||
)
|
||||
|
||||
return bytes(buf)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Arrangement
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _build_arrangement(
|
||||
self, song: SongDefinition, track_data_template: bytes
|
||||
) -> bytes:
|
||||
"""Convert *song.items* to arrangement section bytes.
|
||||
|
||||
Each :class:`ArrangementItemDef` (1-based track) is converted to an
|
||||
:class:`ArrangementItem` (0-based track_index) and fed to
|
||||
:func:`build_arrangement_section`.
|
||||
"""
|
||||
items = [
|
||||
ArrangementItem(
|
||||
pattern_id=item.pattern,
|
||||
bar=item.bar,
|
||||
num_bars=item.bars,
|
||||
track_index=item.track - 1, # 1-based -> 0-based
|
||||
muted=item.muted,
|
||||
)
|
||||
for item in song.items
|
||||
]
|
||||
|
||||
# Add melodic track items after drum items
|
||||
if song.melodic_tracks:
|
||||
drum_pattern_count = len(song.patterns)
|
||||
# Determine starting track index (after drum tracks)
|
||||
max_drum_track = max((item.track for item in song.items), default=1)
|
||||
for i, mt in enumerate(song.melodic_tracks):
|
||||
pattern_id = drum_pattern_count + i + 1
|
||||
track_index = max_drum_track + i # 0-based, after drum tracks
|
||||
items.append(
|
||||
ArrangementItem(
|
||||
pattern_id=pattern_id,
|
||||
bar=0,
|
||||
num_bars=4, # default 4 bars
|
||||
track_index=track_index,
|
||||
muted=False,
|
||||
)
|
||||
)
|
||||
|
||||
return build_arrangement_section(
|
||||
items,
|
||||
track_data_template,
|
||||
ppq=song.meta.ppq,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Event parsing helpers (minimal, for header scanning)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _read_ev(data: bytes, pos: int) -> tuple:
|
||||
"""Read one FLP event from *data* starting at *pos*.
|
||||
|
||||
Returns ``(next_pos, start, event_id, value, value_type)``.
|
||||
"""
|
||||
start = pos
|
||||
ib = data[pos]
|
||||
pos += 1
|
||||
|
||||
if ib < 64:
|
||||
# Byte event: 1 byte ID + 1 byte value
|
||||
return pos + 1, start, ib, data[start + 1], "byte"
|
||||
elif ib < 128:
|
||||
# Word event: 1 byte ID + 2 byte value
|
||||
return pos + 2, start, ib, struct.unpack("<H", data[pos : pos + 2])[0], "word"
|
||||
elif ib < 192:
|
||||
# Dword event: 1 byte ID + 4 byte value
|
||||
return pos + 4, start, ib, struct.unpack("<I", data[pos : pos + 4])[0], "dword"
|
||||
else:
|
||||
# Data/text event: 1 byte ID + varint size + payload
|
||||
sz = 0
|
||||
sh = 0
|
||||
while True:
|
||||
b = data[pos]
|
||||
pos += 1
|
||||
sz |= (b & 0x7F) << sh
|
||||
sh += 7
|
||||
if not (b & 0x80):
|
||||
break
|
||||
return pos + sz, start, ib, data[pos : pos + sz], "data"
|
||||
|
||||
@classmethod
|
||||
def _find_first_event(cls, data: bytes, event_id: int) -> int | None:
|
||||
"""Find the byte offset of the first occurrence of *event_id*.
|
||||
|
||||
Starts scanning at offset 22 (past FLhd + FLdt chunk headers).
|
||||
Returns ``None`` if the event is not found.
|
||||
"""
|
||||
pos = 22
|
||||
while pos < len(data):
|
||||
np, start, ib, _val, _vt = cls._read_ev(data, pos)
|
||||
if ib == event_id:
|
||||
return start
|
||||
pos = np
|
||||
return None
|
||||
@@ -1,225 +0,0 @@
|
||||
import struct
|
||||
from enum import IntEnum
|
||||
|
||||
|
||||
class EventID(IntEnum):
|
||||
WORD = 64
|
||||
DWORD = 128
|
||||
TEXT = 192
|
||||
DATA = 208
|
||||
|
||||
LoopActive = 9
|
||||
ShowInfo = 10
|
||||
Volume = 12
|
||||
PanLaw = 23
|
||||
Licensed = 28
|
||||
TempoCoarse = 66
|
||||
Pitch = 80
|
||||
TempoFine = 93
|
||||
CurGroupId = 146
|
||||
Tempo = 156
|
||||
FLBuild = 159
|
||||
Title = 194
|
||||
Comments = 195
|
||||
Url = 197
|
||||
RTFComments = 198
|
||||
FLVersion = 199
|
||||
Licensee = 200
|
||||
DataPath = 202
|
||||
Genre = 206
|
||||
Artists = 207
|
||||
Timestamp = 237
|
||||
|
||||
ChIsEnabled = 0
|
||||
ChVolByte = 2
|
||||
ChPanByte = 3
|
||||
ChZipped = 15
|
||||
ChType = 21
|
||||
ChRoutedTo = 22
|
||||
ChIsLocked = 32
|
||||
ChNew = 64
|
||||
ChFreqTilt = 69
|
||||
ChFXFlags = 70
|
||||
ChCutoff = 71
|
||||
ChVolWord = 72
|
||||
ChPanWord = 73
|
||||
ChPreamp = 74
|
||||
ChFadeOut = 75
|
||||
ChFadeIn = 76
|
||||
ChResonance = 83
|
||||
ChStereoDelay = 85
|
||||
ChPogo = 86
|
||||
ChTimeShift = 89
|
||||
ChChildren = 94
|
||||
ChSwing = 97
|
||||
ChRingMod = 131
|
||||
ChCutGroup = 132
|
||||
ChRootNote = 135
|
||||
ChDelayModXY = 138
|
||||
ChReverb = 139
|
||||
ChStretchTime = 140
|
||||
ChFineTune = 142
|
||||
ChSamplerFlags = 143
|
||||
ChLayerFlags = 144
|
||||
ChGroupNum = 145
|
||||
ChAUSampleRate = 153
|
||||
ChName = 192
|
||||
ChSamplePath = 196
|
||||
ChDelay = 209
|
||||
ChParameters = 215
|
||||
ChEnvelopeLFO = 218
|
||||
ChLevels = 219
|
||||
ChPolyphony = 221
|
||||
ChTracking = 228
|
||||
ChLevelAdjusts = 229
|
||||
ChAutomation = 234
|
||||
|
||||
PatLooped = 26
|
||||
PatNew = 65
|
||||
PatColor = 150
|
||||
PatName = 193
|
||||
PatChannelIID = 160
|
||||
PatLength = 164
|
||||
PatControllers = 223
|
||||
PatNotes = 224
|
||||
|
||||
PluginColor = 128
|
||||
PluginIcon = 155
|
||||
PluginInternalName = 201
|
||||
PluginName = 203
|
||||
PluginWrapper = 212
|
||||
PluginData = 213
|
||||
|
||||
MixerAPDC = 29
|
||||
MixerParams = 225
|
||||
|
||||
|
||||
def encode_varint(value: int) -> bytes:
|
||||
result = bytearray()
|
||||
while True:
|
||||
byte = value & 0x7F
|
||||
value >>= 7
|
||||
if value:
|
||||
byte |= 0x80
|
||||
result.append(byte)
|
||||
if not value:
|
||||
break
|
||||
return bytes(result)
|
||||
|
||||
|
||||
def encode_text(text: str, utf16: bool = True) -> bytes:
|
||||
if utf16:
|
||||
return text.encode("utf-16-le") + b"\x00\x00"
|
||||
return text.encode("ascii") + b"\x00"
|
||||
|
||||
|
||||
def encode_byte_event(id_: int, value: int) -> bytes:
|
||||
return bytes([id_, value & 0xFF])
|
||||
|
||||
|
||||
def encode_word_event(id_: int, value: int) -> bytes:
|
||||
return bytes([id_]) + struct.pack("<H", value)
|
||||
|
||||
|
||||
def encode_dword_event(id_: int, value: int) -> bytes:
|
||||
return bytes([id_]) + struct.pack("<I", value)
|
||||
|
||||
|
||||
def encode_text_event(id_: int, text: str) -> bytes:
|
||||
data = encode_text(text)
|
||||
return bytes([id_]) + encode_varint(len(data)) + data
|
||||
|
||||
|
||||
def encode_data_event(id_: int, data: bytes) -> bytes:
|
||||
return bytes([id_]) + encode_varint(len(data)) + data
|
||||
|
||||
|
||||
def encode_note_24(
|
||||
position: int,
|
||||
flags: int,
|
||||
rack_channel: int,
|
||||
length: int,
|
||||
key: int,
|
||||
group: int,
|
||||
fine_pitch: int,
|
||||
release: int,
|
||||
midi_channel: int,
|
||||
pan: int,
|
||||
velocity: int,
|
||||
mod_x: int,
|
||||
mod_y: int,
|
||||
) -> bytes:
|
||||
"""Encode a single note in FL Studio's 24-byte format.
|
||||
|
||||
Format (24 bytes, all absolute values):
|
||||
position: uint32 (4) - absolute position in PPQ ticks
|
||||
flags: uint16 (2) - note flags (0x4000 = standard note)
|
||||
rack_channel: uint16 (2) - channel rack index
|
||||
length: uint32 (4) - duration in PPQ ticks
|
||||
key: uint16 (2) - MIDI note number (0-127)
|
||||
group: uint16 (2) - note group
|
||||
fine_pitch: uint8 (1) - fine pitch (0x78 = 120 = no detune)
|
||||
_u1: uint8 (1) - unknown (0x40)
|
||||
release: uint8 (1) - release value
|
||||
midi_channel: uint8 (1) - MIDI channel
|
||||
pan: int8 (1) - stereo pan (64 = center)
|
||||
velocity: uint8 (1) - note velocity
|
||||
mod_x: uint8 (1) - modulation X (128 = center)
|
||||
mod_y: uint8 (1) - modulation Y (128 = center)
|
||||
"""
|
||||
return struct.pack(
|
||||
"<IHHIHHBBBBBBBB",
|
||||
position,
|
||||
flags,
|
||||
rack_channel,
|
||||
length,
|
||||
key,
|
||||
group,
|
||||
fine_pitch,
|
||||
0x40, # unknown byte, always 0x40 in observed data
|
||||
release,
|
||||
midi_channel,
|
||||
pan,
|
||||
velocity,
|
||||
mod_x,
|
||||
mod_y,
|
||||
)
|
||||
|
||||
|
||||
def encode_notes_block(
|
||||
channel_index: int,
|
||||
notes: list[dict],
|
||||
ppq: int = 96,
|
||||
) -> bytes:
|
||||
"""Encode all notes for a pattern as raw note data (no header).
|
||||
|
||||
FL Studio stores notes as a flat array of 24-byte structs.
|
||||
No header or count prefix needed - the event size determines count.
|
||||
"""
|
||||
note_data = bytearray()
|
||||
|
||||
for note in notes:
|
||||
pos = int(note.get("position", 0) * ppq)
|
||||
length = int(note.get("length", 1) * ppq)
|
||||
key = note.get("key", 60)
|
||||
velocity = note.get("velocity", 100)
|
||||
rack_channel = note.get("rack_channel", channel_index)
|
||||
|
||||
note_bytes = encode_note_24(
|
||||
position=pos,
|
||||
flags=0x4000,
|
||||
rack_channel=rack_channel,
|
||||
length=max(length, 1),
|
||||
key=key & 0x7F,
|
||||
group=0,
|
||||
fine_pitch=120,
|
||||
release=64,
|
||||
midi_channel=0,
|
||||
pan=64,
|
||||
velocity=velocity & 0x7F,
|
||||
mod_x=128,
|
||||
mod_y=128,
|
||||
)
|
||||
note_data.extend(note_bytes)
|
||||
|
||||
return bytes(note_data)
|
||||
@@ -1,134 +0,0 @@
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class Note:
|
||||
position: float
|
||||
length: float
|
||||
key: int
|
||||
velocity: int = 100
|
||||
fine_pitch: int = 0
|
||||
pan: int = 0
|
||||
midi_channel: int = 0
|
||||
slide: bool = False
|
||||
release: int = 0
|
||||
mod_x: int = 0
|
||||
mod_y: int = 0
|
||||
group: int = 0
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"position": self.position,
|
||||
"length": self.length,
|
||||
"key": self.key,
|
||||
"velocity": self.velocity,
|
||||
"fine_pitch": self.fine_pitch,
|
||||
"pan": self.pan,
|
||||
"midi_channel": self.midi_channel,
|
||||
"slide": self.slide,
|
||||
"release": self.release,
|
||||
"mod_x": self.mod_x,
|
||||
"mod_y": self.mod_y,
|
||||
"group": self.group,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class Pattern:
|
||||
name: str = ""
|
||||
index: int = 0
|
||||
notes: dict[int, list[Note]] = field(default_factory=dict)
|
||||
color: int = 0
|
||||
length: int = 0
|
||||
|
||||
def add_note(self, channel_index: int, note: Note):
|
||||
if channel_index not in self.notes:
|
||||
self.notes[channel_index] = []
|
||||
self.notes[channel_index].append(note)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Plugin:
|
||||
internal_name: str = ""
|
||||
display_name: str = ""
|
||||
plugin_data: Optional[bytes] = None
|
||||
color: int = 0
|
||||
icon: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class Channel:
|
||||
name: str = ""
|
||||
index: int = 0
|
||||
enabled: bool = True
|
||||
volume: int = 256
|
||||
pan: int = 0
|
||||
plugin: Optional[Plugin] = None
|
||||
mixer_track: int = 0
|
||||
color: int = 0
|
||||
root_note: int = 60
|
||||
channel_type: int = 0
|
||||
|
||||
FL_TYPE_GENERATOR = 2
|
||||
FL_TYPE_SAMPLER = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class MixerTrack:
|
||||
name: str = ""
|
||||
index: int = 0
|
||||
volume: float = 1.0
|
||||
pan: float = 0.0
|
||||
muted: bool = False
|
||||
effects: list[Plugin] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class FLPProject:
|
||||
tempo: float = 140.0
|
||||
title: str = ""
|
||||
genre: str = ""
|
||||
artists: str = ""
|
||||
comments: str = ""
|
||||
fl_version: str = "24.7.1.73"
|
||||
ppq: int = 96
|
||||
channels: list[Channel] = field(default_factory=list)
|
||||
patterns: list[Pattern] = field(default_factory=list)
|
||||
mixer_tracks: list[MixerTrack] = field(default_factory=list)
|
||||
|
||||
def add_channel(
|
||||
self,
|
||||
name: str,
|
||||
plugin_internal_name: str = "",
|
||||
plugin_display_name: str = "",
|
||||
plugin_data: Optional[bytes] = None,
|
||||
mixer_track: int = -1,
|
||||
channel_type: int = 2,
|
||||
volume: int = 256,
|
||||
) -> Channel:
|
||||
idx = len(self.channels)
|
||||
plugin = None
|
||||
if plugin_internal_name:
|
||||
plugin = Plugin(
|
||||
internal_name=plugin_internal_name,
|
||||
display_name=plugin_display_name or plugin_internal_name,
|
||||
plugin_data=plugin_data,
|
||||
)
|
||||
ch = Channel(
|
||||
name=name,
|
||||
index=idx,
|
||||
plugin=plugin,
|
||||
mixer_track=mixer_track if mixer_track >= 0 else idx,
|
||||
channel_type=channel_type,
|
||||
volume=volume,
|
||||
)
|
||||
self.channels.append(ch)
|
||||
return ch
|
||||
|
||||
def add_pattern(self, name: str = "") -> Pattern:
|
||||
idx = len(self.patterns) + 1
|
||||
pat = Pattern(name=name, index=idx)
|
||||
self.patterns.append(pat)
|
||||
return pat
|
||||
@@ -1,395 +0,0 @@
|
||||
"""Song definition schema for FL Studio FLP generation.
|
||||
|
||||
Provides the JSON contract that decouples song composition from FLP rendering.
|
||||
A SongDefinition is the single source of truth for one ``.flp`` file.
|
||||
|
||||
Usage::
|
||||
|
||||
song = SongDefinition.load_file("knowledge/songs/reggaeton_template.json")
|
||||
errors = song.validate()
|
||||
json_str = song.to_json()
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Key validation pattern: A-G, optional flat/sharp, optional minor 'm'
|
||||
# ---------------------------------------------------------------------------
|
||||
_KEY_RE = re.compile(r"^[A-G][b#]?m?$")
|
||||
|
||||
# Allowed top-level keys in the JSON document
|
||||
_TOP_LEVEL_KEYS = frozenset({
|
||||
"meta", "samples", "patterns", "tracks", "items",
|
||||
"melodic_tracks", "progression_name", "section_template",
|
||||
})
|
||||
|
||||
# Allowed keys in nested objects
|
||||
_META_KEYS = frozenset({
|
||||
"bpm", "key", "title", "ppq", "time_sig_num", "time_sig_den",
|
||||
})
|
||||
_PATTERN_KEYS = frozenset({
|
||||
"id", "name", "instrument", "channel", "bars", "generator",
|
||||
"velocity_mult", "density",
|
||||
})
|
||||
_TRACK_KEYS = frozenset({"index", "name"})
|
||||
_ITEM_KEYS = frozenset({"pattern", "bar", "bars", "track", "muted"})
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dataclasses
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class SongMeta:
|
||||
"""Song metadata — tempo, key, time signature."""
|
||||
|
||||
bpm: float # 20–999
|
||||
key: str # e.g. "Am", "Dm", "Gm"
|
||||
title: str # song title
|
||||
ppq: int = 96 # ticks per quarter note
|
||||
time_sig_num: int = 4
|
||||
time_sig_den: int = 4
|
||||
|
||||
|
||||
@dataclass
|
||||
class PatternNote:
|
||||
"""A single note within a pattern (used when embedding notes directly)."""
|
||||
|
||||
pos: float # beat position (0.0 = beat 1 of bar)
|
||||
len: float # duration in beats
|
||||
key: int # MIDI note (60 = C4)
|
||||
vel: int # velocity 0–127
|
||||
|
||||
|
||||
@dataclass
|
||||
class PatternDef:
|
||||
"""Pattern definition — recipe for generating note data.
|
||||
|
||||
The ``generator`` field names a function in ``composer/rhythm.py``
|
||||
that produces the actual MIDI notes for this pattern.
|
||||
"""
|
||||
|
||||
id: int # pattern number (1-based)
|
||||
name: str # human label
|
||||
instrument: str # "kick", "snare", "hihat", etc.
|
||||
channel: int # channel rack index (10–16)
|
||||
bars: int # pattern length in bars
|
||||
generator: str # rhythm.py function name
|
||||
velocity_mult: float = 1.0 # scales all velocities
|
||||
density: float = 1.0 # 0.5=sparse, 1.0=full
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArrangementTrack:
|
||||
"""A track row in the FL Studio playlist / arrangement."""
|
||||
|
||||
index: int # 1-based track index in arrangement
|
||||
name: str # display name
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArrangementItemDef:
|
||||
"""Placement of a pattern on the arrangement timeline."""
|
||||
|
||||
pattern: int # pattern id
|
||||
bar: float # start bar (0-based)
|
||||
bars: float # duration in bars
|
||||
track: int # track index (1-based, must exist in tracks[])
|
||||
muted: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class MelodicNote:
|
||||
"""A single note in a melodic track. Unified format: {pos, len, key, vel}."""
|
||||
|
||||
pos: float # beat position (0.0 = beat 1 of bar)
|
||||
len: float # duration in beats
|
||||
key: int # MIDI note (60 = C4)
|
||||
vel: int # velocity 0–127
|
||||
|
||||
|
||||
@dataclass
|
||||
class MelodicTrack:
|
||||
"""A melodic track referencing an audio sample with MIDI note triggers.
|
||||
|
||||
The sample is loaded into a sampler channel and notes trigger playback.
|
||||
"""
|
||||
|
||||
role: str # "bass", "lead", "pad", "pluck", etc.
|
||||
sample_path: str # absolute path to .wav file
|
||||
notes: list[MelodicNote] # note events
|
||||
channel_index: int # FL Studio channel (17+ for melodic)
|
||||
volume: float = 0.85 # 0.0–1.0
|
||||
pan: float = 0.0 # -1.0 to 1.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class SongDefinition:
|
||||
"""Complete song definition — the single source of truth for one .flp.
|
||||
|
||||
Serialization round-trips through ``to_json()`` / ``from_json()``.
|
||||
Use ``validate()`` to check constraints before rendering.
|
||||
"""
|
||||
|
||||
meta: SongMeta
|
||||
samples: dict[str, str] # {"kick": "kick.wav", ...}
|
||||
patterns: list[PatternDef]
|
||||
tracks: list[ArrangementTrack]
|
||||
items: list[ArrangementItemDef]
|
||||
melodic_tracks: list[MelodicTrack] = field(default_factory=list)
|
||||
|
||||
# Optional metadata for variation engine
|
||||
progression_name: str = ""
|
||||
section_template: str = "standard"
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Validation
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def validate(self) -> list[str]:
|
||||
"""Return list of validation errors (empty list = valid).
|
||||
|
||||
Checks:
|
||||
1. meta.bpm in 20–999
|
||||
2. meta.key matches ``^[A-G][b#]?m?$``
|
||||
3. meta.ppq == 96
|
||||
4. All pattern ``id`` values are unique
|
||||
5. All ``item.pattern`` reference an existing pattern id
|
||||
6. All ``item.track`` reference an existing track index
|
||||
"""
|
||||
errors: list[str] = []
|
||||
|
||||
# 1. BPM range
|
||||
if not (20 <= self.meta.bpm <= 999):
|
||||
errors.append(
|
||||
f"meta.bpm must be 20–999, got {self.meta.bpm}"
|
||||
)
|
||||
|
||||
# 2. Key format
|
||||
if not _KEY_RE.match(self.meta.key):
|
||||
errors.append(
|
||||
f"meta.key must match ^[A-G][b#]?m?$, got '{self.meta.key}'"
|
||||
)
|
||||
|
||||
# 3. PPQ
|
||||
if self.meta.ppq != 96:
|
||||
errors.append(
|
||||
f"meta.ppq must be 96, got {self.meta.ppq}"
|
||||
)
|
||||
|
||||
# 4. Unique pattern ids
|
||||
pattern_ids = [p.id for p in self.patterns]
|
||||
seen: set[int] = set()
|
||||
for pid in pattern_ids:
|
||||
if pid in seen:
|
||||
errors.append(f"Duplicate pattern id: {pid}")
|
||||
seen.add(pid)
|
||||
|
||||
valid_pattern_ids = set(pattern_ids)
|
||||
|
||||
# 5. All items reference valid pattern id
|
||||
for i, item in enumerate(self.items):
|
||||
if item.pattern not in valid_pattern_ids:
|
||||
errors.append(
|
||||
f"items[{i}].pattern={item.pattern} does not reference "
|
||||
f"an existing pattern id"
|
||||
)
|
||||
|
||||
# 6. All items reference valid track index
|
||||
valid_track_indices = {t.index for t in self.tracks}
|
||||
for i, item in enumerate(self.items):
|
||||
if item.track not in valid_track_indices:
|
||||
errors.append(
|
||||
f"items[{i}].track={item.track} does not reference "
|
||||
f"an existing track index"
|
||||
)
|
||||
|
||||
return errors
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Serialization
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def to_json(self, indent: int = 2) -> str:
|
||||
"""Serialize to a JSON string."""
|
||||
return json.dumps(asdict(self), indent=indent, ensure_ascii=False)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data: str | dict) -> SongDefinition:
|
||||
"""Deserialize from a JSON string or dict.
|
||||
|
||||
Raises:
|
||||
ValueError: On unknown keys, missing fields, or validation errors.
|
||||
"""
|
||||
if isinstance(data, str):
|
||||
raw = json.loads(data)
|
||||
else:
|
||||
raw = data
|
||||
|
||||
if not isinstance(raw, dict):
|
||||
raise ValueError(f"Expected dict, got {type(raw).__name__}")
|
||||
|
||||
# Reject unknown top-level keys
|
||||
unknown = set(raw.keys()) - _TOP_LEVEL_KEYS
|
||||
if unknown:
|
||||
raise ValueError(f"Unknown top-level keys: {sorted(unknown)}")
|
||||
|
||||
# --- meta ---
|
||||
meta_raw = raw.get("meta")
|
||||
if not isinstance(meta_raw, dict):
|
||||
raise ValueError("Missing or invalid 'meta' object")
|
||||
|
||||
unknown_meta = set(meta_raw.keys()) - _META_KEYS
|
||||
if unknown_meta:
|
||||
raise ValueError(f"Unknown meta keys: {sorted(unknown_meta)}")
|
||||
|
||||
try:
|
||||
meta = SongMeta(
|
||||
bpm=float(meta_raw["bpm"]),
|
||||
key=str(meta_raw["key"]),
|
||||
title=str(meta_raw.get("title", "")),
|
||||
ppq=int(meta_raw.get("ppq", 96)),
|
||||
time_sig_num=int(meta_raw.get("time_sig_num", 4)),
|
||||
time_sig_den=int(meta_raw.get("time_sig_den", 4)),
|
||||
)
|
||||
except KeyError as exc:
|
||||
raise ValueError(f"Missing required meta field: {exc}") from exc
|
||||
|
||||
# --- samples ---
|
||||
samples = raw.get("samples")
|
||||
if not isinstance(samples, dict):
|
||||
raise ValueError("Missing or invalid 'samples' dict")
|
||||
|
||||
# --- patterns ---
|
||||
patterns_raw = raw.get("patterns")
|
||||
if not isinstance(patterns_raw, list):
|
||||
raise ValueError("Missing or invalid 'patterns' list")
|
||||
|
||||
patterns: list[PatternDef] = []
|
||||
for idx, p in enumerate(patterns_raw):
|
||||
if not isinstance(p, dict):
|
||||
raise ValueError(f"patterns[{idx}] must be a dict")
|
||||
unknown_p = set(p.keys()) - _PATTERN_KEYS
|
||||
if unknown_p:
|
||||
raise ValueError(
|
||||
f"patterns[{idx}] unknown keys: {sorted(unknown_p)}"
|
||||
)
|
||||
try:
|
||||
patterns.append(PatternDef(
|
||||
id=int(p["id"]),
|
||||
name=str(p["name"]),
|
||||
instrument=str(p["instrument"]),
|
||||
channel=int(p["channel"]),
|
||||
bars=int(p["bars"]),
|
||||
generator=str(p["generator"]),
|
||||
velocity_mult=float(p.get("velocity_mult", 1.0)),
|
||||
density=float(p.get("density", 1.0)),
|
||||
))
|
||||
except KeyError as exc:
|
||||
raise ValueError(
|
||||
f"patterns[{idx}] missing required field: {exc}"
|
||||
) from exc
|
||||
|
||||
# --- tracks ---
|
||||
tracks_raw = raw.get("tracks")
|
||||
if not isinstance(tracks_raw, list):
|
||||
raise ValueError("Missing or invalid 'tracks' list")
|
||||
|
||||
tracks: list[ArrangementTrack] = []
|
||||
for idx, t in enumerate(tracks_raw):
|
||||
if not isinstance(t, dict):
|
||||
raise ValueError(f"tracks[{idx}] must be a dict")
|
||||
unknown_t = set(t.keys()) - _TRACK_KEYS
|
||||
if unknown_t:
|
||||
raise ValueError(
|
||||
f"tracks[{idx}] unknown keys: {sorted(unknown_t)}"
|
||||
)
|
||||
try:
|
||||
tracks.append(ArrangementTrack(
|
||||
index=int(t["index"]),
|
||||
name=str(t["name"]),
|
||||
))
|
||||
except KeyError as exc:
|
||||
raise ValueError(
|
||||
f"tracks[{idx}] missing required field: {exc}"
|
||||
) from exc
|
||||
|
||||
# --- items ---
|
||||
items_raw = raw.get("items")
|
||||
if not isinstance(items_raw, list):
|
||||
raise ValueError("Missing or invalid 'items' list")
|
||||
|
||||
items: list[ArrangementItemDef] = []
|
||||
for idx, it in enumerate(items_raw):
|
||||
if not isinstance(it, dict):
|
||||
raise ValueError(f"items[{idx}] must be a dict")
|
||||
unknown_it = set(it.keys()) - _ITEM_KEYS
|
||||
if unknown_it:
|
||||
raise ValueError(
|
||||
f"items[{idx}] unknown keys: {sorted(unknown_it)}"
|
||||
)
|
||||
try:
|
||||
items.append(ArrangementItemDef(
|
||||
pattern=int(it["pattern"]),
|
||||
bar=float(it["bar"]),
|
||||
bars=float(it["bars"]),
|
||||
track=int(it["track"]),
|
||||
muted=bool(it.get("muted", False)),
|
||||
))
|
||||
except KeyError as exc:
|
||||
raise ValueError(
|
||||
f"items[{idx}] missing required field: {exc}"
|
||||
) from exc
|
||||
|
||||
song = cls(
|
||||
meta=meta,
|
||||
samples=samples,
|
||||
patterns=patterns,
|
||||
tracks=tracks,
|
||||
items=items,
|
||||
progression_name=str(raw.get("progression_name", "")),
|
||||
section_template=str(raw.get("section_template", "standard")),
|
||||
)
|
||||
|
||||
# Validate and raise on errors
|
||||
errors = song.validate()
|
||||
if errors:
|
||||
raise ValueError(
|
||||
"Song validation failed:\n - " + "\n - ".join(errors)
|
||||
)
|
||||
|
||||
return song
|
||||
|
||||
@classmethod
|
||||
def load_file(cls, path: str | Path) -> SongDefinition:
|
||||
"""Load and validate from a ``.json`` file.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the file does not exist.
|
||||
ValueError: If validation fails.
|
||||
"""
|
||||
p = Path(path)
|
||||
if not p.exists():
|
||||
raise FileNotFoundError(f"Song file not found: {p}")
|
||||
return cls.from_json(p.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Convenience
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_song_json(path: str | Path) -> SongDefinition:
|
||||
"""Load + validate a song definition from a JSON file.
|
||||
|
||||
Raises:
|
||||
ValueError: If validation fails.
|
||||
FileNotFoundError: If file does not exist.
|
||||
"""
|
||||
return SongDefinition.load_file(path)
|
||||
@@ -1,382 +0,0 @@
|
||||
"""Channel skeleton loader — extracts sampler channels from reference FLP and patches sample paths."""
|
||||
|
||||
import os
|
||||
import struct
|
||||
from pathlib import Path
|
||||
|
||||
# Default channel→sample mapping (index: sample_key)
|
||||
# Only Ch10-19 are sampler channels in the reference FLP
|
||||
DEFAULT_CHANNEL_MAP = {
|
||||
10: "channel10",
|
||||
11: "channel11",
|
||||
12: "channel12",
|
||||
13: "channel13",
|
||||
14: "channel14",
|
||||
15: "channel15",
|
||||
16: "channel16",
|
||||
17: "channel17",
|
||||
18: "channel18",
|
||||
19: "channel19",
|
||||
}
|
||||
|
||||
# Channels to replace with empty sampler (non-drum channels from original)
|
||||
EMPTY_SAMPLER_CHANNELS = {3, 4, 8, 17, 18, 19}
|
||||
|
||||
|
||||
class ChannelSkeletonLoader:
|
||||
"""Loads sampler channel configuration from a reference FLP binary.
|
||||
|
||||
Usage:
|
||||
loader = ChannelSkeletonLoader(ref_flp_path, ch11_template_path, samples_dir)
|
||||
channel_bytes = loader.load(sample_map={"kick": "kick.wav", ...})
|
||||
"""
|
||||
|
||||
def __init__(self, ref_flp_path: str, ch11_template_path: str, samples_dir: str):
|
||||
self.ref_flp_path = ref_flp_path
|
||||
self.ch11_template_path = ch11_template_path
|
||||
self.samples_dir = samples_dir
|
||||
self._cache: bytes | None = None
|
||||
self._ch11_template: bytes | None = None
|
||||
|
||||
def load(
|
||||
self,
|
||||
sample_map: dict[str, str] | None = None,
|
||||
melodic_map: dict[int, tuple[str, str]] | None = None,
|
||||
) -> bytes:
|
||||
"""Return assembled channel bytes with sample paths patched.
|
||||
|
||||
sample_map: {"kick": "kick.wav", "snare": "snare.wav", ...}
|
||||
Keys must match DEFAULT_CHANNEL_MAP values.
|
||||
If None, uses DEFAULT_CHANNEL_MAP with filenames as "<key>.wav"
|
||||
melodic_map: {ch_idx: (samples_dir, wav_name), ...}
|
||||
Maps melodic channel indices to their sample file.
|
||||
These channels get sampler clones with real samples instead of empty.
|
||||
Returns raw bytes for all channels (stripped of post-channel data).
|
||||
Caches result — calling load() multiple times returns same bytes.
|
||||
"""
|
||||
if self._cache is not None:
|
||||
return self._cache
|
||||
|
||||
# Resolve sample_map: map channel_index → wav filename
|
||||
if sample_map is None:
|
||||
ch_to_wav = {ch: f"{key}.wav" for ch, key in DEFAULT_CHANNEL_MAP.items()}
|
||||
else:
|
||||
ch_to_wav = {ch: sample_map[key] for ch, key in DEFAULT_CHANNEL_MAP.items() if key in sample_map}
|
||||
|
||||
melodic_channels = set(melodic_map.keys()) if melodic_map else set()
|
||||
|
||||
extracted = self._extract_channels()
|
||||
order = extracted["order"]
|
||||
segments: dict[int, bytearray] = extracted["segments"]
|
||||
|
||||
# Replace channels not in drum/melodic maps with empty sampler clones
|
||||
channels_with_samples = set(ch_to_wav.keys()) | melodic_channels
|
||||
for ch_idx in list(segments.keys()):
|
||||
if ch_idx not in channels_with_samples:
|
||||
segments[ch_idx] = bytearray(self._make_empty_sampler(ch_idx))
|
||||
|
||||
# For melodic channels: clone ch11 template and patch with real sample path
|
||||
if melodic_map:
|
||||
for ch_idx, (sample_dir, wav_name) in melodic_map.items():
|
||||
if ch_idx in segments:
|
||||
segments[ch_idx] = bytearray(
|
||||
self._make_sampler_with_sample(ch_idx, sample_dir, wav_name)
|
||||
)
|
||||
|
||||
# Patch sample paths for drum channels (skip melodic — already patched)
|
||||
for ch_idx, wav_name in ch_to_wav.items():
|
||||
if ch_idx in segments and ch_idx not in melodic_channels:
|
||||
segments[ch_idx] = bytearray(self._patch_sample_path(bytes(segments[ch_idx]), wav_name))
|
||||
|
||||
# Assemble in original order
|
||||
buf = bytearray()
|
||||
for ch_idx in order:
|
||||
buf += segments[ch_idx]
|
||||
|
||||
self._cache = bytes(buf)
|
||||
return self._cache
|
||||
|
||||
# ── Event parsing ──────────────────────────────────────────────────────────
|
||||
|
||||
def _read_ev(self, data: bytes, pos: int) -> tuple:
|
||||
"""Read one FLP event. Returns (next_pos, start, event_id, value, value_type)."""
|
||||
start = pos
|
||||
ib = data[pos]
|
||||
pos += 1
|
||||
|
||||
if ib < 64:
|
||||
# Byte event: 1 byte ID + 1 byte value
|
||||
return pos + 1, start, ib, data[start + 1], "byte"
|
||||
elif ib < 128:
|
||||
# Word event: 1 byte ID + 2 byte value
|
||||
return pos + 2, start, ib, struct.unpack("<H", data[pos : pos + 2])[0], "word"
|
||||
elif ib < 192:
|
||||
# Dword event: 1 byte ID + 4 byte value
|
||||
return pos + 4, start, ib, struct.unpack("<I", data[pos : pos + 4])[0], "dword"
|
||||
else:
|
||||
# Data/TEXT event: 1 byte ID + varint size + payload
|
||||
sz = 0
|
||||
sh = 0
|
||||
while True:
|
||||
b = data[pos]
|
||||
pos += 1
|
||||
sz |= (b & 0x7F) << sh
|
||||
sh += 7
|
||||
if not (b & 0x80):
|
||||
break
|
||||
return pos + sz, start, ib, data[pos : pos + sz], "data"
|
||||
|
||||
def _encode_varint(self, n: int) -> bytes:
|
||||
"""Encode an integer as a varint (LEB128)."""
|
||||
r = bytearray()
|
||||
while True:
|
||||
b = n & 0x7F
|
||||
n >>= 7
|
||||
if n:
|
||||
b |= 0x80
|
||||
r.append(b)
|
||||
if not n:
|
||||
break
|
||||
return bytes(r)
|
||||
|
||||
# ── Channel extraction ─────────────────────────────────────────────────────
|
||||
|
||||
def _extract_channels(self) -> dict:
|
||||
"""Parse reference FLP, extract channel segments, find post-channel boundary.
|
||||
|
||||
Returns:
|
||||
{
|
||||
'order': [ch_idx, ...], # channels in original order
|
||||
'segments': {idx: bytes}, # raw bytes per channel
|
||||
'last_ch': idx, # index of last channel
|
||||
}
|
||||
"""
|
||||
with open(self.ref_flp_path, "rb") as f:
|
||||
data = f.read()
|
||||
|
||||
# Skip FLhd header (6 bytes) + FLdt chunk header (8 bytes) = 14 bytes,
|
||||
# then the FLhd body. v15 starts scanning at offset 22.
|
||||
pos = 22
|
||||
first_ch = None
|
||||
current_ch = -1
|
||||
ch_ranges: dict[int, list[int]] = {}
|
||||
channels_order: list[int] = []
|
||||
|
||||
# Import here to avoid circular — events is a leaf module
|
||||
from src.flp_builder.events import EventID
|
||||
|
||||
while pos < len(data):
|
||||
np, st, ib, val, vt = self._read_ev(data, pos)
|
||||
if ib == EventID.ChNew:
|
||||
if first_ch is None:
|
||||
first_ch = st
|
||||
if current_ch >= 0:
|
||||
ch_ranges[current_ch] = (ch_ranges[current_ch][0], st)
|
||||
current_ch = val
|
||||
ch_ranges[current_ch] = (st, st)
|
||||
channels_order.append(current_ch)
|
||||
pos = np
|
||||
|
||||
if current_ch >= 0:
|
||||
ch_ranges[current_ch] = (ch_ranges[current_ch][0], len(data))
|
||||
|
||||
if not channels_order:
|
||||
raise ValueError("No channels found in reference FLP")
|
||||
|
||||
# Find post-channel boundary in last channel segment
|
||||
# Scan for ID 99 (ArrNew) — everything from there onward is post-channel
|
||||
last_ch = channels_order[-1]
|
||||
last_seg_start = ch_ranges[last_ch][0]
|
||||
last_seg_data = data[last_seg_start:]
|
||||
p = 0
|
||||
post_ch_offset = len(last_seg_data)
|
||||
while p < len(last_seg_data):
|
||||
np, st, ib, val, vt = self._read_ev(last_seg_data, p)
|
||||
if ib == 99: # ArrNew
|
||||
post_ch_offset = st
|
||||
break
|
||||
p = np
|
||||
|
||||
# Build channel segments, stripping post-channel data from last one
|
||||
segments: dict[int, bytearray] = {}
|
||||
for ch_idx in channels_order:
|
||||
s, e = ch_ranges[ch_idx]
|
||||
if ch_idx == last_ch:
|
||||
segments[ch_idx] = bytearray(data[s : s + post_ch_offset])
|
||||
else:
|
||||
segments[ch_idx] = bytearray(data[s:e])
|
||||
|
||||
return {
|
||||
"order": channels_order,
|
||||
"segments": segments,
|
||||
"last_ch": last_ch,
|
||||
}
|
||||
|
||||
# ── Sampler with real sample ────────────────────────────────────────────────
|
||||
|
||||
# Events to strip when cloning: old sample path, old sample name, cached data
|
||||
STRIP_EVENTS = {0xC4, 0xCB, 0xDA, 0xD7, 0xE4, 0xE5, 0xDD, 0xD1}
|
||||
|
||||
def _make_sampler_with_sample(self, ch_idx: int, samples_dir: str, wav_name: str) -> bytes:
|
||||
"""Clone the FL Studio-created sampler template and patch with real sample.
|
||||
|
||||
Uses output/flstudio_sampler_template.bin which was extracted from a
|
||||
channel that FL Studio itself created (guaranteed correct format).
|
||||
"""
|
||||
template_path = os.path.join(
|
||||
os.path.dirname(self.ref_flp_path), "..", "output", "flstudio_sampler_template.bin"
|
||||
)
|
||||
template_path = os.path.normpath(template_path)
|
||||
if not os.path.isfile(template_path):
|
||||
# Fallback: extract from debug_sampler.flp
|
||||
raise FileNotFoundError(f"Sampler template not found: {template_path}")
|
||||
|
||||
with open(template_path, "rb") as f:
|
||||
source = f.read()
|
||||
|
||||
# Rebuild: keep non-cached events, patch ChNew index
|
||||
seg = bytearray()
|
||||
pos = 0
|
||||
while pos < len(source):
|
||||
np, st, ib, val, vt = self._read_ev(source, pos)
|
||||
if ib in self.STRIP_EVENTS:
|
||||
pass # Remove stale cached data
|
||||
elif ib == 0x40 and vt == "word":
|
||||
seg += struct.pack("<BH", 0x40, ch_idx)
|
||||
else:
|
||||
seg += source[st:np]
|
||||
pos = np
|
||||
|
||||
# Add sample name (0xCB)
|
||||
sample_name = os.path.splitext(wav_name)[0]
|
||||
encoded_name = sample_name.encode("utf-16-le") + b"\x00\x00"
|
||||
seg += bytes([0xCB]) + self._encode_varint(len(encoded_name)) + encoded_name
|
||||
|
||||
# Add sample path (0xC4) — absolute path, no %USERPROFILE%
|
||||
full_path = os.path.join(samples_dir, wav_name)
|
||||
encoded_path = full_path.encode("utf-16-le") + b"\x00\x00"
|
||||
seg += bytes([0xC4]) + self._encode_varint(len(encoded_path)) + encoded_path
|
||||
|
||||
return bytes(seg)
|
||||
|
||||
def _extract_channels_raw(self) -> dict[int, bytes]:
|
||||
"""Extract raw channel segments from reference FLP without caching.
|
||||
Returns {ch_idx: bytes}."""
|
||||
with open(self.ref_flp_path, "rb") as f:
|
||||
data = f.read()
|
||||
|
||||
from src.flp_builder.events import EventID
|
||||
|
||||
pos = 22
|
||||
current_ch = -1
|
||||
ch_ranges: dict[int, tuple[int, int]] = {}
|
||||
channels_order: list[int] = []
|
||||
|
||||
while pos < len(data):
|
||||
np, st, ib, val, vt = self._read_ev(data, pos)
|
||||
if ib == EventID.ChNew:
|
||||
if current_ch >= 0:
|
||||
ch_ranges[current_ch] = (ch_ranges[current_ch][0], st)
|
||||
current_ch = val
|
||||
ch_ranges[current_ch] = (st, st)
|
||||
channels_order.append(current_ch)
|
||||
pos = np
|
||||
|
||||
if current_ch >= 0:
|
||||
ch_ranges[current_ch] = (ch_ranges[current_ch][0], len(data))
|
||||
|
||||
# Strip post-channel data from last channel
|
||||
last_ch = channels_order[-1]
|
||||
last_start = ch_ranges[last_ch][0]
|
||||
last_data = data[last_start:]
|
||||
p = 0
|
||||
post_offset = len(last_data)
|
||||
while p < len(last_data):
|
||||
np, st, ib, val, vt = self._read_ev(last_data, p)
|
||||
if ib == 99:
|
||||
post_offset = st
|
||||
break
|
||||
p = np
|
||||
|
||||
segments: dict[int, bytes] = {}
|
||||
for ch_idx in channels_order:
|
||||
s, e = ch_ranges[ch_idx]
|
||||
if ch_idx == last_ch:
|
||||
segments[ch_idx] = data[s:s + post_offset]
|
||||
else:
|
||||
segments[ch_idx] = data[s:e]
|
||||
|
||||
return segments
|
||||
|
||||
def _patch_chnew_index(self, seg: bytearray, new_idx: int):
|
||||
"""Find and patch the ChNew word event to a new channel index."""
|
||||
pos = 0
|
||||
while pos < len(seg):
|
||||
np, st, ib, val, vt = self._read_ev(bytes(seg), pos)
|
||||
if ib == 64 and vt == "word": # ChNew
|
||||
struct.pack_into("<H", seg, st + 1, new_idx)
|
||||
return
|
||||
pos = np
|
||||
|
||||
# ── Empty sampler ──────────────────────────────────────────────────────────
|
||||
|
||||
def _make_empty_sampler(self, ch_idx: int) -> bytes:
|
||||
"""Create a minimal empty sampler channel with no sample loaded."""
|
||||
extracted = self._extract_channels_raw()
|
||||
source_idx = 10
|
||||
if source_idx not in extracted:
|
||||
for alt in [11, 12, 13, 14, 15, 16, 17, 18, 19]:
|
||||
if alt in extracted:
|
||||
source_idx = alt
|
||||
break
|
||||
|
||||
seg = bytearray()
|
||||
source = extracted[source_idx]
|
||||
pos = 0
|
||||
while pos < len(source):
|
||||
np, st, ib, val, vt = self._read_ev(source, pos)
|
||||
if ib in self.STRIP_EVENTS or ib == 0xC4:
|
||||
pass # Remove cached data AND old sample path
|
||||
elif ib == 0x40 and vt == "word":
|
||||
seg += struct.pack("<BH", 0x40, ch_idx)
|
||||
else:
|
||||
seg += source[st:np]
|
||||
pos = np
|
||||
|
||||
# Add empty sample path
|
||||
seg += bytes([0xC4, 0x02, 0x00, 0x00])
|
||||
return bytes(seg)
|
||||
|
||||
# ── Sample path patching ───────────────────────────────────────────────────
|
||||
|
||||
def _patch_sample_path(self, seg: bytes, wav_name: str) -> bytes:
|
||||
"""Replace 0xC4 (ChSamplePath) event with encoded wav_path.
|
||||
|
||||
Uses %USERPROFILE% substitution for portability.
|
||||
Paths are encoded as UTF-16-LE + null terminator (\\x00\\x00).
|
||||
"""
|
||||
seg = bytearray(seg)
|
||||
|
||||
# Build full path and substitute USERPROFILE for portability
|
||||
full_path = os.path.join(self.samples_dir, wav_name)
|
||||
userprofile = os.environ.get("USERPROFILE", "")
|
||||
rel_path = full_path.replace(userprofile, "%USERPROFILE%")
|
||||
encoded_path = rel_path.encode("utf-16-le") + b"\x00\x00"
|
||||
|
||||
# Build replacement event: ID byte + varint(size) + encoded path
|
||||
path_ev = bytes([0xC4]) + self._encode_varint(len(encoded_path)) + encoded_path
|
||||
|
||||
# Find all ChSamplePath events
|
||||
local = 0
|
||||
replacements: list[tuple[int, int, bytes]] = []
|
||||
while local < len(seg):
|
||||
nl, es, ib, v, vt = self._read_ev(bytes(seg), local)
|
||||
if ib == 0xC4:
|
||||
replacements.append((es, nl, path_ev))
|
||||
local = nl
|
||||
|
||||
# Apply in reverse to preserve offsets
|
||||
for es, el, nd in reversed(replacements):
|
||||
seg[es:el] = nd
|
||||
|
||||
return bytes(seg)
|
||||
@@ -1,145 +0,0 @@
|
||||
from __future__ import annotations
|
||||
import struct
|
||||
from .events import (
|
||||
EventID,
|
||||
encode_byte_event,
|
||||
encode_word_event,
|
||||
encode_dword_event,
|
||||
encode_text_event,
|
||||
encode_data_event,
|
||||
encode_varint,
|
||||
encode_notes_block,
|
||||
)
|
||||
from .project import FLPProject, Pattern, Note
|
||||
|
||||
|
||||
class FLPWriter:
|
||||
def __init__(self, project: FLPProject):
|
||||
self.project = project
|
||||
self._events: list[bytes] = []
|
||||
|
||||
def build(self) -> bytes:
|
||||
self._events = []
|
||||
self._write_project_header()
|
||||
self._write_patterns()
|
||||
self._write_channels()
|
||||
return self._serialize()
|
||||
|
||||
def _add_event(self, data: bytes):
|
||||
self._events.append(data)
|
||||
|
||||
def _write_project_header(self):
|
||||
p = self.project
|
||||
self._add_event(encode_text_event(EventID.FLVersion, p.fl_version))
|
||||
self._add_event(encode_dword_event(EventID.FLBuild, 1773))
|
||||
self._add_event(encode_byte_event(EventID.Licensed, 1))
|
||||
self._add_event(encode_dword_event(EventID.Tempo, int(p.tempo * 1000)))
|
||||
self._add_event(encode_byte_event(EventID.LoopActive, 1))
|
||||
self._add_event(encode_word_event(EventID.Pitch, 0))
|
||||
self._add_event(encode_byte_event(EventID.PanLaw, 0))
|
||||
if p.title:
|
||||
self._add_event(encode_text_event(EventID.Title, p.title))
|
||||
if p.genre:
|
||||
self._add_event(encode_text_event(EventID.Genre, p.genre))
|
||||
if p.artists:
|
||||
self._add_event(encode_text_event(EventID.Artists, p.artists))
|
||||
if p.comments:
|
||||
self._add_event(encode_text_event(EventID.Comments, p.comments))
|
||||
|
||||
def _write_patterns(self):
|
||||
p = self.project
|
||||
for pat in p.patterns:
|
||||
self._add_event(encode_word_event(EventID.PatNew, pat.index))
|
||||
if pat.name:
|
||||
self._add_event(encode_text_event(EventID.PatName, pat.name))
|
||||
for ch_idx, notes in pat.notes.items():
|
||||
if notes:
|
||||
notes_data = encode_notes_block(
|
||||
ch_idx,
|
||||
[n.to_dict() if isinstance(n, Note) else n for n in notes],
|
||||
ppq=p.ppq,
|
||||
)
|
||||
self._add_event(encode_data_event(EventID.PatNotes, notes_data))
|
||||
|
||||
def _write_channels(self):
|
||||
p = self.project
|
||||
for ch in p.channels:
|
||||
self._add_event(encode_word_event(EventID.ChNew, ch.index))
|
||||
self._add_event(encode_byte_event(EventID.ChType, ch.channel_type))
|
||||
|
||||
if ch.plugin:
|
||||
self._add_event(
|
||||
encode_text_event(EventID.PluginInternalName, ch.plugin.internal_name)
|
||||
)
|
||||
if ch.plugin.plugin_data:
|
||||
self._add_event(
|
||||
encode_data_event(EventID.PluginData, ch.plugin.plugin_data)
|
||||
)
|
||||
elif ch.plugin.internal_name == "Fruity Wrapper":
|
||||
self._add_event(
|
||||
encode_text_event(EventID.PluginName, ch.plugin.display_name)
|
||||
)
|
||||
wrapper_data = self._build_wrapper_stub(ch.plugin.display_name)
|
||||
self._add_event(encode_data_event(EventID.PluginData, wrapper_data))
|
||||
else:
|
||||
self._add_event(
|
||||
encode_text_event(EventID.PluginName, ch.plugin.display_name)
|
||||
)
|
||||
plugin_data = self._build_native_plugin_stub(ch.plugin.internal_name)
|
||||
self._add_event(encode_data_event(EventID.PluginData, plugin_data))
|
||||
|
||||
if ch.plugin.color:
|
||||
self._add_event(
|
||||
encode_dword_event(EventID.PluginColor, ch.plugin.color)
|
||||
)
|
||||
|
||||
self._add_event(encode_text_event(EventID.ChName, ch.name))
|
||||
self._add_event(encode_byte_event(EventID.ChIsEnabled, 1 if ch.enabled else 0))
|
||||
self._add_event(encode_byte_event(EventID.ChRoutedTo, ch.mixer_track & 0xFF))
|
||||
self._add_event(encode_word_event(EventID.ChVolWord, ch.volume))
|
||||
self._add_event(encode_byte_event(EventID.ChRootNote, ch.root_note))
|
||||
|
||||
def _build_wrapper_stub(self, plugin_name: str) -> bytes:
|
||||
# Minimal VST wrapper state - FL Studio will initialize the plugin fresh
|
||||
# 10 params with default values
|
||||
stub = struct.pack("<II", 10, 1) # param_count=10, unknown=1
|
||||
stub += struct.pack("<II", 20, 0) # version=20, flags=0
|
||||
stub += b"\xff\xff\xff\xff\xff\xff\xff\xff" # GUID placeholder
|
||||
stub += b"\x0c\x00\x0c\x00\x0c\x00\x0c\x00" # padding
|
||||
stub += b"\x00" * 16 # zeros
|
||||
return stub
|
||||
|
||||
def _build_native_plugin_stub(self, internal_name: str) -> bytes:
|
||||
# Minimal native plugin state
|
||||
stub = struct.pack("<II", 10, 1)
|
||||
stub += struct.pack("<II", 20, 0)
|
||||
stub += b"\xff\xff\xff\xff\xff\xff\xff\xff"
|
||||
stub += b"\x0c\x00\x0c\x00\x0c\x00\x0c\x00"
|
||||
stub += b"\x00" * 16
|
||||
return stub
|
||||
|
||||
def _serialize(self) -> bytes:
|
||||
num_channels = len(self.project.channels)
|
||||
ppq = self.project.ppq
|
||||
|
||||
header = struct.pack(
|
||||
"<4sIhHH",
|
||||
b"FLhd",
|
||||
6,
|
||||
0,
|
||||
num_channels,
|
||||
ppq,
|
||||
)
|
||||
|
||||
all_events = b"".join(self._events)
|
||||
total_size = len(all_events)
|
||||
|
||||
data_header = b"FLdt" + struct.pack("<I", total_size)
|
||||
|
||||
return header + data_header + all_events
|
||||
|
||||
def write(self, filepath: str):
|
||||
data = self.build()
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(data)
|
||||
return filepath
|
||||
144
src/reaper_builder/__init__.py
Normal file
144
src/reaper_builder/__init__.py
Normal file
@@ -0,0 +1,144 @@
|
||||
"""REAPER .rpp project builder.
|
||||
|
||||
High-level interface: pass a ``core.schema.SongDefinition`` to ``RPPBuilder``
|
||||
and call ``write()`` to emit a valid .rpp text file.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from rpp import Element, dumps
|
||||
|
||||
from ..core.schema import SongDefinition, TrackDef, ClipDef, PluginDef
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_guid() -> str:
|
||||
"""Generate a random REAPER GUID string."""
|
||||
return str(uuid.uuid4()).upper()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# RPPBuilder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class RPPBuilder:
|
||||
"""Builds a REAPER .rpp file from a SongDefinition.
|
||||
|
||||
Usage::
|
||||
|
||||
song = SongDefinition(meta=SongMeta(bpm=95, key="Am", title="Test"))
|
||||
builder = RPPBuilder(song)
|
||||
builder.write("output.rpp")
|
||||
"""
|
||||
|
||||
def __init__(self, song: SongDefinition) -> None:
|
||||
self.song = song
|
||||
|
||||
def write(self, path: str | Path) -> None:
|
||||
"""Serialize the project to a .rpp file at *path*.
|
||||
|
||||
Raises:
|
||||
OSError: If the file cannot be written.
|
||||
"""
|
||||
root = self._build_element()
|
||||
p = Path(path)
|
||||
p.write_text(dumps(root), encoding="utf-8")
|
||||
|
||||
def _build_element(self) -> Element:
|
||||
"""Build the Element tree for the .rpp file."""
|
||||
m = self.song.meta
|
||||
|
||||
# Project root
|
||||
root = Element("REAPER_PROJECT", ["0.1", "6.0", str(int(uuid.uuid4().time))])
|
||||
|
||||
# TEMPO is a flat attribute line, NOT a child element
|
||||
root.append(["TEMPO", str(m.bpm), str(m.time_sig_num), str(m.time_sig_den)])
|
||||
|
||||
# Master track
|
||||
master = Element("TRACK", [_make_guid()])
|
||||
master.append(['NAME', "master"])
|
||||
master.append(["VOLPAN", "1.0", "0", "-1", "-1", "1"])
|
||||
root.append(master)
|
||||
|
||||
# User tracks
|
||||
for track in self.song.tracks:
|
||||
root.append(self._build_track(track))
|
||||
|
||||
return root
|
||||
|
||||
def _build_track(self, track: TrackDef) -> Element:
|
||||
"""Build a TRACK Element."""
|
||||
track_elem = Element("TRACK", [_make_guid()])
|
||||
track_elem.append(["NAME", track.name])
|
||||
|
||||
vol = track.volume
|
||||
pan = track.pan
|
||||
track_elem.append([f"VOLPAN", f"{vol:.6f}", f"{pan:.6f}", "-1", "-1", "1"])
|
||||
|
||||
if track.color != 0:
|
||||
track_elem.append(["COLOR", str(track.color)])
|
||||
|
||||
# Plugins (FXCHAIN)
|
||||
if track.plugins:
|
||||
fxchain = Element("FXCHAIN", [])
|
||||
for plugin in track.plugins:
|
||||
fxchain.append(self._build_plugin(plugin))
|
||||
track_elem.append(fxchain)
|
||||
|
||||
# Clips (items)
|
||||
for clip in track.clips:
|
||||
track_elem.append(self._build_clip(clip))
|
||||
|
||||
return track_elem
|
||||
|
||||
def _build_plugin(self, plugin: PluginDef) -> Element:
|
||||
"""Build a VST Element inside FXCHAIN."""
|
||||
params_str = " ".join(str(v) for v in plugin.params.values()) if plugin.params else ""
|
||||
vst = Element("VST", [plugin.name, plugin.path, str(plugin.index), "", *params_str.split(), "0", "0"])
|
||||
return vst
|
||||
|
||||
def _build_clip(self, clip: ClipDef) -> Element:
|
||||
"""Build an ITEM Element."""
|
||||
item = Element("ITEM", [])
|
||||
item.append(["POSITION", str(clip.position)])
|
||||
item.append(["LENGTH", str(clip.length)])
|
||||
if clip.name:
|
||||
item.append(["NAME", clip.name])
|
||||
|
||||
if clip.is_audio and clip.audio_path:
|
||||
source = Element("SOURCE", ["WAVE"])
|
||||
source.append(["FILE", clip.audio_path])
|
||||
item.append(source)
|
||||
elif clip.is_midi:
|
||||
item.append(self._build_midi_source(clip))
|
||||
|
||||
return item
|
||||
|
||||
def _build_midi_source(self, clip: ClipDef) -> Element:
|
||||
"""Build a SOURCE MIDI Element with E-lines."""
|
||||
source = Element("SOURCE", ["MIDI"])
|
||||
source.append(["HASDATA", "1", "960", "QN"])
|
||||
|
||||
ppq = 960 # ticks per quarter note
|
||||
sorted_notes = sorted(clip.midi_notes, key=lambda n: n.start)
|
||||
|
||||
cursor = 0.0
|
||||
for note in sorted_notes:
|
||||
start_ticks = int(note.start * ppq)
|
||||
delta = start_ticks - cursor
|
||||
cursor = start_ticks
|
||||
|
||||
# Note on: status 90, pitch, velocity
|
||||
source.append(['E', str(delta), '90', f'{note.pitch:02x}', f'{note.velocity:02x}'])
|
||||
|
||||
# Note off after duration
|
||||
off_delta = int(note.duration * ppq)
|
||||
source.append(['E', str(off_delta), '80', f'{note.pitch:02x}', '00'])
|
||||
|
||||
return source
|
||||
65
src/reaper_builder/render.py
Normal file
65
src/reaper_builder/render.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""REAPER project rendering — headless render to WAV via subprocess."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
# Default REAPER executable path on Windows
|
||||
DEFAULT_REAPER_EXE = Path(r"C:\Program Files\REAPER (x64)\reaper.exe")
|
||||
|
||||
|
||||
def render_project(
|
||||
rpp_path: str | Path,
|
||||
output_wav: str | Path,
|
||||
reaper_exe: str | Path | None = None,
|
||||
timeout_seconds: int = 120,
|
||||
) -> None:
|
||||
"""Render a .rpp project to WAV using the REAPER CLI.
|
||||
|
||||
Args:
|
||||
rpp_path: Path to the .rpp project file.
|
||||
output_wav: Path where the rendered WAV will be written.
|
||||
reaper_exe: Path to reaper.exe. Defaults to
|
||||
``C:\\Program Files\\REAPER (x64)\\reaper.exe``.
|
||||
timeout_seconds: Max seconds to wait for render to complete.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If reaper.exe is not found at the expected path
|
||||
and no explicit path was provided.
|
||||
RuntimeError: If the render process exits with a non-zero code
|
||||
or is killed by the timeout.
|
||||
"""
|
||||
reaper_path = Path(reaper_exe) if reaper_exe else DEFAULT_REAPER_EXE
|
||||
|
||||
if not reaper_path.exists():
|
||||
raise FileNotFoundError(
|
||||
f"REAPER executable not found at: {reaper_path}\n"
|
||||
"Install REAPER or provide an explicit reaper_exe path."
|
||||
)
|
||||
|
||||
rpp_abs = str(Path(rpp_path).resolve())
|
||||
wav_abs = str(Path(output_wav).resolve())
|
||||
|
||||
cmd = [
|
||||
str(reaper_path),
|
||||
"-nosplash",
|
||||
"-render",
|
||||
rpp_abs,
|
||||
"-outfile",
|
||||
wav_abs,
|
||||
]
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout_seconds,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(
|
||||
f"REAPER render failed (exit {result.returncode}):\n"
|
||||
f"stdout: {result.stdout}\n"
|
||||
f"stderr: {result.stderr}"
|
||||
)
|
||||
@@ -1,194 +0,0 @@
|
||||
from __future__ import annotations
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
FL_USER_DIR = Path(os.path.expanduser("~")) / "Documents" / "Image-Line" / "FL Studio"
|
||||
PLUGIN_DB_DIR = FL_USER_DIR / "Presets" / "Plugin database" / "Installed"
|
||||
PROJECT_ROOT = Path(os.path.expanduser("~")) / "Documents" / "fl_control"
|
||||
|
||||
|
||||
def scan_installed_plugins() -> dict:
|
||||
generators = []
|
||||
effects = []
|
||||
|
||||
gen_dir = PLUGIN_DB_DIR / "Generators"
|
||||
if gen_dir.exists():
|
||||
for category_dir in gen_dir.iterdir():
|
||||
if not category_dir.is_dir():
|
||||
continue
|
||||
category = category_dir.name
|
||||
for fst_file in category_dir.glob("*.fst"):
|
||||
name = fst_file.stem
|
||||
generators.append({
|
||||
"name": name,
|
||||
"category": category,
|
||||
"type": "generator",
|
||||
"format": category,
|
||||
"fst_path": str(fst_file),
|
||||
})
|
||||
|
||||
fx_dir = PLUGIN_DB_DIR / "Effects"
|
||||
if fx_dir.exists():
|
||||
for category_dir in fx_dir.iterdir():
|
||||
if not category_dir.is_dir():
|
||||
continue
|
||||
category = category_dir.name
|
||||
for fst_file in category_dir.glob("*.fst"):
|
||||
name = fst_file.stem
|
||||
effects.append({
|
||||
"name": name,
|
||||
"category": category,
|
||||
"type": "effect",
|
||||
"format": category,
|
||||
"fst_path": str(fst_file),
|
||||
})
|
||||
|
||||
return {
|
||||
"generators": generators,
|
||||
"effects": effects,
|
||||
"generator_names": sorted(set(g["name"] for g in generators)),
|
||||
"effect_names": sorted(set(e["name"] for e in effects)),
|
||||
}
|
||||
|
||||
|
||||
def scan_samples(base_dir: Optional[Path] = None) -> dict:
|
||||
if base_dir is None:
|
||||
base_dir = PROJECT_ROOT / "librerias" / "organized_samples"
|
||||
|
||||
categories = {}
|
||||
if not base_dir.exists():
|
||||
return {"categories": {}, "total_files": 0}
|
||||
|
||||
for cat_dir in base_dir.iterdir():
|
||||
if not cat_dir.is_dir():
|
||||
continue
|
||||
files = []
|
||||
for f in cat_dir.rglob("*"):
|
||||
if f.is_file() and f.suffix.lower() in (".wav", ".mp3", ".flac", ".ogg", ".aif", ".aiff"):
|
||||
files.append({
|
||||
"name": f.stem,
|
||||
"path": str(f),
|
||||
"size": f.stat().st_size,
|
||||
"ext": f.suffix.lower(),
|
||||
})
|
||||
categories[cat_dir.name] = files
|
||||
|
||||
total = sum(len(v) for v in categories.values())
|
||||
return {"categories": categories, "total_files": total}
|
||||
|
||||
|
||||
def scan_library_packs(base_dir: Optional[Path] = None) -> dict:
|
||||
if base_dir is None:
|
||||
base_dir = PROJECT_ROOT / "librerias" / "reggaeton"
|
||||
|
||||
packs = []
|
||||
if not base_dir.exists():
|
||||
return {"packs": packs}
|
||||
|
||||
for pack_dir in base_dir.iterdir():
|
||||
if not pack_dir.is_dir():
|
||||
continue
|
||||
pack = {
|
||||
"name": pack_dir.name,
|
||||
"path": str(pack_dir),
|
||||
"contents": {},
|
||||
}
|
||||
for sub in pack_dir.rglob("*"):
|
||||
if sub.is_dir():
|
||||
continue
|
||||
ext = sub.suffix.lower()
|
||||
rel = str(sub.relative_to(pack_dir))
|
||||
content_type = "other"
|
||||
if ext in (".wav", ".mp3", ".flac", ".ogg", ".aif", ".aiff"):
|
||||
content_type = "audio"
|
||||
elif ext == ".mid":
|
||||
content_type = "midi"
|
||||
elif ext in (".fxp", ".fxb", ".fst"):
|
||||
content_type = "preset"
|
||||
|
||||
if content_type not in pack["contents"]:
|
||||
pack["contents"][content_type] = []
|
||||
pack["contents"][content_type].append({
|
||||
"name": sub.stem,
|
||||
"path": str(sub),
|
||||
"ext": ext,
|
||||
"type": content_type,
|
||||
})
|
||||
|
||||
packs.append(pack)
|
||||
|
||||
return {"packs": packs}
|
||||
|
||||
|
||||
def scan_vector_store_metadata(vs_dir: Optional[Path] = None) -> dict:
|
||||
if vs_dir is None:
|
||||
vs_dir = PROJECT_ROOT / "librerias" / "vector_store"
|
||||
|
||||
metadata_path = vs_dir / "metadata.json"
|
||||
if not metadata_path.exists():
|
||||
return {"items": [], "total": 0}
|
||||
|
||||
with open(metadata_path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
|
||||
types = {}
|
||||
for item in data:
|
||||
t = item.get("type", "unknown")
|
||||
types[t] = types.get(t, 0) + 1
|
||||
|
||||
return {
|
||||
"total": len(data),
|
||||
"types": types,
|
||||
"items_with_key": sum(1 for i in data if i.get("key")),
|
||||
"items_with_bpm": sum(1 for i in data if i.get("bpm")),
|
||||
"sample_items": data,
|
||||
}
|
||||
|
||||
|
||||
def full_inventory() -> dict:
|
||||
plugins = scan_installed_plugins()
|
||||
samples = scan_samples()
|
||||
packs = scan_library_packs()
|
||||
vector_store = scan_vector_store_metadata()
|
||||
|
||||
return {
|
||||
"plugins": plugins,
|
||||
"samples": samples,
|
||||
"packs": packs,
|
||||
"vector_store": vector_store,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.stdout.reconfigure(encoding="utf-8")
|
||||
inv = full_inventory()
|
||||
|
||||
summary = {
|
||||
"plugins": {
|
||||
"generators": inv["plugins"]["generator_names"],
|
||||
"effects": inv["plugins"]["effect_names"],
|
||||
"total_generators": len(inv["plugins"]["generators"]),
|
||||
"total_effects": len(inv["plugins"]["effects"]),
|
||||
},
|
||||
"samples": {
|
||||
"categories": {k: len(v) for k, v in inv["samples"]["categories"].items()},
|
||||
"total_files": inv["samples"]["total_files"],
|
||||
},
|
||||
"packs": [
|
||||
{
|
||||
"name": p["name"],
|
||||
"audio_count": len(p["contents"].get("audio", [])),
|
||||
"midi_count": len(p["contents"].get("midi", [])),
|
||||
}
|
||||
for p in inv["packs"]
|
||||
],
|
||||
"vector_store": {
|
||||
"total": inv["vector_store"]["total"],
|
||||
"types": inv["vector_store"]["types"],
|
||||
},
|
||||
}
|
||||
print(json.dumps(summary, indent=2, ensure_ascii=False))
|
||||
Reference in New Issue
Block a user