Files
cbc2027/core/process_manager.py
renato97 1f6bfa771b fix: Mejoras en generación de PDFs y resúmenes
- Corrige PDFGenerator para pasar contenido (no ruta)
- Agrega prompt siguiendo código.md (español, estructura académica)
- Limpia thinking tokens de respuesta AI
- Agrega skip de archivos ya procesados en watcher
- Implementa tablas LaTeX en PDFs (reportlab Table)
- Agrega load_dotenv() en main.py
- Actualiza .env con MiniMax config
- Agrega transcriptions/ a .gitignore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 17:12:00 +00:00

625 lines
22 KiB
Python

"""
Process Manager - Coordina el flujo watcher -> descarga -> transcripción.
Maneja el estado de cada archivo a través de una state machine simple:
pending -> downloading -> transcribing -> completed -> error
"""
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Callable, Optional
from processors.audio_processor import AudioProcessor, AudioProcessingError
from processors.audio_processor import GPUOutOfMemoryError, TranscriptionTimeoutError
from services.webdav_service import WebDAVService
from services.ai_summary_service import AISummaryService
from services.telegram_service import telegram_service
from config import settings
logger = logging.getLogger(__name__)
class ProcessState(str, Enum):
"""Estados del proceso de transcripción."""
PENDING = "pending"
DOWNLOADING = "downloading"
TRANSCRIBING = "transcribing"
COMPLETED = "completed"
ERROR = "error"
CLEANING = "cleaning" # Estado intermedio para limpieza de GPU
@dataclass
class ProcessInfo:
"""Información del proceso de un archivo."""
file_path: Path
state: ProcessState = ProcessState.PENDING
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
transcript: Optional[str] = None
error: Optional[str] = None
file_size: Optional[int] = None
# Callback para notificaciones
NotificationCallback = Callable[[ProcessInfo], None]
class ProcessManagerError(Exception):
"""Error específico del ProcessManager."""
pass
class ProcessManager:
"""
Coordina el flujo: watcher -> descarga -> transcripción.
Maneja el estado de archivos de audio a través de una máquina de estados
simple y notifica sobre cambios mediante callbacks.
Attributes:
audio_processor: Instancia de AudioProcessor para transcripciones.
webdav_service: Instancia opcional de WebDAVService para descargas remotas.
on_state_change: Callback llamado cuando cambia el estado de un proceso.
on_complete: Callback llamado cuando un proceso se completa exitosamente.
on_error: Callback llamado cuando ocurre un error en un proceso.
"""
def __init__(
self,
audio_processor: Optional[AudioProcessor] = None,
webdav_service: Optional[WebDAVService] = None,
ai_summary_service: Optional[AISummaryService] = None,
on_state_change: Optional[NotificationCallback] = None,
on_complete: Optional[NotificationCallback] = None,
on_error: Optional[NotificationCallback] = None,
) -> None:
"""
Inicializa el ProcessManager.
Args:
audio_processor: Procesador de audio. Se crea uno nuevo si no se provee.
webdav_service: Servicio WebDAV para descargas remotas (opcional).
ai_summary_service: Servicio de resumen con IA (opcional).
on_state_change: Callback para cambios de estado.
on_complete: Callback para procesos completados.
on_error: Callback para errores.
"""
self._audio_processor = audio_processor or AudioProcessor()
self._webdav_service = webdav_service
self._ai_summary_service = ai_summary_service or AISummaryService()
self._on_state_change = on_state_change
self._on_complete = on_complete
self._on_error = on_error
# Estado de procesos: file_key -> ProcessInfo
self._processes: dict[str, ProcessInfo] = {}
logger.info(
"ProcessManager inicializado",
extra={
"has_audio_processor": audio_processor is not None,
"has_webdav": webdav_service is not None,
},
)
@property
def audio_processor(self) -> AudioProcessor:
"""Procesador de audio configurado."""
return self._audio_processor
@property
def webdav_service(self) -> Optional[WebDAVService]:
"""Servicio WebDAV configurado."""
return self._webdav_service
@property
def ai_summary_service(self) -> AISummaryService:
"""Servicio de resumen con IA configurado."""
return self._ai_summary_service
def process_file(self, filepath: Path) -> ProcessInfo:
"""
Procesa un archivo de audio: download + transcripción.
El método garantiza que el modelo de audio se descargará en todos
los casos (éxito, error, timeout, etc.) mediante bloques try/finally.
Args:
filepath: Ruta al archivo de audio.
Returns:
ProcessInfo con el estado final del proceso.
Raises:
ProcessManagerError: Si el archivo no es válido o no se puede procesar.
"""
file_key = str(filepath)
logger.info(
"Iniciando procesamiento de archivo",
extra={"file_path": str(filepath)},
)
# Crear o recuperar proceso
if file_key in self._processes:
process = self._processes[file_key]
# Reiniciar si ya estaba en estado terminal
if process.state in (ProcessState.COMPLETED, ProcessState.ERROR):
process = ProcessInfo(file_path=filepath)
self._processes[file_key] = process
else:
process = ProcessInfo(file_path=filepath)
self._processes[file_key] = process
# Variable para rastrear si debemos limpiar GPU
should_cleanup_gpu = False
try:
# Validar archivo
if not filepath.exists():
process.state = ProcessState.ERROR
process.error = f"Archivo no encontrado: {filepath}"
process.updated_at = datetime.now()
self._notify_error(process)
logger.error(
"Archivo no encontrado",
extra={"file_path": str(filepath)},
)
raise ProcessManagerError(process.error)
# Obtener tamaño
try:
process.file_size = filepath.stat().st_size
except OSError:
pass
# Estado: downloading (asumimos que ya está disponible localmente)
self._update_state(process, ProcessState.DOWNLOADING)
# Si hay WebDAV y el archivo es remoto, descargar
if self._webdav_service and self._is_remote_path(filepath):
try:
self._download_from_remote(process)
telegram_service.send_download_complete(filepath.name)
except Exception as e:
process.state = ProcessState.ERROR
process.error = f"Descarga fallida: {e}"
process.updated_at = datetime.now()
self._notify_error(process)
logger.error(
"Descarga fallida",
extra={"file_path": str(filepath), "error": str(e)},
)
raise ProcessManagerError(process.error) from e
else:
# Archivo local, notificar descarga completa
telegram_service.send_download_complete(filepath.name)
# Estado: transcribing
self._update_state(process, ProcessState.TRANSCRIBING)
# Notificar inicio de transcripción
telegram_service.send_transcription_start(filepath.name)
# Marcar que necesitamos limpieza de GPU después de cargar el modelo
should_cleanup_gpu = True
# Transcribir con manejo robusto de errores
try:
process.transcript = self._audio_processor.transcribe(str(filepath))
# Notificar transcripción completada
transcript_length = len(process.transcript) if process.transcript else 0
telegram_service.send_transcription_complete(filepath.name, transcript_length)
# Guardar transcripción en archivo .txt
txt_path = self._save_transcription(filepath, process.transcript)
# Mover archivo de audio a transcriptions/
self._move_audio_to_transcriptions(filepath)
# Generar resumen con IA y PDF
md_path, pdf_path = self.generate_summary(filepath)
# Notificación final con todos los archivos
telegram_service.send_all_complete(
filename=filepath.name,
txt_path=str(txt_path) if txt_path else None,
md_path=str(md_path) if md_path else None,
pdf_path=str(pdf_path) if pdf_path else None,
)
process.state = ProcessState.COMPLETED
process.updated_at = datetime.now()
self._notify_complete(process)
logger.info(
"Transcripción completada",
extra={
"file_path": str(filepath),
"transcript_length": len(process.transcript or ""),
},
)
except (GPUOutOfMemoryError, TranscriptionTimeoutError) as e:
# Estos errores ya limpian la GPU internamente, no necesitamos limpiar de nuevo
should_cleanup_gpu = False
process.state = ProcessState.ERROR
error_type = "GPU OOM" if isinstance(e, GPUOutOfMemoryError) else "Timeout"
process.error = f"Transcripción fallida ({error_type}): {e}"
process.updated_at = datetime.now()
self._notify_error(process)
logger.error(
f"Transcripción fallida ({error_type})",
extra={"file_path": str(filepath), "error": str(e)},
)
raise ProcessManagerError(process.error) from e
except AudioProcessingError as e:
process.state = ProcessState.ERROR
process.error = f"Transcripción fallida: {e}"
process.updated_at = datetime.now()
self._notify_error(process)
logger.error(
"Transcripción fallida",
extra={"file_path": str(filepath), "error": str(e)},
)
raise ProcessManagerError(process.error) from e
return process
finally:
# LIMPIEZA GUARANTIZADA: Siempre ejecutado, pase lo que pase
if should_cleanup_gpu:
self._ensure_gpu_cleanup(filepath)
def _ensure_gpu_cleanup(self, filepath: Path) -> None:
"""
Asegura que el modelo de audio se descargue de la GPU.
Este método se llama en el bloque finally para garantizar que
la memoria GPU se libere sin importar cómo terminó el proceso.
Args:
filepath: Ruta del archivo procesado (para logs).
"""
try:
if self._audio_processor and self._audio_processor.is_loaded:
logger.info(
"Limpiando GPU después de procesamiento",
extra={"file_path": str(filepath)},
)
self._audio_processor.unload()
logger.info(
"GPU liberada correctamente",
extra={"file_path": str(filepath)},
)
except Exception as e:
logger.warning(
"Error durante limpieza de GPU (no crítico)",
extra={"file_path": str(filepath), "error": str(e)},
)
def get_status(self) -> dict:
"""
Obtiene el estado actual del ProcessManager.
Returns:
Diccionario con estadísticas de procesos.
"""
states_count = {state.value: 0 for state in ProcessState}
for process in self._processes.values():
states_count[process.state.value] += 1
return {
"total_processes": len(self._processes),
"by_state": states_count,
"pending": states_count[ProcessState.PENDING.value],
"processing": states_count[ProcessState.DOWNLOADING.value]
+ states_count[ProcessState.TRANSCRIBING.value],
"completed": states_count[ProcessState.COMPLETED.value],
"errors": states_count[ProcessState.ERROR.value],
}
def get_process(self, filepath: Path) -> Optional[ProcessInfo]:
"""
Obtiene la información de un proceso específico.
Args:
filepath: Ruta al archivo.
Returns:
ProcessInfo si existe, None si no.
"""
return self._processes.get(str(filepath))
def get_all_processes(self) -> list[ProcessInfo]:
"""
Obtiene todos los procesos.
Returns:
Lista de ProcessInfo.
"""
return list(self._processes.values())
def clear_completed(self) -> int:
"""
Limpia procesos completados exitosamente.
Returns:
Número de procesos eliminados.
"""
keys_to_remove = [
k for k, p in self._processes.items()
if p.state == ProcessState.COMPLETED
]
for key in keys_to_remove:
del self._processes[key]
logger.info(
"Procesos completados limpiados",
extra={"count": len(keys_to_remove)},
)
return len(keys_to_remove)
def set_callbacks(
self,
on_state_change: Optional[NotificationCallback] = None,
on_complete: Optional[NotificationCallback] = None,
on_error: Optional[NotificationCallback] = None,
) -> None:
"""
Actualiza los callbacks de notificación.
Args:
on_state_change: Callback para cambios de estado.
on_complete: Callback para procesos completados.
on_error: Callback para errores.
"""
if on_state_change is not None:
self._on_state_change = on_state_change
if on_complete is not None:
self._on_complete = on_complete
if on_error is not None:
self._on_error = on_error
def _update_state(self, process: ProcessInfo, new_state: ProcessState) -> None:
"""
Actualiza el estado de un proceso.
Args:
process: Proceso a actualizar.
new_state: Nuevo estado.
"""
old_state = process.state
process.state = new_state
process.updated_at = datetime.now()
logger.info(
f"Cambio de estado: {old_state.value} -> {new_state.value}",
extra={
"file_path": str(process.file_path),
"old_state": old_state.value,
"new_state": new_state.value,
},
)
if self._on_state_change:
try:
self._on_state_change(process)
except Exception as e:
logger.error(
"Error en callback on_state_change",
extra={"error": str(e)},
)
def _notify_complete(self, process: ProcessInfo) -> None:
"""Notifica completado."""
if self._on_complete:
try:
self._on_complete(process)
except Exception as e:
logger.error(
"Error en callback on_complete",
extra={"error": str(e)},
)
def _notify_error(self, process: ProcessInfo) -> None:
"""Notifica error."""
if self._on_error:
try:
self._on_error(process)
except Exception as e:
logger.error(
"Error en callback on_error",
extra={"error": str(e)},
)
def _save_transcription(self, filepath: Path, transcript: str) -> Path:
"""
Guarda la transcripción en un archivo de texto.
Args:
filepath: Ruta original del archivo de audio.
transcript: Texto de la transcripción.
Returns:
Path del archivo guardado.
"""
transcriptions_dir = settings.TRANSCRIPTIONS_DIR
transcriptions_dir.mkdir(parents=True, exist_ok=True)
output_path = transcriptions_dir / f"{filepath.stem}.txt"
output_path.write_text(transcript, encoding="utf-8")
logger.info(
"Transcripción guardada",
extra={"output_path": str(output_path)},
)
return output_path
def generate_summary(self, filepath: Path) -> tuple[Optional[Path], Optional[Path]]:
"""
Genera un resumen con IA y crea un PDF a partir de la transcripción.
Args:
filepath: Ruta original del archivo de audio.
Returns:
Tupla (md_path, pdf_path) con las rutas generadas o None si falló.
"""
transcriptions_dir = settings.TRANSCRIPTIONS_DIR
txt_path = transcriptions_dir / f"{filepath.stem}.txt"
if not txt_path.exists():
logger.warning(
"Archivo de transcripción no encontrado, omitiendo resumen",
extra={"txt_path": str(txt_path)},
)
return None, None
# Notificar inicio de resumen
telegram_service.send_summary_start(filepath.name)
# 1. Leer el .txt de transcripción
transcript_text = txt_path.read_text(encoding="utf-8")
# 2. Llamar a AISummaryService.summarize()
summary_text = self._ai_summary_service.summarize(transcript_text)
# 3. Guardar el resumen como .md en transcriptions/
md_path = transcriptions_dir / f"{filepath.stem}_resumen.md"
md_path.write_text(summary_text, encoding="utf-8")
logger.info(
"Resumen guardado",
extra={"md_path": str(md_path)},
)
# Notificar resumen completado
telegram_service.send_summary_complete(filepath.name, has_markdown=True)
# 4. Llam.markdown_to_pdfar a PDFGenerator()
pdf_path = None
try:
from services.pdf_generator import PDFGenerator
# Notificar inicio de PDF
telegram_service.send_pdf_start(filepath.name)
pdf_generator = PDFGenerator()
pdf_path = md_path.with_suffix(".pdf")
# Leer el contenido markdown y pasarlo al generator
markdown_content = md_path.read_text(encoding="utf-8")
pdf_generator.markdown_to_pdf(markdown_content, pdf_path)
logger.info(
"PDF generado",
extra={"pdf_path": str(pdf_path)},
)
# Notificar PDF completado
telegram_service.send_pdf_complete(filepath.name, str(pdf_path))
except ImportError:
logger.warning(
"PDFGenerator no disponible, solo se creó el archivo markdown",
extra={"md_path": str(md_path)},
)
return md_path, pdf_path
def _move_audio_to_transcriptions(self, filepath: Path) -> None:
"""
Mueve el archivo de audio a la carpeta de transcripciones.
Args:
filepath: Ruta del archivo de audio.
"""
downloads_dir = settings.DOWNLOADS_DIR
# Solo mover si el archivo está en downloads/
if downloads_dir and filepath.parent == downloads_dir:
transcriptions_dir = settings.TRANSCRIPTIONS_DIR
transcriptions_dir.mkdir(parents=True, exist_ok=True)
dest_path = transcriptions_dir / filepath.name
# Mover el archivo (con manejo de error si ya existe)
try:
filepath.rename(dest_path)
logger.info(
"Archivo de audio movido a transcripciones",
extra={
"from": str(filepath),
"to": str(dest_path),
},
)
except FileNotFoundError:
# El archivo ya fue movido o no existe, verificar si está en destino
if dest_path.exists():
logger.info(
"Archivo ya estaba en transcripciones",
extra={"path": str(dest_path)},
)
else:
logger.warning(
f"Archivo no encontrado en origen ni destino: {filepath}"
)
def _is_remote_path(self, filepath: Path) -> bool:
"""
Determina si la ruta es remota.
Args:
filepath: Ruta a verificar.
Returns:
True si es remota, False si es local.
"""
path_str = str(filepath)
# Detectar URLs WebDAV o rutas remotas
return path_str.startswith("http://") or path_str.startswith("https://")
def _download_from_remote(self, process: ProcessInfo) -> None:
"""
Descarga un archivo desde WebDAV.
Args:
process: Proceso con información del archivo.
Raises:
ProcessManagerError: Si la descarga falla.
"""
if not self._webdav_service:
raise ProcessManagerError("WebDAV no configurado")
remote_path = str(process.file_path)
local_path = Path(process.file_path).name
logger.info(
"Descargando archivo remoto",
extra={"remote_path": remote_path, "local_path": str(local_path)},
)
# El archivo ya debería tener la ruta remota
# Aquí se manejaría la descarga real
# Por ahora solo actualizamos el estado
process.updated_at = datetime.now()
def __repr__(self) -> str:
"""Representación string del manager."""
status = self.get_status()
return (
f"ProcessManager("
f"total={status['total_processes']}, "
f"processing={status['processing']}, "
f"completed={status['completed']}, "
f"errors={status['errors']})"
)