- Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain - Method 1: track.insert_arrangement_clip() [Live 12+] - Method 2: track.create_audio_clip() [Live 11+] - Method 3: arrangement_clips.add_new_clip() [Live 12+] - Method 4: Session->duplicate_clip_to_arrangement [Legacy] - Method 5: Session->Recording [Universal] - Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow - Update skills documentation - Verified: 3 clips created at positions [0, 4, 8] in Arrangement View Closes: Audio injection in Arrangement View
1301 lines
46 KiB
Python
1301 lines
46 KiB
Python
"""Comprehensive tests for Senior Architecture (v3.0).

Test Categories:
1. Metadata Store Tests - SQLite database operations
2. Hybrid Extractor Tests - Database + librosa analysis
3. Arrangement Recorder Tests - State machine for recording
4. LiveBridge Tests - Direct Ableton API execution
5. Integration Tests - Component interactions
6. End-to-End Workflow Tests - Complete workflows

Usage:
    # Run all tests
    python test_senior_architecture.py

    # Run specific test class
    python test_senior_architecture.py TestMetadataStore

    # Run with verbose output
    python test_senior_architecture.py -v

Requirements:
- pytest (optional, for better output)
- unittest (standard library)
- tempfile, sqlite3, json (standard library)
- Optional: numpy, librosa (for hybrid extractor tests)

Test Coverage:
- Database initialization and CRUD operations
- Feature extraction with database caching
- Recording state machine transitions
- Live API bridge operations (mocked)
- Full workflow without numpy
- Full workflow with numpy (if available)
"""
|
|
|
|
import unittest
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import sqlite3
|
|
import json
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional, List, Dict, Any, Callable, Tuple, Set
|
|
from enum import Enum, auto
|
|
from unittest.mock import Mock, MagicMock, patch, call
|
|
|
|
# Configure logging for tests
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

# Put this file's directory at the front of sys.path for the mcp_server imports below
sys.path.insert(0, str(Path(__file__).parent))

# Try importing Senior Architecture components. The test classes check
# SENIOR_ARCHITECTURE_AVAILABLE in setUp() and skip themselves when it is False.
# Only the imports are guarded; the success path lives in the ``else`` branch.
try:
    from mcp_server.engines.metadata_store import SampleMetadataStore, SampleFeatures
    from mcp_server.engines.abstract_analyzer import (
        HybridExtractor, DatabaseExtractor, LibrosaExtractor
    )
    from mcp_server.engines.arrangement_recorder import (
        ArrangementRecorder, RecordingState, RecordingConfig
    )
    from mcp_server.engines.live_bridge import (
        AbletonLiveBridge, MixConfiguration, CompressorSettings
    )
    from mcp_server.engines import get_system_capabilities, is_module_available
except ImportError as exc:
    logger.warning("Could not import Senior Architecture components: %s", exc)
    SENIOR_ARCHITECTURE_AVAILABLE = False
else:
    SENIOR_ARCHITECTURE_AVAILABLE = True
    logger.info("Senior Architecture components imported successfully")
|
|
|
|
|
|
# =============================================================================
|
|
# MOCK CLASSES FOR ABLETON LIVE API
|
|
# =============================================================================
|
|
|
|
class MockParameter:
    """Mock parameter object for Ableton Live.

    Stores a name, a current value and a min/max pair. No range checking is
    performed; ``value`` may be assigned freely by the code under test.
    """

    def __init__(self, name: str, value: Any = 0.0, min_val: float = 0.0, max_val: float = 1.0):
        """Store the parameter name, its current value and its bounds."""
        self.name = name
        self.value = value
        self.min = min_val
        self.max = max_val

    def __repr__(self) -> str:
        """Return a readable representation, useful in assertion failures."""
        return (
            f"MockParameter(name={self.name!r}, value={self.value!r}, "
            f"min={self.min!r}, max={self.max!r})"
        )
|
|
|
|
|
|
class MockMixerDevice:
    """Mock mixer device for Ableton tracks.

    Exposes ``volume`` and ``panning`` parameters plus an initially empty
    ``sends`` list that tests populate themselves.
    """

    def __init__(self):
        """Create the volume (0.85) and centred panning (-1.0..1.0) parameters."""
        self.volume = MockParameter("Volume", 0.85)
        self.panning = MockParameter("Panning", 0.0, -1.0, 1.0)
        self.sends: List[MockParameter] = []
|
|
|
|
|
|
class MockClip:
    """Mock clip for Ableton Live.

    Carries a name, a start/end time and a list of notes recorded through
    :meth:`add_note`.
    """

    def __init__(self, name: str = "Clip", start_time: float = 0.0, end_time: float = 4.0):
        """Store the clip name and time span; warping and looping start off."""
        self.name = name
        self.start_time = start_time
        self.end_time = end_time
        self.warping = False
        self.looping = False
        self.parameters: List[MockParameter] = []
        self.notes: List[Dict[str, Any]] = []

    def add_note(self, pitch: int, start: float, duration: float, velocity: int, muted: bool = False):
        """Append one note to ``self.notes`` as a dict.

        The ``start`` argument is stored under the ``"start_time"`` key.
        """
        note = {
            "pitch": pitch,
            "start_time": start,
            "duration": duration,
            "velocity": velocity,
            "muted": muted,
        }
        self.notes.append(note)
|
|
|
|
|
|
class MockTrack:
    """Mock track for Ableton Live.

    Keeps arrangement clips and loaded devices in plain lists so tests can
    inspect them directly.
    """

    def __init__(self, name: str = "Track", track_type: str = "audio"):
        """Create an empty track with a fresh mixer device."""
        self.name = name
        self.type = track_type  # "audio" or "midi"
        self.clip_slots: List[Optional[MockClip]] = []
        self.arrangement_clips: List[MockClip] = []
        self.devices: List[Mock] = []
        self.mixer_device = MockMixerDevice()
        self.mute = False
        self.solo = False
        self.output_routing_type = None
        self.group_track = None

    def _append_arrangement_clip(self, prefix: str, start_bar: float, duration: float):
        """Create a clip named ``<prefix>_<current clip count>`` and append it."""
        clip = MockClip(
            f"{prefix}_{len(self.arrangement_clips)}",
            start_bar,
            start_bar + duration,
        )
        self.arrangement_clips.append(clip)
        return clip

    def insert_clip(self, file_path: str, start_bar: float, duration: float):
        """Add a clip spanning ``start_bar`` to ``start_bar + duration``.

        ``file_path`` is accepted for interface compatibility but is not used
        by the mock. Returns the new clip.
        """
        return self._append_arrangement_clip("Clip", start_bar, duration)

    def create_clip(self, start_bar: float, duration: float):
        """Add a ``MIDI_Clip_<n>`` clip for the given span and return it."""
        return self._append_arrangement_clip("MIDI_Clip", start_bar, duration)

    def load_device(self, device: Any):
        """Append a mock device named after ``device`` and return its index.

        Every mock device gets the same two parameters, ``Threshold`` and
        ``Ratio``.
        """
        mock_device = Mock()
        mock_device.name = device if isinstance(device, str) else str(device)
        mock_device.parameters = [
            MockParameter("Threshold", -20.0, -60.0, 0.0),
            MockParameter("Ratio", 4.0, 1.0, 20.0),
        ]
        self.devices.append(mock_device)
        return len(self.devices) - 1

    def delete_device(self, index: int):
        """Remove the device at ``index``; out-of-range indexes are ignored."""
        if 0 <= index < len(self.devices):
            self.devices.pop(index)
|
|
|
|
|
|
class MockScene:
    """Mock scene for Ableton Live Session View.

    Records whether :meth:`fire` has been called in the private ``_fired`` flag.
    """

    def __init__(self, name: str = "Scene"):
        """Store the scene name; the scene starts un-fired."""
        self.name = name
        self._fired = False

    def fire(self):
        """Mark the scene as fired."""
        self._fired = True
|
|
|
|
|
|
class MockSong:
    """Mock Ableton Live song object for testing.

    Holds tracks, scenes and return tracks in plain lists and exposes the
    transport attributes the tests read and write (tempo, play state,
    arrangement overdub).
    """

    def __init__(self):
        """Create an empty, stopped song at 120 BPM."""
        self.tracks: List[MockTrack] = []
        self.scenes: List[MockScene] = []
        self.return_tracks: List[MockTrack] = []
        self.tempo = 120.0
        self.current_song_time = 0.0
        self.arrangement_overdub = False
        self.is_playing = False
        self.signature_numerator = 4
        self.last_event_time = 0.0
        self._browser = None  # built lazily by the ``browser`` property

    def start_playing(self):
        """Set ``is_playing`` to True."""
        self.is_playing = True

    def stop_playing(self):
        """Set ``is_playing`` to False."""
        self.is_playing = False

    def _add_track(self, track, index: int):
        """Append ``track`` when ``index`` is negative, otherwise insert it at ``index``."""
        if index < 0:
            self.tracks.append(track)
        else:
            self.tracks.insert(index, track)
        return track

    def create_midi_track(self, index: int = -1):
        """Create a MIDI track named after the current track count and return it."""
        return self._add_track(MockTrack(f"MIDI Track {len(self.tracks)}", "midi"), index)

    def create_audio_track(self, index: int = -1):
        """Create an audio track named after the current track count and return it."""
        return self._add_track(MockTrack(f"Audio Track {len(self.tracks)}", "audio"), index)

    def create_return_track(self):
        """Create a return track, append it to ``return_tracks`` and return it."""
        track = MockTrack(f"Return {len(self.return_tracks)}", "return")
        self.return_tracks.append(track)
        return track

    @property
    def browser(self):
        """Return the mock browser, creating it on first access.

        The browser starts with empty ``audio_effects`` and ``instruments``
        lists; tests may replace ``_browser`` to supply their own.
        """
        if self._browser is None:
            self._browser = Mock()
            self._browser.audio_effects = []
            self._browser.instruments = []
        return self._browser

    def application(self):
        """Return a new mock application whose ``get_major_version()`` yields ``"12"``."""
        app = Mock()
        app.get_major_version = Mock(return_value="12")
        return app
|
|
|
|
|
|
class MockConnection:
    """Mock MCP TCP connection.

    Records everything sent through it in ``commands`` and every canned
    reply handed out by :meth:`recv` in ``responses``.
    """

    def __init__(self):
        """Start with empty command and response logs."""
        self.commands: List[Dict[str, Any]] = []
        self.responses: List[Dict[str, Any]] = []

    def send(self, data: bytes):
        """Record ``data`` as a command.

        Payloads that decode as JSON are stored as the parsed object. Anything
        else is stored as ``{"raw": <text>}``, with undecodable bytes replaced
        so that recording never raises on malformed input.
        """
        try:
            cmd = json.loads(data.decode())
        except ValueError:
            # UnicodeDecodeError and json.JSONDecodeError both derive from
            # ValueError; errors="replace" keeps the fallback itself from raising.
            self.commands.append({"raw": data.decode(errors="replace")})
        else:
            self.commands.append(cmd)

    def recv(self, size: int) -> bytes:
        """Return a canned JSON success response and log it; ``size`` is ignored."""
        response = {"status": "success", "result": {}}
        self.responses.append(response)
        return json.dumps(response).encode()

    def send_command(self, cmd: Dict[str, Any]) -> Dict[str, Any]:
        """Record ``cmd`` and return a canned success response."""
        self.commands.append(cmd)
        return {"status": "success", "result": {}}
|
|
|
|
|
|
# =============================================================================
|
|
# TEST: METADATA STORE
|
|
# =============================================================================
|
|
|
|
class TestMetadataStore(unittest.TestCase):
    """Test SQLite metadata store operations."""

    def setUp(self):
        """Create temporary database for each test."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        self.db_fd, self.db_path = tempfile.mkstemp(suffix='.db')
        self.store = SampleMetadataStore(self.db_path)
        self.store.init_database()

    def tearDown(self):
        """Clean up temporary database."""
        if hasattr(self, 'store'):
            self.store.close()
        if hasattr(self, 'db_fd'):
            os.close(self.db_fd)
        if hasattr(self, 'db_path') and os.path.exists(self.db_path):
            os.unlink(self.db_path)

    def _schema_object_names(self, object_type: str) -> Set[str]:
        """Return the names of all ``sqlite_master`` entries of ``object_type``.

        The connection is closed before returning, even if the query fails,
        so a failing assertion in the caller cannot leak it.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(
                "SELECT name FROM sqlite_master WHERE type=?", (object_type,)
            )
            return {row[0] for row in cursor.fetchall()}
        finally:
            conn.close()

    def test_init_database(self):
        """Test database initialization creates proper schema."""
        tables = self._schema_object_names('table')

        self.assertIn('samples', tables)
        self.assertIn('sample_categories', tables)
        self.assertIn('analysis_metadata', tables)

    def test_init_database_creates_indexes(self):
        """Test that indexes are created for performance."""
        indexes = self._schema_object_names('index')

        # Check for expected indexes
        self.assertTrue(
            any('key' in idx for idx in indexes),
            "Key index should exist"
        )
        self.assertTrue(
            any('bpm' in idx for idx in indexes),
            "BPM index should exist"
        )

    def test_save_and_get_sample(self):
        """Test saving and retrieving sample features."""
        # mfcc_1 .. mfcc_13 all carry the same value
        mfcc_fields = {f"mfcc_{i}": 0.1 for i in range(1, 14)}
        features = SampleFeatures(
            path="/test/kick.wav",
            bpm=95.0,
            key="Am",
            duration=2.5,
            rms=-12.0,
            spectral_centroid=1500.0,
            spectral_rolloff=8000.0,
            zero_crossing_rate=0.05,
            categories=["kick", "drums"],
            **mfcc_fields,
        )

        # Save
        result = self.store.save_sample_features("/test/kick.wav", features)
        self.assertTrue(result)

        # Retrieve
        retrieved = self.store.get_sample_features("/test/kick.wav")

        self.assertIsNotNone(retrieved)
        self.assertEqual(retrieved.bpm, 95.0)
        self.assertEqual(retrieved.key, "Am")
        self.assertEqual(retrieved.duration, 2.5)
        self.assertEqual(retrieved.rms, -12.0)
        self.assertEqual(retrieved.mfcc_1, 0.1)

    def test_sample_not_found(self):
        """Test querying non-existent sample returns None."""
        result = self.store.get_sample_features("/nonexistent.wav")
        self.assertIsNone(result)

    def test_update_existing_sample(self):
        """Test updating existing sample overwrites previous data."""
        # Save initial
        features1 = SampleFeatures(
            path="/test/snare.wav",
            bpm=100.0,
            key="Cm",
            duration=1.0
        )
        self.store.save_sample_features("/test/snare.wav", features1)

        # Update
        features2 = SampleFeatures(
            path="/test/snare.wav",
            bpm=110.0,
            key="Dm",
            duration=1.2
        )
        self.store.save_sample_features("/test/snare.wav", features2)

        # Verify update
        retrieved = self.store.get_sample_features("/test/snare.wav")
        self.assertEqual(retrieved.bpm, 110.0)
        self.assertEqual(retrieved.key, "Dm")

    def test_delete_sample(self):
        """Test deleting sample from database."""
        # Save
        features = SampleFeatures(path="/test/hihat.wav", bpm=120.0)
        self.store.save_sample_features("/test/hihat.wav", features)

        # Verify exists
        self.assertIsNotNone(self.store.get_sample_features("/test/hihat.wav"))

        # Delete
        result = self.store.delete_sample("/test/hihat.wav")
        self.assertTrue(result)

        # Verify gone
        self.assertIsNone(self.store.get_sample_features("/test/hihat.wav"))

    def test_sample_exists_check(self):
        """Test sample existence check."""
        # Non-existent
        self.assertFalse(self.store.sample_exists("/test/new.wav"))

        # Save
        features = SampleFeatures(path="/test/exists.wav", bpm=95.0)
        self.store.save_sample_features("/test/exists.wav", features)

        # Existent
        self.assertTrue(self.store.sample_exists("/test/exists.wav"))

    def test_get_samples_by_category(self):
        """Test retrieving samples by category."""
        # Save samples with categories
        kick = SampleFeatures(path="/test/kick.wav", bpm=95.0, categories=["kick", "drums"])
        snare = SampleFeatures(path="/test/snare.wav", bpm=100.0, categories=["snare", "drums"])
        bass = SampleFeatures(path="/test/bass.wav", bpm=95.0, categories=["bass"])

        for sample in (kick, snare, bass):
            self.store.save_sample_features(sample.path, sample)

        # Query by category
        drums = self.store.get_samples_by_category("drums")
        self.assertEqual(len(drums), 2)
        self.assertIn("/test/kick.wav", drums)
        self.assertIn("/test/snare.wav", drums)

        kicks = self.store.get_samples_by_category("kick")
        self.assertEqual(len(kicks), 1)

        basses = self.store.get_samples_by_category("bass")
        self.assertEqual(len(basses), 1)

    def test_search_samples_with_filters(self):
        """Test searching samples with multiple filters."""
        # Save samples
        samples = [
            SampleFeatures("/test/kick1.wav", bpm=95.0, key="Am", categories=["kick"]),
            SampleFeatures("/test/kick2.wav", bpm=100.0, key="Am", categories=["kick"]),
            SampleFeatures("/test/kick3.wav", bpm=110.0, key="Cm", categories=["kick"]),
            SampleFeatures("/test/snare1.wav", bpm=95.0, key="Am", categories=["snare"]),
        ]
        for s in samples:
            self.store.save_sample_features(s.path, s)

        # Search with filters
        result = self.store.search_samples(category="kick", key="Am")
        self.assertEqual(len(result), 2)

        result = self.store.search_samples(bpm_min=90.0, bpm_max=100.0)
        self.assertEqual(len(result), 3)

        result = self.store.search_samples(category="kick", bpm_min=100.0)
        self.assertEqual(len(result), 2)

    def test_get_stats(self):
        """Test retrieving database statistics."""
        # Empty stats
        stats = self.store.get_stats()
        self.assertEqual(stats['total_samples'], 0)

        # Add samples: the first three are drums, the remaining two are bass
        for i in range(5):
            features = SampleFeatures(
                path=f"/test/sample{i}.wav",
                bpm=95.0 + i,
                categories=["drums"] if i < 3 else ["bass"]
            )
            self.store.save_sample_features(features.path, features)

        # Check stats
        stats = self.store.get_stats()
        self.assertEqual(stats['total_samples'], 5)
        self.assertEqual(stats['categories'].get('drums'), 3)
        self.assertEqual(stats['categories'].get('bass'), 2)
|
|
|
|
|
|
# =============================================================================
|
|
# TEST: HYBRID EXTRACTOR
|
|
# =============================================================================
|
|
|
|
class TestHybridExtractor(unittest.TestCase):
    """Test hybrid extraction with database fallback."""

    def setUp(self):
        """Set up test database and extractor."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        self.db_fd, self.db_path = tempfile.mkstemp(suffix='.db')

        # Use abstract_analyzer's SampleMetadataStore which has JSON mfccs column
        from mcp_server.engines.abstract_analyzer import SampleMetadataStore as AnalyzerMetadataStore
        from mcp_server.engines.abstract_analyzer import SampleFeatures as AnalyzerSampleFeatures

        self.analyzer_store = AnalyzerMetadataStore(self.db_path)

        # Pre-populate with test data using the analyzer's schema
        features = AnalyzerSampleFeatures(
            path="/test/snare.wav",
            bpm=100.0,
            key="Cm",
            duration=1.0,
            rms=-10.0,
            spectral_centroid=2000.0,
            spectral_rolloff=10000.0,
            zero_crossing_rate=0.1,
            mfccs=[0.2] * 13,
            source="database"
        )
        self.analyzer_store.save(features)

    def tearDown(self):
        """Clean up."""
        if hasattr(self, 'analyzer_store'):
            del self.analyzer_store
        if hasattr(self, 'db_fd'):
            os.close(self.db_fd)
        if hasattr(self, 'db_path') and os.path.exists(self.db_path):
            try:
                os.unlink(self.db_path)
            except PermissionError:
                pass  # File may be locked, will be cleaned up later

    @staticmethod
    def _file_exists(*extractors):
        """Return patchers forcing ``_check_file_exists`` to True on each extractor.

        The test paths do not exist on disk, so the existence check is
        stubbed out wherever features are extracted.
        """
        return [
            patch.object(extractor, '_check_file_exists', return_value=True)
            for extractor in extractors
        ]

    def _extract_with_files_present(self, extractors, action):
        """Run ``action()`` while the existence check is stubbed on ``extractors``."""
        patchers = self._file_exists(*extractors)
        for patcher in patchers:
            patcher.start()
        try:
            return action()
        finally:
            # Stop in reverse order, mirroring nested ``with`` blocks
            for patcher in reversed(patchers):
                patcher.stop()

    def test_database_extractor_cache_hit(self):
        """Test database-only extraction retrieves cached data."""
        extractor = DatabaseExtractor(self.db_path)
        bpm = self._extract_with_files_present(
            [extractor], lambda: extractor.extract_bpm("/test/snare.wav")
        )
        self.assertEqual(bpm, 100.0)

    def test_database_extractor_cache_miss(self):
        """Test database extractor returns None for missing sample."""
        extractor = DatabaseExtractor(self.db_path)
        bpm = self._extract_with_files_present(
            [extractor], lambda: extractor.extract_bpm("/test/unknown.wav")
        )
        self.assertIsNone(bpm)

    def test_hybrid_extractor_database_first(self):
        """Test hybrid extractor uses database when available."""
        extractor = HybridExtractor(self.db_path)
        # Stub the file existence check on both extractors
        features = self._extract_with_files_present(
            [extractor, extractor.db_extractor],
            lambda: extractor.get_or_analyze("/test/snare.wav"),
        )

        self.assertIsNotNone(features)
        self.assertEqual(features.bpm, 100.0)
        self.assertEqual(features.key, "Cm")
        self.assertEqual(features.source, "database")

    def test_hybrid_extractor_extract_all_features(self):
        """Test extracting all features via hybrid extractor."""
        extractor = HybridExtractor(self.db_path)
        features = self._extract_with_files_present(
            [extractor, extractor.db_extractor],
            lambda: extractor.extract_all_features("/test/snare.wav"),
        )

        # Should get all cached features
        self.assertEqual(features.bpm, 100.0)
        self.assertEqual(features.key, "Cm")
        self.assertEqual(features.duration, 1.0)
        self.assertEqual(features.rms, -10.0)

    def test_database_extractor_is_cached(self):
        """Test cache check functionality."""
        extractor = DatabaseExtractor(self.db_path)

        self.assertTrue(extractor.is_cached("/test/snare.wav"))
        self.assertFalse(extractor.is_cached("/test/unknown.wav"))

    def test_database_extractor_get_all_features(self):
        """Test getting all features from database."""
        extractor = DatabaseExtractor(self.db_path)
        features = self._extract_with_files_present(
            [extractor], lambda: extractor.extract_all_features("/test/snare.wav")
        )

        self.assertEqual(features.path, "/test/snare.wav")
        self.assertEqual(features.source, "database")

    def test_database_extractor_not_found(self):
        """Test handling of non-existent sample."""
        extractor = DatabaseExtractor(self.db_path)
        features = self._extract_with_files_present(
            [extractor], lambda: extractor.extract_all_features("/test/missing.wav")
        )

        self.assertEqual(features.source, "not_found")
|
|
|
|
|
|
# =============================================================================
|
|
# TEST: ARRANGEMENT RECORDER
|
|
# =============================================================================
|
|
|
|
class TestArrangementRecorder(unittest.TestCase):
    """Test arrangement recorder state machine."""

    def setUp(self):
        """Set up mock song and recorder."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        self.mock_song = MockSong()
        # Add tracks and scenes
        self.mock_song.create_audio_track()
        self.mock_song.create_midi_track()
        self.mock_song.scenes.append(MockScene("Scene 1"))

        self.mock_connection = MockConnection()
        self.recorder = ArrangementRecorder(self.mock_song, self.mock_connection)

    @staticmethod
    def _make_config(**overrides):
        """Build the RecordingConfig shared by these tests.

        Defaults are ``start_bar=0.0``, ``duration_bars=4.0`` and
        ``tempo=95.0``; any keyword passed in overrides or extends them.
        """
        params = {"start_bar": 0.0, "duration_bars": 4.0, "tempo": 95.0}
        params.update(overrides)
        return RecordingConfig(**params)

    def test_initial_state(self):
        """Test initial state is IDLE."""
        self.assertEqual(self.recorder.get_state(), RecordingState.IDLE)
        self.assertEqual(self.recorder.get_progress(), -1.0)

    def test_arm_transition(self):
        """Test arming moves to ARMED state."""
        result = self.recorder.arm(self._make_config())

        self.assertTrue(result)
        self.assertEqual(self.recorder.get_state(), RecordingState.ARMED)

    def test_arm_invalid_config(self):
        """Test arming with invalid config fails."""
        # Negative duration: either building the config or arming may raise
        with self.assertRaises(ValueError):
            config = self._make_config(duration_bars=-1.0)
            self.recorder.arm(config)

    def test_start_from_armed(self):
        """Test starting from ARMED state."""
        self.recorder.arm(self._make_config(pre_roll_bars=1.0))
        result = self.recorder.start()

        self.assertTrue(result)
        self.assertEqual(self.recorder.get_state(), RecordingState.PRE_ROLL)
        self.assertTrue(self.mock_song.arrangement_overdub)

    def test_start_from_wrong_state(self):
        """Test starting from non-ARMED state fails."""
        result = self.recorder.start()
        self.assertFalse(result)

    def test_stop_recording(self):
        """Test stopping recording."""
        # Arm and start
        self.recorder.arm(self._make_config())
        self.recorder.start()

        # Transition to recording manually
        self.recorder._transition_to(RecordingState.RECORDING)

        # Stop
        result = self.recorder.stop()

        self.assertTrue(result)
        self.assertFalse(self.mock_song.arrangement_overdub)

    def test_reset(self):
        """Test reset clears all state."""
        # Arm
        self.recorder.arm(self._make_config())
        self.assertEqual(self.recorder.get_state(), RecordingState.ARMED)

        # Reset
        self.recorder.reset()

        self.assertEqual(self.recorder.get_state(), RecordingState.IDLE)
        self.assertEqual(self.recorder.get_progress(), -1.0)
        self.assertEqual(len(self.recorder.get_new_clips()), 0)

    def test_is_active(self):
        """Test is_active returns correct state."""
        self.assertFalse(self.recorder.is_active())

        # Arm
        self.recorder.arm(self._make_config())
        self.assertTrue(self.recorder.is_active())

        # Reset
        self.recorder.reset()
        self.assertFalse(self.recorder.is_active())

    def test_state_transitions(self):
        """Test complete state transition flow."""
        states_seen = []

        def on_state_change(old, new):
            states_seen.append((old, new))

        config = self._make_config(
            pre_roll_bars=0.0,  # No pre-roll for immediate start
            on_state_change=on_state_change,
        )

        # Arm
        self.recorder.arm(config)

        # Start
        self.recorder.start()

        # Verify state transitions
        self.assertEqual(len(states_seen), 2)
        self.assertEqual(states_seen[0], (RecordingState.IDLE, RecordingState.ARMED))
        self.assertEqual(states_seen[1], (RecordingState.ARMED, RecordingState.PRE_ROLL))

    def test_progress_callback(self):
        """Test progress callback is called."""
        progress_values = []

        def on_progress(p):
            progress_values.append(p)

        config = self._make_config(on_progress=on_progress)

        # Arm and start pre-roll
        self.recorder.arm(config)
        self.recorder.start()

        # Simulate update
        self.recorder.update()

        # Either progress was reported, or the recorder already left PRE_ROLL
        self.assertTrue(len(progress_values) > 0 or self.recorder.get_state() != RecordingState.PRE_ROLL)
|
|
|
|
|
|
# =============================================================================
|
|
# TEST: LIVE BRIDGE
|
|
# =============================================================================
|
|
|
|
class TestLiveBridge(unittest.TestCase):
    """Test LiveBridge operations."""

    def setUp(self):
        """Set up mock song and bridge."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        # Track 0 is an audio track, track 1 is a MIDI track
        self.mock_song = MockSong()
        self.mock_song.create_audio_track()
        self.mock_song.create_midi_track()

        self.mock_connection = MockConnection()
        self.bridge = AbletonLiveBridge(self.mock_song, self.mock_connection)

    def test_create_bus_track(self):
        """Test bus track creation."""
        result = self.bridge.create_bus_track("Drums Bus", "drums")

        self.assertTrue(result['success'])
        self.assertIn('track_index', result['data'])
        self.assertEqual(result['data']['name'], "Drums Bus")

    def test_create_return_track(self):
        """Test return track creation."""
        result = self.bridge.create_return_track("Reverb", "Reverb")

        self.assertTrue(result['success'])
        self.assertIn('return_index', result['data'])
        self.assertEqual(result['data']['name'], "Reverb")

    def test_set_track_volume(self):
        """Test setting track volume."""
        result = self.bridge.set_track_volume(0, 0.75)

        self.assertTrue(result['success'])
        self.assertEqual(self.mock_song.tracks[0].mixer_device.volume.value, 0.75)

    def test_set_track_pan(self):
        """Test setting track pan."""
        result = self.bridge.set_track_pan(0, -0.5)

        self.assertTrue(result['success'])
        self.assertEqual(self.mock_song.tracks[0].mixer_device.panning.value, -0.5)

    def test_set_track_name(self):
        """Test setting track name."""
        result = self.bridge.set_track_name(0, "Kick Track")

        self.assertTrue(result['success'])
        self.assertEqual(self.mock_song.tracks[0].name, "Kick Track")

    def test_insert_device(self):
        """Test device insertion."""
        # Setup mock browser with a device
        mock_device = Mock()
        mock_device.name = "Compressor"
        self.mock_song._browser = Mock()
        self.mock_song._browser.audio_effects = [mock_device]
        self.mock_song._browser.instruments = []

        result = self.bridge.insert_device(0, "Compressor")

        # Both outcomes are accepted: a device index on success, or a
        # "not found" message on failure.
        self.assertIn('success', result)
        if result['success']:
            self.assertIn('device_index', result['data'])
        else:
            # Expected to fail with current mock setup
            self.assertIn('not found', result['message'])

    def test_set_track_send(self):
        """Test configuring track send."""
        # First create a return track
        self.mock_song.create_return_track()
        self.mock_song.tracks[0].mixer_device.sends = [MockParameter("Send 1", 0.0)]

        result = self.bridge.set_track_send(0, 0, 0.5)

        self.assertTrue(result['success'])

    def test_set_tempo(self):
        """Test setting project tempo."""
        result = self.bridge.set_tempo(110.0)

        self.assertTrue(result['success'])
        self.assertEqual(self.mock_song.tempo, 110.0)

    def test_start_stop_playback(self):
        """Test playback control."""
        # Start
        result = self.bridge.start_playback()
        self.assertTrue(result['success'])
        self.assertTrue(self.mock_song.is_playing)

        # Stop
        result = self.bridge.stop_playback()
        self.assertTrue(result['success'])
        self.assertFalse(self.mock_song.is_playing)

    def test_route_track_to_bus(self):
        """Test routing track to bus."""
        # Create bus first
        bus_result = self.bridge.create_bus_track("Drum Bus")
        bus_name = bus_result['data']['name']

        # Route track to bus
        result = self.bridge.route_track_to_bus(0, bus_name)

        self.assertTrue(result['success'])

    def test_insert_arrangement_midi(self):
        """Test inserting MIDI clip into arrangement."""
        notes = [
            {"pitch": 60, "start_time": 0.0, "duration": 0.25, "velocity": 100},
            {"pitch": 62, "start_time": 0.5, "duration": 0.25, "velocity": 100},
        ]

        result = self.bridge.insert_arrangement_midi(1, 4.0, 4.0, notes)

        self.assertTrue(result['success'])
        self.assertEqual(len(self.mock_song.tracks[1].arrangement_clips), 1)

    def test_execute_mix_config(self):
        """Test executing full mix configuration."""
        config = MixConfiguration(
            track_index=0,
            volume=0.8,
            pan=0.2,
            mute=False,
            solo=False
        )

        result = self.bridge.execute_mix_config(config)

        self.assertTrue(result['success'])
|
|
|
|
|
|
# =============================================================================
|
|
# TEST: INTEGRATION
|
|
# =============================================================================
|
|
|
|
class TestIntegration(unittest.TestCase):
    """Integration tests for component interactions.

    Every test gets a fresh temporary database file with an initialised
    ``SampleMetadataStore`` on top of it (see ``setUp``). Song, scene and
    connection objects are the ``Mock*`` stand-ins used throughout this
    test module.
    """

    def setUp(self):
        """Create a temporary database file and an initialised store on it.

        The whole test is skipped when the Senior Architecture modules
        could not be imported.
        """
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        self.db_fd, self.db_path = tempfile.mkstemp(suffix='.db')
        self.store = SampleMetadataStore(self.db_path)
        self.store.init_database()

    def tearDown(self):
        """Close the store, release the file descriptor and delete the file."""
        # The hasattr guards keep this safe even when an attribute was never
        # assigned by setUp.
        if hasattr(self, 'store'):
            self.store.close()
        if hasattr(self, 'db_fd'):
            os.close(self.db_fd)
        if hasattr(self, 'db_path') and os.path.exists(self.db_path):
            os.unlink(self.db_path)

    def test_metadata_to_sample_selection(self):
        """Test metadata store feeds sample selection workflow."""
        # Add samples to metadata store
        samples = [
            SampleFeatures("/drums/kick1.wav", bpm=95.0, key="Am", categories=["kick", "drums"]),
            SampleFeatures("/drums/kick2.wav", bpm=95.0, key="Am", categories=["kick", "drums"]),
            SampleFeatures("/drums/snare1.wav", bpm=95.0, key="Am", categories=["snare", "drums"]),
            SampleFeatures("/bass/bass1.wav", bpm=95.0, key="Am", categories=["bass"]),
        ]
        for s in samples:
            self.store.save_sample_features(s.path, s)

        # Query by category: only the two kick samples carry "kick"
        kicks = self.store.get_samples_by_category("kick")
        self.assertEqual(len(kicks), 2)

        # Query by BPM and key: all four samples are 95 BPM in Am
        matching = self.store.search_samples(bpm_min=90.0, bpm_max=100.0, key="Am")
        self.assertEqual(len(matching), 4)

    def test_extract_and_cache(self):
        """Test feature extraction and caching flow."""
        # Use a separate database for abstract_analyzer to avoid schema conflicts
        from mcp_server.engines.abstract_analyzer import SampleMetadataStore as AnalyzerStore
        from mcp_server.engines.abstract_analyzer import SampleFeatures as AnalyzerFeatures

        db_path = self.db_path + ".analyzer.db"

        try:
            # Create analyzer store with sample
            analyzer_store = AnalyzerStore(db_path)
            features = AnalyzerFeatures(
                path="/test/sample.wav",
                bpm=95.0,
                key="Am",
                mfccs=[0.1] * 13,
                source="database",
            )
            analyzer_store.save(features)

            # Create hybrid extractor on the same database file
            extractor = HybridExtractor(db_path)

            # Check that sample is cached
            self.assertTrue(extractor.store.exists("/test/sample.wav"))

            # The sample path does not exist on disk, so the file-existence
            # checks are patched to succeed before reading from the cache.
            with patch.object(extractor, '_check_file_exists', return_value=True), \
                    patch.object(extractor.db_extractor, '_check_file_exists', return_value=True):
                retrieved = extractor.extract_all_features("/test/sample.wav")
                self.assertEqual(retrieved.source, "database")

            del analyzer_store
        finally:
            # Best-effort cleanup of the extra database file. Only filesystem
            # errors are ignored (the file may still be in use); anything
            # else, e.g. KeyboardInterrupt, is allowed to propagate.
            if os.path.exists(db_path):
                try:
                    os.unlink(db_path)
                except OSError:
                    pass

    def test_recorder_integration_with_song(self):
        """Test recorder with mock song."""
        mock_song = MockSong()
        mock_song.create_audio_track()
        mock_song.create_midi_track()
        mock_song.scenes.append(MockScene("Scene 1"))

        mock_connection = MockConnection()
        recorder = ArrangementRecorder(mock_song, mock_connection)

        # Configure recording
        config = RecordingConfig(
            start_bar=0.0,
            duration_bars=8.0,
            tempo=95.0,
        )

        # Arm should succeed with valid song
        result = recorder.arm(config)
        self.assertTrue(result)

    def test_bridge_with_mix_config(self):
        """Test LiveBridge applying complete mix configuration."""
        mock_song = MockSong()
        mock_song.create_audio_track()
        mock_song.create_midi_track()

        bridge = AbletonLiveBridge(mock_song, MockConnection())

        # Apply mix config to first track
        config = MixConfiguration(
            track_index=0,
            volume=0.75,
            pan=-0.3,
            mute=False,
            solo=False,
        )

        result = bridge.execute_mix_config(config)
        self.assertTrue(result['success'])

        # Verify settings applied
        track = mock_song.tracks[0]
        self.assertEqual(track.mixer_device.volume.value, 0.75)
        self.assertEqual(track.mixer_device.panning.value, -0.3)
|
|
|
|
|
|
# =============================================================================
# TEST: END-TO-END WORKFLOWS
# =============================================================================
|
|
|
|
class TestEndToEndWorkflows(unittest.TestCase):
    """End-to-end workflow tests.

    Each test receives its own temporary database file (``self.db_path``)
    and builds the stores, bridges and recorders it needs on top of it.
    """

    def setUp(self):
        """Create the temporary database file used by the workflow tests.

        The whole test is skipped when the Senior Architecture modules
        could not be imported.
        """
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        self.db_fd, self.db_path = tempfile.mkstemp(suffix='.db')

    def tearDown(self):
        """Release the file descriptor and delete the temporary file."""
        if hasattr(self, 'db_fd'):
            os.close(self.db_fd)
        if hasattr(self, 'db_path') and os.path.exists(self.db_path):
            os.unlink(self.db_path)

    def test_full_workflow_no_numpy(self):
        """Test complete workflow without numpy/librosa."""
        # 1. Create metadata store
        store = SampleMetadataStore(self.db_path)
        # try/finally guarantees the store is closed even when an assertion
        # fails, so tearDown never unlinks a file that is still open.
        try:
            store.init_database()

            # 2. Add sample metadata manually (simulating pre-analyzed library)
            samples = [
                SampleFeatures("/drums/kick.wav", bpm=95.0, key="Am",
                               duration=1.0, rms=-10.0, spectral_centroid=100.0,
                               categories=["kick", "drums"]),
                SampleFeatures("/drums/snare.wav", bpm=95.0, key="Am",
                               duration=1.0, rms=-12.0, spectral_centroid=2000.0,
                               categories=["snare", "drums"]),
                SampleFeatures("/bass/bass.wav", bpm=95.0, key="Am",
                               duration=2.0, rms=-15.0, spectral_centroid=150.0,
                               categories=["bass"]),
            ]
            for s in samples:
                store.save_sample_features(s.path, s)

            # 3. Query samples for production
            kicks = store.search_samples(category="kick", key="Am")
            self.assertEqual(len(kicks), 1)

            drums = store.get_samples_by_category("drums")
            self.assertEqual(len(drums), 2)

            # 4. Create Ableton project via LiveBridge (mocked)
            mock_song = MockSong()
            mock_song.create_audio_track()  # Drums
            mock_song.create_audio_track()  # Bass
            mock_song.create_midi_track()   # Melody

            bridge = AbletonLiveBridge(mock_song, MockConnection())

            # Name tracks
            bridge.set_track_name(0, "Drums")
            bridge.set_track_name(1, "Bass")
            bridge.set_track_name(2, "Melody")

            # Set volumes
            bridge.set_track_volume(0, 0.8)
            bridge.set_track_volume(1, 0.7)
            bridge.set_track_volume(2, 0.75)

            # Verify project setup
            self.assertEqual(mock_song.tracks[0].name, "Drums")
            self.assertEqual(mock_song.tracks[0].mixer_device.volume.value, 0.8)

            # 5. Set up arrangement recording
            mock_song.scenes.append(MockScene("Intro"))
            mock_song.scenes.append(MockScene("Drop"))

            recorder = ArrangementRecorder(mock_song, MockConnection())

            config = RecordingConfig(
                start_bar=0.0,
                duration_bars=16.0,
                tempo=95.0,
            )

            arm_result = recorder.arm(config)
            self.assertTrue(arm_result)
        finally:
            store.close()

    def test_workflow_with_database_extractor(self):
        """Test workflow using database-only extraction."""
        # Use abstract_analyzer's store for compatibility
        from mcp_server.engines.abstract_analyzer import SampleMetadataStore as AnalyzerStore
        from mcp_server.engines.abstract_analyzer import SampleFeatures as AnalyzerFeatures

        # Set up metadata store
        store = AnalyzerStore(self.db_path)

        # Populate with test data: ten synth samples that differ only in
        # path and spectral centroid
        for i in range(10):
            features = AnalyzerFeatures(
                path=f"/samples/synth{i}.wav",
                bpm=128.0,
                key="Cm",
                duration=4.0,
                spectral_centroid=3000.0 + i * 100,
                mfccs=[0.1] * 13,
                source="database",
            )
            store.save(features)

        del store  # Release store

        # Use database extractor with db_path
        extractor = DatabaseExtractor(self.db_path)

        # Retrieve all samples; the paths do not exist on disk, so the
        # file-existence check is patched to succeed
        all_samples = []
        with patch.object(extractor, '_check_file_exists', return_value=True):
            for i in range(10):
                features = extractor.extract_all_features(f"/samples/synth{i}.wav")
                all_samples.append(features)

        self.assertEqual(len(all_samples), 10)

        # Verify all have correct source
        for s in all_samples:
            self.assertEqual(s.source, "database")

    def test_arrangement_to_database_workflow(self):
        """Test recording arrangement and storing metadata."""
        # Create mock environment
        mock_song = MockSong()
        mock_song.create_audio_track()
        mock_song.create_midi_track()
        mock_song.scenes.append(MockScene("Scene 1"))

        # Add some arrangement clips
        mock_song.tracks[0].insert_clip("/samples/kick.wav", 0.0, 4.0)
        mock_song.tracks[0].insert_clip("/samples/snare.wav", 4.0, 4.0)

        # Set up metadata store
        store = SampleMetadataStore(self.db_path)
        # Close the store on every exit path, including failed assertions.
        try:
            store.init_database()

            # Store metadata for clips; duration is derived from the clip's
            # start and end times
            for clip in mock_song.tracks[0].arrangement_clips:
                features = SampleFeatures(
                    path=f"/samples/{clip.name}.wav",
                    bpm=95.0,
                    duration=clip.end_time - clip.start_time,
                )
                store.save_sample_features(features.path, features)

            # Verify stored
            self.assertEqual(store.get_stats()['total_samples'], 2)
        finally:
            store.close()
|
|
|
|
|
|
# =============================================================================
# TEST: SYSTEM CAPABILITIES
# =============================================================================
|
|
|
|
class TestSystemCapabilities(unittest.TestCase):
    """Checks for the system capability detection helpers."""

    def test_get_system_capabilities(self):
        """The capability report has the required keys and value types."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        report = get_system_capabilities()

        # Required keys, checked in a fixed order so the first missing key
        # is the one reported.
        required_keys = (
            'numpy',
            'librosa',
            'sqlite3',
            'python_version',
            'modules',
            'has_advanced_analysis',
            'has_metadata_db',
        )
        for required in required_keys:
            self.assertIn(required, report)

        # The dependency flags must be booleans; the module listing a dict.
        for flag_name in ('numpy', 'librosa', 'sqlite3'):
            self.assertIsInstance(report[flag_name], bool)
        self.assertIsInstance(report['modules'], dict)

    def test_module_availability(self):
        """Each known module name is reported as available."""
        if not SENIOR_ARCHITECTURE_AVAILABLE:
            self.skipTest("Senior Architecture not available")

        known_modules = (
            "metadata_store",
            "abstract_analyzer",
            "arrangement_recorder",
            "live_bridge",
        )
        for module_name in known_modules:
            self.assertTrue(is_module_available(module_name))
|
|
|
|
|
|
# =============================================================================
# TEST RUNNER
# =============================================================================
|
|
|
|
def run_tests():
    """Run every test class in this module and print a summary.

    Builds one suite from all test classes, runs it with a verbose
    ``unittest.TextTestRunner``, then prints the totals and the names of
    any failed or errored tests.

    Returns:
        True when the run was successful, False otherwise.
    """
    case_classes = (
        TestMetadataStore,
        TestHybridExtractor,
        TestArrangementRecorder,
        TestLiveBridge,
        TestIntegration,
        TestEndToEndWorkflows,
        TestSystemCapabilities,
    )

    # Collect the tests of every class into a single suite.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case_class in case_classes:
        suite.addTests(loader.loadTestsFromTestCase(case_class))

    # Run with verbose output
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    passed = result.wasSuccessful()

    # Summary header and counters
    rule = "=" * 70
    print("\n" + rule)
    print("TEST SUMMARY")
    print(rule)
    print(f"Tests Run: {result.testsRun}")
    print(f"Failures: {len(result.failures)}")
    print(f"Errors: {len(result.errors)}")
    print(f"Skipped: {len(result.skipped)}")

    print("\n✅ All tests passed!" if passed else "\n❌ Some tests failed!")

    # List the affected tests by name; tracebacks were already printed by
    # the runner.
    for heading, entries in (("\nFailures:", result.failures),
                             ("\nErrors:", result.errors)):
        if entries:
            print(heading)
            for test, _trace in entries:
                print(f" - {test}")

    return passed
|
|
|
|
|
|
if __name__ == '__main__':
    # Prefer pytest for better output; fall back to the unittest runner in
    # run_tests() when pytest is not installed.
    try:
        import pytest
    except ImportError:
        # Fall back to unittest runner
        success = run_tests()
        sys.exit(0 if success else 1)
    else:
        # Only the import is guarded: an ImportError raised while pytest is
        # running must surface instead of triggering a second run under
        # unittest.
        sys.exit(pytest.main([__file__, '-v']))
|