""" CoherenceScorer - Advanced Coherence Calculation Engine Calculates multi-dimensional coherence scores between audio samples using timbre similarity (MFCC), transient compatibility, spectral balance, and energy consistency. Professional-grade tool with 0.90 threshold enforcement. File: AbletonMCP_AI/mcp_server/engines/coherence_scorer.py """ import os import numpy as np from typing import Dict, List, Tuple, Optional from dataclasses import dataclass from pathlib import Path class CoherenceError(Exception): """Raised when coherence score falls below professional threshold.""" def __init__(self, score: float, weak_components: List[str], suggestions: List[str]): self.score = score self.weak_components = weak_components self.suggestions = suggestions super().__init__(self._format_message()) def _format_message(self) -> str: msg = f"\n{'='*60}\n" msg += f"COHERENCE ERROR: Professional threshold not met\n" msg += f"{'='*60}\n" msg += f"Current Score: {self.score:.3f} (MIN_COHERENCE: 0.900)\n" msg += f"Status: {'PASS ✓' if self.score >= 0.90 else 'FAIL ✗'}\n\n" if self.weak_components: msg += f"Weak Components ({len(self.weak_components)}):\n" for comp in self.weak_components: msg += f" • {comp}\n" if self.suggestions: msg += f"\nSuggestions for Improvement:\n" for i, sug in enumerate(self.suggestions, 1): msg += f" {i}. {sug}\n" msg += f"{'='*60}\n" return msg @dataclass class AudioFeatures: """Container for extracted audio features.""" mfccs: np.ndarray # MFCC coefficients (timbre) spectral_centroid: float # Brightness spectral_rolloff: float # Bandwidth spectral_flux: np.ndarray # Spectral change (transients) zero_crossing_rate: float # Noisiness rms_energy: np.ndarray # Loudness envelope attack_time: float # Transient attack sustain_level: float # Sustain level low_energy: float # Low band energy (20-250Hz) mid_energy: float # Mid band energy (250-2000Hz) high_energy: float # High band energy (2000-20000Hz) duration: float # Audio duration in seconds sample_rate: int # Sample rate @dataclass class ScoreBreakdown: """Detailed breakdown of coherence score components.""" overall_score: float timbre_similarity: float # MFCC cosine similarity (40%) transient_compatibility: float # Attack characteristic match (30%) spectral_balance: float # Low/mid/high ratio match (20%) energy_consistency: float # RMS correlation (10%) is_professional: bool weak_components: List[str] suggestions: List[str] def to_dict(self) -> Dict: return { 'overall_score': round(self.overall_score, 4), 'timbre_similarity': round(self.timbre_similarity, 4), 'transient_compatibility': round(self.transient_compatibility, 4), 'spectral_balance': round(self.spectral_balance, 4), 'energy_consistency': round(self.energy_consistency, 4), 'is_professional': self.is_professional, 'weak_components': self.weak_components, 'suggestions': self.suggestions } class CoherenceScorer: """ Professional coherence calculation engine. Calculates multi-dimensional coherence scores between audio samples using real audio feature extraction and weighted component analysis. Weights: - Timbre similarity (MFCC): 40% - Transient compatibility: 30% - Spectral balance: 20% - Energy consistency: 10% Professional threshold: 0.90 (MIN_COHERENCE) """ # Professional threshold - no compromise MIN_COHERENCE = 0.90 # Component weights (must sum to 1.0) WEIGHTS = { 'timbre': 0.40, 'transient': 0.30, 'spectral': 0.20, 'energy': 0.10 } # Thresholds for component quality THRESHOLDS = { 'timbre': 0.75, 'transient': 0.70, 'spectral': 0.65, 'energy': 0.60 } def __init__(self, sample_rate: int = 22050): """ Initialize the CoherenceScorer. Args: sample_rate: Target sample rate for analysis (default 22050) """ self.sample_rate = sample_rate self.last_breakdown: Optional[ScoreBreakdown] = None def _load_audio(self, file_path: str) -> Tuple[np.ndarray, int]: """ Load audio file using librosa. Args: file_path: Path to audio file (.wav, .mp3, etc.) Returns: Tuple of (audio_array, sample_rate) Raises: FileNotFoundError: If file doesn't exist ValueError: If file format unsupported or corrupted """ try: import librosa except ImportError: raise ImportError( "librosa is required for audio analysis. " "Install with: pip install librosa" ) path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"Audio file not found: {file_path}") if not path.suffix.lower() in ['.wav', '.mp3', '.aif', '.aiff', '.flac']: raise ValueError(f"Unsupported audio format: {path.suffix}") try: y, sr = librosa.load(file_path, sr=self.sample_rate, mono=True) if len(y) == 0: raise ValueError(f"Audio file is empty: {file_path}") return y, sr except Exception as e: raise ValueError(f"Failed to load audio file {file_path}: {str(e)}") def _extract_features(self, audio: np.ndarray, sr: int) -> AudioFeatures: """ Extract comprehensive audio features. Args: audio: Audio time series sr: Sample rate Returns: AudioFeatures dataclass with all extracted features """ import librosa # Basic spectral features mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13) spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio, sr=sr)) spectral_rolloff = np.mean(librosa.feature.spectral_rolloff(y=audio, sr=sr)) spectral_flux = librosa.onset.onset_strength(y=audio, sr=sr) zcr = np.mean(librosa.feature.zero_crossing_rate(audio)) rms = librosa.feature.rms(y=audio)[0] # Band energy analysis # Low: 20-250Hz, Mid: 250-2000Hz, High: 2000-20000Hz stft = np.abs(librosa.stft(audio)) freqs = librosa.fft_frequencies(sr=sr) low_mask = (freqs >= 20) & (freqs <= 250) mid_mask = (freqs > 250) & (freqs <= 2000) high_mask = (freqs > 2000) & (freqs <= 20000) low_energy = np.sum(stft[low_mask, :]) / stft.shape[1] mid_energy = np.sum(stft[mid_mask, :]) / stft.shape[1] high_energy = np.sum(stft[high_mask, :]) / stft.shape[1] # Normalize band energies total_energy = low_energy + mid_energy + high_energy if total_energy > 0: low_energy /= total_energy mid_energy /= total_energy high_energy /= total_energy # Transient analysis (attack detection) onset_env = librosa.onset.onset_strength(y=audio, sr=sr) onset_frames = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sr) if len(onset_frames) > 0: # Calculate average attack time from first transient first_onset = onset_frames[0] window_start = max(0, first_onset - 10) window_end = min(len(audio), first_onset + 50) if window_end > window_start: attack_segment = audio[window_start:window_end] # Attack time: time from 10% to 90% of peak peak_idx = np.argmax(np.abs(attack_segment)) peak_val = np.abs(attack_segment[peak_idx]) if peak_val > 0: # Find 10% and 90% points ten_percent = 0.1 * peak_val ninety_percent = 0.9 * peak_val ten_idx = np.where(np.abs(attack_segment[:peak_idx]) >= ten_percent)[0] ninety_idx = np.where(np.abs(attack_segment[:peak_idx]) >= ninety_percent)[0] if len(ten_idx) > 0 and len(ninety_idx) > 0: attack_time = (ninety_idx[0] - ten_idx[0]) / sr * 1000 # ms else: attack_time = 10.0 # Default 10ms else: attack_time = 10.0 # Sustain level: average after attack sustain_start = peak_idx + int(0.01 * sr) # 10ms after peak if sustain_start < len(attack_segment): sustain_level = np.mean(np.abs(attack_segment[sustain_start:])) else: sustain_level = 0.0 else: attack_time = 10.0 sustain_level = np.mean(np.abs(audio)) * 0.5 else: attack_time = 50.0 # Long attack for non-transient sounds sustain_level = np.mean(np.abs(audio)) return AudioFeatures( mfccs=mfccs, spectral_centroid=spectral_centroid, spectral_rolloff=spectral_rolloff, spectral_flux=spectral_flux, zero_crossing_rate=zcr, rms_energy=rms, attack_time=attack_time, sustain_level=float(sustain_level), low_energy=float(low_energy), mid_energy=float(mid_energy), high_energy=float(high_energy), duration=len(audio) / sr, sample_rate=sr ) def _calculate_timbre_similarity(self, feat1: AudioFeatures, feat2: AudioFeatures) -> float: """ Calculate timbre similarity using MFCC cosine similarity. Uses mean MFCC vectors and accounts for temporal evolution. Args: feat1: Features from first sample feat2: Features from second sample Returns: Similarity score 0.0-1.0 """ # Mean MFCC vectors mfcc1_mean = np.mean(feat1.mfccs, axis=1) mfcc2_mean = np.mean(feat2.mfccs, axis=1) # Cosine similarity dot_product = np.dot(mfcc1_mean, mfcc2_mean) norm1 = np.linalg.norm(mfcc1_mean) norm2 = np.linalg.norm(mfcc2_mean) if norm1 == 0 or norm2 == 0: return 0.0 cosine_sim = dot_product / (norm1 * norm2) # Convert from [-1, 1] to [0, 1] similarity = (cosine_sim + 1) / 2 # Also compare spectral centroid (brightness match) centroid_diff = abs(feat1.spectral_centroid - feat2.spectral_centroid) max_centroid = max(feat1.spectral_centroid, feat2.spectral_centroid) if max_centroid > 0: centroid_sim = 1 - (centroid_diff / max_centroid) else: centroid_sim = 1.0 # Weighted combination: 80% MFCC, 20% centroid final_similarity = 0.8 * similarity + 0.2 * centroid_sim return float(np.clip(final_similarity, 0.0, 1.0)) def _calculate_transient_compatibility(self, feat1: AudioFeatures, feat2: AudioFeatures) -> float: """ Calculate transient/attack characteristic compatibility. Compares attack times, sustain levels, and spectral flux patterns. Args: feat1: Features from first sample feat2: Features from second sample Returns: Compatibility score 0.0-1.0 """ # Attack time compatibility attack_diff = abs(feat1.attack_time - feat2.attack_time) max_attack = max(feat1.attack_time, feat2.attack_time, 1.0) attack_compatibility = 1 - (attack_diff / max_attack) # Sustain level compatibility max_sustain = max(feat1.sustain_level, feat2.sustain_level, 0.001) sustain_diff = abs(feat1.sustain_level - feat2.sustain_level) sustain_compatibility = 1 - (sustain_diff / max_sustain) # Spectral flux pattern correlation flux1 = feat1.spectral_flux flux2 = feat2.spectral_flux # Normalize lengths min_len = min(len(flux1), len(flux2)) if min_len > 1: flux1_norm = flux1[:min_len] flux2_norm = flux2[:min_len] # Normalize to unit vectors flux1_norm = flux1_norm / (np.linalg.norm(flux1_norm) + 1e-10) flux2_norm = flux2_norm / (np.linalg.norm(flux2_norm) + 1e-10) flux_corr = np.corrcoef(flux1_norm, flux2_norm)[0, 1] if np.isnan(flux_corr): flux_corr = 0.0 else: flux_corr = 0.5 # Weighted combination # Attack: 40%, Sustain: 30%, Flux correlation: 30% compatibility = ( 0.4 * attack_compatibility + 0.3 * sustain_compatibility + 0.3 * max(0, flux_corr) # Clip negative correlations ) return float(np.clip(compatibility, 0.0, 1.0)) def _calculate_spectral_balance(self, feat1: AudioFeatures, feat2: AudioFeatures) -> float: """ Calculate spectral balance match (low/mid/high ratio comparison). Args: feat1: Features from first sample feat2: Features from second sample Returns: Balance score 0.0-1.0 """ # Energy band ratios bands1 = np.array([feat1.low_energy, feat1.mid_energy, feat1.high_energy]) bands2 = np.array([feat2.low_energy, feat2.mid_energy, feat2.high_energy]) # Cosine similarity of band distributions dot = np.dot(bands1, bands2) norm1 = np.linalg.norm(bands1) norm2 = np.linalg.norm(bands2) if norm1 == 0 or norm2 == 0: return 0.5 balance_sim = dot / (norm1 * norm2) # Also compare rolloff (high-frequency content boundary) rolloff_diff = abs(feat1.spectral_rolloff - feat2.spectral_rolloff) max_rolloff = max(feat1.spectral_rolloff, feat2.spectral_rolloff, 1.0) rolloff_sim = 1 - (rolloff_diff / max_rolloff) # Combined: 70% band balance, 30% rolloff match final_balance = 0.7 * balance_sim + 0.3 * rolloff_sim return float(np.clip(final_balance, 0.0, 1.0)) def _calculate_energy_consistency(self, feat1: AudioFeatures, feat2: AudioFeatures) -> float: """ Calculate energy envelope consistency. Compares RMS energy patterns and overall loudness. Args: feat1: Features from first sample feat2: Features from second sample Returns: Consistency score 0.0-1.0 """ rms1 = feat1.rms_energy rms2 = feat2.rms_energy # Match lengths min_len = min(len(rms1), len(rms2)) if min_len < 2: return 0.5 rms1_norm = rms1[:min_len] rms2_norm = rms2[:min_len] # Normalize max_rms1 = np.max(rms1_norm) + 1e-10 max_rms2 = np.max(rms2_norm) + 1e-10 rms1_norm = rms1_norm / max_rms1 rms2_norm = rms2_norm / max_rms2 # Correlation of energy envelopes corr = np.corrcoef(rms1_norm, rms2_norm)[0, 1] if np.isnan(corr): corr = 0.0 # Mean energy similarity mean1 = np.mean(feat1.rms_energy) mean2 = np.mean(feat2.rms_energy) max_mean = max(mean1, mean2, 0.001) mean_sim = 1 - (abs(mean1 - mean2) / max_mean) # Combined: 60% correlation, 40% mean level consistency = 0.6 * max(0, corr) + 0.4 * mean_sim return float(np.clip(consistency, 0.0, 1.0)) def score_pair(self, sample1_path: str, sample2_path: str, enforce_threshold: bool = True) -> float: """ Calculate coherence score between two samples. Args: sample1_path: Path to first audio file sample2_path: Path to second audio file enforce_threshold: If True, raises CoherenceError if score < 0.90 Returns: Overall coherence score (0.0-1.0) Raises: CoherenceError: If score < MIN_COHERENCE and enforce_threshold=True FileNotFoundError: If audio files not found ValueError: If audio loading fails """ # Load and extract features audio1, sr1 = self._load_audio(sample1_path) audio2, sr2 = self._load_audio(sample2_path) feat1 = self._extract_features(audio1, sr1) feat2 = self._extract_features(audio2, sr2) # Calculate component scores timbre_score = self._calculate_timbre_similarity(feat1, feat2) transient_score = self._calculate_transient_compatibility(feat1, feat2) spectral_score = self._calculate_spectral_balance(feat1, feat2) energy_score = self._calculate_energy_consistency(feat1, feat2) # Calculate weighted overall score overall_score = ( self.WEIGHTS['timbre'] * timbre_score + self.WEIGHTS['transient'] * transient_score + self.WEIGHTS['spectral'] * spectral_score + self.WEIGHTS['energy'] * energy_score ) # Identify weak components weak_components = [] suggestions = [] scores = { 'timbre_similarity': timbre_score, 'transient_compatibility': transient_score, 'spectral_balance': spectral_score, 'energy_consistency': energy_score } for component, score in scores.items(): threshold = self.THRESHOLDS.get(component.replace('_similarity', 'timbre') .replace('_compatibility', 'transient') .replace('_balance', 'spectral') .replace('_consistency', 'energy'), 0.6) if score < threshold: weak_components.append(f"{component}: {score:.3f} (threshold: {threshold:.2f})") # Add specific suggestions if 'timbre' in component: suggestions.append( "Consider samples from the same source/pack for timbral consistency. " "Try layering with a shared reverb bus." ) elif 'transient' in component: suggestions.append( "Adjust transient timing with warp markers or apply transient shaping. " "Samples have different attack characteristics." ) elif 'spectral' in component: suggestions.append( "Use EQ to match frequency profiles. " "Check if samples occupy different frequency ranges." ) elif 'energy' in component: suggestions.append( "Adjust clip gain to match perceived loudness. " "Apply compression for consistent dynamics." ) # Create breakdown self.last_breakdown = ScoreBreakdown( overall_score=overall_score, timbre_similarity=timbre_score, transient_compatibility=transient_score, spectral_balance=spectral_score, energy_consistency=energy_score, is_professional=overall_score >= self.MIN_COHERENCE, weak_components=weak_components, suggestions=list(set(suggestions)) # Remove duplicates ) # Enforce professional threshold if enforce_threshold and overall_score < self.MIN_COHERENCE: raise CoherenceError(overall_score, weak_components, suggestions) return overall_score def score_kit(self, sample_paths: List[str], enforce_threshold: bool = True) -> float: """ Calculate overall kit coherence (average of all pairwise scores). Args: sample_paths: List of audio file paths enforce_threshold: If True, raises CoherenceError if score < 0.90 Returns: Kit coherence score (0.0-1.0) Raises: CoherenceError: If score < MIN_COHERENCE and enforce_threshold=True ValueError: If fewer than 2 samples provided """ if len(sample_paths) < 2: raise ValueError("Need at least 2 samples to calculate kit coherence") # Calculate all pairwise scores scores = [] pair_details = [] for i in range(len(sample_paths)): for j in range(i + 1, len(sample_paths)): try: score = self.score_pair( sample_paths[i], sample_paths[j], enforce_threshold=False # Don't raise until we check all ) scores.append(score) pair_details.append({ 'pair': (Path(sample_paths[i]).name, Path(sample_paths[j]).name), 'score': score }) except Exception as e: print(f"Warning: Could not compare {sample_paths[i]} vs {sample_paths[j]}: {e}") scores.append(0.0) if not scores: raise ValueError("No valid pairwise comparisons could be made") # Average score kit_score = np.mean(scores) # Find worst pairs sorted_pairs = sorted(pair_details, key=lambda x: x['score']) weak_pairs = [p for p in sorted_pairs if p['score'] < 0.75] # Build suggestions suggestions = [] if weak_pairs: worst = weak_pairs[:3] # Top 3 worst suggestions.append( f"{len(weak_pairs)} weak pair(s) detected. " f"Worst: {worst[0]['pair']} = {worst[0]['score']:.3f}" ) suggestions.append( "Consider replacing or processing weak pairs for better cohesion." ) self.last_breakdown = ScoreBreakdown( overall_score=kit_score, timbre_similarity=0.0, # Not meaningful for kit average transient_compatibility=0.0, spectral_balance=0.0, energy_consistency=0.0, is_professional=kit_score >= self.MIN_COHERENCE, weak_components=[f"Weak pair: {p['pair']} ({p['score']:.3f})" for p in weak_pairs[:3]], suggestions=suggestions ) if enforce_threshold and kit_score < self.MIN_COHERENCE: raise CoherenceError(kit_score, self.last_breakdown.weak_components, suggestions) return kit_score def score_section_transition(self, samples_a: List[str], samples_b: List[str], enforce_threshold: bool = True) -> float: """ Calculate coherence of transition between two sections. Compares all samples in section A against all samples in section B to ensure smooth transition. Args: samples_a: List of sample paths in first section samples_b: List of sample paths in second section enforce_threshold: If True, raises CoherenceError if score < 0.90 Returns: Transition coherence score (0.0-1.0) """ if not samples_a or not samples_b: raise ValueError("Both sections must contain at least one sample") # Cross-section comparisons scores = [] for sample_a in samples_a: for sample_b in samples_b: try: score = self.score_pair(sample_a, sample_b, enforce_threshold=False) scores.append(score) except Exception as e: print(f"Warning: Cross-section comparison failed: {e}") if not scores: raise ValueError("No valid cross-section comparisons") transition_score = np.mean(scores) # Analyze worst transitions if scores: min_score = min(scores) weak_count = sum(1 for s in scores if s < 0.75) else: min_score = 0.0 weak_count = 0 suggestions = [] if min_score < 0.70: suggestions.append( f"Poor transition detected (worst pair: {min_score:.3f}). " "Consider using transition FX or crossfade." ) if weak_count > len(scores) * 0.3: suggestions.append( f"{weak_count}/{len(scores)} transitions are weak. " "Sections may be harmonically or sonically incompatible." ) self.last_breakdown = ScoreBreakdown( overall_score=transition_score, timbre_similarity=0.0, transient_compatibility=0.0, spectral_balance=0.0, energy_consistency=0.0, is_professional=transition_score >= self.MIN_COHERENCE, weak_components=[f"Weak transitions: {weak_count}"] if weak_count > 0 else [], suggestions=suggestions if suggestions else ["Transition coherence is acceptable"] ) if enforce_threshold and transition_score < self.MIN_COHERENCE: raise CoherenceError(transition_score, self.last_breakdown.weak_components, suggestions) return transition_score def get_score_breakdown(self) -> Dict: """ Get detailed breakdown of the last coherence calculation. Returns: Dictionary with component scores and analysis """ if self.last_breakdown is None: return { 'error': 'No coherence calculation performed yet. ' 'Call score_pair(), score_kit(), or score_section_transition() first.' } return self.last_breakdown.to_dict() @staticmethod def is_professional_grade(score: float) -> bool: """ Check if a coherence score meets professional standards. Args: score: Coherence score to evaluate Returns: True if score >= MIN_COHERENCE (0.90) """ return score >= CoherenceScorer.MIN_COHERENCE def batch_score(self, sample_paths: List[str], mode: str = 'pairwise') -> Dict: """ Batch coherence analysis for multiple samples. Args: sample_paths: List of sample paths to analyze mode: 'pairwise' for all pairs, 'kit' for overall coherence Returns: Dictionary with scores and analysis """ if mode == 'pairwise': results = { 'mode': 'pairwise', 'pairs': [], 'min_score': 1.0, 'max_score': 0.0, 'avg_score': 0.0 } scores = [] for i in range(len(sample_paths)): for j in range(i + 1, len(sample_paths)): try: score = self.score_pair( sample_paths[i], sample_paths[j], enforce_threshold=False ) scores.append(score) results['pairs'].append({ 'sample_a': Path(sample_paths[i]).name, 'sample_b': Path(sample_paths[j]).name, 'score': round(score, 4), 'professional': score >= self.MIN_COHERENCE }) except Exception as e: results['pairs'].append({ 'sample_a': Path(sample_paths[i]).name, 'sample_b': Path(sample_paths[j]).name, 'error': str(e) }) if scores: results['min_score'] = round(min(scores), 4) results['max_score'] = round(max(scores), 4) results['avg_score'] = round(np.mean(scores), 4) return results elif mode == 'kit': score = self.score_kit(sample_paths, enforce_threshold=False) return { 'mode': 'kit', 'kit_score': round(score, 4), 'professional': score >= self.MIN_COHERENCE, 'sample_count': len(sample_paths), 'breakdown': self.get_score_breakdown() } else: raise ValueError(f"Unknown mode: {mode}. Use 'pairwise' or 'kit'") # Convenience functions for quick access def check_coherence(sample1: str, sample2: str) -> Dict: """ Quick coherence check between two samples. Args: sample1: Path to first audio file sample2: Path to second audio file Returns: Dictionary with score and breakdown """ scorer = CoherenceScorer() try: score = scorer.score_pair(sample1, sample2, enforce_threshold=False) return { 'coherent': score >= CoherenceScorer.MIN_COHERENCE, 'score': round(score, 4), 'details': scorer.get_score_breakdown() } except Exception as e: return { 'coherent': False, 'error': str(e) } def check_kit_coherence(sample_paths: List[str]) -> Dict: """ Quick kit coherence check. Args: sample_paths: List of sample paths Returns: Dictionary with kit score and analysis """ scorer = CoherenceScorer() try: score = scorer.score_kit(sample_paths, enforce_threshold=False) return { 'coherent': score >= CoherenceScorer.MIN_COHERENCE, 'score': round(score, 4), 'details': scorer.get_score_breakdown() } except Exception as e: return { 'coherent': False, 'error': str(e) }