""" Batch Migration Script for Sample Library Scans the libreria/reggaeton/ directory, analyzes all audio files, and stores metadata in SQLite database with progress tracking. Usage: python migrate_library.py # Run migration with defaults python migrate_library.py --force # Force re-analyze all samples python migrate_library.py --dry-run # Scan only, don't save to DB python migrate_library.py --status # Show current DB statistics """ import os import sys import sqlite3 import argparse from pathlib import Path from dataclasses import dataclass, asdict from typing import List, Dict, Optional, Any, Tuple from datetime import datetime # Audio analysis libraries (optional) try: import numpy as np import librosa import librosa.feature LIBROSA_AVAILABLE = True except ImportError: LIBROSA_AVAILABLE = False np = None try: import wave import struct WAVE_AVAILABLE = True except ImportError: WAVE_AVAILABLE = False # Constants DEFAULT_LIBRARY_PATH = Path( r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\libreria\reggaeton" ) DEFAULT_DB_PATH = Path( r"C:\ProgramData\Ableton\Live 12 Suite\Resources\MIDI Remote Scripts\AbletonMCP_AI\mcp_server\data\samples.db" ) SUPPORTED_EXTENSIONS = {'.wav', '.aif', '.aiff', '.mp3', '.flac'} # Role mapping for categorization ROLE_MAPPING = { 'kick': 'kick', 'snare': 'snare', 'bass': 'bass', 'fx': 'fx', 'drumloops': 'drum_loop', 'drumloop': 'drum_loop', 'hi-hat': 'hat_closed', 'hihat': 'hat_closed', 'hat': 'hat_closed', 'oneshots': 'oneshot', 'oneshot': 'oneshot', 'perc loop': 'perc_loop', 'perc_loop': 'perc_loop', 'reggaeton 3': 'synth', 'sentimientolatino2025': 'multi', 'sounds presets': 'preset', 'extra': 'extra', 'flp': 'project', } @dataclass class SampleFeatures: """Complete feature set for a sample.""" # File info path: str name: str pack: str role: str # Audio properties duration: float = 0.0 sample_rate: int = 44100 channels: int = 1 # Musical properties bpm: float = 0.0 key: str = "" # Spectral features rms: float = 0.0 spectral_centroid: float = 0.0 spectral_rolloff: float = 0.0 zero_crossing_rate: float = 0.0 # Advanced features mfccs: str = "" # JSON string of list onset_strength: float = 0.0 # Analysis metadata analysis_type: str = "partial" # "full" or "partial" analyzed_at: str = "" file_size: int = 0 file_modified: float = 0.0 def scan_library(library_path: Path) -> List[Path]: """ Scan library directory for all audio files. Args: library_path: Root directory to scan Returns: List of paths to audio files """ samples = [] if not library_path.exists(): print(f"[ERROR] Library path not found: {library_path}") return samples for ext in SUPPORTED_EXTENSIONS: samples.extend(library_path.rglob(f"*{ext}")) samples.extend(library_path.rglob(f"*{ext.upper()}")) # Remove duplicates and sort seen = set() unique_samples = [] for s in samples: resolved = s.resolve() if resolved not in seen: seen.add(resolved) unique_samples.append(s) return sorted(unique_samples) def detect_role(file_path: Path) -> str: """Detect sample role based on folder and filename.""" path_parts = [p.lower() for p in file_path.parts] filename = file_path.name.lower() for part in path_parts: clean_part = part.replace(' ', '_').replace('-', '_').replace('(', '').replace(')', '') if part in ROLE_MAPPING: return ROLE_MAPPING[part] if clean_part in ROLE_MAPPING: return ROLE_MAPPING[clean_part] for key, role in ROLE_MAPPING.items(): if key in part or key in clean_part: return role # Check filename if 'kick' in filename: return 'kick' if 'snare' in filename: return 'snare' if 'clap' in filename: return 'clap' if 'hat' in filename or 'hihat' in filename: return 'hat_closed' if 'bass' in filename: return 'bass' if 'fx' in filename: return 'fx' if 'perc' in filename: return 'perc' return 'unknown' def get_pack_name(file_path: Path, library_path: Path) -> str: """Get the pack/folder name relative to library root.""" try: rel_path = file_path.relative_to(library_path) return rel_path.parts[0] if rel_path.parts else 'root' except ValueError: return file_path.parent.name or 'unknown' def analyze_sample_librosa(sample_path: Path) -> Optional[Dict[str, Any]]: """ Analyze sample using librosa (full analysis). Args: sample_path: Path to audio file Returns: Dictionary with audio features or None on error """ if not LIBROSA_AVAILABLE: return None try: # Load audio y, sr = librosa.load(str(sample_path), sr=None, mono=True) # Duration duration = librosa.get_duration(y=y, sr=sr) # RMS (energy) rms = float(np.mean(librosa.feature.rms(y=y))) rms_db = 20 * np.log10(rms + 1e-10) # Spectral features spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))) spectral_rolloff = float(np.mean(librosa.feature.spectral_rolloff(y=y, sr=sr))) zcr = float(np.mean(librosa.feature.zero_crossing_rate(y))) # MFCCs mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13) mfccs_mean = [float(np.mean(coef)) for coef in mfccs] # Onset strength onset_env = librosa.onset.onset_strength(y=y, sr=sr) onset_strength = float(np.mean(onset_env)) # BPM detection try: tempo, _ = librosa.beat.beat_track(y=y, sr=sr) bpm = float(tempo) if isinstance(tempo, (int, float, np.number)) else float(tempo[0]) except: bpm = 0.0 # Key detection try: chromagram = librosa.feature.chroma_cqt(y=y, sr=sr) chroma_avg = np.sum(chromagram, axis=1) notes = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'] key_index = np.argmax(chroma_avg) key = notes[key_index] # Detect minor minor_third_idx = (key_index + 3) % 12 if chroma_avg[minor_third_idx] > chroma_avg[(key_index + 4) % 12]: key += 'm' except: key = "" # Detect original channels try: y_orig, _ = librosa.load(str(sample_path), sr=None, mono=False) channels = y_orig.shape[0] if len(y_orig.shape) > 1 else 1 except: channels = 1 return { "rms": round(rms_db, 2), "spectral_centroid": round(spectral_centroid, 2), "spectral_rolloff": round(spectral_rolloff, 2), "zero_crossing_rate": round(zcr, 4), "mfccs": mfccs_mean, "onset_strength": round(onset_strength, 4), "duration": round(duration, 3), "sample_rate": sr, "channels": channels, "bpm": round(bpm, 1) if bpm > 0 else 0, "key": key, "analysis_type": "full" } except Exception as e: print(f" [WARN] Librosa analysis failed for {sample_path.name}: {e}") return None def analyze_sample_wave(sample_path: Path) -> Optional[Dict[str, Any]]: """ Analyze sample using wave module (basic info for WAV files). Args: sample_path: Path to audio file Returns: Dictionary with basic audio features or None on error """ if not WAVE_AVAILABLE: return None try: # Only works for WAV files if sample_path.suffix.lower() != '.wav': return None with wave.open(str(sample_path), 'rb') as wav_file: channels = wav_file.getnchannels() sample_rate = wav_file.getframerate() sample_width = wav_file.getsampwidth() n_frames = wav_file.getnframes() duration = n_frames / sample_rate # Try to calculate RMS from samples rms_db = 0.0 try: # Read a portion of the file for RMS calculation frames_to_read = min(n_frames, int(sample_rate * 1)) # Max 1 second raw_data = wav_file.readframes(frames_to_read) if sample_width == 1: fmt = f"{len(raw_data)}B" samples = struct.unpack(fmt, raw_data) samples = [(s - 128) / 128.0 for s in samples] elif sample_width == 2: fmt = f"{len(raw_data) // 2}h" samples = struct.unpack(fmt, raw_data) samples = [s / 32768.0 for s in samples] elif sample_width == 4: fmt = f"{len(raw_data) // 4}i" samples = struct.unpack(fmt, raw_data) samples = [s / 2147483648.0 for s in samples] else: samples = [] if samples: # Calculate RMS if channels > 1: # Interleaved channels - convert to mono mono_samples = [] for i in range(0, len(samples) - channels + 1, channels): mono_samples.append(sum(samples[i:i+channels]) / channels) samples = mono_samples rms = (sum(s**2 for s in samples) / len(samples)) ** 0.5 rms_db = 20 * (rms + 1e-10).bit_length() # Approximate except Exception: pass return { "rms": round(rms_db, 2), "spectral_centroid": 0.0, "spectral_rolloff": 0.0, "zero_crossing_rate": 0.0, "mfccs": [], "onset_strength": 0.0, "duration": round(duration, 3), "sample_rate": sample_rate, "channels": channels, "bpm": 0, "key": "", "analysis_type": "partial" } except Exception as e: return None def create_placeholder_metadata(sample_path: Path) -> Dict[str, Any]: """ Create basic metadata without audio analysis (fallback). Args: sample_path: Path to audio file Returns: Dictionary with file info and placeholder audio features """ # Try wave module first wave_data = analyze_sample_wave(sample_path) if wave_data: return wave_data # Ultimate fallback - just file info stat = sample_path.stat() return { "rms": 0.0, "spectral_centroid": 0.0, "spectral_rolloff": 0.0, "zero_crossing_rate": 0.0, "mfccs": [], "onset_strength": 0.0, "duration": 0.0, "sample_rate": 44100, "channels": 1, "bpm": 0, "key": "", "analysis_type": "partial" } def analyze_sample(sample_path: Path, library_path: Path) -> Optional[SampleFeatures]: """ Analyze a sample and return complete features. Tries librosa first, falls back to wave module, then placeholder. Args: sample_path: Path to audio file library_path: Root library path for pack detection Returns: SampleFeatures object or None on error """ # Get file info stat = sample_path.stat() # Detect role and pack role = detect_role(sample_path) pack = get_pack_name(sample_path, library_path) # Try analysis methods in order of preference audio_features = None if LIBROSA_AVAILABLE: audio_features = analyze_sample_librosa(sample_path) if audio_features is None: audio_features = create_placeholder_metadata(sample_path) if audio_features is None: return None # Build SampleFeatures return SampleFeatures( path=str(sample_path.resolve()), name=sample_path.name, pack=pack, role=role, duration=audio_features.get("duration", 0.0), sample_rate=audio_features.get("sample_rate", 44100), channels=audio_features.get("channels", 1), bpm=audio_features.get("bpm", 0.0), key=audio_features.get("key", ""), rms=audio_features.get("rms", 0.0), spectral_centroid=audio_features.get("spectral_centroid", 0.0), spectral_rolloff=audio_features.get("spectral_rolloff", 0.0), zero_crossing_rate=audio_features.get("zero_crossing_rate", 0.0), mfccs=str(audio_features.get("mfccs", [])), onset_strength=audio_features.get("onset_strength", 0.0), analysis_type=audio_features.get("analysis_type", "partial"), analyzed_at=datetime.now().isoformat(), file_size=stat.st_size, file_modified=stat.st_mtime ) def init_database(db_path: Path) -> sqlite3.Connection: """ Initialize SQLite database with schema. Args: db_path: Path to database file Returns: Database connection """ # Ensure directory exists db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() # Create samples table cursor.execute(""" CREATE TABLE IF NOT EXISTS samples ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT UNIQUE NOT NULL, name TEXT NOT NULL, pack TEXT, role TEXT, duration REAL DEFAULT 0.0, sample_rate INTEGER DEFAULT 44100, channels INTEGER DEFAULT 1, bpm REAL DEFAULT 0.0, key TEXT, rms REAL DEFAULT 0.0, spectral_centroid REAL DEFAULT 0.0, spectral_rolloff REAL DEFAULT 0.0, zero_crossing_rate REAL DEFAULT 0.0, mfccs TEXT, onset_strength REAL DEFAULT 0.0, analysis_type TEXT DEFAULT 'partial', analyzed_at TEXT, file_size INTEGER DEFAULT 0, file_modified REAL DEFAULT 0.0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Create indexes cursor.execute("CREATE INDEX IF NOT EXISTS idx_role ON samples(role)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_pack ON samples(pack)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_key ON samples(key)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_bpm ON samples(bpm)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_analysis ON samples(analysis_type)") # Create migration log table cursor.execute(""" CREATE TABLE IF NOT EXISTS migration_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, completed_at TIMESTAMP, total_samples INTEGER DEFAULT 0, analyzed_full INTEGER DEFAULT 0, analyzed_partial INTEGER DEFAULT 0, errors INTEGER DEFAULT 0, duration_seconds REAL DEFAULT 0.0 ) """) conn.commit() return conn def sample_exists(conn: sqlite3.Connection, sample_path: str) -> bool: """Check if a sample already exists in database.""" cursor = conn.cursor() cursor.execute("SELECT 1 FROM samples WHERE path = ?", (sample_path,)) return cursor.fetchone() is not None def save_sample(conn: sqlite3.Connection, features: SampleFeatures) -> bool: """ Save or update sample features in database. Args: conn: Database connection features: SampleFeatures to save Returns: True on success """ cursor = conn.cursor() data = asdict(features) cursor.execute(""" INSERT OR REPLACE INTO samples ( path, name, pack, role, duration, sample_rate, channels, bpm, key, rms, spectral_centroid, spectral_rolloff, zero_crossing_rate, mfccs, onset_strength, analysis_type, analyzed_at, file_size, file_modified ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( data['path'], data['name'], data['pack'], data['role'], data['duration'], data['sample_rate'], data['channels'], data['bpm'], data['key'], data['rms'], data['spectral_centroid'], data['spectral_rolloff'], data['zero_crossing_rate'], data['mfccs'], data['onset_strength'], data['analysis_type'], data['analyzed_at'], data['file_size'], data['file_modified'] )) conn.commit() return True def migrate_library( library_path: Path, db_path: Path, force_reanalyze: bool = False, dry_run: bool = False ) -> Dict[str, Any]: """ Migrate all samples from library to SQLite database. Args: library_path: Path to sample library db_path: Path to SQLite database force_reanalyze: Re-analyze samples even if already in DB dry_run: Scan only, don't save to database Returns: Migration statistics """ start_time = datetime.now() # Scan for samples print(f"[MIGRATE] Scanning library: {library_path}") samples = scan_library(library_path) total = len(samples) if total == 0: print("[MIGRATE] No samples found!") return {"total": 0, "analyzed": 0, "errors": 0, "skipped": 0} print(f"[MIGRATE] Found {total} samples") if dry_run: print("[MIGRATE] Dry run - not saving to database") for i, sample in enumerate(samples, 1): print(f" {i}/{total}: {sample.name}") return {"total": total, "dry_run": True} # Initialize database conn = init_database(db_path) # Start migration log cursor = conn.cursor() cursor.execute("INSERT INTO migration_log (started_at) VALUES (CURRENT_TIMESTAMP)") migration_id = cursor.lastrowid conn.commit() # Process samples analyzed_full = 0 analyzed_partial = 0 errors = 0 skipped = 0 for i, sample_path in enumerate(samples, 1): abs_path = str(sample_path.resolve()) # Check if already analyzed if not force_reanalyze and sample_exists(conn, abs_path): skipped += 1 print(f"\r[MIGRATE] {i}/{total}: {sample_path.name} (skipped - already in DB)", end="") continue print(f"\r[MIGRATE] {i}/{total}: {sample_path.name}", end="") sys.stdout.flush() try: features = analyze_sample(sample_path, library_path) if features: save_sample(conn, features) if features.analysis_type == "full": analyzed_full += 1 else: analyzed_partial += 1 else: errors += 1 print(f"\n [ERROR] Failed to analyze: {sample_path.name}") except Exception as e: errors += 1 print(f"\n [ERROR] Exception analyzing {sample_path.name}: {e}") print() # New line after progress # Update migration log duration = (datetime.now() - start_time).total_seconds() cursor.execute(""" UPDATE migration_log SET completed_at = CURRENT_TIMESTAMP, total_samples = ?, analyzed_full = ?, analyzed_partial = ?, errors = ?, duration_seconds = ? WHERE id = ? """, (total, analyzed_full, analyzed_partial, errors, duration, migration_id)) conn.commit() conn.close() return { "total": total, "analyzed_full": analyzed_full, "analyzed_partial": analyzed_partial, "errors": errors, "skipped": skipped, "duration_seconds": duration, "db_path": str(db_path) } def get_migration_status(db_path: Path) -> Dict[str, Any]: """ Get current database statistics. Args: db_path: Path to SQLite database Returns: Statistics dictionary """ if not db_path.exists(): return {"error": "Database not found", "db_path": str(db_path)} conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() # Total samples cursor.execute("SELECT COUNT(*) FROM samples") total = cursor.fetchone()[0] # By role cursor.execute("SELECT role, COUNT(*) FROM samples GROUP BY role") by_role = {row[0]: row[1] for row in cursor.fetchall()} # By analysis type cursor.execute("SELECT analysis_type, COUNT(*) FROM samples GROUP BY analysis_type") by_analysis = {row[0]: row[1] for row in cursor.fetchall()} # By pack cursor.execute("SELECT pack, COUNT(*) FROM samples GROUP BY pack") by_pack = {row[0]: row[1] for row in cursor.fetchall()} # Averages cursor.execute(""" SELECT AVG(duration), AVG(bpm), AVG(rms), AVG(spectral_centroid) FROM samples """) avg_row = cursor.fetchone() # Last migration cursor.execute(""" SELECT started_at, completed_at, total_samples, errors, duration_seconds FROM migration_log ORDER BY id DESC LIMIT 1 """) last_migration = cursor.fetchone() conn.close() return { "total_samples": total, "by_role": by_role, "by_analysis_type": by_analysis, "by_pack": by_pack, "averages": { "duration": round(avg_row[0], 3) if avg_row[0] else 0, "bpm": round(avg_row[1], 1) if avg_row[1] else 0, "rms": round(avg_row[2], 2) if avg_row[2] else 0, "spectral_centroid": round(avg_row[3], 2) if avg_row[3] else 0, }, "last_migration": { "started": last_migration[0] if last_migration else None, "completed": last_migration[1] if last_migration else None, "total_samples": last_migration[2] if last_migration else 0, "errors": last_migration[3] if last_migration else 0, "duration_seconds": last_migration[4] if last_migration else 0, } if last_migration else None, "db_path": str(db_path), "db_size_mb": round(db_path.stat().st_size / (1024 * 1024), 2) } def print_report(stats: Dict[str, Any]): """Print formatted migration report.""" print("\n" + "=" * 60) print("MIGRATION REPORT") print("=" * 60) if "error" in stats: print(f"Error: {stats['error']}") return print(f"\nTotal samples: {stats['total']}") if stats.get('dry_run'): print("Mode: Dry run (no changes saved)") return print(f"Full analysis: {stats.get('analyzed_full', 0)}") print(f"Partial analysis: {stats.get('analyzed_partial', 0)}") print(f"Skipped (already in DB): {stats.get('skipped', 0)}") print(f"Errors: {stats.get('errors', 0)}") print(f"Duration: {stats.get('duration_seconds', 0):.1f} seconds") print(f"Database: {stats.get('db_path', 'N/A')}") print("\n" + "=" * 60) def print_status(status: Dict[str, Any]): """Print database status report.""" print("\n" + "=" * 60) print("DATABASE STATUS") print("=" * 60) if "error" in status: print(f"Error: {status['error']}") return print(f"\nTotal samples: {status['total_samples']}") print(f"Database size: {status['db_size_mb']} MB") print(f"Database path: {status['db_path']}") print("\nBy Role:") for role, count in sorted(status['by_role'].items()): print(f" {role}: {count}") print("\nBy Analysis Type:") for atype, count in status['by_analysis_type'].items(): print(f" {atype}: {count}") print("\nAverages:") avg = status['averages'] print(f" Duration: {avg['duration']}s") print(f" BPM: {avg['bpm']}") print(f" RMS: {avg['rms']} dB") print(f" Spectral Centroid: {avg['spectral_centroid']} Hz") if status.get('last_migration'): lm = status['last_migration'] print(f"\nLast Migration:") print(f" Started: {lm['started']}") print(f" Completed: {lm['completed']}") print(f" Samples: {lm['total_samples']}") print(f" Errors: {lm['errors']}") print(f" Duration: {lm['duration_seconds']:.1f}s") print("\n" + "=" * 60) def main(): """Command-line interface for migration script.""" parser = argparse.ArgumentParser( description="Migrate sample library to SQLite database", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python migrate_library.py # Run migration python migrate_library.py --force # Force re-analyze all python migrate_library.py --dry-run # Scan only python migrate_library.py --status # Show database stats """ ) parser.add_argument( "--library", type=str, default=str(DEFAULT_LIBRARY_PATH), help=f"Path to sample library (default: {DEFAULT_LIBRARY_PATH})" ) parser.add_argument( "--db", type=str, default=str(DEFAULT_DB_PATH), help=f"Path to SQLite database (default: {DEFAULT_DB_PATH})" ) parser.add_argument( "--force", action="store_true", help="Force re-analysis of all samples" ) parser.add_argument( "--dry-run", action="store_true", help="Scan only, don't save to database" ) parser.add_argument( "--status", action="store_true", help="Show database status and exit" ) parser.add_argument( "--reset", action="store_true", help="Delete database and start fresh" ) args = parser.parse_args() library_path = Path(args.library) db_path = Path(args.db) # Handle reset if args.reset: if db_path.exists(): print(f"[RESET] Deleting database: {db_path}") db_path.unlink() else: print("[RESET] Database does not exist") # Show status if args.status: status = get_migration_status(db_path) print_status(status) return # Run migration print(f"[MIGRATE] Library: {library_path}") print(f"[MIGRATE] Database: {db_path}") print(f"[MIGRATE] Librosa available: {LIBROSA_AVAILABLE}") stats = migrate_library( library_path=library_path, db_path=db_path, force_reanalyze=args.force, dry_run=args.dry_run ) print_report(stats) # Show final status if not args.dry_run: status = get_migration_status(db_path) print_status(status) if __name__ == "__main__": main()