Files
ableton-mcp-ai/test_senior_architecture.py
OpenCode Agent 5ce8187c65 feat: Implement senior audio injection with 5 fallback methods
- Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain
- Method 1: track.insert_arrangement_clip() [Live 12+]
- Method 2: track.create_audio_clip() [Live 11+]
- Method 3: arrangement_clips.add_new_clip() [Live 12+]
- Method 4: Session->duplicate_clip_to_arrangement [Legacy]
- Method 5: Session->Recording [Universal]

- Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow
- Update skills documentation
- Verified: 3 clips created at positions [0, 4, 8] in Arrangement View

Closes: Audio injection in Arrangement View
2026-04-12 14:02:32 -03:00

1301 lines
46 KiB
Python

"""Comprehensive tests for Senior Architecture (v3.0).
Test Categories:
1. Metadata Store Tests - SQLite database operations
2. Hybrid Extractor Tests - Database + librosa analysis
3. Arrangement Recorder Tests - State machine for recording
4. LiveBridge Tests - Direct Ableton API execution
5. Integration Tests - Component interactions
6. End-to-End Workflow Tests - Complete workflows
Usage:
# Run all tests
python test_senior_architecture.py
# Run specific test class
python test_senior_architecture.py TestMetadataStore
# Run with verbose output
python test_senior_architecture.py -v
Requirements:
- pytest (optional, for better output)
- unittest (standard library)
- tempfile, sqlite3, json (standard library)
- Optional: numpy, librosa (for hybrid extractor tests)
Test Coverage:
- Database initialization and CRUD operations
- Feature extraction with database caching
- Recording state machine transitions
- Live API bridge operations (mocked)
- Full workflow without numpy
- Full workflow with numpy (if available)
"""
import unittest
import os
import sys
import tempfile
import sqlite3
import json
import time
import logging
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, Callable, Tuple, Set
from enum import Enum, auto
from unittest.mock import Mock, MagicMock, patch, call
# Test-run logging: INFO and above, rendered as "LEVEL: message".
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

# Put this file's own directory at the front of sys.path before importing the
# project packages below.
sys.path.insert(0, str(Path(__file__).parent))

# The Senior Architecture components are imported on a best-effort basis.
# SENIOR_ARCHITECTURE_AVAILABLE records whether every import succeeded so the
# test classes can skip themselves when it did not.
try:
    from mcp_server.engines.metadata_store import SampleMetadataStore, SampleFeatures
    from mcp_server.engines.abstract_analyzer import (
        HybridExtractor, DatabaseExtractor, LibrosaExtractor
    )
    from mcp_server.engines.arrangement_recorder import (
        ArrangementRecorder, RecordingState, RecordingConfig
    )
    from mcp_server.engines.live_bridge import (
        AbletonLiveBridge, MixConfiguration, CompressorSettings
    )
    from mcp_server.engines import get_system_capabilities, is_module_available
except ImportError as e:
    logger.warning(f"Could not import Senior Architecture components: {e}")
    SENIOR_ARCHITECTURE_AVAILABLE = False
else:
    SENIOR_ARCHITECTURE_AVAILABLE = True
    logger.info("Senior Architecture components imported successfully")
# =============================================================================
# MOCK CLASSES FOR ABLETON LIVE API
# =============================================================================
class MockParameter:
    """Stand-in for an Ableton Live parameter object.

    Holds a name, a current value and the ``min``/``max`` bounds. The bounds
    are only stored; nothing here validates or clamps ``value`` against them.
    """

    def __init__(self, name: str, value: Any = 0.0, min_val: float = 0.0, max_val: float = 1.0):
        self.name, self.value = name, value
        self.min, self.max = min_val, max_val
class MockMixerDevice:
    """Stand-in for the mixer device attached to an Ableton track.

    Exposes ``volume`` (initially 0.85), ``panning`` (initially 0.0, bounded
    to -1.0..1.0) and an initially empty ``sends`` list.
    """

    def __init__(self):
        self.sends: List[MockParameter] = []
        self.panning = MockParameter("Panning", 0.0, -1.0, 1.0)
        self.volume = MockParameter("Volume", 0.85)
class MockClip:
    """Stand-in for an Ableton Live clip.

    Carries a name, start/end times, ``warping``/``looping`` flags (both
    initially False) and lists of parameters and notes.
    """

    def __init__(self, name: str = "Clip", start_time: float = 0.0, end_time: float = 4.0):
        self.name = name
        self.start_time, self.end_time = start_time, end_time
        self.warping = self.looping = False
        self.parameters: List[MockParameter] = []
        self.notes: List[Dict[str, Any]] = []

    def add_note(self, pitch: int, start: float, duration: float, velocity: int, muted: bool = False):
        """Append one note dict to ``self.notes``.

        The ``start`` argument is stored under the ``"start_time"`` key; the
        other arguments are stored under their own names.
        """
        note = dict(
            pitch=pitch,
            start_time=start,
            duration=duration,
            velocity=velocity,
            muted=muted,
        )
        self.notes.append(note)
class MockTrack:
    """Stand-in for an Ableton Live track.

    Keeps clip slots, arrangement clips, loaded devices, a mock mixer device
    and the mute/solo/routing attributes that the tests read or write.
    """

    def __init__(self, name: str = "Track", track_type: str = "audio"):
        self.name = name
        self.type = track_type  # "audio" or "midi"
        self.clip_slots: List[Optional[MockClip]] = []
        self.arrangement_clips: List[MockClip] = []
        self.devices: List[Mock] = []
        self.mixer_device = MockMixerDevice()
        self.mute = self.solo = False
        self.output_routing_type = None
        self.group_track = None

    def _append_arrangement_clip(self, clip_name: str, start_bar: float, duration: float):
        """Create a clip spanning ``start_bar``..``start_bar + duration`` and store it."""
        new_clip = MockClip(clip_name, start_bar, start_bar + duration)
        self.arrangement_clips.append(new_clip)
        return new_clip

    def insert_clip(self, file_path: str, start_bar: float, duration: float):
        """Add a ``Clip_<n>`` arrangement clip and return it.

        ``file_path`` is accepted but not used; ``<n>`` is the number of
        arrangement clips present before this one is added.
        """
        return self._append_arrangement_clip(
            f"Clip_{len(self.arrangement_clips)}", start_bar, duration
        )

    def create_clip(self, start_bar: float, duration: float):
        """Add a ``MIDI_Clip_<n>`` arrangement clip and return it."""
        return self._append_arrangement_clip(
            f"MIDI_Clip_{len(self.arrangement_clips)}", start_bar, duration
        )

    def load_device(self, device: Any):
        """Append a mock device named after ``device`` and return its index.

        The mock device always receives a "Threshold" and a "Ratio" parameter.
        """
        loaded = Mock()
        loaded.name = device if isinstance(device, str) else str(device)
        loaded.parameters = [
            MockParameter("Threshold", -20.0, -60.0, 0.0),
            MockParameter("Ratio", 4.0, 1.0, 20.0),
        ]
        position = len(self.devices)
        self.devices.append(loaded)
        return position

    def delete_device(self, index: int):
        """Remove the device at ``index``; out-of-range indexes are ignored."""
        if index < 0 or index >= len(self.devices):
            return
        del self.devices[index]
class MockScene:
    """Stand-in for a Session View scene that remembers whether it was fired."""

    def __init__(self, name: str = "Scene"):
        self._fired = False
        self.name = name

    def fire(self):
        """Mark the scene as fired."""
        self._fired = True
class MockSong:
    """Stand-in for the Ableton Live song object used by the tests.

    Tracks, scenes and return tracks start empty; tempo starts at 120.0 and
    playback is stopped.
    """

    def __init__(self):
        self.tracks: List[MockTrack] = []
        self.scenes: List[MockScene] = []
        self.return_tracks: List[MockTrack] = []
        self.tempo = 120.0
        self.current_song_time = 0.0
        self.arrangement_overdub = False
        self.is_playing = False
        self.signature_numerator = 4
        self.last_event_time = 0.0
        self._browser = None

    def start_playing(self):
        """Flag the song as playing."""
        self.is_playing = True

    def stop_playing(self):
        """Flag the song as stopped."""
        self.is_playing = False

    def _add_track(self, track_name: str, track_type: str, index: int):
        """Create a track and append it (negative index) or insert it at ``index``."""
        new_track = MockTrack(track_name, track_type)
        if index >= 0:
            self.tracks.insert(index, new_track)
        else:
            self.tracks.append(new_track)
        return new_track

    def create_midi_track(self, index: int = -1):
        """Add a "midi" track named after the current track count and return it."""
        return self._add_track(f"MIDI Track {len(self.tracks)}", "midi", index)

    def create_audio_track(self, index: int = -1):
        """Add an "audio" track named after the current track count and return it."""
        return self._add_track(f"Audio Track {len(self.tracks)}", "audio", index)

    def create_return_track(self):
        """Append a "return" track to ``return_tracks`` and return it."""
        new_return = MockTrack(f"Return {len(self.return_tracks)}", "return")
        self.return_tracks.append(new_return)
        return new_return

    @property
    def browser(self):
        """Mock browser with empty ``audio_effects``/``instruments``, built on first access."""
        if self._browser is not None:
            return self._browser
        self._browser = Mock()
        self._browser.audio_effects = []
        self._browser.instruments = []
        return self._browser

    def application(self):
        """Return a fresh mock application whose ``get_major_version()`` yields "12"."""
        mock_app = Mock()
        mock_app.get_major_version = Mock(return_value="12")
        return mock_app
class MockConnection:
    """Mock MCP TCP connection.

    Records every command passed to ``send``/``send_command`` in ``commands``
    and every canned reply produced by ``recv`` in ``responses``.
    """

    def __init__(self):
        self.commands: List[Dict[str, Any]] = []
        self.responses: List[Dict[str, Any]] = []

    def send(self, data: bytes):
        """Record ``data`` as a command.

        A JSON payload is stored in decoded form. Anything that is not valid
        JSON (or not valid UTF-8) is stored as ``{"raw": <text>}`` instead.
        """
        try:
            cmd = json.loads(data.decode())
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses. Undecodable bytes are replaced rather than decoded
            # strictly a second time, which would raise from this fallback.
            self.commands.append({"raw": data.decode(errors="replace")})
        else:
            self.commands.append(cmd)

    def recv(self, size: int) -> bytes:
        """Return a canned JSON success reply; ``size`` is ignored."""
        response = {"status": "success", "result": {}}
        self.responses.append(response)
        return json.dumps(response).encode()

    def send_command(self, cmd: Dict[str, Any]) -> Dict[str, Any]:
        """Record ``cmd`` and return a canned success reply."""
        self.commands.append(cmd)
        return {"status": "success", "result": {}}
# =============================================================================
# TEST: METADATA STORE
# =============================================================================
class TestMetadataStore(unittest.TestCase):
    """Test SQLite metadata store operations.

    Every test runs against its own temporary ``.db`` file holding an
    initialized ``SampleMetadataStore``. All tests are skipped when
    ``SENIOR_ARCHITECTURE_AVAILABLE`` is false.
    """

    def setUp(self):
        """Create temporary database for each test."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")
        self.db_fd, self.db_path = tempfile.mkstemp(suffix='.db')
        self.store = SampleMetadataStore(self.db_path)
        self.store.init_database()

    def tearDown(self):
        """Clean up temporary database."""
        if hasattr(self, 'store'):
            self.store.close()
        if hasattr(self, 'db_fd'):
            os.close(self.db_fd)
        if hasattr(self, 'db_path') and os.path.exists(self.db_path):
            os.unlink(self.db_path)

    def _schema_object_names(self, object_type):
        """Return the names of all ``sqlite_master`` rows of the given type.

        The inspection connection is closed in ``finally`` so that a failing
        query cannot leave it open, and so that callers assert only after the
        connection is gone.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(
                "SELECT name FROM sqlite_master WHERE type=?", (object_type,)
            )
            return {row[0] for row in cursor.fetchall()}
        finally:
            conn.close()

    def test_init_database(self):
        """Test database initialization creates proper schema."""
        # Verify tables exist
        tables = self._schema_object_names('table')
        self.assertIn('samples', tables)
        self.assertIn('sample_categories', tables)
        self.assertIn('analysis_metadata', tables)

    def test_init_database_creates_indexes(self):
        """Test that indexes are created for performance."""
        indexes = self._schema_object_names('index')
        # Check for expected indexes
        self.assertTrue(
            any('key' in idx for idx in indexes),
            "Key index should exist"
        )
        self.assertTrue(
            any('bpm' in idx for idx in indexes),
            "BPM index should exist"
        )

    def test_save_and_get_sample(self):
        """Test saving and retrieving sample features."""
        features = SampleFeatures(
            path="/test/kick.wav",
            bpm=95.0,
            key="Am",
            duration=2.5,
            rms=-12.0,
            spectral_centroid=1500.0,
            spectral_rolloff=8000.0,
            zero_crossing_rate=0.05,
            mfcc_1=0.1, mfcc_2=0.1, mfcc_3=0.1, mfcc_4=0.1,
            mfcc_5=0.1, mfcc_6=0.1, mfcc_7=0.1, mfcc_8=0.1,
            mfcc_9=0.1, mfcc_10=0.1, mfcc_11=0.1, mfcc_12=0.1, mfcc_13=0.1,
            categories=["kick", "drums"]
        )
        # Save
        result = self.store.save_sample_features("/test/kick.wav", features)
        self.assertTrue(result)
        # Retrieve
        retrieved = self.store.get_sample_features("/test/kick.wav")
        self.assertIsNotNone(retrieved)
        self.assertEqual(retrieved.bpm, 95.0)
        self.assertEqual(retrieved.key, "Am")
        self.assertEqual(retrieved.duration, 2.5)
        self.assertEqual(retrieved.rms, -12.0)
        self.assertEqual(retrieved.mfcc_1, 0.1)

    def test_sample_not_found(self):
        """Test querying non-existent sample returns None."""
        result = self.store.get_sample_features("/nonexistent.wav")
        self.assertIsNone(result)

    def test_update_existing_sample(self):
        """Test updating existing sample overwrites previous data."""
        # Save initial
        features1 = SampleFeatures(
            path="/test/snare.wav",
            bpm=100.0,
            key="Cm",
            duration=1.0
        )
        self.store.save_sample_features("/test/snare.wav", features1)
        # Update
        features2 = SampleFeatures(
            path="/test/snare.wav",
            bpm=110.0,
            key="Dm",
            duration=1.2
        )
        self.store.save_sample_features("/test/snare.wav", features2)
        # Verify update
        retrieved = self.store.get_sample_features("/test/snare.wav")
        self.assertEqual(retrieved.bpm, 110.0)
        self.assertEqual(retrieved.key, "Dm")

    def test_delete_sample(self):
        """Test deleting sample from database."""
        # Save
        features = SampleFeatures(path="/test/hihat.wav", bpm=120.0)
        self.store.save_sample_features("/test/hihat.wav", features)
        # Verify exists
        self.assertIsNotNone(self.store.get_sample_features("/test/hihat.wav"))
        # Delete
        result = self.store.delete_sample("/test/hihat.wav")
        self.assertTrue(result)
        # Verify gone
        self.assertIsNone(self.store.get_sample_features("/test/hihat.wav"))

    def test_sample_exists_check(self):
        """Test sample existence check."""
        # Non-existent
        self.assertFalse(self.store.sample_exists("/test/new.wav"))
        # Save
        features = SampleFeatures(path="/test/exists.wav", bpm=95.0)
        self.store.save_sample_features("/test/exists.wav", features)
        # Existent
        self.assertTrue(self.store.sample_exists("/test/exists.wav"))

    def test_get_samples_by_category(self):
        """Test retrieving samples by category."""
        # Save samples with categories
        kick = SampleFeatures(path="/test/kick.wav", bpm=95.0, categories=["kick", "drums"])
        snare = SampleFeatures(path="/test/snare.wav", bpm=100.0, categories=["snare", "drums"])
        bass = SampleFeatures(path="/test/bass.wav", bpm=95.0, categories=["bass"])
        self.store.save_sample_features(kick.path, kick)
        self.store.save_sample_features(snare.path, snare)
        self.store.save_sample_features(bass.path, bass)
        # Query by category
        drums = self.store.get_samples_by_category("drums")
        self.assertEqual(len(drums), 2)
        self.assertIn("/test/kick.wav", drums)
        self.assertIn("/test/snare.wav", drums)
        kicks = self.store.get_samples_by_category("kick")
        self.assertEqual(len(kicks), 1)
        basses = self.store.get_samples_by_category("bass")
        self.assertEqual(len(basses), 1)

    def test_search_samples_with_filters(self):
        """Test searching samples with multiple filters."""
        # Save samples
        samples = [
            SampleFeatures("/test/kick1.wav", bpm=95.0, key="Am", categories=["kick"]),
            SampleFeatures("/test/kick2.wav", bpm=100.0, key="Am", categories=["kick"]),
            SampleFeatures("/test/kick3.wav", bpm=110.0, key="Cm", categories=["kick"]),
            SampleFeatures("/test/snare1.wav", bpm=95.0, key="Am", categories=["snare"]),
        ]
        for s in samples:
            self.store.save_sample_features(s.path, s)
        # Search with filters
        result = self.store.search_samples(category="kick", key="Am")
        self.assertEqual(len(result), 2)
        result = self.store.search_samples(bpm_min=90.0, bpm_max=100.0)
        self.assertEqual(len(result), 3)
        result = self.store.search_samples(category="kick", bpm_min=100.0)
        self.assertEqual(len(result), 2)

    def test_get_stats(self):
        """Test retrieving database statistics."""
        # Empty stats
        stats = self.store.get_stats()
        self.assertEqual(stats['total_samples'], 0)
        # Add samples: the first three are tagged "drums", the last two "bass"
        for i in range(5):
            features = SampleFeatures(
                path=f"/test/sample{i}.wav",
                bpm=95.0 + i,
                categories=["drums"] if i < 3 else ["bass"]
            )
            self.store.save_sample_features(features.path, features)
        # Check stats
        stats = self.store.get_stats()
        self.assertEqual(stats['total_samples'], 5)
        self.assertEqual(stats['categories'].get('drums'), 3)
        self.assertEqual(stats['categories'].get('bass'), 2)
# =============================================================================
# TEST: HYBRID EXTRACTOR
# =============================================================================
class TestHybridExtractor(unittest.TestCase):
    """Exercise the database-backed and hybrid extractors against a seeded database.

    ``setUp`` stores one sample, ``/test/snare.wav``, that the tests look up.
    """

    def setUp(self):
        """Create a temporary database and seed it with one analyzed sample."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")
        self.db_fd, self.db_path = tempfile.mkstemp(suffix='.db')
        # Use abstract_analyzer's SampleMetadataStore which has JSON mfccs column
        from mcp_server.engines.abstract_analyzer import SampleMetadataStore as AnalyzerMetadataStore
        from mcp_server.engines.abstract_analyzer import SampleFeatures as AnalyzerSampleFeatures
        self.analyzer_store = AnalyzerMetadataStore(self.db_path)
        # Seed record, written through the analyzer's own schema
        seeded = AnalyzerSampleFeatures(
            path="/test/snare.wav",
            bpm=100.0,
            key="Cm",
            duration=1.0,
            rms=-10.0,
            spectral_centroid=2000.0,
            spectral_rolloff=10000.0,
            zero_crossing_rate=0.1,
            mfccs=[0.2] * 13,
            source="database"
        )
        self.analyzer_store.save(seeded)

    def tearDown(self):
        """Drop the store reference, close the descriptor and remove the file."""
        if hasattr(self, 'analyzer_store'):
            del self.analyzer_store
        if hasattr(self, 'db_fd'):
            os.close(self.db_fd)
        if hasattr(self, 'db_path') and os.path.exists(self.db_path):
            try:
                os.unlink(self.db_path)
            except PermissionError:
                pass  # File may be locked, will be cleaned up later

    def test_database_extractor_cache_hit(self):
        """A stored sample yields its stored BPM."""
        db_extractor = DatabaseExtractor(self.db_path)
        # Pretend the audio file exists on disk
        with patch.object(db_extractor, '_check_file_exists', return_value=True):
            found_bpm = db_extractor.extract_bpm("/test/snare.wav")
            self.assertEqual(found_bpm, 100.0)

    def test_database_extractor_cache_miss(self):
        """An unknown sample yields ``None`` for its BPM."""
        db_extractor = DatabaseExtractor(self.db_path)
        # Pretend the audio file exists on disk
        with patch.object(db_extractor, '_check_file_exists', return_value=True):
            found_bpm = db_extractor.extract_bpm("/test/unknown.wav")
            self.assertIsNone(found_bpm)

    def test_hybrid_extractor_database_first(self):
        """The hybrid extractor returns the stored record, tagged as coming from the database."""
        hybrid = HybridExtractor(self.db_path)
        # Pretend the file exists for both the hybrid extractor and its db extractor
        with patch.object(hybrid, '_check_file_exists', return_value=True), \
                patch.object(hybrid.db_extractor, '_check_file_exists', return_value=True):
            found = hybrid.get_or_analyze("/test/snare.wav")
            self.assertIsNotNone(found)
            self.assertEqual(found.bpm, 100.0)
            self.assertEqual(found.key, "Cm")
            self.assertEqual(found.source, "database")

    def test_hybrid_extractor_extract_all_features(self):
        """``extract_all_features`` on the hybrid extractor returns the stored values."""
        hybrid = HybridExtractor(self.db_path)
        # Pretend the file exists for both the hybrid extractor and its db extractor
        with patch.object(hybrid, '_check_file_exists', return_value=True), \
                patch.object(hybrid.db_extractor, '_check_file_exists', return_value=True):
            found = hybrid.extract_all_features("/test/snare.wav")
            # Every seeded field should come back unchanged
            self.assertEqual(found.bpm, 100.0)
            self.assertEqual(found.key, "Cm")
            self.assertEqual(found.duration, 1.0)
            self.assertEqual(found.rms, -10.0)

    def test_database_extractor_is_cached(self):
        """``is_cached`` is true for the seeded sample and false otherwise."""
        db_extractor = DatabaseExtractor(self.db_path)
        self.assertTrue(db_extractor.is_cached("/test/snare.wav"))
        self.assertFalse(db_extractor.is_cached("/test/unknown.wav"))

    def test_database_extractor_get_all_features(self):
        """``extract_all_features`` on the database extractor returns the seeded record."""
        db_extractor = DatabaseExtractor(self.db_path)
        # Pretend the audio file exists on disk
        with patch.object(db_extractor, '_check_file_exists', return_value=True):
            found = db_extractor.extract_all_features("/test/snare.wav")
            self.assertEqual(found.path, "/test/snare.wav")
            self.assertEqual(found.source, "database")

    def test_database_extractor_not_found(self):
        """A sample absent from the database is reported with source ``"not_found"``."""
        db_extractor = DatabaseExtractor(self.db_path)
        # Pretend the audio file exists on disk
        with patch.object(db_extractor, '_check_file_exists', return_value=True):
            found = db_extractor.extract_all_features("/test/missing.wav")
            self.assertEqual(found.source, "not_found")
# =============================================================================
# TEST: ARRANGEMENT RECORDER
# =============================================================================
class TestArrangementRecorder(unittest.TestCase):
    """Exercise the ArrangementRecorder state machine against a mock song."""

    def setUp(self):
        """Build a mock song (one audio track, one MIDI track, one scene) and a recorder."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")
        song = MockSong()
        song.create_audio_track()
        song.create_midi_track()
        song.scenes.append(MockScene("Scene 1"))
        self.mock_song = song
        self.mock_connection = MockConnection()
        self.recorder = ArrangementRecorder(self.mock_song, self.mock_connection)

    def test_initial_state(self):
        """A new recorder is IDLE and reports progress -1.0."""
        self.assertEqual(self.recorder.get_state(), RecordingState.IDLE)
        self.assertEqual(self.recorder.get_progress(), -1.0)

    def test_arm_transition(self):
        """Arming with a valid config succeeds and yields ARMED."""
        armed = self.recorder.arm(
            RecordingConfig(start_bar=0.0, duration_bars=4.0, tempo=95.0)
        )
        self.assertTrue(armed)
        self.assertEqual(self.recorder.get_state(), RecordingState.ARMED)

    def test_arm_invalid_config(self):
        """A negative duration raises ValueError while building or arming the config."""
        # Both the construction and the arm call sit inside the context, so
        # either one may be the source of the ValueError.
        with self.assertRaises(ValueError):
            self.recorder.arm(
                RecordingConfig(start_bar=0.0, duration_bars=-1.0, tempo=95.0)
            )

    def test_start_from_armed(self):
        """Starting an armed recorder enters PRE_ROLL and enables arrangement overdub."""
        self.recorder.arm(
            RecordingConfig(
                start_bar=0.0, duration_bars=4.0, pre_roll_bars=1.0, tempo=95.0
            )
        )
        started = self.recorder.start()
        self.assertTrue(started)
        self.assertEqual(self.recorder.get_state(), RecordingState.PRE_ROLL)
        self.assertTrue(self.mock_song.arrangement_overdub)

    def test_start_from_wrong_state(self):
        """Starting without arming first is rejected."""
        started = self.recorder.start()
        self.assertFalse(started)

    def test_stop_recording(self):
        """Stopping a recording succeeds and disables arrangement overdub."""
        self.recorder.arm(
            RecordingConfig(start_bar=0.0, duration_bars=4.0, tempo=95.0)
        )
        self.recorder.start()
        # Force the RECORDING state directly instead of waiting for it
        self.recorder._transition_to(RecordingState.RECORDING)
        stopped = self.recorder.stop()
        self.assertTrue(stopped)
        self.assertFalse(self.mock_song.arrangement_overdub)

    def test_reset(self):
        """Reset returns an armed recorder to IDLE with no progress and no new clips."""
        self.recorder.arm(
            RecordingConfig(start_bar=0.0, duration_bars=4.0, tempo=95.0)
        )
        self.assertEqual(self.recorder.get_state(), RecordingState.ARMED)
        self.recorder.reset()
        self.assertEqual(self.recorder.get_state(), RecordingState.IDLE)
        self.assertEqual(self.recorder.get_progress(), -1.0)
        self.assertEqual(len(self.recorder.get_new_clips()), 0)

    def test_is_active(self):
        """``is_active`` is false when idle, true once armed, false again after reset."""
        self.assertFalse(self.recorder.is_active())
        self.recorder.arm(
            RecordingConfig(start_bar=0.0, duration_bars=4.0, tempo=95.0)
        )
        self.assertTrue(self.recorder.is_active())
        self.recorder.reset()
        self.assertFalse(self.recorder.is_active())

    def test_state_transitions(self):
        """Arm followed by start reports IDLE->ARMED then ARMED->PRE_ROLL to the callback."""
        transitions = []

        def on_state_change(old, new):
            transitions.append((old, new))

        recording_config = RecordingConfig(
            start_bar=0.0,
            duration_bars=4.0,
            pre_roll_bars=0.0,  # No pre-roll for immediate start
            tempo=95.0,
            on_state_change=on_state_change
        )
        self.recorder.arm(recording_config)
        self.recorder.start()
        # Exactly two transitions, in order
        self.assertEqual(len(transitions), 2)
        self.assertEqual(transitions[0], (RecordingState.IDLE, RecordingState.ARMED))
        self.assertEqual(transitions[1], (RecordingState.ARMED, RecordingState.PRE_ROLL))

    def test_progress_callback(self):
        """After one update, progress was reported or the recorder has left PRE_ROLL."""
        reported = []

        def on_progress(p):
            reported.append(p)

        recording_config = RecordingConfig(
            start_bar=0.0,
            duration_bars=4.0,
            tempo=95.0,
            on_progress=on_progress
        )
        self.recorder.arm(recording_config)
        self.recorder.start()
        # Drive a single update tick
        self.recorder.update()
        self.assertTrue(len(reported) > 0 or self.recorder.get_state() != RecordingState.PRE_ROLL)
# =============================================================================
# TEST: LIVE BRIDGE
# =============================================================================
class TestLiveBridge(unittest.TestCase):
    """Exercise AbletonLiveBridge operations against a mock song."""

    def setUp(self):
        """Build a mock song with one audio and one MIDI track, plus the bridge under test."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")
        song = MockSong()
        song.create_audio_track()
        song.create_midi_track()
        self.mock_song = song
        self.mock_connection = MockConnection()
        self.bridge = AbletonLiveBridge(self.mock_song, self.mock_connection)

    def test_create_bus_track(self):
        """Creating a bus track reports success, a track index and the given name."""
        outcome = self.bridge.create_bus_track("Drums Bus", "drums")
        self.assertTrue(outcome['success'])
        self.assertIn('track_index', outcome['data'])
        self.assertEqual(outcome['data']['name'], "Drums Bus")

    def test_create_return_track(self):
        """Creating a return track reports success, a return index and the given name."""
        outcome = self.bridge.create_return_track("Reverb", "Reverb")
        self.assertTrue(outcome['success'])
        self.assertIn('return_index', outcome['data'])
        self.assertEqual(outcome['data']['name'], "Reverb")

    def test_set_track_volume(self):
        """Setting a track volume writes through to the mixer volume parameter."""
        outcome = self.bridge.set_track_volume(0, 0.75)
        self.assertTrue(outcome['success'])
        first_track = self.mock_song.tracks[0]
        self.assertEqual(first_track.mixer_device.volume.value, 0.75)

    def test_set_track_pan(self):
        """Setting a track pan writes through to the mixer panning parameter."""
        outcome = self.bridge.set_track_pan(0, -0.5)
        self.assertTrue(outcome['success'])
        first_track = self.mock_song.tracks[0]
        self.assertEqual(first_track.mixer_device.panning.value, -0.5)

    def test_set_track_name(self):
        """Renaming a track changes the track's name."""
        outcome = self.bridge.set_track_name(0, "Kick Track")
        self.assertTrue(outcome['success'])
        self.assertEqual(self.mock_song.tracks[0].name, "Kick Track")

    def test_insert_device(self):
        """Inserting a device returns a device index, or a 'not found' message on failure."""
        # Mock browser that offers a single "Compressor" audio effect
        compressor = Mock()
        compressor.name = "Compressor"
        browser = Mock()
        browser.audio_effects = [compressor]
        browser.instruments = []
        self.mock_song._browser = browser
        outcome = self.bridge.insert_device(0, "Compressor")
        # Either result is accepted, but it must carry a 'success' flag
        self.assertIn('success', outcome)
        if not outcome['success']:
            # Expected to fail with current mock setup
            self.assertIn('not found', outcome['message'])
        else:
            self.assertIn('device_index', outcome['data'])

    def test_set_track_send(self):
        """Setting a send level succeeds once a return track and a send exist."""
        # A return track and a matching send parameter on track 0 are needed first
        self.mock_song.create_return_track()
        self.mock_song.tracks[0].mixer_device.sends = [MockParameter("Send 1", 0.0)]
        outcome = self.bridge.set_track_send(0, 0, 0.5)
        self.assertTrue(outcome['success'])

    def test_set_tempo(self):
        """Setting the tempo updates the song's tempo."""
        outcome = self.bridge.set_tempo(110.0)
        self.assertTrue(outcome['success'])
        self.assertEqual(self.mock_song.tempo, 110.0)

    def test_start_stop_playback(self):
        """Start and stop toggle the song's ``is_playing`` flag."""
        started = self.bridge.start_playback()
        self.assertTrue(started['success'])
        self.assertTrue(self.mock_song.is_playing)
        stopped = self.bridge.stop_playback()
        self.assertTrue(stopped['success'])
        self.assertFalse(self.mock_song.is_playing)

    def test_route_track_to_bus(self):
        """A track can be routed to a bus that was just created."""
        created_bus = self.bridge.create_bus_track("Drum Bus")
        # Route track 0 to the bus by the name the bridge reported
        outcome = self.bridge.route_track_to_bus(0, created_bus['data']['name'])
        self.assertTrue(outcome['success'])

    def test_insert_arrangement_midi(self):
        """Inserting a MIDI clip adds exactly one arrangement clip to the MIDI track."""
        midi_notes = [
            {"pitch": 60, "start_time": 0.0, "duration": 0.25, "velocity": 100},
            {"pitch": 62, "start_time": 0.5, "duration": 0.25, "velocity": 100},
        ]
        outcome = self.bridge.insert_arrangement_midi(1, 4.0, 4.0, midi_notes)
        self.assertTrue(outcome['success'])
        midi_track = self.mock_song.tracks[1]
        self.assertEqual(len(midi_track.arrangement_clips), 1)

    def test_execute_mix_config(self):
        """Executing a full mix configuration for track 0 succeeds."""
        mix = MixConfiguration(
            track_index=0, volume=0.8, pan=0.2, mute=False, solo=False
        )
        outcome = self.bridge.execute_mix_config(mix)
        self.assertTrue(outcome['success'])
# =============================================================================
# TEST: INTEGRATION
# =============================================================================
class TestIntegration(unittest.TestCase):
    """Integration tests for component interactions.

    Each test gets a temporary database with an initialized
    ``SampleMetadataStore``. All tests are skipped when
    ``SENIOR_ARCHITECTURE_AVAILABLE`` is false.
    """

    def setUp(self):
        """Set up integration test environment."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")
        self.db_fd, self.db_path = tempfile.mkstemp(suffix='.db')
        self.store = SampleMetadataStore(self.db_path)
        self.store.init_database()

    def tearDown(self):
        """Clean up."""
        if hasattr(self, 'store'):
            self.store.close()
        if hasattr(self, 'db_fd'):
            os.close(self.db_fd)
        if hasattr(self, 'db_path') and os.path.exists(self.db_path):
            os.unlink(self.db_path)

    def test_metadata_to_sample_selection(self):
        """Test metadata store feeds sample selection workflow."""
        # Add samples to metadata store
        samples = [
            SampleFeatures("/drums/kick1.wav", bpm=95.0, key="Am", categories=["kick", "drums"]),
            SampleFeatures("/drums/kick2.wav", bpm=95.0, key="Am", categories=["kick", "drums"]),
            SampleFeatures("/drums/snare1.wav", bpm=95.0, key="Am", categories=["snare", "drums"]),
            SampleFeatures("/bass/bass1.wav", bpm=95.0, key="Am", categories=["bass"]),
        ]
        for s in samples:
            self.store.save_sample_features(s.path, s)
        # Query by category
        kicks = self.store.get_samples_by_category("kick")
        self.assertEqual(len(kicks), 2)
        # Query by BPM and key
        matching = self.store.search_samples(bpm_min=90.0, bpm_max=100.0, key="Am")
        self.assertEqual(len(matching), 4)

    def test_extract_and_cache(self):
        """Test feature extraction and caching flow."""
        # Use a separate database for abstract_analyzer to avoid schema conflicts
        from mcp_server.engines.abstract_analyzer import SampleMetadataStore as AnalyzerStore
        from mcp_server.engines.abstract_analyzer import SampleFeatures as AnalyzerFeatures
        db_path = self.db_path + ".analyzer.db"
        try:
            # Create analyzer store with sample
            analyzer_store = AnalyzerStore(db_path)
            features = AnalyzerFeatures(
                path="/test/sample.wav",
                bpm=95.0,
                key="Am",
                mfccs=[0.1] * 13,
                source="database"
            )
            analyzer_store.save(features)
            # Create hybrid extractor
            extractor = HybridExtractor(db_path)
            # Check that sample is cached
            self.assertTrue(extractor.store.exists("/test/sample.wav"))
            # Mock file check and retrieve from cache
            with patch.object(extractor, '_check_file_exists', return_value=True):
                with patch.object(extractor.db_extractor, '_check_file_exists', return_value=True):
                    retrieved = extractor.extract_all_features("/test/sample.wav")
                    self.assertEqual(retrieved.source, "database")
            del analyzer_store
        finally:
            # Best-effort cleanup. Only file-system errors are ignored (for
            # example the file still being locked); a bare ``except`` would
            # also have swallowed KeyboardInterrupt and SystemExit.
            if os.path.exists(db_path):
                try:
                    os.unlink(db_path)
                except OSError:
                    pass

    def test_recorder_integration_with_song(self):
        """Test recorder with mock song."""
        mock_song = MockSong()
        mock_song.create_audio_track()
        mock_song.create_midi_track()
        mock_song.scenes.append(MockScene("Scene 1"))
        mock_connection = MockConnection()
        recorder = ArrangementRecorder(mock_song, mock_connection)
        # Configure recording
        config = RecordingConfig(
            start_bar=0.0,
            duration_bars=8.0,
            tempo=95.0
        )
        # Arm should succeed with valid song
        result = recorder.arm(config)
        self.assertTrue(result)

    def test_bridge_with_mix_config(self):
        """Test LiveBridge applying complete mix configuration."""
        mock_song = MockSong()
        mock_song.create_audio_track()
        mock_song.create_midi_track()
        bridge = AbletonLiveBridge(mock_song, MockConnection())
        # Apply mix config to first track
        config = MixConfiguration(
            track_index=0,
            volume=0.75,
            pan=-0.3,
            mute=False,
            solo=False
        )
        result = bridge.execute_mix_config(config)
        self.assertTrue(result['success'])
        # Verify settings applied
        track = mock_song.tracks[0]
        self.assertEqual(track.mixer_device.volume.value, 0.75)
        self.assertEqual(track.mixer_device.panning.value, -0.3)
# =============================================================================
# TEST: END-TO-END WORKFLOWS
# =============================================================================
class TestEndToEndWorkflows(unittest.TestCase):
"""End-to-end workflow tests."""
def setUp(self):
"""Set up complete test environment."""
if not SENIOR_ARCHITECTURE_AVAILABLE:
self.skipTest("Senior Architecture not available")
self.db_fd, self.db_path = tempfile.mkstemp(suffix='.db')
def tearDown(self):
"""Clean up."""
if hasattr(self, 'db_fd'):
os.close(self.db_fd)
if hasattr(self, 'db_path') and os.path.exists(self.db_path):
os.unlink(self.db_path)
def test_full_workflow_no_numpy(self):
"""Test complete workflow without numpy/librosa."""
# 1. Create metadata store
store = SampleMetadataStore(self.db_path)
store.init_database()
# 2. Add sample metadata manually (simulating pre-analyzed library)
samples = [
SampleFeatures("/drums/kick.wav", bpm=95.0, key="Am",
duration=1.0, rms=-10.0, spectral_centroid=100.0,
categories=["kick", "drums"]),
SampleFeatures("/drums/snare.wav", bpm=95.0, key="Am",
duration=1.0, rms=-12.0, spectral_centroid=2000.0,
categories=["snare", "drums"]),
SampleFeatures("/bass/bass.wav", bpm=95.0, key="Am",
duration=2.0, rms=-15.0, spectral_centroid=150.0,
categories=["bass"]),
]
for s in samples:
store.save_sample_features(s.path, s)
# 3. Query samples for production
kicks = store.search_samples(category="kick", key="Am")
self.assertEqual(len(kicks), 1)
drums = store.get_samples_by_category("drums")
self.assertEqual(len(drums), 2)
# 4. Create Ableton project via LiveBridge (mocked)
mock_song = MockSong()
mock_song.create_audio_track() # Drums
mock_song.create_audio_track() # Bass
mock_song.create_midi_track() # Melody
bridge = AbletonLiveBridge(mock_song, MockConnection())
# Name tracks
bridge.set_track_name(0, "Drums")
bridge.set_track_name(1, "Bass")
bridge.set_track_name(2, "Melody")
# Set volumes
bridge.set_track_volume(0, 0.8)
bridge.set_track_volume(1, 0.7)
bridge.set_track_volume(2, 0.75)
# Verify project setup
self.assertEqual(mock_song.tracks[0].name, "Drums")
self.assertEqual(mock_song.tracks[0].mixer_device.volume.value, 0.8)
# 5. Set up arrangement recording
mock_song.scenes.append(MockScene("Intro"))
mock_song.scenes.append(MockScene("Drop"))
recorder = ArrangementRecorder(mock_song, MockConnection())
config = RecordingConfig(
start_bar=0.0,
duration_bars=16.0,
tempo=95.0
)
arm_result = recorder.arm(config)
self.assertTrue(arm_result)
store.close()
def test_workflow_with_database_extractor(self):
    """Exercise the database-only extraction path end to end.

    Ten synthetic records are written through the analyzer module's own
    store, the store reference is dropped, and every record is then read
    back through a ``DatabaseExtractor`` opened on the same database path.
    """
    # The analyzer module provides its own store/feature types; they are
    # used here for compatibility with the extractor.
    from mcp_server.engines.abstract_analyzer import (
        SampleMetadataStore as AnalyzerStore,
        SampleFeatures as AnalyzerFeatures,
    )

    sample_paths = [f"/samples/synth{n}.wav" for n in range(10)]

    # Populate the store with the test records.
    writer = AnalyzerStore(self.db_path)
    for n, sample_path in enumerate(sample_paths):
        record = AnalyzerFeatures(
            path=sample_path,
            bpm=128.0,
            key="Cm",
            duration=4.0,
            spectral_centroid=3000.0 + n * 100,
            mfccs=[0.1] * 13,
            source="database",
        )
        writer.save(record)
    del writer  # Release store

    # Read everything back through the extractor, with the file-existence
    # check patched to succeed (the sample paths are not real files).
    extractor = DatabaseExtractor(self.db_path)
    with patch.object(extractor, '_check_file_exists', return_value=True):
        retrieved = [
            extractor.extract_all_features(sample_path)
            for sample_path in sample_paths
        ]

    self.assertEqual(len(retrieved), 10)

    # Every record must report the database as its source.
    for item in retrieved:
        self.assertEqual(item.source, "database")
def test_arrangement_to_database_workflow(self):
    """Test recording arrangement and storing metadata.

    Inserts two clips on the first track of a mock song, saves one
    ``SampleFeatures`` record per arrangement clip and checks that the
    store then reports two samples.
    """
    # Create mock environment
    mock_song = MockSong()
    mock_song.create_audio_track()
    mock_song.create_midi_track()
    mock_song.scenes.append(MockScene("Scene 1"))

    # Add some arrangement clips
    mock_song.tracks[0].insert_clip("/samples/kick.wav", 0.0, 4.0)
    mock_song.tracks[0].insert_clip("/samples/snare.wav", 4.0, 4.0)

    # Set up metadata store
    store = SampleMetadataStore(self.db_path)
    # Close the store even when an assertion below fails, so the handle is
    # released before the temporary database file is removed during cleanup.
    try:
        store.init_database()

        # Store metadata for clips
        for clip in mock_song.tracks[0].arrangement_clips:
            features = SampleFeatures(
                path=f"/samples/{clip.name}.wav",
                bpm=95.0,
                duration=clip.end_time - clip.start_time
            )
            store.save_sample_features(features.path, features)

        # Verify stored
        self.assertEqual(store.get_stats()['total_samples'], 2)
    finally:
        store.close()
# =============================================================================
# TEST: SYSTEM CAPABILITIES
# =============================================================================
class TestSystemCapabilities(unittest.TestCase):
    """Checks for the system capability detection helpers."""

    def test_get_system_capabilities(self):
        """The capability report exposes the expected keys and value types."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        report = get_system_capabilities()

        # Every one of these keys must be present in the report.
        required_keys = (
            'numpy',
            'librosa',
            'sqlite3',
            'python_version',
            'modules',
            'has_advanced_analysis',
            'has_metadata_db',
        )
        for required in required_keys:
            self.assertIn(required, report)

        # Dependency flags are booleans; the module map is a dict.
        for flag in ('numpy', 'librosa', 'sqlite3'):
            self.assertIsInstance(report[flag], bool)
        self.assertIsInstance(report['modules'], dict)

    def test_module_availability(self):
        """Each known module name is reported as available."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        known_modules = (
            "metadata_store",
            "abstract_analyzer",
            "arrangement_recorder",
            "live_bridge",
        )
        for module_name in known_modules:
            self.assertTrue(is_module_available(module_name))
# =============================================================================
# TEST RUNNER
# =============================================================================
def run_tests():
    """Run every test class in this file and print a summary.

    Returns:
        bool: ``True`` if the run was successful, ``False`` otherwise.
    """
    # Test classes to load, in execution order.
    test_classes = (
        TestMetadataStore,
        TestHybridExtractor,
        TestArrangementRecorder,
        TestLiveBridge,
        TestIntegration,
        TestEndToEndWorkflows,
        TestSystemCapabilities,
    )

    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case_class in test_classes:
        suite.addTests(loader.loadTestsFromTestCase(case_class))

    # Verbose runner so each test name is printed as it runs.
    result = unittest.TextTestRunner(verbosity=2).run(suite)

    # Summary block.
    rule = "=" * 70
    print("\n" + rule)
    print("TEST SUMMARY")
    print(rule)
    print(f"Tests Run: {result.testsRun}")
    print(f"Failures: {len(result.failures)}")
    print(f"Errors: {len(result.errors)}")
    print(f"Skipped: {len(result.skipped)}")

    passed = result.wasSuccessful()
    if passed:
        print("\n✅ All tests passed!")
    else:
        print("\n❌ Some tests failed!")

    # List the affected tests by name; both lists are empty on success.
    sections = (
        ("\nFailures:", result.failures),
        ("\nErrors:", result.errors),
    )
    for heading, entries in sections:
        if not entries:
            continue
        print(heading)
        for case, _trace in entries:
            print(f" - {case}")

    return passed
if __name__ == '__main__':
    # Prefer pytest for better output when it is installed.
    try:
        import pytest
    except ImportError:
        # pytest is not installed: fall back to the unittest runner.
        success = run_tests()
        sys.exit(0 if success else 1)
    else:
        # Kept outside the try body so that an ImportError raised while
        # pytest is running is not mistaken for "pytest is missing" and
        # silently rerouted to the unittest runner.
        sys.exit(pytest.main([__file__, '-v']))