# Files
# ableton-mcp-ai/AbletonMCP_AI/AbletonMCP_AI/MCP_Server/pack_brain.py
#
# 723 lines
# 26 KiB
# Python

"""
pack_brain.py - Palette/pack selection focused on coherent reggaeton production.
Builds candidate palettes from the local library by scoring folder-level coherence
across drums, bass, music, vocal and FX material. The goal is to stop selecting
good isolated samples that do not belong to the same sonic universe.
"""
from __future__ import annotations
import itertools
import logging
import re
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
# Module-level logger, registered under the fixed name "PackBrain".
logger = logging.getLogger("PackBrain")
# Path components that disqualify a sample from palette statistics.
# PackBrain._should_ignore compares each component stripped and lower-cased,
# so every entry here must be lower-case.
IGNORED_SEGMENTS = {
    # caches and tooling artefacts
    ".sample_cache", ".segment_rag", "__pycache__",
    # non-audio pack content
    "documentation", "installer", "flp",
    # bonus material
    "(extra)",
}
# Lower-case folder names that describe a sample *category* rather than a
# pack/source.  PackBrain._source_root skips them when looking for the folder
# that identifies where a sample comes from.
GENERIC_FOLDER_HINTS = {
    # drums
    "kick", "snare", "perc", "perc loop", "hi hat", "hi-hat",
    "drumloop", "drumloops", "drum loops",
    # one-shots
    "oneshots", "one shots",
    # tonal material
    "bass", "instrumental loops", "music loops", "vocal phrases",
    # miscellaneous containers
    "fx", "sounds presets", "sample pack",
}
# Substrings that vote for a bus in PackBrain._detect_bus.  The key order is
# significant: on a tied vote the bus listed first wins.  "loop" votes for
# both "drums" and "music".
BUS_ROLE_KEYWORDS = {
    "drums": {
        "kick", "snare", "clap", "hat", "hihat", "ride", "shaker",
        "perc", "percussion", "drum", "dembow",
        "loop", "drumloop", "toploop",
    },
    "bass": {"bass", "sub", "808", "reese"},
    "music": {
        "music", "instrumental", "melody", "melodic", "hook", "loop",
        "synth", "lead", "pluck", "arp", "pad", "keys", "piano", "guitar",
    },
    "vocal": {"vocal", "vox", "phrase", "double", "harmony", "libs", "choir"},
    "fx": {"fx", "impact", "riser", "fill", "sweep", "transition", "reverse", "atmos"},
}
# Maps a fine-grained sample role to the bus that owns it.  The table is
# written grouped by bus and flattened into a role -> bus dict.
# Note that "snare_roll" is routed to "fx", not "drums".
ROLE_TO_BUS = {
    role: bus
    for bus, roles in (
        ("drums", ("kick", "snare", "clap", "hat", "perc", "top_loop", "perc_loop")),
        ("bass", ("bass", "sub", "bass_loop")),
        ("music", ("synth", "synth_loop", "synth_peak", "instrumental")),
        ("vocal", ("vocal", "vocal_loop", "vocal_peak", "vocal_build", "vocal_shot")),
        ("fx", ("fx", "fill_fx", "crash_fx", "atmos_fx", "snare_roll")),
    )
    for role in roles
}
# Tokens that _tokenize discards: they appear in names but say nothing
# about which pack a sample belongs to.
STOP_TOKENS = {
    # audio file extensions
    "wav", "mp3", "flac", "aiff", "aif",
    # generic sample-library vocabulary
    "loop", "loops", "shot", "shots", "one", "audio", "pack", "sample",
    "samples", "prod", "type", "main", "latin", "latinos",
    # English / Spanish filler words
    "the", "and", "with", "para", "todos", "usan", "este",
}
# Pitch class (0-11, C = 0) for every lower-case note spelling, sharps and
# flats alike.  Each tuple below lists the spellings of one pitch class; its
# position in the outer tuple is the semitone number.
NOTE_TO_SEMITONE = {
    spelling: semitone
    for semitone, spellings in enumerate(
        (
            ("c",),
            ("c#", "db"),
            ("d",),
            ("d#", "eb"),
            ("e",),
            ("f",),
            ("f#", "gb"),
            ("g",),
            ("g#", "ab"),
            ("a",),
            ("a#", "bb"),
            ("b",),
        )
    )
    for spelling in spellings
}
# Flat spelling -> sharp spelling, so each pitch has one canonical name
# after _normalize_key.
ENHARMONIC_EQUIV = dict(
    db="c#",
    eb="d#",
    gb="f#",
    ab="g#",
    bb="a#",
)
def _tokenize(text: str) -> List[str]:
cleaned = re.sub(r"[^a-z0-9#]+", " ", str(text or "").lower())
return [token for token in cleaned.split() if len(token) > 1 and token not in STOP_TOKENS]
def _extract_bpm(text: str) -> Optional[float]:
match = re.search(r"(?<!\d)(\d{2,3})(?:\s?bpm|\s?bpms)?(?!\d)", str(text or "").lower())
if not match:
return None
value = float(match.group(1))
if 60.0 <= value <= 180.0:
return value
return None
def _normalize_key(value: Any) -> str:
    """Return a compact, lower-case spelling of a musical key.

    The result is ``<note>`` for major keys and ``<note>m`` for minor keys,
    with flats rewritten to sharps through ``ENHARMONIC_EQUIV`` (for example
    ``"Bb minor"`` -> ``"a#m"``, ``"C maj"`` -> ``"c"``).

    Args:
        value: Any key description; it is converted with ``str``.

    Returns:
        The normalized key, or ``""`` when *value* is falsy or blank.
    """
    text = str(value or "").strip().lower()
    if not text:
        return ""
    # Map the Unicode flat/sharp signs to their ASCII spellings.  The search
    # string must never be empty: str.replace("", x) inserts x between every
    # character and would corrupt every key.
    text = text.replace("\u266d", "b").replace("\u266f", "#")
    text = text.replace("minor", "m").replace("major", "")
    # Handles both spaced ("a min") and compact ("amin", "a_min") forms; any
    # separator left behind is removed on the next line.
    text = text.replace("min", "m").replace("maj", "")
    text = text.replace("_", "").replace("-", "").replace(" ", "")
    mode = "m" if text.endswith("m") else ""
    note = text[:-1] if mode else text
    note = ENHARMONIC_EQUIV.get(note, note)
    return f"{note}{mode}"
def _split_key(value: Any) -> Tuple[str, str]:
    """Break a key description into ``(note, mode)``.

    *value* is first passed through ``_normalize_key``.  A normalized key
    ending in ``"m"`` is reported as ``"minor"`` with the suffix removed;
    anything else is reported as ``"major"``.  An empty normalized key gives
    ``("", "")``.
    """
    canonical = _normalize_key(value)
    if canonical == "":
        return ("", "")
    is_minor = canonical.endswith("m")
    root = canonical[:-1] if is_minor else canonical
    return (root, "minor" if is_minor else "major")
def _extract_key(text: str) -> str:
lowered = str(text or "").lower()
patterns = [
r"([a-g])([#b]?)[ _-]?(?:min|minor|m)(?:\b|_)",
r"([a-g])([#b]?)[ _-]?(?:maj|major)(?:\b|_)",
r"\b([a-g])([#b]?m)(?:\b|_)",
r"\b([a-g])([#b]?)\b",
]
for pattern in patterns:
match = re.search(pattern, lowered)
if not match:
continue
if len(match.groups()) == 2:
return _normalize_key("".join(match.groups()))
return _normalize_key("".join(match.groups()))
return ""
def _key_score(target_key: str, candidate_key: str) -> float:
    """Rate how well *candidate_key* fits *target_key* on a 0-1 scale.

    Both keys are normalized first.  Scores, from best to worst:

    * ``1.0``  - identical keys
    * ``0.9``  - relative major/minor pair
    * ``0.78`` - same root, different mode
    * ``0.72`` - same mode, roots 5 semitones apart (shortest distance)
    * ``0.55`` - either key missing or not a known note (neutral)
    * ``0.54`` - same mode, roots 2 semitones apart
    * ``0.38`` - roots 3 or 4 semitones apart
    * ``0.24`` - everything else
    """
    neutral = 0.55
    wanted = _normalize_key(target_key)
    offered = _normalize_key(candidate_key)
    if not (wanted and offered):
        return neutral
    if wanted == offered:
        return 1.0

    wanted_note, wanted_mode = _split_key(wanted)
    offered_note, offered_mode = _split_key(offered)
    wanted_pc = NOTE_TO_SEMITONE.get(wanted_note)
    offered_pc = NOTE_TO_SEMITONE.get(offered_note)
    if wanted_pc is None or offered_pc is None:
        return neutral

    modes_match = wanted_mode == offered_mode
    if not modes_match:
        if wanted_note == offered_note:
            return 0.78
        # Relative minor sits 9 semitones above a major root; relative
        # major sits 3 semitones above a minor root.
        relative_offset = 9 if wanted_mode == "major" else 3
        if (wanted_pc + relative_offset) % 12 == offered_pc:
            return 0.9

    # Shortest distance around the circle of 12 pitch classes (0..6).
    gap = abs(wanted_pc - offered_pc)
    gap = min(gap, 12 - gap)
    if modes_match and gap == 5:
        return 0.72
    if modes_match and gap == 2:
        return 0.54
    if gap in (3, 4):
        return 0.38
    return 0.24
def _shared_token_bonus(groups: Sequence[Sequence[str]]) -> Tuple[float, List[str]]:
counters = [Counter(tokens) for tokens in groups if tokens]
if not counters:
return 0.0, []
intersection = set(counters[0].keys())
for counter in counters[1:]:
intersection &= set(counter.keys())
shared = sorted(token for token in intersection if token not in STOP_TOKENS)
bonus = min(2.4, 0.35 * len(shared))
return bonus, shared[:8]
@dataclass
class FolderStats:
    """Running statistics for the samples of one folder on one bus."""

    # Folder path as a string.
    path: str
    # Bus the counted samples were assigned to.
    bus: str
    # Total number of samples counted.
    sample_count: int = 0
    # Samples classified as loops / as one-shots.
    loop_count: int = 0
    one_shot_count: int = 0
    # One entry per sample for which a tempo was known.
    bpm_values: List[float] = field(default_factory=list)
    # Occurrence counts of keys, name tokens and source roots.
    keys: Counter = field(default_factory=Counter)
    tokens: Counter = field(default_factory=Counter)
    source_roots: Counter = field(default_factory=Counter)

    def to_summary(self) -> Dict[str, Any]:
        """Condense the statistics into a plain dictionary.

        ``avg_bpm`` is the mean tempo rounded to two decimals, or ``None``
        when no tempo was recorded.  ``dominant_key`` and ``source_root``
        are the most frequent entries of their counters (``""`` when
        empty) and ``top_tokens`` lists up to eight most frequent tokens.
        """

        def most_frequent(counter: Counter) -> str:
            ranked = counter.most_common(1)
            return ranked[0][0] if ranked else ""

        if self.bpm_values:
            mean_bpm = round(sum(self.bpm_values) / len(self.bpm_values), 2)
        else:
            mean_bpm = None

        summary: Dict[str, Any] = {}
        summary["path"] = self.path
        summary["bus"] = self.bus
        summary["sample_count"] = self.sample_count
        summary["loop_count"] = self.loop_count
        summary["one_shot_count"] = self.one_shot_count
        summary["avg_bpm"] = mean_bpm
        summary["dominant_key"] = most_frequent(self.keys)
        summary["top_tokens"] = [name for name, _count in self.tokens.most_common(8)]
        summary["source_root"] = most_frequent(self.source_roots)
        return summary
class PackBrain:
    """Derive coherent palettes from the user's library.

    Samples exposed by ``manager.samples`` are aggregated into per-folder
    statistics (one ``FolderStats`` per ``(bus, folder)`` pair).  Folders are
    scored against a request (genre, style, BPM, key) and combined into
    drums/bass/music palettes with optional vocal and FX support folders.
    """

    # Buses that take part in ranking.
    _KNOWN_BUSES = ("drums", "bass", "music", "vocal", "fx")
    # Number of top-ranked folders per bus that enter the palette combinations.
    _FOLDERS_PER_BUS = 4
    # Lower-cased source roots that earn a small extra coherence bonus.
    _PREFERRED_SOURCES = frozenset({"reggaeton 3", "sentimientolatino2025"})

    def __init__(self, manager: Any):
        """Bind the brain to a sample manager.

        Args:
            manager: Object read for ``base_dir`` (library root, ``"."`` when
                absent) and, lazily, ``samples`` (a mapping whose values
                describe individual samples).
        """
        self.manager = manager
        self.base_dir = Path(getattr(manager, "base_dir", "."))
        self._folder_stats: Dict[Tuple[str, str], "FolderStats"] = {}
        self._prepared = False

    # ------------------------------------------------------------------
    # Library scan
    # ------------------------------------------------------------------
    def _should_ignore(self, sample_path: Path) -> bool:
        """Return True when any path component is listed in IGNORED_SEGMENTS."""
        return any(part.strip().lower() in IGNORED_SEGMENTS for part in sample_path.parts)

    def _detect_bus(self, sample: Any, sample_path: Path) -> str:
        """Pick the bus whose keywords occur most often in the sample metadata.

        The searched text is the lower-cased path plus the sample's
        ``category``, ``subcategory`` and ``sample_type`` attributes.  Each
        keyword of ``BUS_ROLE_KEYWORDS`` found as a substring is one vote;
        explicit vocal and FX markers add two extra votes.  Ties go to the
        bus listed first; without any vote the bus is ``"music"``.
        """
        haystack = " ".join(
            [
                sample_path.as_posix().lower(),
                str(getattr(sample, "category", "")).lower(),
                str(getattr(sample, "subcategory", "")).lower(),
                str(getattr(sample, "sample_type", "")).lower(),
            ]
        )
        bus_scores = {
            bus: sum(1 for keyword in keywords if keyword in haystack)
            for bus, keywords in BUS_ROLE_KEYWORDS.items()
        }
        if "vocal" in haystack or "vox" in haystack:
            bus_scores["vocal"] += 2
        if "fx" in haystack or "impact" in haystack or "transition" in haystack:
            bus_scores["fx"] += 2
        best_bus, best_score = max(bus_scores.items(), key=lambda item: item[1])
        return best_bus if best_score > 0 else "music"

    def _source_root(self, relative_parts: Sequence[str]) -> str:
        """Return the first folder name that is not a generic category name.

        Falls back to the first component, or ``"library"`` when there is none.
        """
        for part in relative_parts:
            lowered = part.strip().lower()
            if lowered not in GENERIC_FOLDER_HINTS and lowered not in STOP_TOKENS:
                return part
        return relative_parts[0] if relative_parts else "library"

    @staticmethod
    def _as_float(value: Any) -> Optional[float]:
        """Convert *value* to float, returning None when it is not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def _build_stats(self) -> None:
        """Scan ``manager.samples`` once and fill ``self._folder_stats``.

        Samples whose path is not an existing file, or that live under an
        ignored folder, are skipped.  Tempo and key come from the sample's
        attributes when usable and otherwise from its name, then its path.
        A sample counts as a loop when it lasts at least 1.25 s or when
        "loop" appears in its name or path.

        The statistics are collected in a local dict and published only when
        the scan completes, so a failure cannot leave half-counted folders
        behind for the next call.
        """
        if self._prepared:
            return
        # A manager without samples (attribute missing or None) is an empty library.
        samples = getattr(self.manager, "samples", None) or {}
        folder_stats: Dict[Tuple[str, str], "FolderStats"] = {}
        for sample in samples.values():
            sample_path = Path(str(getattr(sample, "path", "") or ""))
            if not sample_path.is_file() or self._should_ignore(sample_path):
                continue
            try:
                rel_parts = sample_path.relative_to(self.base_dir).parts[:-1]
            except ValueError:
                # Sample lives outside the library root: use its full path.
                rel_parts = sample_path.parts[:-1]

            posix_path = sample_path.as_posix()
            folder = str(sample_path.parent)
            bus = self._detect_bus(sample, sample_path)
            stats = folder_stats.get((bus, folder))
            if stats is None:
                stats = folder_stats[(bus, folder)] = FolderStats(path=folder, bus=bus)
            stats.sample_count += 1

            # A missing/empty name falls back to the file stem rather than "None".
            sample_name = str(getattr(sample, "name", None) or sample_path.stem)
            duration = self._as_float(getattr(sample, "duration", None)) or 0.0
            bpm = (
                self._as_float(getattr(sample, "bpm", None))
                or _extract_bpm(sample_name)
                or _extract_bpm(posix_path)
            )
            raw_key = (
                getattr(sample, "key", None)
                or _extract_key(sample_name)
                or _extract_key(posix_path)
            )
            if bpm:
                stats.bpm_values.append(bpm)
            normalized_key = _normalize_key(raw_key) if raw_key else ""
            if normalized_key:
                stats.keys[normalized_key] += 1

            looks_like_loop = (
                duration >= 1.25
                or "loop" in sample_name.lower()
                or "loop" in posix_path.lower()
            )
            if looks_like_loop:
                stats.loop_count += 1
            else:
                stats.one_shot_count += 1

            stats.tokens.update(_tokenize(" ".join(list(rel_parts) + [sample_name])))
            stats.source_roots[self._source_root(rel_parts)] += 1

        self._folder_stats = folder_stats
        self._prepared = True

    # ------------------------------------------------------------------
    # Folder scoring
    # ------------------------------------------------------------------
    def _folder_request_score(
        self, stats: "FolderStats", genre: str, style: str, bpm: float, key: str
    ) -> Tuple[float, List[str]]:
        """Score one folder against a request.

        The score adds up: sample density, loop / one-shot availability for
        the buses where they matter, overlap between the request words and
        the folder's top tokens, a reggaeton/dembow bonus, bus-specific path
        bonuses and penalties, tempo proximity (including half/double time),
        key compatibility and a bonus for a non-generic source root.

        Returns:
            ``(score, reasons)`` where *reasons* are short human-readable notes.
        """
        score = 0.0
        reasons: List[str] = []
        tokens = {token for token, _ in stats.tokens.most_common(20)}
        request_tokens = set(_tokenize(f"{genre} {style}"))
        # The trailing slash lets component checks such as "/fx/" or "/bass/"
        # match when that component is the folder itself, not only when it
        # is an ancestor.
        folder_text = Path(stats.path).as_posix().lower() + "/"

        if stats.sample_count:
            score += min(2.2, 0.2 * stats.sample_count)
            reasons.append(f"{stats.sample_count} samples")
        if stats.loop_count and stats.bus in {"drums", "music", "vocal"}:
            score += min(1.6, 0.25 * stats.loop_count)
        if stats.one_shot_count and stats.bus in {"drums", "bass"}:
            score += min(1.2, 0.2 * stats.one_shot_count)

        overlap = request_tokens & tokens
        if overlap:
            score += 0.6 * len(overlap)
            reasons.append(f"keywords {sorted(overlap)}")
        if any("reggaeton" in token or "dembow" in token for token in tokens):
            score += 1.1

        def mentions(*needles: str) -> bool:
            return any(needle in folder_text for needle in needles)

        if stats.bus == "drums":
            if mentions("/drum", "/kick", "/snare", "/oneshot", "drum loops", "drumloops"):
                score += 1.4
            if mentions("/fx/", "fill"):
                score -= 0.9
        elif stats.bus == "bass":
            if mentions("/bass/", " sub", "/sub"):
                score += 1.6
            if mentions("/fx/", "fill", "impact"):
                score -= 1.8
        elif stats.bus == "music":
            if mentions("instrumental loops", "music loops", "sample pack"):
                score += 1.6
            if mentions("/fx/", "fill", "drum loop"):
                score -= 1.4
        elif stats.bus == "vocal":
            if mentions("vocal", "vox", "phrases"):
                score += 1.4
        elif stats.bus == "fx":
            if mentions("/fx/", "fill", "impact", "transition"):
                score += 1.4

        if bpm > 0 and stats.bpm_values:
            avg_bpm = sum(stats.bpm_values) / len(stats.bpm_values)
            diff = abs(avg_bpm - bpm)
            if diff <= 1.5:
                score += 2.4
                reasons.append(f"BPM {avg_bpm:.1f}")
            elif diff <= 4:
                score += 1.8
            elif diff <= 8:
                score += 1.0
            elif abs(avg_bpm - (bpm * 2.0)) <= 4 or abs(avg_bpm - (bpm / 2.0)) <= 3:
                # Double-time / half-time material is still usable.
                score += 0.75

        if key and stats.keys:
            dominant_key = stats.keys.most_common(1)[0][0]
            compatibility = _key_score(key, dominant_key)
            score += compatibility * 2.2
            if compatibility >= 0.8:
                reasons.append(f"key {dominant_key}")

        source_root = stats.source_roots.most_common(1)[0][0] if stats.source_roots else ""
        if source_root and source_root.lower() not in GENERIC_FOLDER_HINTS:
            score += 0.5
        return score, reasons

    def _support_folder_score(
        self,
        stats: "FolderStats",
        requested_bus: str,
        palette_tokens: Sequence[Sequence[str]],
        genre: str,
        style: str,
        bpm: float,
        key: str,
    ) -> float:
        """Score a vocal/FX folder as a companion for an existing palette.

        Adds to the request score a 1.2 bonus when the folder is on
        *requested_bus* and the shared-token bonus computed across the
        palette's token groups plus this folder's ten most common tokens.
        """
        base_score, _ = self._folder_request_score(stats, genre, style, bpm, key)
        bus_bonus = 1.2 if stats.bus == requested_bus else 0.0
        own_tokens = [token for token, _ in stats.tokens.most_common(10)]
        shared_bonus, _ = _shared_token_bonus(list(palette_tokens) + [own_tokens])
        return base_score + bus_bonus + shared_bonus

    # ------------------------------------------------------------------
    # Palette ranking
    # ------------------------------------------------------------------
    def rank_palettes(
        self,
        genre: str,
        style: str = "",
        bpm: float = 0.0,
        key: str = "",
        max_candidates: int = 5,
    ) -> Dict[str, Any]:
        """Rank drums/bass/music folder combinations for a request.

        Folders with a positive request score are ranked per bus.  Every
        combination of the top four drums, bass and music folders becomes a
        candidate whose score is the sum of the three folder scores plus
        bonuses for shared tokens, a common source root, bass and music
        sharing a parent folder, and bass/music key compatibility (a key
        clash is penalized).  Each candidate also receives the best-fitting
        vocal and FX folder when such folders exist.

        Args:
            genre: Requested genre; tokenized together with *style*.
            style: Optional style description.
            bpm: Target tempo; ``0`` disables tempo scoring.
            key: Target key; ``""`` disables key scoring.
            max_candidates: Maximum number of candidates and of per-bus
                folder rankings included in the result.

        Returns:
            A dict with the request echo (``genre``, ``style``, ``bpm``,
            ``key``), ``selected_palette`` (best candidate or ``{}``),
            ``candidates`` and ``folder_rankings``.
        """
        self._build_stats()

        bus_rankings: Dict[str, List[Tuple[float, "FolderStats", List[str]]]] = defaultdict(list)
        for stats in self._folder_stats.values():
            if stats.bus not in self._KNOWN_BUSES:
                continue
            folder_score, reasons = self._folder_request_score(stats, genre, style, bpm, key)
            if folder_score <= 0:
                continue
            bus_rankings[stats.bus].append((folder_score, stats, reasons))
        for rankings in bus_rankings.values():
            rankings.sort(key=lambda item: item[0], reverse=True)

        limit = self._FOLDERS_PER_BUS
        drums = bus_rankings.get("drums", [])[:limit]
        bass = bus_rankings.get("bass", [])[:limit]
        music = bus_rankings.get("music", [])[:limit]
        support_pools = (
            ("vocal", bus_rankings.get("vocal", [])[:limit]),
            ("fx", bus_rankings.get("fx", [])[:limit]),
        )

        palette_candidates: List[Dict[str, Any]] = []
        # product() yields nothing when any core bus has no folder, so a
        # palette always has drums, bass and music.
        for index, (drums_item, bass_item, music_item) in enumerate(
            itertools.product(drums, bass, music), start=1
        ):
            selected = [drums_item[1], bass_item[1], music_item[1]]
            token_groups = [
                [token for token, _ in stats.tokens.most_common(10)] for stats in selected
            ]
            shared_bonus, shared_tokens = _shared_token_bonus(token_groups)

            # Coherence: do the folders come from the same pack / parent?
            source_bonus = 0.0
            source_counter = Counter(
                stats.source_roots.most_common(1)[0][0]
                for stats in selected
                if stats.source_roots
            )
            if source_counter:
                most_common_source, source_hits = source_counter.most_common(1)[0]
                if source_hits >= 3:
                    source_bonus += 2.2
                elif source_hits == 2:
                    source_bonus += 1.4
                if most_common_source.lower() in self._PREFERRED_SOURCES:
                    source_bonus += 0.4
            if Path(bass_item[1].path).parent == Path(music_item[1].path).parent:
                source_bonus += 1.6

            # Harmony between the two tonal buses.
            harmony_notes: List[str] = []
            bass_key = bass_item[1].keys.most_common(1)[0][0] if bass_item[1].keys else ""
            music_key = music_item[1].keys.most_common(1)[0][0] if music_item[1].keys else ""
            if bass_key and music_key:
                harmony_score = _key_score(bass_key, music_key)
                if harmony_score >= 0.9:
                    source_bonus += 1.8
                    harmony_notes.append(f"harmonic lock {bass_key}/{music_key}")
                elif harmony_score >= 0.72:
                    source_bonus += 0.9
                    harmony_notes.append(f"harmonic fit {bass_key}/{music_key}")
                elif harmony_score >= 0.54:
                    source_bonus += 0.2
                    harmony_notes.append(f"harmonic risk {bass_key}/{music_key}")
                else:
                    source_bonus -= 3.5
                    harmony_notes.append(f"harmonic clash {bass_key}/{music_key}")
            else:
                # Unknown key on either side: neutral score, no bonus.
                harmony_score = 0.55

            palette_score = (
                drums_item[0] + bass_item[0] + music_item[0] + shared_bonus + source_bonus
            )
            reason_bits = list(
                dict.fromkeys(harmony_notes + drums_item[2] + bass_item[2] + music_item[2])
            )

            # Best vocal / FX companion for this particular palette.
            support_stats: Dict[str, "FolderStats"] = {}
            for bus_name, support_rankings in support_pools:
                if not support_rankings:
                    continue
                best_support = max(
                    support_rankings,
                    key=lambda item: self._support_folder_score(
                        item[1], bus_name, token_groups, genre, style, bpm, key
                    ),
                )
                support_stats[bus_name] = best_support[1]
            support_folders = {bus_name: stats.path for bus_name, stats in support_stats.items()}
            if support_folders:
                palette_score += 0.35 * len(support_folders)

            if harmony_score >= 0.72:
                verdict = "compatible"
            elif harmony_score >= 0.54:
                verdict = "risky"
            else:
                verdict = "clash"

            palette_candidates.append(
                {
                    "id": f"palette-{index}",
                    "score": round(palette_score, 3),
                    "harmony_score": round(harmony_score, 3),
                    "harmony_verdict": verdict,
                    "palette": {
                        "drums": drums_item[1].path,
                        "bass": bass_item[1].path,
                        "music": music_item[1].path,
                    },
                    "support_folders": support_folders,
                    "shared_tokens": shared_tokens,
                    "reasons": reason_bits[:10],
                    "folders": {
                        "drums": drums_item[1].to_summary(),
                        "bass": bass_item[1].to_summary(),
                        "music": music_item[1].to_summary(),
                        "vocal": support_stats["vocal"].to_summary() if "vocal" in support_stats else None,
                        "fx": support_stats["fx"].to_summary() if "fx" in support_stats else None,
                    },
                }
            )

        palette_candidates.sort(key=lambda item: item["score"], reverse=True)
        return {
            "genre": genre,
            "style": style,
            "bpm": bpm,
            "key": key,
            "selected_palette": palette_candidates[0] if palette_candidates else {},
            "candidates": palette_candidates[:max_candidates],
            "folder_rankings": {
                bus: [
                    {
                        "score": round(score, 3),
                        "summary": stats.to_summary(),
                        "reasons": reasons[:6],
                    }
                    for score, stats, reasons in rankings[:max_candidates]
                ]
                for bus, rankings in bus_rankings.items()
            },
        }

    # ------------------------------------------------------------------
    # Folder-to-folder compatibility
    # ------------------------------------------------------------------
    @staticmethod
    def _normalized_folder(folder: str) -> Path:
        """Parse *folder* with forward slashes; Path drops trailing separators."""
        return Path(str(folder).replace("\\", "/"))

    @staticmethod
    def _is_real_folder(folder: Path) -> bool:
        """Return False for "." and filesystem roots, which are their own parent."""
        return folder != folder.parent

    def get_folder_compatibility_score(self, folder1: str, folder2: str) -> Tuple[float, str]:
        """
        Calculate compatibility score between two folders.

        Both paths are normalized first (backslashes become forward slashes,
        trailing separators are dropped), so two spellings of one folder
        compare as equal.

        Returns:
            Tuple of (score, relationship_type):
            - ``(1.0, 'exact')``: same folder
            - ``(0.85, 'sibling')``: same parent folder
            - ``(0.70, 'cousin')``: same grandparent folder; "." and the
              filesystem root do not count as a shared grandparent
            - ``(0.55, 'related')``: the paths share at least one token
            - ``(0.30, 'unrelated')``: none of the above
        """
        first = self._normalized_folder(folder1)
        second = self._normalized_folder(folder2)

        if first == second:
            return 1.0, 'exact'
        if first.parent == second.parent:
            return 0.85, 'sibling'

        grandparent = first.parent.parent
        if grandparent == second.parent.parent and self._is_real_folder(grandparent):
            return 0.70, 'cousin'

        # Shared name tokens still hint at a common origin.
        if set(_tokenize(first.as_posix())) & set(_tokenize(second.as_posix())):
            return 0.55, 'related'
        return 0.30, 'unrelated'

    def evaluate_folder_combination(self, folders: Dict[str, str]) -> Dict[str, Any]:
        """
        Evaluate a combination of folders for different buses/roles.

        Every unordered pair of folders is scored with
        ``get_folder_compatibility_score``; the overall score is the mean.

        Args:
            folders: Dict mapping bus/role to folder path

        Returns:
            Dict with ``overall_score``, ``pair_scores`` (keyed
            ``"<role1>-<role2>"``), ``folder_count`` and ``recommendation``.
            With fewer than two folders the score is 0.0 and no pairs are
            listed.
        """
        if not folders or len(folders) < 2:
            return {
                'overall_score': 0.0,
                'pair_scores': {},
                'folder_count': len(folders) if folders else 0,
                'recommendation': 'Need at least 2 folders to evaluate'
            }

        pair_scores: Dict[str, Dict[str, Any]] = {}
        total_score = 0.0
        for (role1, folder1), (role2, folder2) in itertools.combinations(folders.items(), 2):
            score, relationship = self.get_folder_compatibility_score(folder1, folder2)
            pair_scores[f"{role1}-{role2}"] = {
                'score': round(score, 3),
                'relationship': relationship,
                'folder1': Path(folder1).name,
                'folder2': Path(folder2).name,
            }
            total_score += score
        overall_score = total_score / len(pair_scores) if pair_scores else 0.0

        if overall_score >= 0.8:
            recommendation = "Excellent folder combination - highly coherent"
        elif overall_score >= 0.6:
            recommendation = "Good folder combination - reasonably coherent"
        elif overall_score >= 0.4:
            recommendation = "Moderate coherence - some folders are unrelated"
        else:
            recommendation = "Poor coherence - folders are from different packs/sources"

        return {
            'overall_score': round(overall_score, 3),
            'pair_scores': pair_scores,
            'folder_count': len(folders),
            'recommendation': recommendation,
        }

    def find_compatible_folder_for_role(self,
                                        target_role: str,
                                        reference_folders: List[str],
                                        genre: str = "",
                                        bpm: float = 0,
                                        key: str = "") -> Optional[str]:
        """
        Find a folder for a role that is compatible with reference folders.

        Candidates are the folders on the role's bus with a positive request
        score.  Each candidate then gains half of its compatibility score
        with every reference folder, and the highest total wins.

        Args:
            target_role: Role to find folder for (e.g., 'fill_fx',
                'snare_roll').  A bus name such as 'drums' or 'music' is
                accepted as-is; any other unknown role falls back to 'fx'.
            reference_folders: List of reference folder paths to match
                against; ``None`` is treated as an empty list.
            genre: Genre for filtering
            bpm: BPM for filtering
            key: Key for filtering

        Returns:
            Best matching folder path or None
        """
        self._build_stats()
        references = list(reference_folders or [])

        if target_role in ROLE_TO_BUS:
            target_bus = ROLE_TO_BUS[target_role]
        elif target_role in BUS_ROLE_KEYWORDS:
            # The caller passed a bus name directly.
            target_bus = target_role
        else:
            target_bus = 'fx'

        scored_candidates: List[Tuple[float, str]] = []
        for (bus, path), stats in self._folder_stats.items():
            if bus != target_bus:
                continue
            base_score, _ = self._folder_request_score(stats, genre, "", bpm, key)
            if base_score <= 0:
                continue
            compatibility_bonus = 0.0
            for ref_folder in references:
                compat_score, _ = self.get_folder_compatibility_score(path, ref_folder)
                compatibility_bonus += compat_score * 0.5
            scored_candidates.append((base_score + compatibility_bonus, path))

        if not scored_candidates:
            return None

        # Tuples compare by score first, then by path, as a sort would.
        best_score, best_folder = max(scored_candidates)
        logger.debug("COMPAT_FOLDER [%s]: Selected '%s' with score %.2f (matched against %d refs)",
                     target_role, Path(best_folder).name, best_score,
                     len(references))
        return best_folder