"""
RationaleLogger - Tracks all AI decisions for auditability and analysis.

This module provides comprehensive logging of all AI-driven decisions in the
production pipeline, including sample selection, kit assembly, variations,
and mixing choices. All entries are stored in SQLite for queryable analysis.
"""

import sqlite3
import json
import os
import uuid
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Iterator, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path


@dataclass
class SampleSelectionRationale:
    """Rationale for a sample selection decision."""

    decision: str
    reasoning: List[str]
    rejected: List[Dict[str, str]]
    confidence: float
    role: str
    selected_sample: str
    similarity_scores: Dict[str, float]

    def to_dict(self) -> Dict[str, Any]:
        """Return the rationale as a plain (recursively converted) dict."""
        return asdict(self)


@dataclass
class KitAssemblyRationale:
    """Rationale for a drum kit assembly decision."""

    kit_samples: Dict[str, str]  # role -> sample path
    coherence_score: float
    weak_links: List[Dict[str, Any]]
    reasoning: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Return the rationale as a plain (recursively converted) dict."""
        return asdict(self)


@dataclass
class SectionVariationRationale:
    """Rationale for a section variation decision."""

    section_name: str
    base_kit: Dict[str, str]
    evolved_kit: Dict[str, str]
    coherence_with_base: float
    changes: List[str]
    reasoning: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Return the rationale as a plain (recursively converted) dict."""
        return asdict(self)


@dataclass
class MixDecisionRationale:
    """Rationale for a mixing decision."""

    track_index: int
    track_name: str
    effect: str
    parameters: Dict[str, Any]
    reasoning: List[str]
    before_state: Optional[Dict[str, Any]]
    after_state: Optional[Dict[str, Any]]

    def to_dict(self) -> Dict[str, Any]:
        """Return the rationale as a plain (recursively converted) dict."""
        return asdict(self)


class RationaleLogger:
    """
    Logs and queries AI decisions for auditability.

    Provides a complete audit trail of all AI-driven decisions including:
    - Sample selection with similarity scores and alternatives
    - Kit assembly with coherence analysis
    - Section variations with change tracking
    - Mix decisions with before/after states

    All data is stored in SQLite for efficient querying and analysis.
    Structured fields (inputs, outputs, scores, rationale, alternatives) are
    stored as JSON text and queried with SQLite's ``json_extract``.
    """

    def __init__(self, db_path: Optional[str] = None):
        """
        Initialize the RationaleLogger.

        Args:
            db_path: Path to SQLite database. If None, uses default location
                (``<parent of this file's directory>/data/rationale.db``).
        """
        if db_path is None:
            # Store in the same directory as the engine files
            base_dir = Path(__file__).parent.parent
            db_path = str(base_dir / "data" / "rationale.db")

        self.db_path = db_path
        # Session state is set up before any database work so the instance is
        # fully formed even if a later step is overridden or fails.
        self._current_session_id: Optional[str] = None
        self._current_track_name: str = "untitled"
        self._ensure_data_dir()
        self._init_database()

    def _ensure_data_dir(self) -> None:
        """Create data directory if it doesn't exist."""
        data_dir = Path(self.db_path).parent
        data_dir.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """
        Open a connection that is committed/rolled back AND closed on exit.

        ``sqlite3.Connection`` used directly as a context manager only manages
        the transaction; it does not close the connection. This helper adds
        the close so connections (and their file handles) are not leaked.

        Yields:
            An open connection to ``self.db_path``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    def _init_database(self) -> None:
        """Initialize the SQLite database with required tables."""
        with self._connect() as conn:
            cursor = conn.cursor()

            # Create rationale_entries table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS rationale_entries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    session_id TEXT,
                    track_name TEXT,
                    decision_type TEXT,
                    decision_description TEXT,
                    inputs TEXT,
                    outputs TEXT,
                    scores TEXT,
                    rationale TEXT,
                    alternatives_considered TEXT
                )
            """)

            # Create index for efficient queries
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_session
                ON rationale_entries(session_id)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_decision_type
                ON rationale_entries(decision_type)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp
                ON rationale_entries(timestamp)
            """)

            # Create stats tracking table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS decision_stats (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    decision_type TEXT UNIQUE,
                    count INTEGER DEFAULT 0,
                    avg_confidence REAL DEFAULT 0.0,
                    last_updated DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def start_session(self, track_name: Optional[str] = None) -> str:
        """
        Start a new logging session.

        Args:
            track_name: Name of the track/project being worked on

        Returns:
            The generated session ID (first 8 characters of a UUID4)
        """
        self._current_session_id = uuid.uuid4().hex[:8]
        self._current_track_name = track_name or "untitled"
        return self._current_session_id

    def get_session_id(self) -> str:
        """Get current session ID, creating one if needed."""
        if self._current_session_id is None:
            self.start_session()
        return self._current_session_id

    def _insert_entry(
        self,
        decision_type: str,
        description: str,
        inputs: Dict[str, Any],
        outputs: Dict[str, Any],
        scores: Dict[str, Any],
        rationale: Dict[str, Any],
        alternatives: List[Dict[str, Any]]
    ) -> int:
        """
        Insert a rationale entry into the database.

        The structured arguments are serialized with ``json.dumps`` using
        ``default=str``, so values that are not JSON-native are stored as
        their string form. The per-type statistics row is updated in the same
        transaction, using ``rationale['confidence']`` (0.5 if absent).

        Returns:
            The row ID of the inserted entry.
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO rationale_entries (
                    session_id, track_name, decision_type,
                    decision_description, inputs, outputs, scores,
                    rationale, alternatives_considered
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                self.get_session_id(),
                self._current_track_name,
                decision_type,
                description,
                json.dumps(inputs, default=str),
                json.dumps(outputs, default=str),
                json.dumps(scores, default=str),
                json.dumps(rationale, default=str),
                json.dumps(alternatives, default=str)
            ))
            entry_id = cursor.lastrowid

            # Update stats
            self._update_stats(
                conn, cursor, decision_type, rationale.get('confidence', 0.5)
            )

            return entry_id

    def _update_stats(
        self,
        conn: sqlite3.Connection,
        cursor: sqlite3.Cursor,
        decision_type: str,
        confidence: float
    ) -> None:
        """
        Update decision statistics.

        Inserts a stats row for ``decision_type`` or, if one exists, bumps its
        count and folds ``confidence`` into the running average. The caller
        owns the transaction; nothing is committed here.
        """
        # In the DO UPDATE clause the right-hand sides see the row's values
        # from before the update, so ``count`` below is the old count.
        cursor.execute("""
            INSERT INTO decision_stats (decision_type, count, avg_confidence)
            VALUES (?, 1, ?)
            ON CONFLICT(decision_type) DO UPDATE SET
                count = count + 1,
                avg_confidence = (avg_confidence * count + ?) / (count + 1),
                last_updated = CURRENT_TIMESTAMP
        """, (decision_type, confidence, confidence))

    def log_sample_selection(
        self,
        role: str,
        selected_sample: str,
        alternatives: List[str],
        similarity_scores: Dict[str, float],
        rationale: str,
        reasoning: Optional[List[str]] = None,
        rejected_details: Optional[List[Dict[str, str]]] = None,
        confidence: float = 0.0
    ) -> int:
        """
        Log a sample selection decision.

        Args:
            role: Sample role (kick, snare, hihat, etc.)
            selected_sample: Path or name of selected sample
            alternatives: List of alternative samples considered
            similarity_scores: Dict of similarity metrics
            rationale: Human-readable explanation
            reasoning: List of detailed reasoning points
            rejected_details: List of rejected options with reasons
            confidence: Confidence score (0.0-1.0)

        Returns:
            Entry ID
        """
        inputs = {
            'role': role,
            'candidates': alternatives + [selected_sample],
            'criteria': similarity_scores.get('criteria', 'similarity')
        }

        outputs = {
            'selected': selected_sample,
            'alternatives_count': len(alternatives)
        }

        scores = {
            'confidence': confidence,
            'similarity_to_reference': similarity_scores.get('reference_similarity', 0.0),
            'genre_match': similarity_scores.get('genre_match', 0.0),
            'energy_match': similarity_scores.get('energy_match', 0.0)
        }

        rationale_dict = {
            'decision': f"Selected {os.path.basename(selected_sample)} as {role}",
            'reasoning': reasoning or [rationale],
            'rejected': rejected_details or [],
            'confidence': confidence
        }

        alternatives_list = [
            {'sample': alt, 'reason': 'Lower similarity score'}
            for alt in alternatives
        ]
        if rejected_details:
            alternatives_list.extend(rejected_details)

        return self._insert_entry(
            decision_type='sample_selection',
            description=f"{role}: {os.path.basename(selected_sample)}",
            inputs=inputs,
            outputs=outputs,
            scores=scores,
            rationale=rationale_dict,
            alternatives=alternatives_list
        )

    def log_kit_assembly(
        self,
        kit_samples: Dict[str, str],
        coherence_score: float,
        weak_links: List[Dict[str, Any]],
        reasoning: Optional[List[str]] = None
    ) -> int:
        """
        Log a drum kit assembly decision.

        Args:
            kit_samples: Dict mapping roles to sample paths
            coherence_score: Overall kit coherence (0.0-1.0)
            weak_links: List of weak coherence points with details
            reasoning: List of reasoning points

        Returns:
            Entry ID
        """
        inputs = {
            'available_samples': len(kit_samples),
            'target_coherence': 0.8
        }

        outputs = {
            'kit_configuration': {
                role: os.path.basename(path)
                for role, path in kit_samples.items()
            },
            'size': len(kit_samples)
        }

        scores = {
            'coherence': coherence_score,
            'weak_link_count': len(weak_links),
            'confidence': coherence_score  # Use coherence as confidence
        }

        rationale_dict = {
            'decision': f"Assembled {len(kit_samples)}-piece drum kit",
            'reasoning': reasoning or [f"Kit coherence: {coherence_score:.2f}"],
            'rejected': weak_links,
            'confidence': coherence_score
        }

        return self._insert_entry(
            decision_type='kit_assembly',
            description=f"Drum kit with {len(kit_samples)} samples",
            inputs=inputs,
            outputs=outputs,
            scores=scores,
            rationale=rationale_dict,
            alternatives=weak_links
        )

    def log_section_variation(
        self,
        section_name: str,
        base_kit: Dict[str, str],
        evolved_kit: Dict[str, str],
        coherence_with_base: float,
        changes: Optional[List[str]] = None,
        reasoning: Optional[List[str]] = None
    ) -> int:
        """
        Log a section variation decision.

        Args:
            section_name: Name of section (verse, chorus, bridge, etc.)
            base_kit: Original kit configuration
            evolved_kit: Modified kit configuration
            coherence_with_base: How well variation matches base
            changes: List of specific changes made (accepted for interface
                compatibility; the changed roles are derived from the kits)
            reasoning: List of reasoning points

        Returns:
            Entry ID
        """
        # Calculate differences. Roles are sorted so the stored lists are
        # stable across runs (set iteration order of strings is not).
        all_roles = set(base_kit) | set(evolved_kit)
        changed_samples = sorted(
            role for role in all_roles
            if base_kit.get(role) != evolved_kit.get(role)
        )

        inputs = {
            'section': section_name,
            'base_kit': {k: os.path.basename(v) for k, v in base_kit.items()}
        }

        outputs = {
            'evolved_kit': {k: os.path.basename(v) for k, v in evolved_kit.items()},
            'changed_roles': changed_samples,
            'unchanged_roles': sorted(set(base_kit) - set(changed_samples))
        }

        scores = {
            'coherence_with_base': coherence_with_base,
            # max(..., 1) avoids dividing by zero for an empty base kit
            'change_ratio': len(changed_samples) / max(len(base_kit), 1),
            'confidence': coherence_with_base
        }

        rationale_dict = {
            'decision': f"Created {section_name} variation from base kit",
            'reasoning': reasoning or [f"Coherence with base: {coherence_with_base:.2f}"],
            'rejected': [],
            'confidence': coherence_with_base
        }

        return self._insert_entry(
            decision_type='variation',
            description=f"{section_name} kit variation",
            inputs=inputs,
            outputs=outputs,
            scores=scores,
            rationale=rationale_dict,
            alternatives=[]
        )

    def log_mix_decision(
        self,
        track_index: int,
        effect: str,
        parameters: Dict[str, Any],
        rationale: str,
        track_name: Optional[str] = None,
        reasoning: Optional[List[str]] = None,
        before_state: Optional[Dict[str, Any]] = None,
        after_state: Optional[Dict[str, Any]] = None,
        alternatives: Optional[List[Dict[str, Any]]] = None
    ) -> int:
        """
        Log a mixing decision.

        Args:
            track_index: Index of affected track
            effect: Effect/processor name
            parameters: Effect parameters applied
            rationale: Human-readable explanation
            track_name: Name of track
            reasoning: List of detailed reasoning points
            before_state: State before the change
            after_state: State after the change
            alternatives: Alternative approaches considered

        Returns:
            Entry ID
        """
        target_label = track_name or f'track {track_index}'

        inputs = {
            'track_index': track_index,
            'track_name': track_name or f"Track {track_index}",
            'before_state': before_state or {}
        }

        outputs = {
            'effect': effect,
            'parameters': parameters,
            'after_state': after_state or {}
        }

        scores = {
            'impact_score': parameters.get('impact', 0.5),
            'confidence': 0.8  # Mix decisions typically have good confidence
        }

        rationale_dict = {
            'decision': f"Applied {effect} to {target_label}",
            'reasoning': reasoning or [rationale],
            'rejected': alternatives or [],
            'confidence': 0.8
        }

        return self._insert_entry(
            decision_type='mix',
            description=f"{effect} on {target_label}",
            inputs=inputs,
            outputs=outputs,
            scores=scores,
            rationale=rationale_dict,
            alternatives=alternatives or []
        )

    def get_session_rationale(self, session_id: str) -> List[Dict[str, Any]]:
        """
        Retrieve all decisions for a session.

        Args:
            session_id: Session ID to query

        Returns:
            List of rationale entries in insertion order. JSON columns are
            returned as raw JSON text.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            # CURRENT_TIMESTAMP has one-second resolution, so ``id`` breaks
            # ties between entries logged within the same second.
            cursor.execute("""
                SELECT * FROM rationale_entries
                WHERE session_id = ?
                ORDER BY timestamp, id
            """, (session_id,))
            return [dict(row) for row in cursor.fetchall()]

    def get_decision_stats(self) -> Dict[str, Any]:
        """
        Get analytics on all decisions.

        Returns:
            Dict with keys ``by_type`` (per decision type: count, average
            confidence, last update), ``overall`` (total decisions, total
            sessions, overall confidence) and ``recent_24h`` (entries logged
            in the last day).
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            # Get per-type stats
            cursor.execute("""
                SELECT decision_type, count, avg_confidence, last_updated
                FROM decision_stats
                ORDER BY count DESC
            """)
            type_stats = {
                row[0]: {
                    'count': row[1],
                    'avg_confidence': row[2],
                    'last_updated': row[3]
                }
                for row in cursor.fetchall()
            }

            # Get overall stats (entries without a confidence count as 0.5)
            cursor.execute("""
                SELECT
                    COUNT(*) as total_decisions,
                    COUNT(DISTINCT session_id) as total_sessions,
                    AVG(
                        CASE
                            WHEN json_extract(scores, '$.confidence') IS NOT NULL
                            THEN json_extract(scores, '$.confidence')
                            ELSE 0.5
                        END
                    ) as overall_confidence
                FROM rationale_entries
            """)
            row = cursor.fetchone()
            overall = {
                'total_decisions': row[0] or 0,
                'total_sessions': row[1] or 0,
                'overall_confidence': row[2] or 0.0
            }

            # Get recent activity (last 24 hours)
            cursor.execute("""
                SELECT COUNT(*)
                FROM rationale_entries
                WHERE timestamp > datetime('now', '-1 day')
            """)
            recent_count = cursor.fetchone()[0]

            return {
                'by_type': type_stats,
                'overall': overall,
                'recent_24h': recent_count
            }

    def find_similar_decisions(
        self,
        decision_type: str,
        min_confidence: float = 0.7,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Find similar past decisions with high confidence.

        Args:
            decision_type: Type of decision to query
            min_confidence: Minimum confidence threshold
            limit: Maximum results to return

        Returns:
            List of matching decisions, highest confidence first, then most
            recent first.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute("""
                SELECT * FROM rationale_entries
                WHERE decision_type = ?
                AND json_extract(scores, '$.confidence') >= ?
                ORDER BY json_extract(scores, '$.confidence') DESC,
                         timestamp DESC, id DESC
                LIMIT ?
            """, (decision_type, min_confidence, limit))
            return [dict(row) for row in cursor.fetchall()]

    def get_most_used_samples(
        self, role: Optional[str] = None, limit: int = 20
    ) -> List[Dict[str, Any]]:
        """
        Track which samples are used most frequently.

        Args:
            role: Filter by specific role (optional)
            limit: Maximum results to return

        Returns:
            List of samples with usage counts, most used first
        """
        # Only fixed SQL fragments are concatenated; all values are bound.
        role_filter = ""
        params: List[Any] = []
        if role:
            role_filter = "AND json_extract(inputs, '$.role') = ?"
            params.append(role)
        params.append(limit)

        query = f"""
            SELECT
                json_extract(outputs, '$.selected') as sample,
                json_extract(inputs, '$.role') as sample_role,
                COUNT(*) as usage_count,
                AVG(json_extract(scores, '$.confidence')) as avg_confidence
            FROM rationale_entries
            WHERE decision_type = 'sample_selection'
            {role_filter}
            GROUP BY json_extract(outputs, '$.selected')
            ORDER BY usage_count DESC, sample
            LIMIT ?
        """

        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return [
                {
                    'sample': row[0],
                    'role': row[1],
                    'usage_count': row[2],
                    'avg_confidence': row[3]
                }
                for row in cursor.fetchall()
            ]

    def analyze_coherence_trends(self) -> Dict[str, Any]:
        """
        Analyze coherence trends over time.

        Only entries whose scores contain a ``coherence`` value are included.

        Returns:
            Dict with ``trends_by_type`` (per decision type, a list of daily
            average coherence and counts) and ``overall`` (average, minimum
            and maximum coherence; 0.0 when there is no data).
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            # Get coherence scores over time by decision type
            cursor.execute("""
                SELECT
                    decision_type,
                    date(timestamp) as date,
                    AVG(json_extract(scores, '$.coherence')) as avg_coherence,
                    COUNT(*) as count
                FROM rationale_entries
                WHERE json_extract(scores, '$.coherence') IS NOT NULL
                GROUP BY decision_type, date(timestamp)
                ORDER BY date
            """)
            trends: Dict[str, List[Dict[str, Any]]] = {}
            for dec_type, day, avg_coherence, count in cursor.fetchall():
                trends.setdefault(dec_type, []).append({
                    'date': day,
                    'avg_coherence': avg_coherence,
                    'count': count
                })

            # Calculate overall trend
            cursor.execute("""
                SELECT
                    AVG(json_extract(scores, '$.coherence')) as overall_avg,
                    MIN(json_extract(scores, '$.coherence')) as min_coherence,
                    MAX(json_extract(scores, '$.coherence')) as max_coherence
                FROM rationale_entries
                WHERE json_extract(scores, '$.coherence') IS NOT NULL
            """)
            row = cursor.fetchone()

            return {
                'trends_by_type': trends,
                'overall': {
                    'average': row[0] or 0.0,
                    'minimum': row[1] or 0.0,
                    'maximum': row[2] or 0.0
                }
            }

    def export_session_report(
        self, session_id: str, output_path: Optional[str] = None
    ) -> str:
        """
        Export a detailed session report.

        Args:
            session_id: Session to export
            output_path: Output file path (optional). Defaults to
                ``session_report_<session_id>.json`` next to the database.

        Returns:
            Path to exported report, or an empty string if the session has
            no entries (no file is written in that case).
        """
        entries = self.get_session_rationale(session_id)
        if not entries:
            return ""

        # Generate report
        report = {
            'session_id': session_id,
            'generated_at': datetime.now().isoformat(),
            'total_decisions': len(entries),
            'decisions': [
                {
                    'timestamp': entry['timestamp'],
                    'type': entry['decision_type'],
                    'description': entry['decision_description'],
                    'rationale': json.loads(entry['rationale']),
                    'scores': json.loads(entry['scores'])
                }
                for entry in entries
            ]
        }

        # Determine output path
        if output_path is None:
            base_dir = Path(self.db_path).parent
            output_path = str(base_dir / f"session_report_{session_id}.json")

        # Explicit encoding keeps the report identical across platforms.
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)

        return output_path

    def clear_session(self, session_id: str) -> int:
        """
        Clear all entries for a session.

        Note: only ``rationale_entries`` rows are removed; the aggregated
        ``decision_stats`` table is left unchanged.

        Args:
            session_id: Session to clear

        Returns:
            Number of entries deleted
        """
        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                DELETE FROM rationale_entries
                WHERE session_id = ?
            """, (session_id,))
            return cursor.rowcount

    def get_decision_by_id(self, entry_id: int) -> Optional[Dict[str, Any]]:
        """
        Retrieve a specific decision by ID.

        Args:
            entry_id: Entry ID to retrieve

        Returns:
            Decision entry or None
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute("""
                SELECT * FROM rationale_entries
                WHERE id = ?
            """, (entry_id,))
            row = cursor.fetchone()
            return dict(row) if row else None


# Singleton instance for module-level access
_default_logger: Optional[RationaleLogger] = None


def get_logger(db_path: Optional[str] = None) -> RationaleLogger:
    """
    Get or create the default RationaleLogger instance.

    Args:
        db_path: Path to database (optional). Only used when the default
            instance is first created; it is ignored on later calls until
            ``reset_logger()`` is called.

    Returns:
        RationaleLogger instance
    """
    global _default_logger
    if _default_logger is None:
        _default_logger = RationaleLogger(db_path)
    return _default_logger


def reset_logger() -> None:
    """Reset the singleton logger (useful for testing)."""
    global _default_logger
    _default_logger = None