#!/usr/bin/env python3
"""
CBCFacil - AI-powered audio transcription system integrated with Notion.

Features:
- Nextcloud polling via WebDAV
- Transcription with Whisper (medium, GPU)
- AI summaries (MiniMax)
- PDF generation
- Telegram notifications
"""

from dotenv import load_dotenv

# Load environment variables from .env. This runs before the project imports
# below, presumably so that `config.settings` sees them.
load_dotenv()

import logging
import os
import sys
import threading
import time
from collections import deque
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Callable, Optional

import torch
import torch.cuda
from flask import Flask
from flask_cors import CORS

# API and configuration
from api import api_bp, init_api
from config import settings
from processors.audio_processor import AudioProcessor, AudioProcessingError
from services import WebDAVService
from services.webdav_service import WebDAVService as WebDAVService_Class
from services.telegram_service import TelegramService
from watchers import RemoteFolderWatcher

# ProcessManager from the core package
from core.process_manager import ProcessManager as CoreProcessManager, ProcessInfo

# Logging packages
from pythonjsonlogger import jsonlogger


class JSONFormatter(jsonlogger.JsonFormatter):
    """JSON log formatter that adds ``timestamp``, ``level`` and ``module`` fields."""

    def add_fields(
        self, log_record: dict, record: logging.LogRecord, message_dict: dict
    ) -> None:
        super().add_fields(log_record, record, message_dict)
        # UTC ISO-8601 timestamp with a trailing "Z". datetime.utcnow() is
        # deprecated since Python 3.12; dropping tzinfo from an aware UTC
        # datetime yields exactly the same text as before.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        log_record["timestamp"] = now_utc.isoformat() + "Z"
        log_record["level"] = record.levelname
        log_record["module"] = record.module


def setup_logging() -> logging.Logger:
    """Configure and return the root logger.

    - Removes any existing root handlers.
    - Adds a stdout handler: JSON in production, plain text otherwise.
    - Adds a JSON file handler when ``settings.LOG_FILE`` is set.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, settings.LOG_LEVEL.upper()))

    # Drop existing handlers so repeated setup does not duplicate output.
    root_logger.handlers.clear()

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    if settings.is_production:
        console_handler.setFormatter(JSONFormatter(
            "%(timestamp)s %(level)s %(name)s %(message)s"
        ))
    else:
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
        )
    root_logger.addHandler(console_handler)

    # File handler, only if configured
    if settings.LOG_FILE:
        file_handler = logging.FileHandler(settings.LOG_FILE)
        file_handler.setFormatter(JSONFormatter(
            "%(timestamp)s %(level)s %(name)s %(message)s"
        ))
        root_logger.addHandler(file_handler)

    return root_logger


logger = setup_logging()


# ============================================================================
# GLOBAL MONITOR - only ONE file processing at a time
# ============================================================================

class ProcessingMonitor:
    """Process-wide singleton guaranteeing that only ONE file is processed at a time.

    Files submitted while another one is processing wait in a FIFO queue and
    are started automatically (see ``_process_next``) when the current one
    finishes.
    """

    _instance = None
    # Class-level lock: guards singleton creation and all mutable state below.
    # It is NOT reentrant, so methods that take it must not call each other
    # while holding it.
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    # Fully initialise BEFORE publishing to cls._instance, so a
                    # thread taking the unlocked fast path above never sees a
                    # half-built object.
                    instance = super().__new__(cls)
                    instance._processing = False
                    instance._current_file = None
                    instance._queue = deque()
                    # str keys of files that are queued OR currently processing;
                    # used to reject duplicate submissions.
                    instance._queued_files = set()
                    cls._instance = instance
        return cls._instance

    def is_processing(self) -> bool:
        """Return True if a file is currently being processed."""
        with self._lock:
            return self._processing

    def get_current_file(self) -> Optional[str]:
        """Return the path (as str) of the file being processed, or None."""
        with self._lock:
            return self._current_file

    def add_to_queue(self, file_path: Path) -> bool:
        """Append a file to the waiting queue.

        Returns:
            True if it was queued; False if it was already queued or is the
            file currently being processed.
        """
        with self._lock:
            file_key = str(file_path)
            if file_key in self._queued_files:
                return False
            self._queue.append(file_path)
            self._queued_files.add(file_key)
            return True

    def start_processing(self, file_path: Path) -> bool:
        """Try to mark ``file_path`` as the file being processed.

        Returns:
            True if processing may start now; False if another file is
            processing or this file is already waiting in the queue (it will be
            started from there, so starting it here would process it twice).
        """
        with self._lock:
            file_key = str(file_path)
            if self._processing or file_key in self._queued_files:
                return False
            self._processing = True
            self._current_file = file_key
            # Track the running file too, so add_to_queue() rejects re-submissions
            # of it while it is being processed.
            self._queued_files.add(file_key)
            return True

    def finish_processing(self) -> None:
        """Mark the current file as done, then auto-start the next queued file, if any."""
        with self._lock:
            self._processing = False
            file_key = self._current_file
            self._current_file = None
            # Allow this file to be submitted again in the future.
            if file_key:
                self._queued_files.discard(file_key)

        # Outside the lock: _process_next() acquires it itself.
        self._process_next()

    def get_queue_size(self) -> int:
        """Return the number of files waiting in the queue."""
        with self._lock:
            return len(self._queue)

    def _process_next(self) -> None:
        """Start the next queued file if nothing is currently processing.

        Must be called WITHOUT holding ``self._lock``. No-op when a file is
        already processing or the queue is empty, so it is safe to call
        speculatively.
        """
        with self._lock:
            if self._processing or not self._queue:
                return
            next_file = self._queue.popleft()
            self._processing = True
            self._current_file = str(next_file)
            remaining = len(self._queue)
            # next_file's key stays in _queued_files until finish_processing().

        # Start processing outside the lock
        logger.info(
            "🔄 Auto-starting NEXT file from queue",
            extra={"file": next_file.name, "remaining": remaining},
        )
        _start_worker(next_file)


# Global monitor instance
monitor = ProcessingMonitor()


# Audio extensions picked up from the downloads directory at startup.
AUDIO_EXTENSIONS = frozenset({".mp3", ".wav", ".m4a", ".mp4", ".webm", ".ogg", ".flac"})


def _transcription_path(file_path: Path) -> Path:
    """Return the transcript path whose existence marks ``file_path`` as processed."""
    return settings.TRANSCRIPTIONS_DIR / f"{file_path.stem}.txt"


# ============================================================================
# Polling Service
# ============================================================================

class PollingService:
    """Main polling service: wires WebDAV, the folder watcher, Flask and Telegram."""

    def __init__(self) -> None:
        self.webdav_service: Optional[WebDAVService_Class] = None
        self.watcher: Optional[RemoteFolderWatcher] = None
        self.flask_app: Optional[Flask] = None
        self._telegram_service: Optional[TelegramService] = None
        self._process_manager: Optional[CoreProcessManager] = None
        self._running = False

    def initialize(self) -> None:
        """Initialise all services; exits the process (code 1) if WebDAV is unusable."""
        logger.info("Initializing CBCFacil polling service")

        # Check configuration
        if not settings.has_webdav_config:
            logger.error(
                "WebDAV configuration missing. Set NEXTCLOUD_URL, NEXTCLOUD_USER, NEXTCLOUD_PASSWORD"
            )
            sys.exit(1)

        # WebDAV
        self.webdav_service = WebDAVService()
        logger.info("Testing WebDAV connection...")
        if not self.webdav_service.test_connection():
            logger.error("Failed to connect to Nextcloud")
            sys.exit(1)
        logger.info("WebDAV connection successful")

        # Telegram
        self._telegram_service = TelegramService()
        logger.info(
            "Telegram service initialized",
            extra={"configured": self._telegram_service._configured},
        )

        # ProcessManager with the Telegram-notifying callbacks
        self._process_manager = CoreProcessManager(
            webdav_service=self.webdav_service,
            on_state_change=self._on_state_change,
            on_complete=self._on_process_complete,
            on_error=self._on_process_error,
        )
        logger.info("ProcessManager initialized")

        # Publish to the module-level global used by _process_file_async_safe
        global process_manager
        process_manager = self._process_manager

        # Folder watcher
        self.watcher = RemoteFolderWatcher(
            webdav_service=self.webdav_service,
            local_path=settings.DOWNLOADS_DIR,
            remote_path=settings.WATCHED_REMOTE_PATH,
        )
        self.watcher.set_callback(self._on_file_downloaded)
        self.watcher.start()

        # Flask
        self._setup_flask()

        logger.info("CBCFacil initialized successfully")

    def _setup_flask(self) -> None:
        """Create the Flask app, enable CORS and register the API blueprint."""
        self.flask_app = Flask(__name__)
        CORS(self.flask_app)

        init_api(self._process_manager, self.webdav_service, self.watcher)
        self.flask_app.register_blueprint(api_bp)

        # Root route
        @self.flask_app.route("/")
        def index():
            return {"message": "CBCFacil Polling Service", "version": "1.0.0"}

    def _on_state_change(self, process_info: ProcessInfo) -> None:
        """ProcessManager callback: notify Telegram when transcription starts."""
        filename = process_info.file_path.name

        if process_info.state.value == "transcribing" and self._telegram_service:
            self._telegram_service.send_start_notification(filename)

    def _on_process_complete(self, process_info: ProcessInfo) -> None:
        """ProcessManager callback for a successful run: log and notify Telegram."""
        filename = process_info.file_path.name

        if process_info.transcript:
            logger.info(
                "Transcripción completada",
                extra={"file_name": filename, "text_length": len(process_info.transcript)},
            )

        if self._telegram_service:
            # Elapsed time between creation and the last update of the process.
            duration = (process_info.updated_at - process_info.created_at).total_seconds()
            self._telegram_service.send_completion_notification(
                filename=filename,
                duration=duration,
            )

    def _on_process_error(self, process_info: ProcessInfo) -> None:
        """ProcessManager callback for a failed run: log and notify Telegram."""
        filename = process_info.file_path.name
        error_msg = process_info.error or "Unknown error"

        logger.warning(
            "Transcripción fallida",
            extra={"file": filename, "error": error_msg},
        )

        if self._telegram_service:
            self._telegram_service.send_error_notification(filename, error_msg)

    def _on_file_downloaded(self, file_path: Path) -> None:
        """Watcher callback: queue a downloaded file unless its transcript already exists."""
        if _transcription_path(file_path).exists():
            logger.info(f"Skipping already processed file: {file_path.name}")
            return

        self.queue_file_for_processing(file_path)

    def queue_file_for_processing(self, file_path: Path) -> None:
        """Submit a file for SEQUENTIAL processing (only ONE file at a time).

        Starts it immediately if the monitor is idle; otherwise queues it.
        """
        if monitor.start_processing(file_path):
            logger.info(
                "🚀 Starting IMMEDIATE processing (SOLE file)",
                extra={"file": file_path.name, "queue_size": monitor.get_queue_size()}
            )
            _start_worker(file_path)
            return

        # A file is already processing - queue this one
        if monitor.add_to_queue(file_path):
            logger.info(
                "⏳ File QUEUED (waiting for current to finish)",
                extra={
                    "file": file_path.name,
                    "queue_position": monitor.get_queue_size(),
                    "currently_processing": monitor.get_current_file()
                }
            )
        else:
            logger.debug(f"File already in queue: {file_path.name}")

        # The running file may have finished between start_processing() and
        # add_to_queue(); its finish_processing() then saw an empty queue and
        # nothing would ever pop this entry. Kick the queue (no-op while a file
        # is still processing).
        monitor._process_next()

    def run(self) -> None:
        """Start Flask, queue pending files, then poll until interrupted."""
        self._running = True

        # Flask in a separate daemon thread
        flask_thread = threading.Thread(target=self._run_flask, daemon=True)
        flask_thread.start()
        logger.info(
            "Flask server started",
            extra={
                "host": settings.DASHBOARD_HOST,
                "port": settings.DASHBOARD_PORT,
            },
        )

        self._process_pending_files()

        logger.info("Starting main polling loop")
        try:
            while self._running:
                time.sleep(settings.POLL_INTERVAL)
                try:
                    self.watcher.check_now()
                except Exception:
                    # One failed poll (e.g. a transient WebDAV error) must not
                    # terminate the service; retry on the next interval.
                    logger.exception("Polling check failed; retrying next interval")
        except KeyboardInterrupt:
            logger.info("Received shutdown signal")
            self.stop()

    def _run_flask(self) -> None:
        """Run the Flask development server (blocking; intended for a thread)."""
        logger.info("Starting Flask development server")
        self.flask_app.run(
            host=settings.DASHBOARD_HOST,
            port=settings.DASHBOARD_PORT,
            debug=False,
            use_reloader=False,
        )

    def _process_pending_files(self) -> None:
        """Queue audio files in the downloads directory that have no transcript yet."""
        if self._process_manager is None:
            logger.warning("ProcessManager not initialized, skipping pending files")
            return

        downloads_dir = settings.DOWNLOADS_DIR
        if not downloads_dir.exists():
            logger.debug("Downloads directory does not exist")
            return

        # A file counts as processed when a .txt with the SAME stem exists.
        # Sorted so the processing order is deterministic (iterdir() is not).
        pending_files = [
            f
            for f in sorted(downloads_dir.iterdir())
            if f.is_file()
            and f.suffix.lower() in AUDIO_EXTENSIONS
            and not f.name.startswith(".")
            and not _transcription_path(f).exists()
        ]

        if not pending_files:
            logger.debug("No pending audio files to process")
            return

        logger.info(
            f"Found {len(pending_files)} pending audio files",
            extra={"count": len(pending_files)},
        )

        for file_path in pending_files:
            logger.info(f"Processing pending file: {file_path.name}")
            self.queue_file_for_processing(file_path)

    def stop(self) -> None:
        """Stop the polling loop and the folder watcher."""
        self._running = False
        if self.watcher:
            self.watcher.stop()


# ============================================================================
# Safe file-processing functions
# ============================================================================

def _process_file_async_safe(file_path: Path) -> None:
    """Process one file synchronously, logging (never raising) any ``Exception``.

    Runs inside the worker thread started by ``_start_worker``; that wrapper is
    responsible for releasing the monitor and advancing the queue afterwards.
    """
    try:
        if process_manager is None:
            logger.error("ProcessManager not initialized")
            return

        logger.info(
            "Starting file processing",
            extra={"file": file_path.name},
        )

        process_manager.process_file(file_path)

    except Exception as e:
        logger.exception(
            "Error processing file",
            extra={
                "file_name": file_path.name,
                "error": str(e),
                "error_type": type(e).__name__,
            },
        )


def _start_worker(file_path: Path) -> None:
    """Process ``file_path`` in a daemon thread, always releasing the monitor afterwards.

    The caller must already have marked the file as processing in ``monitor``
    (via ``start_processing`` or ``_process_next``).
    """
    def process_wrapper() -> None:
        try:
            _process_file_async_safe(file_path)
        finally:
            # Always release the slot, even on error, so the next queued file starts.
            monitor.finish_processing()

    threading.Thread(target=process_wrapper, daemon=True).start()


# Global ProcessManager, set by PollingService.initialize()
process_manager: Optional[CoreProcessManager] = None


# ============================================================================
# Main
# ============================================================================

def main() -> int:
    """Entry point: initialise and run the service; return the process exit code."""
    try:
        service = PollingService()
        service.initialize()
        service.run()
        return 0
    except Exception as e:
        logger.exception(f"Fatal error: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())