feat: drumloop-first generation with forensic analysis
- Add DrumLoopAnalyzer: extracts BPM, transients, key, and beat grid from drumloops
- Rewrite compose.py: the drumloop drives everything (BPM, key, rhythm)
- Bass tresillo pattern placed in kick-free zones
- Chords change on downbeats, matching the drumloop key
- Melody avoids transients and emphasizes chord tones
- Vocal chops between transients; clap on dembow (beats 2, 3.5)
- Remove COLOR token (not recognized by REAPER)
- 90 tests passing; generates drumloop_song.rpp with 10 tracks, 20 plugins
This commit is contained in:
336
src/composer/drum_analyzer.py
Normal file
336
src/composer/drum_analyzer.py
Normal file
@@ -0,0 +1,336 @@
|
||||
"""DrumLoop-first forensic analyzer for reggaeton production.

Analyzes a drumloop WAV file and extracts:

- BPM and beat grid (quarter, eighth, sixteenth note positions)
- Transient positions with classification (kick / snare / hihat / other;
  the classifier in this module currently only emits the first three)
- Energy envelope per beat
- Musical key (if detectable)

The analysis result drives all other generation: bass, chords, melody,
and vocals are aligned to the drumloop's rhythmic skeleton.

Usage:
    from src.composer.drum_analyzer import DrumLoopAnalyzer

    analyzer = DrumLoopAnalyzer("path/to/drumloop.wav")
    result = analyzer.analyze()
    print(f"BPM: {result.bpm}, Transients: {len(result.transients)}")
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
|
||||
|
||||
# Pitch-class names indexed by semitone above C (sharps only).
NOTE_NAMES = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]

# Per-mode pitch-class weight profiles, index 0 being the tonic. The values
# correspond to the Krumhansl-Kessler probe-tone ratings. Rotating a profile
# by N semitones gives the profile for the key whose tonic is NOTE_NAMES[N].
KEY_PROFILES = {
    "major": np.array(
        [6.35, 2.23, 3.48, 2.33, 4.38, 4.09, 2.52, 5.19, 2.39, 3.66, 2.29, 2.88]
    ),
    "minor": np.array(
        [6.33, 2.68, 3.52, 5.38, 2.60, 3.53, 2.54, 4.75, 3.98, 2.69, 3.34, 3.17]
    ),
}
|
||||
|
||||
|
||||
@dataclass
class Transient:
    """One detected onset in the drumloop together with its classification."""

    # Onset position in seconds from the start of the audio.
    time: float
    # Percussion class label: "kick" | "snare" | "hihat" | "other".
    type: str
    # Magnitude of the onset frame (square root of the summed band power).
    energy: float
    # Spectral centroid around the onset frame, as reported by librosa.
    spectral_centroid: float
    # Classifier confidence; defaults to full confidence when not supplied.
    confidence: float = 1.0
|
||||
|
||||
|
||||
@dataclass
class BeatGrid:
    """Grid positions (in seconds) at three rhythmic subdivisions.

    Every list defaults to a fresh empty list, so an argument-less
    ``BeatGrid()`` represents "no grid available".
    """

    # Positions spaced one beat apart.
    quarter: list[float] = field(default_factory=list)
    # Positions spaced half a beat apart.
    eighth: list[float] = field(default_factory=list)
    # Positions spaced a quarter of a beat apart.
    sixteenth: list[float] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
class DrumLoopAnalysis:
    """Result container for one analyzed drumloop.

    Holds the raw measurements (tempo, beats, transients, key, energy) and
    offers small query helpers plus a JSON-friendly ``to_dict()`` export.
    """

    file_path: str
    bpm: float
    duration: float  # seconds
    beats: list[float] = field(default_factory=list)
    transients: list[Transient] = field(default_factory=list)
    beat_grid: BeatGrid = field(default_factory=BeatGrid)
    key: Optional[str] = None
    key_confidence: float = 0.0
    energy_profile: list[float] = field(default_factory=list)
    bar_count: int = 0
    sample_rate: int = 44100

    def transients_of_type(self, ttype: str) -> list[Transient]:
        """Return the transients whose ``type`` equals *ttype*, in stored order."""
        return [t for t in self.transients if t.type == ttype]

    def transient_positions(self, ttype: Optional[str] = None) -> list[float]:
        """Return transient times; all of them when *ttype* is ``None``."""
        selected = self.transients if ttype is None else self.transients_of_type(ttype)
        return [t.time for t in selected]

    def kick_free_zones(self, margin_beats: float = 0.25) -> list[tuple[float, float]]:
        """Return ``(start, end)`` time spans that contain no kick.

        Each kick blocks a window of *margin_beats* beats on either side of
        its position; the gaps between those windows, from time 0 up to
        ``duration``, are returned in chronological order.

        When ``bpm`` is not positive the beat length is undefined, so no
        margin is applied instead of dividing by zero (``to_dict`` applies
        the same ``bpm > 0`` guard).
        """
        margin_sec = margin_beats * (60.0 / self.bpm) if self.bpm > 0 else 0.0

        zones: list[tuple[float, float]] = []
        cursor = 0.0  # earliest time not yet blocked by a previous kick
        for kick_time in sorted(self.transient_positions("kick")):
            gap_end = kick_time - margin_sec
            if gap_end > cursor:
                zones.append((cursor, gap_end))
            cursor = kick_time + margin_sec

        # Whatever remains after the last kick's window is free as well.
        if cursor < self.duration:
            zones.append((cursor, self.duration))
        return zones

    def to_dict(self) -> dict:
        """Serialize the analysis into plain dicts/lists with rounded floats.

        ``beat_pos`` is each transient's time expressed in beats from the
        start of the file; it is reported as 0.0 when ``bpm`` is not positive.
        """
        # Hoisted: the beat length is identical for every transient.
        beat_dur = 60.0 / self.bpm if self.bpm > 0 else None

        # Single pass over the transients instead of one scan per type.
        counts = {"kick": 0, "snare": 0, "hihat": 0, "other": 0}
        transient_rows = []
        for t in self.transients:
            if t.type in counts:
                counts[t.type] += 1
            transient_rows.append(
                {
                    "time": round(t.time, 4),
                    "beat_pos": round(t.time / beat_dur, 4) if beat_dur is not None else 0.0,
                    "type": t.type,
                    "energy": round(t.energy, 4),
                    "spectral_centroid": round(t.spectral_centroid, 1),
                    "confidence": round(t.confidence, 4),
                }
            )

        return {
            "file_path": self.file_path,
            "bpm": round(self.bpm, 2),
            "duration": round(self.duration, 4),
            "bar_count": self.bar_count,
            "key": self.key,
            "key_confidence": round(self.key_confidence, 4),
            "sample_rate": self.sample_rate,
            "beat_grid": {
                "quarter": [round(b, 4) for b in self.beat_grid.quarter],
                "eighth": [round(b, 4) for b in self.beat_grid.eighth],
                "sixteenth": [round(b, 4) for b in self.beat_grid.sixteenth],
            },
            "transients": transient_rows,
            "energy_profile": [round(e, 4) for e in self.energy_profile],
            "summary": {
                "kick_count": counts["kick"],
                "snare_count": counts["snare"],
                "hihat_count": counts["hihat"],
                "other_count": counts["other"],
            },
        }
|
||||
|
||||
|
||||
class DrumLoopAnalyzer:
    """Load a drumloop audio file and derive tempo, beats, transients, key and energy.

    The decoded audio is cached on the instance after the first load, so
    repeated ``analyze()`` calls do not read the file again.
    """

    # Beat length (seconds) used when neither the beat list nor the tempo
    # yields one; equals one beat at 120 BPM.
    _FALLBACK_BEAT_SEC = 0.5
    # Bars are counted assuming four beats per bar.
    _BEATS_PER_BAR = 4

    def __init__(self, file_path: str | Path, sr: int = 44100):
        """Remember the file to analyze and the sample rate to load it at."""
        self.file_path = str(file_path)
        self.sr = sr
        self._y: Optional[np.ndarray] = None
        self._sr_actual: int = sr

    def _load(self) -> tuple[np.ndarray, int]:
        """Return ``(samples, sample_rate)``, decoding the file on first use (mono)."""
        if self._y is None:
            y, sr = librosa.load(self.file_path, sr=self.sr, mono=True)
            self._y = y
            self._sr_actual = sr
        return self._y, self._sr_actual

    def analyze(self) -> DrumLoopAnalysis:
        """Run the full analysis pipeline and return a ``DrumLoopAnalysis``."""
        y, sr = self._load()
        duration = float(len(y) / sr)

        # One onset envelope feeds both beat tracking and onset detection.
        onset_env = librosa.onset.onset_strength(y=y, sr=sr)

        bpm, beat_frames = self._detect_tempo_and_beats(y, sr, onset_env=onset_env)
        beats = librosa.frames_to_time(beat_frames, sr=sr).tolist()

        # NOTE(review): backtrack=True moves each onset back to the preceding
        # energy minimum, so the frame classified below may sit slightly
        # before the hit itself -- confirm this is intended.
        onset_frames = librosa.onset.onset_detect(
            y=y, sr=sr, onset_envelope=onset_env, backtrack=True
        )
        onset_times = librosa.frames_to_time(onset_frames, sr=sr)

        transients = self._classify_transients(y, sr, onset_frames, onset_times)
        beat_grid = self._build_beat_grid(beats, bpm, duration)
        key, key_conf = self._detect_key(y, sr)
        energy_profile = self._energy_per_beat(y, sr, beats, bpm=bpm)
        bar_count = self._count_bars(beats, duration, bpm)

        return DrumLoopAnalysis(
            file_path=self.file_path,
            bpm=bpm,
            duration=duration,
            beats=beats,
            transients=transients,
            beat_grid=beat_grid,
            key=key,
            key_confidence=key_conf,
            energy_profile=energy_profile,
            bar_count=bar_count,
            sample_rate=sr,
        )

    @staticmethod
    def _count_bars(beats: list[float], duration: float, bpm: float) -> int:
        """Return the number of whole bars.

        Uses the tracked beats when there are any; otherwise estimates from
        duration and tempo. Returns 0 when the tempo is not positive, instead
        of dividing by zero.
        """
        if beats:
            return len(beats) // DrumLoopAnalyzer._BEATS_PER_BAR
        if bpm <= 0:
            return 0
        bar_seconds = 60.0 * DrumLoopAnalyzer._BEATS_PER_BAR / bpm
        return int(duration / bar_seconds)

    def _detect_tempo_and_beats(
        self, y: np.ndarray, sr: int, onset_env: Optional[np.ndarray] = None
    ) -> tuple[float, np.ndarray]:
        """Return ``(bpm, beat_frames)`` from librosa's beat tracker.

        Args:
            y: Audio samples (only used when *onset_env* is not supplied).
            sr: Sample rate of *y*.
            onset_env: Optional precomputed onset-strength envelope; computed
                from *y* when omitted.

        The tempo is folded toward the 60-200 BPM range by a single
        doubling/halving; the beat frames are returned as tracked.
        """
        if onset_env is None:
            onset_env = librosa.onset.onset_strength(y=y, sr=sr)
        tempo, beat_frames = librosa.beat.beat_track(
            onset_envelope=onset_env, sr=sr, units="frames"
        )

        # Depending on the librosa version the tempo is a float, a 0-d array
        # or a 1-element array; normalise all three to a plain float.
        bpm = float(np.atleast_1d(tempo)[0])

        if bpm < 60:
            bpm *= 2
        elif bpm > 200:
            bpm /= 2
        return bpm, beat_frames

    @staticmethod
    def _classify_band_ratios(
        low_ratio: float, mid_ratio: float, high_ratio: float
    ) -> tuple[str, float]:
        """Map band-energy shares to ``(type, confidence)``.

        Clear cases are decided by fixed thresholds, checked in the order
        kick, hihat, snare. Ambiguous spectra fall back to the dominant band,
        with confidence floored at 0.3.
        """
        if low_ratio > 0.55:
            return "kick", min(1.0, low_ratio / 0.7)
        if high_ratio > 0.35:
            return "hihat", min(1.0, high_ratio / 0.5)
        if mid_ratio > 0.40:
            return "snare", min(1.0, mid_ratio / 0.6)

        if low_ratio > mid_ratio and low_ratio > high_ratio:
            return "kick", max(low_ratio, 0.3)
        if high_ratio > mid_ratio:
            return "hihat", max(high_ratio, 0.3)
        return "snare", max(mid_ratio, 0.3)

    def _classify_transients(
        self, y: np.ndarray, sr: int, onset_frames: np.ndarray, onset_times: np.ndarray
    ) -> list[Transient]:
        """Label every onset as kick / snare / hihat from its spectrum.

        The STFT column at each onset frame is split into three bands
        (<200 Hz, 200-5000 Hz, >=5000 Hz) and the share of power in each
        band decides the label. Onset frames beyond the STFT are skipped.
        """
        if len(onset_frames) == 0:
            return []

        hop_length = 512
        S = np.abs(librosa.stft(y, hop_length=hop_length))
        freqs = librosa.fft_frequencies(sr=sr)

        low_mask = freqs < 200
        mid_mask = (freqs >= 200) & (freqs < 5000)
        high_mask = freqs >= 5000

        n_frames = S.shape[1]
        transients: list[Transient] = []
        for onset_time, frame in zip(onset_times, onset_frames):
            if frame >= n_frames:
                continue

            power = S[:, frame] ** 2
            low_e = float(np.sum(power[low_mask]))
            mid_e = float(np.sum(power[mid_mask]))
            high_e = float(np.sum(power[high_mask]))
            # The small epsilon keeps the ratios finite on silent frames.
            total_e = low_e + mid_e + high_e + 1e-10

            # Centroid averaged over the onset frame and its neighbours.
            centroid = float(
                librosa.feature.spectral_centroid(
                    S=S[:, max(0, frame - 1):frame + 2], sr=sr, hop_length=hop_length
                ).mean()
            )

            ttype, conf = self._classify_band_ratios(
                low_e / total_e, mid_e / total_e, high_e / total_e
            )

            transients.append(
                Transient(
                    time=float(onset_time),
                    type=ttype,
                    energy=float(np.sqrt(total_e)),
                    spectral_centroid=centroid,
                    confidence=conf,
                )
            )

        return transients

    @staticmethod
    def _grid_times(start: float, step: float, duration: float) -> list[float]:
        """Return ``start + i * step`` for every point below *duration*, rounded to 4 dp.

        Points are computed by multiplication rather than repeated addition,
        so rounding error does not accumulate along the grid. A non-positive
        *step* yields an empty list.
        """
        if step <= 0:
            return []
        times: list[float] = []
        index = 0
        t = start
        while t < duration:
            times.append(round(t, 4))
            index += 1
            t = start + index * step
        return times

    def _build_beat_grid(
        self, beats: list[float], bpm: float, duration: float
    ) -> BeatGrid:
        """Build quarter/eighth/sixteenth grids anchored on the first tracked beat.

        The grids are strictly metronomic at *bpm*; later tracked beats are
        not consulted. Returns an empty ``BeatGrid`` when there are no beats
        or the tempo is not positive.
        """
        if not beats or bpm <= 0:
            return BeatGrid()

        beat_dur = 60.0 / bpm
        anchor = beats[0]
        return BeatGrid(
            quarter=self._grid_times(anchor, beat_dur, duration),
            eighth=self._grid_times(anchor, beat_dur / 2.0, duration),
            sixteenth=self._grid_times(anchor, beat_dur / 4.0, duration),
        )

    def _detect_key(self, y: np.ndarray, sr: int) -> tuple[Optional[str], float]:
        """Estimate the key by correlating the mean chroma with the key profiles.

        Returns ``(key_name, confidence)`` where minor keys carry an ``m``
        suffix and confidence is the best correlation mapped from [-1, 1] to
        [0, 1]. ``key_name`` is ``None`` when no correlation exceeds -1, for
        example when the correlation is undefined (NaN).
        """
        chroma = librosa.feature.chroma_cqt(y=y, sr=sr)
        chroma_avg = np.mean(chroma, axis=1)

        best_key = None
        best_corr = -1.0

        for mode_name, profile in KEY_PROFILES.items():
            suffix = "m" if mode_name == "minor" else ""
            for shift in range(12):
                # Rotating by `shift` moves the profile's tonic to NOTE_NAMES[shift].
                rotated = np.roll(profile, shift)
                corr = float(np.corrcoef(chroma_avg, rotated)[0, 1])
                if corr > best_corr:
                    best_corr = corr
                    best_key = f"{NOTE_NAMES[shift]}{suffix}"

        confidence = max(0.0, min(1.0, (best_corr + 1) / 2))
        return best_key, confidence

    @staticmethod
    def _last_beat_length(beats: list[float], bpm: Optional[float] = None) -> float:
        """Return the length in seconds of the window after the final beat.

        Preference order: median spacing of the tracked beats, then
        ``60 / bpm``, then the 120 BPM fallback.
        """
        if len(beats) >= 2:
            interval = float(np.median(np.diff(beats)))
            if interval > 0:
                return interval
        if bpm is not None and bpm > 0:
            return 60.0 / bpm
        return DrumLoopAnalyzer._FALLBACK_BEAT_SEC

    def _energy_per_beat(
        self,
        y: np.ndarray,
        sr: int,
        beats: list[float],
        bpm: Optional[float] = None,
    ) -> list[float]:
        """Return the mean RMS level inside each beat window.

        Window *i* runs from ``beats[i]`` to ``beats[i + 1]``. The final
        window has no following beat, so its length comes from
        ``_last_beat_length`` rather than a hard-coded 120 BPM beat. Windows
        containing no RMS frame report 0.0.

        Args:
            y: Audio samples.
            sr: Sample rate of *y*.
            beats: Beat times in seconds, ascending.
            bpm: Optional tempo used for the last window when fewer than two
                beats are available.
        """
        if not beats:
            return []

        hop = 512
        rms = librosa.feature.rms(y=y, hop_length=hop)[0]
        rms_times = librosa.frames_to_time(np.arange(len(rms)), sr=sr, hop_length=hop)

        window_ends = list(beats[1:])
        window_ends.append(beats[-1] + self._last_beat_length(beats, bpm))

        energy: list[float] = []
        for start, end in zip(beats, window_ends):
            mask = (rms_times >= start) & (rms_times < end)
            energy.append(float(np.mean(rms[mask])) if np.any(mask) else 0.0)
        return energy
|
||||
Reference in New Issue
Block a user