"""
ArrangementRecorder - Robust state machine for recording Session to Arrangement.

This module provides a reliable way to record Session View clips into
Arrangement View with proper state management, musical timing, and error
handling.
"""

from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Optional, Callable, List, Dict, Any, Set, Tuple
import time
import logging

# Configure logging
logger = logging.getLogger(__name__)


class RecordingState(Enum):
    """
    State machine states for arrangement recording.

    Transitions:
        IDLE -> ARMED (via arm())
        ARMED -> PRE_ROLL (via start())
        PRE_ROLL -> RECORDING (when quantized time reached)
        RECORDING -> COOLDOWN (when duration elapsed or stop() called)
        COOLDOWN -> COMPLETED (verification complete)
        COOLDOWN -> FAILED (verification failed)
        Any -> IDLE (via reset or error recovery)
    """

    IDLE = auto()
    ARMED = auto()
    PRE_ROLL = auto()
    RECORDING = auto()
    COOLDOWN = auto()
    COMPLETED = auto()
    FAILED = auto()


@dataclass
class RecordingConfig:
    """
    Configuration for arrangement recording session.

    Attributes:
        start_bar: Starting bar position in arrangement
        duration_bars: Total duration to record in bars
        pre_roll_bars: Bars to wait before recording starts (default 1.0)
        tempo: Tempo in BPM for timing calculations
        scene_index: Scene to fire at start (default 0)
        on_state_change: Callback when state changes (old_state, new_state)
        on_progress: Callback with progress 0.0-1.0
        on_error: Callback with exception on failure
        on_completed: Callback with list of new clip IDs on success

    Raises:
        ValueError: From __post_init__ when any numeric field is out of range.
    """

    start_bar: float
    duration_bars: float
    pre_roll_bars: float = 1.0
    tempo: float = 95.0
    scene_index: int = 0
    on_state_change: Optional[Callable[[RecordingState, RecordingState], None]] = None
    on_progress: Optional[Callable[[float], None]] = None
    on_error: Optional[Callable[[Exception], None]] = None
    on_completed: Optional[Callable[[List[str]], None]] = None

    def __post_init__(self):
        """Validate configuration parameters."""
        if self.start_bar < 0:
            raise ValueError(f"start_bar must be >= 0, got {self.start_bar}")
        if self.duration_bars <= 0:
            raise ValueError(f"duration_bars must be > 0, got {self.duration_bars}")
        if self.pre_roll_bars < 0:
            raise ValueError(f"pre_roll_bars must be >= 0, got {self.pre_roll_bars}")
        if self.tempo <= 0:
            raise ValueError(f"tempo must be > 0, got {self.tempo}")
        if self.scene_index < 0:
            raise ValueError(f"scene_index must be >= 0, got {self.scene_index}")


@dataclass
class ArrangementBaseline:
    """
    Captured state of arrangement before recording.

    Used for verification after recording completes.
    """

    clip_count: int
    clip_ids: Set[str]
    clip_positions: Dict[str, Tuple[float, float]]  # id -> (start, end)
    total_length: float
    timestamp: float


class ArrangementRecorder:
    """
    Robust recorder for Session to Arrangement with state machine.

    This class manages the entire recording lifecycle:
    - Pre-recording verification and setup
    - Musical timing (bars/beats) instead of wall-clock
    - Start once the pre-roll target bar is reached
    - Automatic stop after duration
    - Post-recording verification

    Usage:
        recorder = ArrangementRecorder(song, ableton_connection)
        config = RecordingConfig(start_bar=0, duration_bars=8, tempo=95)
        if recorder.arm(config):
            recorder.start()

        # Call from update_display() loop
        # In update_display():
        recorder.update()  # Processes state machine

    NOTE(review): ``config.start_bar`` is validated and logged but is not used
    to position the recording anywhere in this file; timing is computed
    relative to the playhead position at the moment ``start()`` is called.
    Confirm whether the caller is expected to position the playhead first.
    """

    # Bar length (in beats) used when the song's time signature is unavailable.
    _DEFAULT_BEATS_PER_BAR = 4.0

    # Minimum progress delta between two on_progress emissions.
    _PROGRESS_STEP = 0.01

    # States in which start() has already switched arrangement overdub on.
    _OVERDUB_STATES = (RecordingState.PRE_ROLL, RecordingState.RECORDING)

    def __init__(self, song, ableton_connection):
        """
        Initialize the arrangement recorder.

        Args:
            song: Live.Song.Song object
            ableton_connection: Connection object for sending commands to Live
        """
        self.song = song
        self.ableton = ableton_connection

        # State machine
        self._state = RecordingState.IDLE
        self._config: Optional[RecordingConfig] = None

        # Recording data
        self._baseline: Optional[ArrangementBaseline] = None
        self._new_clips: List[str] = []
        self._new_clip_ids: Set[str] = set()

        # Timing (musical - in bars/beats)
        self._target_start_bar: float = 0.0
        self._target_end_bar: float = 0.0
        self._pre_roll_target_bar: float = 0.0
        self._current_progress: float = 0.0

        # Update tracking
        self._last_update_time: float = 0.0
        self._last_progress_emit: float = -1.0
        self._state_entry_time: float = 0.0

        logger.info("ArrangementRecorder initialized")

    # ========================================================================
    # PUBLIC API
    # ========================================================================

    def arm(self, config: RecordingConfig) -> bool:
        """
        Arm the recorder with configuration.

        Verifies preconditions and captures baseline state.
        Must be called before start(). Only valid from IDLE; on failure the
        recorder ends up in FAILED and the config's on_error callback is
        invoked.

        Args:
            config: Recording configuration

        Returns:
            True if successfully armed, False otherwise
        """
        if self._state != RecordingState.IDLE:
            logger.warning("Cannot arm from state %s", self._state.name)
            return False

        try:
            # Store the config first so callbacks are reachable on failure
            self._config = config
            # Fresh session: forget the progress throttle of any earlier run
            self._last_progress_emit = -1.0

            # Verify preconditions
            self._verify_preconditions()

            # Capture baseline
            self._baseline = self._capture_baseline()

            # Transition to ARMED
            self._transition_to(RecordingState.ARMED)

            logger.info(
                "Recorder armed: bar %s, duration %s bars, pre-roll %s bars",
                config.start_bar, config.duration_bars, config.pre_roll_bars,
            )
            return True

        except Exception as e:
            logger.error("Failed to arm recorder: %s", e)
            self._handle_error(e)
            return False

    def start(self) -> bool:
        """
        Start the recording process.

        Begins the pre-roll phase if armed. Recording starts automatically
        once the playhead has advanced ``pre_roll_bars`` past its position at
        the time of this call.

        NOTE(review): this method does not start the transport; the pre-roll
        only advances while ``song.current_song_time`` advances. Confirm the
        caller starts playback.

        Returns:
            True if recording sequence started, False otherwise
        """
        if self._state != RecordingState.ARMED:
            logger.warning("Cannot start from state %s", self._state.name)
            return False

        if not self._config:
            logger.error("No configuration set")
            return False

        try:
            # Calculate timing relative to the current playhead
            current_bar = self._get_current_bar()
            self._pre_roll_target_bar = current_bar + self._config.pre_roll_bars
            self._target_start_bar = self._pre_roll_target_bar
            self._target_end_bar = self._target_start_bar + self._config.duration_bars

            # Enable arrangement overdub
            self.song.arrangement_overdub = True

            # Transition to PRE_ROLL
            self._transition_to(RecordingState.PRE_ROLL)

            logger.info(
                "Recording sequence started: pre-roll until bar %s, "
                "recording until bar %s",
                self._pre_roll_target_bar, self._target_end_bar,
            )
            return True

        except Exception as e:
            logger.error("Failed to start recording: %s", e)
            self._handle_error(e)
            return False

    def stop(self) -> bool:
        """
        Manually stop the recording.

        Can be called during PRE_ROLL or RECORDING states. Playback is
        stopped, overdub is disabled and verification runs immediately.

        NOTE(review): stopping during PRE_ROLL still runs verification, which
        reports failure (state FAILED, on_error invoked) when the arrangement
        is unchanged relative to the baseline.

        Returns:
            True if stopped successfully, False otherwise
        """
        if self._state not in (RecordingState.PRE_ROLL, RecordingState.RECORDING):
            logger.warning("Cannot stop from state %s", self._state.name)
            return False

        try:
            # Stop playback
            self.song.stop_playing()

            # Disable overdub
            self.song.arrangement_overdub = False

            # Calculate actual end position
            actual_end = self._get_current_bar()
            logger.info("Recording manually stopped at bar %s", actual_end)

            # Transition to cooldown for verification
            self._transition_to(RecordingState.COOLDOWN)

            # Trigger verification
            self._verify_and_complete()
            return True

        except Exception as e:
            logger.error("Failed to stop recording: %s", e)
            self._handle_error(e)
            return False

    def update(self) -> None:
        """
        Update the state machine.

        This method should be called regularly from Ableton's
        update_display() loop. It handles:
        - Pre-roll timing
        - Recording start trigger
        - Recording duration tracking
        - Automatic stop
        - Progress callbacks

        Only PRE_ROLL and RECORDING do any work; every other state is a no-op.
        """
        if self._state == RecordingState.PRE_ROLL:
            self._handle_pre_roll()
        elif self._state == RecordingState.RECORDING:
            self._handle_recording()

    def reset(self) -> None:
        """
        Reset the recorder to IDLE state.

        Clears all recording state. Can be called from any state. Playback is
        stopped only when a recording was in progress; arrangement overdub is
        switched off whenever start() had switched it on (PRE_ROLL or
        RECORDING). The on_state_change callback of the outgoing config is
        notified of the transition to IDLE.
        """
        old_state = self._state
        # Grab the callback before the config is cleared, otherwise the
        # transition to IDLE could never be reported.
        on_state_change = self._config.on_state_change if self._config else None

        if old_state == RecordingState.RECORDING:
            try:
                self.song.stop_playing()
            except Exception as e:
                logger.warning("Error during reset cleanup: %s", e)

        if old_state in self._OVERDUB_STATES:
            try:
                self.song.arrangement_overdub = False
            except Exception as e:
                logger.warning("Error during reset cleanup: %s", e)

        self._state = RecordingState.IDLE
        self._state_entry_time = time.time()

        # Clear all recording data
        self._config = None
        self._baseline = None
        self._new_clips = []
        self._new_clip_ids = set()
        self._target_start_bar = 0.0
        self._target_end_bar = 0.0
        self._pre_roll_target_bar = 0.0
        self._current_progress = 0.0
        self._last_progress_emit = -1.0

        if old_state != RecordingState.IDLE:
            self._invoke_state_callback(on_state_change, old_state, RecordingState.IDLE)

        logger.info("Recorder reset to IDLE")

    def get_state(self) -> RecordingState:
        """Get current recording state."""
        return self._state

    def get_progress(self) -> float:
        """
        Get recording progress from 0.0 to 1.0.

        Returns:
            Progress value (0.0-1.0) while in PRE_ROLL, RECORDING or COOLDOWN,
            or -1.0 in any other state
        """
        if self._state not in (RecordingState.PRE_ROLL,
                               RecordingState.RECORDING,
                               RecordingState.COOLDOWN):
            return -1.0
        return self._current_progress

    def get_new_clips(self) -> List[str]:
        """
        Get list of new clip IDs recorded in this session.

        Returns:
            Copy of the list of clip identifiers, each formatted as
            ``"<track_index>:<clip start_time>"``
        """
        return self._new_clips.copy()

    def is_active(self) -> bool:
        """
        Check if recorder is in an active state.

        Returns:
            True if armed, pre-rolling, recording, or in cooldown
        """
        return self._state in (
            RecordingState.ARMED,
            RecordingState.PRE_ROLL,
            RecordingState.RECORDING,
            RecordingState.COOLDOWN,
        )

    # ========================================================================
    # PRIVATE METHODS - State Machine
    # ========================================================================

    def _transition_to(self, new_state: RecordingState) -> None:
        """Transition to a new state with notification."""
        old_state = self._state
        self._state = new_state
        self._state_entry_time = time.time()

        logger.debug("State transition: %s -> %s", old_state.name, new_state.name)
        self._notify_state_change(old_state, new_state)

    @staticmethod
    def _invoke_state_callback(callback, old: RecordingState, new: RecordingState) -> None:
        """Call a state-change callback, logging (not raising) its errors."""
        if not callback:
            return
        try:
            callback(old, new)
        except Exception as e:
            logger.warning("State change callback error: %s", e)

    def _notify_state_change(self, old: RecordingState, new: RecordingState) -> None:
        """Notify the current config's state change callback, if any."""
        callback = self._config.on_state_change if self._config else None
        self._invoke_state_callback(callback, old, new)

    def _notify_progress(self, progress: float) -> None:
        """
        Notify progress callback (throttled).

        Emissions closer than _PROGRESS_STEP to the previous one are dropped,
        except that the first value reaching 1.0 is always delivered.
        """
        reached_end = progress >= 1.0 > self._last_progress_emit
        if not reached_end and abs(progress - self._last_progress_emit) < self._PROGRESS_STEP:
            return

        self._last_progress_emit = progress

        if self._config and self._config.on_progress:
            try:
                self._config.on_progress(progress)
            except Exception as e:
                logger.warning("Progress callback error: %s", e)

    def _handle_error(self, error: Exception) -> None:
        """
        Handle error and transition to FAILED state.

        The state is switched and overdub is disabled *before* any callback
        runs, so a callback that calls reset() is not overwritten afterwards.
        Callbacks are invoked in the order on_state_change, then on_error.
        """
        logger.error("Recording error: %s", error)

        # Keep a reference: a callback may call reset() and clear self._config
        config = self._config

        old_state = self._state
        self._state = RecordingState.FAILED
        self._state_entry_time = time.time()

        # Cleanup (best effort - the song object itself may be the problem)
        try:
            self.song.arrangement_overdub = False
        except Exception as e:
            logger.warning("Could not disable arrangement overdub: %s", e)

        self._notify_state_change(old_state, RecordingState.FAILED)

        # Notify error callback
        if config and config.on_error:
            try:
                config.on_error(error)
            except Exception as e:
                logger.warning("Error callback failed: %s", e)

    def _handle_pre_roll(self) -> None:
        """Handle pre-roll phase - wait until the pre-roll target bar."""
        current_bar = self._get_current_bar()

        # Progress through pre-roll, capped at 0.99 so that 1.0 is reserved
        # for the end of the recording phase
        if self._config and self._config.pre_roll_bars > 0:
            pre_roll_start = self._pre_roll_target_bar - self._config.pre_roll_bars
            fraction = (current_bar - pre_roll_start) / self._config.pre_roll_bars
            self._current_progress = max(0.0, min(0.99, fraction))
        else:
            self._current_progress = 0.99

        self._notify_progress(self._current_progress)

        # Check if we've reached the target bar
        if current_bar >= self._pre_roll_target_bar:
            self._on_quantized_start()

    def _handle_recording(self) -> None:
        """Handle recording phase - track progress and auto-stop."""
        current_bar = self._get_current_bar()

        # Progress through recording, clamped to [0.0, 1.0] so a playhead
        # that is before the start bar cannot produce a negative value
        recording_bars = self._target_end_bar - self._target_start_bar
        if recording_bars > 0:
            fraction = (current_bar - self._target_start_bar) / recording_bars
        else:
            fraction = 1.0
        self._current_progress = max(0.0, min(1.0, fraction))

        self._notify_progress(self._current_progress)

        # Check if recording should end
        if current_bar >= self._target_end_bar:
            self._on_recording_end()

    # ========================================================================
    # PRIVATE METHODS - Recording Lifecycle
    # ========================================================================

    def _verify_preconditions(self) -> None:
        """
        Verify that recording can proceed.

        Checks that a song exists, that it has scenes (including the
        configured scene index) and tracks, and that arrangement_overdub can
        be written.

        Raises:
            RuntimeError: If preconditions are not met
        """
        if not self.song:
            raise RuntimeError("No song object available")

        # Check that we have scenes to fire
        scenes = getattr(self.song, 'scenes', None)
        if scenes is None or len(scenes) == 0:
            raise RuntimeError("No scenes available in project")

        if self._config and self._config.scene_index >= len(scenes):
            raise RuntimeError(f"Scene index {self._config.scene_index} out of range")

        # Check that we have tracks
        tracks = getattr(self.song, 'tracks', None)
        if tracks is None or len(tracks) == 0:
            raise RuntimeError("No tracks available in project")

        # Check arrangement_overdub can be set
        try:
            # Test setting and restoring
            original = self.song.arrangement_overdub
            self.song.arrangement_overdub = True
            self.song.arrangement_overdub = original
        except Exception as e:
            raise RuntimeError(f"Cannot control arrangement_overdub: {e}") from e

        logger.debug("Preconditions verified successfully")

    def _scan_arrangement_clips(self) -> Tuple[int, Dict[str, Tuple[float, float]]]:
        """
        Walk every track's arrangement clips.

        Tracks without an ``arrangement_clips`` attribute and falsy clip
        entries are skipped.

        Returns:
            Tuple of (number of clips seen, mapping of
            ``"<track_index>:<start_time>"`` -> (start_time, end_time))
        """
        positions: Dict[str, Tuple[float, float]] = {}
        count = 0
        for track_idx, track in enumerate(self.song.tracks):
            for clip in getattr(track, 'arrangement_clips', ()):
                if not clip:
                    continue
                clip_id = f"{track_idx}:{clip.start_time}"
                positions[clip_id] = (clip.start_time, clip.end_time)
                count += 1
        return count, positions

    def _capture_baseline(self) -> ArrangementBaseline:
        """
        Capture current arrangement state for later comparison.

        Best effort: if the arrangement cannot be read, an empty baseline is
        returned instead of raising.

        Returns:
            ArrangementBaseline with current state
        """
        try:
            clip_count, clip_positions = self._scan_arrangement_clips()

            # Get current arrangement length
            total_length = 0.0
            if hasattr(self.song, 'last_event_time'):
                total_length = float(self.song.last_event_time)

            baseline = ArrangementBaseline(
                clip_count=clip_count,
                clip_ids=set(clip_positions),
                clip_positions=clip_positions,
                total_length=total_length,
                timestamp=time.time(),
            )

            logger.debug(
                "Captured baseline: %s clips, length %.2f beats",
                clip_count, total_length,
            )
            return baseline

        except Exception as e:
            logger.warning("Could not capture complete baseline: %s", e)
            return ArrangementBaseline(
                clip_count=0,
                clip_ids=set(),
                clip_positions={},
                total_length=0.0,
                timestamp=time.time(),
            )

    def _calculate_pre_roll(self) -> float:
        """
        Calculate pre-roll time in beats until next bar boundary.

        When the playhead sits exactly on a bar boundary the result is one
        full bar. Falls back to 4 beats per bar if the time signature cannot
        be read.

        Returns:
            Number of beats until next bar
        """
        current_time = self._get_current_song_time()

        try:
            beats_per_bar = self._get_beats_per_bar()
        except Exception as e:
            logger.debug("Could not read time signature, assuming 4/4: %s", e)
            beats_per_bar = self._DEFAULT_BEATS_PER_BAR

        # Find next bar boundary
        next_bar_num = int(current_time / beats_per_bar) + 1
        next_bar_time = next_bar_num * beats_per_bar

        return max(0.0, next_bar_time - current_time)

    def _on_quantized_start(self) -> None:
        """
        Begin recording once the pre-roll target bar has been reached.

        Fires the configured scene, makes sure the transport is playing with
        arrangement overdub on, and enters RECORDING.
        """
        try:
            # Fire the scene
            if self._config:
                scene = self.song.scenes[self._config.scene_index]
                scene.fire()

            # Ensure we're playing and overdubbing
            if not self.song.is_playing:
                self.song.start_playing()

            self.song.arrangement_overdub = True

            # Transition to recording
            self._transition_to(RecordingState.RECORDING)

            logger.info("Recording started at bar %s", self._target_start_bar)

        except Exception as e:
            logger.error("Failed to start recording at quantized time: %s", e)
            self._handle_error(e)

    def _on_recording_end(self) -> None:
        """Stop recording and transition to verification."""
        try:
            # Stop playback
            self.song.stop_playing()

            # Disable overdub
            self.song.arrangement_overdub = False

            logger.info("Recording ended at bar %s", self._target_end_bar)

            # Transition to cooldown
            self._transition_to(RecordingState.COOLDOWN)

            # Trigger verification
            self._verify_and_complete()

        except Exception as e:
            logger.error("Error ending recording: %s", e)
            self._handle_error(e)

    def _verify_and_complete(self) -> None:
        """Verify recording success and transition to COMPLETED or FAILED."""
        try:
            success, new_clips = self._verify_recording_success()

            if not success:
                self._handle_error(
                    RuntimeError("Recording verification failed - no new clips detected")
                )
                return

            self._new_clips = new_clips
            self._new_clip_ids = set(new_clips)
            # Keep the config: the COMPLETED notification may trigger reset()
            config = self._config
            self._transition_to(RecordingState.COMPLETED)

            # Notify completion
            if config and config.on_completed:
                try:
                    config.on_completed(new_clips)
                except Exception as e:
                    logger.warning("Completion callback error: %s", e)

            logger.info(
                "Recording completed successfully with %s new clips", len(new_clips)
            )

        except Exception as e:
            logger.error("Verification failed: %s", e)
            self._handle_error(e)

    def _verify_recording_success(self) -> Tuple[bool, List[str]]:
        """
        Compare before/after state to verify recording succeeded.

        Verification passes when at least one clip ID is new, the clip count
        grew, or a clip that existed in the baseline now has a different
        (start, end) span.

        Returns:
            Tuple of (success: bool, new_clip_ids: sorted list)
        """
        baseline = self._baseline
        if baseline is None:
            logger.warning("No baseline captured, cannot verify")
            return (True, [])  # Assume success if we can't verify

        try:
            # Capture current state
            current_count, current_positions = self._scan_arrangement_clips()

            # Find new clips
            new_clip_ids = set(current_positions) - baseline.clip_ids

            # Recording into an existing clip keeps its ID and the clip
            # count, so also look for clips whose span changed
            span_changed = any(
                clip_id in current_positions and current_positions[clip_id] != span
                for clip_id, span in baseline.clip_positions.items()
            )

            # Heuristic: the arrangement must differ from the baseline
            success = (
                bool(new_clip_ids)
                or current_count > baseline.clip_count
                or span_changed
            )

            if not success:
                logger.warning(
                    "Verification failed: %s -> %s clips, %s new",
                    baseline.clip_count, current_count, len(new_clip_ids),
                )
            else:
                logger.debug("Verification passed: %s new clips", len(new_clip_ids))

            # Sorted so callers get a deterministic order
            return (success, sorted(new_clip_ids))

        except Exception as e:
            logger.error("Error during verification: %s", e)
            return (False, [])

    # ========================================================================
    # PRIVATE METHODS - Utilities
    # ========================================================================

    def _get_beats_per_bar(self) -> float:
        """
        Length of one bar in beats for the song's current time signature.

        Computed as ``numerator * 4 / denominator``; missing or zero
        signature values are treated as 4.

        NOTE(review): assumes Live's song time counts quarter-note beats, so
        that e.g. a 6/8 bar is 3.0 beats long - confirm against the Live API.

        Returns:
            Beats per bar
        """
        numerator = float(getattr(self.song, 'signature_numerator', 4) or 4)
        denominator = float(getattr(self.song, 'signature_denominator', 4) or 4)
        return numerator * 4.0 / denominator

    def _get_current_bar(self) -> float:
        """
        Get current song position in bars (musical time).

        Returns:
            Current bar number (can be fractional), or 0.0 if the song
            position cannot be read
        """
        try:
            beats = float(self.song.current_song_time)
            return beats / self._get_beats_per_bar()
        except Exception as e:
            logger.warning("Error getting current bar: %s", e)
            return 0.0

    def _get_current_song_time(self) -> float:
        """
        Get current song position in beats.

        Returns:
            Current position in beats, or 0.0 if it cannot be read
        """
        try:
            return float(self.song.current_song_time)
        except Exception as e:
            logger.warning("Error getting song time: %s", e)
            return 0.0

    def __repr__(self) -> str:
        """String representation for debugging."""
        state = self._state.name
        progress = f"{self._current_progress:.1%}" if self._current_progress >= 0 else "N/A"
        return f"ArrangementRecorder(state={state}, progress={progress})"