- Implementa ProcessingMonitor singleton para procesamiento secuencial de archivos - Agrega AI summary service con soporte para MiniMax API - Agrega PDF generator para resúmenes - Agrega watchers para monitoreo de carpeta remota - Mejora sistema de notificaciones Telegram - Implementa gestión de VRAM para GPU - Configuración mediante variables de entorno (sin hardcoded secrets) - .env y transcriptions/ agregados a .gitignore Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
468 lines
17 KiB
Python
468 lines
17 KiB
Python
"""
|
|
Procesador de audio para transcripción con Whisper.
|
|
|
|
OPTIMIZACIONES DE MEMORIA PARA GPUs DE 8GB:
|
|
- Cache global singleton para evitar carga múltiple del modelo
|
|
- Configuración PYTORCH_ALLOC_CONF para reducir fragmentación
|
|
- Verificación de VRAM antes de cargar
|
|
- Fallback automático a CPU si GPU OOM
|
|
- Limpieza agresiva de cache CUDA
|
|
"""
|
|
import gc
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
|
|
from pathlib import Path
|
|
from typing import Dict, Literal, Optional, Tuple
|
|
|
|
import whisper
|
|
|
|
from config import settings
|
|
from services.vram_manager import vram_manager
|
|
|
|
|
|
logger = logging.getLogger(__name__)


# ============ OPTIMIZATION SETTINGS ============

# CRITICAL: expandable segments reduce CUDA allocator fragmentation on
# small (8 GB) GPUs. Both variable spellings are set because newer and
# older torch builds read different names; setdefault() respects values
# already exported by the environment.
os.environ.setdefault("PYTORCH_ALLOC_CONF", "expandable_segments:True")
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")

# Approximate model footprints in GB (weights plus runtime overhead),
# consulted before loading to decide whether a model fits in free VRAM.
MODEL_MEMORY_REQUIREMENTS = {
    "tiny": 0.5, "base": 0.8, "small": 1.5,
    "medium": 2.5, "large": 4.5,
}

# Global singleton cache mapping "model_device" -> (model, device,
# last-used timestamp). Sharing one loaded instance per model/device is
# the key safeguard against repeated loads exhausting VRAM (OOM).
_model_cache: Dict[str, Tuple[whisper.Whisper, str, float]] = {}

# Hard wall-clock cap for a single transcription, in seconds.
TRANSCRIPTION_TIMEOUT_SECONDS = 600
# Attempts per file before giving up, and the pause between attempts.
MAX_RETRY_ATTEMPTS = 2
RETRY_DELAY_SECONDS = 5
|
|
|
|
|
|
# ============ FUNCIONES DE GESTIÓN DE MEMORIA ============
|
|
|
|
def get_gpu_memory_info() -> Dict[str, float]:
    """Return GPU memory statistics in GiB.

    Keys: ``total`` (device capacity), ``free`` (total minus reserved),
    ``used`` (currently allocated) and ``reserved`` (held by the caching
    allocator). All values are zero when torch cannot be imported or no
    CUDA device is available.
    """
    gib = 1024 ** 3
    try:
        import torch
        if torch.cuda.is_available():
            total = torch.cuda.get_device_properties(0).total_memory / gib
            reserved = torch.cuda.memory_reserved(0) / gib
            allocated = torch.cuda.memory_allocated(0) / gib
            return {
                "total": total,
                "free": total - reserved,
                "used": allocated,
                "reserved": reserved,
            }
    except Exception:
        # Best effort: any probing failure is reported as "no GPU".
        pass
    return {"total": 0, "free": 0, "used": 0, "reserved": 0}
|
|
|
|
|
|
def clear_cuda_cache(aggressive: bool = False) -> None:
    """Release cached CUDA memory back to the driver.

    With ``aggressive=True`` the release is interleaved with three
    garbage-collection passes so Python-side references are dropped
    before each sweep. Silently a no-op when torch or CUDA is missing.
    """
    try:
        import torch
    except Exception:
        return
    try:
        if not torch.cuda.is_available():
            return
        torch.cuda.empty_cache()
        if aggressive:
            for _ in range(3):
                gc.collect()
                torch.cuda.empty_cache()
    except Exception:
        # Cache cleanup is opportunistic; never let it raise.
        pass
|
|
|
|
|
|
def check_memory_for_model(model_name: str) -> Tuple[bool, str]:
    """Check whether the GPU has enough free memory for *model_name*.

    Args:
        model_name: Whisper model size; unknown names assume 2.0 GB.

    Returns:
        ``(fits, device)`` — ``(True, "cuda")`` when the model fits in
        free VRAM, ``(False, "cpu")`` when no GPU is visible or memory
        is insufficient.
    """
    required = MODEL_MEMORY_REQUIREMENTS.get(model_name, 2.0)
    gpu_info = get_gpu_memory_info()

    # total == 0 means torch/CUDA is unavailable entirely.
    if gpu_info["total"] == 0:
        return False, "cpu"

    # The original checked `free >= required * 1.5` and `free >= required`
    # in separate branches, but both returned the same result, so a single
    # comparison against the hard minimum is equivalent.
    if gpu_info["free"] >= required:
        return True, "cuda"

    logger.warning(f"Memoria GPU insuficiente para '{model_name}': {gpu_info['free']:.2f}GB libre, {required:.2f}GB necesario")
    return False, "cpu"
|
|
|
|
|
|
def get_cached_model(model_name: str, device: str) -> Optional[whisper.Whisper]:
    """Fetch a model from the global cache, refreshing its timestamp.

    Returns ``None`` on a cache miss or when the stored device no
    longer matches the requested one (that stale entry is evicted).
    """
    cache_key = f"{model_name}_{device}"
    entry = _model_cache.get(cache_key)
    if entry is None:
        return None

    model, cached_device, _ = entry
    if cached_device != device:
        # Stale entry whose device does not match its key: drop it.
        del _model_cache[cache_key]
        return None

    logger.info(f"Modelo '{model_name}' desde cache global")
    # Touch the entry so the timestamp reflects the most recent use.
    _model_cache[cache_key] = (model, cached_device, time.time())
    return model
|
|
|
|
|
|
def cache_model(model_name: str, model: whisper.Whisper, device: str) -> None:
    """Insert *model* into the global cache keyed by name and device."""
    key = f"{model_name}_{device}"
    _model_cache[key] = (model, device, time.time())
    logger.info(f"Modelo '{model_name}' cacheado en {device}")
|
|
|
|
|
|
def clear_model_cache() -> None:
    """Drop every cached model and aggressively free CUDA memory."""
    # Clearing the dict releases the cache's strong references to the
    # models; the aggressive CUDA sweep then reclaims the freed VRAM.
    # (The original also `del`-ed each loop-local binding, which is a
    # no-op beyond what .clear() already does.)
    _model_cache.clear()
    clear_cuda_cache(aggressive=True)
|
|
|
|
|
|
# ============ EXCEPCIONES ============
|
|
|
|
class AudioProcessingError(Exception):
    """Base error for failures while processing audio."""
|
|
|
|
|
|
class TranscriptionTimeoutError(AudioProcessingError):
    """Raised when a transcription exceeds the allowed wall-clock time."""
|
|
|
|
|
|
class GPUOutOfMemoryError(AudioProcessingError):
    """Raised on CUDA out-of-memory conditions."""
|
|
|
|
|
|
class AudioValidationError(AudioProcessingError):
    """Raised when an audio file fails pre-transcription validation."""
|
|
|
|
|
|
# ============ PROCESADOR DE AUDIO ============
|
|
|
|
class AudioProcessor:
    """Audio transcription processor built on Whisper.

    Reuses the module-level model cache to avoid loading the same model
    twice, checks free VRAM before choosing a device, and falls back to
    CPU when the GPU runs out of memory during transcription.
    """

    # Model sizes accepted for ``model_name``.
    SUPPORTED_MODELS = ("tiny", "base", "small", "medium", "large")
    DEFAULT_MODEL = settings.WHISPER_MODEL
    DEFAULT_LANGUAGE = "es"

    def __init__(
        self,
        model_name: Optional[str] = None,
        language: Optional[str] = None,
        device: Optional[Literal["cuda", "rocm", "cpu", "auto"]] = None,
    ) -> None:
        """Configure the processor; the model itself is loaded lazily.

        Args:
            model_name: Whisper model size; defaults to settings.WHISPER_MODEL.
            language: Transcription language code (default "es").
            device: Target device, or "auto" to pick based on free VRAM.

        Raises:
            ValueError: If *model_name* is not one of SUPPORTED_MODELS.
        """
        self._model_name = model_name or settings.WHISPER_MODEL
        self._language = language or self.DEFAULT_LANGUAGE
        self._device = device or "auto"
        self._model: Optional[whisper.Whisper] = None
        self._using_cpu_fallback = False
        # Identifier used to register this model with the VRAM manager.
        self._model_id = f"whisper_{self._model_name}"

        if self._model_name not in self.SUPPORTED_MODELS:
            raise ValueError(
                f"Modelo '{self._model_name}' no soportado. "
                f"Disponibles: {', '.join(self.SUPPORTED_MODELS)}"
            )

        logger.info(
            "AudioProcessor inicializado",
            extra={"model": self._model_name, "device": self._device},
        )

    @property
    def model_name(self) -> str:
        """Configured Whisper model size."""
        return self._model_name

    @property
    def language(self) -> str:
        """Language code passed to Whisper."""
        return self._language

    @property
    def device(self) -> str:
        """Resolved device once the model was loaded, else the requested one."""
        return getattr(self, "_resolved_device", self._device)

    @property
    def is_loaded(self) -> bool:
        """True while a model instance is held by this processor."""
        return self._model is not None

    def _validate_audio_file(self, audio_path: Path) -> dict:
        """Validate *audio_path* and probe its properties with ffprobe.

        Returns a dict with ``duration``, ``sample_rate``, ``channels``,
        ``codec`` and ``size_bytes``; falls back to safe defaults when
        ffprobe is unavailable or fails.

        Raises:
            AudioValidationError: If the file is smaller than 1 KiB.
        """
        logger.info(f"Validando: {audio_path.name}")

        file_size = audio_path.stat().st_size
        if file_size < 1024:
            raise AudioValidationError("Archivo demasiado pequeño")
        if file_size > 500 * 1024 * 1024:
            # Large files are allowed but logged — they may be very slow.
            logger.warning(f"Archivo grande: {file_size / 1024 / 1024:.1f}MB")

        try:
            # BUG FIX: codec_type must be requested explicitly; without it
            # the "codec_type == audio" filter below never matched and the
            # probe always fell through to the default values.
            cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
                   "-show_entries", "stream=codec_type,channels,sample_rate,codec_name",
                   "-of", "json", str(audio_path)]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)

            if result.returncode == 0:
                import json
                info = json.loads(result.stdout)
                duration = float(info.get("format", {}).get("duration", 0))

                # Use the first audio stream's properties.
                for stream in info.get("streams", []):
                    if stream.get("codec_type") == "audio":
                        return {
                            "duration": duration,
                            "sample_rate": int(stream.get("sample_rate", 16000)),
                            "channels": int(stream.get("channels", 1)),
                            "codec": stream.get("codec_name", "unknown"),
                            "size_bytes": file_size,
                        }
        except Exception:
            # Probing is best-effort; fall through to defaults below.
            pass

        return {"duration": 0, "sample_rate": 16000, "channels": 1,
                "codec": "unknown", "size_bytes": file_size}

    def _convert_audio_with_ffmpeg(self, input_path: Path, output_format: str = "wav") -> Path:
        """Convert *input_path* to mono 16 kHz *output_format* via ffmpeg.

        Returns the path of a new temporary file; the caller owns it and
        is responsible for deleting it.

        Raises:
            AudioProcessingError: If ffmpeg fails or produces no output.
        """
        suffix = f".{output_format}"
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            output_path = Path(tmp.name)

        # 16 kHz mono is the format Whisper expects internally.
        cmd = ["ffmpeg", "-i", str(input_path),
               "-acodec", "pcm_s16le" if output_format == "wav" else "libmp3lame",
               "-ar", "16000", "-ac", "1", "-y", str(output_path)]

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)

        if result.returncode != 0 or not output_path.exists():
            raise AudioProcessingError(f"ffmpeg falló: {result.stderr[-500:] if result.stderr else 'Unknown'}")

        return output_path

    def _get_device_with_memory_check(self) -> str:
        """Resolve the target device, verifying available GPU memory.

        Explicit "cpu"/"cuda"/"rocm" requests are honored as-is; "auto"
        picks CUDA only when the model fits in free VRAM, otherwise CPU.
        """
        if self._device == "cpu":
            return "cpu"

        if self._device == "auto":
            has_memory, recommended = check_memory_for_model(self._model_name)

            if has_memory and recommended == "cuda":
                try:
                    import torch
                    if torch.cuda.is_available():
                        logger.info(f"GPU detectada: {torch.cuda.get_device_name(0)}")
                        return "cuda"
                except ImportError:
                    pass

            if not has_memory:
                logger.warning("Usando CPU por falta de memoria GPU")
                self._using_cpu_fallback = True
            return "cpu"

        return self._device

    def _load_model(self, force_reload: bool = False) -> None:
        """Load the Whisper model, reusing the global cache when possible.

        Raises:
            AudioProcessingError: On any load failure, including CUDA OOM
                (this method deliberately does NOT fall back to CPU).
        """
        if self._model is not None and not force_reload:
            return

        # Reduce allocator fragmentation (also set at module import time).
        os.environ['PYTORCH_ALLOC_CONF'] = 'expandable_segments:True'

        clear_cuda_cache(aggressive=True)
        self._resolved_device = self._get_device_with_memory_check()

        # Reuse an already-loaded instance from the global cache.
        if not force_reload:
            cached = get_cached_model(self._model_name, self._resolved_device)
            if cached is not None:
                self._model = cached
                return

        try:
            import torch
            from contextlib import nullcontext

            # BUG FIX: torch.cuda.device() rejects "cpu", which broke the
            # CPU path; only enter the CUDA device context when actually
            # loading on a CUDA device.
            if self._resolved_device == "cuda":
                device_ctx = torch.cuda.device(self._resolved_device)
            else:
                device_ctx = nullcontext()

            with device_ctx:
                self._model = whisper.load_model(
                    self._model_name,
                    device=self._resolved_device,
                    download_root=None,
                    in_memory=True  # keep the checkpoint in RAM, not on disk
                )

            # Release load-time scratch allocations (no-op without CUDA).
            torch.cuda.empty_cache()

            cache_model(self._model_name, self._model, self._resolved_device)

            gpu_info = get_gpu_memory_info()
            logger.info(
                f"Modelo cargado en {self._resolved_device}",
                extra={"gpu_used_gb": round(gpu_info.get("used", 0), 2),
                       "gpu_free_gb": round(gpu_info.get("free", 0), 2)},
            )
            vram_manager.update_usage(self._model_id)

        except RuntimeError as e:
            error_str = str(e)
            if "out of memory" in error_str.lower():
                # Deliberately no CPU fallback here: clean the GPU so the
                # caller can retry on the same device.
                logger.error("OOM en GPU - limpiando memoria para reintentar...")
                clear_cuda_cache(aggressive=True)
                raise AudioProcessingError(f"CUDA OOM - limpie la GPU y reintente. {error_str}") from e
            else:
                raise AudioProcessingError(f"Error cargando modelo: {e}") from e
        except Exception as e:
            raise AudioProcessingError(f"Error cargando modelo: {e}") from e

    def _transcribe_internal(self, audio_path: Path, audio_properties: dict) -> str:
        """Run the actual (blocking) Whisper transcription and return text.

        NOTE(review): *audio_properties* is currently unused here; it is
        kept so the executor call in transcribe() stays signature-stable.
        """
        result = self._model.transcribe(
            str(audio_path),
            language=self._language,
            fp16=self._resolved_device in ("cuda", "rocm"),  # fp16 only on GPU
            verbose=False,
        )
        return result.get("text", "").strip()

    def transcribe(self, audio_path: str) -> str:
        """Transcribe an audio file and return its text.

        Validates the file, converts it to 16 kHz mono WAV when needed,
        runs Whisper under a timeout, and retries up to MAX_RETRY_ATTEMPTS
        times with CPU fallback on GPU OOM.

        Raises:
            FileNotFoundError: If *audio_path* does not exist.
            AudioValidationError: If the file fails validation.
            TranscriptionTimeoutError: If transcription exceeds the timeout.
            GPUOutOfMemoryError: If GPU memory stays insufficient.
            AudioProcessingError: On any other unrecoverable failure.
        """
        audio_file = Path(audio_path)
        if not audio_file.exists():
            raise FileNotFoundError(f"Archivo no encontrado: {audio_path}")

        # Mark the model as recently used so the VRAM manager keeps it.
        vram_manager.update_usage(self._model_id)

        try:
            audio_properties = self._validate_audio_file(audio_file)
        except AudioValidationError as e:
            logger.error(f"Validación falló: {e}")
            raise

        converted_file: Optional[Path] = None
        last_error: Optional[Exception] = None

        for attempt in range(MAX_RETRY_ATTEMPTS):
            try:
                # Force a fresh load on retries in case the model is corrupt.
                force_reload = attempt > 0
                if self._model is None or force_reload:
                    self._load_model(force_reload=force_reload)

                audio_to_transcribe = audio_file

                # Convert anything that is not clean mono WAV/MP3.
                needs_conversion = (
                    audio_file.suffix.lower() not in {".wav", ".mp3"} or
                    audio_properties.get("codec") in ("aac", "opus", "vorbis") or
                    audio_properties.get("channels", 1) > 1
                )

                if needs_conversion:
                    try:
                        converted_file = self._convert_audio_with_ffmpeg(audio_file, "wav")
                        audio_to_transcribe = converted_file
                    except AudioProcessingError as e:
                        # Best effort: fall back to the original file.
                        logger.warning(f"Conversión falló: {e}")

                logger.info(
                    f"Transcribiendo: {audio_file.name}",
                    extra={"device": self._resolved_device, "cpu_fallback": self._using_cpu_fallback},
                )

                # Run in a worker thread so we can enforce a hard timeout.
                with ThreadPoolExecutor(max_workers=1) as executor:
                    future = executor.submit(self._transcribe_internal, audio_to_transcribe, audio_properties)
                    try:
                        text = future.result(timeout=TRANSCRIPTION_TIMEOUT_SECONDS)
                    except FutureTimeoutError:
                        self.unload()
                        raise TranscriptionTimeoutError(f"Timeout después de {TRANSCRIPTION_TIMEOUT_SECONDS}s")

                logger.info(f"Transcripción completada: {len(text)} caracteres")
                return text

            except RuntimeError as e:
                error_str = str(e)
                last_error = e

                if "out of memory" in error_str.lower():
                    logger.warning("OOM durante transcripción...")
                    clear_cuda_cache(aggressive=True)

                    # First OOM on GPU: switch this processor to CPU and retry.
                    if not self._using_cpu_fallback and self._resolved_device in ("cuda", "rocm"):
                        self.unload()
                        self._resolved_device = "cpu"
                        self._using_cpu_fallback = True
                        self._load_model()
                        continue

                    if attempt >= MAX_RETRY_ATTEMPTS - 1:
                        raise GPUOutOfMemoryError("Memoria GPU insuficiente") from e
                    time.sleep(RETRY_DELAY_SECONDS)
                    continue

                # Known Whisper failure mode on odd inputs: retry once on a
                # freshly converted WAV.
                if "Key and Value must have the same sequence length" in error_str:
                    if not converted_file:
                        converted_file = self._convert_audio_with_ffmpeg(audio_file, "wav")
                    text = self._model.transcribe(
                        str(converted_file), language=self._language,
                        fp16=self._resolved_device in ("cuda", "rocm"), verbose=False
                    ).get("text", "").strip()
                    converted_file.unlink()
                    return text

                raise AudioProcessingError(f"Error de transcripción: {e}") from e

            except (TranscriptionTimeoutError, GPUOutOfMemoryError):
                raise
            except Exception as e:
                last_error = e
                self.unload()

                if attempt >= MAX_RETRY_ATTEMPTS - 1:
                    raise AudioProcessingError(f"Error después de {MAX_RETRY_ATTEMPTS} intentos: {e}") from e

                time.sleep(RETRY_DELAY_SECONDS)

            finally:
                # Always remove the temporary converted file, even on retry.
                if converted_file and converted_file.exists():
                    try:
                        converted_file.unlink()
                    except Exception:
                        pass

        # All attempts exhausted without returning or raising explicitly.
        raise AudioProcessingError(f"Error al transcribir: {last_error}") from last_error

    def unload(self) -> None:
        """Drop this processor's local model reference and free CUDA cache.

        The global cache may still hold the model; only the local handle
        and the VRAM-manager registration are released here.
        """
        if self._model is not None:
            self._model = None
            clear_cuda_cache(aggressive=False)
            vram_manager.unregister_model(self._model_id)

    def __repr__(self) -> str:
        return f"AudioProcessor(model='{self._model_name}', device='{self.device}', loaded={self.is_loaded})"

    def __del__(self) -> None:
        # Best-effort cleanup; never raise from a finalizer.
        try:
            self.unload()
        except Exception:
            pass
|