import argparse import json import socket from datetime import datetime from typing import Any, Dict, List, Tuple try: from song_generator import SongGenerator except ImportError: SongGenerator = None STRUCTURE_SCENE_COUNTS = { "minimal": 4, "standard": 6, "extended": 7, } # Expected buses for Phase 7 validation EXPECTED_BUSES = ["drums", "bass", "music", "vocal", "fx"] EXPECTED_CRITICAL_ROLES = {"kick", "bass", "clap", "hat"} EXPECTED_AUDIO_FX_LAYERS = ["AUDIO ATMOS", "AUDIO CRASH FX", "AUDIO TRANSITION FILL"] EXPECTED_BUS_NAMES = ["DRUMS", "BASS", "MUSIC"] MIN_TRACKS_FOR_EXPORT = 6 MIN_BUSES_FOR_EXPORT = 3 MIN_RETURNS_FOR_EXPORT = 2 MASTER_VOLUME_RANGE = (0.75, 0.95) # Expected AUDIO RESAMPLE track names AUDIO_RESAMPLE_TRACKS = [ "AUDIO RESAMPLE REVERSE FX", "AUDIO RESAMPLE RISER", "AUDIO RESAMPLE DOWNLIFTER", "AUDIO RESAMPLE STUTTER", ] # Bus routing map: track role -> expected bus output BUS_ROUTING_MAP = { "kick": {"drums"}, "snare": {"drums"}, "clap": {"drums"}, "hat": {"drums"}, "perc": {"drums"}, "sub_bass": {"bass"}, "bass": {"bass"}, "chords": {"music"}, "pad": {"music"}, "pluck": {"music"}, "lead": {"music"}, "vocal": {"vocal"}, "vocal_chop": {"vocal"}, "reverse_fx": {"fx"}, "riser": {"fx"}, "impact": {"fx"}, "atmos": {"fx"}, "crash": {"drums", "fx"}, } def _extract_bus_payload(payload: Any) -> List[Dict[str, Any]]: if isinstance(payload, list): return [item for item in payload if isinstance(item, dict)] if isinstance(payload, dict): buses = payload.get("buses", []) if isinstance(buses, list): return [item for item in buses if isinstance(item, dict)] return [] def _normalize_bus_key(name: str) -> str: normalized = "".join(ch for ch in (name or "").lower() if ch.isalnum()) if not normalized: return "" if "drum" in normalized or "groove" in normalized: return "drums" if "bass" in normalized or "tube" in normalized or "subdeep" in normalized: return "bass" if "music" in normalized or "wide" in normalized: return "music" if "vocal" in normalized or "vox" in normalized or "tail" in normalized: return "vocal" if "fx" in normalized or "wash" in normalized: return "fx" return "" def _canonical_track_name(name: str) -> str: text = (name or "").strip().lower() if not text: return "" if " (" in text: text = text.split(" (", 1)[0].strip() return text class AbletonSocketClient: def __init__(self, host: str = "127.0.0.1", port: int = 9877, timeout: float = 15.0): self.host = host self.port = port self.timeout = timeout def send(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]: payload = json.dumps({ "type": command_type, "params": params or {}, }).encode("utf-8") + b"\n" with socket.create_connection((self.host, self.port), timeout=self.timeout) as sock: sock.sendall(payload) reader = sock.makefile("r", encoding="utf-8") try: line = reader.readline() finally: reader.close() try: sock.shutdown(socket.SHUT_RDWR) except OSError: pass if not line: raise RuntimeError(f"No response for command: {command_type}") return json.loads(line) def expect_success(name: str, response: Dict[str, Any]) -> Dict[str, Any]: if response.get("status") != "success": raise RuntimeError(f"{name} failed: {response}") return response.get("result", {}) class TestResult: """Tracks test results for reporting.""" def __init__(self): self.passed: List[Tuple[str, str]] = [] self.failed: List[Tuple[str, str]] = [] self.skipped: List[Tuple[str, str]] = [] self.warnings: List[Tuple[str, str]] = [] def add_pass(self, name: str, details: str = ""): self.passed.append((name, details)) def add_fail(self, name: str, error: str): self.failed.append((name, error)) def add_skip(self, name: str, reason: str): self.skipped.append((name, reason)) def add_warning(self, name: str, message: str): self.warnings.append((name, message)) def to_dict(self) -> Dict[str, Any]: return { "summary": { "total": len(self.passed) + len(self.failed) + len(self.skipped) + len(self.warnings), "passed": len(self.passed), "failed": len(self.failed), "skipped": len(self.skipped), "warnings": len(self.warnings), "status": "PASS" if len(self.failed) == 0 else "FAIL", }, "passed_tests": [{"name": n, "details": d} for n, d in self.passed], "failed_tests": [{"name": n, "error": d} for n, d in self.failed], "skipped_tests": [{"name": n, "reason": d} for n, d in self.skipped], "warnings": [{"name": n, "message": d} for n, d in self.warnings], } def print_report(self): print("\n" + "=" * 60) print("PHASE 7 SMOKE TEST REPORT") print("=" * 60) print(f"Timestamp: {datetime.now().isoformat()}") print(f"Total: {len(self.passed) + len(self.failed) + len(self.skipped) + len(self.warnings)}") print(f"Passed: {len(self.passed)}") print(f"Failed: {len(self.failed)}") print(f"Skipped: {len(self.skipped)}") print(f"Warnings: {len(self.warnings)}") print("-" * 60) if self.passed: print("\n[PASSED]") for name, details in self.passed: print(f" [OK] {name}: {details}") if self.failed: print("\n[FAILED]") for name, error in self.failed: print(f" [FAIL] {name}: {error}") if self.warnings: print("\n[WARNINGS]") for name, message in self.warnings: print(f" [WARN] {name}: {message}") if self.skipped: print("\n[SKIPPED]") for name, reason in self.skipped: print(f" [SKIP] {name}: {reason}") print("\n" + "=" * 60) status = "PASS" if len(self.failed) == 0 else "FAIL" print(f"FINAL STATUS: {status}") print("=" * 60 + "\n") def run_readonly_checks(client: AbletonSocketClient) -> List[Tuple[str, str]]: checks = [] expect_success("get_session_info", client.send("get_session_info")) checks.append(( "get_session_info", # f"tempo={session.get('tempo')} tracks={session.get('num_tracks')} scenes={session.get('num_scenes')}", )) tracks = expect_success("get_tracks", client.send("get_tracks")) checks.append(("get_tracks", f"tracks={len(tracks)}")) return checks def run_generation_check( client: AbletonSocketClient, genre: str, style: str, bpm: float, key: str, structure: str, use_blueprint: bool = False, ) -> List[Tuple[str, str]]: checks = [] params = { "genre": genre, "style": style, "bpm": bpm, "key": key, "structure": structure, } if use_blueprint and SongGenerator is not None: params = SongGenerator().generate_config(genre, style, bpm, key, structure) result = expect_success( "generate_complete_song", client.send("generate_complete_song", params), ) checks.append(( "generate_complete_song", f"tracks={result.get('tracks')} scenes={result.get('scenes')} structure={result.get('structure')}", )) session = expect_success("post_generate_session_info", client.send("get_session_info")) actual_scenes = session.get("num_scenes") expected_scenes = len(params.get("sections", [])) if use_blueprint and isinstance(params, dict) and params.get("sections") else STRUCTURE_SCENE_COUNTS.get(structure.lower()) if expected_scenes is not None and actual_scenes != expected_scenes: raise RuntimeError( f"scene count mismatch after generate_complete_song: expected {expected_scenes}, got {actual_scenes}" ) checks.append(( "post_generate_session_info", f"tracks={session.get('num_tracks')} scenes={actual_scenes}", )) return checks def run_bus_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify buses are created correctly.""" try: buses_payload = expect_success("list_buses", client.send("list_buses")) buses = _extract_bus_payload(buses_payload) bus_keys = {_normalize_bus_key(bus.get("name", "")) for bus in buses} bus_keys.discard("") found_buses = [] missing_buses = [] for expected in EXPECTED_BUSES: if expected in bus_keys: found_buses.append(expected) else: missing_buses.append(expected) if found_buses: results.add_pass("buses_found", f"found={found_buses}") if missing_buses: # Not a failure if buses don't exist yet - they may be created during generation results.add_skip("buses_missing", f"not_found={missing_buses} (may be created during generation)") else: results.add_pass("buses_complete", "all expected buses present") except Exception as e: results.add_fail("buses_check", str(e)) def run_routing_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify track routing is configured correctly.""" try: tracks = expect_success("get_tracks", client.send("get_tracks")) if not tracks: results.add_skip("routing_check", "no tracks to verify routing") return correct_routing = 0 incorrect_routing = [] no_routing = 0 for track in tracks: original_track_name = track.get("name", "") track_name = _canonical_track_name(original_track_name) output_routing = track.get("current_output_routing", "") output_bus_key = _normalize_bus_key(output_routing) track_bus_key = _normalize_bus_key(track_name) if output_routing and output_routing.lower() != "master": correct_routing += 1 elif not output_routing: no_routing += 1 if track_bus_key: continue for role, expected_bus in BUS_ROUTING_MAP.items(): if role in track_name: if output_bus_key in expected_bus: correct_routing += 1 elif output_routing.lower() != "master": expected_label = "/".join(sorted(expected_bus)) incorrect_routing.append(f"{original_track_name.lower()} -> {output_routing} (expected {expected_label})") results.add_pass("routing_summary", f"correct={correct_routing} no_routing={no_routing}") if incorrect_routing: results.add_fail("routing_mismatches", ", ".join(incorrect_routing[:5])) elif correct_routing > 0: results.add_pass("routing_correct", f"{correct_routing} tracks with non-master routing") except Exception as e: results.add_fail("routing_check", str(e)) def run_audio_resample_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify AUDIO RESAMPLE tracks exist.""" try: tracks = expect_success("get_tracks", client.send("get_tracks")) track_names = [t.get("name", "") for t in tracks] found_layers = [] missing_layers = [] for expected in AUDIO_RESAMPLE_TRACKS: if any(expected.upper() in name.upper() for name in track_names): found_layers.append(expected) else: missing_layers.append(expected) if found_layers: results.add_pass("audio_resample_found", f"layers={found_layers}") if missing_layers: results.add_skip("audio_resample_missing", f"not_found={missing_layers} (may require reference audio)") else: results.add_pass("audio_resample_complete", "all 4 resample layers present") # Verify they are audio tracks for track in tracks: name = track.get("name", "").upper() if "AUDIO RESAMPLE" in name: if track.get("has_audio_input"): results.add_pass(f"audio_track_type_{name[:20]}", "correct audio track type") else: results.add_fail(f"audio_track_type_{name[:20]}", "expected audio track") except Exception as e: results.add_fail("audio_resample_check", str(e)) def run_automation_snapshot_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify automation and device parameter snapshots.""" try: tracks = expect_success("get_tracks", client.send("get_tracks")) total_devices = 0 tracks_with_devices = 0 tracks_with_automation = 0 for track in tracks: num_devices = track.get("num_devices", 0) if num_devices > 0: total_devices += num_devices tracks_with_devices += 1 # Check for arrangement clips (may contain automation) arrangement_clips = track.get("arrangement_clip_count", 0) if arrangement_clips > 0: tracks_with_automation += 1 if tracks_with_devices > 0: results.add_pass("automation_devices", f"tracks_with_devices={tracks_with_devices} total_devices={total_devices}") else: results.add_skip("automation_devices", "no devices found") if tracks_with_automation > 0: results.add_pass("automation_clips", f"tracks_with_arrangement_clips={tracks_with_automation}") else: results.add_skip("automation_clips", "no arrangement clips (may need to commit to arrangement)") # Try to get device parameters for first track with devices for i, track in enumerate(tracks): if track.get("num_devices", 0) > 0: try: devices = expect_success("get_devices", client.send("get_devices", {"track_index": i})) if devices: params_sample = [] for dev in devices[:3]: params = dev.get("parameters", []) if params: params_sample.append(f"{dev.get('name', '?')}:{len(params)}params") if params_sample: results.add_pass("automation_params_snapshot", ", ".join(params_sample[:3])) break except Exception: pass break except Exception as e: results.add_fail("automation_snapshot_check", str(e)) def run_loudness_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify basic loudness levels using output meters.""" try: tracks = expect_success("get_tracks", client.send("get_tracks")) tracks_with_signal = 0 max_level = 0.0 level_samples = [] for track in tracks: output_level = track.get("output_meter_level", 0.0) left = track.get("output_meter_left", 0.0) right = track.get("output_meter_right", 0.0) if output_level and output_level > 0: tracks_with_signal += 1 max_level = max(max_level, output_level) level_samples.append(f"{track.get('name', '?')[:15]}:{output_level:.2f}") # Check for stereo balance if left and right and left > 0 and right > 0: balance = abs(left - right) if balance < 0.1: pass # Balanced stereo if tracks_with_signal > 0: results.add_pass("loudness_signal_detected", f"tracks_with_signal={tracks_with_signal} max_level={max_level:.3f}") else: results.add_skip("loudness_signal", "no signal detected (playback may be stopped)") # Check for clipping (levels > 1.0) if max_level > 1.0: results.add_fail("loudness_clipping", f"max_level={max_level:.3f} indicates potential clipping") else: results.add_pass("loudness_no_clipping", f"max_level={max_level:.3f}") # Sample levels for verification if level_samples: results.add_pass("loudness_levels", ", ".join(level_samples[:5])) except Exception as e: results.add_fail("loudness_check", str(e)) def run_critical_layer_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify critical layers (kick, bass, clap, hat) exist and have content.""" try: tracks = expect_success("get_tracks", client.send("get_tracks")) track_names = [str(t.get("name", "")).upper() for t in tracks if isinstance(t, dict)] found_layers = {role: False for role in EXPECTED_CRITICAL_ROLES} for track_name in track_names: for role in EXPECTED_CRITICAL_ROLES: if role.upper() in track_name or f"AUDIO {role.upper()}" in track_name: found_layers[role] = True break for role, found in found_layers.items(): if found: results.add_pass(f"critical_layer_{role}", "found in tracks") else: results.add_fail(f"critical_layer_{role}", "missing - set may sound incomplete") except Exception as e: results.add_fail("critical_layer_check", str(e)) def run_derived_fx_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify derived FX tracks (AUDIO RESAMPLE) are present.""" try: tracks = expect_success("get_tracks", client.send("get_tracks")) track_names = [str(t.get("name", "")).upper() for t in tracks if isinstance(t, dict)] found_derived = [] missing_derived = [] for expected in AUDIO_RESAMPLE_TRACKS: if any(expected.upper() in name for name in track_names): found_derived.append(expected) else: missing_derived.append(expected) if found_derived: results.add_pass("derived_fx_found", f"layers={found_derived}") if missing_derived: results.add_skip("derived_fx_missing", f"not_found={missing_derived} (may require reference audio)") else: results.add_pass("derived_fx_complete", "all 4 resample layers present") except Exception as e: results.add_fail("derived_fx_check", str(e)) def run_export_readiness_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify set is ready for export.""" try: expect_success("get_session_info", client.send("get_session_info")) tracks = expect_success("get_tracks", client.send("get_tracks")) issues = [] track_count = len(tracks) if isinstance(tracks, list) else 0 if track_count < MIN_TRACKS_FOR_EXPORT: issues.append(f"insufficient_tracks: {track_count} (need {MIN_TRACKS_FOR_EXPORT}+)") master_response = client.send("get_track_info", {"track_type": "master", "track_index": 0}) if master_response.get("status") == "success": master_volume = float(master_response.get("result", {}).get("volume", 0.85)) if master_volume < MASTER_VOLUME_RANGE[0]: issues.append(f"master_volume_low: {master_volume:.2f}") elif master_volume > MASTER_VOLUME_RANGE[1]: issues.append(f"master_volume_high: {master_volume:.2f}") muted_count = sum(1 for t in tracks if isinstance(t, dict) and t.get("mute", False)) if muted_count > track_count * 0.5: issues.append(f"too_many_muted: {muted_count}/{track_count}") if issues: results.add_pass("export_readiness_issues", f"issues={len(issues)}") for issue in issues: results.add_fail(f"export_ready_{issue.split(':')[0]}", issue) else: results.add_pass("export_ready", "set appears ready for export") except Exception as e: results.add_fail("export_readiness_check", str(e)) def run_midi_clip_content_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify MIDI tracks have clips with notes.""" try: tracks = expect_success("get_tracks", client.send("get_tracks")) midi_tracks_empty = [] midi_tracks_with_notes = 0 for track in tracks: if not isinstance(track, dict): continue track_type = str(track.get("type", "")).lower() if track_type != "midi": continue track_name = track.get("name", "?") clips = track.get("clips", []) if not isinstance(clips, list): clips = [] has_notes = False empty_clips = [] for clip in clips: if not isinstance(clip, dict): continue notes_count = clip.get("notes_count", 0) has_notes_flag = clip.get("has_notes", None) if has_notes_flag is True or notes_count > 0: has_notes = True elif has_notes_flag is False or (has_notes_flag is None and notes_count == 0): empty_clips.append(clip.get("name", "?")) if has_notes: midi_tracks_with_notes += 1 elif empty_clips: midi_tracks_empty.append({ "track_name": track_name, "empty_clips_count": len(empty_clips), }) if midi_tracks_with_notes > 0: results.add_pass("midi_tracks_with_notes", f"count={midi_tracks_with_notes}") if midi_tracks_empty: for track_info in midi_tracks_empty[:3]: results.add_fail( f"midi_track_empty_{track_info['track_name'][:20]}", f"Track has {track_info['empty_clips_count']} empty MIDI clips - may need notes" ) except Exception as e: results.add_fail("midi_clip_content_check", str(e)) def run_bus_signal_checks(client: AbletonSocketClient, results: TestResult) -> None: """Verify buses receive signal from tracks.""" try: buses_payload = expect_success("list_buses", client.send("list_buses")) buses = _extract_bus_payload(buses_payload) tracks = expect_success("get_tracks", client.send("get_tracks")) bus_signal_map = {} for bus in buses: if not isinstance(bus, dict): continue bus_name = bus.get("name", "").upper() bus_signal_map[bus_name] = {"senders": [], "has_signal": False} for track in tracks: if not isinstance(track, dict): continue track_name = str(track.get("name", "")).upper() output_routing = str(track.get("current_output_routing", "")).upper() for bus_name in bus_signal_map: if bus_name in output_routing: bus_signal_map[bus_name]["senders"].append(track_name) sends = track.get("sends", []) if isinstance(sends, list): for send_level in sends: try: if float(send_level) > 0.01: pass except (TypeError, ValueError): pass buses_without_senders = [] buses_with_senders = [] for bus_name, info in bus_signal_map.items(): if info["senders"]: buses_with_senders.append(bus_name) else: buses_without_senders.append(bus_name) if buses_with_senders: results.add_pass("buses_with_signal", f"buses={buses_with_senders}") if buses_without_senders: for bus_name in buses_without_senders[:3]: results.add_fail(f"bus_no_signal_{bus_name[:15]}", f"Bus '{bus_name}' has no routed tracks - will not produce output") except Exception as e: results.add_fail("bus_signal_check", str(e)) def run_clipping_detection(client: AbletonSocketClient, results: TestResult) -> None: """Detect tracks with dangerously high volume (clipping risk).""" try: tracks = expect_success("get_tracks", client.send("get_tracks")) clipping_tracks = [] high_volume_tracks = [] for track in tracks: if not isinstance(track, dict): continue track_name = track.get("name", "?") volume = float(track.get("volume", 0.85)) if volume > 0.95: clipping_tracks.append({"name": track_name, "volume": volume}) elif volume > 0.90: high_volume_tracks.append({"name": track_name, "volume": volume}) if clipping_tracks: for track_info in clipping_tracks[:3]: results.add_fail(f"clipping_track_{track_info['name'][:15]}",f"Volume {track_info['volume']:.2f} > 0.95 - CLIPPING RISK") if high_volume_tracks: for track_info in high_volume_tracks[:3]: results.add_warning(f"high_volume_{track_info['name'][:15]}", f"Volume {track_info['volume']:.2f} - consider reducing") if not clipping_tracks and not high_volume_tracks: results.add_pass("no_clipping_tracks", "All track volumes in safe range") except Exception as e: results.add_fail("clipping_detection", str(e)) def run_all_phase7_tests(client: AbletonSocketClient, results: TestResult) -> None: """Run all Phase 7 smoke tests.""" print("\n[Phase 7] Running bus verification...") run_bus_checks(client, results) print("[Phase 7] Running routing verification...") run_routing_checks(client, results) print("[Phase 7] Running AUDIO RESAMPLE track verification...") run_audio_resample_checks(client, results) print("[Phase 7] Running automation snapshot verification...") run_automation_snapshot_checks(client, results) print("[Phase 7] Running loudness verification...") run_loudness_checks(client, results) print("[Phase 7] Running critical layer verification...") run_critical_layer_checks(client, results) print("[Phase 7] Running derived FX verification...") run_derived_fx_checks(client, results) print("[Phase 7] Running export readiness verification...") run_export_readiness_checks(client, results) print("[Phase 7] Running MIDI clip content verification...") run_midi_clip_content_checks(client, results) print("[Phase 7] Running bus signal verification...") run_bus_signal_checks(client, results) print("[Phase 7] Running clipping detection...") run_clipping_detection(client, results) def main() -> int: parser = argparse.ArgumentParser(description="Smoke test for AbletonMCP_AI socket runtime") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=9877) parser.add_argument("--timeout", type=float, default=15.0) parser.add_argument("--generate-demo", action="store_true") parser.add_argument("--genre", default="techno") parser.add_argument("--style", default="industrial") parser.add_argument("--bpm", type=float, default=128.0) parser.add_argument("--key", default="Am") parser.add_argument("--structure", default="standard") parser.add_argument("--use-blueprint", action="store_true") parser.add_argument("--phase7", action="store_true", help="Run Phase 7 extended tests (buses, routing, audio resample, automation, loudness)") parser.add_argument("--json-report", action="store_true", help="Output report as JSON") args = parser.parse_args() client = AbletonSocketClient(host=args.host, port=args.port, timeout=args.timeout) # Run basic checks print("[Basic] Running readonly checks...") checks = run_readonly_checks(client) for name, details in checks: print(f"[ok] {name}: {details}") # Run generation check if requested if args.generate_demo: print("\n[Generation] Running generation check...") checks.extend( run_generation_check( client, genre=args.genre, style=args.style, bpm=args.bpm, key=args.key, structure=args.structure, use_blueprint=args.use_blueprint, ) ) for name, details in checks[-2:]: print(f"[ok] {name}: {details}") # Run Phase 7 tests if requested results = TestResult() if args.phase7: run_all_phase7_tests(client, results) if args.json_report: print(json.dumps(results.to_dict(), indent=2)) else: results.print_report() return 0 if len(results.failed) == 0 else 1 return 0 if __name__ == "__main__": raise SystemExit(main())