- Fix compose.py to select different samples per section instead of one per role
- Add select_many() to SampleSelector for diverse sample selection
- Migrate 862 samples from scattered dirs to libreria/samples/{role}/
- Rename files with consistent convention: {role}_{key}_{bpm}_{character}_{hash}.wav
- Add migrate_library.py script with dry-run and verification
- Backup original index as sample_index_pre_migration.json
- 72 tests passing
281 lines
8.7 KiB
Python
#!/usr/bin/env python
|
|
"""Migrate sample library to new organized structure.
|
|
|
|
Copies all 862 samples from scattered subdirectories in `libreria/reggaeton/`
|
|
to flat role-based directories under `libreria/samples/{role}/` with consistent
|
|
naming.
|
|
|
|
Usage:
|
|
python scripts/migrate_library.py [--dry-run] [--verify]
|
|
|
|
CRITICAL RULES:
|
|
- COPY files, do NOT move them (keep originals as backup)
|
|
- Do NOT delete original files
|
|
- Keep the old index as backup before modifying
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import shutil
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Project root
|
|
_ROOT = Path(__file__).parent.parent
|
|
INDEX_PATH = _ROOT / "data" / "sample_index.json"
|
|
BACKUP_PATH = _ROOT / "data" / "sample_index_pre_migration.json"
|
|
SAMPLES_ROOT = _ROOT / "libreria" / "samples"
|
|
LOG_PATH = _ROOT / "scripts" / "migration_log.json"
|
|
|
|
# All known roles
|
|
ROLES = {
|
|
"kick", "snare", "hihat", "perc", "bass", "lead", "keys", "pad",
|
|
"drumloop", "fx", "vocal", "guitar", "brass", "synth", "arp",
|
|
"pluck", "oneshot", "fill",
|
|
}
|
|
|
|
|
|
def log(msg: str) -> None:
    """Emit one progress line and flush immediately so output is unbuffered."""
    sys.stdout.write(msg + "\n")
    sys.stdout.flush()
|
|
|
|
|
|
def create_directories() -> None:
    """Ensure libreria/samples/ exists along with one subdirectory per role."""
    SAMPLES_ROOT.mkdir(parents=True, exist_ok=True)
    # Idempotent: exist_ok makes re-runs safe.
    for role_dir in (SAMPLES_ROOT / role for role in ROLES):
        role_dir.mkdir(exist_ok=True)
    log(f"[OK] Created directories under {SAMPLES_ROOT}")
|
|
|
|
|
|
def load_index() -> dict:
    """Read and parse the sample index JSON from INDEX_PATH."""
    return json.loads(INDEX_PATH.read_text(encoding="utf-8"))
|
|
|
|
|
|
def save_index(data: dict) -> None:
    """Persist the index atomically: serialize to a temp sibling, then rename over.

    The rename (Path.replace) is atomic on POSIX, so readers never observe a
    half-written index file.
    """
    tmp = INDEX_PATH.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
    tmp.replace(INDEX_PATH)
|
|
|
|
|
|
def backup_index() -> None:
    """Copy the current index to BACKUP_PATH once; never overwrite an existing backup."""
    if BACKUP_PATH.exists():
        # A prior run already saved the pre-migration state; keep it untouched.
        log(f"[WARN] Backup already exists at {BACKUP_PATH}, skipping")
        return
    shutil.copy2(INDEX_PATH, BACKUP_PATH)
    log(f"[OK] Backed up index to {BACKUP_PATH}")
|
|
|
|
|
|
def migrate_samples(data: dict, dry_run: bool = False) -> tuple[list[dict], list[dict], dict]:
    """Migrate all samples in the index to the role-based layout.

    Each sample is COPIED (never moved) to SAMPLES_ROOT/<role>/<new_name>.
    In live mode the sample's index entry is rewritten in place: the old path
    is preserved under ``migrated_from`` before ``original_path`` is updated.

    Fixes over the previous version: the source-existence check now also runs
    in dry-run mode (so a dry run is an accurate preview instead of reporting
    missing files as migratable), and dry-run records carry the same
    ``new_name`` key as live records.

    Args:
        data: The loaded index data dict (modified in-place unless dry_run)
        dry_run: If True, validate and plan only; touch neither the
            filesystem nor the index.

    Returns:
        (migrated list, error list, role_counts)
    """
    samples = data["samples"]

    migrated: list[dict] = []
    errors: list[dict] = []

    total = len(samples)
    log(f"Starting migration of {total} samples...")

    for idx, sample in enumerate(samples):
        if idx > 0 and idx % 100 == 0:
            log(f"  Progress: {idx}/{total} ({100*idx/total:.1f}%)")

        role = sample.get("role", "unknown")
        if role not in ROLES:
            errors.append(_sample_error(sample, f"Unknown role: {role}"))
            continue

        original_path = Path(sample["original_path"])
        new_name = sample.get("new_name")
        if not new_name:
            errors.append(_sample_error(sample, "No new_name in index"))
            continue

        # Validate the source in BOTH modes so dry runs surface missing files.
        if not original_path.exists():
            errors.append(_sample_error(sample, "Source file not found", original_path))
            continue

        dest_path = SAMPLES_ROOT / role / new_name

        if not dry_run:
            try:
                # COPY (not move) to preserve originals
                shutil.copy2(original_path, dest_path)
            except Exception as e:
                errors.append(_sample_error(sample, str(e), original_path))
                continue

            # Update index fields IN-PLACE (modifies data dict).
            # Order matters: capture the old path before overwriting it.
            sample["migrated_from"] = sample["original_path"]
            sample["original_path"] = str(dest_path)
            sample["original_name"] = new_name

        migrated.append({
            "original": str(original_path),
            "destination": str(dest_path),
            "role": role,
            "new_name": new_name,
        })

    # Compute role counts for the summary/log.
    role_counts: dict[str, int] = {}
    for entry in migrated:
        r = entry.get("role", "unknown")
        role_counts[r] = role_counts.get(r, 0) + 1

    return migrated, errors, role_counts


def _sample_error(sample: dict, message: str, original_path: Optional[Path] = None) -> dict:
    """Build one uniform error record for migrate_samples.

    ``original_path`` is included only for filesystem-level failures, matching
    the historical record shape.
    """
    record = {"sample": sample.get("original_name", "unknown"), "error": message}
    if original_path is not None:
        record["original_path"] = str(original_path)
    return record
|
|
|
|
|
|
def write_log(role_counts: dict[str, int], migrated: list[dict], errors: list[dict]) -> None:
    """Write a JSON summary of the migration run to LOG_PATH."""
    summary = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "total_samples": len(migrated) + len(errors),
        "migrated_count": len(migrated),
        "error_count": len(errors),
        "role_counts": role_counts,
        "migrated_sample": migrated[:10],  # First 10 as sample
        "errors": errors,
    }

    LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    LOG_PATH.write_text(json.dumps(summary, indent=2, ensure_ascii=False), encoding="utf-8")
    log(f"[OK] Migration log written to {LOG_PATH}")
|
|
|
|
|
|
def verify_migration(migrated: list[dict]) -> list[dict]:
    """Return the records whose destination file does not exist on disk."""
    return [record for record in migrated if not Path(record["destination"]).exists()]
|
|
|
|
|
|
def run_migration(dry_run: bool = False, verify: bool = True) -> int:
    """Execute full migration.

    Orchestrates the phases in order: create target directories and back up
    the index, copy samples, persist the rewritten index, write the JSON log,
    and optionally verify every copied file exists on disk.

    Args:
        dry_run: If True, only report what would be copied; no files are
            written and the index is left untouched.
        verify: If True, re-check each migrated destination after copying.

    Returns:
        0 on success, 1 on errors
    """
    # Banner echoing the effective configuration so logs are self-describing.
    log("=" * 60)
    log("SAMPLE LIBRARY MIGRATION")
    log("=" * 60)
    log(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
    log(f"Index: {INDEX_PATH}")
    log(f"Backup: {BACKUP_PATH}")
    log(f"Target: {SAMPLES_ROOT}")
    log("")

    # Phase 1: Setup
    log("[PHASE 1] Creating directories...")
    create_directories()
    backup_index()  # no-op if a pre-migration backup already exists

    # Load index once
    data = load_index()

    # Phase 2: Migrate
    log("")
    log("[PHASE 2] Migrating samples...")
    migrated, errors, role_counts = migrate_samples(data, dry_run=dry_run)

    log("")
    log(f"  Migrated: {len(migrated)} samples")
    if errors:
        # Only the first few errors are echoed; the full list goes to the log file.
        log(f"  Errors: {len(errors)} samples")
        for e in errors[:5]:
            log(f"    - {e.get('sample', 'unknown')}: {e.get('error', 'unknown error')}")

    if dry_run:
        # Dry run stops here: no index rewrite, no log file, no verification.
        log("")
        log("[DRY RUN] No files were copied. Showing first 5 destinations:")
        for m in migrated[:5]:
            log(f"  {m['original']} -> {m['destination']}")
        return 0 if not errors else 1

    # Phase 3: Write updated index (data already modified in-place by migrate_samples)
    log("")
    log("[PHASE 3] Writing updated index...")
    save_index(data)

    # Phase 4: Write log and verify
    log("")
    log("[PHASE 4] Writing migration log...")
    write_log(role_counts, migrated, errors)

    if verify:
        log("")
        log("[PHASE 5] Verifying migration...")
        missing = verify_migration(migrated)
        if missing:
            # Verification failure overrides any earlier success signal.
            log(f"[ERROR] {len(missing)} migrated files are missing!")
            for m in missing[:5]:
                log(f"  - {m['destination']}")
            return 1
        else:
            log(f"[OK] All {len(migrated)} migrated files verified")

    log("")
    log("=" * 60)
    log("MIGRATION COMPLETE")
    log(f"Migrated: {len(migrated)} samples")
    log(f"Errors: {len(errors)} samples")
    log("=" * 60)

    # Print role breakdown
    log("")
    log("Sample count per role:")
    for role, count in sorted(role_counts.items()):
        log(f"  {role}: {count}")

    return 0 if not errors else 1
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: parse command-line flags and run the migration."""
    import argparse

    cli = argparse.ArgumentParser(description="Migrate sample library to new structure")
    cli.add_argument("--dry-run", action="store_true", help="Show what would be done without copying")
    # --verify defaults on; --no-verify flips the same dest flag off.
    cli.add_argument("--verify", action="store_true", default=True, help="Verify migrated files exist")
    cli.add_argument("--no-verify", dest="verify", action="store_false", help="Skip verification")
    opts = cli.parse_args()

    return run_migration(dry_run=opts.dry_run, verify=opts.verify)
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate the migration's exit status (0 success / 1 errors) to the shell.
    sys.exit(main())