- Add _cmd_create_arrangement_audio_pattern with 5-method fallback chain - Method 1: track.insert_arrangement_clip() [Live 12+] - Method 2: track.create_audio_clip() [Live 11+] - Method 3: arrangement_clips.add_new_clip() [Live 12+] - Method 4: Session->duplicate_clip_to_arrangement [Legacy] - Method 5: Session->Recording [Universal] - Add _cmd_duplicate_clip_to_arrangement for session-to-arrangement workflow - Update skills documentation - Verified: 3 clips created at positions [0, 4, 8] in Arrangement View Closes: Audio injection in Arrangement View
620 lines
20 KiB
Python
620 lines
20 KiB
Python
"""
SampleMetadataStore - SQLite database for audio sample metadata.

Stores analyzed audio features for the sample library to enable
fast similarity search and intelligent sample selection.
"""
|
|
|
|
import json
import logging
import sqlite3
from dataclasses import asdict, dataclass, fields
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
# Module-level logger named after this module. No handlers or levels are set
# here; that is left to whoever runs the code.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class SampleFeatures:
    """Audio features extracted for a single sample file.

    Every field except ``path`` is optional and defaults to ``None``.
    ``categories`` is not a column of the ``samples`` table (it is stored
    separately), so it is excluded by :meth:`to_db_dict` and supplied
    explicitly to :meth:`from_db_row`.
    """

    path: str
    bpm: Optional[float] = None
    key: Optional[str] = None
    duration: Optional[float] = None
    rms: Optional[float] = None
    spectral_centroid: Optional[float] = None
    spectral_rolloff: Optional[float] = None
    zero_crossing_rate: Optional[float] = None
    # MFCC coefficients 1-13
    mfcc_1: Optional[float] = None
    mfcc_2: Optional[float] = None
    mfcc_3: Optional[float] = None
    mfcc_4: Optional[float] = None
    mfcc_5: Optional[float] = None
    mfcc_6: Optional[float] = None
    mfcc_7: Optional[float] = None
    mfcc_8: Optional[float] = None
    mfcc_9: Optional[float] = None
    mfcc_10: Optional[float] = None
    mfcc_11: Optional[float] = None
    mfcc_12: Optional[float] = None
    mfcc_13: Optional[float] = None
    analyzed_at: Optional[str] = None
    categories: Optional[List[str]] = None

    def to_db_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict suitable for database insertion.

        Returns:
            A mapping of field name to value for every field except
            ``categories``. Unset fields stay ``None``, which the sqlite3
            module binds as SQL NULL, so no further conversion is needed.
        """
        data = asdict(self)
        # Categories live in their own table, not in the samples row.
        data.pop('categories', None)
        return data

    @classmethod
    def from_db_row(cls, row: sqlite3.Row, categories: Optional[List[str]] = None) -> 'SampleFeatures':
        """Create SampleFeatures from a database row.

        Args:
            row: Row (or any mapping indexable by column name) holding one
                value per non-category field of this dataclass.
            categories: Category names for the sample; ``None`` or an empty
                list both produce ``categories == []``.

        Returns:
            A new SampleFeatures populated from ``row``.
        """
        # Drive the copy from the dataclass definition so a newly added field
        # cannot be forgotten here.
        column_values = {
            f.name: row[f.name] for f in fields(cls) if f.name != 'categories'
        }
        return cls(categories=categories or [], **column_values)
|
|
|
|
|
|
class SampleMetadataStore:
    """
    SQLite-based store for sample metadata and audio features.

    Manages three tables:
    - samples: Core audio features for each sample
    - sample_categories: One row per (sample, category) pair
    - analysis_metadata: Single-row table with store-wide statistics and versioning

    One connection is opened lazily on first use and reused until
    :meth:`close` is called. Public methods catch ``sqlite3.Error``, log it,
    and return a failure value (``False``, ``None``, ``[]`` or a default
    dict) instead of raising.
    """

    # Columns of the ``samples`` table, in schema order.
    _SAMPLE_COLUMNS = (
        "path", "bpm", "key", "duration", "rms", "spectral_centroid",
        "spectral_rolloff", "zero_crossing_rate",
        *(f"mfcc_{i}" for i in range(1, 14)),
        "analyzed_at",
    )

    # Insert a sample row, or overwrite every non-key column when the path
    # already exists. Columns are named explicitly so the statement does not
    # depend on the physical column order of the table.
    _UPSERT_SAMPLE_SQL = (
        "INSERT INTO samples ({cols}) VALUES ({params}) "
        "ON CONFLICT(path) DO UPDATE SET {updates}"
    ).format(
        cols=", ".join(_SAMPLE_COLUMNS),
        params=", ".join(":" + c for c in _SAMPLE_COLUMNS),
        updates=", ".join(
            f"{c} = excluded.{c}" for c in _SAMPLE_COLUMNS if c != "path"
        ),
    )

    # Idempotent schema: every statement uses IF NOT EXISTS.
    _SCHEMA_STATEMENTS = (
        # Main samples table
        """
        CREATE TABLE IF NOT EXISTS samples (
            path TEXT PRIMARY KEY,
            bpm REAL,
            key TEXT,
            duration REAL,
            rms REAL,
            spectral_centroid REAL,
            spectral_rolloff REAL,
            zero_crossing_rate REAL,
            mfcc_1 REAL,
            mfcc_2 REAL,
            mfcc_3 REAL,
            mfcc_4 REAL,
            mfcc_5 REAL,
            mfcc_6 REAL,
            mfcc_7 REAL,
            mfcc_8 REAL,
            mfcc_9 REAL,
            mfcc_10 REAL,
            mfcc_11 REAL,
            mfcc_12 REAL,
            mfcc_13 REAL,
            analyzed_at TEXT
        )
        """,
        # Index on key for fast key-based queries
        "CREATE INDEX IF NOT EXISTS idx_samples_key ON samples(key)",
        # Index on bpm for fast BPM-based queries
        "CREATE INDEX IF NOT EXISTS idx_samples_bpm ON samples(bpm)",
        # Sample categories table; rows disappear with their sample
        """
        CREATE TABLE IF NOT EXISTS sample_categories (
            path TEXT NOT NULL,
            category TEXT NOT NULL,
            PRIMARY KEY (path, category),
            FOREIGN KEY (path) REFERENCES samples(path) ON DELETE CASCADE
        )
        """,
        # Index on category for fast category-based queries
        "CREATE INDEX IF NOT EXISTS idx_categories_category ON sample_categories(category)",
        # Analysis metadata table; the CHECK keeps it to a single row
        """
        CREATE TABLE IF NOT EXISTS analysis_metadata (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            version INTEGER DEFAULT 1,
            total_samples INTEGER DEFAULT 0,
            last_updated TEXT
        )
        """,
    )

    def __init__(self, db_path: str = "sample_metadata.db"):
        """
        Initialize the metadata store.

        No connection is opened here; it is created on first use.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = Path(db_path)
        self._connection: Optional[sqlite3.Connection] = None

    def _get_connection(self) -> sqlite3.Connection:
        """Get or create the database connection.

        The connection returns ``sqlite3.Row`` rows and has foreign-key
        enforcement switched on (needed for ON DELETE CASCADE).

        Raises:
            sqlite3.Error: If the database cannot be opened or configured.
        """
        if self._connection is None:
            conn = sqlite3.connect(str(self.db_path))
            try:
                conn.row_factory = sqlite3.Row
                conn.execute("PRAGMA foreign_keys = ON")
            except sqlite3.Error:
                # Never cache a connection that is only half configured.
                conn.close()
                raise
            self._connection = conn
        return self._connection

    def _rollback(self) -> None:
        """Discard uncommitted changes after a failed write.

        Without this, a partially applied write would stay pending on the
        shared connection and be persisted by the next successful commit.
        """
        if self._connection is None:
            return
        try:
            self._connection.rollback()
        except sqlite3.Error as e:
            logger.error(f"Rollback failed: {e}")

    @staticmethod
    def _fetch_categories(cursor: sqlite3.Cursor, sample_path: str) -> List[str]:
        """Return the category names stored for ``sample_path``."""
        cursor.execute(
            "SELECT category FROM sample_categories WHERE path = ?",
            (sample_path,)
        )
        return [r['category'] for r in cursor.fetchall()]

    def _rows_to_features(self, cursor: sqlite3.Cursor, rows: List[sqlite3.Row]) -> List[SampleFeatures]:
        """Convert already-fetched sample rows into SampleFeatures objects.

        ``rows`` must be fully fetched beforehand because ``cursor`` is
        reused for the per-sample category lookups.
        """
        return [
            SampleFeatures.from_db_row(row, self._fetch_categories(cursor, row['path']))
            for row in rows
        ]

    @staticmethod
    def _refresh_metadata(cursor: sqlite3.Cursor) -> None:
        """Recount the samples and stamp the metadata row with the current time."""
        cursor.execute(
            "UPDATE analysis_metadata SET total_samples = (SELECT COUNT(*) FROM samples), last_updated = ? WHERE id = 1",
            (datetime.now().isoformat(),)
        )

    def close(self):
        """Close the database connection, if one is open."""
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def init_database(self) -> bool:
        """
        Initialize database schema. Creates tables if they don't exist.

        Also inserts the single ``analysis_metadata`` row when it is missing.

        Returns:
            True if successful, False otherwise
        """
        try:
            conn = self._get_connection()
            cursor = conn.cursor()

            for statement in self._SCHEMA_STATEMENTS:
                cursor.execute(statement)

            # Initialize metadata row if not exists
            cursor.execute("""
                INSERT OR IGNORE INTO analysis_metadata (id, version, total_samples, last_updated)
                VALUES (1, 1, 0, ?)
            """, (datetime.now().isoformat(),))

            conn.commit()
            logger.info(f"Database initialized at {self.db_path}")
            return True

        except sqlite3.Error as e:
            logger.error(f"Failed to initialize database: {e}")
            self._rollback()
            return False

    def get_sample_features(self, sample_path: str) -> Optional[SampleFeatures]:
        """
        Get features for a specific sample.

        Args:
            sample_path: Path to the sample file

        Returns:
            SampleFeatures object, or None if not found or on database error
        """
        try:
            cursor = self._get_connection().cursor()

            cursor.execute(
                "SELECT * FROM samples WHERE path = ?",
                (sample_path,)
            )
            row = cursor.fetchone()
            if row is None:
                return None

            return SampleFeatures.from_db_row(
                row, self._fetch_categories(cursor, sample_path)
            )

        except sqlite3.Error as e:
            logger.error(f"Error retrieving features for {sample_path}: {e}")
            return None

    def save_sample_features(self, sample_path: str, features: SampleFeatures) -> bool:
        """
        Save or update features for a sample.

        The stored ``path`` is always ``sample_path`` and ``analyzed_at`` is
        always the current time, whatever ``features`` holds for them.
        A non-empty ``features.categories`` replaces the sample's stored
        categories; an empty or ``None`` list leaves them untouched.

        Args:
            sample_path: Path to the sample file
            features: SampleFeatures object with all audio features

        Returns:
            True if successful, False otherwise (nothing is persisted then)
        """
        try:
            conn = self._get_connection()
            cursor = conn.cursor()

            # Prepare data for samples table
            data = features.to_db_dict()
            data['path'] = sample_path
            data['analyzed_at'] = datetime.now().isoformat()

            # Insert or update sample
            cursor.execute(self._UPSERT_SAMPLE_SQL, data)

            if features.categories:
                # Replace, rather than merge with, the existing categories.
                cursor.execute(
                    "DELETE FROM sample_categories WHERE path = ?",
                    (sample_path,)
                )
                cursor.executemany(
                    "INSERT OR IGNORE INTO sample_categories (path, category) VALUES (?, ?)",
                    [(sample_path, category) for category in features.categories]
                )

            self._refresh_metadata(cursor)

            conn.commit()
            logger.debug(f"Saved features for {sample_path}")
            return True

        except sqlite3.Error as e:
            logger.error(f"Error saving features for {sample_path}: {e}")
            self._rollback()
            return False

    def get_samples_by_category(self, category: str) -> List[str]:
        """
        Get all sample paths for a specific category.

        Args:
            category: Category name (e.g., 'kick', 'snare', 'bass')

        Returns:
            List of sample paths (empty on database error)
        """
        try:
            cursor = self._get_connection().cursor()
            cursor.execute(
                "SELECT path FROM sample_categories WHERE category = ?",
                (category,)
            )
            return [row['path'] for row in cursor.fetchall()]

        except sqlite3.Error as e:
            logger.error(f"Error retrieving samples for category {category}: {e}")
            return []

    def get_all_samples(self, limit: Optional[int] = None) -> List[SampleFeatures]:
        """
        Get all samples with their features.

        Args:
            limit: Optional limit on number of results. ``None`` and ``0``
                both mean "no limit".

        Returns:
            List of SampleFeatures objects (empty on database error)
        """
        try:
            cursor = self._get_connection().cursor()

            if limit:
                # Bind the limit instead of formatting it into the SQL text.
                cursor.execute("SELECT * FROM samples LIMIT ?", (limit,))
            else:
                cursor.execute("SELECT * FROM samples")

            return self._rows_to_features(cursor, cursor.fetchall())

        except sqlite3.Error as e:
            logger.error(f"Error retrieving all samples: {e}")
            return []

    def sample_exists(self, sample_path: str) -> bool:
        """
        Check if a sample has been analyzed and exists in database.

        Args:
            sample_path: Path to the sample file

        Returns:
            True if sample exists in database; False if it does not or the
            lookup failed
        """
        try:
            cursor = self._get_connection().cursor()
            cursor.execute(
                "SELECT 1 FROM samples WHERE path = ?",
                (sample_path,)
            )
            return cursor.fetchone() is not None

        except sqlite3.Error as e:
            logger.error(f"Error checking existence of {sample_path}: {e}")
            return False

    def get_stats(self) -> Dict[str, Any]:
        """
        Get database statistics including count by category.

        Returns:
            Dictionary with stats: total_samples, version, last_updated,
            categories (category name -> number of samples). On database
            error the counts are zero/empty and version is 1.
        """
        try:
            cursor = self._get_connection().cursor()

            # Get metadata
            cursor.execute("SELECT * FROM analysis_metadata WHERE id = 1")
            metadata_row = cursor.fetchone()

            # Get count by category
            cursor.execute("""
                SELECT category, COUNT(*) as count
                FROM sample_categories
                GROUP BY category
            """)
            categories = {row['category']: row['count'] for row in cursor.fetchall()}

            # Count the rows directly rather than trusting the cached
            # total_samples value in the metadata row.
            cursor.execute("SELECT COUNT(*) as total FROM samples")
            total = cursor.fetchone()['total']

            has_metadata = metadata_row is not None
            return {
                'total_samples': total,
                'version': metadata_row['version'] if has_metadata else 1,
                'last_updated': metadata_row['last_updated'] if has_metadata else None,
                'categories': categories
            }

        except sqlite3.Error as e:
            logger.error(f"Error retrieving stats: {e}")
            return {
                'total_samples': 0,
                'version': 1,
                'last_updated': None,
                'categories': {}
            }

    def delete_sample(self, sample_path: str) -> bool:
        """
        Delete a sample and its categories from the database.

        Category rows are removed by the ON DELETE CASCADE foreign key.
        Deleting a path that is not stored still counts as success.

        Args:
            sample_path: Path to the sample file

        Returns:
            True if successful, False otherwise
        """
        try:
            conn = self._get_connection()
            cursor = conn.cursor()

            cursor.execute("DELETE FROM samples WHERE path = ?", (sample_path,))
            self._refresh_metadata(cursor)

            conn.commit()
            logger.debug(f"Deleted sample {sample_path}")
            return True

        except sqlite3.Error as e:
            logger.error(f"Error deleting sample {sample_path}: {e}")
            self._rollback()
            return False

    def search_samples(
        self,
        category: Optional[str] = None,
        key: Optional[str] = None,
        bpm_min: Optional[float] = None,
        bpm_max: Optional[float] = None,
        limit: int = 50
    ) -> List[SampleFeatures]:
        """
        Search samples with optional filters.

        All supplied filters must match. An empty ``category`` or ``key``
        string is treated as "no filter".

        Args:
            category: Filter by category
            key: Filter by musical key
            bpm_min: Minimum BPM (inclusive)
            bpm_max: Maximum BPM (inclusive)
            limit: Maximum results to return

        Returns:
            List of matching SampleFeatures (empty on database error)
        """
        try:
            cursor = self._get_connection().cursor()

            params: List[Any] = []
            if category:
                # Join with categories table
                query = (
                    "SELECT s.* FROM samples s "
                    "INNER JOIN sample_categories sc ON s.path = sc.path "
                    "WHERE sc.category = ?"
                )
                params.append(category)
            else:
                # "1=1" lets every later filter be appended with AND.
                query = "SELECT s.* FROM samples s WHERE 1=1"

            # Filters are qualified with the samples alias so they stay
            # unambiguous when the categories join is present.
            if key:
                query += " AND s.key = ?"
                params.append(key)
            if bpm_min is not None:
                query += " AND s.bpm >= ?"
                params.append(bpm_min)
            if bpm_max is not None:
                query += " AND s.bpm <= ?"
                params.append(bpm_max)

            # Bind the limit instead of formatting it into the SQL text.
            query += " LIMIT ?"
            params.append(limit)

            cursor.execute(query, params)
            return self._rows_to_features(cursor, cursor.fetchall())

        except sqlite3.Error as e:
            logger.error(f"Error searching samples: {e}")
            return []
|
|
|
|
|
|
# Convenience factory: construct a store and set up its schema in one call
def create_metadata_store(db_path: str = "sample_metadata.db") -> SampleMetadataStore:
    """
    Build a SampleMetadataStore and run its schema initialization.

    The result of ``init_database()`` is not checked; the store is returned
    either way.

    Args:
        db_path: Location of the SQLite database file

    Returns:
        The SampleMetadataStore instance after ``init_database()`` was called
    """
    metadata_store = SampleMetadataStore(db_path)
    metadata_store.init_database()
    return metadata_store
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: round-trip one sample through a scratch database
    # file ("test_metadata.db" in the current working directory).
    logging.basicConfig(level=logging.INFO)

    # Create test store
    store = create_metadata_store("test_metadata.db")
    try:
        # Test saving
        features = SampleFeatures(
            path="/test/kick.wav",
            bpm=95.0,
            key="Am",
            duration=2.5,
            rms=-12.0,
            spectral_centroid=2500.0,
            categories=["kick", "drums"]
        )
        saved = store.save_sample_features("/test/kick.wav", features)

        # Test retrieving
        retrieved = store.get_sample_features("/test/kick.wav")
        print(f"Retrieved: {retrieved}")

        # Test stats
        stats = store.get_stats()
        print(f"Stats: {stats}")
    finally:
        # Release the connection even when one of the steps above raises.
        store.close()

    # The store methods report failure through return values, so check them
    # before claiming success.
    if not saved or retrieved is None:
        raise SystemExit("Tests failed: sample could not be saved or read back")
    print("Tests completed successfully")
|