- Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain - Method 1: track.insert_arrangement_clip() [Live 12+] - Method 2: track.create_audio_clip() [Live 11+] - Method 3: arrangement_clips.add_new_clip() [Live 12+] - Method 4: Session->duplicate_clip_to_arrangement [Legacy] - Method 5: Session->Recording [Universal] - Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow - Update skills documentation - Verified: 3 clips created at positions [0, 4, 8] in Arrangement View Closes: Audio injection in Arrangement View
1431 lines
48 KiB
Python
1431 lines
48 KiB
Python
#!/usr/bin/env python3
|
|
"""CLI tool to migrate AbletonMCP_AI to Senior Architecture.
|
|
|
|
This script:
|
|
1. Creates SQLite metadata database
|
|
2. Analyzes all 511 samples (with or without numpy)
|
|
3. Backs up existing configuration
|
|
4. Updates all necessary files
|
|
5. Runs verification tests
|
|
6. Generates migration report
|
|
|
|
Usage:
|
|
python migrate_to_senior.py # Full migration with defaults
|
|
python migrate_to_senior.py --backup --verify # Backup then verify
|
|
python migrate_to_senior.py --analyze=skip # Skip sample analysis
|
|
python migrate_to_senior.py --dry-run # Preview changes
|
|
python migrate_to_senior.py --interactive # Interactive mode
|
|
|
|
Author: AbletonMCP_AI
|
|
Version: 1.0.0
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
import os
|
|
import json
|
|
import shutil
|
|
import sqlite3
|
|
import subprocess
|
|
import traceback
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List, Optional, Callable
|
|
from dataclasses import dataclass, field, asdict
|
|
|
|
# =============================================================================
|
|
# CONSTANTS AND CONFIGURATION
|
|
# =============================================================================
|
|
|
|
# Version of this migration tool; also recorded as "source_version" in backup manifests.
VERSION = "1.0.0"
# Human-readable name of the migration, used in result dictionaries and reports.
MIGRATION_NAME = "Senior Architecture Migration"

# Paths
# NOTE(review): hard-coded to a Windows "Live 12 Suite" install location -
# confirm this matches the target machine before running.
BASE_DIR = Path(r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts")
# Root of the project inside BASE_DIR; backups are created beneath it.
PROJECT_DIR = BASE_DIR / "AbletonMCP_AI"
# Directory added to sys.path when importing migrate_library / test_arrangement.
MCP_SERVER_DIR = PROJECT_DIR / "mcp_server"
# Directory holding the engine modules checked by update_configuration().
ENGINE_DIR = MCP_SERVER_DIR / "engines"
# Sample library handed to migrate_library() as library_path.
LIBRARY_PATH = BASE_DIR / "libreria" / "reggaeton"
# SQLite database handed to migrate_library() / get_migration_status().
DB_PATH = MCP_SERVER_DIR / "data" / "samples.db"
# Helper scripts whose presence is checked by check_prerequisites().
MIGRATE_LIBRARY_SCRIPT = MCP_SERVER_DIR / "migrate_library.py"
TEST_ARRANGEMENT_SCRIPT = MCP_SERVER_DIR / "test_arrangement.py"

# Files to backup
FILES_TO_BACKUP = [
    PROJECT_DIR / "__init__.py",
    MCP_SERVER_DIR / "server.py",
    ENGINE_DIR / "__init__.py",
]

# Required Python version
# Compared against sys.version_info as a (major, minor) tuple.
REQUIRED_PYTHON = (3, 8)
|
|
|
|
# =============================================================================
|
|
# DATA CLASSES
|
|
# =============================================================================
|
|
|
|
@dataclass
class MigrationStep:
    """Outcome record for one stage of the migration.

    Field order is part of the constructor signature and must not change.
    """

    # Identifier of the stage, e.g. "create_backup".
    name: str
    # One of "success", "failed", "skipped", "warning".
    status: str
    # Short human-readable summary of what happened.
    message: str
    # Free-form structured data collected while the stage ran.
    details: Dict[str, Any] = field(default_factory=dict)
    # Wall-clock time the stage took, in seconds.
    duration_seconds: float = 0.0
    # ISO-8601 creation time of this record (local, naive datetime).
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dictionary (via dataclasses.asdict)."""
        as_mapping = asdict(self)
        return as_mapping
|
|
|
|
|
|
@dataclass
class MigrationReport:
    """Aggregate record of a whole migration run."""

    # Human-readable name of the migration.
    migration_name: str
    # Version string of the migration tool.
    version: str
    # Start time as a string.
    started_at: str
    # Completion time as a string; None until set.
    completed_at: Optional[str] = None
    # Per-stage results, in execution order.
    steps: List[MigrationStep] = field(default_factory=list)
    # Free-form summary data.
    summary: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the report as a dictionary, serializing each step via its to_dict()."""
        serialized_steps = [entry.to_dict() for entry in self.steps]
        return dict(
            migration_name=self.migration_name,
            version=self.version,
            started_at=self.started_at,
            completed_at=self.completed_at,
            steps=serialized_steps,
            summary=self.summary,
        )
|
|
|
|
|
|
# =============================================================================
|
|
# UTILITY FUNCTIONS
|
|
# =============================================================================
|
|
|
|
def print_header(text: str, width: int = 70):
    """Print *text* framed by two rules of ``=`` characters, preceded by a blank line.

    Args:
        text: Title to display.
        width: Number of ``=`` characters in each rule.
    """
    rule = "=" * width
    print("\n" + rule)
    print(f" {text}")
    print(rule)
|
|
|
|
|
|
def print_step(step_num: int, total: int, text: str):
    """Print a ``[Step i/n]`` progress banner followed by a 70-character dashed rule.

    Args:
        step_num: Number of the current step.
        total: Total number of steps.
        text: Description of the current step.
    """
    banner = f"\n[Step {step_num}/{total}] {text}"
    underline = "-" * 70
    print(banner)
    print(underline)
|
|
|
|
|
|
def print_success(message: str):
    """Print *message* on one line, tagged with ``[OK]``."""
    tag = "[OK]"
    print(f" {tag} {message}")
|
|
|
|
|
|
def print_warning(message: str):
    """Print *message* on one line, tagged with ``[WARN]``."""
    tag = "[WARN]"
    print(f" {tag} {message}")
|
|
|
|
|
|
def print_error(message: str):
    """Print *message* on one line, tagged with ``[ERROR]``."""
    tag = "[ERROR]"
    print(f" {tag} {message}")
|
|
|
|
|
|
def print_info(message: str):
    """Print *message* on one line, tagged with ``[INFO]``."""
    tag = "[INFO]"
    print(f" {tag} {message}")
|
|
|
|
|
|
def spinner(duration: float = 0.5):
    """Pause for *duration* seconds.

    Despite the name nothing is drawn; the function only sleeps.

    Args:
        duration: Seconds to sleep.
    """
    # Imported locally, as in the original; ``time`` is not a file-level import.
    from time import sleep

    sleep(duration)
|
|
|
|
|
|
# =============================================================================
|
|
# PREREQUISITE CHECKS
|
|
# =============================================================================
|
|
|
|
def check_prerequisites() -> MigrationStep:
    """Validate the environment before any migration work starts.

    Inspects, in order: the interpreter version against REQUIRED_PYTHON,
    the presence of BASE_DIR and PROJECT_DIR, write access to PROJECT_DIR
    (by creating and deleting a probe file), free disk space (100 MB
    minimum) and the presence of the two helper scripts.

    Returns:
        MigrationStep named "check_prerequisites". Its status is "failed"
        if any error was collected, "warning" if only warnings were, and
        "success" otherwise. Every finding is stored in ``details``,
        including the "errors" and "warnings" lists.
    """
    began = datetime.now()
    problems = []
    cautions = []
    findings = {}

    # Interpreter version
    running = sys.version_info
    version_ok = running >= REQUIRED_PYTHON
    if not version_ok:
        problems.append(
            f"Python {REQUIRED_PYTHON[0]}.{REQUIRED_PYTHON[1]}+ required, found {running.major}.{running.minor}"
        )
    findings["python_version"] = f"{running.major}.{running.minor}.{running.micro}"
    findings["python_ok"] = version_ok

    # Ableton installation directory
    base_present = BASE_DIR.exists()
    if not base_present:
        problems.append(f"Ableton installation not found at {BASE_DIR}")
    findings["ableton_path"] = str(BASE_DIR)
    findings["ableton_exists"] = base_present

    # Project directory
    project_present = PROJECT_DIR.exists()
    if not project_present:
        problems.append(f"Project directory not found: {PROJECT_DIR}")
    findings["project_exists"] = project_present

    # Write access: create and remove a probe file inside the project dir
    try:
        probe = PROJECT_DIR / ".migration_write_test"
        probe.write_text("test")
        probe.unlink()
    except Exception as exc:
        writable = False
        problems.append(f"Cannot write to project directory: {exc}")
    else:
        writable = True
    findings["write_permissions"] = writable

    # Free disk space (rough estimate - at least 100MB must be free).
    # A failure to query the disk is only a warning, not an error.
    try:
        free_mb = shutil.disk_usage(PROJECT_DIR).free / (1024 * 1024)
        enough_space = free_mb >= 100
        if not enough_space:
            problems.append(f"Insufficient disk space: {free_mb:.1f}MB free, need 100MB+")
        findings["disk_free_mb"] = round(free_mb, 2)
        findings["disk_ok"] = enough_space
    except Exception as exc:
        cautions.append(f"Could not check disk space: {exc}")
        findings["disk_check_error"] = str(exc)

    # Helper scripts: their absence only limits later stages
    script_checks = (
        (
            "migrate_library_script_exists",
            MIGRATE_LIBRARY_SCRIPT,
            "migrate_library.py not found - sample analysis will be limited",
        ),
        (
            "test_arrangement_script_exists",
            TEST_ARRANGEMENT_SCRIPT,
            "test_arrangement.py not found - verification will be limited",
        ),
    )
    for detail_key, script_path, caution in script_checks:
        present = script_path.exists()
        findings[detail_key] = present
        if not present:
            cautions.append(caution)

    # Errors outrank warnings when choosing the overall status
    if problems:
        verdict = "failed"
        summary = f"Prerequisites check failed: {len(problems)} error(s)"
    elif cautions:
        verdict = "warning"
        summary = f"Prerequisites met with {len(cautions)} warning(s)"
    else:
        verdict = "success"
        summary = "All prerequisites met"

    findings["errors"] = problems
    findings["warnings"] = cautions

    return MigrationStep(
        name="check_prerequisites",
        status=verdict,
        message=summary,
        details=findings,
        duration_seconds=(datetime.now() - began).total_seconds(),
    )
|
|
|
|
|
|
# =============================================================================
|
|
# BACKUP FUNCTIONS
|
|
# =============================================================================
|
|
|
|
def create_backup() -> MigrationStep:
    """Copy the current configuration into a timestamped backup directory.

    The backup is created at ``PROJECT_DIR/backups/backup_<YYYYmmdd_HHMMSS>``
    and contains every path in FILES_TO_BACKUP plus every ``*.py`` file
    found directly in ENGINE_DIR. Each file is stored at its path relative
    to PROJECT_DIR, and that same relative path is recorded in
    ``manifest.json``. A manifest entry can therefore be restored by copying
    ``<backup>/<entry>`` to ``PROJECT_DIR/<entry>``, which is exactly what
    rollback_if_needed() does. (Previously engine modules were stored under
    a flat ``engines/`` folder, which rollback mapped to the wrong place.)

    Returns:
        MigrationStep named "create_backup": "success" when every file was
        copied, "warning" when at least one file failed or was missing, and
        "failed" when the backup could not be created at all.
    """
    start_time = datetime.now()
    backup_dir_name = f"backup_{start_time.strftime('%Y%m%d_%H%M%S')}"
    backup_dir = PROJECT_DIR / "backups" / backup_dir_name

    details = {
        "backup_dir": str(backup_dir),
        "files_backed_up": [],
        "files_failed": [],
    }

    try:
        backup_dir.mkdir(parents=True, exist_ok=True)

        # Explicitly listed files first, then every engine module.
        candidates = list(FILES_TO_BACKUP)
        if ENGINE_DIR.exists():
            candidates.extend(sorted(ENGINE_DIR.glob("*.py")))

        # engines/__init__.py is both listed and globbed; copy it only once.
        already_handled = set()
        for file_path in candidates:
            if file_path in already_handled:
                continue
            already_handled.add(file_path)

            if not file_path.exists():
                details["files_failed"].append({
                    "file": str(file_path),
                    "error": "File does not exist",
                })
                continue

            try:
                relative = file_path.relative_to(PROJECT_DIR)
                dest = backup_dir / relative
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(file_path, dest)
                # Forward slashes keep the manifest portable; pathlib accepts
                # them on Windows as well.
                details["files_backed_up"].append(relative.as_posix())
            except Exception as e:
                details["files_failed"].append({
                    "file": str(file_path),
                    "error": str(e),
                })

        # The manifest is what rollback reads to know which files to restore.
        manifest = {
            "backup_name": backup_dir_name,
            "created_at": datetime.now().isoformat(),
            "files_backed_up": details["files_backed_up"],
            "files_failed": details["files_failed"],
            "source_version": VERSION,
        }
        manifest_path = backup_dir / "manifest.json"
        manifest_path.write_text(json.dumps(manifest, indent=2), encoding="utf-8")

        duration = (datetime.now() - start_time).total_seconds()

        if not details["files_failed"]:
            return MigrationStep(
                name="create_backup",
                status="success",
                message=f"Backup created: {backup_dir_name} ({len(details['files_backed_up'])} files)",
                details=details,
                duration_seconds=duration,
            )
        return MigrationStep(
            name="create_backup",
            status="warning",
            message=f"Backup created with {len(details['files_failed'])} failures",
            details=details,
            duration_seconds=duration,
        )

    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MigrationStep(
            name="create_backup",
            status="failed",
            message=f"Backup failed: {str(e)}",
            details={"error": str(e), "traceback": traceback.format_exc()},
            duration_seconds=duration,
        )
|
|
|
|
|
|
def rollback_if_needed(backup_dir: str) -> MigrationStep:
    """Restore the files listed in a backup's manifest.

    Args:
        backup_dir: Path to the backup directory to restore from. It must
            contain the ``manifest.json`` written by create_backup().

    Returns:
        MigrationStep named "rollback": "success" when every listed file
        was restored, "warning" when some files could not be restored, and
        "failed" when the backup directory or its manifest is missing or
        the rollback aborted with an unexpected error.
    """
    start_time = datetime.now()
    backup_path = Path(backup_dir)

    if not backup_path.exists():
        return MigrationStep(
            name="rollback",
            status="failed",
            message=f"Backup directory not found: {backup_dir}",
            details={},
        )

    details = {
        "backup_dir": backup_dir,
        "files_restored": [],
        "files_failed": [],
    }

    try:
        manifest_path = backup_path / "manifest.json"
        if not manifest_path.exists():
            # Without a manifest nothing can be restored. Reporting
            # "success, 0 files restored" would hide a failed rollback.
            return MigrationStep(
                name="rollback",
                status="failed",
                message=f"Backup manifest not found: {manifest_path}",
                details=details,
                duration_seconds=(datetime.now() - start_time).total_seconds(),
            )

        manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
        details["manifest"] = manifest

        for backed_up_file in manifest.get("files_backed_up", []):
            src = backup_path / backed_up_file

            relative = Path(backed_up_file)
            if relative.parts and relative.parts[0] == "engines":
                # Entries of the form "engines/<name>" come from the flat
                # engines backup folder; the originals live in ENGINE_DIR,
                # not in PROJECT_DIR/engines.
                dest = ENGINE_DIR.joinpath(*relative.parts[1:])
            else:
                dest = PROJECT_DIR / relative

            if not src.exists():
                # Listed in the manifest but absent from the backup:
                # record it instead of skipping it silently.
                details["files_failed"].append({
                    "file": backed_up_file,
                    "error": "File missing from backup",
                })
                continue

            try:
                # The destination directory may have been removed.
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src, dest)
                details["files_restored"].append(backed_up_file)
            except Exception as e:
                details["files_failed"].append({
                    "file": backed_up_file,
                    "error": str(e),
                })

        duration = (datetime.now() - start_time).total_seconds()

        if not details["files_failed"]:
            return MigrationStep(
                name="rollback",
                status="success",
                message=f"Rollback completed: {len(details['files_restored'])} files restored",
                details=details,
                duration_seconds=duration,
            )
        return MigrationStep(
            name="rollback",
            status="warning",
            message=f"Rollback completed with {len(details['files_failed'])} failures",
            details=details,
            duration_seconds=duration,
        )

    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MigrationStep(
            name="rollback",
            status="failed",
            message=f"Rollback failed: {str(e)}",
            details={"error": str(e), "traceback": traceback.format_exc()},
            duration_seconds=duration,
        )
|
|
|
|
|
|
# =============================================================================
|
|
# SAMPLE ANALYSIS
|
|
# =============================================================================
|
|
|
|
def run_analysis(mode: str = "full") -> MigrationStep:
    """Analyze the sample library by delegating to ``migrate_library``.

    MCP_SERVER_DIR is put at the front of ``sys.path`` so that the
    ``migrate_library`` module can be imported, then its ``migrate_library``
    function is run against LIBRARY_PATH / DB_PATH.

    Args:
        mode: "skip" returns immediately without doing anything. "full"
            forces re-analysis of samples; any other value (e.g.
            "placeholder") runs the analysis without forcing re-analysis.

    Returns:
        MigrationStep named "run_analysis": "skipped" for mode "skip",
        "success" when the reported error count is zero, "warning" when it
        is not, and "failed" when the module cannot be imported or the
        analysis raises.
    """
    began = datetime.now()

    def elapsed() -> float:
        return (datetime.now() - began).total_seconds()

    if mode == "skip":
        return MigrationStep(
            name="run_analysis",
            status="skipped",
            message="Sample analysis skipped as requested",
            details={"mode": mode},
        )

    try:
        # Make the project-local analysis module importable
        sys.path.insert(0, str(MCP_SERVER_DIR))
        from migrate_library import migrate_library, get_migration_status, LIBROSA_AVAILABLE

        info = {
            "mode": mode,
            "librosa_available": LIBROSA_AVAILABLE,
            "library_path": str(LIBRARY_PATH),
            "db_path": str(DB_PATH),
        }

        print_info(f"Library path: {LIBRARY_PATH}")
        print_info(f"Database path: {DB_PATH}")
        print_info(f"Librosa available: {LIBROSA_AVAILABLE}")
        print_info("Starting sample analysis...")

        # Only "full" mode forces samples to be re-analyzed
        stats = migrate_library(
            library_path=LIBRARY_PATH,
            db_path=DB_PATH,
            force_reanalyze=(mode == "full"),
            dry_run=False,
        )
        info["analysis_stats"] = stats
        info["migration_status"] = get_migration_status(DB_PATH)

        took = elapsed()

        sample_total = stats.get("total", 0)
        full_count = stats.get("analyzed_full", 0)
        partial_count = stats.get("analyzed_partial", 0)
        error_count = stats.get("errors", 0)

        if error_count == 0:
            outcome = "success"
            text = f"Analyzed {sample_total} samples ({full_count} full, {partial_count} partial)"
        else:
            outcome = "warning"
            text = f"Analysis completed with {error_count} errors ({full_count} full, {partial_count} partial)"

        return MigrationStep(
            name="run_analysis",
            status=outcome,
            message=text,
            details=info,
            duration_seconds=took,
        )

    except ImportError as exc:
        return MigrationStep(
            name="run_analysis",
            status="failed",
            message=f"Could not import migration module: {str(exc)}",
            details={"error": str(exc), "mode": mode},
            duration_seconds=elapsed(),
        )

    except Exception as exc:
        return MigrationStep(
            name="run_analysis",
            status="failed",
            message=f"Analysis failed: {str(exc)}",
            details={"error": str(exc), "traceback": traceback.format_exc(), "mode": mode},
            duration_seconds=elapsed(),
        )
|
|
|
|
|
|
# =============================================================================
|
|
# CONFIGURATION UPDATE
|
|
# =============================================================================
|
|
|
|
def update_configuration() -> MigrationStep:
    """Verify the engine configuration and create the data directory.

    Despite the name, the only change this function makes on disk is
    creating ``MCP_SERVER_DIR/data`` when it is missing. Everything else is
    verification:

    - ``engines/__init__.py`` is scanned for the expected export names
      (plain substring match).
    - The critical engine module files are checked for existence.

    Returns:
        MigrationStep named "update_configuration": "warning" when
        ``engines/__init__.py`` is absent, or exports or engine files are
        missing; "success" otherwise; "failed" on an unexpected error.
    """
    start_time = datetime.now()

    details = {
        "files_updated": [],
        "files_unchanged": [],
        "files_failed": [],
    }

    try:
        expected_exports = [
            "MetadataStore",
            "EmbeddingEngine",
            "ReferenceMatcher",
            "SampleSelector",
            "ArrangementBuilder",
            "ArrangementRecorder",
            "ProductionWorkflow",
            "WorkflowEngine",
            "MusicalIntelligenceEngine",
            "HarmonyEngine",
            "MixingEngine",
            "PresetSystem",
            "SongGenerator",
            "PatternLibrary",
        ]

        # Bound before the existence check: the final status decision reads
        # this name even when engines/__init__.py is absent. Previously that
        # case raised UnboundLocalError.
        missing_exports = []

        engines_init = ENGINE_DIR / "__init__.py"
        engines_init_found = engines_init.exists()
        if engines_init_found:
            # Python source is UTF-8 by default; only ASCII names are
            # searched, so undecodable bytes are replaced, not fatal.
            content = engines_init.read_text(encoding="utf-8", errors="replace")

            missing_exports = [name for name in expected_exports if name not in content]

            details["engines_init_exports_checked"] = expected_exports
            details["engines_init_missing_exports"] = missing_exports

            if missing_exports:
                print_warning(f"Missing exports in engines/__init__.py: {missing_exports}")
            else:
                print_success("All engine exports verified")

            details["files_unchanged"].append("engines/__init__.py")
        else:
            print_warning("engines/__init__.py not found - exports could not be verified")
            details["files_failed"].append({
                "file": str(engines_init),
                "error": "File does not exist",
            })

        # Verify critical engine files exist
        critical_engines = [
            "metadata_store.py",
            "embedding_engine.py",
            "sample_selector.py",
            "arrangement_recorder.py",
            "production_workflow.py",
            "workflow_engine.py",
            "musical_intelligence.py",
        ]
        missing_engines = [
            engine_file
            for engine_file in critical_engines
            if not (ENGINE_DIR / engine_file).exists()
        ]

        details["critical_engines_checked"] = critical_engines
        details["missing_engines"] = missing_engines

        if missing_engines:
            print_warning(f"Missing engine files: {missing_engines}")
        else:
            print_success("All critical engine files present")

        # Make sure the data directory exists
        data_dir = MCP_SERVER_DIR / "data"
        if not data_dir.exists():
            data_dir.mkdir(parents=True, exist_ok=True)
            print_success("Created data directory")
            details["files_updated"].append("mcp_server/data/")

        duration = (datetime.now() - start_time).total_seconds()

        if missing_engines or missing_exports or not engines_init_found:
            return MigrationStep(
                name="update_configuration",
                status="warning",
                message="Configuration updated with warnings",
                details=details,
                duration_seconds=duration,
            )
        return MigrationStep(
            name="update_configuration",
            status="success",
            message="Configuration verified and updated",
            details=details,
            duration_seconds=duration,
        )

    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MigrationStep(
            name="update_configuration",
            status="failed",
            message=f"Configuration update failed: {str(e)}",
            details={"error": str(e), "traceback": traceback.format_exc()},
            duration_seconds=duration,
        )
|
|
|
|
|
|
# =============================================================================
|
|
# VERIFICATION TESTS
|
|
# =============================================================================
|
|
|
|
def run_verification() -> MigrationStep:
    """Run the post-migration verification checks from ``test_arrangement``.

    MCP_SERVER_DIR is put at the front of ``sys.path`` so the project-local
    ``test_arrangement`` module can be imported. The function then runs, in
    order: a "health_check" command through the verifier, the validator's
    pre-condition checks, and the ``test_without_numpy`` scenario. The
    verifier's final report is stored in the step details.

    Returns:
        MigrationStep named "run_verification": "success" only when no test
        failed and the health check succeeded; "warning" when a test failed
        or the health check did not succeed; "failed" when the test module
        cannot be imported or verification aborts with an unexpected error.
    """
    start_time = datetime.now()

    details = {
        "tests_run": [],
        "tests_passed": 0,
        "tests_failed": 0,
        "test_results": [],
    }

    try:
        # Import the test module
        sys.path.insert(0, str(MCP_SERVER_DIR))
        from test_arrangement import ArrangementVerifier, ArrangementValidator, ArrangementTestScenarios

        print_info("Initializing ArrangementVerifier...")
        verifier = ArrangementVerifier()

        # Basic connectivity check
        print_info("Running connectivity check...")
        resp = verifier._send_command("health_check", timeout=10.0)
        health_ok = resp.get("status") == "success"

        details["health_check"] = {
            "success": health_ok,
            "response": resp.get("result") if health_ok else resp.get("message"),
        }

        if health_ok:
            print_success("Ableton connection verified")
        else:
            print_warning(f"Ableton not reachable: {resp.get('message')}")

        # Validator pre-conditions
        print_info("Running pre-condition checks...")
        validator = ArrangementValidator(verifier)
        # NOTE(review): the return value is recorded as-is; presumably it is
        # truthy when all checks pass - confirm against test_arrangement.py.
        pre_ok = validator.pre_condition_checks()

        details["pre_condition_checks"] = {
            "success": pre_ok,
            "checks": [r.to_dict() for r in validator.pre_check_results],
        }

        # Test scenarios
        print_info("Running test scenarios...")
        scenarios = ArrangementTestScenarios(verifier)

        # Test 1: Without numpy
        test_name = "test_without_numpy"
        print_info("Testing without numpy dependency...")
        # Recorded before running so a crashing test still appears in
        # "tests_run". It used to be counted as failed but never listed.
        details["tests_run"].append(test_name)
        try:
            report = scenarios.test_without_numpy()
            details["test_results"].append(report.to_dict())

            if report.summary.get("status") == "PASSED":
                details["tests_passed"] += 1
                print_success("test_without_numpy passed")
            else:
                details["tests_failed"] += 1
                print_warning("test_without_numpy had failures")
        except Exception as e:
            details["tests_failed"] += 1
            # Keep the error so the report explains why the test failed.
            details["test_results"].append({"test": test_name, "error": str(e)})
            print_error(f"test_without_numpy error: {e}")

        # Final verification report
        verification_report = verifier.get_verification_report()
        details["verification_report"] = verification_report

        duration = (datetime.now() - start_time).total_seconds()

        if details["tests_failed"] == 0 and health_ok:
            return MigrationStep(
                name="run_verification",
                status="success",
                message=f"All {details['tests_passed']} verification tests passed",
                details=details,
                duration_seconds=duration,
            )

        message = f"Verification completed: {details['tests_passed']} passed, {details['tests_failed']} failed"
        if not health_ok:
            # A failed health check must not be reported as plain success.
            message += " (Ableton health check failed)"
        return MigrationStep(
            name="run_verification",
            status="warning",
            message=message,
            details=details,
            duration_seconds=duration,
        )

    except ImportError as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MigrationStep(
            name="run_verification",
            status="failed",
            message=f"Could not import test module: {str(e)}",
            details={"error": str(e)},
            duration_seconds=duration,
        )

    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MigrationStep(
            name="run_verification",
            status="failed",
            message=f"Verification failed: {str(e)}",
            details={"error": str(e), "traceback": traceback.format_exc()},
            duration_seconds=duration,
        )
|
|
|
|
|
|
# =============================================================================
|
|
# REPORT GENERATION
|
|
# =============================================================================
|
|
|
|
def generate_report(results: Dict[str, Any], output_dir: Optional[Path] = None) -> MigrationStep:
    """Print the migration report and save it as JSON and Markdown.

    Args:
        results: Complete migration results dictionary. The keys read here
            are "migration_name", "version", "started_at", "completed_at"
            and "steps" (a list of step dictionaries).
        output_dir: Directory to save the reports in
            (default: ``PROJECT_DIR / "docs"``). It is created if missing.

    Returns:
        MigrationStep named "generate_report": "success" with the two
        report paths and the overall status in ``details``, or "failed"
        when the directory or files cannot be written.
    """
    start_time = datetime.now()

    if output_dir is None:
        output_dir = PROJECT_DIR / "docs"

    try:
        # Ensure output directory exists
        output_dir.mkdir(parents=True, exist_ok=True)

        # Console report
        print_header("MIGRATION REPORT")

        steps = results.get("steps", [])

        print(f"\nMigration: {results.get('migration_name', 'Unknown')}")
        print(f"Version: {results.get('version', 'Unknown')}")
        print(f"Started: {results.get('started_at', 'Unknown')}")
        print(f"Completed: {results.get('completed_at', 'Unknown')}")

        print("\n" + "-" * 70)
        print("STEP RESULTS:")
        print("-" * 70)

        status_icons = {
            "success": "OK",
            "failed": "FAIL",
            "skipped": "SKIP",
            "warning": "WARN",
        }
        for step in steps:
            status_icon = status_icons.get(step.get("status", "unknown"), "?")

            print(f" [{status_icon}] {step.get('name', 'Unknown')}: {step.get('message', '')}")
            if step.get("duration_seconds"):
                print(f" Duration: {step.get('duration_seconds', 0):.2f}s")

        # Summary counts
        success_count = sum(1 for s in steps if s.get("status") == "success")
        failed_count = sum(1 for s in steps if s.get("status") == "failed")
        warning_count = sum(1 for s in steps if s.get("status") == "warning")
        skipped_count = sum(1 for s in steps if s.get("status") == "skipped")

        print("\n" + "-" * 70)
        print("SUMMARY:")
        print("-" * 70)
        print(f" Total steps: {len(steps)}")
        print(f" Success: {success_count}")
        print(f" Failed: {failed_count}")
        print(f" Warnings: {warning_count}")
        print(f" Skipped: {skipped_count}")

        # Any failure outranks warnings when choosing the overall status
        if failed_count > 0:
            overall_status = "FAILED"
        elif warning_count > 0:
            overall_status = "COMPLETED_WITH_WARNINGS"
        else:
            overall_status = "SUCCESS"

        print(f"\n Overall Status: {overall_status}")

        # Next steps
        print("\n" + "-" * 70)
        print("NEXT STEPS:")
        print("-" * 70)

        if overall_status == "SUCCESS":
            print(" 1. Restart Ableton Live to load the updated Remote Script")
            print(" 2. Run 'health_check' to verify the installation")
            print(" 3. Try 'build_song' to test the new arrangement features")
            print(" 4. Check the documentation in docs/ for new features")
        elif overall_status == "COMPLETED_WITH_WARNINGS":
            print(" 1. Review the warnings above")
            print(" 2. Fix any missing dependencies if needed")
            print(" 3. Restart Ableton Live")
            print(" 4. Run verification tests manually if desired")
        else:
            print(" 1. Review the failed steps above")
            print(" 2. Fix the issues and re-run the migration")
            print(" 3. Use --backup to create a backup before retrying")
            print(" 4. Contact support if issues persist")

        print("\n" + "=" * 70)

        # One timestamp for both files so the JSON and Markdown reports of a
        # run always share the same name stem. Two separate now() calls
        # could straddle a second boundary.
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # JSON report; explicit UTF-8 so the output does not depend on the
        # platform's default encoding.
        report_path = output_dir / f"migration_report_{stamp}.json"
        report_path.write_text(json.dumps(results, indent=2), encoding="utf-8")

        # Markdown report
        md_path = output_dir / f"migration_report_{stamp}.md"
        md_content = generate_markdown_report(results, overall_status)
        md_path.write_text(md_content, encoding="utf-8")

        duration = (datetime.now() - start_time).total_seconds()

        return MigrationStep(
            name="generate_report",
            status="success",
            message=f"Reports saved to {output_dir}",
            details={
                "json_report": str(report_path),
                "markdown_report": str(md_path),
                "overall_status": overall_status,
            },
            duration_seconds=duration,
        )

    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MigrationStep(
            name="generate_report",
            status="failed",
            message=f"Report generation failed: {str(e)}",
            details={"error": str(e), "traceback": traceback.format_exc()},
            duration_seconds=duration,
        )
|
|
|
|
|
|
def _md_table_cell(value: Any) -> str:
    """Return *value* as text that is safe inside one Markdown table cell."""
    # A table row must stay on one line, and a literal "|" would end the cell.
    single_line = " ".join(str(value).splitlines())
    return single_line.replace("|", "\\|")


def generate_markdown_report(results: Dict[str, Any], overall_status: str) -> str:
    """Generate a Markdown formatted migration report.

    The report contains a header block, a table with one row per step, a
    summary of step counts per status, status-dependent next steps, and the
    full ``results`` dictionary as a JSON code block.

    Args:
        results: Migration results dictionary. The keys read here are
            "migration_name", "version", "started_at", "completed_at" and
            "steps" (a list of step dictionaries). The dictionary must be
            JSON-serializable.
        overall_status: Overall migration status string. "SUCCESS" and
            "COMPLETED_WITH_WARNINGS" select their own next-steps list; any
            other value selects the failure list.

    Returns:
        Markdown formatted report string.
    """
    lines = [
        "# AbletonMCP_AI Senior Architecture Migration Report",
        "",
        f"**Migration:** {results.get('migration_name', 'Unknown')}",
        f"**Version:** {results.get('version', 'Unknown')}",
        f"**Started:** {results.get('started_at', 'Unknown')}",
        f"**Completed:** {results.get('completed_at', 'Unknown')}",
        f"**Overall Status:** {overall_status}",
        "",
        "---",
        "",
        "## Step Results",
        "",
        "| Step | Status | Message | Duration |",
        "|------|--------|---------|----------|",
    ]

    steps = results.get("steps", [])

    status_badges = {
        "success": "[OK] Success",
        "failed": "[FAIL] Failed",
        "skipped": "[SKIP] Skipped",
        "warning": "[WARN] Warning",
    }
    for step in steps:
        # Unknown statuses are shown verbatim
        status_badge = status_badges.get(step.get("status", "unknown"), step.get("status", "Unknown"))
        # A duration that is missing or None is rendered as 0.00s instead
        # of crashing the float format.
        duration = step.get("duration_seconds") or 0

        lines.append(
            f"| {_md_table_cell(step.get('name', 'Unknown'))} | {_md_table_cell(status_badge)} | "
            f"{_md_table_cell(step.get('message', ''))} | {duration:.2f}s |"
        )

    lines.extend([
        "",
        "---",
        "",
        "## Summary",
        "",
    ])

    success_count = sum(1 for s in steps if s.get("status") == "success")
    failed_count = sum(1 for s in steps if s.get("status") == "failed")
    warning_count = sum(1 for s in steps if s.get("status") == "warning")
    skipped_count = sum(1 for s in steps if s.get("status") == "skipped")

    lines.extend([
        f"- **Total steps:** {len(steps)}",
        f"- **Success:** {success_count}",
        f"- **Failed:** {failed_count}",
        f"- **Warnings:** {warning_count}",
        f"- **Skipped:** {skipped_count}",
        "",
        "---",
        "",
        "## Next Steps",
        "",
    ])

    if overall_status == "SUCCESS":
        lines.extend([
            "1. [OK] Restart Ableton Live to load the updated Remote Script",
            "2. [OK] Run 'health_check' to verify the installation",
            "3. [OK] Try 'build_song' to test the new arrangement features",
            "4. [OK] Check the documentation in docs/ for new features",
        ])
    elif overall_status == "COMPLETED_WITH_WARNINGS":
        lines.extend([
            "1. [WARN] Review the warnings above",
            "2. [WARN] Fix any missing dependencies if needed",
            "3. [OK] Restart Ableton Live",
            "4. [OK] Run verification tests manually if desired",
        ])
    else:
        lines.extend([
            "1. [FAIL] Review the failed steps above",
            "2. [FAIL] Fix the issues and re-run the migration",
            "3. [SAVE] Use --backup to create a backup before retrying",
            "4. [HELP] Contact support if issues persist",
        ])

    lines.extend([
        "",
        "---",
        "",
        "## Detailed Information",
        "",
        "### Full Results JSON",
        "",
        "```json",
        json.dumps(results, indent=2),
        "```",
        "",
    ])

    return "\n".join(lines)
|
|
|
|
|
|
# =============================================================================
|
|
# INTERACTIVE MODE
|
|
# =============================================================================
|
|
|
|
def _interactive_cancelled_result(started_at: str) -> Dict[str, Any]:
    """Announce cancellation and build the CANCELLED result payload.

    Args:
        started_at: ISO timestamp of when the interactive session began.

    Returns:
        Result dictionary with an empty step list and a CANCELLED summary.
    """
    print("Migration cancelled by user.")
    return {
        "migration_name": MIGRATION_NAME,
        "version": VERSION,
        "started_at": started_at,
        "completed_at": datetime.now().isoformat(),
        "steps": [],
        "summary": {"status": "CANCELLED", "reason": "User cancelled"},
    }


def _interactive_prompt(message: str, default: str = "") -> Optional[str]:
    """Read one answer from stdin, trimmed and lower-cased.

    Args:
        message: Prompt text shown to the user.
        default: Value substituted when the user just presses Enter.

    Returns:
        The normalized answer, or None when stdin is exhausted (EOF), so the
        caller can cancel cleanly instead of crashing with a traceback.
    """
    try:
        answer = input(message)
    except EOFError:
        return None
    return answer.strip().lower() or default


def run_interactive() -> Dict[str, Any]:
    """Run migration in interactive mode.

    Walks the user through a confirmation, the choice of analysis mode, and
    the backup/verification toggles, shows the resulting plan, and only then
    hands over to execute_migration(). Any negative answer to a confirmation
    prompt, or end-of-input on stdin, cancels the migration.

    Returns:
        Migration results dictionary. When the user cancels, the dictionary
        has an empty "steps" list and a summary status of "CANCELLED".
    """
    started_at = datetime.now().isoformat()
    affirmative = ("yes", "y")

    print_header("INTERACTIVE MIGRATION MODE")
    print("\nWelcome to the AbletonMCP_AI Senior Architecture Migration!")
    print("This tool will guide you through the migration process.\n")

    # Ask for confirmation
    print("This migration will:")
    print(" 1. Create a backup of your current configuration")
    print(" 2. Analyze all 511 samples in your library")
    print(" 3. Update configuration files")
    print(" 4. Run verification tests")
    print(" 5. Generate a detailed report")
    print("")

    # None (EOF) is not an affirmative answer, so it cancels as well.
    response = _interactive_prompt("Do you want to continue? (yes/no): ")
    if response not in affirmative:
        return _interactive_cancelled_result(started_at)

    # Ask for analysis mode
    print("\nSelect analysis mode:")
    print(" 1. full - Full spectral analysis (requires numpy/librosa)")
    print(" 2. placeholder - Basic metadata only (works without numpy)")
    print(" 3. skip - Skip sample analysis")

    mode_choice = _interactive_prompt(
        "Enter choice (1/2/3) [default: 1]: ", default="1"
    )
    if mode_choice is None:
        return _interactive_cancelled_result(started_at)

    # Unrecognized input falls back to the default ("full") mode.
    analysis_mode = {
        "1": "full",
        "2": "placeholder",
        "3": "skip",
    }.get(mode_choice, "full")

    # Ask for backup
    backup_choice = _interactive_prompt(
        "\nCreate backup before migration? (yes/no) [default: yes]: ",
        default="yes",
    )
    if backup_choice is None:
        return _interactive_cancelled_result(started_at)
    do_backup = backup_choice in affirmative

    # Ask for verification
    verify_choice = _interactive_prompt(
        "\nRun verification tests after migration? (yes/no) [default: yes]: ",
        default="yes",
    )
    if verify_choice is None:
        return _interactive_cancelled_result(started_at)
    do_verify = verify_choice in affirmative

    # Show summary and confirm
    print("\n" + "=" * 70)
    print("MIGRATION PLAN:")
    print("=" * 70)
    print(f" Backup: {'Yes' if do_backup else 'No'}")
    print(f" Analysis mode: {analysis_mode}")
    print(f" Verification: {'Yes' if do_verify else 'No'}")
    print("=" * 70)

    final_confirm = _interactive_prompt("\nProceed with migration? (yes/no): ")
    if final_confirm not in affirmative:
        return _interactive_cancelled_result(started_at)

    # Execute migration
    return execute_migration(
        backup=do_backup,
        analyze=analysis_mode,
        verify=do_verify,
        dry_run=False,
        force=False,
    )
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN MIGRATION EXECUTION
|
|
# =============================================================================
|
|
|
|
def _failed_migration_result(
    started_at: str,
    steps: List["MigrationStep"],
    reason: str,
) -> Dict[str, Any]:
    """Build the result payload for a migration that aborted early.

    Args:
        started_at: ISO timestamp of when the migration began.
        steps: Steps recorded up to the point of failure.
        reason: Human-readable explanation stored in the summary.

    Returns:
        Result dictionary whose summary status is "FAILED".
    """
    return {
        "migration_name": MIGRATION_NAME,
        "version": VERSION,
        "started_at": started_at,
        "completed_at": datetime.now().isoformat(),
        "steps": [s.to_dict() for s in steps],
        "summary": {"status": "FAILED", "reason": reason},
    }


def execute_migration(
    backup: bool = True,
    analyze: str = "full",
    verify: bool = True,
    dry_run: bool = False,
    force: bool = False,
) -> Dict[str, Any]:
    """Execute the full migration.

    Runs five steps in order: prerequisites check, backup, sample analysis,
    configuration update, and verification. A failed step aborts the run with
    a FAILED result unless ``force`` is set. Afterwards the report is
    generated and the overall status is derived from the recorded steps.

    Args:
        backup: Whether to create backup
        analyze: Analysis mode ("full", "placeholder", "skip")
        verify: Whether to run verification tests
        dry_run: Whether to preview without making changes
        force: Whether to force migration even if errors occur

    Returns:
        Complete migration results dictionary with the keys
        "migration_name", "version", "started_at", "completed_at", "steps"
        and "summary".
    """
    started_at = datetime.now().isoformat()
    steps: List[MigrationStep] = []

    # Track backup dir for potential rollback
    backup_dir: Optional[str] = None

    # Step 1: Check prerequisites
    print_step(1, 5, "Checking prerequisites")
    step = check_prerequisites()
    steps.append(step)

    if step.status == "failed":
        if not force:
            print_error("Prerequisites check failed. Use --force to proceed anyway.")
            return _failed_migration_result(
                started_at, steps, "Prerequisites check failed"
            )
        # Forced run: say so explicitly rather than reporting the check as passed.
        print_warning("Prerequisites check failed. Proceeding anyway (--force).")
    elif step.status == "warning":
        print_warning("Prerequisites met with warnings. Proceeding...")
    else:
        print_success("Prerequisites check passed")

    if dry_run:
        print_header("DRY RUN MODE - No changes will be made")

    # Step 2: Create backup
    if backup:
        print_step(2, 5, "Creating backup")
        if dry_run:
            print_info("Would create backup of existing configuration")
        else:
            step = create_backup()
            steps.append(step)

            if step.status == "success":
                backup_dir = step.details.get("backup_dir")
                print_success(f"Backup created: {backup_dir}")
            elif step.status == "warning":
                print_warning(f"Backup created with warnings: {step.message}")
            else:
                print_error(f"Backup failed: {step.message}")
                if not force:
                    return _failed_migration_result(
                        started_at, steps, "Backup creation failed"
                    )
    else:
        print_step(2, 5, "Skipping backup (not requested)")
        steps.append(MigrationStep(
            name="create_backup",
            status="skipped",
            message="Backup skipped as requested",
        ))

    # Step 3: Run analysis
    if analyze != "skip":
        print_step(3, 5, f"Running sample analysis ({analyze} mode)")
        if dry_run:
            print_info(f"Would run sample analysis in {analyze} mode")
        else:
            step = run_analysis(mode=analyze)
            steps.append(step)

            if step.status == "success":
                stats = step.details.get("analysis_stats", {})
                print_success(
                    f"Analysis complete: {stats.get('total', 0)} samples analyzed"
                )
            elif step.status == "warning":
                print_warning(f"Analysis completed with warnings: {step.message}")
            else:
                print_error(f"Analysis failed: {step.message}")
                if not force:
                    # Attempt rollback if backup exists
                    if backup_dir:
                        print_info("Attempting rollback...")
                        rollback_step = rollback_if_needed(backup_dir)
                        steps.append(rollback_step)

                    return _failed_migration_result(
                        started_at, steps, "Sample analysis failed"
                    )
    else:
        print_step(3, 5, "Skipping sample analysis")
        steps.append(MigrationStep(
            name="run_analysis",
            status="skipped",
            message="Analysis skipped as requested",
        ))

    # Step 4: Update configuration
    print_step(4, 5, "Updating configuration")
    if dry_run:
        print_info("Would update configuration files")
    else:
        step = update_configuration()
        steps.append(step)

        if step.status == "success":
            print_success("Configuration updated successfully")
        elif step.status == "warning":
            print_warning(f"Configuration updated with warnings: {step.message}")
        else:
            print_error(f"Configuration update failed: {step.message}")
            if not force:
                # NOTE(review): unlike the analysis step, no rollback is
                # attempted here even when a backup exists - confirm intended.
                return _failed_migration_result(
                    started_at, steps, "Configuration update failed"
                )

    # Step 5: Run verification
    if verify:
        print_step(5, 5, "Running verification tests")
        if dry_run:
            print_info("Would run verification tests")
        else:
            step = run_verification()
            steps.append(step)

            if step.status == "success":
                details = step.details
                print_success(
                    f"Verification passed: {details.get('tests_passed', 0)} tests"
                )
            elif step.status == "warning":
                print_warning(
                    f"Verification completed with warnings: {step.message}"
                )
            else:
                print_error(f"Verification failed: {step.message}")
                if not force:
                    return _failed_migration_result(
                        started_at, steps, "Verification failed"
                    )
    else:
        print_step(5, 5, "Skipping verification tests")
        steps.append(MigrationStep(
            name="run_verification",
            status="skipped",
            message="Verification skipped as requested",
        ))

    # Generate report
    completed_at = datetime.now().isoformat()

    results = {
        "migration_name": MIGRATION_NAME,
        "version": VERSION,
        "started_at": started_at,
        "completed_at": completed_at,
        "steps": [s.to_dict() for s in steps],
    }

    # Generate final report
    # NOTE(review): this also runs in dry-run mode; confirm generate_report()
    # does not write anything that contradicts "no changes will be made".
    report_step = generate_report(results)
    steps.append(report_step)

    # Update with final steps list (now including the report step itself)
    results["steps"] = [s.to_dict() for s in steps]

    # Determine overall status: any failure wins, then any warning.
    failed_count = sum(1 for s in steps if s.status == "failed")
    warning_count = sum(1 for s in steps if s.status == "warning")

    if failed_count > 0:
        overall_status = "FAILED"
    elif warning_count > 0:
        overall_status = "COMPLETED_WITH_WARNINGS"
    else:
        overall_status = "SUCCESS"

    results["summary"] = {
        "status": overall_status,
        "total_steps": len(steps),
        "success": sum(1 for s in steps if s.status == "success"),
        "failed": failed_count,
        "warnings": warning_count,
        "skipped": sum(1 for s in steps if s.status == "skipped"),
    }

    return results
|
|
|
|
|
|
def main():
    """Main entry point for the CLI.

    Parses the command line, runs either the interactive flow or a direct
    migration, and exits with status 0 when the summary status is SUCCESS,
    COMPLETED_WITH_WARNINGS or CANCELLED, and 1 otherwise.

    Backup and verification are enabled by default. ``--backup`` and
    ``--verify`` are kept for backward compatibility; ``--no-backup`` and
    ``--no-verify`` are the way to turn those steps off.
    """
    parser = argparse.ArgumentParser(
        description="Migrate AbletonMCP_AI to Senior Architecture",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python migrate_to_senior.py                     # Full migration with defaults
  python migrate_to_senior.py --backup --verify   # Backup then verify
  python migrate_to_senior.py --no-backup         # Migrate without a backup
  python migrate_to_senior.py --no-verify         # Skip verification tests
  python migrate_to_senior.py --analyze=skip      # Skip sample analysis
  python migrate_to_senior.py --dry-run           # Preview changes
  python migrate_to_senior.py --interactive       # Interactive mode
  python migrate_to_senior.py --force             # Force even if errors
""",
    )

    # Each on/off pair shares one dest, so the parsed value is the final
    # decision and defaults to True when neither flag is given.
    parser.add_argument(
        "--backup",
        dest="backup",
        action="store_true",
        default=True,
        help="Create backup of existing configuration (default)",
    )
    parser.add_argument(
        "--no-backup",
        dest="backup",
        action="store_false",
        help="Do not create a backup before migrating",
    )

    parser.add_argument(
        "--analyze",
        choices=["full", "placeholder", "skip"],
        default="full",
        help="Analysis mode: full (requires numpy), placeholder (basic), skip (default: full)",
    )

    parser.add_argument(
        "--verify",
        dest="verify",
        action="store_true",
        default=True,
        help="Run verification tests after migration (default)",
    )
    parser.add_argument(
        "--no-verify",
        dest="verify",
        action="store_false",
        help="Do not run verification tests after migration",
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without making changes",
    )

    parser.add_argument(
        "--force",
        action="store_true",
        help="Force migration even if errors occur",
    )

    parser.add_argument(
        "--interactive",
        action="store_true",
        help="Run in interactive mode with user prompts",
    )

    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {VERSION}",
    )

    args = parser.parse_args()

    print_header(f"AbletonMCP_AI Migration Tool v{VERSION}")

    # Run interactive mode if requested; it asks for its own options and
    # ignores the other command-line switches.
    if args.interactive:
        results = run_interactive()
    else:
        results = execute_migration(
            backup=args.backup,
            analyze=args.analyze,
            verify=args.verify,
            dry_run=args.dry_run,
            force=args.force,
        )

    # Exit with appropriate code; a missing summary is treated as a failure.
    status = results.get("summary", {}).get("status", "UNKNOWN")

    if status == "SUCCESS":
        print("\n*** Migration completed successfully! ***")
        sys.exit(0)
    elif status == "COMPLETED_WITH_WARNINGS":
        print("\n[!] Migration completed with warnings. Please review the report.")
        sys.exit(0)
    elif status == "CANCELLED":
        print("\n[X] Migration cancelled by user.")
        sys.exit(0)
    else:
        print("\n[ERROR] Migration failed. Please review the errors above.")
        sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|