- Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain
  - Method 1: track.insert_arrangement_clip() [Live 12+]
  - Method 2: track.create_audio_clip() [Live 11+]
  - Method 3: arrangement_clips.add_new_clip() [Live 12+]
  - Method 4: Session->duplicate_clip_to_arrangement [Legacy]
  - Method 5: Session->Recording [Universal]
- Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow
- Update skills documentation
- Verified: 3 clips created at positions [0, 4, 8] in Arrangement View

Closes: Audio injection in Arrangement View
1522 lines
54 KiB
Python
1522 lines
54 KiB
Python
"""
Arrangement View Verification and Testing System for AbletonMCP_AI

Provides comprehensive verification, automated validation, and test scenarios
for Arrangement View functionality including clip creation, positioning,
integrity checks, and recording validation.

Author: AbletonMCP_AI
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
import socket
|
|
import time
|
|
import traceback
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple, Callable, Union
|
|
|
|
# Module-level logger used by the verifier, validator and scenario classes.
logger = logging.getLogger("ArrangementVerifier")

# =============================================================================
# CONSTANTS AND CONFIGURATION
# =============================================================================

# Default address of the command socket that ArrangementVerifier connects to.
ABLETON_HOST = "127.0.0.1"
ABLETON_PORT = 9877

# Default socket timeout, in seconds, for one command round-trip.
DEFAULT_TIMEOUT = 30.0

# NOTE(review): not referenced in the visible part of this file; presumably an
# upper bound (seconds) for polling helpers -- confirm against callers.
MAX_VERIFICATION_WAIT = 60.0

# SQLite file that stores saved verification results, located next to this module.
DB_PATH = Path(__file__).parent / "arrangement_tests.db"
|
|
|
|
|
|
# =============================================================================
|
|
# DATA CLASSES
|
|
# =============================================================================
|
|
|
|
@dataclass
class VerificationResult:
    """Result of a single verification check."""

    # True when the check passed.
    success: bool
    # Identifier of the check that produced this result.
    check_name: str
    # Human-readable outcome description.
    message: str
    # Free-form structured data attached to the result.
    details: Dict[str, Any] = field(default_factory=dict)
    # Creation time as seconds since the epoch (``time.time()``).
    timestamp: float = field(default_factory=time.time)
    # How long the check took, in milliseconds.
    duration_ms: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the result.

        ``timestamp`` is rendered as a local-time ISO-8601 string and
        ``duration_ms`` is rounded to two decimals.
        """
        return {
            "success": self.success,
            "check_name": self.check_name,
            "message": self.message,
            "details": self.details,
            "timestamp": datetime.fromtimestamp(self.timestamp).isoformat(),
            "duration_ms": round(self.duration_ms, 2),
        }
|
|
|
|
|
|
@dataclass
class ClipInfo:
    """Information about a clip in Arrangement View."""

    name: str
    track_index: int
    track_name: str
    start_time: float
    end_time: float
    length: float
    is_midi: bool
    color: int = 0
    muted: bool = False
    looping: bool = False

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ClipInfo":
        """Build a ``ClipInfo`` from a dict, defaulting every missing key.

        Missing strings default to ``""``, numbers to zero and flags to
        ``False``; unknown keys in ``data`` are ignored.
        """
        return cls(
            name=data.get("name", ""),
            track_index=data.get("track_index", 0),
            track_name=data.get("track_name", ""),
            start_time=data.get("start_time", 0.0),
            end_time=data.get("end_time", 0.0),
            length=data.get("length", 0.0),
            is_midi=data.get("is_midi", False),
            color=data.get("color", 0),
            muted=data.get("muted", False),
            looping=data.get("looping", False),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the clip as a plain dict using the same keys ``from_dict`` reads."""
        return {
            "name": self.name,
            "track_index": self.track_index,
            "track_name": self.track_name,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "length": self.length,
            "is_midi": self.is_midi,
            "color": self.color,
            "muted": self.muted,
            "looping": self.looping,
        }
|
|
|
|
|
|
@dataclass
class TestScenario:
    """A test scenario with pre and post conditions."""

    # The class name starts with "Test"; this flag tells pytest not to try to
    # collect it as a test class if the module is ever imported by a test run.
    __test__ = False

    name: str
    description: str
    # Zero-argument callables, each producing one VerificationResult.
    pre_conditions: List[Callable[[], VerificationResult]]
    # Zero-argument callable performing the action under test.
    test_action: Callable[[], Dict[str, Any]]
    post_conditions: List[Callable[[], VerificationResult]]
    timeout_seconds: float = 30.0
|
|
|
|
|
|
@dataclass
class TestReport:
    """Complete test report with all results."""

    # The class name starts with "Test"; this flag tells pytest not to try to
    # collect it as a test class if the module is ever imported by a test run.
    __test__ = False

    test_name: str
    # Start and completion times, stored as already-formatted strings.
    started_at: str
    completed_at: str
    duration_seconds: float
    results: List[VerificationResult]
    summary: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return the report as a plain dict.

        ``duration_seconds`` is rounded to three decimals and every result is
        converted with its own ``to_dict``.
        """
        return {
            "test_name": self.test_name,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "duration_seconds": round(self.duration_seconds, 3),
            "results": [r.to_dict() for r in self.results],
            "summary": self.summary,
        }

    def to_json(self, indent: int = 2) -> str:
        """Serialize ``to_dict()`` to a JSON string with the given indent."""
        return json.dumps(self.to_dict(), indent=indent)
|
|
|
|
|
|
# =============================================================================
|
|
# ARRANGEMENT VERIFIER CLASS
|
|
# =============================================================================
|
|
|
|
class ArrangementVerifier:
    """
    Main verification class for Arrangement View testing.

    Provides comprehensive verification methods for:
    - Clip creation and counting
    - Clip positioning and timing
    - Content validation
    - Integrity checks

    Every ``verify_*`` method appends a ``VerificationResult`` to an internal
    list, which ``get_verification_report`` summarises and
    ``save_results_to_db`` persists.
    """

    def __init__(self, ableton_host: str = ABLETON_HOST, ableton_port: int = ABLETON_PORT):
        """
        Initialize the ArrangementVerifier.

        Args:
            ableton_host: Host where Ableton Live is running
            ableton_port: TCP port for Ableton connection
        """
        self.host = ableton_host
        self.port = ableton_port
        self._verification_results: List[VerificationResult] = []
        # The two attributes below are initialised here but not used by any
        # method of this class; kept for compatibility with external readers.
        self._last_clips_snapshot: List[ClipInfo] = []
        self._db_connection: Optional[sqlite3.Connection] = None

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values, excluding bool."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def _send_command(self, cmd_type: str, params: Optional[Dict[str, Any]] = None,
                      timeout: float = DEFAULT_TIMEOUT) -> Dict[str, Any]:
        """Send a command to Ableton and return the response.

        The request is one JSON object followed by a newline; the reply is
        read up to the first newline and decoded as JSON.  A fresh connection
        is opened and closed for every call.

        Args:
            cmd_type: Value of the request's ``"type"`` field.
            params: Value of the request's ``"params"`` field (``{}`` if omitted).
            timeout: Connect and read timeout in seconds.

        Returns:
            The decoded response dict.  Failures never raise; they are
            reported as ``{"status": "error", "message": ...}``.
        """
        sock = None
        try:
            sock = socket.create_connection((self.host, self.port), timeout=timeout)
            sock.settimeout(timeout)

            request = json.dumps({"type": cmd_type, "params": params or {}}) + "\n"
            sock.sendall(request.encode("utf-8"))

            buf = b""
            while b"\n" not in buf:
                chunk = sock.recv(65536)
                if not chunk:
                    # Peer closed the connection before a full line arrived.
                    return {"status": "error", "message": "No response received"}
                buf += chunk

            raw, _, _ = buf.partition(b"\n")
            response = json.loads(raw.decode("utf-8"))
            if not isinstance(response, dict):
                # Callers use ``.get`` on the response, so anything but a JSON
                # object is reported as an error instead of crashing later.
                return {
                    "status": "error",
                    "message": f"Unexpected response type: {type(response).__name__}",
                }
            return response
        except socket.timeout:
            return {"status": "error", "message": f"Timeout after {timeout}s"}
        except ConnectionRefusedError:
            return {"status": "error", "message": f"Connection refused to {self.host}:{self.port}"}
        except Exception as e:
            return {"status": "error", "message": str(e)}
        finally:
            if sock:
                try:
                    sock.close()
                except Exception:
                    # Best effort: a failed close must not mask the result.
                    pass

    def _get_arrangement_clips(self, track_index: Optional[int] = None) -> List[ClipInfo]:
        """Get all clips from Arrangement View.

        Args:
            track_index: Restrict the query to one track (None = all tracks).

        Returns:
            One ``ClipInfo`` per returned entry that carries a ``start_time``
            key; an empty list when the command does not succeed.
        """
        params: Dict[str, Any] = {}
        if track_index is not None:
            params["track_index"] = track_index

        resp = self._send_command("get_arrangement_clips", params, timeout=15.0)
        if resp.get("status") != "success":
            return []

        result = resp.get("result")
        clips_data = result.get("clips") if isinstance(result, dict) else None
        if not isinstance(clips_data, list):
            return []

        # Entries without a start_time are skipped rather than defaulted to 0.
        return [
            ClipInfo.from_dict(entry)
            for entry in clips_data
            if isinstance(entry, dict) and "start_time" in entry
        ]

    def verify_clips_created(self, expected_count: int,
                             track_index: Optional[int] = None) -> bool:
        """
        Verify that the expected number of clips exists in Arrangement View.

        Args:
            expected_count: Number of clips expected
            track_index: Optional track index to check (None = all tracks)

        Returns:
            True if clip count matches expected, False otherwise
        """
        start_time = time.time()
        clips = self._get_arrangement_clips(track_index)
        actual_count = len(clips)

        success = actual_count == expected_count
        duration_ms = (time.time() - start_time) * 1000

        if success:
            message = f"Found exactly {expected_count} clips"
        else:
            message = f"Expected {expected_count} clips, found {actual_count}"

        self._verification_results.append(VerificationResult(
            success=success,
            check_name="verify_clips_created",
            message=message,
            details={
                "expected_count": expected_count,
                "actual_count": actual_count,
                "track_index": track_index,
                "clips": [c.name for c in clips],
            },
            duration_ms=duration_ms,
        ))

        if not success:
            logger.error(f"Clip count mismatch: expected {expected_count}, got {actual_count}")

        return success

    def verify_clip_positions(self, expected_positions: List[Dict[str, Any]],
                              tolerance_beats: float = 0.01) -> bool:
        """
        Verify that clips are at expected positions.

        Args:
            expected_positions: List of dicts with keys:
                - track_index: int (omit to accept any track)
                - start_time: float (in beats); an entry without it never matches
                - name: str (optional, matched as a substring of the clip name)
            tolerance_beats: Tolerance for position matching in beats

        Returns:
            True if all clips at expected positions, False otherwise
        """
        start_time = time.time()
        clips = self._get_arrangement_clips()

        errors = []
        matched = []

        for expected in expected_positions:
            exp_track = expected.get("track_index")
            exp_start = expected.get("start_time")
            exp_name = expected.get("name", "")

            # First clip that satisfies track, start time and name constraints.
            match = None
            if exp_start is not None:
                match = next(
                    (
                        clip for clip in clips
                        if (exp_track is None or clip.track_index == exp_track)
                        and abs(clip.start_time - exp_start) <= tolerance_beats
                        and (not exp_name or exp_name in clip.name)
                    ),
                    None,
                )

            if match is not None:
                matched.append({"expected": expected, "found": match.to_dict()})
            else:
                errors.append({
                    "expected": expected,
                    "error": "No matching clip found",
                    "available_clips": [
                        c.to_dict() for c in clips
                        if exp_track is None or c.track_index == exp_track
                    ],
                })

        success = not errors
        duration_ms = (time.time() - start_time) * 1000

        if success:
            message = f"All {len(expected_positions)} clips at expected positions"
        else:
            message = f"Failed to find {len(errors)} clips at expected positions"

        self._verification_results.append(VerificationResult(
            success=success,
            check_name="verify_clip_positions",
            message=message,
            details={
                "expected_count": len(expected_positions),
                "matched_count": len(matched),
                "error_count": len(errors),
                "matched": matched,
                "errors": errors,
                "tolerance_beats": tolerance_beats,
            },
            duration_ms=duration_ms,
        ))

        for err in errors:
            logger.error(f"Position mismatch: expected {err['expected']}, not found in arrangement")

        return success

    def verify_arrangement_has_content(self, min_clips: int = 1,
                                       min_length_beats: float = 0.0) -> bool:
        """
        Verify that Arrangement View has content (clips exist and have length).

        Args:
            min_clips: Minimum number of clips required
            min_length_beats: Minimum total length in beats, measured as the
                latest ``end_time`` among all clips

        Returns:
            True if arrangement has content, False otherwise
        """
        start_time = time.time()
        clips = self._get_arrangement_clips()

        clip_count = len(clips)
        # "Total length" is the end of the last clip, not the sum of lengths.
        total_length = max((c.end_time for c in clips), default=0.0)

        has_clips = clip_count >= min_clips
        has_length = total_length >= min_length_beats
        success = has_clips and has_length

        duration_ms = (time.time() - start_time) * 1000

        if success:
            message = f"Arrangement has {clip_count} clips, total length {total_length:.1f} beats"
        else:
            message = f"Insufficient content: {clip_count} clips, {total_length:.1f} beats"

        self._verification_results.append(VerificationResult(
            success=success,
            check_name="verify_arrangement_has_content",
            message=message,
            details={
                "clip_count": clip_count,
                "total_length_beats": total_length,
                "min_clips_required": min_clips,
                "min_length_required": min_length_beats,
                "has_clips": has_clips,
                "has_length": has_length,
            },
            duration_ms=duration_ms,
        ))

        if not success:
            logger.error(f"Arrangement lacks content: {clip_count} clips, {total_length:.1f} beats")

        return success

    def verify_clip_integrity(self, clip_info: Dict[str, Any]) -> bool:
        """
        Verify integrity of a specific clip description.

        Checks:
        - All of track_index, start_time, end_time and length are present
        - Timing values are numeric and track_index is an integer
        - Start time < End time
        - Length is positive and equals end - start (within 0.01 beats)
        - Track index is within the range reported by ``get_tracks``
          (skipped when that command does not succeed)

        This does not query the arrangement for the clip itself.

        Args:
            clip_info: Dict with clip information to verify

        Returns:
            True if clip integrity verified, False otherwise
        """
        start_time = time.time()

        required = ("track_index", "start_time", "end_time", "length")
        errors = [
            f"Missing required field: {key}" for key in required if key not in clip_info
        ]

        if not errors:
            track_idx = clip_info["track_index"]
            start = clip_info["start_time"]
            end = clip_info["end_time"]
            length = clip_info["length"]

            # Report malformed values as integrity errors instead of letting
            # the comparisons below raise TypeError.
            type_errors = [
                f"Invalid {key}: {clip_info[key]!r} (must be a number)"
                for key in ("start_time", "end_time", "length")
                if not self._is_number(clip_info[key])
            ]
            if type_errors:
                errors.extend(type_errors)
            else:
                if start >= end:
                    errors.append(f"Invalid timing: start_time ({start}) >= end_time ({end})")
                if length <= 0:
                    errors.append(f"Invalid length: {length} (must be positive)")
                expected_length = end - start
                if abs(length - expected_length) > 0.01:
                    errors.append(f"Length mismatch: declared {length}, calculated {expected_length}")

            if not isinstance(track_idx, int) or isinstance(track_idx, bool):
                errors.append(f"Invalid track_index: {track_idx!r} (must be an integer)")
            else:
                # Check track exists
                tracks_resp = self._send_command("get_tracks", timeout=10.0)
                if tracks_resp.get("status") == "success":
                    tracks_result = tracks_resp.get("result")
                    tracks = tracks_result.get("tracks") if isinstance(tracks_result, dict) else None
                    track_count = len(tracks) if isinstance(tracks, list) else 0
                    if not 0 <= track_idx < track_count:
                        available = (
                            f"0-{track_count - 1} available" if track_count else "no tracks available"
                        )
                        errors.append(f"Invalid track_index: {track_idx} ({available})")

        success = not errors
        duration_ms = (time.time() - start_time) * 1000

        if success:
            message = "Clip integrity verified"
        else:
            message = f"Integrity check failed: {'; '.join(errors)}"

        self._verification_results.append(VerificationResult(
            success=success,
            check_name="verify_clip_integrity",
            message=message,
            details={
                "clip_info": clip_info,
                "errors": errors,
            },
            duration_ms=duration_ms,
        ))

        if not success:
            logger.error(f"Clip integrity failed: {errors}")

        return success

    def get_verification_report(self) -> Dict[str, Any]:
        """
        Get comprehensive verification report.

        Returns:
            Dict with all verification results and summary statistics
            (totals, success rate in percent, summed duration and a
            per-check-name breakdown)
        """
        all_results = self._verification_results
        total = len(all_results)
        passed = sum(1 for r in all_results if r.success)
        failed = total - passed

        total_duration_ms = sum(r.duration_ms for r in all_results)

        # Group by check type
        by_type: Dict[str, List[VerificationResult]] = {}
        for r in all_results:
            by_type.setdefault(r.check_name, []).append(r)

        summary = {
            "total_checks": total,
            "passed": passed,
            "failed": failed,
            "success_rate": round(passed / total * 100, 1) if total > 0 else 0.0,
            "total_duration_ms": round(total_duration_ms, 2),
            "by_check_type": {
                name: {
                    "total": len(group),
                    "passed": sum(1 for r in group if r.success),
                    "failed": sum(1 for r in group if not r.success),
                }
                for name, group in by_type.items()
            },
        }

        return {
            "timestamp": datetime.now().isoformat(),
            "results": [r.to_dict() for r in all_results],
            "summary": summary,
        }

    def clear_results(self):
        """Clear all stored verification results."""
        self._verification_results = []

    def save_results_to_db(self, test_name: str) -> bool:
        """
        Save verification results to SQLite database.

        The ``verification_results`` table is created on first use.  All rows
        are written in one transaction, and the connection is always closed,
        including when an error occurs.

        Args:
            test_name: Name identifier for this test run

        Returns:
            True if saved successfully, False otherwise
        """
        conn = None
        try:
            rows = [
                (
                    test_name,
                    result.check_name,
                    result.success,
                    result.message,
                    json.dumps(result.details),
                    datetime.fromtimestamp(result.timestamp).isoformat(),
                    result.duration_ms,
                )
                for result in self._verification_results
            ]

            conn = sqlite3.connect(DB_PATH)
            # ``with conn`` commits on success and rolls back on error; it
            # does not close the connection, hence the ``finally`` below.
            with conn:
                # Create table if not exists
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS verification_results (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        test_name TEXT,
                        check_name TEXT,
                        success BOOLEAN,
                        message TEXT,
                        details TEXT,
                        timestamp TEXT,
                        duration_ms REAL
                    )
                """)

                # Insert results
                conn.executemany("""
                    INSERT INTO verification_results
                    (test_name, check_name, success, message, details, timestamp, duration_ms)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, rows)
            return True
        except Exception as e:
            logger.error(f"Failed to save results to DB: {e}")
            return False
        finally:
            if conn is not None:
                conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# HELPER FUNCTIONS
|
|
# =============================================================================
|
|
|
|
def wait_for_arrangement_content(verifier: ArrangementVerifier,
                                 timeout: float = 30.0,
                                 poll_interval: float = 0.5,
                                 min_clips: int = 1) -> Tuple[bool, List[ClipInfo]]:
    """
    Wait for Arrangement View to have content.

    Polls Ableton until clips appear or timeout is reached.

    Args:
        verifier: ArrangementVerifier instance
        timeout: Maximum wait time in seconds
        poll_interval: Time between polls in seconds
        min_clips: Minimum number of clips to consider successful

    Returns:
        Tuple of (success, list of clips found); the list is empty on timeout
    """
    # A monotonic clock keeps the deadline correct even if the system
    # wall clock is adjusted while waiting.
    started = time.monotonic()
    deadline = started + timeout

    while time.monotonic() < deadline:
        clips = verifier._get_arrangement_clips()
        if len(clips) >= min_clips:
            logger.info(f"Found {len(clips)} clips after {time.monotonic() - started:.1f}s")
            return True, clips

        # Never sleep past the deadline, so the call returns close to ``timeout``.
        remaining = deadline - time.monotonic()
        time.sleep(max(0.0, min(poll_interval, remaining)))

    logger.warning(f"Timeout waiting for content after {timeout}s")
    return False, []
|
|
|
|
|
|
def compare_arrangement_before_after(verifier: ArrangementVerifier,
                                     action: Callable[[], Any],
                                     expected_changes: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Compare Arrangement View before and after an action.

    Args:
        verifier: ArrangementVerifier instance
        action: Callable that performs the action
        expected_changes: Dict with expected changes:
            - min_new_clips: int
            - expected_positions: list of clip positions (accepted but not
              evaluated by this function)

    Returns:
        Comparison report with before/after state.  ``action_success`` only
        says whether ``action`` raised; when it did, ``action_result`` holds
        the exception text.  A ``validation`` section is added only when
        ``expected_changes`` is non-empty.
    """
    # Capture before state
    before_clips = verifier._get_arrangement_clips()
    before_count = len(before_clips)
    before_end_time = max((c.end_time for c in before_clips), default=0.0)

    # Execute action
    action_start = time.time()
    try:
        action_result = action()
        action_success = True
    except Exception as e:
        # Keep the traceback in the log; the report only carries the message.
        logger.exception("Action raised during arrangement comparison")
        action_result = str(e)
        action_success = False
    action_duration = time.time() - action_start

    # Wait briefly for arrangement to update
    time.sleep(0.5)

    # Capture after state
    after_clips = verifier._get_arrangement_clips()
    after_count = len(after_clips)
    after_end_time = max((c.end_time for c in after_clips), default=0.0)

    # Calculate differences
    new_clips = after_count - before_count
    length_added = after_end_time - before_end_time

    # A clip counts as new when no clip occupied the same track and (rounded)
    # start position before the action.
    known_positions = {(c.track_index, round(c.start_time, 2)) for c in before_clips}
    new_clip_details = [
        clip.to_dict()
        for clip in after_clips
        if (clip.track_index, round(clip.start_time, 2)) not in known_positions
    ]

    report = {
        "action_success": action_success,
        "action_result": action_result,
        "action_duration_seconds": round(action_duration, 3),
        "before": {
            "clip_count": before_count,
            "end_time_beats": before_end_time,
        },
        "after": {
            "clip_count": after_count,
            "end_time_beats": after_end_time,
        },
        "changes": {
            "new_clips": new_clips,
            "length_added_beats": length_added,
            "new_clip_details": new_clip_details[:10],  # Limit to first 10
        },
    }

    # Validate against expected changes
    if expected_changes:
        min_clips = expected_changes.get("min_new_clips", 0)
        report["validation"] = {
            "expected_min_new_clips": min_clips,
            "actual_new_clips": new_clips,
            "meets_expectations": new_clips >= min_clips,
        }

    return report
|
|
|
|
|
|
def assert_clip_properties(clip: Union[ClipInfo, Dict[str, Any]],
                           expected: Dict[str, Any],
                           tolerance: float = 0.01) -> VerificationResult:
    """
    Assert that a clip has expected properties.

    Float expectations are compared within ``tolerance``; every other
    expectation is compared with ``!=``.  A property that is absent or
    ``None`` on the clip is reported as missing.  Despite the name, nothing
    is raised: mismatches are returned in the result.

    Args:
        clip: ClipInfo or dict with clip data
        expected: Dict of expected property values
        tolerance: Tolerance for floating point comparisons

    Returns:
        VerificationResult with success/failure details
    """
    start_time = time.time()

    clip_data = clip if isinstance(clip, dict) else clip.to_dict()

    mismatches = []
    for key, expected_value in expected.items():
        actual_value = clip_data.get(key)

        if actual_value is None:
            mismatches.append(f"Missing property: {key}")
            continue

        if isinstance(expected_value, float):
            # A non-numeric actual value is a mismatch, not a TypeError.
            is_numeric = isinstance(actual_value, (int, float))
            if not is_numeric or abs(actual_value - expected_value) > tolerance:
                mismatches.append(f"{key}: expected {expected_value}, got {actual_value}")
        elif actual_value != expected_value:
            mismatches.append(f"{key}: expected {expected_value}, got {actual_value}")

    success = not mismatches
    duration_ms = (time.time() - start_time) * 1000

    return VerificationResult(
        success=success,
        check_name="assert_clip_properties",
        message=("All properties match" if success else f"Property mismatches: {mismatches}"),
        details={
            "clip": clip_data,
            "expected": expected,
            "mismatches": mismatches,
            "tolerance": tolerance,
        },
        duration_ms=duration_ms,
    )
|
|
|
|
|
|
# =============================================================================
|
|
# AUTOMATED VALIDATION
|
|
# =============================================================================
|
|
|
|
class ArrangementValidator:
    """
    Automated validation system for Arrangement View operations.

    Provides:
    - Pre-condition checks
    - Post-condition checks
    - Error collection and reporting

    ``errors`` accumulates across calls; the per-phase result lists are
    reset each time the corresponding check method runs.
    """

    def __init__(self, verifier: ArrangementVerifier):
        """Store the verifier used to talk to Ableton and start with empty results."""
        self.verifier = verifier
        self.pre_check_results: List[VerificationResult] = []
        self.post_check_results: List[VerificationResult] = []
        self.errors: List[str] = []

    def pre_condition_checks(self) -> bool:
        """
        Run all pre-condition checks before performing arrangement operations.

        Checks:
        - Ableton is running and reachable (``health_check``); a failure here
          stops the remaining checks
        - Session info is readable (``get_session_info``)
        - At least one track is readable (``get_tracks``)
        - arrangement_overdub availability, currently inferred from the
          session-info check succeeding

        Returns:
            True if all pre-conditions met, False otherwise
        """
        self.pre_check_results = []

        # Check 1: Ableton is running
        health_resp = self.verifier._send_command("health_check", timeout=10.0)
        ableton_ok = health_resp.get("status") == "success"

        self.pre_check_results.append(VerificationResult(
            success=ableton_ok,
            check_name="pre_ableton_running",
            message="Ableton is running and responding" if ableton_ok else "Ableton is not reachable",
            details={
                "health_response": health_resp.get("result", {}) if ableton_ok
                else health_resp.get("message"),
            },
        ))

        if not ableton_ok:
            self.errors.append("Pre-condition failed: Ableton not running")
            return False

        # Check 2: Session info available
        session_resp = self.verifier._send_command("get_session_info", timeout=5.0)
        session_ok = session_resp.get("status") == "success"

        self.pre_check_results.append(VerificationResult(
            success=session_ok,
            check_name="pre_session_info",
            message="Session info accessible" if session_ok else "Cannot read session info",
            details={
                "session": session_resp.get("result", {}) if session_ok
                else session_resp.get("message"),
            },
        ))

        if not session_ok:
            self.errors.append("Pre-condition failed: Cannot read session info")

        # Check 3: Tracks accessible
        tracks_resp = self.verifier._send_command("get_tracks", timeout=5.0)
        tracks_ok = tracks_resp.get("status") == "success"
        track_count = 0
        if tracks_ok:
            tracks_result = tracks_resp.get("result")
            tracks = tracks_result.get("tracks") if isinstance(tracks_result, dict) else None
            track_count = len(tracks) if isinstance(tracks, list) else 0

        self.pre_check_results.append(VerificationResult(
            success=tracks_ok and track_count > 0,
            check_name="pre_tracks_accessible",
            message=f"{track_count} tracks accessible" if tracks_ok else "Cannot read tracks",
            details={"track_count": track_count},
        ))

        if not tracks_ok or track_count == 0:
            self.errors.append(f"Pre-condition failed: No tracks available ({track_count} found)")

        # Check 4: arrangement_overdub availability.
        # Simplified check: no dedicated capability query is made, so this
        # mirrors the outcome of the session-info check.
        overdub_available = session_ok

        self.pre_check_results.append(VerificationResult(
            success=overdub_available,
            check_name="pre_arrangement_overdub",
            message="Arrangement overdub available" if overdub_available
            else "Arrangement overdub not confirmed",
            details={},
        ))

        return all(r.success for r in self.pre_check_results)

    def post_condition_checks(self, expected_clips: Optional[int] = None,
                              expected_duration: Optional[float] = None) -> bool:
        """
        Run all post-condition checks after performing arrangement operations.

        Checks:
        - At least one clip exists
        - Clip count equals ``expected_clips`` (only when it is given)
        - No clip has a negative start time
        - Overlapping neighbours on a track (informational, never fails)
        - Latest clip end is within 1.0 beat of ``expected_duration``
          (only when it is given)

        Args:
            expected_clips: Expected number of clips (None = any)
            expected_duration: Expected total duration in beats (None = any)

        Returns:
            True if all post-conditions met, False otherwise
        """
        self.post_check_results = []

        # Check 1: Clips exist
        clips = self.verifier._get_arrangement_clips()
        clip_count = len(clips)
        clips_exist = clip_count > 0

        self.post_check_results.append(VerificationResult(
            success=clips_exist,
            check_name="post_clips_exist",
            message=f"{clip_count} clips in arrangement" if clips_exist
            else "No clips found in arrangement",
            details={"clip_count": clip_count, "clips": [c.name for c in clips[:5]]},
        ))

        # Check 1b: exact clip count.  Recorded as its own result so that a
        # mismatch makes this method return False instead of only being noted
        # in ``errors``.
        if expected_clips is not None:
            count_ok = clip_count == expected_clips
            self.post_check_results.append(VerificationResult(
                success=count_ok,
                check_name="post_clip_count",
                message=f"Clip count matches expected ({expected_clips})" if count_ok
                else f"Expected {expected_clips} clips, got {clip_count}",
                details={"expected": expected_clips, "actual": clip_count},
            ))
            if not count_ok:
                self.errors.append(
                    f"Post-condition failed: Expected {expected_clips} clips, got {clip_count}"
                )

        # Check 2: Clip positions are valid (no negative start times)
        invalid_positions = [c for c in clips if c.start_time < 0]
        positions_valid = not invalid_positions

        self.post_check_results.append(VerificationResult(
            success=positions_valid,
            check_name="post_positions_valid",
            message="All clip positions valid" if positions_valid
            else f"{len(invalid_positions)} clips with invalid positions",
            details={
                "invalid_count": len(invalid_positions),
                "invalid_clips": [c.to_dict() for c in invalid_positions[:3]],
            },
        ))

        if not positions_valid:
            self.errors.append(
                f"Post-condition failed: {len(invalid_positions)} clips have negative start times"
            )

        # Check 3: No corruption (overlapping clips on same track - may be valid but flagged)
        # This is informational as overlapping clips can be intentional
        clips_by_track: Dict[int, List[ClipInfo]] = {}
        for c in clips:
            clips_by_track.setdefault(c.track_index, []).append(c)

        overlaps = []
        for track_idx, track_clips in clips_by_track.items():
            ordered = sorted(track_clips, key=lambda x: x.start_time)
            # Only neighbouring clips (by start time) are compared.
            for current, following in zip(ordered, ordered[1:]):
                if current.end_time > following.start_time:
                    overlaps.append({
                        "track": track_idx,
                        "clip1": current.name,
                        "clip2": following.name,
                        "overlap_beats": current.end_time - following.start_time,
                    })

        self.post_check_results.append(VerificationResult(
            success=True,  # Overlaps are not necessarily errors
            check_name="post_no_corruption",
            message=f"{len(overlaps)} overlapping clips detected (informational)" if overlaps
            else "No clip overlaps detected",
            details={"overlaps": overlaps[:5]},
        ))

        # Check 4: Total duration
        total_duration = max((c.end_time for c in clips), default=0.0)
        duration_ok = expected_duration is None or abs(total_duration - expected_duration) < 1.0

        self.post_check_results.append(VerificationResult(
            success=duration_ok,
            check_name="post_duration_check",
            message=f"Total duration: {total_duration:.1f} beats" if duration_ok
            else f"Duration mismatch: expected ~{expected_duration}, got {total_duration}",
            details={"total_duration_beats": total_duration, "expected": expected_duration},
        ))

        if not duration_ok:
            self.errors.append(
                f"Post-condition failed: Duration {total_duration} != expected {expected_duration}"
            )

        return all(r.success for r in self.post_check_results)

    def get_validation_report(self) -> Dict[str, Any]:
        """Get complete validation report with all checks and errors.

        The two ``all_*_conditions_met`` flags are True when the corresponding
        result list is empty, i.e. also when that phase has not run yet.
        """
        return {
            "pre_checks": [r.to_dict() for r in self.pre_check_results],
            "post_checks": [r.to_dict() for r in self.post_check_results],
            "errors": self.errors,
            "all_pre_conditions_met": all(r.success for r in self.pre_check_results),
            "all_post_conditions_met": all(r.success for r in self.post_check_results),
        }
|
|
|
|
|
|
# =============================================================================
|
|
# TEST SCENARIOS
|
|
# =============================================================================
|
|
|
|
class ArrangementTestScenarios:
|
|
"""
|
|
Collection of test scenarios for Arrangement View.
|
|
|
|
Each scenario includes:
|
|
- Pre-condition checks
|
|
- Test action execution
|
|
- Post-condition verification
|
|
"""
|
|
|
|
def __init__(self, verifier: ArrangementVerifier):
    """Keep the verifier and build the validator that shares it."""
    self.verifier = verifier
    self.validator = ArrangementValidator(verifier)
|
|
|
|
def test_simple_arrangement_recording(self, duration_bars: int = 4) -> TestReport:
    """
    T023: Test simple arrangement recording.

    Records from Session to Arrangement for specified bars and verifies:
    - Pre-conditions hold (otherwise a FAILED report is returned at once)
    - The ``record_to_arrangement`` command reports success and at least
      one new clip appears
    - The arrangement has content
    - Post-conditions hold

    Args:
        duration_bars: Number of bars to record

    Returns:
        TestReport with full results
    """
    test_name = "test_simple_arrangement_recording"
    started_at = datetime.now().isoformat()
    start_time = time.time()

    self.verifier.clear_results()

    # Step 1: Pre-conditions
    logger.info("[%s] Checking pre-conditions...", test_name)
    preconditions_met = self.validator.pre_condition_checks()
    results = list(self.validator.pre_check_results)

    if not preconditions_met:
        return TestReport(
            test_name=test_name,
            started_at=started_at,
            completed_at=datetime.now().isoformat(),
            duration_seconds=time.time() - start_time,
            results=results,
            summary={
                "status": "FAILED",
                "reason": "Pre-conditions not met",
                "total_checks": len(results),
                "passed": sum(1 for r in results if r.success),
                "failed": sum(1 for r in results if not r.success),
            },
        )

    # Step 2: Record to arrangement
    logger.info("[%s] Recording %s bars...", test_name, duration_bars)

    def record_action():
        # This simulates the MCP command - in real test, this would call the actual MCP tool
        return self.verifier._send_command(
            "record_to_arrangement",
            {"duration_bars": duration_bars},
            timeout=60.0,
        )

    # Use compare_before_after pattern
    comparison = compare_arrangement_before_after(
        self.verifier,
        record_action,
        expected_changes={"min_new_clips": 1},
    )

    # ``_send_command`` reports failures in its return value rather than by
    # raising, so the response status has to be inspected explicitly;
    # otherwise a failed recording could pass on pre-existing clips alone.
    record_response = comparison.get("action_result")
    command_ok = (
        bool(comparison.get("action_success"))
        and isinstance(record_response, dict)
        and record_response.get("status") == "success"
    )
    new_clips_ok = bool(comparison.get("validation", {}).get("meets_expectations"))
    results.append(VerificationResult(
        success=command_ok and new_clips_ok,
        check_name="record_to_arrangement",
        message="Recording completed and produced new clips" if command_ok and new_clips_ok
        else "Recording failed or produced no new clips",
        details={
            "command_ok": command_ok,
            "new_clips_ok": new_clips_ok,
            "new_clips": comparison.get("changes", {}).get("new_clips"),
        },
    ))

    # Verify clips were created (the outcome is recorded on the verifier)
    self.verifier.verify_arrangement_has_content(min_clips=1)

    # Step 3: Post-conditions
    logger.info("[%s] Checking post-conditions...", test_name)
    self.validator.post_condition_checks()
    results.extend(self.validator.post_check_results)

    completed_at = datetime.now().isoformat()
    duration = time.time() - start_time

    # Add verifier results
    results.extend(self.verifier._verification_results)

    summary = {
        "status": "PASSED" if all(r.success for r in results) else "FAILED",
        "total_checks": len(results),
        "passed": sum(1 for r in results if r.success),
        "failed": sum(1 for r in results if not r.success),
        "recording_comparison": comparison,
    }

    report = TestReport(
        test_name=test_name,
        started_at=started_at,
        completed_at=completed_at,
        duration_seconds=duration,
        results=results,
        summary=summary,
    )

    logger.info("[%s] Completed: %s", test_name, summary["status"])
    return report
|
|
|
|
def test_build_arrangement_timeline(self) -> TestReport:
|
|
"""
|
|
T021: Test building arrangement timeline structure.
|
|
|
|
Creates a full arrangement structure (Intro→Build→Drop→Break→Outro)
|
|
and verifies timeline positions.
|
|
|
|
Returns:
|
|
TestReport with full results
|
|
"""
|
|
started_at = datetime.now().isoformat()
|
|
start_time = time.time()
|
|
|
|
self.verifier.clear_results()
|
|
results = []
|
|
|
|
# Pre-conditions
|
|
if not self.validator.pre_condition_checks():
|
|
for result in self.validator.pre_check_results:
|
|
results.append(result)
|
|
return TestReport(
|
|
test_name="test_build_arrangement_timeline",
|
|
started_at=started_at,
|
|
completed_at=datetime.now().isoformat(),
|
|
duration_seconds=time.time() - start_time,
|
|
results=results,
|
|
summary={"status": "FAILED", "reason": "Pre-conditions not met"},
|
|
)
|
|
|
|
for result in self.validator.pre_check_results:
|
|
results.append(result)
|
|
|
|
# Build arrangement
|
|
song_config = {
|
|
"bpm": 95,
|
|
"structure": "intro_build_drop_break_outro",
|
|
"tracks": [
|
|
{
|
|
"name": "Kick",
|
|
"clips": [
|
|
{"name": "Kick Pattern", "start_time": 0, "duration": 64, "notes": []}
|
|
]
|
|
},
|
|
{
|
|
"name": "Snare",
|
|
"clips": [
|
|
{"name": "Snare Pattern", "start_time": 16, "duration": 48, "notes": []}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
def build_action():
|
|
from engines.arrangement_engine import ArrangementBuilder
|
|
builder = ArrangementBuilder()
|
|
arrangement = builder.fill_arrangement_with_song(song_config)
|
|
return arrangement.to_dict()
|
|
|
|
try:
|
|
arrangement_data = build_action()
|
|
|
|
# Verify structure
|
|
expected_positions = [
|
|
{"track_index": 0, "start_time": 0.0, "name": "Kick"},
|
|
{"track_index": 1, "start_time": 64.0, "name": "Snare"}, # Bar 16 * 4 beats
|
|
]
|
|
|
|
success = self.verifier.verify_clip_positions(expected_positions, tolerance_beats=4.0)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Build arrangement failed: {e}")
|
|
results.append(VerificationResult(
|
|
success=False,
|
|
check_name="build_arrangement",
|
|
message=f"Failed to build arrangement: {str(e)}",
|
|
details={"traceback": traceback.format_exc()},
|
|
))
|
|
|
|
# Post-conditions
|
|
self.validator.post_condition_checks()
|
|
for result in self.validator.post_check_results:
|
|
results.append(result)
|
|
|
|
results.extend(self.verifier._verification_results)
|
|
|
|
summary = {
|
|
"status": "PASSED" if all(r.success for r in results) else "FAILED",
|
|
"total_checks": len(results),
|
|
"passed": sum(1 for r in results if r.success),
|
|
"failed": sum(1 for r in results if not r.success),
|
|
}
|
|
|
|
return TestReport(
|
|
test_name="test_build_arrangement_timeline",
|
|
started_at=started_at,
|
|
completed_at=datetime.now().isoformat(),
|
|
duration_seconds=time.time() - start_time,
|
|
results=results,
|
|
summary=summary,
|
|
)
|
|
|
|
def test_section_at_bar(self, section_bar: int = 8, section_name: str = "drop") -> TestReport:
|
|
"""
|
|
Test creating a specific section at a bar position.
|
|
|
|
Creates a section and verifies it's at the correct location.
|
|
|
|
Args:
|
|
section_bar: Bar where section should start
|
|
section_name: Name of the section
|
|
|
|
Returns:
|
|
TestReport with full results
|
|
"""
|
|
started_at = datetime.now().isoformat()
|
|
start_time = time.time()
|
|
|
|
self.verifier.clear_results()
|
|
results = []
|
|
|
|
# Pre-conditions
|
|
if not self.validator.pre_condition_checks():
|
|
for result in self.validator.pre_check_results:
|
|
results.append(result)
|
|
return TestReport(
|
|
test_name="test_section_at_bar",
|
|
started_at=started_at,
|
|
completed_at=datetime.now().isoformat(),
|
|
duration_seconds=time.time() - start_time,
|
|
results=results,
|
|
summary={"status": "FAILED", "reason": "Pre-conditions not met"},
|
|
)
|
|
|
|
for result in self.validator.pre_check_results:
|
|
results.append(result)
|
|
|
|
# Create section
|
|
def create_section():
|
|
from engines.arrangement_engine import ArrangementBuilder
|
|
builder = ArrangementBuilder()
|
|
marker = builder.create_section_marker(section_name, section_bar)
|
|
return marker.to_dict()
|
|
|
|
try:
|
|
marker_data = create_section()
|
|
|
|
# Verify section position
|
|
actual_start = marker_data.get("start_bar")
|
|
actual_end = marker_data.get("end_bar")
|
|
|
|
position_correct = actual_start == section_bar
|
|
duration_positive = actual_end > actual_start
|
|
|
|
results.append(VerificationResult(
|
|
success=position_correct and duration_positive,
|
|
check_name="section_position",
|
|
message=f"Section '{section_name}' at bar {actual_start}, ends at {actual_end}",
|
|
details={
|
|
"expected_bar": section_bar,
|
|
"actual_start": actual_start,
|
|
"actual_end": actual_end,
|
|
"position_correct": position_correct,
|
|
"duration_positive": duration_positive,
|
|
},
|
|
))
|
|
|
|
except Exception as e:
|
|
results.append(VerificationResult(
|
|
success=False,
|
|
check_name="create_section",
|
|
message=f"Failed to create section: {str(e)}",
|
|
details={"traceback": traceback.format_exc()},
|
|
))
|
|
|
|
# Post-conditions
|
|
self.validator.post_condition_checks()
|
|
for result in self.validator.post_check_results:
|
|
results.append(result)
|
|
|
|
summary = {
|
|
"status": "PASSED" if all(r.success for r in results) else "FAILED",
|
|
"total_checks": len(results),
|
|
"passed": sum(1 for r in results if r.success),
|
|
"failed": sum(1 for r in results if not r.success),
|
|
}
|
|
|
|
return TestReport(
|
|
test_name="test_section_at_bar",
|
|
started_at=started_at,
|
|
completed_at=datetime.now().isoformat(),
|
|
duration_seconds=time.time() - start_time,
|
|
results=results,
|
|
summary=summary,
|
|
)
|
|
|
|
def test_without_numpy(self) -> TestReport:
|
|
"""
|
|
Test that all functionality works without numpy dependency.
|
|
|
|
Runs core verification methods using only SQLite and standard library.
|
|
|
|
Returns:
|
|
TestReport with full results
|
|
"""
|
|
started_at = datetime.now().isoformat()
|
|
start_time = time.time()
|
|
|
|
self.verifier.clear_results()
|
|
results = []
|
|
|
|
# Verify no numpy is imported
|
|
import sys
|
|
numpy_loaded = "numpy" in sys.modules
|
|
|
|
results.append(VerificationResult(
|
|
success=not numpy_loaded,
|
|
check_name="no_numpy_dependency",
|
|
message="numpy not loaded" if not numpy_loaded else "numpy is loaded (may cause issues)",
|
|
details={"numpy_in_sys_modules": numpy_loaded},
|
|
))
|
|
|
|
# Run basic verifications that don't need numpy
|
|
try:
|
|
# Test database operations
|
|
db_success = self.verifier.save_results_to_db("test_without_numpy")
|
|
|
|
results.append(VerificationResult(
|
|
success=db_success,
|
|
check_name="sqlite_operations",
|
|
message="SQLite operations successful" if db_success else "SQLite operations failed",
|
|
details={},
|
|
))
|
|
|
|
# Test clip counting
|
|
clips = self.verifier._get_arrangement_clips()
|
|
results.append(VerificationResult(
|
|
success=True, # Even 0 clips is valid
|
|
check_name="clip_counting",
|
|
message=f"Retrieved {len(clips)} clips without numpy",
|
|
details={"clip_count": len(clips)},
|
|
))
|
|
|
|
except Exception as e:
|
|
results.append(VerificationResult(
|
|
success=False,
|
|
check_name="without_numpy_execution",
|
|
message=f"Error running without numpy: {str(e)}",
|
|
details={"traceback": traceback.format_exc()},
|
|
))
|
|
|
|
summary = {
|
|
"status": "PASSED" if all(r.success for r in results) else "FAILED",
|
|
"total_checks": len(results),
|
|
"passed": sum(1 for r in results if r.success),
|
|
"failed": sum(1 for r in results if not r.success),
|
|
}
|
|
|
|
return TestReport(
|
|
test_name="test_without_numpy",
|
|
started_at=started_at,
|
|
completed_at=datetime.now().isoformat(),
|
|
duration_seconds=time.time() - start_time,
|
|
results=results,
|
|
summary=summary,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# MCP INTEGRATION
|
|
# =============================================================================
|
|
|
|
def create_mcp_test_tools() -> List[Dict[str, Any]]:
    """
    Create test tool definitions for MCP integration.

    Three tools are described: "run_arrangement_test",
    "verify_arrangement_state" and "get_arrangement_report". Each entry has
    a "name", a "description" and an object-typed "parameters" schema.

    Returns:
        List of tool definitions that can be registered with MCP server
    """

    def number_prop(description: str, **extra: Any) -> Dict[str, Any]:
        # Key order is kept as type -> (default) -> description.
        return {"type": "number", **extra, "description": description}

    def tool(name: str, description: str, properties: Dict[str, Any],
             required: Optional[List[str]] = None) -> Dict[str, Any]:
        schema: Dict[str, Any] = {"type": "object", "properties": properties}
        if required is not None:
            schema["required"] = required
        return {"name": name, "description": description, "parameters": schema}

    run_test_properties = {
        "test_name": {
            "type": "string",
            "enum": ["simple_recording", "build_timeline", "section_at_bar", "without_numpy"],
            "description": "Name of test to run",
        },
        "duration_bars": number_prop("Duration for recording tests", default=4),
        "section_bar": number_prop("Bar position for section tests", default=8),
    }

    verify_state_properties = {
        "expected_clips": number_prop("Expected number of clips"),
        "expected_duration": number_prop("Expected total duration in beats"),
    }

    return [
        tool(
            "run_arrangement_test",
            "Run a specific Arrangement View test scenario",
            run_test_properties,
            required=["test_name"],
        ),
        tool(
            "verify_arrangement_state",
            "Verify current state of Arrangement View",
            verify_state_properties,
        ),
        tool(
            "get_arrangement_report",
            "Get comprehensive arrangement verification report",
            {},
        ),
    ]
|
|
|
|
|
|
def run_mcp_test(test_name: str, **kwargs) -> str:
    """
    Execute a test via MCP and return JSON result.

    This function is designed to be called from the MCP server as a tool handler.
    It never raises for an unknown test name or for a failure while building
    or running the scenario: both are returned as a JSON object with
    ``"status": "error"``.

    Args:
        test_name: Name of test to run
        **kwargs: Additional test parameters. Recognized keys:
            ``duration_bars`` (simple_recording, default 4),
            ``section_bar`` (section_at_bar, default 8) and
            ``section_name`` (section_at_bar, default "drop").

    Returns:
        JSON string with test results
    """
    # Dispatch table: each entry receives the scenario collection, so the
    # table can be consulted before any verifier is constructed.
    test_map = {
        "simple_recording": lambda s: s.test_simple_arrangement_recording(
            duration_bars=kwargs.get("duration_bars", 4)
        ),
        "build_timeline": lambda s: s.test_build_arrangement_timeline(),
        "section_at_bar": lambda s: s.test_section_at_bar(
            section_bar=kwargs.get("section_bar", 8),
            section_name=kwargs.get("section_name", "drop"),
        ),
        "without_numpy": lambda s: s.test_without_numpy(),
    }

    runner = test_map.get(test_name)
    if runner is None:
        return json.dumps({
            "status": "error",
            "message": f"Unknown test: {test_name}. Available: {list(test_map.keys())}",
        }, indent=2)

    try:
        # Constructed inside the try so a setup failure is reported as a JSON
        # error, like any failure raised while the scenario runs.
        verifier = ArrangementVerifier()
        scenarios = ArrangementTestScenarios(verifier)
        report = runner(scenarios)
        return report.to_json()
    except Exception as e:
        return json.dumps({
            "status": "error",
            "message": str(e),
            "traceback": traceback.format_exc(),
        }, indent=2)
|
|
|
|
|
|
def generate_test_report_json(verifier: ArrangementVerifier,
                              test_name: str = "arrangement_verification") -> str:
    """
    Generate a comprehensive JSON report for MCP consumption.

    The mapping returned by ``verifier.get_verification_report()`` is
    extended in place with "test_name" and "generated_at" (current local
    time, ISO format) before being serialized.

    Args:
        verifier: ArrangementVerifier with results
        test_name: Name of the test run

    Returns:
        JSON string with complete report
    """
    payload = verifier.get_verification_report()
    payload.update(
        test_name=test_name,
        generated_at=datetime.now().isoformat(),
    )
    return json.dumps(payload, indent=2)
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN / TEST ENTRY POINT
|
|
# =============================================================================
|
|
|
|
def run_all_tests() -> Dict[str, TestReport]:
    """
    Run all test scenarios and return reports.

    The four scenarios run in a fixed order against one shared verifier;
    progress and a per-test summary line are written to the module logger.

    Returns:
        Dict mapping test names to TestReport objects
    """
    scenarios = ArrangementTestScenarios(ArrangementVerifier())
    rule = "=" * 70

    logger.info(rule)
    logger.info("RUNNING ALL ARRANGEMENT VIEW TESTS")
    logger.info(rule)

    # (report key, scenario method name, keyword arguments), in run order.
    plan = [
        ("simple_recording", "test_simple_arrangement_recording", {"duration_bars": 4}),
        ("build_timeline", "test_build_arrangement_timeline", {}),
        ("section_at_bar", "test_section_at_bar", {"section_bar": 8}),
        ("without_numpy", "test_without_numpy", {}),
    ]

    reports = {}
    for position, (key, method_name, call_kwargs) in enumerate(plan, start=1):
        logger.info(f"\n[{position}/{len(plan)}] Running {method_name}...")
        scenario = getattr(scenarios, method_name)
        reports[key] = scenario(**call_kwargs)

    # Summary
    logger.info("\n" + rule)
    logger.info("TEST SUMMARY")
    logger.info(rule)

    for name, report in reports.items():
        outcome = report.summary
        status = outcome.get("status", "UNKNOWN")
        passed = outcome.get("passed", 0)
        total = outcome.get("total_checks", 0)
        logger.info(f" {name}: {status} ({passed}/{total} checks passed)")

    return reports
|
|
|
|
|
|
def main():
    """Main entry point for running tests from command line.

    Configures INFO-level logging, runs every scenario via run_all_tests(),
    and writes each report to ``test_report_<name>.json`` in the current
    working directory.

    Returns:
        The dict of reports returned by run_all_tests().
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s: %(message)s"
    )

    rule = "=" * 70
    print(rule)
    print("ARRANGEMENT VIEW VERIFICATION AND TESTING SYSTEM")
    print(rule)
    print()

    # Run all tests
    reports = run_all_tests()

    # Save results
    print("\n" + rule)
    print("SAVING RESULTS")
    print(rule)

    for name, report in reports.items():
        json_path = Path(f"test_report_{name}.json")
        # Explicit UTF-8 so the files do not depend on the platform's default
        # encoding.
        json_path.write_text(report.to_json(), encoding="utf-8")
        print(f" Saved: {json_path}")

    print("\nDone!")

    return reports
|
|
|
|
|
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|