Files
cbc2027/main.py
renato97 1f6bfa771b fix: Mejoras en generación de PDFs y resúmenes
- Corrige PDFGenerator para pasar contenido (no ruta)
- Agrega prompt siguiendo código.md (español, estructura académica)
- Limpia thinking tokens de respuesta AI
- Agrega skip de archivos ya procesados en watcher
- Implementa tablas LaTeX en PDFs (reportlab Table)
- Agrega load_dotenv() en main.py
- Actualiza .env con MiniMax config
- Agrega transcriptions/ a .gitignore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 17:12:00 +00:00

525 lines
17 KiB
Python

#!/usr/bin/env python3
"""
CBFacil - Sistema de transcripción de audio con IA y Notion
Características:
- Polling de Nextcloud vía WebDAV
- Transcripción con Whisper (medium, GPU)
- Resúmenes con IA (MiniMax)
- Generación de PDF
- Notificaciones Telegram
"""
from dotenv import load_dotenv
# Cargar variables de entorno desde .env
load_dotenv()
import logging
import os
import sys
import threading
import time
from collections import deque
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Callable, Optional
import torch
import torch.cuda
from flask import Flask
from flask_cors import CORS
# API y configuración
from api import api_bp, init_api
from config import settings
from processors.audio_processor import AudioProcessor, AudioProcessingError
from services import WebDAVService
from services.webdav_service import WebDAVService as WebDAVService_Class
from services.telegram_service import TelegramService
from watchers import RemoteFolderWatcher
# Importar ProcessManager del core
from core.process_manager import ProcessManager as CoreProcessManager, ProcessInfo
# Paquetes de logging
from pythonjsonlogger import jsonlogger
class JSONFormatter(jsonlogger.JsonFormatter):
    """JSON log formatter that stamps every record with UTC time, level and module."""

    def add_fields(self, log_record: dict, record: logging.LogRecord, message_dict: dict) -> None:
        """Inject the standard fields into the JSON payload of each record."""
        super().add_fields(log_record, record, message_dict)
        # datetime.utcnow() is deprecated since Python 3.12 and returns a
        # naive datetime; use an aware UTC timestamp instead, keeping the
        # original trailing-"Z" wire format ("...+00:00" -> "...Z").
        log_record["timestamp"] = (
            datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        )
        log_record["level"] = record.levelname
        log_record["module"] = record.module
def setup_logging() -> logging.Logger:
    """Configure and return the root logger.

    Console output goes to stdout: JSON in production, a plain human-readable
    format otherwise. When ``settings.LOG_FILE`` is set, a JSON file handler
    is attached as well. Any pre-existing handlers are removed first.
    """
    root = logging.getLogger()
    root.setLevel(getattr(logging, settings.LOG_LEVEL.upper()))
    # Drop whatever handlers were installed before (re-runs, libraries, ...).
    root.handlers.clear()

    json_format = "%(timestamp)s %(level)s %(name)s %(message)s"
    plain_format = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")

    # Console handler: structured JSON in production, plain text in dev.
    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(JSONFormatter(json_format) if settings.is_production else plain_format)
    root.addHandler(console)

    # Optional JSON file handler.
    if settings.LOG_FILE:
        file_handler = logging.FileHandler(settings.LOG_FILE)
        file_handler.setFormatter(JSONFormatter(json_format))
        root.addHandler(file_handler)

    return root


logger = setup_logging()
# ============================================================================
# MONITOR GLOBAL - Solo UN archivo procesando a la vez
# ============================================================================
class ProcessingMonitor:
    """Global monitor guaranteeing that only ONE file is processed at a time.

    Thread-safe singleton. Files submitted while another is in flight are
    queued (FIFO) and started automatically when the current one finishes.
    ``_queued_files`` holds the string keys of every file that is either
    queued or currently processing, so duplicate submissions are rejected.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking: concurrent first calls create one instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._processing = False
                    cls._instance._current_file = None
                    cls._instance._queue = deque()
                    cls._instance._queued_files = set()
        return cls._instance

    def is_processing(self) -> bool:
        """Return True while a file is being processed."""
        with self._lock:
            return self._processing

    def get_current_file(self) -> Optional[str]:
        """Return the path (as str) of the file currently processing, or None."""
        with self._lock:
            return self._current_file

    def add_to_queue(self, file_path: Path) -> bool:
        """Queue *file_path* for later processing.

        Returns:
            True if it was enqueued, False if it is already queued or in flight.
        """
        with self._lock:
            file_key = str(file_path)
            if file_key in self._queued_files:
                return False
            self._queue.append(file_path)
            self._queued_files.add(file_key)
            return True

    def start_processing(self, file_path: Path) -> bool:
        """Try to claim the single processing slot for *file_path*.

        Returns:
            True if processing may start, False if another file is in flight.
        """
        with self._lock:
            if self._processing:
                return False
            self._processing = True
            self._current_file = str(file_path)
            # Bug fix: register the in-flight file so a duplicate download
            # callback cannot re-queue the same file while it is processing
            # (finish_processing already discards this key when done).
            self._queued_files.add(self._current_file)
            return True

    def finish_processing(self):
        """Release the processing slot and auto-start the next queued file."""
        with self._lock:
            self._processing = False
            file_key = self._current_file
            self._current_file = None
            # Forget the finished file so it may be submitted again later.
            if file_key:
                self._queued_files.discard(file_key)
        # Must run outside the (non-reentrant) lock: it re-acquires it.
        self._process_next()

    def get_queue_size(self) -> int:
        """Return the number of files waiting in the queue."""
        with self._lock:
            return len(self._queue)

    def _process_next(self):
        """Pop and start the next queued file (call WITHOUT holding the lock)."""
        with self._lock:
            if self._processing or not self._queue:
                return
            next_file = self._queue.popleft()
            self._processing = True
            self._current_file = str(next_file)
        # Logging and thread start happen outside the lock.
        logger.info(
            "🔄 Auto-starting NEXT file from queue",
            extra={"file": next_file.name, "remaining": len(self._queue)}
        )

        # Wrapper guarantees cleanup/continuation even on exceptions.
        def process_wrapper():
            try:
                _process_file_async_safe(next_file)
            finally:
                # Always release the slot, even if processing raised.
                self.finish_processing()

        thread = threading.Thread(target=process_wrapper, daemon=True)
        thread.start()
# Global singleton monitor instance shared by all watcher callbacks.
monitor = ProcessingMonitor()
# ============================================================================
# Polling Service
# ============================================================================
class PollingService:
    """Main polling service.

    Wires together the WebDAV client, the remote-folder watcher, the
    Telegram notifier, the core ProcessManager and the Flask API, then
    drives the periodic polling loop. Downloaded files are handed to the
    global ProcessingMonitor so only one is processed at a time.
    """

    def __init__(self) -> None:
        # All collaborators are created in initialize(); None until then.
        self.webdav_service: Optional[WebDAVService_Class] = None
        self.watcher: Optional[RemoteFolderWatcher] = None
        self.flask_app: Optional[Flask] = None
        self._telegram_service: Optional[TelegramService] = None
        self._process_manager: Optional[CoreProcessManager] = None
        self._running = False

    def initialize(self) -> None:
        """Initialize all services; exits the process (code 1) on fatal
        configuration or connection errors."""
        logger.info("Initializing CBCFacil polling service")
        # Verify required WebDAV configuration is present.
        if not settings.has_webdav_config:
            logger.error(
                "WebDAV configuration missing. Set NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD"
            )
            sys.exit(1)
        # Initialize WebDAV and fail fast if Nextcloud is unreachable.
        self.webdav_service = WebDAVService()
        logger.info("Testing WebDAV connection...")
        if not self.webdav_service.test_connection():
            logger.error("Failed to connect to Nextcloud")
            sys.exit(1)
        logger.info("WebDAV connection successful")
        # Initialize the Telegram notification service.
        self._telegram_service = TelegramService()
        logger.info(
            "Telegram service initialized",
            extra={"configured": self._telegram_service._configured},
        )
        # Initialize the ProcessManager with Telegram-backed callbacks.
        self._process_manager = CoreProcessManager(
            webdav_service=self.webdav_service,
            on_state_change=self._on_state_change,
            on_complete=self._on_process_complete,
            on_error=self._on_process_error,
        )
        logger.info("ProcessManager initialized")
        # Expose the manager globally for _process_file_async_safe.
        global process_manager
        process_manager = self._process_manager
        # Initialize the remote-folder watcher.
        self.watcher = RemoteFolderWatcher(
            webdav_service=self.webdav_service,
            local_path=settings.DOWNLOADS_DIR,
            remote_path=settings.WATCHED_REMOTE_PATH,
        )
        self.watcher.set_callback(self._on_file_downloaded)
        self.watcher.start()
        # Initialize the Flask API.
        self._setup_flask()
        logger.info("CBCFacil initialized successfully")

    def _setup_flask(self) -> None:
        """Create the Flask app, enable CORS and register the API blueprint."""
        self.flask_app = Flask(__name__)
        CORS(self.flask_app)
        init_api(self._process_manager, self.webdav_service, self.watcher)
        self.flask_app.register_blueprint(api_bp)
        # Root route returning a simple service banner.
        @self.flask_app.route("/")
        def index():
            return {"message": "CBCFacil Polling Service", "version": "1.0.0"}

    def _on_state_change(self, process_info: ProcessInfo) -> None:
        """Callback fired whenever a process changes state."""
        filename = process_info.file_path.name
        # Only the transition into "transcribing" triggers a notification.
        if process_info.state.value == "transcribing" and self._telegram_service:
            self._telegram_service.send_start_notification(filename)

    def _on_process_complete(self, process_info: ProcessInfo) -> None:
        """Callback fired when a process finishes successfully."""
        filename = process_info.file_path.name
        if process_info.transcript:
            logger.info(
                "Transcripción completada",
                extra={"file_name": filename, "text_length": len(process_info.transcript)},
            )
        # Notify completion, reporting the total wall-clock duration.
        if self._telegram_service:
            duration = (process_info.updated_at - process_info.created_at).total_seconds()
            self._telegram_service.send_completion_notification(
                filename=filename,
                duration=duration,
            )

    def _on_process_error(self, process_info: ProcessInfo) -> None:
        """Callback fired when a process fails."""
        filename = process_info.file_path.name
        error_msg = process_info.error or "Unknown error"
        logger.warning(
            "Transcripción fallida",
            extra={"file": filename, "error": error_msg},
        )
        # Forward the failure to Telegram.
        if self._telegram_service:
            self._telegram_service.send_error_notification(filename, error_msg)

    def _on_file_downloaded(self, file_path: Path) -> None:
        """Callback invoked by the watcher when a file finishes downloading."""
        self.queue_file_for_processing(file_path)

    def queue_file_for_processing(self, file_path: Path) -> None:
        """Submit a file for SEQUENTIAL processing.

        Only ONE file is processed at a time; if the slot is busy the file
        is queued and auto-started later by the ProcessingMonitor.
        """
        # Try to claim the single processing slot immediately.
        if monitor.start_processing(file_path):
            # Slot acquired - process this file now, in a background thread.
            logger.info(
                "🚀 Starting IMMEDIATE processing (SOLE file)",
                extra={"file": file_path.name, "queue_size": monitor.get_queue_size()}
            )
            def process_wrapper():
                try:
                    _process_file_async_safe(file_path)
                finally:
                    # Always release the slot and continue with the next file.
                    monitor.finish_processing()
            thread = threading.Thread(target=process_wrapper, daemon=True)
            thread.start()
        else:
            # Another file is already in flight - enqueue this one.
            if monitor.add_to_queue(file_path):
                logger.info(
                    "⏳ File QUEUED (waiting for current to finish)",
                    extra={
                        "file": file_path.name,
                        "queue_position": monitor.get_queue_size(),
                        "currently_processing": monitor.get_current_file()
                    }
                )
            else:
                logger.debug(f"File already in queue: {file_path.name}")

    def run(self) -> None:
        """Run the service: Flask in a daemon thread plus the polling loop."""
        self._running = True
        # Start Flask in a separate thread so polling owns the main thread.
        flask_thread = threading.Thread(
            target=self._run_flask,
            daemon=True
        )
        flask_thread.start()
        logger.info(
            "Flask server started",
            extra={
                "host": settings.DASHBOARD_HOST,
                "port": settings.DASHBOARD_PORT,
            },
        )
        # Pick up audio files downloaded before this run started.
        self._process_pending_files()
        # Main polling loop: sleep, then ask the watcher to re-check the remote folder.
        logger.info("Starting main polling loop")
        try:
            while self._running:
                time.sleep(settings.POLL_INTERVAL)
                self.watcher.check_now()
        except KeyboardInterrupt:
            logger.info("Received shutdown signal")
            self.stop()

    def _run_flask(self) -> None:
        """Run the Flask development server (blocking; runs in its own thread)."""
        logger.info("Starting Flask development server")
        self.flask_app.run(
            host=settings.DASHBOARD_HOST,
            port=settings.DASHBOARD_PORT,
            debug=False,
            use_reloader=False,
        )

    def _process_pending_files(self) -> None:
        """Queue audio files in the downloads folder that lack a transcription."""
        if self._process_manager is None:
            logger.warning("ProcessManager not initialized, skipping pending files")
            return
        downloads_dir = settings.DOWNLOADS_DIR
        if not downloads_dir.exists():
            logger.debug("Downloads directory does not exist")
            return
        # Supported audio extensions.
        audio_extensions = {".mp3", ".wav", ".m4a", ".mp4", ".webm", ".ogg", ".flac"}
        # Collect stems of existing .txt transcriptions for comparison.
        transcriptions_dir = settings.TRANSCRIPTIONS_DIR
        processed_names = set()
        if transcriptions_dir.exists():
            for f in transcriptions_dir.iterdir():
                if f.is_file() and f.suffix == ".txt":
                    # Base name without extension.
                    processed_names.add(f.stem)
        # Keep only audio files that have NOT been processed yet.
        pending_files = []
        for f in downloads_dir.iterdir():
            if f.is_file() and f.suffix.lower() in audio_extensions and not f.name.startswith("."):
                # Check whether a transcription already exists for this file.
                file_stem = f.stem
                # Also match renamed variants such as " (copy)" suffixes.
                if file_stem not in processed_names:
                    # NOTE(review): substring matching is a heuristic; very
                    # short stems could false-positive against unrelated names.
                    is_processed = False
                    for processed in processed_names:
                        if file_stem in processed or processed in file_stem:
                            is_processed = True
                            break
                    if not is_processed:
                        pending_files.append(f)
        if not pending_files:
            logger.debug("No pending audio files to process")
            return
        logger.info(
            f"Found {len(pending_files)} pending audio files",
            extra={"count": len(pending_files)},
        )
        for file_path in pending_files:
            logger.info(f"Processing pending file: {file_path.name}")
            self.queue_file_for_processing(file_path)

    def stop(self) -> None:
        """Stop the polling loop and the remote-folder watcher."""
        self._running = False
        if self.watcher:
            self.watcher.stop()
# ============================================================================
# Función segura de procesamiento de archivos
# ============================================================================
def _process_file_async_safe(file_path: Path) -> None:
    """Run the global ProcessManager on *file_path*, logging (never raising)
    any failure.

    Queue cleanup and continuation are NOT handled here: the callers'
    wrappers (process_wrapper in queue_file_for_processing and in
    _process_next) release the processing slot afterwards.
    """
    try:
        # Guard: the global manager is assigned in PollingService.initialize().
        if process_manager is None:
            logger.error("ProcessManager not initialized")
            return
        logger.info(
            "Starting file processing",
            extra={"file": file_path.name},
        )
        # Hand the file to the processing pipeline.
        process_manager.process_file(file_path)
    except Exception as exc:
        # Swallow everything so a failing file cannot kill the worker thread.
        logger.exception(
            "Error processing file",
            extra={
                "file_name": file_path.name,
                "error": str(exc),
                "error_type": type(exc).__name__,
            },
        )
# Module-level ProcessManager handle; assigned in PollingService.initialize().
process_manager: Optional[CoreProcessManager] = None
# ============================================================================
# Main
# ============================================================================
def main() -> int:
    """Entry point: build, initialize and run the polling service.

    Returns:
        0 on clean shutdown, 1 on a fatal error (which is logged).
    """
    try:
        service = PollingService()
        service.initialize()
        service.run()
    except Exception as e:
        logger.exception(f"Fatal error: {e}")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())