Files
cbc2027/processors/audio_processor.py
renato97 ee8fc183be feat: Sistema CBCFacil completo con cola secuencial
- Implementa ProcessingMonitor singleton para procesamiento secuencial de archivos
- Agrega AI summary service con soporte para MiniMax API
- Agrega PDF generator para resúmenes
- Agrega watchers para monitoreo de carpeta remota
- Mejora sistema de notificaciones Telegram
- Implementa gestión de VRAM para GPU
- Configuración mediante variables de entorno (sin hardcoded secrets)
- .env y transcriptions/ agregados a .gitignore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 15:35:39 +00:00

468 lines
17 KiB
Python

"""
Procesador de audio para transcripción con Whisper.
OPTIMIZACIONES DE MEMORIA PARA GPUs DE 8GB:
- Cache global singleton para evitar carga múltiple del modelo
- Configuración PYTORCH_ALLOC_CONF para reducir fragmentación
- Verificación de VRAM antes de cargar
- Fallback automático a CPU si GPU OOM
- Limpieza agresiva de cache CUDA
"""
import gc
import logging
import os
import subprocess
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from pathlib import Path
from typing import Dict, Literal, Optional, Tuple
import whisper
from config import settings
from services.vram_manager import vram_manager
logger = logging.getLogger(__name__)
# ============ OPTIMIZATION CONFIGURATION ============
# CRITICAL: expandable segments reduce CUDA allocator fragmentation on 8GB GPUs.
# setdefault() respects a value already provided by the environment.
os.environ.setdefault("PYTORCH_ALLOC_CONF", "expandable_segments:True")
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")
# Approximate model footprints in GB (including overhead)
MODEL_MEMORY_REQUIREMENTS = {
    "tiny": 0.5, "base": 0.8, "small": 1.5,
    "medium": 2.5, "large": 4.5,
}
# Global singleton cache: "model_device" key -> (model, device, last-used timestamp).
# KEY to avoiding OOM from loading the same model more than once per process.
_model_cache: Dict[str, Tuple[whisper.Whisper, str, float]] = {}
# Hard cap on a single transcription attempt, in seconds.
TRANSCRIPTION_TIMEOUT_SECONDS = 600
# Retry policy for transient failures (OOM, runtime errors).
MAX_RETRY_ATTEMPTS = 2
RETRY_DELAY_SECONDS = 5
# ============ MEMORY MANAGEMENT HELPERS ============
def get_gpu_memory_info() -> Dict[str, float]:
    """Return GPU memory statistics in GB.

    Returns a dict with keys "total", "free", "used" and "reserved";
    all values are zero when CUDA is unavailable or the query fails.
    "free" is computed as total minus reserved by the allocator.
    """
    empty_stats = {"total": 0, "free": 0, "used": 0, "reserved": 0}
    try:
        import torch
        if not torch.cuda.is_available():
            return empty_stats
        gib = 1024 ** 3
        total_gb = torch.cuda.get_device_properties(0).total_memory / gib
        reserved_gb = torch.cuda.memory_reserved(0) / gib
        allocated_gb = torch.cuda.memory_allocated(0) / gib
        return {
            "total": total_gb,
            "free": total_gb - reserved_gb,
            "used": allocated_gb,
            "reserved": reserved_gb,
        }
    except Exception:
        # Any torch/CUDA failure degrades to "no GPU" stats.
        return empty_stats
def clear_cuda_cache(aggressive: bool = False) -> None:
    """Release cached CUDA memory; never raises.

    With ``aggressive=True``, interleaves three garbage-collection sweeps
    with cache flushes to break reference cycles before freeing.
    """
    try:
        import torch
    except Exception:
        return
    try:
        if not torch.cuda.is_available():
            return
        torch.cuda.empty_cache()
        if aggressive:
            for _ in range(3):
                gc.collect()
                torch.cuda.empty_cache()
    except Exception:
        # Best-effort cleanup only.
        pass
def check_memory_for_model(model_name: str) -> Tuple[bool, str]:
    """Check whether the GPU has enough free memory for *model_name*.

    Args:
        model_name: Whisper model size; unknown names assume a 2.0 GB footprint.

    Returns:
        Tuple ``(fits, device)`` where *device* is "cuda" or "cpu".
    """
    required = MODEL_MEMORY_REQUIREMENTS.get(model_name, 2.0)
    gpu_info = get_gpu_memory_info()
    if gpu_info["total"] == 0:
        # No usable GPU at all.
        return False, "cpu"
    # NOTE: the original also tested a 1.5x headroom threshold first, but both
    # branches returned the identical result, so the effective condition is
    # simply free >= required.
    if gpu_info["free"] >= required:
        return True, "cuda"
    logger.warning(f"Memoria GPU insuficiente para '{model_name}': {gpu_info['free']:.2f}GB libre, {required:.2f}GB necesario")
    return False, "cpu"
def get_cached_model(model_name: str, device: str) -> Optional[whisper.Whisper]:
    """Look up a model in the global cache, refreshing its last-used timestamp.

    Returns the cached model, or None on a miss. An entry whose stored device
    disagrees with the key's device is evicted defensively.
    """
    cache_key = f"{model_name}_{device}"
    entry = _model_cache.get(cache_key)
    if entry is None:
        return None
    model, cached_device, _ = entry
    if cached_device != device:
        # Stale/inconsistent entry: drop it and report a miss.
        del _model_cache[cache_key]
        return None
    logger.info(f"Modelo '{model_name}' desde cache global")
    _model_cache[cache_key] = (model, cached_device, time.time())
    return model
def cache_model(model_name: str, model: whisper.Whisper, device: str) -> None:
    """Store *model* in the global cache under a name+device key."""
    key = f"{model_name}_{device}"
    _model_cache[key] = (model, device, time.time())
    logger.info(f"Modelo '{model_name}' cacheado en {device}")
def clear_model_cache() -> None:
    """Drop every cached model and aggressively release CUDA memory.

    Note: the previous implementation looped over entries doing
    ``del model`` inside a try/except, but ``del`` on a loop-local name
    only unbinds that name — the cache dict still held the reference, and
    ``del`` of a bound local cannot raise. Clearing the dict is what
    actually releases the models; the aggressive cache clear then runs GC
    and frees CUDA memory.
    """
    _model_cache.clear()
    clear_cuda_cache(aggressive=True)
# ============ EXCEPTIONS ============
class AudioProcessingError(Exception):
    """Base error for audio-processing failures."""
class TranscriptionTimeoutError(AudioProcessingError):
    """Raised when transcription exceeds the maximum allowed time."""
class GPUOutOfMemoryError(AudioProcessingError):
    """Raised on CUDA out-of-memory conditions."""
class AudioValidationError(AudioProcessingError):
    """Raised when the audio file fails validation checks."""
# ============ AUDIO PROCESSOR ============
class AudioProcessor:
    """Whisper-based transcriber with a global model cache and CPU fallback.

    Memory strategy for 8 GB GPUs:
    - models load through the process-wide cache (one copy per model+device)
    - available VRAM is checked before loading; falls back to CPU when short
    - CUDA caches are cleared aggressively around load/transcribe failures
    """

    SUPPORTED_MODELS = ("tiny", "base", "small", "medium", "large")
    DEFAULT_MODEL = settings.WHISPER_MODEL
    DEFAULT_LANGUAGE = "es"

    def __init__(
        self,
        model_name: Optional[str] = None,
        language: Optional[str] = None,
        device: Optional[Literal["cuda", "rocm", "cpu", "auto"]] = None,
    ) -> None:
        """Create a processor.

        Args:
            model_name: Whisper model size; defaults to settings.WHISPER_MODEL.
            language: transcription language code; defaults to "es".
            device: "cuda", "rocm", "cpu" or "auto" (memory-aware detection).

        Raises:
            ValueError: if *model_name* is not in SUPPORTED_MODELS.
        """
        self._model_name = model_name or settings.WHISPER_MODEL
        self._language = language or self.DEFAULT_LANGUAGE
        self._device = device or "auto"
        self._model: Optional[whisper.Whisper] = None
        self._using_cpu_fallback = False
        # Identifier used to register/report this model with the VRAM manager.
        self._model_id = f"whisper_{self._model_name}"
        if self._model_name not in self.SUPPORTED_MODELS:
            raise ValueError(
                f"Modelo '{self._model_name}' no soportado. "
                f"Disponibles: {', '.join(self.SUPPORTED_MODELS)}"
            )
        logger.info(
            "AudioProcessor inicializado",
            extra={"model": self._model_name, "device": self._device},
        )

    @property
    def model_name(self) -> str:
        """Configured Whisper model size."""
        return self._model_name

    @property
    def language(self) -> str:
        """Language code passed to Whisper."""
        return self._language

    @property
    def device(self) -> str:
        """Resolved device once a model has loaded, else the configured one."""
        return getattr(self, "_resolved_device", self._device)

    @property
    def is_loaded(self) -> bool:
        """Whether a model instance is currently attached to this processor."""
        return self._model is not None

    def _validate_audio_file(self, audio_path: Path) -> dict:
        """Validate the audio file and probe its properties with ffprobe.

        Returns:
            dict with duration, sample_rate, channels, codec, size_bytes;
            defaults are returned when ffprobe is unavailable or fails.

        Raises:
            AudioValidationError: if the file is smaller than 1 KiB.
        """
        logger.info(f"Validando: {audio_path.name}")
        file_size = audio_path.stat().st_size
        if file_size < 1024:
            raise AudioValidationError("Archivo demasiado pequeño")
        if file_size > 500 * 1024 * 1024:
            logger.warning(f"Archivo grande: {file_size / 1024 / 1024:.1f}MB")
        try:
            # FIX: codec_type must be listed in -show_entries, otherwise
            # ffprobe omits it and the "codec_type == audio" test below never
            # matched, so real metadata was always discarded.
            cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
                   "-show_entries", "stream=codec_type,channels,sample_rate,codec_name",
                   "-of", "json", str(audio_path)]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                import json
                info = json.loads(result.stdout)
                duration = float(info.get("format", {}).get("duration", 0))
                for stream in info.get("streams", []):
                    if stream.get("codec_type") == "audio":
                        return {
                            "duration": duration,
                            "sample_rate": int(stream.get("sample_rate", 16000)),
                            "channels": int(stream.get("channels", 1)),
                            "codec": stream.get("codec_name", "unknown"),
                            "size_bytes": file_size,
                        }
        except Exception:
            # ffprobe missing / timed out / bad JSON: fall back to defaults.
            pass
        return {"duration": 0, "sample_rate": 16000, "channels": 1,
                "codec": "unknown", "size_bytes": file_size}

    def _convert_audio_with_ffmpeg(self, input_path: Path, output_format: str = "wav") -> Path:
        """Convert audio to 16 kHz mono via ffmpeg; caller deletes the result.

        Raises:
            AudioProcessingError: if ffmpeg fails or produces no output.
        """
        suffix = f".{output_format}"
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            output_path = Path(tmp.name)
        cmd = ["ffmpeg", "-i", str(input_path),
               "-acodec", "pcm_s16le" if output_format == "wav" else "libmp3lame",
               "-ar", "16000", "-ac", "1", "-y", str(output_path)]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0 or not output_path.exists():
            # Don't leak the temp file created above when conversion fails.
            output_path.unlink(missing_ok=True)
            raise AudioProcessingError(f"ffmpeg falló: {result.stderr[-500:] if result.stderr else 'Unknown'}")
        return output_path

    def _get_device_with_memory_check(self) -> str:
        """Resolve the target device; auto mode falls back to CPU on low VRAM."""
        if self._device == "cpu":
            return "cpu"
        if self._device == "auto":
            has_memory, recommended = check_memory_for_model(self._model_name)
            if has_memory and recommended == "cuda":
                try:
                    import torch
                    if torch.cuda.is_available():
                        logger.info(f"GPU detectada: {torch.cuda.get_device_name(0)}")
                        return "cuda"
                except ImportError:
                    pass
            if not has_memory:
                logger.warning("Usando CPU por falta de memoria GPU")
                self._using_cpu_fallback = True
            return "cpu"
        # Explicit "cuda"/"rocm" request: honor it without further checks.
        return self._device

    def _load_model(self, force_reload: bool = False) -> None:
        """Load the Whisper model through the global cache.

        Raises:
            AudioProcessingError: on any load failure (including CUDA OOM).
        """
        if self._model is not None and not force_reload:
            return
        # PYTORCH_ALLOC_CONF / PYTORCH_CUDA_ALLOC_CONF are already configured
        # once at module import; re-setting them here was redundant.
        clear_cuda_cache(aggressive=True)
        self._resolved_device = self._get_device_with_memory_check()
        # Try the process-wide cache first to avoid duplicate VRAM copies.
        if not force_reload:
            cached = get_cached_model(self._model_name, self._resolved_device)
            if cached is not None:
                self._model = cached
                return
        try:
            # whisper.load_model() already places the model on the requested
            # device. The previous torch.cuda.device(...) wrapper raised when
            # the resolved device was "cpu" (breaking the CPU fallback), so it
            # was removed.
            self._model = whisper.load_model(
                self._model_name,
                device=self._resolved_device,
                download_root=None,
                in_memory=True,  # keep weights in RAM instead of re-reading disk
            )
            if self._resolved_device == "cuda":
                import torch
                torch.cuda.empty_cache()
            cache_model(self._model_name, self._model, self._resolved_device)
            gpu_info = get_gpu_memory_info()
            logger.info(
                f"Modelo cargado en {self._resolved_device}",
                extra={"gpu_used_gb": round(gpu_info.get("used", 0), 2),
                       "gpu_free_gb": round(gpu_info.get("free", 0), 2)},
            )
            vram_manager.update_usage(self._model_id)
        except RuntimeError as e:
            error_str = str(e)
            if "out of memory" in error_str.lower():
                # Deliberate policy: do NOT silently fall back to CPU here;
                # clean the GPU and surface the OOM to the caller's retry path.
                logger.error("OOM en GPU - limpiando memoria para reintentar...")
                clear_cuda_cache(aggressive=True)
                raise AudioProcessingError(f"CUDA OOM - limpie la GPU y reintente. {error_str}") from e
            raise AudioProcessingError(f"Error cargando modelo: {e}") from e
        except Exception as e:
            raise AudioProcessingError(f"Error cargando modelo: {e}") from e

    def _transcribe_internal(self, audio_path: Path, audio_properties: dict) -> str:
        """Run the actual Whisper transcription and return the stripped text.

        *audio_properties* is currently unused; kept for interface stability.
        """
        result = self._model.transcribe(
            str(audio_path),
            language=self._language,
            # fp16 only makes sense on an accelerator; CPU Whisper uses fp32.
            fp16=self._resolved_device in ("cuda", "rocm"),
            verbose=False,
        )
        return result.get("text", "").strip()

    def transcribe(self, audio_path: str) -> str:
        """Transcribe an audio file and return the recognized text.

        Retries up to MAX_RETRY_ATTEMPTS times on transient failures, falls
        back to CPU after a GPU OOM, and enforces a hard per-attempt timeout.

        Raises:
            FileNotFoundError: if *audio_path* does not exist.
            AudioValidationError: if the file fails validation.
            TranscriptionTimeoutError: if an attempt exceeds the timeout.
            GPUOutOfMemoryError: if GPU memory remains insufficient.
            AudioProcessingError: for any other processing failure.
        """
        audio_file = Path(audio_path)
        if not audio_file.exists():
            raise FileNotFoundError(f"Archivo no encontrado: {audio_path}")
        vram_manager.update_usage(self._model_id)
        try:
            audio_properties = self._validate_audio_file(audio_file)
        except AudioValidationError as e:
            logger.error(f"Validación falló: {e}")
            raise
        converted_file: Optional[Path] = None
        last_error: Optional[Exception] = None
        for attempt in range(MAX_RETRY_ATTEMPTS):
            try:
                force_reload = attempt > 0
                if self._model is None or force_reload:
                    self._load_model(force_reload=force_reload)
                audio_to_transcribe = audio_file
                # Whisper handles wav/mp3 directly; convert everything else
                # (and multi-channel or aac/opus/vorbis streams) to 16 kHz
                # mono wav first. (Removed unused local `cleanup_converted`.)
                needs_conversion = (
                    audio_file.suffix.lower() not in {".wav", ".mp3"} or
                    audio_properties.get("codec") in ("aac", "opus", "vorbis") or
                    audio_properties.get("channels", 1) > 1
                )
                if needs_conversion:
                    try:
                        converted_file = self._convert_audio_with_ffmpeg(audio_file, "wav")
                        audio_to_transcribe = converted_file
                    except AudioProcessingError as e:
                        # Best effort: keep going with the original file.
                        logger.warning(f"Conversión falló: {e}")
                logger.info(
                    f"Transcribiendo: {audio_file.name}",
                    extra={"device": self._resolved_device, "cpu_fallback": self._using_cpu_fallback},
                )
                # Run in a worker thread so a hard timeout can be enforced.
                with ThreadPoolExecutor(max_workers=1) as executor:
                    future = executor.submit(self._transcribe_internal, audio_to_transcribe, audio_properties)
                    try:
                        text = future.result(timeout=TRANSCRIPTION_TIMEOUT_SECONDS)
                    except FutureTimeoutError:
                        self.unload()
                        raise TranscriptionTimeoutError(f"Timeout después de {TRANSCRIPTION_TIMEOUT_SECONDS}s")
                logger.info(f"Transcripción completada: {len(text)} caracteres")
                return text
            except RuntimeError as e:
                error_str = str(e)
                last_error = e
                if "out of memory" in error_str.lower():
                    logger.warning("OOM durante transcripción...")
                    clear_cuda_cache(aggressive=True)
                    if not self._using_cpu_fallback and self._resolved_device in ("cuda", "rocm"):
                        # First OOM: reload on CPU and retry immediately.
                        self.unload()
                        self._resolved_device = "cpu"
                        self._using_cpu_fallback = True
                        self._load_model()
                        continue
                    if attempt >= MAX_RETRY_ATTEMPTS - 1:
                        raise GPUOutOfMemoryError("Memoria GPU insuficiente") from e
                    time.sleep(RETRY_DELAY_SECONDS)
                    continue
                if "Key and Value must have the same sequence length" in error_str:
                    # Known Whisper failure mode on odd inputs: retry once on
                    # a freshly converted wav.
                    if not converted_file:
                        converted_file = self._convert_audio_with_ffmpeg(audio_file, "wav")
                    text = self._model.transcribe(
                        str(converted_file), language=self._language,
                        fp16=self._resolved_device in ("cuda", "rocm"), verbose=False
                    ).get("text", "").strip()
                    converted_file.unlink()
                    return text
                raise AudioProcessingError(f"Error de transcripción: {e}") from e
            except (TranscriptionTimeoutError, GPUOutOfMemoryError):
                # Already specific; never retried.
                raise
            except Exception as e:
                last_error = e
                self.unload()
                if attempt >= MAX_RETRY_ATTEMPTS - 1:
                    raise AudioProcessingError(f"Error después de {MAX_RETRY_ATTEMPTS} intentos: {e}") from e
                time.sleep(RETRY_DELAY_SECONDS)
            finally:
                # Remove the per-attempt converted temp file, if any remains.
                if converted_file and converted_file.exists():
                    try:
                        converted_file.unlink()
                    except Exception:
                        pass
        raise AudioProcessingError(f"Error al transcribir: {last_error}") from last_error

    def unload(self) -> None:
        """Drop the local model reference and release CUDA cache.

        The global cache may still hold the model; clear_model_cache()
        releases it for good.
        """
        if self._model is not None:
            self._model = None
            clear_cuda_cache(aggressive=False)
            vram_manager.unregister_model(self._model_id)

    def __repr__(self) -> str:
        return f"AudioProcessor(model='{self._model_name}', device='{self.device}', loaded={self.is_loaded})"

    def __del__(self) -> None:
        # Best-effort cleanup; a destructor must never raise.
        try:
            self.unload()
        except Exception:
            pass