Files
cbc2027/api/routes.py
renato97 ee8fc183be feat: Sistema CBCFacil completo con cola secuencial
- Implementa ProcessingMonitor singleton para procesamiento secuencial de archivos
- Agrega AI summary service con soporte para MiniMax API
- Agrega PDF generator para resúmenes
- Agrega watchers para monitoreo de carpeta remota
- Mejora sistema de notificaciones Telegram
- Implementa gestión de VRAM para GPU
- Configuración mediante variables de entorno (sin hardcoded secrets)
- .env y transcriptions/ agregados a .gitignore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 15:35:39 +00:00

323 lines
11 KiB
Python

"""
Rutas API de Flask.
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from flask import Blueprint, jsonify, request, send_file
from flask.typing import ResponseValue
from config import settings
from core.process_manager import ProcessManager as CoreProcessManager
from services import WebDAVService
from watchers import RemoteFolderWatcher
# Module-level logger
logger = logging.getLogger(__name__)
# Flask blueprint that groups every route defined in this module
api_bp = Blueprint("api", __name__)
# Global service references. They stay None until init_api() rebinds them
# (the original note says this is done from main.py), hence Optional.
webdav_service: Optional[WebDAVService] = None
remote_watcher: Optional[RemoteFolderWatcher] = None
process_manager: Optional[CoreProcessManager] = None
def init_api(
    pm: CoreProcessManager,
    wd_service: Optional[WebDAVService] = None,
    watcher: Optional[RemoteFolderWatcher] = None,
) -> None:
    """Register the service instances used by the route handlers.

    Rebinds the module-level ``process_manager``, ``webdav_service`` and
    ``remote_watcher`` globals to the given objects.

    Args:
        pm: Process manager instance.
        wd_service: WebDAV service, or ``None`` when it is not available.
        watcher: Remote folder watcher, or ``None`` when it is not available.
    """
    global process_manager, webdav_service, remote_watcher
    process_manager, webdav_service, remote_watcher = pm, wd_service, watcher
class LocalProcessManager:
    """Local file manager backing the API.

    Reads file details and transcription text straight from the local
    transcriptions directory. The transcription of a file is the ``.txt``
    file that shares its base name.
    """

    def __init__(self) -> None:
        # Directory taken from settings once; every method reads this
        # attribute so they all look at the same location.
        self._transcriptions_dir = settings.TRANSCRIPTIONS_DIR

    def get_all_files_detailed(self) -> list[dict[str, Any]]:
        """Return details for every visible file, most recently modified first.

        Sub-directories and hidden entries (names starting with ``.``) are
        skipped. Returns an empty list when the directory does not exist.
        """
        if not self._transcriptions_dir.is_dir():
            return []
        details = [
            self._get_file_detail(entry)
            for entry in self._transcriptions_dir.iterdir()
            if entry.is_file() and not entry.name.startswith(".")
        ]
        details.sort(key=lambda item: item["modified"], reverse=True)
        return details

    def _get_file_detail(self, file_path: Path) -> dict[str, Any]:
        """Build the detail dictionary for a single file.

        Args:
            file_path: Path of an existing file.

        Returns:
            A dict with ``name``, ``path``, ``size`` (bytes), ``size_mb``,
            ``modified`` (mtime timestamp), ``modified_iso``, ``extension``
            (lower-cased suffix), ``status``, ``transcription`` and
            ``transcription_length``.
        """
        stat = file_path.stat()
        # The transcription lives in a sibling .txt file; a .txt file is never
        # treated as having its own transcription.
        transcription_text = None
        if file_path.suffix != ".txt":
            txt_path = file_path.with_suffix(".txt")
            if txt_path.exists():
                transcription_text = txt_path.read_text(encoding="utf-8")
        # An existing but empty transcription still counts as transcribed,
        # which matches what the /transcription route reports for it.
        is_transcribed = transcription_text is not None
        return {
            "name": file_path.name,
            "path": str(file_path),
            "size": stat.st_size,
            "size_mb": round(stat.st_size / (1024 * 1024), 2),
            "modified": stat.st_mtime,
            "modified_iso": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            "extension": file_path.suffix.lower(),
            "status": "transcribed" if is_transcribed else "pending",
            "transcription": transcription_text,
            "transcription_length": len(transcription_text or ""),
        }

    def get_file_detail(self, filename: str) -> Optional[dict[str, Any]]:
        """Return details for the file whose base name matches ``filename``.

        Any extension matches, so ``audio.mp3`` and ``audio`` both find
        ``audio.mp3``. When several files share the base name the choice is
        deterministic: an exact name match wins, then files that are not
        ``.txt`` transcriptions, then alphabetical order.

        Returns:
            The detail dict, or ``None`` when nothing matches or the
            directory does not exist.
        """
        if not self._transcriptions_dir.is_dir():
            return None
        requested = Path(filename)
        candidates = sorted(
            (
                entry
                for entry in self._transcriptions_dir.iterdir()
                if entry.is_file() and entry.stem == requested.stem
            ),
            # False sorts before True: exact name first, .txt sidecars last.
            key=lambda entry: (
                entry.name != requested.name,
                entry.suffix.lower() == ".txt",
                entry.name,
            ),
        )
        if not candidates:
            return None
        return self._get_file_detail(candidates[0])

    def get_transcription_data(self, filename: str) -> Optional[dict[str, Any]]:
        """Return the transcription stored for ``filename``.

        Only the base name of ``filename`` is used; the text is read from
        ``<base name>.txt`` in the transcriptions directory.

        Returns:
            A dict with ``text``, ``created_at`` (ISO string built from the
            .txt file's mtime) and an empty ``metadata`` dict, or ``None``
            when there is no transcription file.
        """
        txt_path = self._transcriptions_dir / f"{Path(filename).stem}.txt"
        if not txt_path.is_file():
            return None
        text = txt_path.read_text(encoding="utf-8")
        return {
            "text": text,
            "created_at": datetime.fromtimestamp(txt_path.stat().st_mtime).isoformat(),
            "metadata": {},
        }
# Module-level instance the routes use to read file details
local_pm = LocalProcessManager()
@api_bp.route("/health", methods=["GET"])
def health_check() -> ResponseValue:
    """Liveness endpoint: always answers with an ``ok`` status and HTTP 200."""
    payload = {"status": "ok"}
    return jsonify(payload), 200
@api_bp.route("/status", methods=["GET"])
def status() -> ResponseValue:
    """Report WebDAV configuration/connectivity and the watcher status.

    The connection test only runs when WebDAV is configured and a service
    instance is registered; a failing test is logged and reported as
    ``webdav_connected: false`` rather than as an error response.
    """
    webdav_configured = settings.has_webdav_config
    webdav_connected = False

    # Probe the WebDAV connection
    if settings.has_webdav_config and webdav_service:
        try:
            webdav_connected = webdav_service.test_connection()
        except Exception as e:
            logger.error(f"WebDAV connection test failed: {e}")

    # Watcher status, or None when no watcher is registered
    watcher_status = remote_watcher.get_status() if remote_watcher else None

    return jsonify({
        "webdav_configured": webdav_configured,
        "webdav_connected": webdav_connected,
        "watcher": watcher_status,
    }), 200
@api_bp.route("/files", methods=["GET"])
def list_files() -> ResponseValue:
    """List the visible files in the local downloads directory.

    Returns:
        200 with ``{"files": [...], "count": n}``, where each entry carries
        ``name``, ``size`` (bytes) and ``modified`` (mtime timestamp). The
        list is empty when the directory does not exist. 500 with an
        ``error`` message on unexpected failures.
    """
    try:
        files = []
        downloads_dir = settings.DOWNLOADS_DIR
        if downloads_dir.is_dir():
            for entry in downloads_dir.iterdir():
                if entry.name.startswith("."):
                    continue
                try:
                    if not entry.is_file():
                        continue
                    # One stat() call so size and mtime describe the same
                    # snapshot of the file.
                    info = entry.stat()
                except FileNotFoundError:
                    # The file was removed while the directory was being
                    # listed; skip it instead of failing the whole request.
                    continue
                files.append({
                    "name": entry.name,
                    "size": info.st_size,
                    "modified": info.st_mtime,
                })
        return jsonify({"files": files, "count": len(files)}), 200
    except Exception as e:
        logger.error(f"Error listing files: {e}")
        return jsonify({"error": str(e)}), 500
@api_bp.route("/trigger", methods=["POST"])
def trigger_check() -> ResponseValue:
    """Force an immediate check of the remote files.

    Returns:
        200 once the watcher's check has been invoked; 500 when no watcher
        is registered or the check raises.
    """
    try:
        if not remote_watcher:
            return jsonify({"error": "Watcher not initialized"}), 500
        remote_watcher.check_now()
        return jsonify({"message": "Check triggered"}), 200
    except Exception as e:
        logger.error(f"Error triggering check: {e}")
        return jsonify({"error": str(e)}), 500
@api_bp.route("/remote-files", methods=["GET"])
def list_remote_files() -> ResponseValue:
    """List the files in a remote WebDAV folder.

    Query parameters:
        path: Remote folder to list; defaults to
            ``settings.WATCHED_REMOTE_PATH``.

    Returns:
        200 with ``{"files": [...], "path": path}``; 500 with an ``error``
        message when WebDAV is not configured, the service has not been
        registered, or the listing fails.
    """
    try:
        if not settings.has_webdav_config:
            return jsonify({"error": "WebDAV not configured"}), 500
        # The service is only set by init_api(); without this guard a missing
        # service surfaced as an AttributeError message in the response.
        if webdav_service is None:
            return jsonify({"error": "WebDAV service not initialized"}), 500
        # NOTE(review): `path` comes straight from the query string, so a
        # client can ask for any remote path the WebDAV account can reach.
        # Confirm this is intended.
        path = request.args.get("path", settings.WATCHED_REMOTE_PATH)
        files = webdav_service.list_files(path)
        return jsonify({"files": files, "path": path}), 200
    except Exception as e:
        logger.error(f"Error listing remote files: {e}")
        return jsonify({"error": str(e)}), 500
@api_bp.route("/files-detailed", methods=["GET"])
def list_files_detailed() -> ResponseValue:
    """List files with detailed information and transcription status.

    Returns:
        200 with ``{"files": [...], "count": n}`` as produced by
        ``local_pm.get_all_files_detailed()``; 500 with an ``error`` message
        on failure.
    """
    try:
        detailed = local_pm.get_all_files_detailed()
        total = len(detailed)
        logger.info(f"Listing {total} files with detailed info")
        return jsonify({"files": detailed, "count": total}), 200
    except Exception as e:
        logger.error(f"Error listing detailed files: {e}")
        return jsonify({"error": str(e)}), 500
@api_bp.route("/transcription/<path:filename>", methods=["GET"])
def get_transcription(filename: str) -> ResponseValue:
    """Return the transcription of a specific file.

    Args:
        filename: Name of the file, taken from the URL.

    Returns:
        404 when no matching file exists. 200 otherwise: with
        ``transcription: null`` and the file's current status when there is
        no transcription yet, or with the text, ``created_at`` and
        ``status: "transcribed"`` when there is one. Both 200 shapes carry
        ``size_mb``, ``extension`` and ``modified`` under ``metadata``.
        500 with an ``error`` message on unexpected failures.
    """
    try:
        logger.info(f"Getting transcription for: {filename}")
        # Make sure the file exists
        file_detail = local_pm.get_file_detail(filename)
        if not file_detail:
            logger.warning(f"File not found: {filename}")
            return jsonify({"error": "File not found"}), 404

        # File-level metadata shared by both success responses. The detail
        # dict has no "metadata" key of its own, so these fields have to be
        # copied explicitly.
        file_metadata = {
            "size_mb": file_detail.get("size_mb"),
            "extension": file_detail.get("extension"),
            "modified": file_detail.get("modified_iso"),
        }

        # Fetch the transcription
        transcription_data = local_pm.get_transcription_data(filename)
        if not transcription_data:
            logger.info(f"No transcription found for: {filename}")
            return jsonify({
                "file_name": filename,
                "transcription": None,
                "status": file_detail.get("status", "pending"),
                "metadata": file_metadata,
            }), 200

        text = transcription_data["text"]
        return jsonify({
            "file_name": filename,
            "transcription": text,
            "status": "transcribed",
            "created_at": transcription_data["created_at"],
            "metadata": {
                # Kept so a "metadata" entry on the detail dict, if one is
                # ever provided, still flows through.
                **file_detail.get("metadata", {}),
                **file_metadata,
                "transcription_length": len(text),
            },
        }), 200
    except Exception as e:
        logger.error(f"Error getting transcription for {filename}: {e}")
        return jsonify({"error": str(e)}), 500
@api_bp.route("/summary/<path:filename>", methods=["GET"])
def get_summary(filename: str) -> ResponseValue:
    """Return the markdown summary of a file.

    Only the base name of ``filename`` is used; the summary is read from
    ``<base name>.md`` in the transcriptions directory.

    Returns:
        200 with ``file_name``, ``summary``, ``created_at`` (ISO string built
        from the .md file's mtime) and ``size`` (bytes); 404 when the summary
        file does not exist; 500 with an ``error`` message on failure.
    """
    try:
        logger.info(f"Getting summary for: {filename}")
        base_name = Path(filename).stem
        # The summary is the .md file sharing the base name
        md_path = settings.TRANSCRIPTIONS_DIR / f"{base_name}.md"
        if md_path.exists():
            summary_content = md_path.read_text(encoding="utf-8")
            md_stat = md_path.stat()
            created_at = datetime.fromtimestamp(md_stat.st_mtime).isoformat()
            return jsonify({
                "file_name": filename,
                "summary": summary_content,
                "created_at": created_at,
                "size": md_stat.st_size,
            }), 200
        logger.warning(f"Summary not found: {filename}")
        return jsonify({"error": "Summary not found"}), 404
    except Exception as e:
        logger.error(f"Error getting summary for {filename}: {e}")
        return jsonify({"error": str(e)}), 500
@api_bp.route("/pdf/<path:filename>", methods=["GET"])
def get_pdf(filename: str) -> ResponseValue:
    """Send the PDF of a file as a download.

    Only the base name of ``filename`` is used; the PDF is read from
    ``<base name>.pdf`` in the transcriptions directory.

    Returns:
        The PDF as an attachment named ``<base name>.pdf``; 404 when the
        PDF does not exist; 500 with an ``error`` message on failure.
    """
    try:
        logger.info(f"Getting PDF for: {filename}")
        transcriptions_dir = settings.TRANSCRIPTIONS_DIR
        name_without_ext = Path(filename).stem
        # Look for the .pdf file sharing the base name
        pdf_path = transcriptions_dir / f"{name_without_ext}.pdf"
        if not pdf_path.exists():
            logger.warning(f"PDF not found: {filename}")
            return jsonify({"error": "PDF not found"}), 404
        return send_file(
            # Hand over an absolute path so the file that is sent is the one
            # exists() just checked, however send_file resolves relative
            # paths.
            pdf_path.resolve(),
            mimetype="application/pdf",
            as_attachment=True,
            download_name=f"{name_without_ext}.pdf",
        )
    except Exception as e:
        logger.error(f"Error getting PDF for {filename}: {e}")
        return jsonify({"error": str(e)}), 500