🚀 Mejoras principales: - Dashboard Flask ahora corre en thread daemon independiente - Integración con python-dotenv para variables de entorno - Configuración de puerto vía DASHBOARD_PORT (default: 5000) - Mejor logging con Thread-ID para debugging 📦 Nuevos archivos: - kubectl: binary de Kubernetes para deployments - plus.md: documentación adicional del proyecto - todo.md: roadmap y tareas pendientes 🔧 Cambios técnicos: - run_dashboard_thread(): ejecuta Flask en thread separado - start_dashboard(): crea y arranca daemon thread - Configuración de reloader desactivado en threaded mode Esto permite que el dashboard corra sin bloquear el loop principal de procesamiento, mejorando la arquitectura del servicio.
507 lines
21 KiB
Python
507 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
CBCFacil - Main Service Entry Point
|
|
Unified AI service for document processing (audio, PDF, text)
|
|
"""
|
|
import fcntl
import json
import logging
import os
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, TextIO
|
|
|
|
# Load environment variables from .env file
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
# Configure logging with JSON formatter for production
|
|
# Configure logging with JSON formatter for production
class JSONFormatter(logging.Formatter):
    """JSON formatter for structured logging in production.

    Emits one JSON object per record containing timestamp (UTC, with a
    trailing "Z"), level, message, module, function and line number, plus
    the formatted traceback when exception info is attached.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* as a single-line JSON string."""
        # Timezone-aware UTC instead of datetime.utcnow(), which is
        # deprecated since Python 3.12.  isoformat() on an aware datetime
        # ends in "+00:00"; swap it for the original "Z" suffix so the
        # output format is unchanged.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        log_entry = {
            "timestamp": timestamp,
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Add exception info if present
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)

        return json.dumps(log_entry)
|
|
|
|
|
|
def setup_logging() -> logging.Logger:
    """Configure and return the module logger.

    Attaches a stdout handler (JSON-formatted in production, human-readable
    otherwise) and, when LOG_FILE is set, an additional JSON file handler.
    Safe to call repeatedly: existing handlers are cleared first.
    """
    from config import settings

    log = logging.getLogger(__name__)
    log.setLevel(getattr(logging, settings.LOG_LEVEL.upper()))

    # Drop any handlers left over from a previous call to avoid duplicates
    log.handlers.clear()

    # Console output on stdout; formatter depends on the environment
    stream_handler = logging.StreamHandler(sys.stdout)
    if settings.is_production:
        stream_handler.setFormatter(JSONFormatter())
    else:
        human_format = "%(asctime)s [%(levelname)s] - %(name)s - %(message)s"
        stream_handler.setFormatter(logging.Formatter(human_format))
    log.addHandler(stream_handler)

    # Optional structured log file (always JSON for machine ingestion)
    if settings.LOG_FILE:
        file_handler = logging.FileHandler(settings.LOG_FILE)
        file_handler.setFormatter(JSONFormatter())
        log.addHandler(file_handler)

    return log
|
|
|
|
|
|
logger = setup_logging()
|
|
|
|
|
|
def acquire_lock() -> TextIO:
    """Acquire the single-instance lock for this service.

    Creates (if needed) a lock file under LOCAL_STATE_DIR and takes an
    exclusive, non-blocking flock on it, then records the current PID in
    the file for debugging.

    Returns:
        The open lock file object.  The previous annotation said ``int``,
        but the function has always returned the file object — callers must
        keep it alive for the process lifetime (closing it drops the lock)
        and pass it to release_lock() on shutdown.

    Raises:
        BlockingIOError: if another instance already holds the lock
            (LOCK_NB makes flock fail fast instead of blocking).
    """
    lock_file = Path(os.getenv("LOCAL_STATE_DIR", str(Path(__file__).parent))) / ".main_service.lock"
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    lock_fd = open(lock_file, 'w')
    fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    lock_fd.write(str(os.getpid()))
    lock_fd.flush()
    logger.info(f"Lock acquired. PID: {os.getpid()}")
    return lock_fd
|
|
|
|
|
|
def release_lock(lock_fd) -> None:
    """Unlock and close the lock file; failures are logged, never raised."""
    try:
        fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
        lock_fd.close()
    except Exception as exc:
        # Best effort: a stale lock is cleaned up by the OS on exit anyway
        logger.warning(f"Could not release lock: {exc}")
|
|
|
|
|
|
def validate_configuration() -> None:
    """Validate environment configuration at startup.

    Logs a summary when validation succeeds with warnings; re-raises
    ConfigurationError so startup aborts on fatal misconfiguration.
    """
    from config.validators import validate_environment, ConfigurationError

    try:
        warnings = validate_environment()
    except ConfigurationError as exc:
        logger.error(f"Configuration validation failed: {exc}")
        raise
    else:
        if warnings:
            logger.info(f"Configuration validation completed with {len(warnings)} warnings")
|
|
|
|
|
|
def check_service_health() -> dict:
    """Check health of all external services.

    Probes WebDAV (cheap root listing), Telegram (configured or not) and
    the VRAM manager.  A WebDAV failure downgrades the overall status to
    "degraded"; Telegram/VRAM problems are recorded per-service only.

    Returns:
        dict with "timestamp", overall "status" ("healthy"/"degraded")
        and a per-service "services" mapping of status dicts.
    """
    from config import settings
    from services.webdav_service import webdav_service

    health_status = {
        # Timezone-aware UTC; datetime.utcnow() is deprecated since 3.12
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "status": "healthy",
        "services": {}
    }

    # Check WebDAV
    try:
        if settings.has_webdav_config:
            # Try a simple operation — listing the root folder
            webdav_service.list(".")
            health_status["services"]["webdav"] = {"status": "healthy"}
        else:
            health_status["services"]["webdav"] = {"status": "not_configured"}
    except Exception as e:
        health_status["services"]["webdav"] = {
            "status": "unhealthy",
            "error": str(e)
        }
        # File sync is core functionality, so its failure degrades the service
        health_status["status"] = "degraded"

    # Check Telegram (only whether credentials are configured)
    try:
        from services.telegram_service import telegram_service
        if telegram_service.is_configured:
            health_status["services"]["telegram"] = {"status": "healthy"}
        else:
            health_status["services"]["telegram"] = {"status": "not_configured"}
    except Exception as e:
        health_status["services"]["telegram"] = {
            "status": "unavailable",
            "error": str(e)
        }

    # Check VRAM manager; report free memory in GiB when available
    try:
        from services.vram_manager import vram_manager
        vram_info = vram_manager.get_vram_info()
        health_status["services"]["vram"] = {
            "status": "healthy",
            "available_gb": vram_info.get("free", 0) / (1024**3)
        }
    except Exception as e:
        health_status["services"]["vram"] = {
            "status": "unavailable",
            "error": str(e)
        }

    return health_status
|
|
|
|
|
|
def initialize_services() -> None:
    """Initialize all services with configuration validation.

    Order matters: configuration is validated first, then Telegram is
    configured (so later failures can be notified), then WebDAV, the VRAM
    manager and the processed-file registry.  Individual service failures
    are logged but do not abort startup; only a ConfigurationError raised
    by validate_configuration() propagates to the caller.
    """
    from config import settings
    from services.webdav_service import webdav_service
    from services.vram_manager import vram_manager
    from services.telegram_service import telegram_service
    from storage.processed_registry import processed_registry

    logger.info("Initializing services...")

    # Validate configuration (raises on fatal misconfiguration)
    validate_configuration()

    # Warn if WebDAV not configured — the main loop will skip file sync
    if not settings.has_webdav_config:
        logger.warning("WebDAV not configured - file sync functionality disabled")

    # Warn if AI providers not configured
    if not settings.has_ai_config:
        logger.warning("AI providers not configured - summary generation will not work")

    # Configure Telegram if credentials available; sends a startup ping
    if settings.TELEGRAM_TOKEN and settings.TELEGRAM_CHAT_ID:
        try:
            telegram_service.configure(settings.TELEGRAM_TOKEN, settings.TELEGRAM_CHAT_ID)
            telegram_service.send_start_notification()
            logger.info("Telegram notifications enabled")
        except Exception as e:
            logger.error(f"Failed to configure Telegram: {e}")

    # Initialize WebDAV if configured; failure is non-fatal
    if settings.has_webdav_config:
        try:
            webdav_service.initialize()
            logger.info("WebDAV service initialized")
        except Exception as e:
            logger.error(f"Failed to initialize WebDAV: {e}")
            logger.exception("WebDAV initialization error details")
    else:
        logger.info("Skipping WebDAV initialization (not configured)")

    # Initialize VRAM manager; failure is non-fatal
    try:
        vram_manager.initialize()
        logger.info("VRAM manager initialized")
    except Exception as e:
        logger.error(f"Failed to initialize VRAM manager: {e}")
        logger.exception("VRAM manager initialization error details")

    # Initialize processed registry (dedup store used by the main loop)
    try:
        processed_registry.initialize()
        logger.info("Processed registry initialized")
    except Exception as e:
        logger.error(f"Failed to initialize processed registry: {e}")
        logger.exception("Registry initialization error details")

    # Run health check and log the initial snapshot
    health = check_service_health()
    logger.info(f"Initial health check: {json.dumps(health, indent=2)}")

    logger.info("All services initialized successfully")
|
|
|
|
|
|
def send_error_notification(error_type: str, error_message: str) -> None:
    """Send an error notification via Telegram (best effort; never raises)."""
    try:
        from services.telegram_service import telegram_service
        if not telegram_service.is_configured:
            return
        telegram_service.send_error_notification(error_type, error_message)
    except Exception as exc:
        # Notification failures must never mask the original error
        logger.warning(f"Failed to send error notification: {exc}")
|
|
|
|
|
|
def run_dashboard_thread() -> None:
    """Run the Flask dashboard in the calling thread (blocking).

    Intended as the target of the daemon thread created by
    start_dashboard().  Any failure is logged rather than propagated so a
    broken dashboard cannot kill the main process.
    """
    try:
        from api.routes import create_app
        app = create_app()

        # Honor DASHBOARD_PORT, matching the port start_dashboard() logs.
        # Previously the port was hard-coded to 5000 here, so the env var
        # was logged but silently ignored by the actual server.
        port = int(os.getenv('DASHBOARD_PORT', '5000'))

        # Run Flask in production mode with threaded=True
        app.run(
            host='0.0.0.0',
            port=port,
            debug=False,
            threaded=True,
            use_reloader=False  # Important: disable reloader in thread
        )
    except Exception as e:
        logger.error(f"Dashboard thread error: {e}")
        logger.exception("Dashboard thread exception details")
|
|
|
|
|
|
def start_dashboard() -> threading.Thread:
    """Launch the dashboard in a background daemon thread and return it."""
    dashboard_port = int(os.getenv('DASHBOARD_PORT', '5000'))
    logger.info(f"Starting dashboard on port {dashboard_port}...")

    # Daemon thread: must never keep the process alive during shutdown
    worker = threading.Thread(
        target=run_dashboard_thread,
        name="DashboardThread",
        daemon=True,
    )
    worker.start()
    logger.info(f"Dashboard thread started (Thread-ID: {worker.ident})")
    return worker
|
|
|
|
|
|
def run_main_loop() -> None:
    """Main processing loop with improved error handling.

    Polls the configured WebDAV folders for new PDF, audio and text files
    on every cycle, dispatches each to its processor, and records finished
    files in the processed registry so they are handled exactly once.
    Runs forever; per-category errors are contained within the cycle, and
    a cycle-level failure triggers a doubled-interval backoff instead of
    exiting.
    """
    from config import settings
    from services.webdav_service import webdav_service
    from storage.processed_registry import processed_registry
    from processors.audio_processor import AudioProcessor
    from processors.pdf_processor import PDFProcessor
    from processors.text_processor import TextProcessor

    audio_processor = AudioProcessor()
    pdf_processor = PDFProcessor()
    text_processor = TextProcessor()

    # Backoff bookkeeping: after max_consecutive_errors failed cycles a
    # critical alert is raised, but the loop keeps running regardless.
    consecutive_errors = 0
    max_consecutive_errors = 5

    while True:
        try:
            logger.info("--- Polling for new files ---")
            # Reload the registry each cycle in case it changed on disk
            processed_registry.load()

            # Process PDFs
            if settings.has_webdav_config:
                try:
                    webdav_service.mkdir(settings.REMOTE_PDF_FOLDER)
                    pdf_files = webdav_service.list(settings.REMOTE_PDF_FOLDER)
                    for file_path in pdf_files:
                        if file_path.lower().endswith('.pdf'):
                            if not processed_registry.is_processed(file_path):
                                pdf_processor.process(file_path)
                                processed_registry.save(file_path)
                except Exception as e:
                    logger.exception(f"Error processing PDFs: {e}")
                    send_error_notification("pdf_processing", str(e))

            # Process Audio files: download -> transcribe -> summarize -> upload
            if settings.has_webdav_config:
                try:
                    audio_files = webdav_service.list(settings.REMOTE_AUDIOS_FOLDER)
                    for file_path in audio_files:
                        if any(file_path.lower().endswith(ext) for ext in settings.AUDIO_EXTENSIONS):
                            if not processed_registry.is_processed(file_path):
                                from pathlib import Path
                                from urllib.parse import unquote
                                from document.generators import DocumentGenerator
                                from services.telegram_service import telegram_service

                                # Remote paths are URL-encoded; decode for the local name
                                local_filename = unquote(Path(file_path).name)
                                base_name = Path(local_filename).stem
                                local_path = settings.LOCAL_DOWNLOADS_PATH / local_filename
                                settings.LOCAL_DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True)

                                # Step 1: Notify and download
                                telegram_service.send_message(
                                    f"🎵 Nuevo audio detectado: {local_filename}\n"
                                    f"⬇️ Descargando..."
                                )
                                logger.info(f"Downloading audio: {file_path} -> {local_path}")
                                webdav_service.download(file_path, local_path)

                                # Step 2: Transcribe
                                telegram_service.send_message(f"📝 Transcribiendo audio con Whisper...")
                                result = audio_processor.process(str(local_path))

                                if result.get("success") and result.get("transcription_path"):
                                    transcription_file = Path(result["transcription_path"])
                                    transcription_text = result.get("text", "")

                                    # Step 3: Generate AI summary and documents
                                    telegram_service.send_message(f"🤖 Generando resumen con IA...")
                                    doc_generator = DocumentGenerator()
                                    success, summary, output_files = doc_generator.generate_summary(
                                        transcription_text, base_name
                                    )

                                    # Step 4: Upload all files to Nextcloud
                                    if success and output_files:
                                        # Create folders (ignore "already exists")
                                        for folder in [settings.RESUMENES_FOLDER, settings.DOCX_FOLDER]:
                                            try:
                                                webdav_service.makedirs(folder)
                                            except Exception:
                                                pass

                                        # Upload transcription TXT
                                        if transcription_file.exists():
                                            remote_txt = f"{settings.RESUMENES_FOLDER}/{transcription_file.name}"
                                            webdav_service.upload(transcription_file, remote_txt)
                                            logger.info(f"Uploaded: {remote_txt}")

                                        # Upload DOCX
                                        docx_path = Path(output_files.get('docx_path', ''))
                                        if docx_path.exists():
                                            remote_docx = f"{settings.DOCX_FOLDER}/{docx_path.name}"
                                            webdav_service.upload(docx_path, remote_docx)
                                            logger.info(f"Uploaded: {remote_docx}")

                                        # Upload PDF (stored alongside the DOCX)
                                        pdf_path = Path(output_files.get('pdf_path', ''))
                                        if pdf_path.exists():
                                            remote_pdf = f"{settings.DOCX_FOLDER}/{pdf_path.name}"
                                            webdav_service.upload(pdf_path, remote_pdf)
                                            logger.info(f"Uploaded: {remote_pdf}")

                                        # Upload Markdown
                                        md_path = Path(output_files.get('markdown_path', ''))
                                        if md_path.exists():
                                            remote_md = f"{settings.RESUMENES_FOLDER}/{md_path.name}"
                                            webdav_service.upload(md_path, remote_md)
                                            logger.info(f"Uploaded: {remote_md}")

                                        # Final notification
                                        telegram_service.send_message(
                                            f"✅ Audio procesado: {local_filename}\n"
                                            f"📄 DOCX: {docx_path.name if docx_path.exists() else 'N/A'}\n"
                                            f"📑 PDF: {pdf_path.name if pdf_path.exists() else 'N/A'}\n"
                                            f"☁️ Subido a Nextcloud"
                                        )
                                    else:
                                        # Just upload transcription if summary failed
                                        if transcription_file.exists():
                                            try:
                                                webdav_service.makedirs(settings.RESUMENES_FOLDER)
                                            except Exception:
                                                pass
                                            remote_txt = f"{settings.RESUMENES_FOLDER}/{transcription_file.name}"
                                            webdav_service.upload(transcription_file, remote_txt)
                                            telegram_service.send_message(
                                                f"⚠️ Resumen fallido, solo transcripción subida:\n{transcription_file.name}"
                                            )

                                # NOTE(review): the file is marked processed even
                                # when transcription failed, so it will never be
                                # retried — confirm this is intentional.
                                processed_registry.save(file_path)
                except Exception as e:
                    logger.exception(f"Error processing audio: {e}")
                    send_error_notification("audio_processing", str(e))

            # Process Text files
            if settings.has_webdav_config:
                try:
                    text_files = webdav_service.list(settings.REMOTE_TXT_FOLDER)
                    for file_path in text_files:
                        if any(file_path.lower().endswith(ext) for ext in settings.TXT_EXTENSIONS):
                            if not processed_registry.is_processed(file_path):
                                text_processor.process(file_path)
                                processed_registry.save(file_path)
                except Exception as e:
                    logger.exception(f"Error processing text: {e}")
                    send_error_notification("text_processing", str(e))

            # Reset error counter on success
            consecutive_errors = 0

        except Exception as e:
            # Improved error logging with full traceback
            logger.exception(f"Critical error in main loop: {e}")

            # Send notification for critical errors
            send_error_notification("main_loop", str(e))

            # Track consecutive errors
            consecutive_errors += 1

            if consecutive_errors >= max_consecutive_errors:
                logger.critical(
                    f"Too many consecutive errors ({consecutive_errors}). "
                    "Service may be unstable. Consider checking configuration."
                )
                send_error_notification(
                    "consecutive_errors",
                    f"Service has failed {consecutive_errors} consecutive times"
                )

            # Don't exit, let the loop continue with backoff
            logger.info(f"Waiting {settings.POLL_INTERVAL * 2} seconds before retry...")
            time.sleep(settings.POLL_INTERVAL * 2)
            continue

        logger.info(f"Cycle completed. Waiting {settings.POLL_INTERVAL} seconds...")
        time.sleep(settings.POLL_INTERVAL)
|
|
|
|
|
|
def main():
    """Main entry point: lock, initialize services, start dashboard, loop.

    Exits with status 1 on a fatal error; the single-instance lock is
    always released on the way out.
    """
    lock_fd = None
    dashboard_thread = None
    try:
        logger.info("=== CBCFacil Service Started ===")
        logger.info(f"Version: {os.getenv('APP_VERSION', '8.0')}")
        environment = 'production' if os.getenv('DEBUG', 'false').lower() != 'true' else 'development'
        logger.info(f"Environment: {environment}")

        lock_fd = acquire_lock()
        initialize_services()

        # Dashboard runs as a daemon thread so it never blocks the loop
        dashboard_thread = start_dashboard()

        # Blocks forever until interrupted or a fatal error escapes
        run_main_loop()

    except KeyboardInterrupt:
        logger.info("Shutdown requested by user")
    except Exception as e:
        logger.exception(f"Fatal error in main: {e}")
        send_error_notification("fatal_error", str(e))
        sys.exit(1)
    finally:
        if lock_fd:
            release_lock(lock_fd)
        logger.info("=== CBCFacil Service Stopped ===")
|
|
|
|
|
|
if __name__ == "__main__":
    # Handle CLI commands; with no arguments, run the full service.
    # NOTE(review): the whisper/pdf commands only use sys.argv[2] but
    # require exactly 4 argv entries — confirm the extra argument is
    # intentional before relaxing the length checks.
    if len(sys.argv) > 1:
        command = sys.argv[1]
        if command == "whisper" and len(sys.argv) == 4:
            from processors.audio_processor import AudioProcessor
            AudioProcessor().process(sys.argv[2])
        elif command == "pdf" and len(sys.argv) == 4:
            from processors.pdf_processor import PDFProcessor
            PDFProcessor().process(sys.argv[2])
        elif command == "health":
            # Call the function defined in this module directly.  The old
            # "from main import check_service_health" re-imported this file
            # under a second module name, re-running load_dotenv() and
            # setup_logging() as an import side effect.
            health = check_service_health()
            print(json.dumps(health, indent=2))
        else:
            print("Usage: python main.py [whisper|pdf|health]")
            sys.exit(1)
    else:
        main()
|