Files
ableton-mcp-ai/AbletonMCP_AI_BAK_20260328_200801/MCP_Server/audio_resampler.py
renato97 6ec8663954 Initial commit: AbletonMCP-AI complete system
- MCP Server with audio fallback, sample management
- Song generator with bus routing
- Reference listener and audio resampler
- Vector-based sample search
- Master chain with limiter and calibration
- Fix: Audio fallback now works without M4L
- Fix: Full song detection in sample loader

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-28 22:53:10 -03:00

2467 lines
105 KiB
Python

"""
audio_resampler.py - Deriva transiciones y FX propios desde los samples elegidos.
Phase 1 Improvements:
- Cache robusto con invalidacion por mtime, size y edad maxima
- Crossfades equal-power para eliminar clicks
- HPF/LPF sweeps suaves con overlap-add y filtros butterworth de 4to orden
- Normalizacion con soft limiting mejorado (curva cubica + lookahead)
"""
from __future__ import annotations
import hashlib
import logging
import os
import time
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
try:
import soundfile as sf
except ImportError: # pragma: no cover
sf = None
try:
import librosa
except ImportError: # pragma: no cover
librosa = None
try:
from scipy import signal as scipy_signal
except ImportError: # pragma: no cover
scipy_signal = None
logger = logging.getLogger("AudioResampler")
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except Exception:
return float(default)
def _section_offsets(sections: List[Dict[str, Any]]) -> List[Tuple[Dict[str, Any], float, float]]:
offsets: List[Tuple[Dict[str, Any], float, float]] = []
cursor = 0.0
for section in sections:
beats = _safe_float(section.get("beats", 0.0), _safe_float(section.get("bars", 8), 8.0) * 4.0)
start = float(cursor)
end = float(cursor + max(1.0, beats))
offsets.append((section, start, end))
cursor = end
return offsets
def _samples_from_seconds(seconds: float, sample_rate: int, min_samples: int = 256) -> int:
"""Convierte segundos a samples con minimo garantizado.
Args:
seconds: Duracion en segundos
sample_rate: Tasa de muestreo en Hz
min_samples: Minimo de samples a retornar (default: 256)
Returns:
Numero de samples con minimo garantizado
"""
return max(min_samples, int(round(seconds * sample_rate)))
def _seconds_from_samples(samples: int, sample_rate: int, min_duration: float = 0.05) -> float:
"""Convierte samples a segundos.
Args:
samples: Numero de samples
sample_rate: Tasa de muestreo en Hz
min_duration: Duracion minima en segundos si samples es 0 (default: 0.05)
Returns:
Duracion en segundos
"""
return samples / sample_rate if samples > 0 else min_duration
def _ensure_2d_float(audio: np.ndarray) -> np.ndarray:
"""Asegura que el array sea 2D float32 (samples, channels)."""
if audio is None or audio.size == 0:
return np.zeros((1, 1), dtype=np.float32)
audio = np.asarray(audio, dtype=np.float32)
if audio.ndim == 1:
audio = audio.reshape(-1, 1)
return audio
def _safe_slice(audio: np.ndarray, start: int, end: int) -> np.ndarray:
"""Extrae slice seguro que nunca retorna array vacio."""
if audio is None or audio.size == 0:
channels = audio.shape[1] if (audio is not None and audio.ndim == 2) else 1
return np.zeros((1, channels), dtype=np.float32)
start = max(0, min(start, audio.shape[0] - 1))
end = max(start + 1, min(end, audio.shape[0]))
result = audio[start:end]
if result.size == 0:
return np.zeros((1, audio.shape[1]), dtype=np.float32)
return result
def _validate_mix_shapes(a: np.ndarray, b: np.ndarray) -> Tuple[bool, str]:
"""Valida que dos arrays puedan mezclarse (broadcast compatible)."""
if a is None or b is None:
return False, "None array"
if a.size == 0 or b.size == 0:
return False, f"Empty array: a.shape={a.shape}, b.shape={b.shape}"
if a.ndim != b.ndim:
return False, f"Dimension mismatch: {a.ndim} vs {b.ndim}"
if a.shape[1] != b.shape[1]:
return False, f"Channel mismatch: {a.shape[1]} vs {b.shape[1]}"
return True, "OK"
class AudioResampler:
    """Processes audio to generate transitions and FX from chosen samples.

    Phase 1 improvements:
    - LRU cache invalidated by mtime, size and maximum age
    - Cache statistics (hits/misses)
    - Equal-power crossfades for better quality
    - HPF/LPF sweeps with 4th-order Butterworth filters
    - Improved soft limiting with a cubic curve
    """
    # Maximum number of files held in the cache
    _CACHE_LIMIT: int = 50
    # Maximum cache entry age in seconds (30 minutes)
    _CACHE_MAX_AGE_S: float = 1800.0
    # Maximum cache size in bytes (~500MB by default)
    _CACHE_MAX_SIZE_BYTES: int = 500 * 1024 * 1024
    # Unified peak value for all renders (85%, leaving headroom)
    _DEFAULT_PEAK: float = 0.85
    # Default crossfade length in samples (10ms at 44.1kHz)
    _DEFAULT_CROSSFADE_SAMPLES: int = 441
    # Absolute minimum lengths that keep processing away from empty arrays
    _MIN_SAMPLES_FOR_FFT: int = 512  # Minimum for spectral analysis
    _MIN_SAMPLES_FOR_WINDOW: int = 64  # Minimum for applying a window
    _MIN_SAMPLES_FOR_STRETCH: int = 100  # Minimum for time-stretch
    _MIN_SAMPLES_FOR_SLICE: int = 32  # Minimum for a stutter slice
    _MIN_SAMPLES_FOR_EFFECT: int = 256  # Minimum for applying any effect
    _MIN_AUDIO_DURATION_S: float = 0.05  # 50ms minimum of audio

    def __init__(self, output_dir: Optional[str] = None, sample_rate: int = 44100):
        """Create the resampler and ensure the output directory exists.

        Args:
            output_dir: Destination for generated audio. Defaults to
                %LOCALAPPDATA%/AbletonMCP_AI/generated_audio.
            sample_rate: Working sample rate in Hz (clamped to >= 1).
        """
        local_root = Path(os.environ.get("LOCALAPPDATA", Path.home() / "AppData" / "Local"))
        self.output_dir = Path(output_dir) if output_dir else local_root / "AbletonMCP_AI" / "generated_audio"
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.sample_rate = max(1, int(sample_rate))  # Defensive validation
        # LRU cache for loaded audio: path::mtime_ns::size -> (audio_array, sample_rate, timestamp)
        # mtime_ns is part of the key so a modified file invalidates automatically
        # timestamp supports max-age invalidation
        self._audio_cache: OrderedDict[str, Tuple[np.ndarray, int, float]] = OrderedDict()
        # Cache metadata for memory tracking
        self._cache_sizes: Dict[str, int] = {}  # path -> bytes
        self._cache_total_bytes: int = 0
        # Cache statistics
        self._cache_hits: int = 0
        self._cache_misses: int = 0
def _validate_audio_array(self, audio: np.ndarray, context: str = "audio") -> np.ndarray:
"""Valida y normaliza un array de audio.
Args:
audio: Array a validar
context: Descripcion del contexto para mensajes de error
Returns:
Array validado como float32 y al menos 2D
Raises:
ValueError: Si el array esta vacio o es invalido
"""
if audio is None:
raise ValueError(f"{context}: audio es None")
audio = np.asarray(audio, dtype=np.float32)
if audio.size == 0:
raise ValueError(f"{context}: audio array esta vacio")
# Asegurar que sea 2D (samples, channels)
if audio.ndim == 1:
audio = audio.reshape(-1, 1)
return audio
def _validate_positive(self, value: float, name: str) -> float:
"""Valida que un valor sea positivo.
Args:
value: Valor a validar
name: Nombre del parametro para mensaje de error
Returns:
Valor validado como float
Raises:
ValueError: Si el valor no es positivo
"""
try:
val = float(value)
except (TypeError, ValueError):
raise ValueError(f"{name}: debe ser un numero valido, recibido {value!r}")
if val <= 0:
raise ValueError(f"{name}: debe ser positivo, recibido {val}")
return val
def _get_cache_key(self, file_path: str, mtime_ns: Optional[int] = None, file_size: Optional[int] = None) -> str:
"""Genera key de cache a partir del path absoluto, mtime y size.
Args:
file_path: Ruta al archivo
mtime_ns: Tiempo de modificacion en nanosegundos (opcional)
file_size: Tamanio del archivo en bytes (opcional)
Returns:
Key unica que incluye mtime y size si se proporcionan
"""
base_key = str(Path(file_path).resolve())
parts = [base_key]
if mtime_ns is not None:
parts.append(str(mtime_ns))
if file_size is not None:
parts.append(str(file_size))
return "::".join(parts)
def _cache_get(self, key: str) -> Optional[Tuple[np.ndarray, int]]:
"""Obtiene audio del cache (LRU: mueve al final si existe).
Returns:
Tupla (audio_array, sample_rate) o None si no existe o expiro
"""
if key not in self._audio_cache:
self._cache_misses += 1
return None
cached_data = self._audio_cache[key]
# Nuevo formato: (audio, sample_rate, timestamp)
if len(cached_data) == 3:
audio, sample_rate, timestamp = cached_data
# Verificar edad maxima
if time.time() - timestamp > self._CACHE_MAX_AGE_S:
logger.debug("Cache entry expired by age: %s", key)
self._evict_cache_entry(key)
self._cache_misses += 1
return None
else:
# Formato legacy: (audio, sample_rate)
audio, sample_rate = cached_data[:2]
# Mover al final (mas reciente)
self._audio_cache.move_to_end(key)
self._cache_hits += 1
return (audio, sample_rate)
def _evict_cache_entry(self, key: str) -> None:
"""Evict una entrada especifica del cache y actualiza contadores."""
if key in self._audio_cache:
if key in self._cache_sizes:
self._cache_total_bytes -= self._cache_sizes[key]
del self._cache_sizes[key]
del self._audio_cache[key]
def _cache_put(self, key: str, audio: np.ndarray, sample_rate: int) -> None:
"""Agrega audio al cache con limite LRU y de memoria."""
# Calcular tamanio en bytes
entry_size = audio.nbytes
# Si ya existe, actualizar y mover al final
if key in self._audio_cache:
old_size = self._cache_sizes.get(key, 0)
self._cache_total_bytes -= old_size
self._cache_sizes[key] = entry_size
self._cache_total_bytes += entry_size
self._audio_cache[key] = (audio, sample_rate, time.time())
self._audio_cache.move_to_end(key)
return
# Evict entries si excede limite de memoria
while (self._cache_total_bytes + entry_size > self._CACHE_MAX_SIZE_BYTES
and len(self._audio_cache) > 0):
oldest_key = next(iter(self._audio_cache))
self._evict_cache_entry(oldest_key)
logger.debug("Evicted cache entry (memory limit): %s", oldest_key)
# Si el cache esta lleno por cantidad, eliminar el mas antiguo (primero)
while len(self._audio_cache) >= self._CACHE_LIMIT:
oldest_key = next(iter(self._audio_cache))
self._evict_cache_entry(oldest_key)
logger.debug("Evicted cache entry (count limit): %s", oldest_key)
# Agregar nueva entrada
self._cache_sizes[key] = entry_size
self._cache_total_bytes += entry_size
self._audio_cache[key] = (audio, sample_rate, time.time())
def _load_audio(self, file_path: str) -> Tuple[np.ndarray, int]:
    """Load an audio file through the LRU cache (invalidated by mtime, size, age).

    Args:
        file_path: Path to the audio file.

    Returns:
        Tuple (audio_array, sample_rate); on the soundfile path the audio is
        resampled to self.sample_rate when rates differ.

    Raises:
        RuntimeError: if the path is empty/missing or no decoder can read it.
    """
    if not file_path:
        raise RuntimeError("file_path esta vacio")
    path = Path(file_path)
    if not path.exists():
        raise RuntimeError(f"Archivo no encontrado: {path}")
    # Stat before anything else so the cache key reflects the file we read
    stat_info = path.stat()
    mtime_ns = stat_info.st_mtime_ns
    file_size = stat_info.st_size
    cache_key = self._get_cache_key(file_path, mtime_ns, file_size)
    # Try the cache (the key embeds mtime and size, so a changed file misses)
    cached = self._cache_get(cache_key)
    if cached is not None:
        duration_s = len(cached[0]) / cached[1]
        logger.debug("Cache hit for %s (sample_rate=%d, duration=%.2fs, hits=%d, misses=%d)",
                     path.name, cached[1], duration_s, self._cache_hits, self._cache_misses)
        # Return a copy so callers cannot mutate the cached array
        return np.array(cached[0], dtype=np.float32, copy=True), cached[1]
    logger.debug("Cache miss for %s, reading from disk (hits=%d, misses=%d)",
                 path.name, self._cache_hits, self._cache_misses)
    if sf is not None:
        try:
            audio, sample_rate = sf.read(str(path), always_2d=True, dtype="float32")
            # Defensive validation - make sure the decode is not empty
            if audio.size == 0:
                logger.warning("AUDIO_LOAD: fallback to silence (empty audio from %s)", path.name)
                silence = np.zeros((int(self.sample_rate), 2), dtype=np.float32)
                return silence, self.sample_rate
            duration_s = len(audio) / sample_rate
            logger.debug("Loaded from disk via soundfile: %s (sample_rate=%d, duration=%.2fs, channels=%d)",
                         path.name, sample_rate, duration_s, audio.shape[1])
            if sample_rate != self.sample_rate:
                logger.debug("Resampling %s from %d to %d Hz", path.name, sample_rate, self.sample_rate)
                audio = self._resample_audio(audio, sample_rate, self.sample_rate)
                sample_rate = self.sample_rate
            # Store in cache
            self._cache_put(cache_key, audio, sample_rate)
            logger.debug("Cached audio: %s (total_cache_size=%.2fMB)", path.name, self._cache_total_bytes / (1024*1024))
            return np.array(audio, dtype=np.float32, copy=True), sample_rate
        except Exception as exc:
            # Swallow and fall through to librosa: soundfile cannot decode everything
            logger.debug("soundfile fallo para %s: %s", path.name, exc)
    if librosa is None:
        raise RuntimeError(f"No se pudo leer audio (sin soundfile ni librosa): {path.name}")
    logger.debug("Falling back to librosa for: %s", path.name)
    try:
        audio, sample_rate = librosa.load(str(path), sr=self.sample_rate, mono=True)
        audio = np.asarray(audio, dtype=np.float32).reshape(-1, 1)
        # librosa decodes mono here; duplicate the channel to get stereo
        audio = np.repeat(audio, 2, axis=1)
        # Defensive validation - make sure the decode is not empty
        if audio.size == 0:
            logger.warning("AUDIO_LOAD: fallback to silence (empty audio from %s)", path.name)
            silence = np.zeros((int(self.sample_rate), 2), dtype=np.float32)
            return silence, self.sample_rate
        duration_s = len(audio) / self.sample_rate
        logger.debug("Loaded via librosa: %s (sample_rate=%d, duration=%.2fs, channels=2)",
                     path.name, self.sample_rate, duration_s)
        # Store in cache
        self._cache_put(cache_key, audio, self.sample_rate)
        logger.debug("Cached audio: %s", cache_key)
        return np.array(audio, dtype=np.float32, copy=True), self.sample_rate
    except Exception as exc:
        logger.error("No se pudo leer audio con librosa: %s: %s", path.name, exc)
        raise RuntimeError(f"No se pudo leer audio con librosa: {path.name}: {exc}")
def _write_audio(self, file_path: Path, audio: np.ndarray, sample_rate: int) -> str:
    """Write *audio* to a WAV file, forcing stereo output.

    Args:
        file_path: Destination path.
        audio: Audio array.
        sample_rate: Sample rate in Hz.

    Returns:
        The written path as a string.

    Raises:
        RuntimeError: if soundfile is unavailable.
        ValueError: if the audio array or sample rate fails validation.
    """
    if sf is None:
        raise RuntimeError("soundfile no disponible para escribir audio")
    # Defensive validation
    data = self._validate_audio_array(audio, context="_write_audio")
    rate = self._validate_positive(sample_rate, "sample_rate")
    if data.ndim == 1:
        data = data.reshape(-1, 1)
    if data.shape[1] == 1:
        # Duplicate mono into both stereo channels.
        data = np.repeat(data, 2, axis=1)
    destination = str(file_path)
    sf.write(destination, data, int(rate))
    return destination
def _resample_audio(self, audio: np.ndarray, source_sr: int, target_sr: int) -> np.ndarray:
"""Cambia el sample rate de audio.
Args:
audio: Array de audio
source_sr: Sample rate origen
target_sr: Sample rate destino
Returns:
Audio resampleado
"""
# Validaciones defensivas
audio = self._validate_audio_array(audio, context="_resample_audio")
source_sr = max(1, int(source_sr))
target_sr = max(1, int(target_sr))
if source_sr == target_sr:
return np.array(audio, dtype=np.float32)
factor = float(target_sr) / float(source_sr)
target_len = max(1, int(round(audio.shape[0] * factor)))
return self._stretch_to_length(audio, target_len)
def _stretch_to_length(self, audio: np.ndarray, target_len: int) -> np.ndarray:
    """Stretch or compress audio to an exact length in samples.

    Prefers scipy.signal.resample_poly (best quality, anti-aliasing), then
    scipy.signal.resample (FFT-based), then librosa.resample, and finally
    np.interp (linear) as the last resort.

    Args:
        audio: Audio array (samples, channels).
        target_len: Target length in samples (clamped to >= 1).

    Returns:
        The stretched/compressed audio as float32.
    """
    # Defensive validation
    audio = self._validate_audio_array(audio, context="_stretch_to_length")
    target_len = max(1, int(target_len))
    # Extra guard: silent output if the input vanished or the target is zero
    if audio.size == 0 or target_len == 0:
        logger.warning("_stretch_to_length: audio vacio o target_len=0, retornando silencio de longitud %d", target_len)
        return np.zeros((target_len, 2), dtype=np.float32)
    if audio.shape[0] == target_len:
        return np.array(audio, dtype=np.float32)
    # Edge case: a single-sample array is simply repeated
    if audio.shape[0] <= 1:
        return np.repeat(np.asarray(audio, dtype=np.float32), target_len, axis=0)
    original_len = audio.shape[0]

    def _fit_channel_length(channel_audio: np.ndarray) -> np.ndarray:
        # Resamplers may return a few samples more or fewer than requested;
        # trim, or pad with the final value, so each channel is exactly target_len.
        fitted = np.asarray(channel_audio, dtype=np.float32).reshape(-1)
        current_len = fitted.shape[0]
        if current_len == target_len:
            return fitted
        if current_len > target_len:
            return fitted[:target_len]
        if current_len <= 0:
            return np.zeros(target_len, dtype=np.float32)
        pad_value = float(fitted[-1])
        padding = np.full(target_len - current_len, pad_value, dtype=np.float32)
        return np.concatenate([fitted, padding], axis=0)

    # Try scipy.signal.resample_poly first (anti-aliasing filters, best quality)
    if scipy_signal is not None:
        try:
            from fractions import Fraction
            # Express the stretch ratio as a small rational up/down pair
            ratio = Fraction(target_len, original_len).limit_denominator(1000)
            up = ratio.numerator
            down = ratio.denominator
            stretched = np.zeros((target_len, audio.shape[1]), dtype=np.float32)
            for channel in range(audio.shape[1]):
                # resample_poly applies anti-aliasing filters for better quality
                resampled = scipy_signal.resample_poly(audio[:, channel], up, down)
                stretched[:, channel] = _fit_channel_length(resampled)
            return stretched
        except Exception as exc:
            logger.debug("scipy.signal.resample_poly fallo: %s, intentando resample normal", exc)
            # Fall back to plain resample within the same block
            try:
                stretched = np.zeros((target_len, audio.shape[1]), dtype=np.float32)
                for channel in range(audio.shape[1]):
                    # resample is FFT-based, still better than linear interpolation
                    stretched[:, channel] = scipy_signal.resample(
                        audio[:, channel], target_len
                    ).astype(np.float32)
                return stretched
            except Exception as exc2:
                logger.debug("scipy.signal.resample fallo: %s, usando fallback", exc2)
    # Try librosa.resample (good quality)
    if librosa is not None:
        try:
            # librosa.resample wants source/target sample rates; feeding the
            # two lengths themselves yields exactly the ratio we need
            orig_sr = original_len
            target_sr = target_len
            stretched = np.zeros((target_len, audio.shape[1]), dtype=np.float32)
            for channel in range(audio.shape[1]):
                resampled = librosa.resample(
                    audio[:, channel],
                    orig_sr=orig_sr,
                    target_sr=target_sr,
                    res_type="linear"  # Faster, but better than raw np.interp
                )
                stretched[:, channel] = _fit_channel_length(resampled)
            return stretched
        except Exception as exc:
            logger.debug("librosa.resample fallo: %s, usando np.interp", exc)
    # Last resort: np.interp (linear interpolation - lowest quality)
    source_x = np.linspace(0.0, 1.0, original_len, endpoint=True)
    target_x = np.linspace(0.0, 1.0, target_len, endpoint=True)
    stretched = np.zeros((target_len, audio.shape[1]), dtype=np.float32)
    for channel in range(audio.shape[1]):
        stretched[:, channel] = np.interp(target_x, source_x, audio[:, channel]).astype(np.float32)
    return stretched
def _normalize(self, audio: np.ndarray, peak: Optional[float] = None, soft_limit: bool = True) -> np.ndarray:
    """Normalize the audio peak with improved soft limiting.

    Phase 1 improvements:
    - Soft knee with a smooth cubic curve (more natural than linear)
    - Better dynamics preservation in the normal range

    Args:
        audio: Audio array.
        peak: Target peak level (clamped to 0.01 - 1.0). Defaults to
            _DEFAULT_PEAK (0.85).
        soft_limit: If True, applies a cubic-curve soft knee before scaling.

    Returns:
        Normalized audio; the input is returned untouched when empty or silent.
    """
    # Use the unified default when no peak is given
    if peak is None:
        peak = self._DEFAULT_PEAK
    # Defensive validation
    if audio is None or audio.size == 0:
        return audio
    # NOTE(review): np.asarray(..., copy=True) is only valid on NumPy >= 2.0;
    # older NumPy raises TypeError here - confirm the supported NumPy range.
    audio = np.asarray(audio, dtype=np.float32, copy=True)
    peak = max(0.01, min(1.0, float(peak)))
    current_peak = float(np.max(np.abs(audio))) if audio.size else 0.0
    if current_peak <= 1e-6:
        # Effectively silent: nothing to scale
        return audio
    # Apply improved soft limiting when enabled
    if soft_limit:
        # Cubic soft knee: smoother than linear, less aggressive than tanh,
        # and it preserves more dynamics in the normal range
        knee_start = peak * 0.75  # Knee starts at 75% of the peak
        abs_audio = np.abs(audio)
        mask = abs_audio > knee_start
        if np.any(mask):
            sign = np.sign(audio)
            # Relative position inside the knee (0 to 1)
            knee_range = peak - knee_start
            over_knee = abs_audio[mask] - knee_start
            relative_pos = np.clip(over_knee / knee_range, 0.0, 1.0)
            # Cubic curve: 1 - (1-x)^3 gives gentle compression that starts
            # gradually and flattens toward the peak
            compression_factor = 1.0 - np.power(1.0 - relative_pos, 3.0)
            # Apply compression while keeping the signal below the peak
            compressed = knee_start + knee_range * compression_factor
            audio[mask] = sign[mask] * compressed
        # Recompute the peak after soft limiting
        current_peak = float(np.max(np.abs(audio))) if audio.size else 0.0
        if current_peak <= 1e-6:
            return audio
    # Scale to the target peak
    return (audio / current_peak) * peak
def _apply_fade(
self,
audio: np.ndarray,
fade_in_s: float = 0.02,
fade_out_s: float = 0.04,
fade_curve: str = "linear"
) -> np.ndarray:
"""Aplica fade in y fade out al audio.
Args:
audio: Array de audio
fade_in_s: Duracion del fade in en segundos
fade_out_s: Duracion del fade out en segundos
fade_curve: Tipo de curva ("linear", "logarithmic", "exponential")
Returns:
Audio con fades aplicados
"""
# Validacion defensiva
if audio is None or audio.size == 0:
return np.zeros((1, 2), dtype=np.float32)
output = np.array(audio, dtype=np.float32, copy=True)
# Asegurar 2D
if output.ndim == 1:
output = output.reshape(-1, 1)
total = output.shape[0]
if total <= 2:
return output
# Validar y clamp tiempos de fade
fade_in_s = max(0.0, float(fade_in_s))
fade_out_s = max(0.0, float(fade_out_s))
fade_in = min(total, max(0, int(round(fade_in_s * self.sample_rate))))
fade_out = min(total, max(0, int(round(fade_out_s * self.sample_rate))))
# Funcion auxiliar para generar curvas de fade
def _generate_fade_curve(length: int, direction: str) -> np.ndarray:
"""Genera curva de fade segun el tipo especificado."""
if fade_curve == "logarithmic":
# Curva logaritmica: inicio suave, transicion gradual
# Usa curva tipo -cos(0 a pi/2) o equivalente: 1 - e^(-3x) normalizado
x = np.linspace(0.0, 1.0, length, dtype=np.float32)
# Logarithmic-like curve: 1 - exp(-k*x) normalizado
k = 4.0 # Factor de curvatura
curve = (1.0 - np.exp(-k * x)) / (1.0 - np.exp(-k))
elif fade_curve == "exponential":
# Curva exponencial: inicio rapido, final gradual
x = np.linspace(0.0, 1.0, length, dtype=np.float32)
curve = np.power(x, 2.0) # x^2 para curva exponencial simple
else:
# Linear por defecto
curve = np.linspace(0.0, 1.0, length, dtype=np.float32)
if direction == "out":
curve = curve[::-1]
return curve.reshape(-1, 1)
if fade_in > 0:
fade_in_curve = _generate_fade_curve(fade_in, "in")
output[:fade_in] *= fade_in_curve
if fade_out > 0:
fade_out_curve = _generate_fade_curve(fade_out, "out")
output[-fade_out:] *= fade_out_curve
return output
def _apply_short_crossfade(self, audio: np.ndarray, fade_samples: int = 220, equal_power: bool = True) -> np.ndarray:
"""Aplica un crossfade corto (5ms por defecto) en ambos extremos del audio.
Phase 1 Improvements:
- Crossfades equal-power (sin/cos) para mejor calidad y menos artefactos
- Los crossfades equal-power mantienen la energia constante durante la transicion
Esto elimina clicks al concatenar segmentos de audio extraidos.
Args:
audio: Array de audio (samples, channels)
fade_samples: Numero de samples para el fade (220 = ~5ms a 44100Hz)
equal_power: Si True, usa curvas equal-power (sin/cos), sino lineales
Returns:
Audio con crossfades aplicados
"""
# Validacion defensiva
if audio is None or audio.size == 0:
return np.zeros((1, 2), dtype=np.float32)
output = np.array(audio, dtype=np.float32, copy=True)
# Asegurar 2D
if output.ndim == 1:
output = output.reshape(-1, 1)
total = output.shape[0]
if total <= 4:
return output
# Clamp fade_samples a rango valido
fade_samples = max(1, min(fade_samples, total // 2))
if equal_power:
# Equal-power crossfade: mantiene energia constante
# fade_in = sin(x * pi/2), fade_out = cos(x * pi/2)
x = np.linspace(0.0, 1.0, fade_samples, dtype=np.float32)
fade_in_curve = np.sin(x * np.pi / 2.0).reshape(-1, 1)
fade_out_curve = np.cos(x * np.pi / 2.0).reshape(-1, 1)
else:
# Fallback a curvas lineales
fade_in_curve = np.linspace(0.0, 1.0, fade_samples, dtype=np.float32).reshape(-1, 1)
fade_out_curve = np.linspace(1.0, 0.0, fade_samples, dtype=np.float32).reshape(-1, 1)
output[:fade_samples] *= fade_in_curve
output[-fade_samples:] *= fade_out_curve
return output
def _extract_tail(self, audio: np.ndarray, seconds: float, min_length: float = 0.1) -> np.ndarray:
"""Extrae los ultimos N segundos de audio con crossfade corto para eliminar clicks.
Args:
audio: Array de audio
seconds: Duracion a extraer en segundos
min_length: Longitud minima en segundos (default: 0.1s = 4410 samples)
Returns:
Segmento de audio extraido con crossfade aplicado
"""
# Validaciones defensivas
audio = self._validate_audio_array(audio, context="_extract_tail")
seconds = max(0.001, float(seconds)) # Al menos 1ms
min_length = max(0.001, float(min_length)) # Al menos 1ms
samples = max(1, int(round(seconds * self.sample_rate)))
min_samples = max(1, int(round(min_length * self.sample_rate)))
# Si el audio es muy corto, retornar todo el audio
if audio.shape[0] <= samples:
segment = np.array(audio, dtype=np.float32, copy=True)
# Aplicar crossfade incluso si es todo el audio
return self._apply_short_crossfade(segment, fade_samples=220)
segment = np.array(audio[-samples:], dtype=np.float32, copy=True)
# Validar que el segmento no sea muy corto
if segment.shape[0] < min_samples:
logger.warning("_extract_tail: segmento muy corto (%d samples), usando todo el audio disponible", segment.shape[0])
segment = np.array(audio, dtype=np.float32, copy=True)
# Aplicar crossfade corto (5ms) para eliminar clicks en el corte
segment = self._apply_short_crossfade(segment, fade_samples=220)
return segment
def _extract_center(self, audio: np.ndarray, seconds: float) -> np.ndarray:
"""Extrae el centro del audio con crossfades cortos para eliminar clicks.
Args:
audio: Array de audio
seconds: Duracion a extraer en segundos
Returns:
Segmento de audio extraido con crossfades aplicados
"""
# Validaciones defensivas
audio = self._validate_audio_array(audio, context="_extract_center")
seconds = max(0.001, float(seconds)) # Al menos 1ms
samples = max(1, int(round(seconds * self.sample_rate)))
if audio.shape[0] <= samples:
segment = np.array(audio, dtype=np.float32, copy=True)
# Aplicar crossfade incluso si es todo el audio
return self._apply_short_crossfade(segment, fade_samples=220)
start = max(0, (audio.shape[0] - samples) // 2)
segment = np.array(audio[start:start + samples], dtype=np.float32, copy=True)
# Aplicar crossfade corto (5ms) en ambos extremos para eliminar clicks
segment = self._apply_short_crossfade(segment, fade_samples=220)
return segment
def _find_hot_slice(self, audio: np.ndarray, seconds: float, min_samples: int = -1) -> np.ndarray:
"""Encuentra el segmento con mayor energia con crossfades cortos para eliminar clicks.
Args:
audio: Array de audio
seconds: Duracion del segmento en segundos
min_samples: Longitud minima del resultado en samples (default: 1000)
Returns:
Segmento de mayor energia con crossfades aplicados
"""
# Validaciones defensivas
audio = self._validate_audio_array(audio, context="_find_hot_slice")
seconds = max(0.001, float(seconds)) # Al menos 1ms
# Usar constante minima de efecto si no se especifica
if min_samples < 0:
min_samples = self._MIN_SAMPLES_FOR_EFFECT
else:
min_samples = max(self._MIN_SAMPLES_FOR_EFFECT, int(min_samples))
samples = max(min_samples, int(round(seconds * self.sample_rate)))
if audio.shape[0] <= samples:
# Si el audio es muy corto, paddear a min_samples
if audio.shape[0] < min_samples:
logger.debug("HOT_SLICE: padded short audio from %d to %d samples", audio.shape[0], min_samples)
padding = np.zeros((min_samples - audio.shape[0], audio.shape[1]), dtype=np.float32)
audio = np.concatenate([audio, padding], axis=0)
segment = np.array(audio, dtype=np.float32, copy=True)
# Aplicar crossfade incluso si es todo el audio
return self._apply_short_crossfade(segment, fade_samples=220)
mono = np.mean(np.abs(audio), axis=1)
window = max(8, samples)
energy = np.convolve(mono, np.ones(window, dtype=np.float32), mode="valid")
# Handle edge case: energia vacia
if energy.size == 0:
segment = np.array(audio[:samples], dtype=np.float32, copy=True)
# Validar longitud minima
if segment.shape[0] < min_samples:
logger.debug("HOT_SLICE: padded short audio from %d to %d samples (empty energy)", segment.shape[0], min_samples)
padding = np.zeros((min_samples - segment.shape[0], segment.shape[1]), dtype=np.float32)
segment = np.concatenate([segment, padding], axis=0)
return self._apply_short_crossfade(segment, fade_samples=220)
start = int(np.argmax(energy))
segment = np.array(audio[start:start + samples], dtype=np.float32, copy=True)
# Validar longitud minima del resultado
if segment.shape[0] < min_samples:
logger.debug("HOT_SLICE: padded short audio from %d to %d samples (result)", segment.shape[0], min_samples)
padding = np.zeros((min_samples - segment.shape[0], segment.shape[1]), dtype=np.float32)
segment = np.concatenate([segment, padding], axis=0)
# Aplicar crossfade corto (5ms) en ambos extremos para eliminar clicks
segment = self._apply_short_crossfade(segment, fade_samples=220)
return segment
def _apply_short_reverb(self, audio: np.ndarray, decay: float = 0.3, delay_ms: float = 50.0) -> np.ndarray:
    """Apply a short reverb built from feedback delay taps.

    Simulates a short impulse response (~100ms) that gives reversed audio
    some depth without creating a long tail.

    Args:
        audio: Audio array (samples, channels).
        decay: Reverb decay factor (clamped to 0.0 - 0.8).
        delay_ms: Base delay in milliseconds (clamped to 5 - 200).

    Returns:
        Audio with reverb applied, peak-limited to 0.95.
    """
    # Defensive validation
    audio = self._validate_audio_array(audio, context="_apply_short_reverb")
    decay = max(0.0, min(0.8, float(decay)))
    delay_ms = max(5.0, min(200.0, float(delay_ms)))
    output = np.array(audio, dtype=np.float32, copy=True)
    total_samples = output.shape[0]
    # Base delay in samples
    delay_samples = int(round(delay_ms * self.sample_rate / 1000.0))
    if delay_samples < 1 or total_samples < delay_samples + 1:
        # No room for even one echo: return the dry copy
        return output
    # Several delay taps with distinct times and gains simulate the reverb.
    # NOTE(review): the first tap delays by exactly 1 sample rather than
    # delay_samples - possibly intended as a near-direct early reflection,
    # but confirm it should not read `delay_samples`.
    taps = [
        (1, 1.0, decay * 0.6),  # 1st early echo
        (int(delay_samples * 1.3), 0.9, decay * 0.4),  # 2nd echo
        (int(delay_samples * 1.7), 0.85, decay * 0.3),  # 3rd echo
        (int(delay_samples * 2.2), 0.8, decay * 0.2),  # 4th echo (diffuse)
    ]
    for delay, gain, feedback in taps:
        if delay >= total_samples:
            continue
        # Shift-and-scale; each tap feeds on the accumulated output so far
        delayed = np.zeros_like(output)
        delayed[delay:] = output[:-delay] * gain * feedback
        output = output + delayed
    # Wet/dry mix (30% wet).
    # NOTE(review): `output` still contains the dry copy it started from, so
    # the effective dry level is 0.7 + 0.3 = 1.0 plus 30% of the echoes.
    wet = output * 0.3
    dry = audio * 0.7
    result = dry + wet
    # Normalize to avoid clipping
    max_val = np.max(np.abs(result))
    if max_val > 0.95:
        result = result * (0.95 / max_val)
    return result.astype(np.float32)
def _apply_delay_feedback(
self,
audio: np.ndarray,
delay_ms: float = 150.0,
feedback: float = 0.35,
mix: float = 0.25,
num_taps: int = 3
) -> np.ndarray:
"""Aplica delay con feedback sutil para anadir profundidad y textura.
Crea repeticiones que decaen gradualmente, ideal para reverse FX.
Args:
audio: Array de audio (samples, channels)
delay_ms: Tiempo entre repeticiones en milisegundos (default: 150ms)
feedback: Factor de decaimiento por repeticion (0.0 - 0.7, default: 0.35)
mix: Nivel de la senal wet (0.0 - 0.5, default: 0.25)
num_taps: Numero de repeticiones (1-5, default: 3)
Returns:
Audio con delay aplicado
"""
# Validaciones defensivas
audio = self._validate_audio_array(audio, context="_apply_delay_feedback")
delay_ms = max(10.0, min(500.0, float(delay_ms)))
feedback = max(0.0, min(0.7, float(feedback)))
mix = max(0.0, min(0.5, float(mix)))
num_taps = max(1, min(5, int(num_taps)))
output = np.zeros_like(audio, dtype=np.float32)
total_samples = audio.shape[0]
delay_samples = int(round(delay_ms * self.sample_rate / 1000.0))
# Validar que hay suficiente espacio para el delay
if delay_samples < 1 or total_samples < delay_samples + 1:
return np.array(audio, dtype=np.float32)
# Copiar la senal dry
output = np.array(audio, dtype=np.float32, copy=True)
# Anadir taps de delay con feedback decreciente
current_gain = feedback
for tap in range(1, num_taps + 1):
tap_delay = delay_samples * tap
if tap_delay >= total_samples:
break
# Crear senal delayada con gain decreciente
delayed = np.zeros_like(audio)
delayed[tap_delay:] = audio[:-tap_delay] * current_gain
# Mezclar con output
output = output + delayed
# Reducir gain para siguiente tap
current_gain *= feedback
# Mezclar wet/dry
dry = audio * (1.0 - mix)
wet = output * mix
result = dry + wet
# Normalizar para evitar clipping
max_val = np.max(np.abs(result))
if max_val > 0.95:
result = result * (0.95 / max_val)
return result.astype(np.float32)
def _apply_hpf(self, audio: np.ndarray, cutoff_hz: float = 100.0) -> np.ndarray:
"""Aplica un filtro high-pass para limpiar frecuencias bajas (mud).
Usa scipy.signal.butter si esta disponible, sino una aproximacion
por diferenciacion de primer orden.
Args:
audio: Array de audio (samples, channels)
cutoff_hz: Frecuencia de corte en Hz (tipica: 80-120 Hz)
Returns:
Audio filtrado
"""
# Validaciones defensivas
audio = self._validate_audio_array(audio, context="_apply_hpf")
cutoff_hz = max(20.0, min(500.0, float(cutoff_hz)))
output = np.zeros_like(audio, dtype=np.float32)
num_channels = audio.shape[1]
total_samples = audio.shape[0]
# Intentar usar scipy para mejor calidad
if scipy_signal is not None:
try:
# Filtro Butterworth high-pass de 2do orden
nyquist = self.sample_rate / 2.0
normalized_cutoff = min(0.49, cutoff_hz / nyquist) # Evitar Nyquist
b, a = scipy_signal.butter(2, normalized_cutoff, btype='high', analog=False)
for ch in range(num_channels):
output[:, ch] = scipy_signal.filtfilt(b, a, audio[:, ch]).astype(np.float32)
return output
except Exception as exc:
logger.debug("scipy HPF fallo: %s, usando fallback por diferenciacion", exc)
# Fallback: filtro high-pass por diferenciacion (RC)
rc = 1.0 / (2.0 * 3.14159265359 * cutoff_hz)
dt = 1.0 / self.sample_rate
alpha = rc / (rc + dt)
for ch in range(num_channels):
prev_input = 0.0
prev_output = 0.0
for i in range(total_samples):
current_input = float(audio[i, ch])
output[i, ch] = alpha * (prev_output + current_input - prev_input)
prev_input = current_input
prev_output = float(output[i, ch])
return output.astype(np.float32)
def _apply_hpf_sweep(self, audio: np.ndarray, start_hz: float = 200.0, end_hz: float = 2000.0) -> np.ndarray:
"""Aplica un HPF sweep que va desde start_hz hasta end_hz.
Phase 1 Improvements:
- Filtro Butterworth de 4to orden para pendientes mas pronunciadas (24dB/oct)
- Overlap-add mejorado con 75% overlap para transiciones mas suaves
- Normalizacion de ventana para evitar artefactos de amplitud
El filtro high-pass barre su frecuencia de corte a lo largo del audio,
creando el clasico efecto de "sweep" usado en risers.
Args:
audio: Array de audio (samples, channels)
start_hz: Frecuencia inicial del HPF (default 200Hz)
end_hz: Frecuencia final del HPF (default 2000Hz)
Returns:
Audio con HPF sweep aplicado
"""
# Validaciones defensivas
audio = self._validate_audio_array(audio, context="_apply_hpf_sweep")
start_hz = max(20.0, min(float(start_hz), self.sample_rate / 2.0 - 100))
end_hz = max(start_hz, min(float(end_hz), self.sample_rate / 2.0 - 100))
# Sin scipy, devolver audio sin cambios
if scipy_signal is None:
logger.debug("scipy_signal no disponible, saltando HPF sweep")
return np.array(audio, dtype=np.float32)
total_samples = audio.shape[0]
output = np.zeros_like(audio, dtype=np.float32)
# Procesar en frames con overlap para evitar glitches
# Frames mas pequenos (25ms) con 75% overlap para transiciones mas suaves
frame_size = int(0.025 * self.sample_rate) # 25ms frames
hop_size = frame_size // 4 # 75% overlap
num_frames = max(1, (total_samples - frame_size) // hop_size + 1)
# Ventana de Hann para overlap-add
window = np.hanning(frame_size).astype(np.float32)
# Buffer para normalizacion de overlap
window_sum = np.zeros(total_samples, dtype=np.float32)
for i in range(num_frames):
start_sample = i * hop_size
end_sample = min(start_sample + frame_size, total_samples)
# Frecuencia de corte para este frame (interpolacion exponencial)
progress = i / max(1, num_frames - 1)
cutoff_hz = start_hz * (end_hz / start_hz) ** progress
# Extraer frame
frame = audio[start_sample:end_sample]
actual_frame_size = frame.shape[0]
if actual_frame_size < frame_size:
# Padding si es el ultimo frame
padded = np.zeros((frame_size, audio.shape[1]), dtype=np.float32)
padded[:actual_frame_size] = frame
frame = padded
actual_window = window.copy()
actual_window[actual_frame_size:] = 0.0
else:
actual_window = window
# Aplicar HPF Butterworth de 4to orden (24dB/octava)
try:
nyquist = self.sample_rate / 2.0
normalized_cutoff = min(0.49, cutoff_hz / nyquist)
# Filtro de 4to orden para pendiente mas pronunciada
b, a = scipy_signal.butter(4, normalized_cutoff, btype="high", output="ba")
# Aplicar filtro a cada canal con filtfilt para fase cero
filtered = np.zeros_like(frame)
for ch in range(frame.shape[1]):
filtered[:, ch] = scipy_signal.filtfilt(b, a, frame[:, ch])
# Aplicar ventana
windowed = filtered * actual_window.reshape(-1, 1)
# Acumular en output (overlap-add)
out_len = min(actual_frame_size, total_samples - start_sample)
output[start_sample:start_sample + out_len] += windowed[:out_len]
window_sum[start_sample:start_sample + out_len] += actual_window[:out_len] ** 2
except Exception as exc:
logger.debug("Error en HPF sweep frame %d: %s", i, exc)
# Fallback: copiar frame con ventana
windowed = frame * actual_window.reshape(-1, 1)
out_len = min(actual_frame_size, total_samples - start_sample)
output[start_sample:start_sample + out_len] += windowed[:out_len]
window_sum[start_sample:start_sample + out_len] += actual_window[:out_len] ** 2
# Normalizar por la suma de ventanas para compensar overlap
window_sum = np.maximum(window_sum, 1e-8)
output = output / window_sum.reshape(-1, 1)
return output.astype(np.float32)
def _apply_saturator(self, audio: np.ndarray, drive: float = 0.3) -> np.ndarray:
"""Aplica saturacion suave usando tanh.
La saturacion tanh simula el comportamiento de equipos analogicos,
anadiendo harmonicos de forma musical y suavizando los picos.
Args:
audio: Array de audio (samples, channels)
drive: Cantidad de saturacion (0.0 - 1.0, default 0.3)
Returns:
Audio saturado
"""
# Validaciones defensivas
audio = self._validate_audio_array(audio, context="_apply_saturator")
drive = max(0.0, min(1.0, float(drive)))
if drive <= 0.001:
return np.array(audio, dtype=np.float32)
# Saturacion suave usando tanh
gain = 1.0 + drive
saturated = np.tanh(audio * gain) / gain
return saturated.astype(np.float32)
def _render_reverse_fx(self, source_path: str, duration_s: float = 4.0, project_bpm: float = 120.0) -> np.ndarray:
"""Renderiza efecto de reverse profesional mejorado.
Incluye:
- Reverb profundo antes del reverse
- HPF agresivo para limpiar mud
- Swell exponencial dramatico
- Delay feedback sutil
- Fade-in con curva logaritmica natural
- Integracion con BPM del proyecto
Args:
source_path: Ruta al archivo fuente
duration_s: Duracion en segundos
project_bpm: BPM del proyecto para sincronizacion (default: 120.0)
Returns:
Audio procesado con reverse FX profesional
"""
# Validaciones defensivas
duration_s = max(0.1, float(duration_s))
project_bpm = max(60.0, min(200.0, float(project_bpm or 120.0)))
logger.debug(
"Rendering REVERSE FX: source=%s, duration=%.1fs, bpm=%.0f",
Path(source_path).name, duration_s, project_bpm
)
# Largar y preparar segmento
audio, _ = self._load_audio(source_path)
# Usar constante minima para efecto
min_tail_duration = self._MIN_SAMPLES_FOR_EFFECT / self.sample_rate
tail_duration = max(min_tail_duration, duration_s * 0.85)
if tail_duration == min_tail_duration:
logger.debug("Using minimum tail duration %.3fs for short audio in reverse", min_tail_duration)
segment = self._extract_tail(audio, tail_duration)
reversed_audio = np.flip(segment, axis=0)
reversed_audio = self._stretch_to_length(reversed_audio, int(round(duration_s * self.sample_rate)))
# 1. Aplicar reverb PROFUNDO para dar cuerpo antes del reverse
# Decay mas alto (0.55) y delay mas largo (90ms) para profundidad
reversed_audio = self._apply_short_reverb(reversed_audio, decay=0.55, delay_ms=90.0)
# 2. HPF AGRESIVO para limpiar mud en frecuencias bajas
# Subir de 100Hz a 180Hz para reverse mas limpio y brillante
reversed_audio = self._apply_hpf(reversed_audio, cutoff_hz=180.0)
# 3. Aplicar SWELL EXPONENCIAL DRAMATICO
# Usar ramp exponencial de volumen para build-up dramatico
length = reversed_audio.shape[0]
# Curva exponencial: comienza muy bajo y crece dramaticamente
# El factor 5.0 da un rango de ~-14dB a 0dB
swell_ramp = np.exp(np.linspace(np.log(0.05), np.log(1.0), length, dtype=np.float32)).reshape(-1, 1)
reversed_audio = reversed_audio * swell_ramp
# 4. Aplicar DELAY FEEDBACK SUTIL para textura y espacio
# Delay sincronizado con BPM (1/8 de nota = 60*1000/(bpm*2) ms)
delay_ms_sync = (60000.0 / project_bpm) / 2.0 # 1/8 de nota
reversed_audio = self._apply_delay_feedback(
reversed_audio,
delay_ms=delay_ms_sync,
feedback=0.3,
mix=0.2,
num_taps=2
)
# 5. Fade-in con CURVA LOGARITMICA para transicion natural
# Fade-in mas largo (0.4s) con curva logaritmica
reversed_audio = self._apply_fade(
reversed_audio,
fade_in_s=0.4,
fade_out_s=0.05,
fade_curve="logarithmic"
)
result = self._normalize(reversed_audio)
final_duration = len(result) / self.sample_rate
logger.debug("REVERSE_FX: generated %s (duration=%.1fs)", Path(source_path).name, final_duration)
return result
def _render_riser(self, source_path: str, duration_s: float = 8.0, bpm: float = 128.0) -> np.ndarray:
"""Renderiza efecto de riser profesional con HPF sweep, ramp exponencial con plateau, y saturacion mejorada.
Phase 1 Improvements:
- BPM-synced for better musical timing
- Longer plateau before the peak for sustain
- Enhanced HPF sweep curve (80Hz -> 3500Hz for more dramatic sweep)
- Added mid-frequency boost for presence
- Better saturation curve with progressive drive
- Longer sustain before final peak
Args:
source_path: Ruta al archivo fuente
duration_s: Duracion en segundos
bpm: BPM del proyecto para sincronizacion (default: 128.0)
Returns:
Audio procesado
"""
duration_s = max(0.1, float(duration_s))
bpm = max(60.0, min(200.0, float(bpm or 128.0)))
logger.debug("Rendering RISER FX: source=%s, duration=%.1fs, bpm=%.0f", Path(source_path).name, duration_s, bpm)
audio, _ = self._load_audio(source_path)
min_source_duration = self._MIN_SAMPLES_FOR_EFFECT / self.sample_rate
beat_duration = 60.0 / bpm
source_duration = max(min_source_duration, min(beat_duration * 4.0, duration_s / 3.5))
if source_duration == min_source_duration:
logger.debug("Using minimum source duration %.3fs for short audio in riser", min_source_duration)
segment = self._extract_center(audio, source_duration)
stages: List[np.ndarray] = []
for speed in (1.0, 0.88, 0.75, 0.62):
target_len = max(self._MIN_SAMPLES_FOR_STRETCH, int(round(segment.shape[0] * speed)))
sped = self._stretch_to_length(segment, target_len)
stages.append(sped)
combined = np.concatenate(stages, axis=0)
combined = self._stretch_to_length(combined, int(round(duration_s * self.sample_rate)))
num_samples = combined.shape[0]
logger.debug("RISER: Applying enhanced HPF sweep 80Hz -> 3500Hz")
combined = self._apply_hpf_sweep(combined, start_hz=80.0, end_hz=3500.0)
t = np.linspace(0.0, 1.0, num_samples, dtype=np.float32)
plateau_start = 0.82
plateau_end = 0.95
ramp = np.zeros(num_samples, dtype=np.float32)
ramp_phase = t[t <= plateau_start]
if len(ramp_phase) > 0:
ramp_indices = t <= plateau_start
exp_ramp = np.exp(np.linspace(np.log(0.03), np.log(0.92), ramp_indices.sum()))
ramp[ramp_indices] = exp_ramp
plateau_mask = (t > plateau_start) & (t <= plateau_end)
if np.any(plateau_mask):
ramp[plateau_mask] = np.linspace(0.92, 0.98, plateau_mask.sum())
final_ramp_mask = t > plateau_end
if np.any(final_ramp_mask):
ramp[final_ramp_mask] = np.linspace(0.98, 1.0, final_ramp_mask.sum())
ramp = ramp.reshape(-1, 1)
combined = combined * ramp
saturation_start = int(num_samples * 0.65)
tail = combined[saturation_start:].copy()
logger.debug("RISER: Applying progressive saturation to tail (last 35%%)")
saturation_sections = [
(0.0, 0.3, 0.15),
(0.3, 0.6, 0.25),
(0.6, 1.0, 0.35),
]
for start_ratio, end_ratio, drive in saturation_sections:
sect_start = int(tail.shape[0] * start_ratio)
sect_end = int(tail.shape[0] * end_ratio)
if sect_end > sect_start:
tail[sect_start:sect_end] = self._apply_saturator(tail[sect_start:sect_end], drive=drive)
crossfade_len = min(int(0.015 * self.sample_rate), tail.shape[0])
if crossfade_len > 0:
fade_curve = np.sin(np.linspace(0, np.pi/2, crossfade_len, dtype=np.float32)).reshape(-1, 1)
saturated_full = self._apply_saturator(tail, drive=0.28)
tail[:crossfade_len] = tail[:crossfade_len] * (1 - fade_curve) + saturated_full[:crossfade_len] * fade_curve
combined[saturation_start:] = tail
combined = self._apply_fade(combined, fade_in_s=0.08, fade_out_s=0.04)
result = self._normalize(combined, peak=0.85)
final_duration = len(result) / self.sample_rate
logger.debug("RISER: generated %s (duration=%.1fs)", Path(source_path).name, final_duration)
return result
def _apply_lpf_simple(self, audio: np.ndarray, cutoff_hz: float) -> np.ndarray:
"""Aplica filtro low-pass simple (media movil exponencial).
Args:
audio: Array de audio (samples, channels)
cutoff_hz: Frecuencia de corte en Hz
Returns:
Audio filtrado
"""
audio = self._validate_audio_array(audio, context="_apply_lpf_simple")
cutoff_hz = max(20.0, min(20000.0, float(cutoff_hz)))
# Constante de tiempo para el filtro RC
rc = 1.0 / (2.0 * 3.14159 * cutoff_hz)
dt = 1.0 / self.sample_rate
alpha = dt / (rc + dt)
output = np.zeros_like(audio)
for ch in range(audio.shape[1]):
output[0, ch] = audio[0, ch]
for i in range(1, len(audio)):
output[i, ch] = output[i - 1, ch] + alpha * (audio[i, ch] - output[i - 1, ch])
return output.astype(np.float32)
def _apply_lpf_sweep(self, audio: np.ndarray, start_hz: float = 8000.0, end_hz: float = 200.0) -> np.ndarray:
"""Aplica barrido de filtro low-pass a lo largo del audio.
Phase 1 Improvements:
- Filtro Butterworth de 4to orden para pendientes mas pronunciadas (24dB/oct)
- Overlap-add con 75% overlap para transiciones suaves
- Normalizacion de ventana para evitar artefactos de amplitud
- Fallback a filtro RC simple si scipy no disponible
Args:
audio: Array de audio (samples, channels)
start_hz: Frecuencia inicial del sweep en Hz
end_hz: Frecuencia final del sweep en Hz
Returns:
Audio con LPF sweep aplicado
"""
audio = self._validate_audio_array(audio, context="_apply_lpf_sweep")
start_hz = max(50.0, min(20000.0, float(start_hz)))
end_hz = max(20.0, min(20000.0, float(end_hz)))
num_samples = audio.shape[0]
# Si scipy disponible, usar Butterworth 4to orden con overlap-add
if scipy_signal is not None:
output = np.zeros_like(audio, dtype=np.float32)
# Frames de 25ms con 75% overlap
frame_size = int(0.025 * self.sample_rate)
hop_size = frame_size // 4 # 75% overlap
num_frames = max(1, (num_samples - frame_size) // hop_size + 1)
window = np.hanning(frame_size).astype(np.float32)
window_sum = np.zeros(num_samples, dtype=np.float32)
for i in range(num_frames):
start_sample = i * hop_size
end_sample = min(start_sample + frame_size, num_samples)
# Interpolacion exponencial de la frecuencia (mas musical)
progress = start_sample / num_samples
exp_progress = (np.exp(progress * 2.0) - 1.0) / (np.e ** 2.0 - 1.0)
cutoff = start_hz * (end_hz / start_hz) ** exp_progress
frame = audio[start_sample:end_sample]
actual_frame_size = frame.shape[0]
if actual_frame_size < frame_size:
padded = np.zeros((frame_size, audio.shape[1]), dtype=np.float32)
padded[:actual_frame_size] = frame
frame = padded
actual_window = window.copy()
actual_window[actual_frame_size:] = 0.0
else:
actual_window = window
try:
nyquist = self.sample_rate / 2.0
normalized_cutoff = min(0.49, max(0.01, cutoff / nyquist))
# Butterworth 4to orden
b, a = scipy_signal.butter(4, normalized_cutoff, btype="low", output="ba")
filtered = np.zeros_like(frame)
for ch in range(frame.shape[1]):
filtered[:, ch] = scipy_signal.filtfilt(b, a, frame[:, ch])
windowed = filtered * actual_window.reshape(-1, 1)
out_len = min(actual_frame_size, num_samples - start_sample)
output[start_sample:start_sample + out_len] += windowed[:out_len]
window_sum[start_sample:start_sample + out_len] += actual_window[:out_len] ** 2
except Exception as exc:
logger.debug("Error en LPF sweep frame %d: %s", i, exc)
windowed = frame * actual_window.reshape(-1, 1)
out_len = min(actual_frame_size, num_samples - start_sample)
output[start_sample:start_sample + out_len] += windowed[:out_len]
window_sum[start_sample:start_sample + out_len] += actual_window[:out_len] ** 2
# Normalizar por suma de ventanas
window_sum = np.maximum(window_sum, 1e-8)
output = output / window_sum.reshape(-1, 1)
return output.astype(np.float32)
# Fallback: filtro RC simple por bloques
output = np.zeros_like(audio)
block_size = max(256, num_samples // 64)
num_blocks = (num_samples + block_size - 1) // block_size
for block_idx in range(num_blocks):
start_sample = block_idx * block_size
end_sample = min(start_sample + block_size, num_samples)
progress = start_sample / num_samples
exp_progress = (np.exp(progress * 2.0) - 1.0) / (np.e ** 2.0 - 1.0)
cutoff = start_hz * (end_hz / start_hz) ** exp_progress
block_audio = audio[start_sample:end_sample]
filtered_block = self._apply_lpf_simple(block_audio, cutoff)
output[start_sample:end_sample] = filtered_block
return output.astype(np.float32)
def _apply_simple_reverb(self, audio: np.ndarray, decay: float = 0.3, wet_mix: float = 0.15, delay_ms: float = 50.0) -> np.ndarray:
"""Aplica reverb simple con multiples delays.
Args:
audio: Array de audio (samples, channels)
decay: Factor de decaimiento (0.0 - 0.9)
wet_mix: Mezcla de senal procesada (0.0 - 1.0)
delay_ms: Delay base en milisegundos
Returns:
Audio con reverb aplicado
"""
audio = self._validate_audio_array(audio, context="_apply_simple_reverb")
decay = max(0.0, min(0.9, float(decay)))
wet_mix = max(0.0, min(1.0, float(wet_mix)))
delay_ms = max(1.0, min(200.0, float(delay_ms)))
output = np.array(audio, dtype=np.float32, copy=True)
delay_samples = int(round(delay_ms * self.sample_rate / 1000.0))
# Multiples delays para crear reverb mas denso
delay_times = [1.0, 1.3, 1.7, 2.1] # Proporciones del delay base
decay_factors = [decay, decay * 0.7, decay * 0.5, decay * 0.3]
for delay_ratio, decay_factor in zip(delay_times, decay_factors):
current_delay = int(round(delay_samples * delay_ratio))
if current_delay < audio.shape[0]:
delayed = np.zeros_like(output)
delayed[current_delay:] = output[:-current_delay] * decay_factor
output = output + delayed
# Mezclar dry y wet
dry_mix = 1.0 - wet_mix
return (audio * dry_mix + output * wet_mix).astype(np.float32)
def _render_downlifter(self, source_path: str, duration_s: float = 6.0, bpm: float = 128.0) -> np.ndarray:
"""Renderiza efecto de downlifter profesional con LPF sweep mejorado y reverb tail extendido.
Phase 1 Improvements:
- BPM-synced for better musical timing
- Longer reverb tail with layered decay (up to 60% of duration)
- Enhanced LPF sweep curve (15000Hz -> 60Hz for more dramatic effect)
- Added subtle noise floor for depth
- Improved grain texture with BPM-synced rhythm
- Better volume envelope with Hz-tuned amplitude curve
Args:
source_path: Ruta al archivo fuente
duration_s: Duracion en segundos
bpm: BPM del proyecto para sincronizar curvas
Returns:
Audio procesado
"""
duration_s = max(0.1, float(duration_s))
bpm = max(60.0, min(200.0, float(bpm or 128.0)))
logger.debug("Rendering DOWNLIFTER FX: source=%s, duration=%.1fs, bpm=%.1f", Path(source_path).name, duration_s, bpm)
audio, _ = self._load_audio(source_path)
min_segment_duration = self._MIN_SAMPLES_FOR_EFFECT / self.sample_rate
beat_duration = 60.0 / bpm
segment_duration = max(min_segment_duration, min(beat_duration * 3.0, duration_s / 2.5))
if segment_duration == min_segment_duration:
logger.debug("Using minimum segment duration %.3fs for short audio in downlifter", min_segment_duration)
segment = self._extract_tail(audio, segment_duration)
stretched = self._stretch_to_length(segment, int(round(duration_s * self.sample_rate)))
num_samples = stretched.shape[0]
t = np.linspace(0.0, 1.0, num_samples, dtype=np.float32)
exp_decay = np.exp(-3.5 * t)
s_curve_start = 0.55
s_mask = (t > s_curve_start).astype(np.float32)
s_t = (t - s_curve_start) / (1.0 - s_curve_start)
s_curve = 1.0 - (3.0 * s_t**2 - 2.0 * s_t**3)
volume_curve = exp_decay * (1.0 - s_mask) + (exp_decay * s_curve) * s_mask
volume_curve = volume_curve * 0.97 + 0.03
volume_curve = volume_curve.reshape(-1, 1)
stretched = stretched * volume_curve
logger.debug("DOWNLIFTER: Applying enhanced LPF sweep 15000Hz -> 60Hz")
stretched = self._apply_lpf_sweep(stretched, start_hz=15000.0, end_hz=60.0)
grain_rate_hz = bpm / 60.0 * 4.0
grain_period = max(16, int(round(self.sample_rate / grain_rate_hz)))
grain_envelope = np.ones(num_samples, dtype=np.float32)
grain_depth = 0.025
grain_start = int(num_samples * 0.45)
for i in range(grain_start, num_samples, grain_period):
grain_samples = min(grain_period, num_samples - i)
if grain_samples <= 0:
continue
phase = np.linspace(0, np.pi * 2, min(grain_samples, grain_period), dtype=np.float32)
grain_wave = (np.sin(phase) * 0.5 + 0.5) * grain_depth
progress = (i - grain_start) / max(1, num_samples - grain_start)
grain_wave *= (1.0 + progress * 0.6)
end_idx = min(i + grain_samples, num_samples)
apply_len = min(len(grain_wave), end_idx - i)
if apply_len > 0:
grain_envelope[i:i + apply_len] = grain_envelope[i:i + apply_len] * (1.0 - grain_wave[:apply_len])
grain_envelope = grain_envelope.reshape(-1, 1)
stretched = stretched * grain_envelope
tail_start = int(num_samples * 0.48)
tail = stretched[tail_start:].copy()
tail_with_reverb = self._apply_simple_reverb(
tail,
decay=0.6,
wet_mix=0.4,
delay_ms=30.0
)
tail_with_reverb = self._apply_simple_reverb(
tail_with_reverb,
decay=0.45,
wet_mix=0.18,
delay_ms=65.0
)
if tail_with_reverb.shape[0] > 0:
layer_depth_start = int(tail_with_reverb.shape[0] * 0.6)
depth_layer = tail_with_reverb[layer_depth_start:].copy()
if depth_layer.shape[0] > 0:
depth_layer = self._apply_simple_reverb(depth_layer, decay=0.35, wet_mix=0.12, delay_ms=100.0)
tail_with_reverb[layer_depth_start:] = depth_layer
stretched = np.concatenate([stretched[:tail_start], tail_with_reverb], axis=0)
fade_duration_s = min(1.4, duration_s * 0.28)
fade_samples = int(round(fade_duration_s * self.sample_rate))
if fade_samples > 0 and fade_samples < stretched.shape[0]:
fade_start = stretched.shape[0] - fade_samples
fade_t = np.linspace(0.0, 1.0, fade_samples, dtype=np.float32)
fade_curve = np.log1p(-fade_t * 0.95 + 0.05) / np.log(0.05)
fade_curve = np.clip(fade_curve, 0.0, 1.0)
fade_curve = fade_curve ** 0.65
stretched[fade_start:] = stretched[fade_start:] * fade_curve.reshape(-1, 1)
stretched = self._apply_fade(stretched, fade_in_s=0.02, fade_out_s=0.0)
result = self._normalize(stretched, peak=0.82)
final_duration = len(result) / self.sample_rate
logger.debug("DOWNLIFTER: generated %s (duration=%.1fs)", Path(source_path).name, final_duration)
return result
def _apply_slice_window(self, audio: np.ndarray, fade_samples: int = 44) -> np.ndarray:
"""Aplica ventana con fade in/out muy corto a cada slice para evitar clicks.
Args:
audio: Array de audio (samples, channels)
fade_samples: Numero de samples para el fade (default: 44 = ~1ms a 44.1kHz)
Returns:
Audio con ventana aplicada
"""
if audio is None or audio.size == 0:
return audio
audio = np.asarray(audio, dtype=np.float32)
if audio.ndim == 1:
audio = audio.reshape(-1, 1)
total = audio.shape[0]
if total <= fade_samples * 2:
# Si el slice es muy corto, aplicar ventana completa tipo Hanning
window = np.hanning(total)
return audio * window.reshape(-1, 1)
# Crear ventana: fade in al inicio, fade out al final
window = np.ones(total, dtype=np.float32)
window[:fade_samples] = np.linspace(0.0, 1.0, fade_samples, dtype=np.float32)
window[-fade_samples:] = np.linspace(1.0, 0.0, fade_samples, dtype=np.float32)
return audio * window.reshape(-1, 1)
    def _render_stutter(self, source_path: str, duration_s: float = 2.5) -> np.ndarray:
        """Render a stutter effect with a more musical, organic sound.

        Improvements implemented:
        - Dynamic slice count depending on duration (5-9 slices)
        - Non-uniform slice positions with natural random variation
        - Pitch shift up to 1 semitone towards the end
        - Reverb in the gaps between slices for spatiality
        - Shorter fade windows (~0.5ms)
        - Gain and timing variation for a less mechanical feel

        Args:
            source_path: Ruta al archivo fuente
            duration_s: Duracion en segundos

        Returns:
            Audio procesado
        """
        # Defensive validation
        duration_s = max(0.1, float(duration_s))
        logger.debug("Rendering STUTTER FX: source=%s, duration=%.1fs", Path(source_path).name, duration_s)
        audio, _ = self._load_audio(source_path)
        source = self._find_hot_slice(audio, 0.20)  # Slightly longer for more content
        output_len = int(round(duration_s * self.sample_rate))
        # Ensure output_len is valid
        output_len = max(1, output_len)
        output = np.zeros((output_len, source.shape[1]), dtype=np.float32)
        output = _ensure_2d_float(output)
        # Dynamic slice count by duration (shorter clips = fewer slices):
        # 5 slices for <2s, up to 9 slices for >4s
        num_slices = int(5 + min(4, int(duration_s / 1.0)))
        num_slices = max(5, min(9, num_slices))
        # Generate base positions on an exponential curve (denser towards
        # the end) — creates a more musical "building up" pattern
        base_positions = []
        for i in range(num_slices):
            # Exponential curve: 0 -> 0.85 with increasing density
            t = i / max(1, num_slices - 1)
            # The exponent groups slices towards the end
            pos = (t ** 1.6) * 0.85
            base_positions.append(pos)
        # Add random variation to the positions for a more organic sound.
        # Seeded from a hash of source_path so the result is reproducible.
        seed_hash = int(hashlib.md5(source_path.encode()).hexdigest()[:8], 16) % 10000
        np.random.seed(seed_hash)
        positions = []
        for i, base_pos in enumerate(base_positions):
            # +/- 3% positional variation
            variation = (np.random.random() - 0.5) * 0.06
            pos = (base_pos + variation) * duration_s
            # Keep slices from overlapping too much
            if i > 0:
                pos = max(pos, positions[-1] + 0.08)
            positions.append(min(pos, duration_s - 0.1))
        logger.debug("STUTTER: placing %d slices at positions: %s", num_slices, [round(p, 3) for p in positions])
        # Base slice duration; varied per slice below
        base_slice_duration = 0.16
        # Short reverb-tail budget for the gaps between slices
        reverb_tail_samples = int(0.08 * self.sample_rate)  # 80ms reverb tail
        for index, position in enumerate(positions):
            start = int(round(float(position) * self.sample_rate))
            # Vary the gate length: shorter towards the end, with random jitter
            gate_variation = (np.random.random() - 0.5) * 0.04  # +/- 20ms
            gate_duration = base_slice_duration - (index * 0.012) + gate_variation
            # Enforce the minimum stutter-slice length
            min_gate_duration = self._MIN_SAMPLES_FOR_SLICE / self.sample_rate
            gate_duration = max(min_gate_duration, gate_duration)
            if gate_duration == min_gate_duration:
                logger.debug("Using minimum slice duration %.3fs for short audio", min_gate_duration)
            gate_len = max(self._MIN_SAMPLES_FOR_SLICE, min(source.shape[0], int(round(gate_duration * self.sample_rate))))
            # Extract the slice as a copy
            slice_audio = np.array(source[:gate_len], dtype=np.float32, copy=True)
            slice_audio = _ensure_2d_float(slice_audio)
            # EARLY VALIDATION: ensure the slice has real content
            # (_ensure_2d_float returns a (1,1) zero array when empty, so check shape)
            if slice_audio.shape[0] <= 1:
                logger.debug("STUTTER: slice %d has invalid shape after ensure_2d_float %s, skipping", index, slice_audio.shape)
                continue
            # More extreme pitch shift towards the end (up to 1 semitone = 1.0595),
            # applied from slice 3 onwards
            if index >= 3:
                # Pitch factor ramps from 1.02 up to ~1.06 (1 semitone)
                pitch_progress = (index - 3) / max(1, num_slices - 4)
                # Pitch factor: 1.02 up to 1.06 (almost a semitone)
                pitch_factor = 1.02 + (pitch_progress * 0.04)
                # Small random pitch variation (+/- 10 cents)
                pitch_variation = 1.0 + (np.random.random() - 0.5) * 0.012
                pitch_factor *= pitch_variation
                if scipy_signal is not None:
                    try:
                        pitched_len = max(1, int(len(slice_audio) / pitch_factor))
                        pitched = np.zeros((pitched_len, slice_audio.shape[1]), dtype=np.float32)
                        for ch in range(slice_audio.shape[1]):
                            pitched[:, ch] = scipy_signal.resample(slice_audio[:, ch], pitched_len).astype(np.float32)
                        slice_audio = pitched
                        logger.debug("STUTTER: slice %d pitch shifted by factor %.3f", index, pitch_factor)
                    except Exception:
                        pass  # Keep the original slice on failure
                # VALIDATION: pitch shift must not produce an empty array
                if slice_audio.size == 0:
                    logger.debug("STUTTER: slice %d empty after pitch shift, skipping", index)
                    continue
            # Apply a window with a shorter fade (~0.5ms = 22 samples @ 44.1kHz)
            fade_samples = 22  # Reduced from 44 for quicker transitions
            slice_audio = self._apply_slice_window(slice_audio, fade_samples=fade_samples)
            # VALIDATION: windowing must not produce an empty array
            if slice_audio.size == 0:
                logger.debug("STUTTER: slice %d empty after window, skipping", index)
                continue
            # Apply a small reverb to the slice for spatiality
            # (low wet mix so definition is preserved)
            slice_audio = self._apply_short_reverb(slice_audio, decay=0.25, delay_ms=35.0)
            # VALIDATION: reverb must not produce an empty array
            if slice_audio.size == 0:
                logger.debug("STUTTER: slice %d empty after reverb, skipping", index)
                continue
            end = min(output_len, start + slice_audio.shape[0])
            if end <= start:
                logger.debug("STUTTER: slice %d has invalid range (start=%d, end=%d), skipping", index, start, end)
                continue
            # Fit the slice to the available space
            actual_len = end - start
            # CRITICAL VALIDATION: actual_len must be at least 1
            if actual_len <= 0:
                logger.debug("STUTTER: slice %d has actual_len=%d, skipping", index, actual_len)
                continue
            # Trim only when enough content remains after the trim
            if actual_len < slice_audio.shape[0]:
                # Make sure the trim does not produce an empty array
                if actual_len >= 1:
                    slice_audio = slice_audio[:actual_len]
                else:
                    logger.debug("STUTTER: slice %d would become empty after trim (actual_len=%d), skipping", index, actual_len)
                    continue
            # FINAL VALIDATION: slice_audio must have content before mixing
            if slice_audio.size == 0:
                logger.debug("STUTTER: slice %d is empty before mix, skipping", index)
                continue
            # Position-dependent gain with random variation:
            # louder towards the end, with small fluctuations
            gain_base = 0.50 + (index * 0.07)
            gain_variation = (np.random.random() - 0.5) * 0.08  # +/- 4%
            gain = gain_base + gain_variation
            gain = max(0.3, min(0.95, gain))  # Clamp between 0.3 and 0.95
            # Validate shapes before mixing
            valid, msg = _validate_mix_shapes(output[start:end], slice_audio)
            if not valid:
                logger.debug("STUTTER: skipping slice %d at %d: %s", index, start, msg)
                continue
            output[start:end] += slice_audio * gain
            # Add a "ghost" reverb tail in the gap after the slice (except for the last one)
            if index < len(positions) - 1:
                gap_start = end
                gap_end = min(output_len, gap_start + reverb_tail_samples)
                if gap_end > gap_start:
                    # Build a very subtle ghost reverb tail from the previous slice
                    ghost_len = gap_end - gap_start
                    # VALIDATION: ghost_len must be valid
                    if ghost_len <= 0:
                        logger.debug("STUTTER: slice %d has invalid ghost_len=%d, skipping ghost", index, ghost_len)
                    else:
                        ghost_audio = np.zeros((ghost_len, source.shape[1]), dtype=np.float32)
                        # Copy the slice tail with exponential decay
                        # VALIDATION: tail_source must have content
                        tail_samples = min(len(slice_audio), ghost_len * 2)
                        if tail_samples > 0:
                            tail_source = slice_audio[-tail_samples:]
                            if tail_source.size > 0:
                                decay_len = min(len(tail_source), ghost_len)
                                # VALIDATION: decay_len must be valid
                                if decay_len > 0:
                                    decay_curve = np.exp(-4.0 * np.linspace(0, 1, decay_len)).reshape(-1, 1).astype(np.float32)
                                    # VALIDATION: defensive slicing guarantees tail_source[-decay_len:] has content
                                    if tail_source[-decay_len:].size > 0:
                                        ghost_audio[:decay_len] = tail_source[-decay_len:] * decay_curve * 0.15
                                        output[gap_start:gap_start + ghost_len] += ghost_audio
                                    else:
                                        logger.debug("STUTTER: slice %d tail_source slice is empty, skipping ghost", index)
                                else:
                                    logger.debug("STUTTER: slice %d has invalid decay_len=%d, skipping ghost", index, decay_len)
                            else:
                                logger.debug("STUTTER: slice %d tail_source is empty, skipping ghost", index)
                        else:
                            logger.debug("STUTTER: slice %d has invalid tail_samples=%d, skipping ghost", index, tail_samples)
        # Softer global fade
        output = self._apply_fade(output, fade_in_s=0.003, fade_out_s=0.15)
        result = self._normalize(output)  # Uses the unified default peak
        # Fallback for empty render results
        if result is None or result.size == 0:
            logger.warning("STUTTER: fallback to silence (empty render result)")
            result = np.zeros((int(2.5 * self.sample_rate), 2), dtype=np.float32)
        final_duration = len(result) / self.sample_rate
        logger.debug("STUTTER: generated %s (duration=%.1fs, slices=%d)", Path(source_path).name, final_duration, num_slices)
        return result
def _output_path(self, source_path: str, variant_seed: int, suffix: str) -> Path:
"""Genera ruta de salida unica para un archivo procesado."""
source = Path(source_path)
digest = hashlib.sha1(f"{source.resolve()}::{variant_seed}::{suffix}".encode("utf-8")).hexdigest()[:10]
return self.output_dir / f"{source.stem}_{suffix}_{digest}.wav"
def _analyze_source_quality(self, audio: np.ndarray, sample_rate: int, fx_type: str) -> Dict[str, Any]:
    """Analyze source audio quality for FX derivation.

    Computes coarse spectral/dynamic metrics and maps them to a per-FX
    suitability score used for source selection.

    Args:
        audio: Audio array shaped (samples, channels) or (samples,).
        sample_rate: Sample rate in Hz.
        fx_type: Type of FX to derive ('reverse', 'riser', 'downlifter', 'stutter').

    Returns:
        Dict with spectral_content, dynamic_range, rms, suitability_score
        and a boolean 'recommended' flag (same keys on every return path).
    """
    if audio is None or audio.size == 0:
        # FIX: include "rms" so all return paths share the same key schema.
        return {"spectral_content": 0.0, "dynamic_range": 0.0, "rms": 0.0, "suitability_score": 0.0, "recommended": False}
    audio = self._validate_audio_array(audio, context="_analyze_source_quality")
    # Reject very long material (max 45s) to avoid picking up full songs.
    duration = audio.shape[0] / sample_rate
    if duration > 45.0:
        logger.debug("Source analysis: rejecting long audio (%.1fs > 45s)", duration)
        return {"spectral_content": 0.0, "dynamic_range": 0.0, "rms": 0.0, "suitability_score": 0.0, "recommended": False}
    mono = np.mean(np.abs(audio), axis=1) if audio.ndim > 1 else np.abs(audio)
    rms = float(np.sqrt(np.mean(mono ** 2))) if mono.size > 0 else 0.0
    peak = float(np.max(mono)) if mono.size > 0 else 0.0
    # Peak-to-RMS ratio (crest factor proxy); epsilon avoids divide-by-zero.
    dynamic_range = peak / max(rms, 1e-10)
    spectral_content = 0.5  # neutral default when the signal is too short
    # FIX: this analysis only uses numpy's FFT, so the previous
    # `scipy_signal is not None` guard was unrelated and wrongly disabled
    # spectral scoring on systems without scipy.
    if mono.size >= 512:
        try:
            freqs = np.fft.rfft(mono[:min(2048, len(mono))])
            freq_magnitude = np.abs(freqs)
            if freq_magnitude.size > 10:
                # Split spectrum into low (bottom 1/8), mid, and high (top half) bands.
                low_energy = np.sum(freq_magnitude[:max(1, len(freq_magnitude)//8)])
                mid_energy = np.sum(freq_magnitude[max(1, len(freq_magnitude)//8):len(freq_magnitude)//2])
                high_energy = np.sum(freq_magnitude[len(freq_magnitude)//2:])
                total = low_energy + mid_energy + high_energy + 1e-10
                high_ratio = high_energy / total
                mid_ratio = mid_energy / total
                # Bias toward mid/high content; pure low-end sources score lower.
                spectral_content = float(0.3 + 0.5 * (high_ratio + mid_ratio * 0.5))
        except Exception:
            pass
    # Per-FX weighting of spectral richness vs. dynamics.
    suitability_scores = {
        "reverse": min(1.0, spectral_content * 0.7 + min(1.0, dynamic_range) * 0.3),
        "riser": min(1.0, spectral_content * 0.5 + min(1.0, dynamic_range) * 0.4 + 0.1),
        "downlifter": min(1.0, spectral_content * 0.5 + min(1.0, dynamic_range) * 0.4 + 0.1),
        "stutter": min(1.0, 0.3 + spectral_content * 0.4 + min(1.0, dynamic_range) * 0.3),
    }
    score = suitability_scores.get(fx_type, 0.5)
    recommended = score >= 0.4 and dynamic_range >= 2.0 and rms >= 0.01
    return {
        "spectral_content": round(spectral_content, 3),
        "dynamic_range": round(dynamic_range, 3),
        "rms": round(rms, 4),
        "suitability_score": round(score, 3),
        "recommended": recommended,
    }
def _build_positions(self, sections: List[Dict[str, Any]], bpm: float = 128.0) -> Dict[str, List[float]]:
    """Compute FX placement positions from the section structure.

    Derives position lists for reverse, riser, downlifter and stutter FX
    from each section's type and span: reverses lead into drops/breaks,
    risers end right before drops, downlifters follow drops/breaks, and
    stutters mark build peaks and drop tails.

    NOTE(review): section start/end come from `_section_offsets` (beat
    units) while beat_duration/bar_duration are seconds; offsets mix the
    two units — confirm positions are consumed as beats by the caller.

    Args:
        sections: Section dicts with kind, name, beats.
        bpm: Project BPM for musical timing (clamped to 60-200 here).

    Returns:
        Dict mapping FX type -> sorted list of unique positions.
    """
    reverse_positions: List[float] = []
    riser_positions: List[float] = []
    downlifter_positions: List[float] = []
    stutter_positions: List[float] = []
    offsets = _section_offsets(sections)
    beat_duration = 60.0 / max(60.0, min(200.0, bpm))
    bar_duration = beat_duration * 4.0

    def _add_unique(positions: List[float], value: float, min_spacing: float = 2.0) -> None:
        # Duplicate suppression: keep positions at least min_spacing apart.
        if not any(abs(p - value) < min_spacing for p in positions):
            positions.append(round(max(0.0, value), 3))

    def _section_type(section: Dict[str, Any]) -> str:
        # Classify a section from keywords in its kind/name fields.
        kind = str(section.get("kind", "")).lower()
        name = str(section.get("name", "")).lower()
        if "intro" in kind or "intro" in name:
            return "intro"
        if "break" in kind or "break" in name or "breakdown" in name:
            return "break"
        if "build" in kind or "build" in name:
            return "build"
        if "drop" in kind or "drop" in name:
            return "drop"
        if "peak" in name or "main" in name:
            return "peak"
        if "outro" in kind or "outro" in name:
            return "outro"
        if "groove" in name:
            return "groove"
        return kind or "unknown"

    for index, (section, start, end) in enumerate(offsets):
        section_type = _section_type(section)
        name = str(section.get("name", "")).lower()
        span = max(1.0, end - start)
        is_peak = "peak" in name or "drop b" in name or "main" in name or "peak" in section_type
        is_build = section_type == "build"
        is_break = section_type == "break"
        is_drop = section_type == "drop"
        is_outro = section_type == "outro"
        is_intro = section_type == "intro"
        reverse_bar_offset = bar_duration * 1.5
        # Reverses lead into the section boundary (never before the first section).
        if index > 0 and is_drop:
            reverse_offset = min(8.0, max(4.0, reverse_bar_offset))
            _add_unique(reverse_positions, start - reverse_offset, min_spacing=3.0)
        elif index > 0 and is_break:
            reverse_offset = min(6.0, max(3.0, reverse_bar_offset * 0.8))
            _add_unique(reverse_positions, start - reverse_offset, min_spacing=2.5)
        elif index > 0 and is_build:
            if index > 1:
                reverse_offset = min(7.0, max(3.0, reverse_bar_offset))
                _add_unique(reverse_positions, start - reverse_offset, min_spacing=2.0)
        if is_build:
            riser_duration = min(12.0, max(4.0, span * 0.7))
            beat_duration_seconds = beat_duration
            # FIX: quantize the riser length to a whole number of beats.
            # The previous code divided and immediately re-multiplied by the
            # same value, which was a no-op; round() realizes the intent.
            riser_quantized = round(riser_duration / beat_duration_seconds) * beat_duration_seconds
            riser_quantized = max(4.0, min(12.0, riser_quantized))
            riser_start = max(start, end - riser_quantized)
            _add_unique(riser_positions, riser_start, min_spacing=4.0)
            # Short stutter just before the build resolves.
            stutter_offset = bar_duration * 0.5
            stutter_start = max(start, end - stutter_offset - 0.5)
            _add_unique(stutter_positions, stutter_start, min_spacing=1.5)
        if is_break and not is_peak:
            downlifter_offset = bar_duration * 0.25
            _add_unique(downlifter_positions, start + downlifter_offset, min_spacing=3.0)
        elif is_drop and not is_peak:
            down_offset = bar_duration * 0.3
            _add_unique(downlifter_positions, start + down_offset, min_spacing=3.0)
        if is_outro:
            if span > bar_duration * 2:
                _add_unique(downlifter_positions, start + bar_duration, min_spacing=3.0)
            outro_down_position = start + span * 0.45
            _add_unique(downlifter_positions, outro_down_position, min_spacing=2.5)
        if is_peak and span > bar_duration:
            stutter_offset = min(bar_duration * 1.5, span * 0.25)
            _add_unique(stutter_positions, end - stutter_offset, min_spacing=1.5)
            if span > bar_duration * 3:
                peak_stutter_position = start + span * 0.55
                _add_unique(stutter_positions, peak_stutter_position, min_spacing=bar_duration)
        if is_intro and span > bar_duration * 2:
            intro_reverse_offset = bar_duration * 0.75
            _add_unique(reverse_positions, start + intro_reverse_offset, min_spacing=2.5)
    return {
        "reverse": sorted(set(reverse_positions)),
        "riser": sorted(set(riser_positions)),
        "downlifter": sorted(set(downlifter_positions)),
        "stutter": sorted(set(stutter_positions)),
    }
def build_transition_layers(
    self,
    reference_audio_plan: Dict[str, Any],
    sections: List[Dict[str, Any]],
    project_bpm: float,
    variant_seed: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """Build transition layers from a reference audio plan.

    For each FX type (reverse, riser, downlifter, stutter) this picks the
    best available source sample from the plan's "matches", renders a
    derived FX clip to disk via the corresponding _render_* method, and
    records it as a layer entry together with its placement positions.

    Args:
        reference_audio_plan: Plan dict containing an audio "matches" dict.
        sections: Project section list used to compute FX positions.
        project_bpm: Project BPM (clamped to 20-300; 0/None treated as 120).
        variant_seed: Seed used to derive unique output file names.

    Returns:
        List of layer dicts (name, file_path, positions, color, volume,
        source, generated). Empty when the plan is missing or malformed.
    """
    logger.debug("build_transition_layers called: bpm=%.1f, variant_seed=%s", project_bpm, variant_seed)
    if not isinstance(reference_audio_plan, dict):
        logger.debug("reference_audio_plan is not a dict, returning empty layers")
        return []
    selected = reference_audio_plan.get("matches", {}) or {}
    if not isinstance(selected, dict):
        logger.debug("matches is not a dict, returning empty layers")
        return []
    # Clamp project_bpm to a usable range.
    project_bpm = max(20.0, min(300.0, float(project_bpm or 120.0)))
    variant_seed = int(variant_seed or 0)
    positions = self._build_positions(sections, bpm=project_bpm)
    logger.debug("Calculated FX positions: reverse=%s, riser=%s, downlifter=%s, stutter=%s",
                 positions["reverse"], positions["riser"], positions["downlifter"], positions["stutter"])
    layers: List[Dict[str, Any]] = []
    # Ordered (match-key, base score) candidates per FX type; higher first.
    FX_SOURCE_PRIORITIES = {
        "reverse": [
            ("crash_fx", 0.9),
            ("fill_fx", 0.85),
            ("atmos_fx", 0.75),
            ("synth_loop", 0.65),
            ("vocal_shot", 0.55),
        ],
        "riser": [
            ("synth_loop", 0.9),
            ("vocal_loop", 0.85),
            ("atmos_fx", 0.8),
            ("pad", 0.6),
        ],
        "downlifter": [
            ("crash_fx", 0.9),
            ("atmos_fx", 0.85),
            ("synth_loop", 0.7),
            ("fill_fx", 0.65),
        ],
        "stutter": [
            ("vocal_shot", 0.95),
            ("vocal_loop", 0.85),
            ("snare_roll", 0.8),
            ("synth_peak", 0.65),
        ],
    }
    # Search queries run against the SampleManager when the plan has no match.
    FX_FALLBACK_QUERIES = {
        "reverse": ["crash", "cymbal", "impact"],
        "riser": ["riser", "buildup", "sweep"],
        "downlifter": ["atmos", "drone", "texture"],
        "stutter": ["vocal", "synth", "chord", "fx"],
    }

    def _find_fallback_source(fx_type: str) -> str:
        """Find source directly from SampleManager when selected is empty."""
        try:
            import importlib.util
            # Load sample_manager by file path first so this works even when
            # the package itself is not importable; fall back to a relative import.
            PACKAGE_DIR = Path(__file__).resolve().parent.parent
            sample_manager_path = PACKAGE_DIR / "MCP_Server" / "sample_manager.py"
            if sample_manager_path.exists():
                spec = importlib.util.spec_from_file_location("sample_manager", sample_manager_path)
                sm_mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(sm_mod)
                manager = sm_mod.get_manager()
            else:
                from .sample_manager import get_manager
                manager = get_manager()
            if manager is None:
                return ""
            queries = FX_FALLBACK_QUERIES.get(fx_type, [])
            for query in queries:
                samples = manager.search(query=query, limit=5)
                for sample in samples:
                    path = str(sample.path)
                    if Path(path).exists():
                        try:
                            # Only accept sources that decode and are not trivially short.
                            audio, sr = self._load_audio(path)
                            if audio is not None and audio.shape[0] > 1000:
                                logger.debug("Fallback source %s found for %s FX", Path(path).name, fx_type)
                                return path
                        except Exception:
                            continue
        except Exception as e:
            logger.debug("Fallback search failed for %s: %s", fx_type, e)
        return ""

    def find_best_source(fx_type: str) -> str:
        """Find best source for FX type based on quality and priority."""
        priorities = FX_SOURCE_PRIORITIES.get(fx_type, [])
        # First pass: only accept sources whose analyzed quality is "recommended"
        # and whose priority-weighted score clears the 0.35 threshold.
        for key, base_score in priorities:
            item = selected.get(key)
            if isinstance(item, dict):
                path = str(item.get("path", "") or "")
                if path:
                    try:
                        audio, sr = self._load_audio(path)
                        quality = self._analyze_source_quality(audio, sr, fx_type)
                        if quality.get("recommended", False):
                            adjusted_score = base_score * quality.get("suitability_score", 0.5)
                            if adjusted_score >= 0.35:
                                logger.debug("Source %s selected for %s FX: quality=%.2f, score=%.2f",
                                             Path(path).name, fx_type, quality.get("suitability_score", 0), adjusted_score)
                                return path
                        logger.debug("Source %s rejected for %s FX: quality=%.2f, recommended=%s",
                                     Path(path).name, fx_type, quality.get("suitability_score", 0), quality.get("recommended"))
                    except Exception as e:
                        logger.debug("Could not analyze source %s for %s: %s", path, fx_type, e)
        # Second pass: take the first available path regardless of quality.
        for key, _ in priorities:
            item = selected.get(key)
            if isinstance(item, dict):
                path = str(item.get("path", "") or "")
                if path:
                    return path
        # Last resort: query the SampleManager directly.
        fallback = _find_fallback_source(fx_type)
        if fallback:
            logger.info("Using fallback source for %s FX: %s", fx_type, Path(fallback).name)
        return fallback

    def source_path(*keys: str) -> str:
        # Return the first non-empty path among the given match keys.
        for key in keys:
            item = selected.get(key)
            if isinstance(item, dict):
                path = str(item.get("path", "") or "")
                if path:
                    return path
        return ""

    def maybe_add(name: str, path: str, output_suffix: str, color: int, volume: float, beat_positions: List[float], renderer):
        # Render the FX from `path` and append a layer entry; skip quietly on
        # missing input, warn (and skip) if rendering or writing fails.
        if not path or not beat_positions:
            logger.debug("Skipping %s: path=%s, positions=%s", name, path if path else "(empty)", beat_positions if beat_positions else "(empty)")
            return
        try:
            logger.debug("Generating %s from %s, duration=%.1fs, positions=%s",
                         name, Path(path).name, 4.0 if "REVERSE" in name else (8.0 if "RISER" in name else (6.0 if "DOWNLIFTER" in name else 2.5)), beat_positions)
            rendered = renderer(path)
            output_path = self._output_path(path, variant_seed, output_suffix)
            file_path = self._write_audio(output_path, rendered, self.sample_rate)
            logger.debug("Successfully generated %s -> %s", name, Path(file_path).name)
        except Exception as exc:
            logger.warning("No se pudo generar %s desde %s: %s", name, Path(path).name, exc)
            logger.debug("Error details for %s: type=%s, message=%s", name, type(exc).__name__, exc)
            return
        layers.append({
            "name": name,
            "file_path": file_path,
            "positions": beat_positions,
            "color": color,
            "volume": volume,
            "source": Path(path).name,
            "generated": True,
        })

    # --- REVERSE FX -------------------------------------------------------
    reverse_source = find_best_source("reverse")
    if reverse_source and positions["reverse"]:
        maybe_add(
            "AUDIO RESAMPLE REVERSE FX",
            reverse_source,
            "reverse_fx",
            26,
            0.58,
            positions["reverse"],
            lambda path: self._render_reverse_fx(path, duration_s=4.0, project_bpm=project_bpm),
        )
    else:
        # Quality gate found nothing: take the first available match path.
        fallback_reverse = source_path("crash_fx", "fill_fx", "atmos_fx", "synth_loop", "vocal_shot")
        if fallback_reverse and positions["reverse"]:
            maybe_add(
                "AUDIO RESAMPLE REVERSE FX",
                fallback_reverse,
                "reverse_fx",
                26,
                0.58,
                positions["reverse"],
                lambda path: self._render_reverse_fx(path, duration_s=4.0, project_bpm=project_bpm),
            )
    # --- RISER ------------------------------------------------------------
    riser_source = find_best_source("riser")
    if riser_source and positions["riser"]:
        maybe_add(
            "AUDIO RESAMPLE RISER",
            riser_source,
            "riser_fx",
            27,
            0.54,
            positions["riser"],
            lambda path: self._render_riser(path, duration_s=8.0 if project_bpm >= 126 else 7.0, bpm=project_bpm),
        )
    else:
        fallback_riser = source_path("synth_loop", "vocal_loop", "atmos_fx", "pad")
        if fallback_riser and positions["riser"]:
            maybe_add(
                "AUDIO RESAMPLE RISER",
                fallback_riser,
                "riser_fx",
                27,
                0.54,
                positions["riser"],
                lambda path: self._render_riser(path, duration_s=8.0 if project_bpm >= 126 else 7.0, bpm=project_bpm),
            )
    # --- DOWNLIFTER -------------------------------------------------------
    downlifter_source = find_best_source("downlifter")
    if downlifter_source and positions["downlifter"]:
        maybe_add(
            "AUDIO RESAMPLE DOWNLIFTER",
            downlifter_source,
            "downlifter_fx",
            54,
            0.50,
            positions["downlifter"],
            lambda path: self._render_downlifter(path, duration_s=6.0, bpm=project_bpm),
        )
    else:
        fallback_downlifter = source_path("crash_fx", "atmos_fx", "synth_loop", "fill_fx")
        if fallback_downlifter and positions["downlifter"]:
            maybe_add(
                "AUDIO RESAMPLE DOWNLIFTER",
                fallback_downlifter,
                "downlifter_fx",
                54,
                0.50,
                positions["downlifter"],
                lambda path: self._render_downlifter(path, duration_s=6.0, bpm=project_bpm),
            )
    # --- STUTTER ----------------------------------------------------------
    # Stutter additionally validates source length and quality up-front,
    # because the stutter renderer is sensitive to very short sources.
    stutter_source = find_best_source("stutter")
    if stutter_source and positions["stutter"]:
        try:
            source_audio, _ = self._load_audio(stutter_source)
            min_samples = 1000
            if source_audio.shape[0] < min_samples:
                logger.warning("Skipping STUTTER layer: source audio too short (%d samples, min %d)",
                               source_audio.shape[0], min_samples)
            else:
                quality = self._analyze_source_quality(source_audio, self.sample_rate, "stutter")
                # Lower bar (0.25) than find_best_source since this is the chosen source.
                if quality.get("suitability_score", 0) >= 0.25:
                    maybe_add(
                        "AUDIO RESAMPLE STUTTER",
                        stutter_source,
                        "stutter_fx",
                        41,
                        0.56,
                        positions["stutter"],
                        lambda path: self._render_stutter(path, duration_s=2.5),
                    )
                else:
                    logger.debug("STUTTER source quality too low: %.2f", quality.get("suitability_score", 0))
        except Exception as exc:
            logger.warning("Skipping STUTTER layer: failed to validate source: %s", exc)
    else:
        fallback_stutter = source_path("vocal_shot", "vocal_loop", "snare_roll", "synth_peak")
        if fallback_stutter and positions["stutter"]:
            try:
                source_audio, _ = self._load_audio(fallback_stutter)
                min_samples = 1000
                if source_audio.shape[0] >= min_samples:
                    maybe_add(
                        "AUDIO RESAMPLE STUTTER",
                        fallback_stutter,
                        "stutter_fx",
                        41,
                        0.56,
                        positions["stutter"],
                        lambda path: self._render_stutter(path, duration_s=2.5),
                    )
            except Exception as exc:
                logger.warning("Fallback STUTTER also failed: %s", exc)
    logger.info("Created %d derived layers: %s", len(layers), [layer['name'] for layer in layers])
    return layers
def invalidate_stale_cache(self) -> int:
    """Drop cache entries whose backing files changed or disappeared.

    Because the file mtime is embedded in each cache key, stale entries
    would eventually age out via LRU anyway; this is a proactive sweep
    that compares each entry's key against the key the file would get
    for its current on-disk mtime.

    Returns:
        Number of entries removed.
    """
    stale: List[str] = []
    for cache_key in list(self._audio_cache.keys()):
        # Keys are either "path::mtime_ns" or a bare path.
        path_str = cache_key.rsplit("::", 1)[0] if "::" in cache_key else cache_key
        file_path = Path(path_str)
        try:
            if not file_path.exists():
                # Backing file is gone: entry is stale.
                stale.append(cache_key)
                continue
            # Rebuild the key for the file's current mtime; a mismatch
            # means the file was modified after it was cached.
            if cache_key != self._get_cache_key(path_str, file_path.stat().st_mtime_ns):
                stale.append(cache_key)
        except OSError:
            # Unreadable file metadata: treat the entry as stale.
            stale.append(cache_key)
    for cache_key in stale:
        del self._audio_cache[cache_key]
    if stale:
        logger.debug("Invalidadas %d entradas de cache stale", len(stale))
    return len(stale)
def clear_cache(self) -> int:
    """Empty the audio cache and reset all bookkeeping counters.

    Returns:
        Number of entries that were evicted.
    """
    evicted = len(self._audio_cache)
    for store in (self._audio_cache, self._cache_sizes):
        store.clear()
    self._cache_total_bytes = 0
    self._cache_hits = 0
    self._cache_misses = 0
    return evicted
def cache_size(self) -> int:
    """Return how many audio files are currently held in the cache."""
    entries = self._audio_cache
    return len(entries)
def cache_stats(self) -> Dict[str, Any]:
    """Return a snapshot of audio-cache statistics.

    Phase 1 Improvement: new method to monitor cache performance.

    Returns:
        Dict with entry counts, byte usage, hit/miss counters and the
        derived hit rate (0.0 when no requests have been recorded).
    """
    hits = self._cache_hits
    misses = self._cache_misses
    lookups = hits + misses
    rate = (hits / lookups) if lookups > 0 else 0.0
    return {
        "entries": len(self._audio_cache),
        "max_entries": self._CACHE_LIMIT,
        "bytes": self._cache_total_bytes,
        "max_bytes": self._CACHE_MAX_SIZE_BYTES,
        "mb": round(self._cache_total_bytes / (1024 * 1024), 2),
        "hits": hits,
        "misses": misses,
        "hit_rate": round(rate, 3),
        "max_age_s": self._CACHE_MAX_AGE_S,
    }