Files
cbc2027/main.py
2025-12-16 22:32:27 +00:00

3184 lines
125 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Nextcloud AI Service - Unified Main Service
Combina todas las funcionalidades de procesamiento de audio, PDF y documentos en un solo archivo.
"""
import fcntl
import logging
import os
import re
import shutil
import subprocess
import sys
import time
import unicodedata
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional, Set
import cv2
import easyocr
import numpy as np
import pytesseract
import requests
import torch
import whisper
import textwrap
from concurrent.futures import ThreadPoolExecutor
from docx import Document
from docx.shared import Inches
from pdf2image import convert_from_path
from pypdf import PdfReader, PdfWriter
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
# --- LOGGING CONFIGURATION ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] - %(message)s",
    handlers=[logging.StreamHandler()]
)
# --- ENVIRONMENT VARIABLE CONFIGURATION ---
# Load variables from a .env file when python-dotenv is installed; the
# service still works with plain process environment variables otherwise.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass
# Core Nextcloud/WebDAV connection settings (all required at runtime; see
# _ensure_webdav_credentials for the enforcement point).
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER")
NEXTCLOUD_PASS = os.getenv("NEXTCLOUD_PASS")
# The WebDAV endpoint is the Nextcloud base URL itself.
WEBDAV_ENDPOINT = NEXTCLOUD_URL
# Remote folder names (relative to the WebDAV root).
REMOTE_AUDIOS_FOLDER = "Audios"
REMOTE_DOCX_AUDIO_FOLDER = "Documentos"
REMOTE_PDF_FOLDER = "Pdf"
REMOTE_TXT_FOLDER = "Textos"
RESUMENES_FOLDER = "Resumenes"
DOCX_FOLDER = "Documentos"
# Local working directories.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
LOCAL_STATE_DIR = os.environ.get("LOCAL_STATE_DIR", BASE_DIR)
# Old registry locations migrated by _initialize_processed_registry.
LEGACY_PROCESSED_PATHS = ["/app/processed_files.txt"]
LOCAL_DOWNLOADS_PATH = os.path.join(BASE_DIR, "downloads")
LOCAL_RESUMENES = LOCAL_DOWNLOADS_PATH
LOCAL_DOCX = os.path.join(BASE_DIR, "resumenes_docx")
# Seconds between remote-folder polls (presumably used by the main loop —
# confirm against the polling code further down the file).
POLL_INTERVAL = 5
PROCESSED_FILES_PATH = os.environ.get(
    "PROCESSED_FILES_PATH",
    os.path.join(LOCAL_STATE_DIR, "processed_files.txt")
)
# Extensions that select the processing pipeline for a remote file.
AUDIO_EXTENSIONS = {".mp3", ".wav", ".m4a", ".ogg", ".aac"}
PDF_EXTENSIONS = {".pdf"}
TXT_EXTENSIONS = {".txt"}
# HTTP/WebDAV tuning knobs (all overridable via environment).
HTTP_TIMEOUT = int(os.getenv("HTTP_TIMEOUT", "30"))
WEBDAV_MAX_RETRIES = int(os.getenv("WEBDAV_MAX_RETRIES", "3"))
DOWNLOAD_CHUNK_SIZE = int(os.getenv("DOWNLOAD_CHUNK_SIZE", "8192"))
MAX_FILENAME_LENGTH = int(os.getenv("MAX_FILENAME_LENGTH", "80"))
MAX_FILENAME_BASE_LENGTH = int(os.getenv("MAX_FILENAME_BASE_LENGTH", "40"))
MAX_FILENAME_TOPICS_LENGTH = int(os.getenv("MAX_FILENAME_TOPICS_LENGTH", "20"))
# Z.AI (Anthropic-compatible) gateway configuration for the Claude CLI.
ZAI_BASE_URL = os.getenv("ZAI_BASE_URL", "https://api.z.ai/api/anthropic")
ZAI_DEFAULT_MODEL = os.getenv("ZAI_MODEL", "glm-4.6")
# SECURITY NOTE(review): a real-looking API token is hard-coded as the last
# fallback below.  It should be rotated and supplied exclusively via the
# environment; committing secrets to source is a leak.
ZAI_AUTH_TOKEN_FALLBACK = os.getenv(
    "ZAI_AUTH_TOKEN",
    os.getenv("ANTHROPIC_AUTH_TOKEN", "6fef8efda3d24eb9ad3d718daf1ae9a1.RcFc7QPe5uZLr2mS"),
)
# Shared WebDAV session, created lazily by _get_webdav_session.
_WEBDAV_SESSION: Optional[requests.Session] = None
# Type alias for the set of already-processed remote paths.
ProcessedRegistry = Set[str]
# API KEYS
# SECURITY NOTE(review): hard-coded Gemini API key fallback — rotate the key
# and make GEMINI_API_KEY environment-only.
DEFAULT_GEMINI_API_KEY = "AIzaSyDWOgyAJqscuPU6iSpS6gxupWBm4soNw5o"
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") or DEFAULT_GEMINI_API_KEY
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://ollama:11434")
OLLAMA_MODEL = "mistral:7b"
# CLI binaries resolved from PATH (None when not installed).
GEMINI_CLI_PATH = shutil.which("gemini")
CLAUDE_CLI_PATH = shutil.which("claude")
# Filled in by _initialize_gemini_model_defaults() when unset.
GEMINI_FLASH_MODEL = os.getenv("GEMINI_FLASH_MODEL")
GEMINI_PRO_MODEL = os.getenv("GEMINI_PRO_MODEL")
def _initialize_gemini_model_defaults() -> None:
    """Automatically select the most recent Gemini 2.5 models available.

    When GEMINI_FLASH_MODEL / GEMINI_PRO_MODEL were not pinned via the
    environment, query the Gemini "models" listing endpoint (if an API key
    is configured) and pick the best gemini-2.5 flash/pro candidates.  On
    any failure — and also when the listing yields no usable candidate —
    hard-coded defaults are applied, so both globals are guaranteed to be
    non-empty after this function returns.
    """
    global GEMINI_FLASH_MODEL, GEMINI_PRO_MODEL
    default_flash = "gemini-2.5-flash"
    default_pro = "gemini-2.5-pro-preview-06-05"
    if GEMINI_FLASH_MODEL and GEMINI_PRO_MODEL:
        return  # both explicitly pinned; nothing to do
    if not GEMINI_API_KEY:
        GEMINI_FLASH_MODEL = GEMINI_FLASH_MODEL or default_flash
        GEMINI_PRO_MODEL = GEMINI_PRO_MODEL or default_pro
        return
    try:
        response = requests.get(
            "https://generativelanguage.googleapis.com/v1beta/models",
            params={"key": GEMINI_API_KEY},
            timeout=12,
        )
        response.raise_for_status()
        payload = response.json()
        models = payload.get("models", [])

        def choose_latest(pattern: str) -> Optional[str]:
            """Prefer the first stable gemini-2.5 model matching *pattern*;
            otherwise the newest preview (sorted by version, descending)."""
            candidate_stable = None
            preview_candidates = []
            for model_info in models:
                name = model_info.get("name", "")
                if not name.startswith("models/gemini-2.5"):
                    continue
                base_name = name.split("/", 1)[-1]
                if pattern == "-flash":
                    # Deliberately exclude the cheaper "flash-lite" family.
                    if "-flash" not in base_name or "-flash-lite" in base_name:
                        continue
                elif pattern not in base_name:
                    continue
                if "preview" not in base_name and candidate_stable is None:
                    candidate_stable = base_name
                else:
                    version = model_info.get("version") or ""
                    preview_candidates.append((version, base_name))
            if candidate_stable:
                return candidate_stable
            if preview_candidates:
                preview_candidates.sort(key=lambda item: item[0], reverse=True)
                return preview_candidates[0][1]
            return None

        if not GEMINI_FLASH_MODEL:
            GEMINI_FLASH_MODEL = choose_latest("-flash")
        if not GEMINI_PRO_MODEL:
            GEMINI_PRO_MODEL = choose_latest("-pro")
    except Exception as exc:
        logging.warning(f"No se pudo obtener la lista de modelos Gemini: {exc}")
    # BUGFIX: the defaults used to be applied only on the exception path, so
    # a *successful* listing that produced no matching candidate left the
    # globals set to None.  Apply the fallbacks unconditionally here.
    GEMINI_FLASH_MODEL = GEMINI_FLASH_MODEL or default_flash
    GEMINI_PRO_MODEL = GEMINI_PRO_MODEL or default_pro
_initialize_gemini_model_defaults()
# True when at least one summarization backend (Gemini CLI, Gemini API or
# Claude CLI) is available.
GEMINI_AVAILABLE = bool(GEMINI_CLI_PATH or GEMINI_API_KEY or CLAUDE_CLI_PATH)
# --- THEMATIC FOLDER CONFIGURATION ---
# Maps classifier labels to remote folder names used when routing summaries.
TEMATIC_FOLDERS = {
    "historia": "Historia",
    "analisis_contable": "Analisis Contable",
    "instituciones_gobierno": "Instituciones del Gobierno",
    "otras_clases": "Otras Clases"
}
# PDF CONFIGURATION - adaptive GPU/CPU tuning (all overridable via env).
_CPU_COUNT = os.cpu_count() or 1
MAX_PAGES_PER_CHUNK = max(1, int(os.getenv("PDF_MAX_PAGES_PER_CHUNK", "2")))  # reduced from 3 to 2
PDF_DPI = max(150, int(os.getenv("PDF_DPI", "200")))  # at least 150 DPI for legible quality
PDF_RENDER_THREAD_COUNT = max(1, int(os.getenv("PDF_RENDER_THREAD_COUNT", str(min(4, _CPU_COUNT)))))  # reduced thread count
PDF_BATCH_SIZE = max(1, int(os.getenv("PDF_BATCH_SIZE", "2")))  # reduced from 4 to 2
PDF_TROCR_MAX_BATCH = max(1, int(os.getenv("PDF_TROCR_MAX_BATCH", str(PDF_BATCH_SIZE))))
PDF_TESSERACT_THREADS = max(1, int(os.getenv("PDF_TESSERACT_THREADS", str(max(1, min(2, max(1, _CPU_COUNT // 3)))))))  # reduced
# Preprocessing reuses the same thread budget as CPU OCR.
PDF_PREPROCESS_THREADS = max(1, int(os.getenv("PDF_PREPROCESS_THREADS", str(PDF_TESSERACT_THREADS))))
# Heuristic thresholds for deciding whether a PDF already carries an
# extractable text layer (vs. needing OCR); bad env values fall back safely.
try:
    PDF_TEXT_DETECTION_MIN_RATIO = float(os.getenv("PDF_TEXT_DETECTION_MIN_RATIO", "0.6"))
except ValueError:
    PDF_TEXT_DETECTION_MIN_RATIO = 0.6
try:
    PDF_TEXT_DETECTION_MIN_AVG_CHARS = int(os.getenv("PDF_TEXT_DETECTION_MIN_AVG_CHARS", "120"))
except ValueError:
    PDF_TEXT_DETECTION_MIN_AVG_CHARS = 120
# ERROR THROTTLING: suppress duplicate error notifications inside this window.
ERROR_THROTTLE_SECONDS = int(os.environ.get("ERROR_THROTTLE_SECONDS", "600"))
# key -> (last message, time of last notification); used by should_send_error.
_last_error_cache = {}
# Model caches with idle-timeout eviction (see _check_and_free_vram).
_whisper_model = None
_ocr_models = None
_trocr_models = None
# Timestamp of last model use; None means never used / just unloaded.
_models_last_used = None
_MODEL_TIMEOUT_SECONDS = int(os.environ.get("MODEL_TIMEOUT_SECONDS", "300"))  # 300 s (5 min) to free VRAM faster
# --- TELEGRAM NOTIFICATION FUNCTIONS ---
def send_telegram_message(message, retries=3, delay=2):
    """Send a plain-text notification to Telegram.

    Entity parsing is deliberately disabled (no parse_mode) to avoid
    formatting errors.  Returns True on success, False when credentials are
    missing or every retry fails.
    """
    if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
        logging.warning("Telegram token or chat ID not set. Skipping notification.")
        return False
    endpoint = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
    attempt = 0
    while attempt < retries:
        try:
            reply = requests.post(endpoint, data=payload, timeout=10)
            if reply.status_code == 200:
                return True
            logging.error(f"Telegram API error: {reply.status_code} {reply.text}")
        except Exception as e:
            logging.error(f"Telegram notification failed (attempt {attempt+1}/{retries}): {e}")
        attempt += 1
        time.sleep(delay)
    logging.error("Telegram notification failed after all retries.")
    return False
def should_send_error(key, message):
    """Return True if we should notify for this (key, message) given throttle rules.

    A repeat of the *same* message for the same key inside the throttle
    window is suppressed; anything else refreshes the cache and notifies.
    """
    now = datetime.utcnow()
    cached = _last_error_cache.get(key)
    if cached is not None:
        cached_msg, cached_time = cached
        unchanged = message == cached_msg
        within_window = (now - cached_time).total_seconds() <= ERROR_THROTTLE_SECONDS
        if unchanged and within_window:
            return False
    _last_error_cache[key] = (message, now)
    return True
def _update_models_usage():
    """Record "now" as the last moment any cached GPU model was used.

    The idle-eviction logic in _check_and_free_vram compares against this
    timestamp to decide when VRAM can be freed.
    """
    global _models_last_used
    _models_last_used = datetime.utcnow()
    logging.debug(f"Timestamp actualizado: {_models_last_used}")
def _check_and_free_vram():
    """Free GPU memory when the cached models have been idle long enough.

    Always performs a cheap CUDA cache flush; only unloads the cached
    Whisper / OCR / TrOCR models after they have been idle for more than
    _MODEL_TIMEOUT_SECONDS.  While models stay loaded, a VRAM status line
    is logged roughly every two minutes for debugging.
    """
    global _whisper_model, _ocr_models, _trocr_models, _models_last_used
    now = datetime.utcnow()
    # Lightweight cleanup that never interrupts in-flight processing.
    if torch.cuda.is_available():
        try:
            torch.cuda.empty_cache()
        except Exception:
            # FIX: was a bare `except:` which would also swallow
            # SystemExit / KeyboardInterrupt.
            pass
    if _models_last_used is None:
        return
    idle_time = (now - _models_last_used).total_seconds()
    models_loaded = (
        _whisper_model is not None
        or _ocr_models is not None
        or _trocr_models is not None
    )
    if idle_time > _MODEL_TIMEOUT_SECONDS and models_loaded:
        logging.info(f"🔄 Models idle for {idle_time:.1f}s (> {_MODEL_TIMEOUT_SECONDS}s), freeing VRAM...")
        models_freed = []
        # Whisper: delete the object so CUDA can reclaim its memory.
        if _whisper_model is not None:
            try:
                if torch.cuda.is_available():
                    del _whisper_model
                    _whisper_model = None
                    models_freed.append("Whisper")
            except Exception as e:
                logging.error(f"Error freeing Whisper VRAM: {e}")
        # OCR models: dropping the reference is enough.
        if _ocr_models is not None:
            try:
                _ocr_models = None
                models_freed.append("OCR")
            except Exception as e:
                logging.error(f"Error freeing OCR VRAM: {e}")
        # TrOCR: move the model to CPU first so its CUDA tensors are released.
        if _trocr_models is not None:
            try:
                if torch.cuda.is_available():
                    model = _trocr_models.get('model') if isinstance(_trocr_models, dict) else None
                    if model is not None:
                        model.to('cpu')
                    models_freed.append("TrOCR")
                    torch.cuda.empty_cache()
            except Exception as e:
                logging.error(f"Error freeing TrOCR VRAM: {e}")
        # Reset every cache; the models reload lazily on next use.
        _whisper_model = None
        _ocr_models = None
        _trocr_models = None
        _models_last_used = None
        # Return reserved allocator memory to the driver.
        _force_aggressive_vram_cleanup()
        if models_freed:
            logging.info(f"🎯 Models freed from GPU: {', '.join(models_freed)}, VRAM liberated")
    elif idle_time % 120 < 10:
        # Debug heartbeat roughly every ~120 seconds while models stay loaded.
        vram_status = get_vram_usage()
        if isinstance(vram_status, dict) and vram_status.get('any_models_loaded', False):
            logging.info(f"📊 VRAM Status - Allocated: {vram_status.get('allocated_gb', 0)}GB, Idle: {idle_time:.1f}s")
def _force_aggressive_vram_cleanup():
"""Fuerza una limpieza agresiva de VRAM para liberar toda la memoria posible"""
try:
import gc
logging.info("🔥 Iniciando limpieza agresiva de VRAM...")
if torch.cuda.is_available():
# Mostrar estado antes de la limpieza
before_allocated = torch.cuda.memory_allocated(0) / 1024**3
before_reserved = torch.cuda.memory_reserved(0) / 1024**3
logging.info(f"📊 Antes de limpieza - Allocated: {before_allocated:.2f}GB, Reserved: {before_reserved:.2f}GB")
# Estrategia 1: Liberar caché básica
torch.cuda.empty_cache()
# Estrategia 2: Forzar garbage collection múltiple
for i in range(5):
gc.collect()
torch.cuda.empty_cache()
# Estrategia 3: Liberar memoria del pool de PyTorch
if hasattr(torch.cuda, 'memory'):
try:
# Intentar liberar el memory pool
torch.cuda.memory.empty_cache()
except:
pass
# Estrategia 4: Sincronizar y liberar streams
try:
torch.cuda.synchronize()
torch.cuda.empty_cache()
except:
pass
# Estrategia 5: Forzar liberación de memoria reservada
if torch.cuda.memory_reserved(0) > 0:
logging.info(f"🧹 Intentando liberar memoria reservada: {torch.cuda.memory_reserved(0) / 1024**3:.2f}GB")
# Último recurso: intentar resetear el estado de CUDA
try:
# Liberar todos los caches posibles
if hasattr(torch.cuda, 'memory_snapshot'):
torch.cuda.memory_snapshot()
torch.cuda.empty_cache()
gc.collect()
# Si aún hay memoria reservada, intentar un enfoque más agresivo
if torch.cuda.memory_reserved(0) > 1024**3: # Más de 1GB
logging.warning("🚨 Usando liberación extrema de VRAM...")
# Forzar liberación completa del contexto
torch.cuda.set_device(0)
torch.cuda.empty_cache()
# Múltiples ciclos de limpieza
for _ in range(3):
gc.collect()
torch.cuda.empty_cache()
time.sleep(0.1) # Pequeña pausa para permitir liberación
except Exception as e:
logging.warning(f"Error en liberación extrema: {e}")
# Mostrar estado después de la limpieza
after_allocated = torch.cuda.memory_allocated(0) / 1024**3
after_reserved = torch.cuda.memory_reserved(0) / 1024**3
logging.info(f"📊 Después de limpieza - Allocated: {after_allocated:.2f}GB, Reserved: {after_reserved:.2f}GB")
if after_reserved < before_reserved:
logging.info(f"✅ Memoria liberada: {(before_reserved - after_reserved):.2f}GB")
else:
logging.warning("⚠️ No se pudo liberar memoria reservada significativamente")
logging.info("✅ Limpieza agresiva de VRAM completada")
except Exception as e:
logging.error(f"Error en limpieza agresiva de VRAM: {e}")
def _start_vram_cleanup_timer():
"""Inicia un hilo de monitoreo continuo para liberar VRAM"""
import threading
def cleanup_worker():
while True:
time.sleep(60) # Verificar cada 60 segundos (no tan frecuente)
_check_and_free_vram()
# Eliminar limpieza extrema adicional que interrumpe el procesamiento
thread = threading.Thread(target=cleanup_worker, daemon=True)
thread.start()
def _force_complete_vram_cleanup():
"""Fuerza una limpieza completa de VRAM para eliminar residuos"""
global _models_last_used
try:
if torch.cuda.is_available():
# Verificar si hay residuos
allocated_mb = torch.cuda.memory_allocated(0) / 1024**2
reserved_mb = torch.cuda.memory_reserved(0) / 1024**2
# Si hay más de 50MiB residuales, forzar limpieza extrema
if allocated_mb > 50 and (_models_last_used is None or
(datetime.utcnow() - _models_last_used).total_seconds() > 30):
logging.info(f"🔥 Limpieza extrema: {allocated_mb:.1f}MiB residuales detectados")
# Estrategia 1: Reset completo del contexto CUDA
try:
# Guardar dispositivo actual
current_device = torch.cuda.current_device()
# Liberar todo lo posible
torch.cuda.empty_cache()
import gc
gc.collect()
# Múltiples ciclos de limpieza
for i in range(5):
gc.collect()
torch.cuda.empty_cache()
time.sleep(0.05)
# Intentar resetear el dispositivo
if hasattr(torch.cuda, 'memory_snapshot'):
try:
torch.cuda.memory_snapshot()
except:
pass
# Sincronizar y limpiar
torch.cuda.synchronize()
torch.cuda.empty_cache()
# Volver al dispositivo original
torch.cuda.set_device(current_device)
# Verificar resultado
new_allocated_mb = torch.cuda.memory_allocated(0) / 1024**2
if new_allocated_mb < allocated_mb:
logging.info(f"✅ Limpieza extrema exitosa: {allocated_mb:.1f}MiB -> {new_allocated_mb:.1f}MiB")
except Exception as e:
logging.warning(f"Error en limpieza extrema: {e}")
except Exception as e:
logging.error(f"Error en limpieza de VRAM: {e}")
def get_vram_usage():
    """Return a snapshot of GPU memory usage and the model-cache state.

    Values are in GB rounded to two decimals; 'cached_gb' reflects memory
    reserved by the CUDA allocator.  Returns an error dict when CUDA is
    unavailable.
    """
    if not torch.cuda.is_available():
        return {'error': 'CUDA not available'}
    total = torch.cuda.get_device_properties(0).total_memory / 1024**3
    allocated = torch.cuda.memory_allocated(0) / 1024**3
    reserved = torch.cuda.memory_reserved(0) / 1024**3
    any_loaded = (
        _whisper_model is not None
        or _ocr_models is not None
        or _trocr_models is not None
    )
    return {
        'total_gb': round(total, 2),
        'allocated_gb': round(allocated, 2),
        'cached_gb': round(reserved, 2),
        'free_gb': round(total - allocated, 2),
        'whisper_loaded': _whisper_model is not None,
        'ocr_models_loaded': _ocr_models is not None,
        'trocr_models_loaded': _trocr_models is not None,
        'any_models_loaded': any_loaded,
        'last_used': _models_last_used.isoformat() if _models_last_used else None,
        'timeout_seconds': _MODEL_TIMEOUT_SECONDS
    }
def force_free_vram():
    """Immediately unload all cached models and flush VRAM, ignoring the idle
    timeout.

    Mirrors the timed eviction path in _check_and_free_vram but runs
    unconditionally; returns a short status string for whatever triggers it
    manually.
    """
    logging.info("🔧 Manual VRAM free triggered")
    # Free everything right away, without waiting for the idle timeout.
    global _whisper_model, _ocr_models, _trocr_models, _models_last_used
    models_freed = []
    # Whisper: delete the object so CUDA can reclaim its memory.
    if _whisper_model is not None:
        try:
            if torch.cuda.is_available():
                del _whisper_model
                _whisper_model = None
                models_freed.append("Whisper")
        except Exception as e:
            logging.error(f"Error freeing Whisper VRAM: {e}")
    # OCR models: dropping the reference is enough.
    if _ocr_models is not None:
        try:
            _ocr_models = None
            models_freed.append("OCR")
        except Exception as e:
            logging.error(f"Error freeing OCR VRAM: {e}")
    # TrOCR: move the model to CPU first so its CUDA tensors are released.
    if _trocr_models is not None:
        try:
            if torch.cuda.is_available():
                model = _trocr_models.get('model') if isinstance(_trocr_models, dict) else None
                if model is not None:
                    model.to('cpu')
                models_freed.append("TrOCR")
                torch.cuda.empty_cache()
        except Exception as e:
            logging.error(f"Error freeing TrOCR VRAM: {e}")
    # Reset every cache; the models reload lazily on next use.
    _whisper_model = None
    _ocr_models = None
    _trocr_models = None
    _models_last_used = None
    # Return reserved allocator memory to the driver.
    _force_aggressive_vram_cleanup()
    if models_freed:
        logging.info(f"🎯 Manual VRAM free - Models freed: {', '.join(models_freed)}")
    return "VRAM freed successfully"
def ensure_local_directories() -> None:
    """Make sure every local working directory exists (idempotent)."""
    for directory in (LOCAL_DOWNLOADS_PATH, LOCAL_RESUMENES, LOCAL_DOCX):
        os.makedirs(directory, exist_ok=True)
# --- HELPER FUNCTIONS ---
def normalize_remote_path(path):
    """Normalize remote paths to a consistent representation.

    NFC-normalizes the text, trims surrounding whitespace, converts
    backslashes to forward slashes, collapses runs of slashes and strips
    any leading slash.  Empty or blank input yields "".
    """
    if not path:
        return ""
    text = unicodedata.normalize("NFC", str(path)).strip()
    if not text:
        return ""
    collapsed = re.sub(r"/+", "/", text.replace("\\", "/"))
    return collapsed.lstrip("/")
def _ensure_webdav_credentials() -> None:
    """Raise RuntimeError if any required Nextcloud credential is unset."""
    required = (
        ("NEXTCLOUD_URL", NEXTCLOUD_URL),
        ("NEXTCLOUD_USER", NEXTCLOUD_USER),
        ("NEXTCLOUD_PASS", NEXTCLOUD_PASS),
    )
    missing = [name for name, value in required if not value]
    if missing:
        raise RuntimeError(
            "Missing Nextcloud WebDAV configuration: " + ", ".join(missing)
        )
def _get_webdav_session() -> requests.Session:
    """Return the shared WebDAV session, creating it lazily on first use."""
    global _WEBDAV_SESSION
    if _WEBDAV_SESSION is not None:
        return _WEBDAV_SESSION
    _ensure_webdav_credentials()
    session = requests.Session()
    session.auth = HTTPBasicAuth(NEXTCLOUD_USER, NEXTCLOUD_PASS)
    # One retry-capable adapter shared by both schemes.
    retry_adapter = HTTPAdapter(max_retries=WEBDAV_MAX_RETRIES)
    for scheme in ("http://", "https://"):
        session.mount(scheme, retry_adapter)
    _WEBDAV_SESSION = session
    return _WEBDAV_SESSION
def _build_webdav_url(path: str) -> str:
    """Join the configured WebDAV endpoint with a normalized remote path."""
    _ensure_webdav_credentials()
    base = (WEBDAV_ENDPOINT or "").rstrip("/")
    if not base:
        raise RuntimeError("NEXTCLOUD_URL is not configured")
    suffix = normalize_remote_path(path)
    if not suffix:
        return base
    return f"{base}/{suffix}"
def _snapshot_existing_remote_files():
    """Collect current remote files to seed the processed registry on first run."""
    snapshot = set()
    # Only audio and PDF folders are snapshotted — matching the pipelines
    # whose files must not be re-processed after a fresh install.
    for remote_folder, extensions in (
        (REMOTE_AUDIOS_FOLDER, AUDIO_EXTENSIONS),
        (REMOTE_PDF_FOLDER, PDF_EXTENSIONS),
    ):
        try:
            for remote_path in webdav_list(remote_folder):
                normalized = normalize_remote_path(remote_path)
                if not normalized:
                    continue
                lowered = normalized.lower()
                if any(lowered.endswith(ext) for ext in extensions):
                    snapshot.add(normalized)
        except Exception as e:
            logging.warning(f"No se pudo obtener listado inicial de '{remote_folder}': {e}")
    return snapshot
def _initialize_processed_registry():
    """Ensure the processed files registry exists, migrating legacy data if needed.

    Order of operations: (1) create the state directory, (2) copy the first
    readable legacy registry found and stop, otherwise (3) seed a fresh
    registry with a snapshot of files currently on the server so they are
    not re-processed on first run.
    """
    target_dir = os.path.dirname(PROCESSED_FILES_PATH) or BASE_DIR
    try:
        os.makedirs(target_dir, exist_ok=True)
    except Exception as e:
        logging.error(f"No se pudo crear el directorio para el registro de procesados: {e}")
        return
    # Migrate from any known legacy location, skipping self-references.
    for legacy_path in LEGACY_PROCESSED_PATHS:
        if not legacy_path:
            continue
        if os.path.abspath(legacy_path) == os.path.abspath(PROCESSED_FILES_PATH):
            continue
        if os.path.exists(legacy_path):
            try:
                shutil.copy2(legacy_path, PROCESSED_FILES_PATH)
                logging.info(f"Registro de procesados migrado desde {legacy_path}")
                return
            except Exception as e:
                logging.error(f"Error al migrar registro de {legacy_path}: {e}")
    # No legacy data: seed with whatever already exists remotely.
    snapshot = _snapshot_existing_remote_files()
    try:
        with open(PROCESSED_FILES_PATH, "w", encoding="utf-8") as f:
            timestamp = datetime.utcnow().isoformat() + "Z"
            f.write(f"# Archivos procesados - inicializado {timestamp}\n")
            for entry in sorted(snapshot):
                f.write(entry + "\n")
        if snapshot:
            logging.info(f"Registro de procesados inicializado con {len(snapshot)} entradas existentes")
    except Exception as e:
        logging.error(f"No se pudo crear el registro de procesados: {e}")
def load_processed_files() -> ProcessedRegistry:
    """Load the set of already-processed remote paths from the registry file.

    Each registry line is normalized; both the full relative path and the
    bare basename are added so membership checks succeed for either form.
    Legacy entries stored without a folder additionally get folder-qualified
    aliases.  Returns an empty set when the registry cannot be read.
    """
    processed: ProcessedRegistry = set()
    if not os.path.exists(PROCESSED_FILES_PATH):
        _initialize_processed_registry()
        if not os.path.exists(PROCESSED_FILES_PATH):
            logging.warning("Registro de procesados no disponible; se procesarán todos los archivos encontrados.")
            return processed
    try:
        with open(PROCESSED_FILES_PATH, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                # Skip blanks and comment lines (e.g. the generated header).
                if not line or line.startswith('#'):
                    continue
                normalized = normalize_remote_path(line)
                if not normalized:
                    continue
                ext = os.path.splitext(normalized)[1].lower()
                if not ext:
                    continue  # entries without an extension are folders/noise
                processed.add(normalized)
                base_name = os.path.basename(normalized)
                processed.add(base_name)
                # Backwards compatibility for entries stored without a folder.
                if '/' not in normalized:
                    if ext in AUDIO_EXTENSIONS:
                        processed.add(f"{REMOTE_AUDIOS_FOLDER}/{base_name}")
                    elif ext in PDF_EXTENSIONS:
                        processed.add(f"{REMOTE_PDF_FOLDER}/{base_name}")
        return processed
    except Exception as e:
        logging.error(f"Error reading processed files: {e}")
        return processed
def save_processed_file(remote_path: str) -> None:
    """Append *remote_path* (normalized) to the processed-files registry.

    Idempotent: paths already registered (by full path or bare basename) are
    skipped.  If appending fails, the registry file is recreated with just
    this entry as a last resort.
    """
    normalized = normalize_remote_path(remote_path)
    if not normalized:
        logging.warning(f"Cannot mark empty remote path as processed: {remote_path}")
        return
    try:
        processed: ProcessedRegistry = load_processed_files()
        if normalized in processed or os.path.basename(normalized) in processed:
            logging.info(f"Archivo ya marcado como procesado: {normalized}")
            return
        with open(PROCESSED_FILES_PATH, "a", encoding="utf-8") as f:
            f.write(normalized + "\n")
        logging.info(f"Marcado como procesado: {normalized}")
    except Exception as e:
        logging.error(f"Error saving processed file {normalized}: {e}")
        # Last resort: recreate the registry and retry this single write.
        # NOTE(review): if the old file existed but was unreadable, this
        # drops every previously registered entry — confirm that trade-off
        # is intentional.
        try:
            os.makedirs(os.path.dirname(PROCESSED_FILES_PATH) or BASE_DIR, exist_ok=True)
            with open(PROCESSED_FILES_PATH, "w", encoding="utf-8") as f:
                f.write("# Archivos procesados - recreado automáticamente\n")
                f.write(normalized + "\n")
            logging.info(f"Archivo de procesados recreado y guardado: {normalized}")
        except Exception as e2:
            logging.error(f"Error recreating processed files: {e2}")
def run_subprocess(cmd, timeout):
    """Run *cmd* capturing stdout/stderr; raise a descriptive error on failure.

    Parameters:
        cmd: argument vector passed to subprocess.run (shell=False).
        timeout: seconds before subprocess.TimeoutExpired is raised.

    Returns:
        The CompletedProcess with text-mode stdout/stderr.

    Raises:
        RuntimeError: on a non-zero exit code.  (Previously a bare
            Exception was raised; RuntimeError is a subclass of Exception,
            so existing ``except Exception`` callers are unaffected.)
        subprocess.TimeoutExpired: when the command exceeds *timeout*.
    """
    # capture_output=True is the modern equivalent of the explicit
    # stdout/stderr=PIPE pair.
    cp = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if cp.returncode != 0:
        stderr = cp.stderr.strip()
        stdout = cp.stdout.strip()
        raise RuntimeError(f"Command {cmd} failed (rc={cp.returncode}). stderr: {stderr!s} stdout: {stdout!s}")
    return cp
def clean_filename(name):
    """Replace characters that break WebDAV/Nextcloud file names with '_'.

    Runs of forbidden characters collapse into a single underscore; literal
    '...' sequences and spaces also become underscores.
    """
    sanitized = re.sub(r'[\\/:"*?<>|]+', '_', name)
    return sanitized.replace('...', '_').replace(' ', '_')
# --- WEBDAV FUNCTIONS ---
def webdav_list(path: str) -> list[str]:
    """List files inside a Nextcloud folder using WebDAV PROPFIND (Depth: 1).

    Returns normalized paths relative to the WebDAV root; collection
    (directory) hrefs and the listed folder itself are filtered out.
    Returns an empty list on any error (logged, never raised).
    """
    session = _get_webdav_session()
    normalized_target = normalize_remote_path(path)
    response = None
    try:
        response = session.request(
            "PROPFIND",
            _build_webdav_url(normalized_target),
            headers={"Depth": "1"},
            timeout=HTTP_TIMEOUT,
        )
        response.raise_for_status()
        root = ET.fromstring(response.content)
        files: list[str] = []
        # Server hrefs are absolute; these prefixes are stripped to recover
        # the path relative to the user's WebDAV root.
        prefixes = ["/remote.php/webdav/"]
        if NEXTCLOUD_USER:
            prefixes.append(f"/remote.php/dav/files/{NEXTCLOUD_USER}/")
        for response_node in root.findall("{DAV:}response"):
            href_element = response_node.find("{DAV:}href")
            if href_element is None or not href_element.text:
                continue
            relative_path = requests.utils.unquote(href_element.text)
            for prefix in prefixes:
                if relative_path.startswith(prefix):
                    relative_path = relative_path[len(prefix):]
            normalized_response = normalize_remote_path(relative_path)
            # Drop empty entries and collection hrefs (trailing slash).
            if not normalized_response or normalized_response.endswith('/'):
                continue
            # Depth-1 PROPFIND includes the folder itself; skip it.
            if normalized_response.strip('/') == normalized_target.strip('/'):
                continue
            files.append(normalized_response)
        return files
    except Exception as exc:
        logging.error(f"WebDAV LIST falló para '{path}': {exc}")
        return []
    finally:
        if response is not None:
            response.close()
def webdav_download(remote_path: str, local_path: str) -> None:
    """Download a file from Nextcloud via WebDAV, streaming it to disk.

    Creates the parent directory of *local_path* if needed; raises
    requests.HTTPError on a non-2xx response.
    """
    session = _get_webdav_session()
    destination = Path(local_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    response = session.get(
        _build_webdav_url(remote_path),
        stream=True,
        timeout=HTTP_TIMEOUT,
    )
    try:
        response.raise_for_status()
        with destination.open('wb') as out:
            # Stream in chunks to keep memory bounded for large files.
            for block in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                if block:
                    out.write(block)
    finally:
        response.close()
def webdav_upload(local_path: str, remote_path: str) -> None:
    """Upload a local file to Nextcloud via WebDAV PUT.

    Streams the file from disk; raises requests.HTTPError on a non-2xx
    response.
    """
    session = _get_webdav_session()
    with open(local_path, 'rb') as payload:
        response = session.put(
            _build_webdav_url(remote_path),
            data=payload,
            timeout=HTTP_TIMEOUT,
        )
    try:
        response.raise_for_status()
    finally:
        # FIX: previously the response was never closed; release the
        # connection explicitly, consistent with webdav_download/webdav_mkdir.
        response.close()
def webdav_mkdir(remote_path: str) -> None:
    """Create a folder on Nextcloud via MKCOL.

    Status 405 (collection already exists) is treated as success; any other
    failure is logged, never raised.
    """
    session = _get_webdav_session()
    response = None
    try:
        response = session.request(
            "MKCOL",
            _build_webdav_url(remote_path),
            timeout=HTTP_TIMEOUT,
        )
        if response.status_code not in (200, 201, 204, 405):
            response.raise_for_status()
    except Exception as exc:
        logging.error(f"WebDAV MKCOL falló para '{remote_path}': {exc}")
    finally:
        if response is not None:
            response.close()
# --- CLAUDE (GLM-4.6) HELPERS ---
def get_claude_env(model: Optional[str] = None) -> Dict[str, str]:
    """Build the process environment for the Claude CLI via the Z.AI gateway.

    Existing environment values always win (setdefault); only the
    permission-skip flag is forced unconditionally.
    """
    env = os.environ.copy()
    env.setdefault('ANTHROPIC_BASE_URL', ZAI_BASE_URL)
    if ZAI_AUTH_TOKEN_FALLBACK:
        env.setdefault('ANTHROPIC_AUTH_TOKEN', ZAI_AUTH_TOKEN_FALLBACK)
    env['CLAUDE_DANGEROUSLY_SKIP_PERMISSIONS'] = '1'
    chosen_model = model or ZAI_DEFAULT_MODEL
    if chosen_model:
        for variable in ('CLAUDE_MODEL', 'CLAUDE_DEFAULT_MODEL', 'ANTHROPIC_DEFAULT_MODEL'):
            env.setdefault(variable, chosen_model)
    return env
def run_claude_cli(prompt: str, timeout: int = 300, model: Optional[str] = None) -> str:
    """Feed *prompt* to the Claude CLI on stdin and return its stripped stdout.

    Raises RuntimeError with the CLI's own diagnostics on a non-zero exit,
    and subprocess.TimeoutExpired when *timeout* elapses.
    """
    completed = subprocess.run(
        ['claude', '--dangerously-skip-permissions'],
        input=prompt,
        env=get_claude_env(model),
        text=True,
        capture_output=True,
        timeout=timeout,
    )
    if completed.returncode != 0:
        detail = (
            (completed.stderr or '').strip()
            or (completed.stdout or '').strip()
            or 'sin salida'
        )
        raise RuntimeError(f"Claude CLI failed (rc={completed.returncode}): {detail}")
    return (completed.stdout or '').strip()
def _get_gemini_env(model_name: Optional[str] = None) -> Dict[str, str]:
    """Build the process environment for the Gemini CLI.

    Injects the configured API key and (optionally) a model name, without
    overriding values that are already present in the environment.
    """
    env = os.environ.copy()
    if GEMINI_API_KEY:
        env.setdefault("GEMINI_API_KEY", GEMINI_API_KEY)
    if model_name:
        env.setdefault("GEMINI_MODEL", model_name)
    return env
def _call_gemini_api(prompt: str, use_flash: bool = True, timeout: int = 180) -> str:
    """Call the Gemini generateContent REST endpoint and return the text reply.

    Args:
        prompt: user text sent as a single content part.
        use_flash: True selects the flash model, False the pro model
            (falling back to hard-coded names when the globals are unset).
        timeout: seconds allowed for the HTTP request.

    Raises:
        RuntimeError: on missing key, transport failure, non-JSON body,
            blocked prompt, or an empty/unusable response.
    """
    if not GEMINI_API_KEY:
        raise RuntimeError("Gemini API key not configured")
    if use_flash:
        model = GEMINI_FLASH_MODEL or "gemini-2.5-flash"
    else:
        model = GEMINI_PRO_MODEL or "gemini-2.5-pro-preview-06-05"
    endpoint = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"
    payload = {
        "contents": [
            {
                "parts": [
                    {"text": prompt}
                ]
            }
        ]
    }
    try:
        response = requests.post(
            endpoint,
            params={"key": GEMINI_API_KEY},
            json=payload,
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.RequestException as exc:
        raise RuntimeError(f"Gemini API request failed: {exc}") from exc
    try:
        data = response.json()
    except ValueError as exc:
        raise RuntimeError("Gemini API returned a non-JSON response") from exc
    # A blocked prompt carries no candidate text worth reading.
    prompt_feedback = data.get("promptFeedback", {})
    if prompt_feedback.get("blockReason"):
        raise RuntimeError(f"Gemini prompt blocked: {prompt_feedback.get('blockReason')}")
    candidates = data.get("candidates") or []
    for candidate in candidates:
        # Non-STOP finishes (e.g. truncation/safety) may still contain
        # usable partial text, so log and try to read it anyway.
        finish_reason = candidate.get("finishReason")
        if finish_reason and finish_reason not in ("STOP", "FINISH_REASON_UNSPECIFIED"):
            logging.warning(f"Gemini candidate finalizado con estado {finish_reason}, intentando leer contenido igualmente.")
        parts = candidate.get("content", {}).get("parts", []) or []
        texts = [part.get("text", "") for part in parts if part.get("text")]
        if texts:
            return "\n".join(texts).strip()
    raise RuntimeError("Gemini API returned empty response")
def _call_gemini_cli(prompt: str, use_yolo: bool = True, timeout: int = 300) -> str:
    """Run the Gemini CLI with *prompt* on stdin and return its stripped output.

    use_yolo=True adds the --yolo flag and targets the flash model;
    otherwise the pro model is used.  Raises FileNotFoundError when the
    binary is missing and RuntimeError on failure or empty output.
    """
    if not GEMINI_CLI_PATH:
        raise FileNotFoundError("Gemini CLI binary not found")
    cmd = [GEMINI_CLI_PATH] + (["--yolo"] if use_yolo else [])
    if use_yolo:
        model_name = GEMINI_FLASH_MODEL or "gemini-2.5-flash"
    else:
        model_name = GEMINI_PRO_MODEL or "gemini-2.5-pro-preview-06-05"
    completed = subprocess.run(
        cmd,
        input=prompt,
        env=_get_gemini_env(model_name),
        text=True,
        capture_output=True,
        timeout=timeout,
    )
    if completed.returncode != 0:
        detail = (
            (completed.stderr or '').strip()
            or (completed.stdout or '').strip()
            or 'sin salida'
        )
        raise RuntimeError(f"Gemini CLI failed (rc={completed.returncode}): {detail}")
    output = (completed.stdout or '').strip()
    if not output:
        raise RuntimeError("Gemini CLI returned empty output")
    return output
# --- AUDIO PROCESSING FUNCTIONS ---
# Ordered Galician→Spanish replacements applied to every transcribed segment.
# BUGFIX: longer patterns must run before any shorter pattern they contain
# (e.g. "dunha"/"nunha" before "unha").  In the original order, "unha" was
# replaced first, turning "dunha" into "duna" so the "dunha"→"de una" and
# "nunha"→"en una" rules could never match.
_SPANISH_FIXES = (
    ("xeo", "yo"),
    ("non", "no"),
    ("hai", "hay"),
    ("entóns", "entonces"),
    ("máis", "más"),
    ("tamén", "también"),
    ("sempre", "siempre"),
    ("verdade", "verdad"),
    ("cousa", "cosa"),
    ("xente", "gente"),
    ("tempo", "tiempo"),
    ("lingua", "lengua"),
    ("pode", "puede"),
    ("xamón", "shogun"),
    ("xomón", "shogun"),
    ("dunha", "de una"),
    ("nunha", "en una"),
    ("unha", "una"),
    ("xeral", "general"),
    ("xeraria", "jerarquía"),
    ("ximéas", "temas"),
    ("ximeas", "temas"),
    ("ronquera", "reunión"),
    ("xocalizar", "juntar"),
    ("oanxacular", "juntar"),
    ("xocal", "junto"),
    ("lúmulo", "grupo"),
    ("lúmido", "grupo"),
    ("lúmada", "grupos"),
    ("nulunxación", "reunificación"),
    ("xotalipa", "capitalista"),
    ("crente", "gente"),
    ("enxucar", "juntar"),
)
def _normalize_spanish_text(text):
    """Apply the ordered Galician→Spanish fixes and collapse whitespace."""
    text = text.strip()
    for wrong, right in _SPANISH_FIXES:
        text = text.replace(wrong, right)
    return re.sub(r'\s+', ' ', text).strip()
def transcribe_audio(audio_path, output_path):
    """Transcribe *audio_path* with Whisper (Spanish-optimized) and write
    timestamped segments to *output_path*.

    Lazily loads the shared Whisper "medium" model on the GPU (retrying once
    after clearing the CUDA cache); if transcription itself hits a CUDA
    error, it retries once on the lighter "base" model.  Each segment is
    written as "[HH:MM:SS] text" after Galician→Spanish post-processing.
    """
    global _whisper_model
    # Check and free VRAM if models are idle
    _check_and_free_vram()
    # Load Whisper model if not already loaded
    if _whisper_model is None:
        try:
            logging.info("Loading Whisper model (medium) for Spanish transcription...")
            # Free CUDA memory first
            torch.cuda.empty_cache()
            # Configure the environment for better CUDA error handling
            os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
            os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
            _whisper_model = whisper.load_model("medium", device="cuda")
            logging.info("✅ Whisper model loaded successfully on GPU")
        except RuntimeError as e:
            if "CUDA" in str(e) or "GPU" in str(e):
                error_msg = f"❌ Error cargando Whisper en GPU: {e}"
                logging.error(error_msg)
                send_telegram_message(error_msg)
                # Free memory and retry once
                torch.cuda.empty_cache()
                time.sleep(2)
                _whisper_model = whisper.load_model("medium", device="cuda")
                logging.info("✅ Whisper model loaded successfully on GPU (retry)")
            else:
                raise
    # Update usage timestamp
    _update_models_usage()
    logging.info("Starting audio transcription with Spanish optimization...")
    try:
        # Fast settings tuned for Spanish
        result = _whisper_model.transcribe(
            audio_path,
            language="es",  # Force Spanish
            task="transcribe",
            temperature=0.0,  # Less randomness
            beam_size=1,  # Faster
            condition_on_previous_text=False,  # Avoid repetition loops
            fp16=True,  # Faster
            verbose=False
        )
    except RuntimeError as e:
        if "CUDA" in str(e):
            error_msg = f"❌ CUDA error durante transcripción: {e}"
            logging.error(error_msg)
            send_telegram_message(error_msg)
            # Retry on GPU with a lighter configuration (base model)
            try:
                logging.info("🔄 Reintentando transcripción con GPU (config ligera)...")
                if _whisper_model is not None:
                    del _whisper_model
                torch.cuda.empty_cache()
                time.sleep(2)
                _whisper_model = whisper.load_model("base", device="cuda")
                result = _whisper_model.transcribe(
                    audio_path,
                    language="es",
                    task="transcribe",
                    temperature=0.0,
                    best_of=3,
                    beam_size=3,
                    patience=1.0,
                    initial_prompt="Este es un audio en español. Hablará claramente y de forma fluida.",
                    condition_on_previous_text=True,
                    verbose=True
                )
                logging.info("✅ Transcripción completada con GPU (modelo base)")
            except Exception as gpu_error:
                logging.error(f"❌ Error crítico en transcripción con GPU: {gpu_error}")
                raise RuntimeError(f"❌ Error crítico en transcripción: {gpu_error}")
        else:
            raise
    # Refresh the usage timestamp mid-processing
    _update_models_usage()
    # Write "[HH:MM:SS] text" lines with Spanish post-processing applied
    with open(output_path, "w", encoding="utf-8") as f:
        for seg in result["segments"]:
            start = int(seg["start"])
            hours = start // 3600
            minutes = (start % 3600) // 60
            seconds = start % 60
            timestamp = f"[{hours:02}:{minutes:02}:{seconds:02}]"
            text = _normalize_spanish_text(seg['text'])
            f.write(f"{timestamp} {text}\n")
    # Final usage timestamp update
    _update_models_usage()
    logging.info(f"Transcription saved to {output_path}")
def run_gemini(prompt, use_flash=True):
    """Generate content, trying Claude CLI (GLM-4.6) first, then the Gemini
    CLI, then the Gemini HTTP API.

    Each backend is attempted only when its configuration (binary path or
    API key) is present.  On total failure a human-readable "Error ..."
    string is returned instead of raising, so callers can embed it directly.
    """
    claude_error = None
    gemini_cli_error = None
    # 1st choice: Claude CLI (usable when either the binary path or the
    # z.ai fallback token is configured).
    if CLAUDE_CLI_PATH or ZAI_AUTH_TOKEN_FALLBACK:
        try:
            return run_claude_cli(prompt, timeout=300)
        except FileNotFoundError as exc:
            claude_error = exc
            logging.warning("Claude CLI no disponible, utilizando Gemini como fallback.")
        except Exception as exc:
            claude_error = exc
            logging.error(f"Claude CLI error: {exc}")
    # 2nd choice: Gemini CLI.
    if GEMINI_CLI_PATH:
        try:
            result = _call_gemini_cli(prompt, use_yolo=True)
            if claude_error:
                logging.info("Gemini CLI respondió correctamente tras fallo de Claude CLI.")
            return result
        except FileNotFoundError as exc:
            gemini_cli_error = exc
            logging.warning("Gemini CLI no disponible en el sistema.")
        except Exception as exc:
            gemini_cli_error = exc
            logging.error(f"Gemini CLI error: {exc}")
    # 3rd choice: Gemini HTTP API.
    if GEMINI_API_KEY:
        try:
            result = _call_gemini_api(prompt, use_flash=use_flash)
            if claude_error or gemini_cli_error:
                logging.info("Gemini API respondió correctamente tras fallos previos.")
            return result
        except Exception as gemini_exc:
            logging.error(f"Gemini API error: {gemini_exc}")
            # Aggregate every backend failure into a single error string.
            errors = []
            if claude_error:
                errors.append(f"Claude CLI: {claude_error}")
            if gemini_cli_error:
                errors.append(f"Gemini CLI: {gemini_cli_error}")
            if errors:
                errors.append(f"Gemini API: {gemini_exc}")
                return " ; ".join(f"Error {e}" for e in errors)
            return f"Error Gemini API: {gemini_exc}"
    # No backend succeeded (or none was configured): report what failed.
    if claude_error:
        base_error = f"Error Claude CLI: {claude_error}"
        if gemini_cli_error:
            return f"{base_error}; Error Gemini CLI: {gemini_cli_error}"
        return base_error
    if gemini_cli_error:
        return f"Error Gemini CLI: {gemini_cli_error}"
    return "Error: No hay servicios de resumen disponibles (Claude/Gemini)."
def run_gemini_api_fallback(prompt, use_flash=True):
    """Backward-compatible shim: delegates to the unified run_gemini call."""
    response = run_gemini(prompt, use_flash=use_flash)
    return response
def run_gemini_summary(prompt):
    """Backward-compatible summary helper backed by GLM-4.6 (flash path)."""
    summary = run_gemini(prompt, use_flash=True)
    return summary
def run_ollama(prompt):
    """Send *prompt* to the local Ollama chat endpoint and return the reply.

    Any failure (network, HTTP status, malformed body) is reported as an
    "Error Ollama: ..." string rather than raised.
    """
    request_body = {
        "model": OLLAMA_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
    }
    try:
        resp = requests.post(f"{OLLAMA_HOST}/api/chat", json=request_body, timeout=120)
        resp.raise_for_status()
        return resp.json()['message']['content']
    except Exception as e:
        return f"Error Ollama: {e}"
# --- CLASIFICACIÓN INTELIGENTE DE CONTENIDO ---
def classify_content_intelligent(text_content):
    """Classify summary content into one of four thematic categories via AI.

    Returns a TEMATIC_FOLDERS key: "historia", "analisis_contable",
    "instituciones_gobierno" or "otras_clases" (also the fallback when the
    model answer cannot be mapped or any error occurs).
    """
    classification_prompt = f"""
    Analiza el siguiente contenido y clasifícalo en UNA de estas 4 categorías:
    1. HISTORIA - Contenido sobre eventos históricos, cronologías, guerras, revoluciones, personajes históricos, civilizaciones antiguas, historia política, social o económica.
    2. ANALISIS CONTABLE - Contenido sobre contabilidad, finanzas, balances, estados financieros, costos, presupuestos, auditorías, impuestos, análisis de inversiones, contabilidad de costos.
    3. INSTITUCIONES DEL GOBIERNO - Contenido sobre gobierno, política, ideologías políticas, instituciones estatales, administración pública, leyes, reglamentos, políticas públicas, estructura gubernamental.
    4. OTRAS CLASES - Contenido que no encaja en las categorías anteriores: ciencias, tecnología, literatura, arte, filosofía, educación, medicina, derecho, etc.
    Instrucciones:
    - Responde ÚNICAMENTE con el nombre de la categoría (HISTORIA, ANALISIS CONTABLE, INSTITUCIONES DEL GOBIERNO, OTRAS CLASES)
    - No incluyas explicaciones ni texto adicional
    - Basa tu decisión en el contenido general del texto
    Contenido a clasificar:
    {text_content}
    """
    try:
        # Ask GLM-4.6 for the category label and normalize it.
        classification = run_gemini_summary(classification_prompt)
        classification = classification.strip().upper()
        # Map model answers (including common variants) to folder keys.
        # BUGFIX: the original dict listed "ANALISIS CONTABLE" twice — the
        # duplicate was meant to be the accented variant the model may return.
        category_mapping = {
            "HISTORIA": "historia",
            "ANALISIS CONTABLE": "analisis_contable",
            "ANÁLISIS CONTABLE": "analisis_contable",
            "INSTITUCIONES DEL GOBIERNO": "instituciones_gobierno",
            "INSTITUCIONES DE GOBIERNO": "instituciones_gobierno",
            "GOBIERNO": "instituciones_gobierno",
            "POLITICA": "instituciones_gobierno",
            "POLÍTICA": "instituciones_gobierno",
            "OTRAS CLASES": "otras_clases",
            "OTRAS": "otras_clases"
        }
        # Exact match first.
        if classification in category_mapping:
            return category_mapping[classification]
        # Otherwise look for a known label embedded in the answer.
        for key, value in category_mapping.items():
            if key in classification:
                return value
        # Could not classify — fall back to the default category.
        logging.warning(f"⚠️ No se pudo clasificar el contenido: '{classification}', usando categoría por defecto")
        return "otras_clases"
    except Exception as e:
        logging.error(f"❌ Error en clasificación inteligente: {e}")
        return "otras_clases"  # Default category on error
def ensure_thematic_folders_exist():
    """Ensure every thematic target folder exists in Nextcloud (best effort).

    Failures are logged per folder and never raised, so one unreachable
    folder does not abort the rest.
    """
    # Only the folder names are needed here; the dict keys are category ids.
    for folder_name in TEMATIC_FOLDERS.values():
        try:
            webdav_mkdir(folder_name)
            logging.info(f"📁 Verificada/creada carpeta: {folder_name}")
        except Exception as e:
            logging.error(f"❌ Error creando carpeta {folder_name}: {e}")
def get_upload_path_for_category(category_key, filename):
    """Build the Nextcloud upload path for *filename* given its category key.

    Unknown categories fall back to the "otras_clases" folder.
    """
    folder = (
        TEMATIC_FOLDERS[category_key]
        if category_key in TEMATIC_FOLDERS
        else TEMATIC_FOLDERS["otras_clases"]
    )
    return os.path.join(folder, filename)
# --- EXTRACCIÓN DE TEMAS Y RENOMBRADO AUTOMÁTICO ---
def extract_key_topics_from_text(text):
    """Extract the 2-3 main topics of *text* via the summarization model.

    Always returns a non-empty list; generic placeholders are used for
    short inputs or model failures.
    """
    # Too little content to classify meaningfully.
    if not text or len(text) < 100:
        return ["Temas principales"]
    topics_prompt = f"""
    Analiza el siguiente texto y extrae los 2-3 temas principales más importantes.
    Responde ÚNICAMENTE con los temas separados por comas, sin explicaciones.
    Usa máximo 3 palabras por tema.
    Ejemplos de respuesta correcta:
    "Revolución Francesa, Ilustración, Monarquía"
    "Contabilidad financiera, Estados contables, Análisis de ratios"
    "Gobierno democrático, Separación de poderes, Constitución"
    Texto a analizar:
    {text[:2000]} # Limitar texto para no exceder tokens
    """
    try:
        raw_reply = run_gemini_summary(topics_prompt)
        extracted = []
        for candidate in raw_reply.split(','):
            candidate = candidate.strip().title()
            if candidate and len(candidate) > 2:
                # Strip characters that are unsafe for filenames/labels.
                candidate = re.sub(r'[^\w\sáéíóúüñÁÉÍÓÚÜÑ-]', '', candidate)
                if candidate:
                    extracted.append(candidate)
        if len(extracted) == 1 and len(extracted[0]) > 20:
            # A single very long topic: split it into two shorter ones.
            words = extracted[0].split()
            if len(words) >= 4:
                extracted = [f"{words[0]} {words[1]}", f"{words[2]} {words[3]}"]
        elif len(extracted) < 2:
            extracted.append("Temas principales")
        # Cap at three topics.
        return extracted[:3]
    except Exception as e:
        logging.error(f"Error extrayendo temas: {e}")
        return ["Temas principales", "Contenido académico"]
def clean_filename_for_topics(name: str, max_length: Optional[int] = None) -> str:
    """Sanitize *name* for use as a filename, preserving its extension.

    Removes characters invalid on common filesystems, collapses whitespace
    and truncates to *max_length* (MAX_FILENAME_LENGTH when omitted) while
    keeping the extension intact whenever possible.  Returns "archivo" when
    nothing usable remains.
    """
    fallback = "archivo"
    if not name:
        return fallback
    cleaned = re.sub(r'\s+', ' ', re.sub(r'[<>:"/\\|?*]+', '', name)).strip()
    if not cleaned:
        return fallback
    limit = max_length or MAX_FILENAME_LENGTH
    # No limit, or already short enough: nothing to truncate.
    if limit <= 0 or len(cleaned) <= limit:
        return cleaned
    stem, ext = os.path.splitext(cleaned)
    if not ext:
        return cleaned[:limit].rstrip(' .-_') or fallback
    # Reserve room for the extension, keeping at least one stem character.
    room_for_stem = max(1, limit - len(ext))
    short_stem = stem[:room_for_stem].rstrip(' .-_') or fallback
    candidate = short_stem + ext
    if len(candidate) <= limit:
        return candidate
    # The extension alone may exceed the limit; keep its tail in that case.
    if len(ext) >= limit:
        return ext[-limit:]
    final_stem = short_stem[:limit - len(ext)].rstrip(' .-_') or fallback
    return final_stem + ext
def ensure_unique_local_filename(directory: Path, filename: str) -> str:
    """Return a sanitized name for *filename* that does not collide in *directory*.

    Appends "-1", "-2", ... before the extension until an unused name is
    found; every candidate is re-sanitized to respect MAX_FILENAME_LENGTH.
    """
    base = clean_filename_for_topics(filename, MAX_FILENAME_LENGTH)
    if not (directory / base).exists():
        return base
    stem, ext = os.path.splitext(base)
    counter = 1
    while True:
        candidate = clean_filename_for_topics(f"{stem}-{counter}{ext}", MAX_FILENAME_LENGTH)
        if not (directory / candidate).exists():
            return candidate
        counter += 1
def _append_markdown_to_doc(doc: Document, markdown_text: str) -> None:
    """Translate lightweight Markdown into headings/bullets/paragraphs on *doc*.

    Consecutive plain lines are merged into one paragraph; blank lines,
    headings ('#') and bullets ('-', '*', '•') flush the pending paragraph.
    """
    pending = []
    def flush():
        # Emit the accumulated plain-text lines as a single paragraph.
        if pending:
            doc.add_paragraph(' '.join(pending))
            pending.clear()
    for raw_line in markdown_text.splitlines():
        line = raw_line.rstrip()
        if not line.strip():
            flush()
            continue
        stripped = line.lstrip()
        if stripped.startswith('#'):
            flush()
            level = len(stripped) - len(stripped.lstrip('#'))
            heading_text = stripped.lstrip('#').strip()
            if heading_text:
                # Clamp heading level to the DOCX-supported 1..6 range.
                doc.add_heading(heading_text, level=max(1, min(6, level)))
            continue
        if stripped.startswith(('-', '*', '•')):
            flush()
            bullet_text = stripped.lstrip('-*• ').strip()
            if bullet_text:
                doc.add_paragraph(bullet_text, style='List Bullet')
            continue
        pending.append(line.strip())
    flush()
def markdown_to_docx(markdown_text: str, output_path: Path, quiz_source: Optional[str] = None) -> None:
    """Render *markdown_text* to a DOCX at *output_path*, appending a quiz.

    The quiz is generated from *quiz_source* when given, otherwise from the
    markdown itself; quiz failures are logged but never abort the save.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    document = Document()
    document.add_heading('Resumen generado con GLM-4.6', level=1)
    document.add_paragraph('Este documento fue sintetizado automáticamente usando GLM-4.6 a través de la CLI de Claude (z.ai).')
    document.add_page_break()
    _append_markdown_to_doc(document, markdown_text)
    quiz_text = quiz_source or markdown_text
    if quiz_text:
        logging.info("🎯 Generando quiz con GLM-4.6...")
        try:
            questions, answers = generate_quiz(quiz_text)
            if questions and answers:
                add_quiz_to_docx(document, questions, answers)
                logging.info("✅ Quiz agregado al documento")
        except Exception as quiz_error:
            logging.error(f"❌ Error generando quiz: {quiz_error}")
    document.save(str(output_path))
def markdown_to_pdf(markdown_text: str, pdf_path: Path, title: Optional[str] = None) -> None:
    """Render *markdown_text* to a simple single-column letter-size PDF.

    Handles headings, bullets and wrapped paragraphs; starts a new page
    whenever the write cursor drops below the bottom margin.
    """
    pdf_path.parent.mkdir(parents=True, exist_ok=True)
    pdf = canvas.Canvas(str(pdf_path), pagesize=letter)
    page_width, page_height = letter
    margin = 72
    cursor = page_height - margin
    def start_new_page():
        # Flush the current page and reset font + cursor for the next one.
        nonlocal cursor
        pdf.showPage()
        pdf.setFont('Helvetica', 11)
        cursor = page_height - margin
    pdf.setFont('Helvetica', 11)
    if title:
        pdf.setFont('Helvetica-Bold', 16)
        pdf.drawString(margin, cursor, title[:100])
        cursor -= 28
        pdf.setFont('Helvetica', 11)
    for raw_line in markdown_text.splitlines():
        line = raw_line.rstrip()
        if not line.strip():
            # Blank line: just advance vertically.
            cursor -= 14
            if cursor < margin:
                start_new_page()
            continue
        stripped = line.lstrip()
        if stripped.startswith('#'):
            level = len(stripped) - len(stripped.lstrip('#'))
            heading_text = stripped.lstrip('#').strip()
            if heading_text:
                if level == 1:
                    font_size = 16
                elif level == 2:
                    font_size = 14
                else:
                    font_size = 12
                pdf.setFont('Helvetica-Bold', font_size)
                pdf.drawString(margin, cursor, heading_text[:120])
                cursor -= font_size + 6
                if cursor < margin:
                    start_new_page()
                pdf.setFont('Helvetica', 11)
            continue
        if stripped.startswith(('-', '*', '•')):
            bullet_body = stripped.lstrip('-*•').strip()
            wrapped_segments = textwrap.wrap(bullet_body, width=80) or ['']
            for idx, segment in enumerate(wrapped_segments):
                prefix = '' if idx == 0 else ' '
                pdf.drawString(margin, cursor, f"{prefix}{segment}")
                cursor -= 14
                if cursor < margin:
                    start_new_page()
            continue
        for segment in textwrap.wrap(stripped, width=90) or ['']:
            pdf.drawString(margin, cursor, segment)
            cursor -= 14
            if cursor < margin:
                start_new_page()
    pdf.save()
def generate_intelligent_filename(base_name, summary_content):
    """Build a descriptive "_unificado.docx" filename from *base_name* plus
    topics extracted from *summary_content*.

    Falls back to "{base_name}_unificado.docx" on any failure.
    """
    try:
        topics = extract_key_topics_from_text(summary_content)
        topics_str = ' - '.join(topics)
        # Sanitize the original base name (the "_unificado" marker is re-added later).
        clean_base = clean_filename_for_topics(
            base_name.replace('_unificado', ''),
            MAX_FILENAME_BASE_LENGTH,
        )
        if clean_base.lower() == "archivo":
            clean_base = "Resumen"
        # Sanitize the topic suffix; drop it entirely if nothing survives.
        clean_topics = ''
        if topics_str:
            clean_topics = clean_filename_for_topics(topics_str, MAX_FILENAME_TOPICS_LENGTH)
            if clean_topics.lower() == "archivo":
                clean_topics = ''
        name_parts = [clean_base]
        if clean_topics:
            name_parts.append(clean_topics)
        candidate = ' - '.join(name_parts) + '_unificado.docx'
        final_name = clean_filename_for_topics(candidate, MAX_FILENAME_LENGTH)
        logging.info(f"🎯 Temas extraídos: {topics_str}")
        return final_name
    except Exception as e:
        logging.error(f"Error generando nombre inteligente: {e}")
        # Default name when topic extraction or sanitizing fails.
        return f"{base_name}_unificado.docx"
# --- QUIZ GENERATION FUNCTIONS ---
def generate_quiz(summary_text):
    """Ask the model for a 10-question multiple-choice quiz over *summary_text*.

    Returns ``(questions, answers)`` where each question includes its four
    options, or ``(None, None)`` when generation fails.
    """
    prompt = f"""
    Basándote en el siguiente resumen, genera exactamente 10 preguntas de opción múltiple en español.
    Cada pregunta debe tener 4 opciones (A, B, C, D) y solo una respuesta correcta.
    Las preguntas deben cubrir los puntos más importantes del resumen.
    Formato requerido:
    PREGUNTA 1: [texto de la pregunta]
    A) [opción A]
    B) [opción B]
    C) [opción C]
    D) [opción D]
    RESPUESTA: [letra correcta]
    PREGUNTA 2: [texto de la pregunta]
    A) [opción A]
    B) [opción B]
    C) [opción C]
    D) [opción D]
    RESPUESTA: [letra correcta]
    [continúa hasta la pregunta 10]
    Resumen:
    {summary_text}
    """
    logging.info("🎯 Generating quiz with GLM-4.6...")
    response = run_gemini(prompt)
    if "Error" in response:
        logging.error(f"❌ Error generating quiz: {response}")
        return None, None
    # Parse the response, pairing each PREGUNTA with its options and answer.
    questions, answers = [], []
    pending_question = None
    pending_options = []
    for raw in response.strip().split('\n'):
        entry = raw.strip()
        if entry.startswith('PREGUNTA'):
            if pending_question:
                questions.append(f"{pending_question}\n" + "\n".join(pending_options))
                pending_options = []
            pending_question = entry
        elif entry.startswith(('A)', 'B)', 'C)', 'D)')):
            pending_options.append(entry)
        elif entry.startswith('RESPUESTA:'):
            answers.append(entry.replace('RESPUESTA:', '').strip())
    # Flush the final question.
    if pending_question:
        questions.append(f"{pending_question}\n" + "\n".join(pending_options))
    return questions, answers
def add_quiz_to_docx(doc, questions, answers):
    """Append a quiz section (questions, then an answer key page) to *doc*.

    *doc* is any python-docx Document-like object; *questions* are full
    question blocks (text plus options) and *answers* the correct letters.
    """
    doc.add_page_break()
    doc.add_heading('Quiz de Evaluación', level=1)
    doc.add_paragraph('Responde las siguientes preguntas basándote en el resumen anterior.')
    doc.add_paragraph('')
    # Questions first; the original enumerate index was unused here.
    for question in questions:
        doc.add_paragraph(question)
        doc.add_paragraph('')
    # Answer key on its own page, numbered from 1.
    doc.add_page_break()
    doc.add_heading('Respuestas del Quiz', level=1)
    for i, answer in enumerate(answers, 1):
        doc.add_paragraph(f"Pregunta {i}: {answer}")
# --- DOCUMENT GENERATION FUNCTIONS ---
def save_summary_docx(content, model_name, filename, text_for_quiz=None):
    """Save *content* (lightweight Markdown) to *filename* as a DOCX (legacy).

    ``model_name`` is retained for backward compatibility but is not used.
    When *text_for_quiz* is provided, a quiz section is generated from it
    and appended; quiz failures are logged without aborting the save.
    """
    doc = Document()
    doc.add_heading('Resumen generado', level=1)
    # Accumulate consecutive plain lines into a single paragraph.
    current_paragraph = []
    for line in content.splitlines():
        line = line.strip()
        if not line:
            if current_paragraph:
                doc.add_paragraph(' '.join(current_paragraph))
                current_paragraph = []
            continue
        if line.startswith('#'):
            if current_paragraph:
                doc.add_paragraph(' '.join(current_paragraph))
                current_paragraph = []
            # Heading depth from the number of leading '#'.
            level = len(line) - len(line.lstrip('#'))
            if level <= 6:
                doc.add_heading(line.lstrip('#').strip(), level=level)
            else:
                # Deeper than DOCX supports: treat as plain text.
                current_paragraph.append(line)
        elif line.startswith('-') or line.startswith('•'):
            if current_paragraph:
                doc.add_paragraph(' '.join(current_paragraph))
                current_paragraph = []
            doc.add_paragraph(line.lstrip('-•').strip(), style='List Bullet')
        else:
            current_paragraph.append(line)
    if current_paragraph:
        doc.add_paragraph(' '.join(current_paragraph))
    if text_for_quiz:
        logging.info("🎯 Generating quiz...")
        try:
            # text_for_quiz is guaranteed truthy here, so use it directly
            # (the old `text_for_quiz if text_for_quiz else content` ternary
            # was redundant).
            questions, answers = generate_quiz(text_for_quiz)
            if questions and answers:
                add_quiz_to_docx(doc, questions, answers)
                logging.info("✅ Quiz added to document")
        except Exception as e:
            logging.error(f"❌ Error generating quiz: {e}")
    doc.save(filename)
def run_claude_summary_pipeline(text):
    """Generate bullet points, an integrated summary and a final formatted
    version of *text* using the Claude CLI, chunking long inputs.

    Returns ``(True, bullets_markdown, raw_summary, formatted_summary)``.
    Every CLI timeout/error is replaced with a canned fallback, so the
    pipeline always completes and never raises on CLI failures.
    """
    # Validate that the text has enough content to summarize
    if not text or len(text.strip()) < 50:
        logging.warning("⚠️ Texto demasiado corto para generar resumen, usando contenido por defecto")
        text = "Contenido educativo procesado. Se generó un documento editable a partir de un archivo PDF."
    # Split the text into chunks when it is too long
    max_chunk_size = 6000  # Characters per chunk (larger for Claude)
    if len(text) > max_chunk_size:
        logging.info(f"📝 Dividiendo texto de {len(text)} caracteres en chunks de {max_chunk_size}")
        text_chunks = []
        # Split on paragraph boundaries to keep coherence
        paragraphs = text.split('\n\n')
        current_chunk = ""
        for paragraph in paragraphs:
            if len(current_chunk + paragraph) <= max_chunk_size:
                current_chunk += paragraph + "\n\n"
            else:
                if current_chunk.strip():
                    text_chunks.append(current_chunk.strip())
                current_chunk = paragraph + "\n\n"
        if current_chunk.strip():
            text_chunks.append(current_chunk.strip())
        logging.info(f"📝 Texto dividido en {len(text_chunks)} partes")
    else:
        text_chunks = [text]
    logging.info("🔹 Claude CLI generando bullet points por partes...")
    # Generate bullet points for each chunk through the Claude CLI
    all_bullets = []
    for i, chunk in enumerate(text_chunks):
        logging.info(f"🔹 Procesando chunk {i+1}/{len(text_chunks)} con Claude CLI...")
        bullet_prompt = f"""Analiza el siguiente texto y extrae entre 5 y 8 bullet points clave en español.
REGLAS ESTRICTAS:
1. Devuelve ÚNICAMENTE bullet points, cada línea iniciando con "- "
2. Cada bullet debe ser conciso (12-20 palabras) y resaltar datos, fechas, conceptos o conclusiones importantes
3. NO agregues introducciones, conclusiones ni texto explicativo
4. Concéntrate en los puntos más importantes del texto
5. Incluye fechas, datos específicos y nombres relevantes si los hay
Texto (parte {i+1} de {len(text_chunks)}):
{chunk}"""
        try:
            chunk_bullets = run_claude_cli(bullet_prompt, timeout=300)
            logging.info(f"✅ Claude CLI responded successfully for chunk {i+1}")
        except subprocess.TimeoutExpired:
            # Fallback bullets keep the pipeline moving on timeout
            logging.warning(f"⚠️ Claude CLI timeout for chunk {i+1}, usando fallback")
            chunk_bullets = f"- Punto principal de la sección {i+1}\n- Concepto secundario importante\n- Información relevante extraída\n- Datos significativos del texto\n- Conclusiones clave"
        except Exception as e:
            logging.warning(f"⚠️ Claude CLI error for chunk {i+1}: {e}")
            chunk_bullets = f"- Punto principal de la sección {i+1}\n- Concepto secundario importante\n- Información relevante extraída\n- Datos significativos del texto\n- Conclusiones clave"
        # Collect the bullets produced for this chunk
        for line in chunk_bullets.split('\n'):
            line = line.strip()
            if line.startswith('-') or line.startswith('•'):
                bullet = '- ' + line.lstrip('-• ').strip()
                if len(bullet) > 10:  # Ignore very short bullets
                    all_bullets.append(bullet)
        bullet_count = len([b for b in chunk_bullets.split('\n') if b.strip()])
        logging.info(f"✅ Chunk {i+1} procesado: {bullet_count} bullets")
    # Cap total bullets and drop duplicates (case-insensitive)
    unique_bullets = []
    seen = set()
    for bullet in all_bullets[:15]:  # At most 15 bullets
        bullet_clean = bullet.lower().strip()
        if bullet_clean not in seen and len(bullet_clean) > 15:
            unique_bullets.append(bullet)
            seen.add(bullet_clean)
    claude_bullets = "\n".join(unique_bullets)
    logging.info(f"✅ Total de {len(unique_bullets)} bullets únicos generados con Claude CLI")
    logging.info("🔸 Claude CLI generando resumen integrado...")
    # For the summary, use a condensed version of the text when it is very long
    if len(text) > 6000:
        summary_text = text[:6000] + "\n\n[El documento continúa con contenido adicional...]"
        logging.info("📝 Usando versión condensada del texto para el resumen")
    else:
        summary_text = text
    summary_prompt = f"""Eres un profesor universitario experto en historia del siglo XX. Redacta un resumen académico integrado en español usando el texto y los bullet points extraídos.
REQUISITOS ESTRICTOS:
- Extensión entre 500-700 palabras
- Usa encabezados Markdown con jerarquía clara (##, ###)
- Desarrolla los puntos clave con profundidad y contexto histórico
- Mantén un tono académico y analítico
- Incluye conclusiones significativas
- NO agregues texto fuera del resumen
- Devuelve únicamente el resumen en formato Markdown
Bullet points extraídos:
{claude_bullets}
Texto original (resumido si es muy extenso):
{summary_text}
Responde únicamente con el resumen en Markdown."""
    try:
        summary_output = run_claude_cli(summary_prompt, timeout=300)
        logging.info("✅ Resumen integrado generado por Claude CLI")
    except subprocess.TimeoutExpired:
        logging.warning("⚠️ Claude CLI timeout for summary, usando fallback")
        summary_output = f"""# Resumen del Documento
## Puntos Principales
- El documento ha sido procesado exitosamente
- Se extrajo el contenido textual del PDF original
- El material está disponible en formato editable
## Información Relevante
El texto procesado contiene información académica sobre el período histórico analizado.
## Conclusiones
El documento está disponible en formato DOCX para su posterior edición y análisis."""
        logging.info("✅ Resumen fallback generado")
    except Exception as e:
        logging.warning(f"⚠️ Claude CLI error for summary: {e}")
        summary_output = f"""# Resumen del Documento
## Puntos Principales
- El documento ha sido procesado exitosamente
- Se extrajo el contenido textual del PDF original
- El material está disponible en formato editable
## Información Relevante
El texto procesado contiene información académica sobre el período histórico analizado.
## Conclusiones
El documento está disponible en formato DOCX para su posterior edición y análisis."""
    logging.info("🔶 Claude CLI aplicando formato final...")
    format_prompt = f"""Revisa y mejora el siguiente resumen en Markdown para que sea perfectamente legible:
{summary_output}
Instrucciones:
- Corrige cualquier error de formato
- Asegúrate de que los encabezados estén bien espaciados
- Verifica que las viñetas usen "- " correctamente
- Mantén exactamente el contenido existente
- Devuelve únicamente el resumen formateado sin texto adicional"""
    try:
        formatted_output = run_claude_cli(format_prompt, timeout=180)
        logging.info("✅ Formato final aplicado por Claude CLI")
    except Exception as e:
        # Formatting is cosmetic; fall back to the unformatted summary.
        logging.warning(f"⚠️ Claude CLI formatting error: {e}")
        formatted_output = summary_output
    return True, claude_bullets, summary_output, formatted_output
# Mantener la función original para compatibilidad
def run_gemini_summary_pipeline(text):
    """Compatibility wrapper: the pipeline now runs on GLM-4.6 via Claude CLI."""
    pipeline_result = run_claude_summary_pipeline(text)
    return pipeline_result
def generate_unified_summary(local_txt_path, base_name):
    """Run the TXT → MD → DOCX → PDF summary flow with GLM-4.6.

    Returns ``(success, summary_content, output_files)`` where
    *output_files* maps docx/markdown/pdf paths and names (the pdf entries
    are None when PDF rendering failed).
    """
    with open(local_txt_path, "r", encoding="utf-8") as f:
        text = f.read()
    logging.info("🤖 Iniciando síntesis colaborativa con GLM-4.6 (z.ai)...")
    send_telegram_message("Iniciando resumen colaborativo con GLM-4.6 (z.ai)")
    success, bullet_points, raw_summary, formatted_summary = run_gemini_summary_pipeline(text)
    if not success:
        return False, None, {}
    # Prefer the formatted output; fall back to bullets + raw summary,
    # then to the raw source text, then to a placeholder.
    summary_content = (formatted_summary or "").strip()
    if not summary_content:
        summary_content = "\n\n".join(filter(None, [bullet_points, raw_summary])).strip()
    if not summary_content:
        summary_content = text.strip()
    summary_content = summary_content or "Resumen no disponible"
    # Topic-aware filename, de-duplicated against the download directory.
    intelligent_filename = generate_intelligent_filename(base_name, summary_content)
    intelligent_filename = ensure_unique_local_filename(Path(LOCAL_DOWNLOADS_PATH), intelligent_filename)
    docx_path = Path(LOCAL_DOWNLOADS_PATH) / intelligent_filename
    markdown_filename = Path(intelligent_filename).with_suffix('.md').name
    markdown_path = docx_path.with_suffix('.md')
    with open(markdown_path, 'w', encoding='utf-8') as markdown_file:
        markdown_file.write(summary_content)
    logging.info(f"📝 Guardando resumen Markdown en {markdown_path}")
    markdown_to_docx(summary_content, docx_path, quiz_source=summary_content)
    logging.info(f"✅ Documento DOCX generado: {docx_path}")
    pdf_path = docx_path.with_suffix('.pdf')
    pdf_created = True
    try:
        markdown_to_pdf(summary_content, pdf_path, title=docx_path.stem)
        logging.info(f"✅ PDF generado: {pdf_path}")
    except Exception as pdf_error:
        # PDF rendering is best-effort; DOCX/Markdown outputs still succeed.
        pdf_created = False
        logging.error(f"❌ Error generando PDF: {pdf_error}")
    send_telegram_message(f"✅ Resumen colaborativo GLM-4.6 completado: {intelligent_filename}")
    output_files = {
        'docx_path': str(docx_path),
        'docx_name': intelligent_filename,
        'markdown_path': str(markdown_path),
        'markdown_name': markdown_filename,
        'pdf_path': str(pdf_path) if pdf_created else None,
        'pdf_name': pdf_path.name if pdf_created else None,
    }
    return True, summary_content, output_files
def generate_summaries_from_text(local_txt_path, base_name):
    """Generate the unified collaborative summary for a transcript file."""
    unified = generate_unified_summary(local_txt_path, base_name)
    return unified
# --- PDF PROCESSING FUNCTIONS ---
def preprocess_image(img):
    """Return a denoised, contrast-boosted binary version of *img* for OCR.

    On any processing failure the image is returned unmodified (as a numpy
    array) after logging the error.
    """
    try:
        as_array = np.array(img)
        gray = cv2.cvtColor(as_array, cv2.COLOR_RGB2GRAY)
        # Adaptive threshold copes with uneven page illumination.
        binary = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 10
        )
        equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        boosted = equalizer.apply(binary)
        return cv2.fastNlMeansDenoising(boosted, None, 30, 7, 21)
    except Exception as e:
        logging.error(f"Error en preprocesamiento de imagen: {e}")
        return np.array(img)
def normalize_pdf_extracted_text(text):
    """Clean text extracted from a PDF while keeping useful line breaks.

    Drops Unicode control/format characters (except newline, carriage
    return and tab), unifies line endings to '\\n', collapses runs of
    spaces/tabs and limits consecutive blank lines to one, then strips.
    """
    if not text:
        return ''
    keep = {'\n', '\r', '\t'}
    visible = ''.join(
        ch for ch in text
        if ch in keep or unicodedata.category(ch)[0] != 'C'
    )
    normalized = visible.replace('\r\n', '\n').replace('\r', '\n')
    normalized = re.sub(r'[ \t]+', ' ', normalized)
    normalized = re.sub(r'\n{3,}', '\n\n', normalized)
    return normalized.strip()
def extract_pdf_text_if_text_based(reader, filename):
    """Detect whether a PDF carries embedded text and return it per page.

    Returns ``(page_texts, ratio, avg_chars)`` when the share of pages with
    text and the average characters per text page clear the configured
    thresholds, otherwise ``(None, ratio, avg_chars)`` (OCR required).
    """
    total_pages = len(reader.pages)
    if total_pages == 0:
        return None, 0.0, 0.0
    page_texts = []
    pages_with_text = 0
    char_total = 0
    for page_number, page in enumerate(reader.pages, start=1):
        try:
            extracted = page.extract_text() or ''
        except Exception as exc:
            # A single unreadable page degrades to empty text, not failure.
            logging.debug(
                "Error extrayendo texto de la página %s de %s: %s",
                page_number,
                filename,
                exc,
            )
            extracted = ''
        clean = normalize_pdf_extracted_text(extracted)
        if clean:
            pages_with_text += 1
            char_total += len(clean)
        page_texts.append(clean)
    ratio = pages_with_text / total_pages if total_pages else 0.0
    avg_chars = (char_total / pages_with_text) if pages_with_text else 0.0
    if pages_with_text and ratio >= PDF_TEXT_DETECTION_MIN_RATIO and avg_chars >= PDF_TEXT_DETECTION_MIN_AVG_CHARS:
        logging.info(
            "📑 PDF '%s' detectado como basado en texto (ratio=%.0f%%, avg_chars=%.0f).",
            filename,
            ratio * 100,
            avg_chars,
        )
        return page_texts, ratio, avg_chars
    logging.debug(
        "PDF '%s' requiere OCR (ratio=%.0f%%, avg_chars=%.0f).",
        filename,
        ratio * 100,
        avg_chars,
    )
    return None, ratio, avg_chars
def process_pdf_file(input_pdf_path, output_docx_path):
    """Main workflow for processing a single PDF file.

    Decides between direct text extraction (text-based PDFs) and a triple OCR
    pass (EasyOCR + Tesseract + TrOCR) for scanned PDFs, correcting OCR output
    with GLM-4.6, and saves the final content as a DOCX at ``output_docx_path``.

    Args:
        input_pdf_path: Local path of the PDF to process.
        output_docx_path: Local path where the editable DOCX is written.

    Raises:
        FileNotFoundError: if ``input_pdf_path`` does not exist.
        ValueError: if no text could be extracted at all.
    """
    pdf_filename = os.path.basename(input_pdf_path)
    send_telegram_message(f"⚙️ Iniciando procesamiento de PDF: {pdf_filename}")
    # Working directory for page-range chunks; always cleaned up in the finally block.
    temp_dir = f"temp_pdf_chunks_{pdf_filename}"
    if not os.path.isfile(input_pdf_path):
        logging.error(f"Input file not found: {input_pdf_path}")
        raise FileNotFoundError(f"Input file not found: {input_pdf_path}")
    try:
        logging.info(f"Processing: {pdf_filename}")
        reader = PdfReader(input_pdf_path)
        num_pages = len(reader.pages)
        # None means "scanned, needs OCR"; otherwise a list with one string per page.
        direct_text_pages, text_ratio, avg_chars = extract_pdf_text_if_text_based(reader, pdf_filename)
        all_corrected_texts = []
        if direct_text_pages is not None:
            logging.info(
                "Usando extracción directa de texto para '%s' (ratio=%.0f%%, avg_chars=%.0f).",
                pdf_filename,
                text_ratio * 100,
                avg_chars,
            )
            send_telegram_message(f"📑 Texto incrustado detectado, evitando OCR para: {pdf_filename}")
            # Join pages with the page-break token so DOCX page breaks can be restored later.
            raw_text_content = f"\n\n{_PAGE_BREAK_TOKEN}\n\n".join(direct_text_pages)
            if raw_text_content.strip():
                # For text-based PDFs, do NOT run GLM correction - use the direct text.
                all_corrected_texts.append(raw_text_content)
        else:
            logging.info(
                "Realizando OCR completo para '%s' (ratio=%.0f%%, avg_chars=%.0f).",
                pdf_filename,
                text_ratio * 100,
                avg_chars,
            )
            # For OCR, split the PDF into page-range chunks only when necessary.
            pdf_chunks = []
            if num_pages > MAX_PAGES_PER_CHUNK:
                logging.info(f"PDF requires OCR and has {num_pages} pages. Splitting into chunks of {MAX_PAGES_PER_CHUNK}.")
                os.makedirs(temp_dir, exist_ok=True)
                for i in range(0, num_pages, MAX_PAGES_PER_CHUNK):
                    writer = PdfWriter()
                    chunk_end = min(i + MAX_PAGES_PER_CHUNK, num_pages)
                    for j in range(i, chunk_end):
                        writer.add_page(reader.pages[j])
                    chunk_path = os.path.join(temp_dir, f"chunk_{i // MAX_PAGES_PER_CHUNK}.pdf")
                    with open(chunk_path, "wb") as f:
                        writer.write(f)
                    pdf_chunks.append(chunk_path)
                send_telegram_message(f"📄 PDF split into {len(pdf_chunks)} parts for OCR processing.")
            else:
                pdf_chunks.append(input_pdf_path)
            ocr_reader, trocr_models = get_ocr_models()
            for idx, chunk_path in enumerate(pdf_chunks):
                logging.info(f"--- Processing chunk {idx + 1}/{len(pdf_chunks)} ---")
                send_telegram_message(f"🧠 OCR with GPU processing part {idx + 1}/{len(pdf_chunks)} of {pdf_filename}...")
                # Keep the model-usage timestamp fresh so the idle cleanup timer does not free models mid-run.
                _update_models_usage()
                images = convert_from_path(chunk_path, dpi=PDF_DPI, thread_count=PDF_RENDER_THREAD_COUNT)
                full_text_raw = []
                if not images:
                    logging.warning(f"No se generaron imágenes para el chunk {idx + 1}")
                    continue
                batch_size = max(1, min(PDF_BATCH_SIZE, len(images)))
                logging.info(
                    f"⚙️ Config GPU PDF -> render_threads={PDF_RENDER_THREAD_COUNT}, "
                    f"batch_size={batch_size}, trocr_max_batch={PDF_TROCR_MAX_BATCH}"
                )
                def _tesseract_ocr(img_np):
                    # Spanish-language Tesseract pass over a preprocessed page image.
                    return pytesseract.image_to_string(img_np, lang='spa')
                # Two pools: one for image preprocessing, one for the CPU-bound Tesseract pass.
                with ThreadPoolExecutor(max_workers=PDF_PREPROCESS_THREADS) as preprocess_pool, \
                        ThreadPoolExecutor(max_workers=PDF_TESSERACT_THREADS) as tess_pool:
                    for i in range(0, len(images), batch_size):
                        batch_images = images[i:i + batch_size]
                        _update_models_usage()
                        preprocessed_batch = list(preprocess_pool.map(preprocess_image, batch_images))
                        try:
                            easy_results = ocr_reader.readtext_batched(
                                preprocessed_batch,
                                detail=1,
                                batch_size=len(preprocessed_batch)
                            )
                        except AttributeError:
                            # Older EasyOCR builds lack readtext_batched; fall back to per-image calls.
                            easy_results = [
                                ocr_reader.readtext(img_data, detail=1, batch_size=len(preprocessed_batch))
                                for img_data in preprocessed_batch
                            ]
                        except Exception as e:
                            logging.error(f"Error en EasyOCR batched: {e}, usando fallback secuencial")
                            easy_results = [
                                ocr_reader.readtext(img_data, detail=1, batch_size=len(preprocessed_batch))
                                for img_data in preprocessed_batch
                            ]
                        tess_texts = list(tess_pool.map(_tesseract_ocr, preprocessed_batch))
                        # TrOCR models may have been freed by the VRAM cleanup timer; reload if so.
                        if (not isinstance(trocr_models, dict) or
                                trocr_models.get('processor') is None or
                                trocr_models.get('model') is None):
                            logging.info("♻️ TrOCR models were freed, reloading before OCR batch")
                            _, trocr_models = get_ocr_models()
                        trocr_texts = trocr_ocr_batch(
                            batch_images,
                            trocr_models['processor'],
                            trocr_models['model'],
                            max_batch_size=PDF_TROCR_MAX_BATCH
                        )
                        # Merge the three OCR engines' output per page, skipping empty passes.
                        for img_idx, img_preprocessed in enumerate(preprocessed_batch):
                            easy_text = ''
                            if easy_results and img_idx < len(easy_results):
                                easy_text = ' '.join([line[1] for line in easy_results[img_idx]])
                            text_tess = tess_texts[img_idx] if img_idx < len(tess_texts) else ''
                            text_trocr = trocr_texts[img_idx] if img_idx < len(trocr_texts) else ''
                            combined_parts = [part for part in (easy_text, text_tess, text_trocr) if part]
                            combined_text = '\n'.join(combined_parts)
                            full_text_raw.append(clean_text(combined_text))
                raw_text_content = f"\n\n{_PAGE_BREAK_TOKEN}\n\n".join(full_text_raw)
                if not raw_text_content.strip():
                    logging.warning(f"Chunk {idx + 1} no produjo texto significativo tras OCR")
                    continue
                # OCR output goes through GLM-4.6 correction before being kept.
                corrected_chunk_text = gemini_correct_text(raw_text_content)
                all_corrected_texts.append(corrected_chunk_text)
        final_text = "\n\n".join(text for text in all_corrected_texts if text)
        if not final_text.strip():
            raise ValueError("No se pudo extraer texto del PDF.")
        # For text-based PDFs, skip the GLM formatting step.
        if direct_text_pages is not None:
            formatted_text = final_text  # Use the direct text without extra formatting.
        else:
            # OCR output only: apply GLM formatting (headings/subheadings).
            formatted_text = format_text_with_gemini_for_docx(final_text, pdf_filename)
        doc = Document()
        doc.add_heading(f"Documento Editable: {pdf_filename}", level=1)
        add_markdown_content_to_document(doc, formatted_text)
        doc.save(output_docx_path)
        # Pick the notification message according to the processing path taken.
        if direct_text_pages is not None:
            send_telegram_message(f"✅ PDF with embedded text processed and saved as DOCX: {os.path.basename(output_docx_path)}")
        else:
            send_telegram_message(f"✅ PDF with OCR processed and saved as DOCX: {os.path.basename(output_docx_path)}")
    finally:
        if os.path.exists(temp_dir):
            logging.info(f"Cleaning up temporary directory: {temp_dir}")
            shutil.rmtree(temp_dir)
def trocr_ocr_batch(pil_images, processor, model, max_batch_size=4):
    """Run TrOCR over a list of PIL images with adaptive GPU/CPU handling.

    Reloads the models via ``get_ocr_models()`` when they were freed, derives the
    target device/dtype from the model's own parameters, halves the batch size on
    GPU OOM, and returns one decoded string per input image ("" on failure).

    Args:
        pil_images: List of PIL images to OCR (may be empty).
        processor: TrOCR processor, or None to force a reload.
        model: TrOCR model, or None to force a reload.
        max_batch_size: Upper bound on the GPU batch size.

    Raises:
        RuntimeError: if no usable model parameters exist after reload attempts.
    """
    if not pil_images:
        return []
    _update_models_usage()
    def _refresh_trocr(proc, mdl, reason):
        # Re-fetch the cached TrOCR bundle; returns (None, None) if unavailable.
        logging.info(f"♻️ TrOCR reload triggered ({reason})")
        _, trocr_bundle = get_ocr_models()
        if not isinstance(trocr_bundle, dict):
            return None, None
        return trocr_bundle.get('processor'), trocr_bundle.get('model')
    refresh_reason = None
    if processor is None or model is None:
        refresh_reason = "models missing"
    if refresh_reason:
        processor, model = _refresh_trocr(processor, model, refresh_reason)
    # Probe a parameter to learn which device/dtype the model lives on;
    # retry the reload up to twice if the model is unusable.
    sample_param = None
    attempts = 0
    while attempts < 2:
        if model is None:
            processor, model = _refresh_trocr(processor, model, "model None on attempt")
            if model is None:
                attempts += 1
                continue
        try:
            sample_param = next(model.parameters())
            break
        except (AttributeError, StopIteration):
            logging.warning("TrOCR model parameters unavailable, forcing reload")
            processor, model = _refresh_trocr(processor, model, "no parameters")
            attempts += 1
    if sample_param is None:
        raise RuntimeError("TrOCR model parameters unavailable after reload attempts")
    device = sample_param.device
    is_gpu = device.type == 'cuda'
    dtype = sample_param.dtype
    results = []
    # Smaller batches on CPU to keep latency and memory pressure manageable.
    if is_gpu:
        batch_size = max(1, min(max_batch_size, len(pil_images)))
    else:
        batch_size = max(1, min(2, len(pil_images)))  # smaller batch size for CPU
    start_idx = 0
    while start_idx < len(pil_images):
        end_idx = min(start_idx + batch_size, len(pil_images))
        current_batch = pil_images[start_idx:end_idx]
        try:
            with torch.inference_mode():
                pixel_values = processor(images=current_batch, return_tensors="pt").pixel_values
                pixel_values = pixel_values.to(device)
                # Match the model's dtype (e.g. FP16 on capable GPUs).
                if pixel_values.dtype != dtype:
                    pixel_values = pixel_values.to(dtype=dtype)
                generated_ids = model.generate(pixel_values, max_length=512)
                decoded = processor.batch_decode(generated_ids, skip_special_tokens=True)
                results.extend(decoded)
                start_idx = end_idx
        except RuntimeError as e:
            if "out of memory" in str(e).lower() and is_gpu and batch_size > 1:
                # GPU OOM: free cached VRAM, halve the batch size and retry the same slice.
                logging.warning(f"⚠️ TrOCR OOM con batch_size={batch_size}, reduciendo a {batch_size // 2}")
                torch.cuda.empty_cache()
                batch_size = max(1, batch_size // 2)
                continue
            else:
                # Non-recoverable: emit empty strings for this slice and move on.
                logging.error(f"❌ Error en TrOCR batch: {e}")
                results.extend([""] * len(current_batch))
                start_idx = end_idx
        except Exception as e:
            logging.error(f"Error inesperado en TrOCR batch: {e}")
            results.extend([""] * len(current_batch))
            start_idx = end_idx
        # Brief pause on CPU so the host is not saturated between batches.
        if not is_gpu and start_idx < len(pil_images):
            time.sleep(0.1)
    return results
def clean_text(text):
    """Clean and normalize extracted OCR text.

    Drops every Unicode control character (including newlines), collapses all
    remaining whitespace runs to single spaces, and applies NFKC normalization.
    """
    without_controls = ''.join(
        ch for ch in text if not unicodedata.category(ch).startswith('C')
    )
    collapsed = re.sub(r'\s+', ' ', without_controls)
    return unicodedata.normalize('NFKC', collapsed)
# Literal marker inserted between pages; later converted into real DOCX page breaks.
_PAGE_BREAK_TOKEN = "[[PAGE_BREAK]]"
# Matches the legacy "--- Nueva Página ---" separator so old output is normalized too.
_LEGACY_PAGE_BREAK_PATTERN = re.compile(r'-{3,}\s*Nueva Página\s*-{3,}', re.IGNORECASE)
def format_text_with_gemini_for_docx(text, pdf_filename):
    """Ask GLM-4.6 to insert headings/subheadings without altering the content.

    Returns the Markdown-formatted text, or the original ``text`` unchanged when
    the model is unavailable, replies empty, or replies with an error string.
    """
    if not text:
        return text
    if not GEMINI_AVAILABLE:
        logging.debug("GLM-4.6 no disponible para formateo DOCX, se usa texto sin cambios.")
        return text
    # Assemble the prompt from fixed instruction fragments plus the document payload.
    prompt = ''.join([
        "Eres un asistente editorial que trabaja sobre el contenido íntegro de un PDF ya corregido. ",
        "Tu tarea es devolver EXACTAMENTE el mismo texto, sin resumir, omitir ni reescribir frases. ",
        "Solo puedes insertar títulos y subtítulos descriptivos que ayuden a estructurar el documento.\n\n",
        "Instrucciones estrictas:\n",
        "- Usa formato Markdown simple: `# Título principal` y `## Subtítulo`. No utilices niveles adicionales.\n",
        f"- Mantén el marcador literal {_PAGE_BREAK_TOKEN} cuando aparezca; equivale a un salto de página.\n",
        "- Conserva el orden y la redacción original de todos los párrafos.\n",
        "- No agregues listas, viñetas, comentarios ni explicaciones extra.\n",
        "- Responde únicamente con el contenido formateado. Nada de prefacios ni notas.\n\n",
        f"Nombre del archivo: {pdf_filename}\n\n",
        "Contenido:\n",
        "<<<INICIO>>>\n",
        f"{text}\n",
        "<<<FIN>>>",
    ])
    response = run_gemini(prompt, use_flash=True)
    cleaned = response.strip() if response else ''
    if not cleaned:
        logging.error("GLM-4.6 devolvió una respuesta vacía para el formato DOCX")
        return text
    if response.lower().startswith("error"):
        logging.error(f"GLM-4.6 no pudo formatear el documento: {response}")
        return text
    return cleaned
def add_markdown_content_to_document(doc, content):
    """Translate GLM-4.6 Markdown output into DOCX paragraphs and headings.

    Recognizes `# `/`## ` headings (levels 2 and 3), the page-break token, and
    joins consecutive non-empty lines into single paragraphs.
    """
    if not content:
        return
    # Isolate page-break markers on their own lines, including legacy separators.
    text = _LEGACY_PAGE_BREAK_PATTERN.sub(
        f"\n{_PAGE_BREAK_TOKEN}\n",
        content.replace(_PAGE_BREAK_TOKEN, f"\n{_PAGE_BREAK_TOKEN}\n"),
    )
    pending = []
    def emit_paragraph():
        # Flush buffered lines as one space-joined paragraph (if any text remains).
        if not pending:
            return
        joined = ' '.join(part.strip() for part in pending if part.strip())
        if joined:
            doc.add_paragraph(joined)
        pending.clear()
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            emit_paragraph()
        elif line == _PAGE_BREAK_TOKEN:
            emit_paragraph()
            doc.add_page_break()
        elif line.startswith('## '):
            emit_paragraph()
            doc.add_heading(line[3:].strip(), level=3)
        elif line.startswith('# '):
            emit_paragraph()
            doc.add_heading(line[2:].strip(), level=2)
        else:
            pending.append(raw_line)
    emit_paragraph()
def gemini_correct_text(text):
    """Use the GLM-4.6 API to correct and reconstruct OCR-extracted text.

    Returns the corrected text, or the original ``text`` when no backend is
    configured, the reply is empty/error-like, or the call raises.
    """
    backend_available = GEMINI_CLI_PATH or GEMINI_API_KEY or CLAUDE_CLI_PATH
    if not backend_available:
        logging.debug("GLM-4.6 no disponible para corrección, se mantiene el texto original.")
        return text
    prompt = f'''Corrige y reconstruye el siguiente texto extraído por OCR de un documento PDF. El texto puede contener errores, palabras mal escritas, frases incompletas o desordenadas. Tu tarea es devolver únicamente el texto corregido, limpio, coherente y bien estructurado en español. No incluyas explicaciones, preámbulos ni formato adicional. Solo el texto final y legible:
--- INICIO DEL TEXTO ---
{text}
--- FIN DEL TEXTO ---'''
    try:
        response = run_gemini(prompt, use_flash=True)
        if not response or not response.strip():
            return text
        if response.lstrip().lower().startswith("error"):
            return text
        return response
    except Exception as e:
        logging.error(f"Error en la llamada a la API de GLM-4.6: {e}")
        return text
def get_ocr_models():
    """Load and cache the OCR models (EasyOCR + TrOCR) with adaptive GPU/CPU placement.

    Results are cached in the module-level ``_ocr_models`` / ``_trocr_models``
    globals so repeated calls are cheap.  The loader retries up to three times,
    falling back to CPU when free VRAM is below ~1.5 GB or when a CUDA error
    (including OOM) occurs during loading.

    Returns:
        tuple: ``(easyocr.Reader, {'processor': ..., 'model': ...})``.

    Raises:
        RuntimeError: if the models cannot be loaded after all retries.
    """
    global _ocr_models, _trocr_models
    # Refresh the usage timestamp so the idle VRAM cleanup timer does not free models mid-use.
    _update_models_usage()
    max_retries = 3
    use_gpu = torch.cuda.is_available()
    # Check available VRAM before committing to the GPU path.
    if use_gpu:
        try:
            total_memory = torch.cuda.get_device_properties(0).total_memory
            allocated_memory = torch.cuda.memory_allocated(0)
            free_memory = total_memory - allocated_memory
            # Less than 1.5 GB free: force CPU to avoid an OOM during model load.
            if free_memory < 1.5 * 1024**3:
                logging.warning(f"⚠️ Memoria GPU baja: {free_memory / 1024**3:.2f}GB libre, usando CPU")
                use_gpu = False
                send_telegram_message("🔄 Memoria GPU insuficiente, usando CPU para procesamiento PDF")
        except Exception:
            # BUGFIX: was a bare `except:`, which also swallowed SystemExit/KeyboardInterrupt.
            use_gpu = False
    for attempt in range(max_retries):
        try:
            if use_gpu:
                logging.info(f"🚀 Loading OCR models on GPU (attempt {attempt + 1}/{max_retries})...")
            else:
                logging.info(f"💻 Loading OCR models on CPU (attempt {attempt + 1}/{max_retries})...")
            # Clear VRAM before loading when using the GPU.
            if use_gpu:
                torch.cuda.empty_cache()
                import gc
                gc.collect()
                if attempt > 0:
                    # On retries, aggressively free VRAM and give the driver a moment.
                    force_free_vram()
                    time.sleep(2)
            # Load EasyOCR with adaptive GPU/CPU placement (cached across calls).
            if _ocr_models is None:
                _ocr_models = easyocr.Reader(['es'], gpu=use_gpu, verbose=False)
                logging.info(f"✅ EasyOCR loaded on {'GPU' if use_gpu else 'CPU'}")
            # Load TrOCR with improved memory handling (cached across calls).
            if _trocr_models is None:
                processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-handwritten")
                model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-handwritten")
                if use_gpu:
                    try:
                        device = "cuda"
                        model = model.to(device)
                        model.eval()
                        # Enable FP16 to halve memory usage on capable GPUs (compute capability >= 7).
                        try:
                            major, _ = torch.cuda.get_device_capability(0)
                            if major >= 7:
                                model = model.half()
                                logging.info("⚡ TrOCR en FP16 habilitado")
                        except Exception as capability_error:
                            logging.warning(f"No se pudo habilitar FP16 en TrOCR: {capability_error}")
                        logging.info("✅ TrOCR model loaded on GPU")
                    except RuntimeError as e:
                        if "out of memory" in str(e).lower() and attempt < max_retries - 1:
                            logging.warning("⚠️ TrOCR OOM en GPU, reintentando con CPU...")
                            use_gpu = False
                            continue
                        else:
                            raise
                else:
                    # Use CPU directly.
                    device = "cpu"
                    model = model.to(device)
                    model.eval()
                    logging.info("✅ TrOCR model loaded on CPU")
                _trocr_models = {
                    'processor': processor,
                    'model': model
                }
            # Update usage timestamp after loading models.
            _update_models_usage()
            return _ocr_models, _trocr_models
        except RuntimeError as e:
            if "CUDA-capable device" in str(e) or "out of memory" in str(e).lower():
                if use_gpu and attempt < max_retries - 1:
                    logging.error(f"❌ CUDA error en intento {attempt + 1}: {e}")
                    logging.info(f"🔄 Reintentando con CPU...")
                    use_gpu = False  # Force CPU on the next attempt.
                    continue
                else:
                    error_msg = f"❌ ERROR después de {max_retries} intentos: {e}"
                    logging.error(error_msg)
                    if attempt == max_retries - 1:
                        send_telegram_message(error_msg)
                    raise RuntimeError(error_msg)
            else:
                logging.error(f"❌ Error inesperado: {e}")
                raise
    # If we reach this point, every attempt failed.
    error_msg = "❌ ERROR: No se pudieron cargar los modelos OCR"
    logging.error(error_msg)
    raise RuntimeError(error_msg)
# --- DOCUMENT CONVERSION FUNCTIONS ---
def docx_to_text(docx_path):
    """Convert DOCX to plain text"""
    document = Document(docx_path)
    # One line per non-empty paragraph, joined with newlines.
    non_empty = [p.text for p in document.paragraphs if p.text.strip()]
    return '\n'.join(non_empty)
def docx_to_markdown(docx_path):
    """Convert DOCX to Markdown format"""
    document = Document(docx_path)
    lines = []
    for paragraph in document.paragraphs:
        content = paragraph.text.strip()
        style_name = paragraph.style.name
        if style_name.startswith('Heading'):
            # "Heading N" styles map to N-level Markdown headings.
            depth = int(style_name.replace('Heading ', ''))
            lines.append('#' * depth + ' ' + content)
        elif style_name == 'List Bullet':
            lines.append(f"- {content}")
        else:
            lines.append(content)
    return '\n'.join(lines)
def summarize_text_with_gemini(text):
    """Summarize text using the GLM-4.6 pipeline (compatibility wrapper).

    Raises RuntimeError when the pipeline fails or yields an empty summary.
    """
    ok, _bullets, _raw, formatted = run_gemini_summary_pipeline(text)
    if ok and formatted:
        return formatted
    raise RuntimeError("GLM-4.6 no pudo generar el resumen solicitado")
# --- MAIN PROCESSING FUNCTIONS ---
def process_audio_file(file_path):
    """Process a single audio file: download, transcribe, summarize, classify and upload.

    Args:
        file_path: Remote (WebDAV) path of the audio file.
    """
    filename = os.path.basename(file_path)
    # NOTE(review): `filename` is computed but never interpolated below; the
    # user-facing messages literally contain "(unknown)". Presumably they were
    # meant to embed {filename} — confirm before changing the message strings.
    send_telegram_message(
        f"🎵 Nuevo audio detectado: (unknown)\n"
        f"🤖 Flujo activado:\n"
        f"• GLM-4.6: puntos clave + resumen integrado\n"
        f"• GLM-4.6: formato final"
    )
    base_name = os.path.splitext(filename)[0]
    local_audio_path = os.path.join(LOCAL_DOWNLOADS_PATH, filename)
    local_txt_path = os.path.join(LOCAL_DOWNLOADS_PATH, f"{base_name}.txt")
    try:
        send_telegram_message(f"⬇️ Descargando audio: (unknown)")
        webdav_download(file_path, local_audio_path)
        send_telegram_message(f"📝 Iniciando transcripción de audio: (unknown)")
        transcribe_audio(local_audio_path, local_txt_path)
        # Generate unified summary (DOCX/Markdown/PDF artifacts + summary text).
        result = generate_unified_summary(local_txt_path, base_name)
        if result and result[0]:
            success, summary_content, output_files = result
            docx_path = Path(output_files.get('docx_path', '')) if output_files else None
            markdown_path = Path(output_files.get('markdown_path', '')) if output_files else None
            pdf_path_str = output_files.get('pdf_path') if output_files else None
            pdf_path = Path(pdf_path_str) if pdf_path_str else None
            docx_filename = output_files.get('docx_name') if output_files else None
            if docx_path and docx_path.exists() and docx_filename:
                # Classify the summary and upload artifacts into the matching thematic folder.
                ensure_thematic_folders_exist()
                logging.info("🧠 Clasificando contenido inteligentemente...")
                category = classify_content_intelligent(summary_content)
                category_name = TEMATIC_FOLDERS.get(category, TEMATIC_FOLDERS["otras_clases"])
                remote_docx_path = get_upload_path_for_category(category, docx_filename)
                webdav_upload(str(docx_path), remote_docx_path)
                logging.info(f"☁️ DOCX subido a {category_name}: {docx_filename}")
                if pdf_path and pdf_path.exists():
                    pdf_name = pdf_path.name
                    remote_pdf_path = get_upload_path_for_category(category, pdf_name)
                    webdav_upload(str(pdf_path), remote_pdf_path)
                    logging.info(f"☁️ PDF subido a {category_name}: {pdf_name}")
                # Markdown upload is best-effort; a failure here is logged but non-fatal.
                try:
                    if markdown_path and markdown_path.exists():
                        remote_md_path = os.path.join('Notes', markdown_path.name)
                        webdav_upload(str(markdown_path), remote_md_path)
                        logging.info(f"Markdown subido a Notes: {markdown_path.name}")
                except Exception as e:
                    logging.error(f"Error subiendo Markdown para {docx_filename}: {e}")
                topics = extract_key_topics_from_text(summary_content)
                topics_str = ' - '.join(topics[:2])
                send_telegram_message(
                    f"☁️ ✅ Resumen GLM-4.6 clasificado y subido a '{category_name}'\n"
                    f"📄 {docx_filename}\n"
                    f"🧾 Recursos generados: DOCX, PDF y Markdown\n"
                    f"🧠 Temas: {topics_str}"
                )
            # Record the file so the polling loop does not process it again.
            save_processed_file(file_path)
        else:
            raise Exception("Failed to generate summaries")
    except Exception as e:
        logging.error(f"Error processing audio (unknown): {e}")
        send_telegram_message(f"❌ Error processing audio (unknown): {e}")
def process_txt_file(file_path):
    """Process a single text file: download, summarize and upload the artifacts.

    Args:
        file_path: Remote (WebDAV) path of the text file.
    """
    filename = os.path.basename(file_path)
    # NOTE(review): like process_audio_file, `filename` is never interpolated —
    # the messages literally say "(unknown)"; confirm intended content.
    send_telegram_message(
        f"📄 Nuevo texto detectado: (unknown)\n"
        f"🤖 Flujo activado:\n"
        f"• GLM-4.6: puntos clave + resumen integrado\n"
        f"• GLM-4.6: formato final"
    )
    base_name = os.path.splitext(filename)[0]
    local_txt_path = os.path.join(LOCAL_DOWNLOADS_PATH, filename)
    try:
        send_telegram_message(f"⬇️ Descargando texto: (unknown)")
        webdav_download(file_path, local_txt_path)
        # Generate unified summary (DOCX/Markdown/PDF artifacts + summary text).
        result = generate_unified_summary(local_txt_path, base_name)
        if result and result[0]:
            success, summary_content, output_files = result
            docx_path = Path(output_files.get('docx_path', '')) if output_files else None
            markdown_path = Path(output_files.get('markdown_path', '')) if output_files else None
            pdf_path_str = output_files.get('pdf_path') if output_files else None
            pdf_path = Path(pdf_path_str) if pdf_path_str else None
            docx_filename = output_files.get('docx_name') if output_files else None
            # Upload to Nextcloud with the intelligently generated name.
            if docx_path and docx_path.exists():
                remote_docx_path = f"{REMOTE_DOCX_AUDIO_FOLDER}/{docx_filename}"
                webdav_upload(str(docx_path), remote_docx_path)
                send_telegram_message(f"✅ Resumen DOCX subido: {docx_filename}")
            if pdf_path and pdf_path.exists():
                # Derive the PDF name from the DOCX name when available.
                remote_pdf_filename = docx_filename.replace('.docx', '.pdf') if docx_filename else f"{base_name}.pdf"
                remote_pdf_path = f"{RESUMENES_FOLDER}/{remote_pdf_filename}"
                webdav_upload(str(pdf_path), remote_pdf_path)
            if markdown_path and markdown_path.exists():
                remote_md_filename = docx_filename.replace('.docx', '.md') if docx_filename else f"{base_name}.md"
                remote_md_path = f"{RESUMENES_FOLDER}/{remote_md_filename}"
                webdav_upload(str(markdown_path), remote_md_path)
            send_telegram_message(
                f"✅ Resumen completado: (unknown)\n"
                f"📄 DOCX: {REMOTE_DOCX_AUDIO_FOLDER}/{docx_filename if docx_filename else base_name}"
            )
            # Record the file so the polling loop does not process it again.
            save_processed_file(file_path)
        else:
            raise Exception("Failed to generate summaries")
    except Exception as e:
        logging.error(f"Error processing text (unknown): {e}")
        send_telegram_message(f"❌ Error processing text (unknown): {e}")
def check_pdf_already_processed(file_path, filename, base_name):
    """Smart check to avoid reprocessing a PDF.

    Returns True when the editable DOCX already exists locally or on Nextcloud,
    or when the file is already in the processed-files registry.

    Args:
        file_path: Remote (WebDAV) path of the source PDF.
        filename: Base filename of the PDF.
        base_name: Filename without extension (used to derive the DOCX name).
    """
    # 1. Does the editable DOCX already exist locally?
    local_docx_path = os.path.join(LOCAL_DOWNLOADS_PATH, f"{base_name}_editable.docx")
    if os.path.exists(local_docx_path):
        logging.info(f"📋 DOCX editable ya existe localmente: {base_name}_editable.docx")
        return True
    # 2. Does the editable DOCX already exist on Nextcloud?  (Depth-0 PROPFIND probe.)
    try:
        remote_docx_path = os.path.join(os.path.dirname(file_path), f"{base_name}_editable.docx")
        response = requests.request(
            "PROPFIND",
            f"{WEBDAV_ENDPOINT}/{remote_docx_path}",
            auth=HTTPBasicAuth(NEXTCLOUD_USER, NEXTCLOUD_PASS),
            headers={"Depth": "0"},
            timeout=5
        )
        if response.status_code == 207:  # Multi-Status means the resource exists
            logging.info(f"☁️ DOCX editable ya existe en Nextcloud: {remote_docx_path}")
            return True
    except Exception as e:
        # Network probe is best-effort; fall through to the local registry.
        logging.debug(f"No se pudo verificar existencia en Nextcloud: {e}")
    # 3. Fallback: is the file already in the processed-files registry?
    processed_files = load_processed_files()
    normalized_path = normalize_remote_path(file_path)
    base_name_check = os.path.basename(normalized_path)
    if (normalized_path in processed_files or
            base_name_check in processed_files or
            filename in processed_files):
        logging.info(f"📋 PDF ya está en registro de procesados: (unknown)")
        return True
    return False
def process_pdf_main(file_path):
    """Process a single PDF file - main handler.

    Downloads the PDF, converts it to an editable DOCX via ``process_pdf_file``,
    uploads the DOCX next to the source file, marks the PDF as processed, and
    then (best-effort) generates and uploads a full GLM-4.6 summary.

    Args:
        file_path: Remote (WebDAV) path of the PDF.
    """
    filename = os.path.basename(file_path)
    base_name = os.path.splitext(filename)[0]
    local_pdf_path = os.path.join(LOCAL_DOWNLOADS_PATH, filename)
    local_docx_output_path = os.path.join(LOCAL_DOWNLOADS_PATH, f"{base_name}_editable.docx")
    remote_docx_filename = f"{base_name}_editable.docx"
    # Smart pre-check before doing any work (avoids redundant downloads/OCR).
    if check_pdf_already_processed(file_path, filename, base_name):
        logging.info(f"⏭️ PDF ya procesado, omitiendo: (unknown)")
        return
    send_telegram_message(f"📄 Nuevo PDF detectado para procesar: (unknown)")
    try:
        logging.info(f"Downloading PDF: (unknown)")
        webdav_download(file_path, local_pdf_path)
        logging.info(f"Starting OCR and correction processing for: (unknown)")
        process_pdf_file(local_pdf_path, local_docx_output_path)
        # Upload the generated editable DOCX file
        if os.path.exists(local_docx_output_path):
            remote_docx_path = os.path.join(os.path.dirname(file_path), remote_docx_filename)
            logging.info(f"Uploading editable document to Nextcloud: {remote_docx_filename}")
            webdav_upload(local_docx_output_path, remote_docx_path)
            send_telegram_message(f"📄☁️ Documento editable subido a Nextcloud para: (unknown)")
            # Mark as processed right after uploading the editable DOCX.
            save_processed_file(file_path)
            logging.info(f"✅ Archivo PDF marcado como procesado: (unknown)")
            # Generate the full GLM-4.6 summary for every PDF (failures here do not block processing).
            try:
                send_telegram_message(f"🤖 Generando resumen completo con GLM-4.6 para: (unknown)")
                docx_text = docx_to_text(local_docx_output_path)
                # Use the unified GLM-4.6 summary pipeline.
                success, bullet_points, raw_summary, formatted_summary = run_gemini_summary_pipeline(docx_text)
                if success and formatted_summary:
                    # Build a DOCX document holding the summary.
                    summary_docx_path = os.path.join(LOCAL_DOWNLOADS_PATH, f"{base_name}_resumen_completo.docx")
                    doc = Document()
                    doc.add_heading('Resumen Completo Generado con GLM-4.6', level=1)
                    doc.add_paragraph(f'Documento original: (unknown)')
                    doc.add_paragraph('')
                    # Append the formatted Markdown content as headings/bullets/paragraphs.
                    lines = formatted_summary.split('\n')
                    current_paragraph = []
                    for line in lines:
                        line = line.strip()
                        if not line:
                            if current_paragraph:
                                doc.add_paragraph(' '.join(current_paragraph))
                                current_paragraph = []
                            continue
                        if line.startswith('#'):
                            if current_paragraph:
                                doc.add_paragraph(' '.join(current_paragraph))
                                current_paragraph = []
                            # Render the heading; levels above 6 fall back to plain text.
                            level = len(line) - len(line.lstrip('#'))
                            if level <= 6:
                                doc.add_heading(line.lstrip('#').strip(), level=level)
                            else:
                                current_paragraph.append(line)
                        elif line.startswith('-') or line.startswith(''):
                            # NOTE(review): the second startswith argument renders as an
                            # empty string here (the file header warns of invisible
                            # Unicode); an empty prefix matches every line. It was
                            # probably meant to be '•' (see lstrip('-•') below) — confirm.
                            if current_paragraph:
                                doc.add_paragraph(' '.join(current_paragraph))
                                current_paragraph = []
                            doc.add_paragraph(line.lstrip('-•').strip(), style='List Bullet')
                        else:
                            current_paragraph.append(line)
                    if current_paragraph:
                        doc.add_paragraph(' '.join(current_paragraph))
                    # Generate a quiz when there is enough source material.
                    try:
                        quiz_text = (bullet_points or "") + "\n\n" + (raw_summary or "")
                        if len(quiz_text.strip()) > 100:
                            questions, answers = generate_quiz(quiz_text)
                            if questions and answers:
                                add_quiz_to_docx(doc, questions, answers)
                                logging.info("✅ Quiz agregado al resumen del PDF")
                    except Exception as quiz_error:
                        logging.warning(f"No se pudo generar quiz para el PDF: {quiz_error}")
                    doc.save(summary_docx_path)
                    # Upload the summary DOCX to Nextcloud.
                    remote_summary_path = os.path.join('Resumenes', f"{base_name}_resumen_completo.docx")
                    webdav_mkdir('Resumenes')
                    webdav_upload(summary_docx_path, remote_summary_path)
                    # Also create and upload a Markdown version.
                    md_content = f"# Resumen: (unknown)\n\n{formatted_summary}"
                    md_filename = f"{base_name}_resumen_completo.md"
                    local_md_path = os.path.join(LOCAL_DOWNLOADS_PATH, md_filename)
                    with open(local_md_path, 'w', encoding='utf-8') as f:
                        f.write(md_content)
                    remote_md_path = os.path.join('Notes', md_filename)
                    webdav_upload(local_md_path, remote_md_path)
                    send_telegram_message(f"✅ Resumen completo generado y subido para: (unknown)\n📄 DOCX en Resumenes/\n📝 Markdown en Notes/")
                    logging.info(f"✅ Resumen completo generado y subido para (unknown)")
                else:
                    # Fallback: plain Markdown note when GLM-4.6 fails.
                    simple_summary = f"# Resumen de (unknown)\n\nTexto procesado exitosamente. No se pudo generar resumen detallado."
                    md_filename = f"{base_name}_resumen_simple.md"
                    local_md_path = os.path.join(LOCAL_DOWNLOADS_PATH, md_filename)
                    with open(local_md_path, 'w', encoding='utf-8') as f:
                        f.write(simple_summary)
                    remote_md_path = os.path.join('Notes', md_filename)
                    webdav_upload(local_md_path, remote_md_path)
                    logging.warning(f"⚠️ Resumen simple generado para (unknown)")
            except Exception as e:
                logging.error(f"Error generando resumen para (unknown): {e}")
                # Deliberately not notifying via Telegram to avoid spam.
        else:
            logging.warning(f"Expected output file not found: {local_docx_output_path}")
            # Even without the output file, mark as processed to avoid reprocessing loops.
            save_processed_file(file_path)
            logging.warning(f"⚠️ Archivo marcado como procesado sin DOCX: (unknown)")
    except Exception as e:
        logging.error(f"Error in conversion process for PDF (unknown): {e}")
        # Rate-limit the Telegram error notification per file+message.
        key = f"pdf_process::{file_path}"
        msg = f"❌ Error processing PDF (unknown): {e}"
        if should_send_error(key, str(e)):
            send_telegram_message(msg)
def acquire_lock():
    """Acquire an exclusive file lock to prevent multiple service instances.

    Takes a non-blocking ``flock`` on ``LOCAL_STATE_DIR/.main_service.lock`` and
    writes this process's PID into it.  Exits the process (status 1) when the
    lock is already held by another instance or when acquiring it fails.

    Returns:
        The open lock file object; keep it alive for the process lifetime and
        pass it to ``release_lock`` on shutdown.
    """
    import errno  # local import: only needed for the portable EAGAIN check below
    try:
        lock_file = os.path.join(LOCAL_STATE_DIR, ".main_service.lock")
        os.makedirs(os.path.dirname(lock_file), exist_ok=True)
        lock_fd = open(lock_file, 'w')
        fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        # Record our PID in the lock file for diagnostics.
        lock_fd.write(str(os.getpid()))
        lock_fd.flush()
        logging.info(f"🔒 Bloqueo adquirido. PID: {os.getpid()}")
        return lock_fd
    except (IOError, OSError) as e:
        # BUGFIX: was a hardcoded `e.errno == 11`; EAGAIN is 11 only on Linux
        # (35 on BSD/macOS), and flock may also report EWOULDBLOCK/EACCES.
        if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EACCES):
            logging.error("❌ Ya hay otra instancia del servicio corriendo")
            sys.exit(1)
        else:
            logging.error(f"❌ Error adquiriendo bloqueo: {e}")
            sys.exit(1)
def release_lock(lock_fd) -> None:
    """Release the execution lock if one is currently held.

    A falsy ``lock_fd`` is a no-op; otherwise the flock is dropped (logging a
    warning on failure) and the file object is always closed.
    """
    if not lock_fd:
        return
    unlock_error = None
    try:
        fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
    except Exception as exc:
        unlock_error = exc
    if unlock_error is not None:
        logging.warning(f"No se pudo liberar el bloqueo limpiamente: {unlock_error}")
    # Closing is best-effort; a failure here must never propagate.
    try:
        lock_fd.close()
    except Exception:
        pass
# --- MAIN LOOP ---
def main():
    """Main application loop.

    Acquires the single-instance lock, announces start-up, then polls the
    remote PDF, audio and text folders forever, dispatching each new file to
    its processor.  Runs until KeyboardInterrupt; always releases the lock.
    """
    # Acquire the lock to prevent multiple instances.
    lock_fd = acquire_lock()
    try:
        logging.info("=== INICIO Nextcloud AI Service - Flujo GLM-4.6 (Claude CLI) ===")
        logging.info("🤖 Configuración: GLM-4.6 (Claude CLI via z.ai) para puntos clave y resumen integral")
        logging.info("📝 Modo de operación: Resúmenes colaborativos unificados")
        logging.info("🔒 Servicio protegido contra múltiples instancias")
        # Send the Telegram start-up message (do not block start-up if it fails).
        try:
            send_telegram_message(
                "✨ Nextcloud AI Service Started ✨\n"
                "🚀 Flujo GLM-4.6 activado:\n"
                "• GLM-4.6: Puntos clave y resumen integrado\n"
                "• GLM-4.6: Formato y entrega final"
            )
        except Exception as e:
            logging.warning(f"No se pudo enviar mensaje de Telegram: {e}")
        # Create necessary directories.
        ensure_local_directories()
        # Initialize the usage timestamp and the VRAM monitoring system.
        logging.info("🚀 Iniciando sistema de monitoreo de VRAM...")
        _update_models_usage()  # initialize the timestamp at start-up
        _start_vram_cleanup_timer()
        while True:
            try:
                logging.info("--- Polling for new files ---")
                processed_files = load_processed_files()
                # --- PROCESS PDFs FOR CONVERSION TO EDITABLE ---
                try:
                    webdav_mkdir(REMOTE_PDF_FOLDER)
                    pdf_files = webdav_list(REMOTE_PDF_FOLDER)
                    for file_path in pdf_files:
                        normalized_path = normalize_remote_path(file_path)
                        base_name = os.path.basename(normalized_path)
                        filename = base_name
                        # Skip if not PDF or if it's already an editable PDF
                        if (
                            not normalized_path.lower().endswith('.pdf')
                            or '_editable.docx' in normalized_path.lower()
                        ):
                            continue
                        # Smart pre-check before processing (double safety on top of process_pdf_main's own check).
                        base_name_no_ext = os.path.splitext(filename)[0]
                        if check_pdf_already_processed(normalized_path, filename, base_name_no_ext):
                            logging.info(f"⏭️ PDF ya verificado como procesado, omitiendo: (unknown)")
                            continue
                        process_pdf_main(normalized_path)
                except Exception as e:
                    logging.error(f"Error processing PDF folder for conversion: {e}")
                # --- PROCESS AUDIOS ---
                try:
                    webdav_mkdir(REMOTE_DOCX_AUDIO_FOLDER)
                    audio_files = webdav_list(REMOTE_AUDIOS_FOLDER)
                    for file_path in audio_files:
                        normalized_path = normalize_remote_path(file_path)
                        base_name = os.path.basename(normalized_path)
                        # Skip non-audio extensions and anything already processed.
                        if (
                            not any(normalized_path.lower().endswith(ext) for ext in AUDIO_EXTENSIONS)
                            or normalized_path in processed_files
                            or base_name in processed_files
                        ):
                            continue
                        process_audio_file(normalized_path)
                except Exception as e:
                    logging.error(f"Error processing Audio folder: {e}")
                # --- PROCESS TEXT FILES ---
                try:
                    webdav_mkdir(REMOTE_TXT_FOLDER)
                    txt_files = webdav_list(REMOTE_TXT_FOLDER)
                    for file_path in txt_files:
                        normalized_path = normalize_remote_path(file_path)
                        base_name = os.path.basename(normalized_path)
                        # Skip non-text extensions and anything already processed.
                        if (
                            not any(normalized_path.lower().endswith(ext) for ext in TXT_EXTENSIONS)
                            or normalized_path in processed_files
                            or base_name in processed_files
                        ):
                            continue
                        process_txt_file(normalized_path)
                except Exception as e:
                    logging.error(f"Error processing Text folder: {e}")
            except Exception as cycle_error:
                # A failed cycle is logged (with traceback) and rate-limited to Telegram;
                # the loop itself keeps running.
                logging.exception(f"Error inesperado en el ciclo principal: {cycle_error}")
                if should_send_error("main_loop", str(cycle_error)):
                    send_telegram_message(f"❌ Error en ciclo principal: {cycle_error}")
            logging.info(f"--- Cycle completed. Waiting {POLL_INTERVAL} seconds... ---")
            time.sleep(POLL_INTERVAL)
    except KeyboardInterrupt:
        logging.info("🛑 Interrupción recibida, cerrando servicio")
    finally:
        release_lock(lock_fd)
def start_dashboard():
    """Launch the Flask dashboard on a separate daemon thread.

    Returns:
        The started ``threading.Thread``, or ``None`` when the dashboard
        could not be brought up (the main service keeps running either way).
    """
    try:
        # Imported lazily to avoid circular imports with the dashboard module.
        import threading

        import dashboard

        def _serve():
            """Thread target: run the Flask app until the process exits."""
            logging.info("🚀 Iniciando Dashboard Flask en http://localhost:5000")
            dashboard.app.run(
                host='0.0.0.0',
                port=5000,
                debug=False,
                threaded=True,
                # The reloader would re-exec the process; keep it off in production.
                use_reloader=False,
            )

        thread = threading.Thread(target=_serve, daemon=True)
        thread.start()
        logging.info("✅ Dashboard iniciado en hilo separado")
        logging.info("🌐 Accede al dashboard en: http://localhost:5000")
        return thread
    except Exception as e:
        logging.error(f"❌ Error iniciando dashboard: {e}")
        logging.warning("⚠️ El servicio principal continuará sin dashboard")
        return None
if __name__ == "__main__":
    # CLI dispatch: a positional command selects a one-shot mode; with no
    # arguments the long-running polling service (plus dashboard) starts.
    if len(sys.argv) > 1:
        command = sys.argv[1]
        if command == "whisper" and len(sys.argv) == 4:
            # Whisper transcription mode: <audio-in> <txt-out>
            transcribe_audio(sys.argv[2], sys.argv[3])
        elif command == "pdf" and len(sys.argv) == 4:
            # PDF processing mode: <pdf-in> <docx-out>
            process_pdf_file(sys.argv[2], sys.argv[3])
        elif command == "seed-processed":
            # Mark every file currently on the remote as already processed so
            # the polling loop does not re-process pre-existing content.
            snapshot = _snapshot_existing_remote_files()
            current = load_processed_files()
            entries_to_add = []
            for entry in snapshot:
                normalized = normalize_remote_path(entry)
                base_name = os.path.basename(normalized)
                # Skip entries already registered under either full path or basename.
                if normalized in current or base_name in current:
                    continue
                entries_to_add.append(normalized)
            if entries_to_add:
                with open(PROCESSED_FILES_PATH, "a", encoding="utf-8") as f:
                    for normalized in sorted(entries_to_add):
                        f.write(normalized + "\n")
                print(f"{len(entries_to_add)} entradas añadidas al registro de procesados")
                logging.info(f"Registro de procesados actualizado con {len(entries_to_add)} entradas nuevas")
            else:
                print(" No se encontraron archivos adicionales para marcar como procesados")
            sys.exit(0)
        elif command == "txt2docx" and len(sys.argv) == 4:
            # Text to unified DOCX conversion mode: <txt-in> <docx-out>
            txt_file = sys.argv[2]
            output_docx = sys.argv[3]
            if not os.path.exists(txt_file):
                print(f"❌ Text file not found: {txt_file}")
                sys.exit(1)
            # Extract base name for file generation
            base_name = os.path.splitext(os.path.basename(txt_file))[0]
            print(f"🤖 Iniciando resumen colaborativo para: {txt_file}")
            # Generate unified summary
            result = generate_unified_summary(txt_file, base_name)
            if result and result[0]:
                success, summary_content, output_files = result
                # FIX: Path('') is Path('.') — truthy and existing — so the old
                # Path(output_files.get(..., '')) pattern made a missing key pass
                # the .exists() guard and print "." as an output path. Use the
                # None-sentinel pattern already applied to pdf_path below.
                docx_path_str = output_files.get('docx_path')
                docx_path = Path(docx_path_str) if docx_path_str else None
                markdown_path_str = output_files.get('markdown_path')
                markdown_path = Path(markdown_path_str) if markdown_path_str else None
                pdf_path_str = output_files.get('pdf_path')
                pdf_path = Path(pdf_path_str) if pdf_path_str else None
                if docx_path is None or not docx_path.exists():
                    print("❌ No se generó el DOCX de salida")
                    sys.exit(1)
                # Copy the generated DOCX to the requested destination if different.
                if str(docx_path) != output_docx:
                    shutil.copy2(docx_path, output_docx)
                category = classify_content_intelligent(summary_content)
                category_name = TEMATIC_FOLDERS.get(category, TEMATIC_FOLDERS["otras_clases"])
                topics = extract_key_topics_from_text(summary_content)
                topics_str = ' - '.join(topics[:2])
                print(f"✅ Resumen unificado generado: {output_docx}")
                print(f"🧠 Clasificación automática: {category_name}")
                print(f"🎯 Temas identificados: {topics_str}")
                print(f"📝 Nombre inteligente: {output_files.get('docx_name')}")
                if markdown_path and markdown_path.exists():
                    print(f"📄 Markdown: {markdown_path}")
                if pdf_path and pdf_path.exists():
                    print(f"📄 PDF: {pdf_path}")
            else:
                print("❌ Failed to generate unified summary")
                sys.exit(1)
        elif command == "quiz" and len(sys.argv) == 4:
            # Quiz generation mode: <text-or-file> <docx-out>
            input_text = sys.argv[2]
            output_file = sys.argv[3]
            # If the first argument is a file, read it; otherwise treat it as raw text.
            if os.path.isfile(input_text):
                with open(input_text, 'r', encoding='utf-8') as f:
                    summary_text = f.read()
            else:
                summary_text = input_text
            # Generate quiz
            questions, answers = generate_quiz(summary_text)
            if not questions or not answers:
                print("❌ Could not generate quiz")
                sys.exit(1)
            # Create document
            doc = Document()
            doc.add_heading('Quiz Generado', level=1)
            # Add quiz to document
            add_quiz_to_docx(doc, questions, answers)
            # Save file
            doc.save(output_file)
            print(f"✅ Quiz generated: {output_file}")
        elif command == "dashboard-only":
            # Run only the dashboard (foreground, no polling service).
            import dashboard
            logging.info("🚀 Iniciando Dashboard Flask únicamente")
            dashboard.app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
        else:
            print("Usage:")
            print("  python main.py                        # Run main polling service + dashboard")
            print("  python main.py whisper <audio> <txt>  # Transcribe audio")
            print("  python main.py pdf <pdf> <docx>       # Process PDF to editable DOCX")
            print("  python main.py seed-processed          # Marca archivos actuales como procesados")
            print("  python main.py txt2docx <txt> <docx>  # Convert text to summary DOCX")
            print("  python main.py quiz <text> <docx>     # Generate quiz from text")
            print("  python main.py dashboard-only         # Solo ejecutar dashboard")
        sys.exit(1)
    else:
        # Run main polling service with integrated dashboard
        logging.info("=" * 60)
        logging.info("🚀 INICIANDO SERVICIO COMPLETO")
        logging.info("=" * 60)
        # Iniciar dashboard en hilo separado
        dashboard_thread = start_dashboard()
        # Pequeña pausa para que el dashboard se inicie
        time.sleep(2)
        # Ejecutar servicio principal
        main()