""" coherence_system.py - Advanced Coherence Scoring System Implements sophisticated sample coherence tracking and scoring for the AbletonMCP_AI music production engine. Provides cross-generation memory, fatigue tracking, section-aware selection, and palette locking. Author: AbletonMCP_AI Date: 2026-04-11 Version: 1.0.0 """ from typing import Dict, List, Tuple, Optional, Any, Set from dataclasses import dataclass, field from pathlib import Path import json import time # ============================================================================ # CROSS-GENERATION MEMORY # ============================================================================ # Global storage for tracking sample usage across song generations _cross_generation_family_memory: Dict[str, Dict[str, Any]] = {} _cross_generation_path_memory: Dict[str, Dict[str, Any]] = {} # Fatigue tracking: path -> usage count _fatigue_memory: Dict[str, int] = {} # Palette lock state: role -> locked folder _palette_locks: Dict[str, str] = {} # ============================================================================ # SECTION-AWARE CONFIGURATION # ============================================================================ ROLE_ACTIVITY: Dict[str, Dict[str, int]] = { 'kick': {'intro': 2, 'build': 3, 'drop': 4, 'break': 1, 'outro': 2}, 'clap': {'intro': 0, 'build': 2, 'drop': 4, 'break': 1, 'outro': 1}, 'snare': {'intro': 1, 'build': 2, 'drop': 3, 'break': 0, 'outro': 1}, 'hat': {'intro': 1, 'build': 3, 'drop': 4, 'break': 2, 'outro': 1}, 'bass': {'intro': 0, 'build': 2, 'drop': 4, 'break': 1, 'outro': 1}, 'lead': {'intro': 0, 'build': 1, 'drop': 4, 'break': 0, 'outro': 0}, 'pad': {'intro': 3, 'build': 2, 'drop': 1, 'break': 3, 'outro': 2}, 'fx': {'intro': 1, 'build': 4, 'drop': 2, 'break': 2, 'outro': 1}, 'perc': {'intro': 1, 'build': 2, 'drop': 4, 'break': 1, 'outro': 2}, } SECTION_DENSITY_PROFILES: Dict[str, Dict[str, Any]] = { 'intro': {'density': 0.3, 'complexity': 'low', 'energy_target': 0.25}, 'build': {'density': 0.7, 'complexity': 'high', 'energy_target': 0.72}, 'drop': {'density': 1.0, 'complexity': 'high', 'energy_target': 1.0}, 'break': {'density': 0.4, 'complexity': 'low', 'energy_target': 0.38}, 'outro': {'density': 0.35, 'complexity': 'low', 'energy_target': 0.32}, 'verse': {'density': 0.5, 'complexity': 'medium', 'energy_target': 0.5}, 'chorus': {'density': 0.9, 'complexity': 'high', 'energy_target': 0.85}, 'bridge': {'density': 0.6, 'complexity': 'medium', 'energy_target': 0.65}, } # Family compatibility matrix (0.0 - 1.0) FAMILY_COMPATIBILITY: Dict[str, Dict[str, float]] = { 'kick': {'kick': 1.0, 'snare': 0.95, 'clap': 0.9, 'perc': 0.85, 'hat': 0.7, 'bass': 0.8, 'lead': 0.4, 'pad': 0.3, 'fx': 0.5}, 'snare': {'kick': 0.95, 'snare': 1.0, 'clap': 0.98, 'perc': 0.9, 'hat': 0.85, 'bass': 0.75, 'lead': 0.4, 'pad': 0.3, 'fx': 0.5}, 'clap': {'kick': 0.9, 'snare': 0.98, 'clap': 1.0, 'perc': 0.85, 'hat': 0.8, 'bass': 0.75, 'lead': 0.4, 'pad': 0.3, 'fx': 0.55}, 'hat': {'kick': 0.7, 'snare': 0.85, 'clap': 0.8, 'perc': 0.8, 'hat': 1.0, 'bass': 0.65, 'lead': 0.45, 'pad': 0.4, 'fx': 0.5}, 'perc': {'kick': 0.85, 'snare': 0.9, 'clap': 0.85, 'perc': 1.0, 'hat': 0.8, 'bass': 0.7, 'lead': 0.4, 'pad': 0.35, 'fx': 0.6}, 'bass': {'kick': 0.8, 'snare': 0.75, 'clap': 0.75, 'perc': 0.7, 'hat': 0.65, 'bass': 1.0, 'lead': 0.85, 'pad': 0.9, 'fx': 0.6}, 'lead': {'kick': 0.4, 'snare': 0.4, 'clap': 0.4, 'perc': 0.4, 'hat': 0.45, 'bass': 0.85, 'lead': 1.0, 'pad': 0.95, 'fx': 0.7}, 'pad': {'kick': 0.3, 'snare': 0.3, 'clap': 0.3, 'perc': 0.35, 'hat': 0.4, 'bass': 0.9, 'lead': 0.95, 'pad': 1.0, 'fx': 0.6}, 'fx': {'kick': 0.5, 'snare': 0.5, 'clap': 0.55, 'perc': 0.6, 'hat': 0.5, 'bass': 0.6, 'lead': 0.7, 'pad': 0.6, 'fx': 1.0}, } # ============================================================================ # JOINT SCORING SYSTEM # ============================================================================ def calculate_joint_score( candidate_sample: Dict[str, Any], role: str, current_selections: Dict[str, Dict[str, Any]] ) -> float: """ Calculates coherence between candidate and already-selected samples. Returns a score in the range 1.0-1.3+ based on: - Same folder/pack bonus (1.2x-1.4x) - Family compatibility (1.1x-1.3x) - Duration matching Args: candidate_sample: Dict with sample metadata including 'path', 'folder', 'pack', 'family', 'duration', etc. role: The role this sample would fill (kick, snare, bass, etc.) current_selections: Dict of already-selected samples by role Returns: Float score where: - 1.0 = neutral (no coherence bonus) - 1.2-1.4x = folder/pack matching - 1.1-1.3x = family compatibility - Combined score can exceed 1.3 for highly coherent selections Example: >>> candidate = {'path': '/kick/808.wav', 'folder': 'kick', 'pack': 'trap_kit', ... 'family': 'drums', 'duration': 0.5} >>> current = {'snare': {'folder': 'kick', 'pack': 'trap_kit', 'family': 'drums', ... 'duration': 0.5}} >>> calculate_joint_score(candidate, 'kick', current) 1.35 # High coherence from folder, pack, and family match """ if not current_selections: return 1.0 candidate_path = str(candidate_sample.get('path', '')) candidate_folder = candidate_sample.get('folder', '') candidate_pack = candidate_sample.get('pack', '') candidate_family = candidate_sample.get('family', 'unknown') candidate_duration = candidate_sample.get('duration', 1.0) scores = [] compatibilities = [] for selected_role, selected_sample in current_selections.items(): selected_path = str(selected_sample.get('path', '')) selected_folder = selected_sample.get('folder', '') selected_pack = selected_sample.get('pack', '') selected_family = selected_sample.get('family', 'unknown') selected_duration = selected_sample.get('duration', 1.0) # Same folder bonus (1.2x-1.4x) if candidate_folder and candidate_folder == selected_folder: scores.append(1.3) # Same pack bonus (1.2x-1.4x) - slightly higher than folder if candidate_pack and candidate_pack == selected_pack: scores.append(1.35) # Family compatibility (1.1x-1.3x based on matrix) family_score = _get_family_compatibility(candidate_family, selected_family) if family_score > 0.8: compatibilities.append(family_score) # Duration matching (0.95x-1.15x) duration_score = _calculate_duration_match(candidate_duration, selected_duration) if duration_score > 1.0: scores.append(duration_score) # Combine scores multiplicatively for high coherence base_score = 1.0 if scores: # Use the top 2 scores to calculate bonus top_scores = sorted(scores, reverse=True)[:2] for s in top_scores: base_score *= min(s, 1.15) # Cap individual multipliers at 1.15x if compatibilities: avg_compat = sum(compatibilities) / len(compatibilities) base_score *= (0.9 + (avg_compat * 0.4)) # Scale 1.0-1.3x range # Cap at reasonable maximum return min(round(base_score, 3), 1.5) def _get_family_compatibility(family1: str, family2: str) -> float: """ Get compatibility score between two families from the compatibility matrix. Args: family1: First family name family2: Second family name Returns: Compatibility score 0.0-1.0 """ if family1 in FAMILY_COMPATIBILITY: return FAMILY_COMPATIBILITY[family1].get(family2, 0.5) if family2 in FAMILY_COMPATIBILITY: return FAMILY_COMPATIBILITY[family2].get(family1, 0.5) return 0.5 def _calculate_duration_match(duration1: float, duration2: float) -> float: """ Calculate duration matching score between two samples. Args: duration1: First sample duration in seconds duration2: Second sample duration in seconds Returns: Match score 0.95x-1.15x """ if duration1 <= 0 or duration2 <= 0: return 1.0 ratio = min(duration1, duration2) / max(duration1, duration2) # Scale ratio to 0.95-1.15 range if ratio > 0.9: return 1.15 elif ratio > 0.7: return 1.05 elif ratio > 0.5: return 1.0 else: return 0.95 # ============================================================================ # CROSS-GENERATION MEMORY # ============================================================================ def update_cross_generation_memory( selections: Dict[str, Dict[str, Any]], sample_paths: List[str] ) -> None: """ Tracks sample usage across song generations. Updates both family memory and path memory with timestamp and usage count information. Args: selections: Dict of selected samples by role sample_paths: List of all sample paths used in generation Example: >>> selections = {'kick': {'family': 'drums', 'path': '/kick.wav'}} >>> update_cross_generation_memory(selections, ['/kick.wav', '/snare.wav']) """ timestamp = time.time() # Update family memory for role, sample in selections.items(): family = sample.get('family', 'unknown') path = str(sample.get('path', '')) if family not in _cross_generation_family_memory: _cross_generation_family_memory[family] = { 'count': 0, 'last_used': 0, 'roles': set(), 'paths': set() } memory = _cross_generation_family_memory[family] memory['count'] += 1 memory['last_used'] = timestamp memory['roles'].add(role) if path: memory['paths'].add(path) # Update path memory for path in sample_paths: path_str = str(path) if path_str not in _cross_generation_path_memory: _cross_generation_path_memory[path_str] = { 'count': 0, 'last_used': 0, 'generations': [] } path_memory = _cross_generation_path_memory[path_str] path_memory['count'] += 1 path_memory['last_used'] = timestamp path_memory['generations'].append(timestamp) # Also update fatigue memory for path in sample_paths: path_str = str(path) _fatigue_memory[path_str] = _fatigue_memory.get(path_str, 0) + 1 def get_cross_generation_penalty(sample_path: str, role: str) -> float: """ Returns penalty factor 0.5-1.0 based on usage history. Samples used in recent generations receive higher penalties. Args: sample_path: Path to the sample file role: The role being filled Returns: Penalty factor where: - 1.0 = no penalty (never used) - 0.5 = maximum penalty (very recently used) Example: >>> get_cross_generation_penalty('/kick.wav', 'kick') 0.75 # Moderate penalty """ path_str = str(sample_path) if path_str not in _cross_generation_path_memory: return 1.0 memory = _cross_generation_path_memory[path_str] count = memory.get('count', 0) last_used = memory.get('last_used', 0) # Calculate recency factor (decays over time) time_since_use = time.time() - last_used hours_since_use = time_since_use / 3600 # Recency decay: 1.0 at 0 hours, 0.5 at 24+ hours recency_factor = max(0.5, 1.0 - (hours_since_use / 48)) # Count factor: more uses = more penalty # 1 use = 0.95, 5 uses = 0.65, 10+ uses = 0.5 if count == 1: count_factor = 0.95 elif count <= 5: count_factor = 0.95 - ((count - 1) * 0.075) else: count_factor = 0.5 # Combine factors penalty = (recency_factor * 0.4) + (count_factor * 0.6) return round(max(0.5, min(1.0, penalty)), 3) def get_cross_generation_memory_stats() -> Dict[str, Any]: """ Get statistics about cross-generation memory. Returns: Dict with family memory and path memory statistics """ return { 'family_memory_count': len(_cross_generation_family_memory), 'path_memory_count': len(_cross_generation_path_memory), 'fatigue_memory_count': len(_fatigue_memory), 'top_used_families': sorted( _cross_generation_family_memory.items(), key=lambda x: x[1]['count'], reverse=True )[:5], 'top_used_paths': sorted( _cross_generation_path_memory.items(), key=lambda x: x[1]['count'], reverse=True )[:5] } # ============================================================================ # FATIGUE TRACKING # ============================================================================ def get_persistent_fatigue(sample_path: str, role: str) -> float: """ Returns fatigue factor 0.5-1.0 based on usage count. Fatigue represents how "worn out" a sample is from overuse: - 5 uses = 50% fatigue (0.5 factor) - 0 uses = 100% fresh (1.0 factor) Args: sample_path: Path to the sample file role: The role being filled (for role-specific fatigue tracking) Returns: Fatigue factor 0.5-1.0 where higher is better (less fatigued) Example: >>> get_persistent_fatigue('/kick.wav', 'kick') 0.6 # 40% fatigued from previous uses """ path_str = str(sample_path) # Get usage count usage_count = _fatigue_memory.get(path_str, 0) # Calculate fatigue factor if usage_count == 0: return 1.0 elif usage_count == 1: return 0.9 elif usage_count == 2: return 0.8 elif usage_count == 3: return 0.7 elif usage_count == 4: return 0.6 else: # 5+ uses return 0.5 def reset_fatigue_for_path(sample_path: str) -> None: """ Reset fatigue for a specific sample path. Args: sample_path: Path to reset fatigue for """ path_str = str(sample_path) if path_str in _fatigue_memory: del _fatigue_memory[path_str] def reset_all_fatigue() -> None: """Reset all fatigue tracking memory.""" global _fatigue_memory _fatigue_memory = {} def get_fatigue_report() -> Dict[str, Any]: """ Get a report of current fatigue levels. Returns: Dict with fatigue statistics by usage level """ fatigue_levels = { 'fresh': [], # 0 uses, 1.0 'slight': [], # 1 use, 0.9 'moderate': [], # 2 uses, 0.8 'significant': [], # 3 uses, 0.7 'high': [], # 4 uses, 0.6 'exhausted': [] # 5+ uses, 0.5 } for path, count in _fatigue_memory.items(): if count == 0: fatigue_levels['fresh'].append(path) elif count == 1: fatigue_levels['slight'].append(path) elif count == 2: fatigue_levels['moderate'].append(path) elif count == 3: fatigue_levels['significant'].append(path) elif count == 4: fatigue_levels['high'].append(path) else: fatigue_levels['exhausted'].append(path) return { 'total_tracked': len(_fatigue_memory), 'fresh_count': len(fatigue_levels['fresh']), 'slight_count': len(fatigue_levels['slight']), 'moderate_count': len(fatigue_levels['moderate']), 'significant_count': len(fatigue_levels['significant']), 'high_count': len(fatigue_levels['high']), 'exhausted_count': len(fatigue_levels['exhausted']), 'by_level': fatigue_levels } # ============================================================================ # SECTION-AWARE SELECTION # ============================================================================ def get_section_role_bonus(role: str, section_type: str) -> float: """ Returns bonus/penalty based on role appropriateness for section. Uses ROLE_ACTIVITY table to determine how suitable a role is for a given section type. Args: role: The sample role (kick, snare, bass, lead, etc.) section_type: The section type (intro, build, drop, break, outro, verse, chorus, bridge) Returns: Bonus factor 0.5-1.5 where: - 1.5 = highly appropriate (strong bonus) - 1.0 = neutral - 0.5 = inappropriate (penalty) Example: >>> get_section_role_bonus('kick', 'drop') 1.4 # Kick highly appropriate in drop >>> get_section_role_bonus('lead', 'intro') 0.5 # Lead not appropriate in intro """ # Normalize inputs role = role.lower() section_type = section_type.lower() # Check if role exists in activity table if role not in ROLE_ACTIVITY: return 1.0 # Check if section exists for this role if section_type not in ROLE_ACTIVITY[role]: return 1.0 # Get activity level (0-4 scale) activity_level = ROLE_ACTIVITY[role][section_type] # Convert to bonus factor # 0 = 0.5 (penalty), 1 = 0.75, 2 = 1.0, 3 = 1.25, 4 = 1.5 bonus_map = {0: 0.5, 1: 0.75, 2: 1.0, 3: 1.25, 4: 1.5} return bonus_map.get(activity_level, 1.0) def get_section_density_profile(section_type: str) -> Dict[str, Any]: """ Get the density profile for a section type. Args: section_type: The section type (intro, build, drop, etc.) Returns: Dict with density, complexity, and energy_target Example: >>> get_section_density_profile('drop') {'density': 1.0, 'complexity': 'high', 'energy_target': 1.0} """ section_type = section_type.lower() if section_type not in SECTION_DENSITY_PROFILES: return {'density': 0.5, 'complexity': 'medium', 'energy_target': 0.5} return SECTION_DENSITY_PROFILES[section_type].copy() def calculate_section_appropriateness( sample_features: Dict[str, Any], role: str, section_type: str ) -> float: """ Calculate how appropriate a sample is for a specific section. Considers role activity, energy characteristics, and density. Args: sample_features: Dict with sample characteristics (energy, density, etc.) role: The sample role section_type: The target section type Returns: Appropriateness score 0.0-1.5 """ # Get base role bonus role_bonus = get_section_role_bonus(role, section_type) # Get section profile section_profile = get_section_density_profile(section_type) # Compare sample features to section needs sample_energy = sample_features.get('energy', 0.5) section_energy_target = section_profile['energy_target'] # Energy matching (closer = better) energy_diff = abs(sample_energy - section_energy_target) energy_match = max(0.5, 1.0 - (energy_diff * 2)) # Combine scores final_score = role_bonus * energy_match return round(min(final_score, 1.5), 3) def get_section_role_recommendations(section_type: str) -> List[Tuple[str, float]]: """ Get a ranked list of recommended roles for a section. Args: section_type: The section type Returns: List of (role, bonus) tuples sorted by bonus descending """ section_type = section_type.lower() recommendations = [] for role, sections in ROLE_ACTIVITY.items(): if section_type in sections: bonus = get_section_role_bonus(role, section_type) recommendations.append((role, bonus)) return sorted(recommendations, key=lambda x: x[1], reverse=True) # ============================================================================ # PALETTE LOCK SYSTEM # ============================================================================ def set_palette_lock(folders_by_role: Dict[str, str]) -> None: """ Locks selection to specific folders for coherence. Once locked, sample selection will be biased towards samples from the locked folder for each role. Args: folders_by_role: Dict mapping role -> folder path to lock to Example: >>> set_palette_lock({ ... 'kick': 'reggaeton/kick', ... 'snare': 'reggaeton/snare', ... 'bass': 'reggaeton/bass' ... }) """ global _palette_locks _palette_locks.update(folders_by_role) def clear_palette_lock(role: Optional[str] = None) -> None: """ Clear palette lock for a specific role or all roles. Args: role: Role to clear lock for, or None to clear all """ global _palette_locks if role is None: _palette_locks = {} elif role in _palette_locks: del _palette_locks[role] def get_palette_locks() -> Dict[str, str]: """ Get currently active palette locks. Returns: Dict of role -> locked folder """ return _palette_locks.copy() def calculate_palette_bonus(sample_path: str, locked_folder: str) -> float: """ Returns bonus based on palette lock matching. Bonus structure: - Exact folder match: 1.4x - Sibling folder (same parent): 1.2x - Different: 0.9x (penalty) Args: sample_path: Path to the candidate sample locked_folder: The locked folder path to compare against Returns: Bonus factor 0.9-1.4 Example: >>> calculate_palette_bonus('/kick/808.wav', 'kick') 1.4 # Exact match >>> calculate_palette_bonus('/snare/clap.wav', 'drums') 1.2 # Sibling (both in drums) """ if not sample_path or not locked_folder: return 1.0 path_str = str(sample_path).lower() folder_str = str(locked_folder).lower() # Normalize paths path_parts = path_str.replace('\\', '/').split('/') folder_parts = folder_str.replace('\\', '/').split('/') # Check for exact match if folder_str in path_str: return 1.4 # Check for sibling (same parent) if len(path_parts) >= 2 and len(folder_parts) >= 1: sample_parent = path_parts[-2] if len(path_parts) > 1 else '' locked_parent = folder_parts[-2] if len(folder_parts) > 1 else folder_parts[0] if sample_parent and sample_parent == locked_parent: return 1.2 # No match - apply slight penalty return 0.9 def is_sample_in_palette(sample_path: str, role: str) -> bool: """ Check if a sample matches the palette lock for a role. Args: sample_path: Path to the sample role: The role to check palette lock for Returns: True if sample matches palette (or no lock exists) """ if role not in _palette_locks: return True locked_folder = _palette_locks[role] bonus = calculate_palette_bonus(sample_path, locked_folder) # Consider it "in palette" if bonus >= 1.2 (exact or sibling match) return bonus >= 1.2 def get_palette_coherence_score( selections: Dict[str, Dict[str, Any]] ) -> float: """ Calculate overall coherence score for a set of selections based on palette locks. Args: selections: Dict of selected samples by role Returns: Average coherence score across all selections """ if not selections or not _palette_locks: return 1.0 scores = [] for role, sample in selections.items(): if role in _palette_locks: path = str(sample.get('path', '')) locked_folder = _palette_locks[role] bonus = calculate_palette_bonus(path, locked_folder) scores.append(bonus) if not scores: return 1.0 return round(sum(scores) / len(scores), 3) # ============================================================================ # COMPREHENSIVE COHERENCE CALCULATION # ============================================================================ def calculate_comprehensive_coherence( candidate_sample: Dict[str, Any], role: str, current_selections: Dict[str, Dict[str, Any]], section_type: Optional[str] = None ) -> Dict[str, Any]: """ Calculate comprehensive coherence score with all factors. Combines joint scoring, section awareness, palette locking, fatigue, and cross-generation penalties. Args: candidate_sample: Sample to evaluate role: Role for this sample current_selections: Already-selected samples section_type: Optional section type for section-aware scoring Returns: Dict with individual scores and final composite Example: >>> result = calculate_comprehensive_coherence( ... candidate, 'kick', current, 'drop' ... ) >>> result['final_score'] 1.25 """ sample_path = str(candidate_sample.get('path', '')) # Calculate individual scores joint_score = calculate_joint_score(candidate_sample, role, current_selections) section_score = 1.0 if section_type: section_score = get_section_role_bonus(role, section_type) palette_score = 1.0 if role in _palette_locks: palette_score = calculate_palette_bonus(sample_path, _palette_locks[role]) fatigue_factor = get_persistent_fatigue(sample_path, role) generation_penalty = get_cross_generation_penalty(sample_path, role) # Calculate composite score # Joint and section are multiplicative bonuses # Fatigue and generation are penalties applied at the end base_score = joint_score * section_score * palette_score # Apply penalties final_score = base_score * fatigue_factor * generation_penalty # Normalize to 0-1.5 range final_score = min(1.5, max(0.0, final_score)) return { 'joint_score': joint_score, 'section_score': section_score, 'palette_score': palette_score, 'fatigue_factor': fatigue_factor, 'generation_penalty': generation_penalty, 'base_score': round(base_score, 3), 'final_score': round(final_score, 3), 'role': role, 'section_type': section_type, 'sample_path': sample_path } def reset_all_memory() -> None: """Reset all coherence system memory (for testing).""" global _cross_generation_family_memory, _cross_generation_path_memory global _fatigue_memory, _palette_locks _cross_generation_family_memory = {} _cross_generation_path_memory = {} _fatigue_memory = {} _palette_locks = {} # Export all public functions __all__ = [ 'calculate_joint_score', 'update_cross_generation_memory', 'get_cross_generation_penalty', 'get_cross_generation_memory_stats', 'get_persistent_fatigue', 'reset_fatigue_for_path', 'reset_all_fatigue', 'get_fatigue_report', 'get_section_role_bonus', 'get_section_density_profile', 'calculate_section_appropriateness', 'get_section_role_recommendations', 'set_palette_lock', 'clear_palette_lock', 'get_palette_locks', 'calculate_palette_bonus', 'is_sample_in_palette', 'get_palette_coherence_score', 'calculate_comprehensive_coherence', 'reset_all_memory', 'ROLE_ACTIVITY', 'SECTION_DENSITY_PROFILES', 'FAMILY_COMPATIBILITY', ]