cbc2027/opus.md
renato97 6058dc642e feat: Automatic Notion integration + full code analysis
- Installed the official notion-client SDK for a robust integration
- Refactored services/notion_service.py around the official Notion SDK
  - Rate limiting with retry and exponential backoff
  - Markdown → Notion blocks parser (headings, bullets, paragraphs)
  - Support for pages and databases
  - Robust error handling

- Automatic integration in document/generators.py
  - PDFs are uploaded to Notion automatically after generation
  - Full summary content formatted as blocks
  - Rich metadata (file type, path, date)

- Notion configuration in main.py
  - Automatic initialization on service startup
  - Credential validation

- Updated config/settings.py
  - Added load_dotenv() to load variables from .env
  - Notion settings (NOTION_API, NOTION_DATABASE_ID)

- Utility scripts created:
  - test_notion_integration.py: Notion upload test
  - test_pipeline_notion.py: End-to-end pipeline test
  - verify_notion_permissions.py: Permission verification
  - list_notion_pages.py: List accessible pages
  - diagnose_notion.py: Full diagnostics
  - create_notion_database.py: Create the database automatically
  - restart_service.sh: Service restart script

- Complete documentation in opus.md:
  - Exhaustive codebase analysis (42 Python files)
  - Critical bugs identified, with fixes
  - Security improvements (authentication, rate limiting, CORS, CSP)
  - Performance optimizations (Celery, Redis, PostgreSQL, WebSockets)
  - Testing plan (structure, examples, 80% coverage goal)
  - Implementation roadmap (6 detailed sprints)
  - Advanced Notion integration documented

Status: Notion working correctly; PDFs are uploaded automatically
2026-01-26 17:31:17 +00:00


🚀 CBCFacil - Improvement and Optimization Plan

Date: January 26, 2026
Project: CBCFacil v9
Documentation: Improvements, Bug Fixes, Recommendations, and Notion Integration


📋 TABLE OF CONTENTS

  1. Executive Summary
  2. Critical Bugs to Fix
  3. Security Improvements
  4. Performance Optimizations
  5. Code and Maintainability Improvements
  6. Advanced Notion Integration
  7. Testing Plan
  8. Implementation Roadmap

📊 EXECUTIVE SUMMARY

CBCFacil is a well-architected AI document-processing system, but it needs critical improvements in security, testing, and scalability before it can be considered production-ready.

Overall Rating

Architecture:     ████████░░ 8/10
Code:             ███████░░░ 7/10
Security:         ████░░░░░░ 4/10
Testing:          ░░░░░░░░░░ 0/10
Documentation:    █████████░ 9/10
Performance:      ██████░░░░ 6/10

TOTAL:            ██████░░░░ 5.7/10

Priorities

  • 🔴 CRITICAL: Basic security + fundamental tests (Sprint 1)
  • 🟡 HIGH: Performance and scalability (Sprint 2)
  • 🟢 MEDIUM: Frontend modernization and advanced features (Sprints 3-4)

🐛 CRITICAL BUGS TO FIX

1. 🔴 Notion API Token Exposed in .env.example

Location: config/settings.py:47, .env.example

Problem:

# .env.example contains a real Notion token
NOTION_API_TOKEN=secret_XXX...REAL_TOKEN...XXX

Risk: High - token publicly exposed in the repository

Solution:

# .env.example
NOTION_API_TOKEN=secret_YOUR_NOTION_TOKEN_HERE_replace_this
NOTION_DATABASE_ID=your_database_id_here

Immediate Actions:

  1. Rotate the Notion token from the Notion console
  2. Update .env.example with a placeholder
  3. Verify that .env is listed in .gitignore
  4. Scan the Git history for exposed tokens
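Step 4 can be approximated with a small script fed, for example, the output of `git log -p` (a minimal sketch — the `secret_` prefix is an assumption about the historical Notion token format, and a dedicated scanner such as gitleaks is preferable in practice):

```python
import re

# Assumed pattern: Notion internal-integration tokens have historically
# started with the "secret_" prefix; adjust to your provider's format
TOKEN_PATTERN = re.compile(r"secret_[A-Za-z0-9]{20,}")

def find_exposed_tokens(text: str) -> list[str]:
    """Return candidate API tokens found in the given text."""
    return TOKEN_PATTERN.findall(text)

diff = "NOTION_API_TOKEN=secret_abcdefghijklmnopqrstuv123\nSAFE_LINE=ok"
print(find_exposed_tokens(diff))
```

Any hit means the token must be rotated, not merely removed from the working tree, since it remains recoverable from history.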

2. 🔴 Path Traversal Vulnerability in /downloads

Location: api/routes.py:142-148

Problem:

@app.route('/downloads/<path:filepath>')
def serve_file(filepath):
    safe_path = os.path.normpath(filepath)
    # Insufficient validation - can be bypassed with symlinks
    if '..' in filepath or filepath.startswith('/'):
        abort(403)

Risk: High - unauthorized access to system files

Solution:

from flask import abort, send_file
from werkzeug.security import safe_join
from werkzeug.utils import secure_filename
from pathlib import Path

@app.route('/downloads/<path:filepath>')
def serve_file(filepath):
    # Sanitize the filename
    safe_filename = secure_filename(filepath)
    
    # Use safe_join to prevent path traversal
    base_dir = settings.LOCAL_DOWNLOADS_PATH
    safe_path = safe_join(str(base_dir), safe_filename)
    
    if safe_path is None:
        abort(403, "Access denied")
    
    # Verify the resolved path stays inside the allowed directory
    resolved_path = Path(safe_path).resolve()
    if not str(resolved_path).startswith(str(base_dir.resolve())):
        abort(403, "Access denied")
    
    if not resolved_path.exists() or not resolved_path.is_file():
        abort(404)
    
    return send_file(resolved_path)

3. 🔴 SECRET_KEY Generated at Random

Location: api/routes.py:30

Problem:

# A random SECRET_KEY is generated when none is set
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', os.urandom(24).hex())

Risk: Medium - sessions invalidated after every restart; insecure in production

Solution:

# config/settings.py
@property
def SECRET_KEY(self) -> str:
    key = os.getenv('SECRET_KEY')
    if not key:
        raise ValueError(
            "SECRET_KEY is required in production. "
            "Generate one with: python -c 'import secrets; print(secrets.token_hex(32))'"
        )
    return key

# api/routes.py
app.config['SECRET_KEY'] = settings.SECRET_KEY

Action:

# Generate a secure secret key
python -c 'import secrets; print(secrets.token_hex(32))'

# Add it to .env
SECRET_KEY=<generated_key>

4. 🔴 Imports Inside Functions

Location: main.py:306-342

Problem:

def process_audio_file(audio_path: Path):
    from processors.audio_processor import audio_processor  # Imports inside
    from document.generators import DocumentGenerator       # the function
    # ...

Risk: Medium - performance hit and circular-import problems

Solution:

# main.py (top level)
from processors.audio_processor import audio_processor
from processors.pdf_processor import pdf_processor
from document.generators import DocumentGenerator

# Remove all imports from inside functions
def process_audio_file(audio_path: Path):
    # Use the module-level imports
    result = audio_processor.process(audio_path)
    # ...

5. 🔴 No API Authentication

Location: api/routes.py (all endpoints)

Problem: Anyone can access every endpoint without authenticating

Risk: Critical - data exposure and unauthorized control

Solution with an API key:

# config/settings.py
@property
def API_KEY(self) -> Optional[str]:
    return os.getenv('API_KEY')

@property
def REQUIRE_AUTH(self) -> bool:
    return os.getenv('REQUIRE_AUTH', 'true').lower() == 'true'

# api/auth.py (new file)
from functools import wraps
from flask import request, abort, jsonify
from config import settings

def require_api_key(f):
    """Decorator to require API key authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not settings.REQUIRE_AUTH:
            return f(*args, **kwargs)
        
        api_key = request.headers.get('X-API-Key')
        if not api_key:
            abort(401, description='API key required')
        
        if api_key != settings.API_KEY:
            abort(403, description='Invalid API key')
        
        return f(*args, **kwargs)
    return decorated_function

# api/routes.py
from api.auth import require_api_key

@app.route('/api/files')
@require_api_key
def get_files():
    # ...

Solution with JWT (more robust):

# requirements.txt
PyJWT>=2.8.0
flask-jwt-extended>=4.5.3

# api/auth.py
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity

jwt = JWTManager(app)

@app.route('/api/login', methods=['POST'])
def login():
    username = request.json.get('username')
    password = request.json.get('password')
    
    # Validate credentials (use bcrypt in production)
    if username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD:
        access_token = create_access_token(identity=username)
        return jsonify(access_token=access_token)
    
    abort(401)

@app.route('/api/files')
@jwt_required()
def get_files():
    current_user = get_jwt_identity()
    # ...

6. 🟡 Text Truncated in Summaries

Location: document/generators.py:38, 61

Problem:

bullet_prompt = f"""...\nTexto:\n{text[:15000]}"""  # Truncates at 15k chars
summary_prompt = f"""...\n{text[:20000]}\n..."""     # Truncates at 20k chars

Risk: Medium - information loss on long documents

Solution - Intelligent Chunking:

def _chunk_text(self, text: str, max_chunk_size: int = 15000) -> List[str]:
    """Split text into intelligent chunks by paragraphs"""
    if len(text) <= max_chunk_size:
        return [text]
    
    chunks = []
    current_chunk = []
    current_size = 0
    
    # Split by double newlines (paragraphs)
    paragraphs = text.split('\n\n')
    
    for para in paragraphs:
        para_size = len(para)
        
        if current_size + para_size > max_chunk_size:
            if current_chunk:
                chunks.append('\n\n'.join(current_chunk))
                current_chunk = []
                current_size = 0
        
        current_chunk.append(para)
        current_size += para_size
    
    if current_chunk:
        chunks.append('\n\n'.join(current_chunk))
    
    return chunks

def generate_summary(self, text: str, base_name: str):
    """Generate summary with intelligent chunking"""
    chunks = self._chunk_text(text, max_chunk_size=15000)
    
    # Process each chunk and combine
    all_bullets = []
    for i, chunk in enumerate(chunks):
        self.logger.info(f"Processing chunk {i+1}/{len(chunks)}")
        bullet_prompt = f"""Analiza el siguiente texto (parte {i+1} de {len(chunks)})...\n{chunk}"""
        bullets = self.ai_provider.generate_text(bullet_prompt)
        all_bullets.append(bullets)
    
    # Combine all bullets
    combined_bullets = '\n'.join(all_bullets)
    
    # Generate unified summary from combined bullets
    # ...

7. 🟡 Cache Key Uses Only 500 Characters

Location: services/ai_service.py:111

Problem:

def _get_cache_key(self, prompt: str, model: str = "default") -> str:
    content = f"{model}:{prompt[:500]}"  # Only the first 500 chars
    return hashlib.sha256(content.encode()).hexdigest()

Risk: Medium - cache collisions between similar prompts

Solution:

def _get_cache_key(self, prompt: str, model: str = "default") -> str:
    """Generate cache key from full prompt hash"""
    content = f"{model}:{prompt}"  # Hash the full prompt
    return hashlib.sha256(content.encode()).hexdigest()
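A quick way to see why this matters: two prompts that share their first 500 characters collide under the truncated scheme but not under the full-prompt hash. A standalone sketch of both key functions:

```python
import hashlib

def truncated_key(prompt: str, model: str = "default") -> str:
    """Old scheme: hashes only the first 500 characters of the prompt."""
    return hashlib.sha256(f"{model}:{prompt[:500]}".encode()).hexdigest()

def full_key(prompt: str, model: str = "default") -> str:
    """Fixed scheme: hashes the entire prompt."""
    return hashlib.sha256(f"{model}:{prompt}".encode()).hexdigest()

shared_prefix = "x" * 500
a = shared_prefix + " summarize chapter 1"
b = shared_prefix + " summarize chapter 2"

print(truncated_key(a) == truncated_key(b))  # True: collision, wrong cache hit
print(full_key(a) == full_key(b))            # False: distinct keys
```

With long documents, prompts routinely share a 500-character preamble (instructions plus the start of the text), so the old key would return a cached summary for the wrong document.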

8. 🟡 Bloom Filter Uses MD5

Location: storage/processed_registry.py:24

Problem:

import hashlib

def _hash(self, item: str) -> int:
    return int(hashlib.md5(item.encode()).hexdigest(), 16)  # MD5 is not secure

Risk: Low - MD5 is obsolete; possible collisions

Solution:

def _hash(self, item: str) -> int:
    """Use SHA256 instead of MD5 for better collision resistance"""
    return int(hashlib.sha256(item.encode()).hexdigest(), 16) % (2**64)
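For context, a minimal sketch of how such a hash feeds a Bloom filter — the bit-array size and the double-hashing scheme below are illustrative assumptions, not the project's actual implementation:

```python
import hashlib

class BloomFilter:
    """Tiny illustrative Bloom filter using SHA256-derived positions."""

    def __init__(self, size: int = 1024, num_hashes: int = 3):
        self.size = size
        self.num_hashes = num_hashes
        self.bits = [False] * size

    def _positions(self, item: str):
        # Derive k bit positions from one SHA256 digest (double hashing)
        digest = hashlib.sha256(item.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big")
        return [(h1 + i * h2) % self.size for i in range(self.num_hashes)]

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos] = True

    def __contains__(self, item: str) -> bool:
        return all(self.bits[pos] for pos in self._positions(item))

bf = BloomFilter()
bf.add("lecture_01.mp3")
print("lecture_01.mp3" in bf)  # True: a Bloom filter never gives false negatives
```

False positives remain possible by design; the hash swap only removes MD5's known weaknesses, it does not change the filter's probabilistic nature.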

🔒 SECURITY IMPROVEMENTS

1. Implement Rate Limiting

Install flask-limiter:

pip install flask-limiter

Implementation:

# api/routes.py
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379"  # Or memory:// for testing
)

@app.route('/api/files')
@limiter.limit("30 per minute")
@require_api_key
def get_files():
    # ...

@app.route('/api/regenerate-summary', methods=['POST'])
@limiter.limit("5 per minute")  # Stricter for expensive operations
@require_api_key
def regenerate_summary():
    # ...

2. Configure Restrictive CORS

Location: api/routes.py:25

Problem:

CORS(app)  # Allows all origins (*)

Solution:

# config/settings.py
@property
def CORS_ORIGINS(self) -> List[str]:
    origins_str = os.getenv('CORS_ORIGINS', 'http://localhost:5000')
    return [o.strip() for o in origins_str.split(',')]

# api/routes.py
from flask_cors import CORS

CORS(app, resources={
    r"/api/*": {
        "origins": settings.CORS_ORIGINS,
        "methods": ["GET", "POST", "DELETE"],
        "allow_headers": ["Content-Type", "X-API-Key", "Authorization"],
        "expose_headers": ["Content-Type"],
        "supports_credentials": True,
        "max_age": 3600
    }
})

.env configuration:

# Production
CORS_ORIGINS=https://cbcfacil.com,https://app.cbcfacil.com

# Development
CORS_ORIGINS=http://localhost:5000,http://localhost:3000

3. Implement a Content Security Policy (CSP)

New functionality:

# api/security.py (new file)

def add_security_headers(response):
    """Add security headers to all responses"""
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        "script-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
        "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
        "font-src 'self' https://fonts.gstatic.com; "
        "img-src 'self' data: https:; "
        "connect-src 'self'"
    )
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    response.headers['X-XSS-Protection'] = '1; mode=block'
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    return response

# api/routes.py
from api.security import add_security_headers

@app.after_request
def apply_security_headers(response):
    return add_security_headers(response)

4. Sanitize Inputs and Outputs

New functionality:

# core/sanitizer.py (new file)
import re
import html
from pathlib import Path

class InputSanitizer:
    """Sanitize user inputs"""
    
    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Remove dangerous characters from filename"""
        # Remove path separators
        filename = filename.replace('/', '_').replace('\\', '_')
        
        # Remove null bytes
        filename = filename.replace('\x00', '')
        
        # Limit length
        filename = filename[:255]
        
        # Remove leading/trailing dots and spaces
        filename = filename.strip('. ')
        
        return filename
    
    @staticmethod
    def sanitize_html(text: str) -> str:
        """Escape HTML to prevent XSS"""
        return html.escape(text)
    
    @staticmethod
    def sanitize_path(path: str, base_dir: Path) -> Path:
        """Ensure path is within base directory"""
        from werkzeug.security import safe_join
        
        safe_path = safe_join(str(base_dir), path)
        if safe_path is None:
            raise ValueError("Invalid path")
        
        resolved = Path(safe_path).resolve()
        if not str(resolved).startswith(str(base_dir.resolve())):
            raise ValueError("Path traversal attempt")
        
        return resolved

# Usage in api/routes.py
from core.sanitizer import InputSanitizer

@app.route('/api/transcription/<filename>')
@require_api_key
def get_transcription(filename):
    # Sanitize the filename
    safe_filename = InputSanitizer.sanitize_filename(filename)
    # ...
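A quick standalone check of the filename rules above, with the sanitizer logic reimplemented here so the snippet runs on its own:

```python
def sanitize_filename(filename: str) -> str:
    """Standalone copy of the InputSanitizer.sanitize_filename rules."""
    filename = filename.replace('/', '_').replace('\\', '_')  # path separators
    filename = filename.replace('\x00', '')                   # null bytes
    filename = filename[:255]                                 # length limit
    return filename.strip('. ')                               # edge dots/spaces

print(sanitize_filename("a/b\\c.txt"))  # a_b_c.txt
print(sanitize_filename("..hidden. "))  # hidden
```

Note that a traversal payload such as `../../etc/passwd` comes out flattened to a single harmless name, which is why this runs before any path joining.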

5. Filter Sensitive Information out of Logs

Implementation:

# core/logging_filter.py (new file)
import logging
import re

class SensitiveDataFilter(logging.Filter):
    """Filter sensitive data from logs"""
    
    PATTERNS = [
        (re.compile(r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I), r'\1***REDACTED***\3'),
        (re.compile(r'(token["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I), r'\1***REDACTED***\3'),
        (re.compile(r'(password["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I), r'\1***REDACTED***\3'),
        (re.compile(r'(secret["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I), r'\1***REDACTED***\3'),
    ]
    
    def filter(self, record):
        message = record.getMessage()
        
        for pattern, replacement in self.PATTERNS:
            message = pattern.sub(replacement, message)
        
        record.msg = message
        record.args = ()
        
        return True

# main.py
from core.logging_filter import SensitiveDataFilter

# Add the filter to every handler
for handler in logging.root.handlers:
    handler.addFilter(SensitiveDataFilter())

6. Serve over HTTPS with a Reverse Proxy

nginx configuration:

# /etc/nginx/sites-available/cbcfacil
server {
    listen 80;
    server_name cbcfacil.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name cbcfacil.com;

    # SSL Configuration
    ssl_certificate /etc/letsencrypt/live/cbcfacil.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/cbcfacil.com/privkey.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;
    ssl_prefer_server_ciphers on;

    # Security Headers
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    add_header X-Frame-Options "DENY" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-XSS-Protection "1; mode=block" always;

    # Rate limiting
    # Note: the limit_req_zone directive is only valid in the http {} context
    # (e.g. in nginx.conf), so declare it there:
    #   limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_req zone=api burst=20 nodelay;

    # Proxy to Flask
    location / {
        proxy_pass http://127.0.0.1:5000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # Static files caching
    location /static/ {
        alias /home/app/cbcfacil/static/;
        expires 1y;
        add_header Cache-Control "public, immutable";
    }
}

⚡ PERFORMANCE OPTIMIZATIONS

1. Implement a Queue System with Celery

Current Problem: Synchronous processing blocks the main loop

Installation:

pip install celery redis

Configuration:

# celery_app.py (new file)
from celery import Celery
from config import settings

celery_app = Celery(
    'cbcfacil',
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 1 hour
    task_soft_time_limit=3300,  # 55 minutes
)

# tasks/processing.py (new file)
from pathlib import Path

from celery_app import celery_app
from processors.audio_processor import audio_processor
from processors.pdf_processor import pdf_processor
from document.generators import DocumentGenerator

@celery_app.task(bind=True, max_retries=3)
def process_audio_task(self, audio_path: str):
    """Process audio file asynchronously"""
    try:
        result = audio_processor.process(Path(audio_path))
        if result.success:
            generator = DocumentGenerator()
            generator.generate_summary(result.data['text'], result.data['base_name'])
        return {'success': True, 'file': audio_path}
    except Exception as e:
        self.retry(exc=e, countdown=60)

@celery_app.task(bind=True, max_retries=3)
def process_pdf_task(self, pdf_path: str):
    """Process PDF file asynchronously"""
    try:
        result = pdf_processor.process(Path(pdf_path))
        if result.success:
            generator = DocumentGenerator()
            generator.generate_summary(result.data['text'], result.data['base_name'])
        return {'success': True, 'file': pdf_path}
    except Exception as e:
        self.retry(exc=e, countdown=60)

# main.py
from tasks.processing import process_audio_task, process_pdf_task

def process_new_files(files: List[Path]):
    """Queue files for processing"""
    for file in files:
        if file.suffix.lower() in ['.mp3', '.wav', '.m4a']:
            task = process_audio_task.delay(str(file))
            logger.info(f"Queued audio processing: {file.name} (task_id={task.id})")
        elif file.suffix.lower() == '.pdf':
            task = process_pdf_task.delay(str(file))
            logger.info(f"Queued PDF processing: {file.name} (task_id={task.id})")

# config/settings.py
@property
def CELERY_BROKER_URL(self) -> str:
    return os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')

@property
def CELERY_RESULT_BACKEND(self) -> str:
    return os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

Run the workers:

# Terminal 1: Flask app
python main.py

# Terminal 2: Celery worker
celery -A celery_app worker --loglevel=info --concurrency=2

# Terminal 3: Celery beat (for scheduled tasks)
celery -A celery_app beat --loglevel=info

2. Implement Redis for Distributed Caching

Problem: The in-memory LRU cache is lost on every restart

Installation:

pip install redis hiredis

Implementation:

# services/cache_service.py (new file)
import json
import logging
import redis
from typing import Optional, Any
from config import settings

logger = logging.getLogger(__name__)

class CacheService:
    """Distributed cache with Redis"""
    
    def __init__(self):
        self.redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB,
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )
        self.default_ttl = 3600  # 1 hour
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        try:
            value = self.redis_client.get(key)
            if value:
                return json.loads(value)
            return None
        except Exception as e:
            logger.error(f"Cache get error: {e}")
            return None
    
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Set value in cache"""
        try:
            ttl = ttl or self.default_ttl
            serialized = json.dumps(value)
            return self.redis_client.setex(key, ttl, serialized)
        except Exception as e:
            logger.error(f"Cache set error: {e}")
            return False
    
    def delete(self, key: str) -> bool:
        """Delete key from cache"""
        try:
            return bool(self.redis_client.delete(key))
        except Exception as e:
            logger.error(f"Cache delete error: {e}")
            return False
    
    def get_or_compute(self, key: str, compute_fn, ttl: Optional[int] = None):
        """Get from cache or compute and store"""
        cached = self.get(key)
        if cached is not None:
            return cached
        
        value = compute_fn()
        self.set(key, value, ttl)
        return value

cache_service = CacheService()

# services/ai_service.py
from services.cache_service import cache_service

class AIService:
    def generate_text(self, prompt: str, model: str = "default") -> str:
        cache_key = self._get_cache_key(prompt, model)
        
        # Use the Redis cache
        def compute():
            return self.ai_provider.generate_text(prompt)
        
        return cache_service.get_or_compute(cache_key, compute, ttl=3600)

# config/settings.py
@property
def REDIS_HOST(self) -> str:
    return os.getenv('REDIS_HOST', 'localhost')

@property
def REDIS_PORT(self) -> int:
    return int(os.getenv('REDIS_PORT', '6379'))

@property
def REDIS_DB(self) -> int:
    return int(os.getenv('REDIS_DB', '0'))

3. Migrate Metadata to PostgreSQL

Problem: processed_files.txt doesn't scale and lacks ACID guarantees

Installation:

pip install psycopg2-binary sqlalchemy alembic

Schema:

# models/database.py (new file)
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, JSON, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from config import settings

Base = declarative_base()

class ProcessedFile(Base):
    __tablename__ = 'processed_files'
    
    id = Column(Integer, primary_key=True)
    filename = Column(String(255), unique=True, nullable=False, index=True)
    filepath = Column(String(512), nullable=False)
    file_type = Column(String(50), nullable=False)  # audio, pdf, text
    status = Column(String(50), default='pending')  # pending, processing, completed, failed
    
    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    processed_at = Column(DateTime)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Processing results
    transcription_text = Column(Text)
    summary_text = Column(Text)
    
    # Generated files
    markdown_path = Column(String(512))
    docx_path = Column(String(512))
    pdf_path = Column(String(512))
    
    # Metadata
    file_size = Column(Integer)
    duration = Column(Integer)  # For audio files
    page_count = Column(Integer)  # For PDFs
    
    # Notion integration
    notion_uploaded = Column(Boolean, default=False)
    notion_page_id = Column(String(255))
    
    # Metrics
    processing_time = Column(Integer)  # seconds
    error_message = Column(Text)
    retry_count = Column(Integer, default=0)
    
    # Additional metadata: "metadata" is a reserved attribute name on
    # SQLAlchemy Declarative models, so map the column under another name
    extra_metadata = Column('metadata', JSON)

# Database session
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)

from contextlib import contextmanager

@contextmanager
def get_db():
    """Yield a session usable as a context manager"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# storage/processed_registry.py (refactor)
from models.database import ProcessedFile, get_db

class ProcessedRegistry:
    def is_processed(self, filename: str) -> bool:
        with get_db() as db:
            return db.query(ProcessedFile).filter_by(
                filename=filename,
                status='completed'
            ).first() is not None
    
    def mark_processed(self, filename: str, metadata: dict):
        with get_db() as db:
            file_record = ProcessedFile(
                filename=filename,
                filepath=metadata.get('filepath'),
                file_type=metadata.get('file_type'),
                status='completed',
                processed_at=datetime.utcnow(),
                transcription_text=metadata.get('transcription'),
                summary_text=metadata.get('summary'),
                markdown_path=metadata.get('markdown_path'),
                docx_path=metadata.get('docx_path'),
                pdf_path=metadata.get('pdf_path'),
                notion_uploaded=metadata.get('notion_uploaded', False),
                processing_time=metadata.get('processing_time'),
                extra_metadata=metadata
            )
            db.add(file_record)
            db.commit()

# config/settings.py
@property
def DATABASE_URL(self) -> str:
    return os.getenv(
        'DATABASE_URL',
        'postgresql://cbcfacil:password@localhost/cbcfacil'
    )

Migrations with Alembic:

# Initialize Alembic
alembic init migrations

# Create a migration
alembic revision --autogenerate -m "Create processed_files table"

# Apply the migration
alembic upgrade head

4. WebSockets for Real-Time Updates

Installation:

pip install flask-socketio python-socketio eventlet

Implementation:

# api/routes.py
from flask_socketio import SocketIO, emit, join_room

socketio = SocketIO(app, cors_allowed_origins=settings.CORS_ORIGINS, async_mode='eventlet')

@socketio.on('connect')
def handle_connect():
    emit('connected', {'message': 'Connected to CBCFacil'})

@socketio.on('subscribe_file')
def handle_subscribe(data):
    filename = data.get('filename')
    # Join a room to receive updates for this file
    join_room(filename)

# tasks/processing.py
# Note: emitting from a Celery worker generally requires SocketIO to be
# configured with a shared message_queue (e.g. Redis) so events reach Flask
from api.routes import socketio

@celery_app.task(bind=True)
def process_audio_task(self, audio_path: str):
    filename = Path(audio_path).name
    
    # Notify that processing started
    socketio.emit('processing_started', {
        'filename': filename,
        'status': 'processing'
    }, room=filename)
    
    try:
        # Progress updates
        socketio.emit('processing_progress', {
            'filename': filename,
            'progress': 25,
            'stage': 'transcription'
        }, room=filename)
        
        result = audio_processor.process(Path(audio_path))
        
        socketio.emit('processing_progress', {
            'filename': filename,
            'progress': 75,
            'stage': 'summary_generation'
        }, room=filename)
        
        generator = DocumentGenerator()
        generator.generate_summary(result.data['text'], result.data['base_name'])
        
        # Notify completion
        socketio.emit('processing_completed', {
            'filename': filename,
            'status': 'completed',
            'progress': 100
        }, room=filename)
        
    except Exception as e:
        socketio.emit('processing_failed', {
            'filename': filename,
            'status': 'failed',
            'error': str(e)
        }, room=filename)
        raise

# templates/index.html (JavaScript)
const socket = io('http://localhost:5000');

socket.on('connect', () => {
    console.log('Connected to server');
});

socket.on('processing_started', (data) => {
    showNotification(`Processing started: ${data.filename}`);
});

socket.on('processing_progress', (data) => {
    updateProgressBar(data.filename, data.progress, data.stage);
});

socket.on('processing_completed', (data) => {
    showNotification(`Completed: ${data.filename}`, 'success');
    refreshFileList();
});

socket.on('processing_failed', (data) => {
    showNotification(`Failed: ${data.filename} - ${data.error}`, 'error');
});

// Subscribe to a specific file
function subscribeToFile(filename) {
    socket.emit('subscribe_file', { filename: filename });
}

📝 CODE AND MAINTAINABILITY IMPROVEMENTS

1. Add Complete Type Hints

Problem: Not all methods have type hints

Solution:

# Use mypy to check
pip install mypy

# pyproject.toml
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true

# Run
mypy cbcfacil/
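As an illustration of the target style (the function below is a hypothetical example, not taken from the codebase): with `disallow_untyped_defs = true`, mypy rejects the untyped form and accepts the fully annotated one.

```python
from pathlib import Path
from typing import Optional

# Untyped version - rejected once disallow_untyped_defs is enabled:
# def find_summary(base_name, out_dir):
#     ...

def find_summary(base_name: str, out_dir: Path) -> Optional[Path]:
    """Return the summary path for base_name, or None if it does not exist."""
    candidate = out_dir / f"{base_name}_summary.md"
    return candidate if candidate.exists() else None

print(find_summary("clase_01", Path("nonexistent_dir")))  # None
```

The annotations also document the contract (`Optional[Path]` makes the "may be missing" case explicit), which pays off in callers even before mypy runs.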

2. Implement Log Rotation

Problem: main.log can grow without bound

Solution:

# main.py
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

# Rotate by size (max 10MB, 5 backups)
file_handler = RotatingFileHandler(
    'main.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5
)

# Or rotate daily
file_handler = TimedRotatingFileHandler(
    'main.log',
    when='midnight',
    interval=1,
    backupCount=30  # Keep 30 days
)

file_handler.setFormatter(formatter)
logging.root.addHandler(file_handler)

3. Add Advanced Health Checks

# core/health_check.py (improved)
class HealthCheckService:
    def get_full_status(self) -> Dict[str, Any]:
        """Get comprehensive health status"""
        return {
            'status': 'healthy',
            'timestamp': datetime.utcnow().isoformat(),
            'version': settings.APP_VERSION,
            'checks': {
                'database': self._check_database(),
                'redis': self._check_redis(),
                'celery': self._check_celery(),
                'gpu': self._check_gpu(),
                'disk_space': self._check_disk_space(),
                'external_apis': {
                    'nextcloud': self._check_nextcloud(),
                    'notion': self._check_notion(),
                    'telegram': self._check_telegram(),
                    'claude': self._check_claude(),
                    'gemini': self._check_gemini(),
                }
            },
            'metrics': {
                'processed_files_today': self._count_processed_today(),
                'queue_size': self._get_queue_size(),
                'avg_processing_time': self._get_avg_processing_time(),
                'error_rate': self._get_error_rate(),
            }
        }
    
    def _check_database(self) -> Dict[str, Any]:
        try:
            from sqlalchemy import text
            from models.database import engine
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))  # text() is required on SQLAlchemy 1.4+/2.x
            return {'status': 'healthy'}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
    
    def _check_redis(self) -> Dict[str, Any]:
        try:
            from services.cache_service import cache_service
            cache_service.redis_client.ping()
            return {'status': 'healthy'}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
    
    def _check_celery(self) -> Dict[str, Any]:
        try:
            from celery_app import celery_app
            # Reuse one inspector instead of building it twice
            inspector = celery_app.control.inspect()
            stats = inspector.stats()
            active = inspector.active()
            
            return {
                'status': 'healthy' if stats else 'unhealthy',
                'workers': len(stats) if stats else 0,
                'active_tasks': sum(len(tasks) for tasks in active.values()) if active else 0
            }
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
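Note that `get_full_status` above hardcodes the top-level `'status': 'healthy'` even when individual checks fail. Deriving it from the check results is straightforward; this standalone helper assumes each leaf is a `{'status': ...}` dict as in the sketch above:

```python
from typing import Any, Dict

def overall_status(checks: Dict[str, Any]) -> str:
    """Roll per-check results up into one top-level status.
    
    Any failed core dependency (database, redis, celery) makes the whole
    system 'unhealthy'; other failures (e.g. one external API) only
    downgrade it to 'degraded'.
    """
    core = {'database', 'redis', 'celery'}
    failed = []
    
    def visit(name: str, node: Any) -> None:
        if isinstance(node, dict):
            if node.get('status') == 'unhealthy':
                failed.append(name)
            # Recurse into nested groups such as 'external_apis'
            for key, child in node.items():
                if isinstance(child, dict):
                    visit(key, child)
    
    for name, node in checks.items():
        visit(name, node)
    
    if any(name in core for name in failed):
        return 'unhealthy'
    return 'degraded' if failed else 'healthy'
```

The distinction matters for orchestrators: a 'degraded' service can keep receiving traffic, an 'unhealthy' one should be restarted.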

4. Modularize the Frontend

Problem: index.html is 2,500+ lines

Solution - migrate to React:

# Create a modern frontend
npx create-react-app frontend
cd frontend
npm install axios socket.io-client recharts date-fns

Proposed structure:

frontend/
├── src/
│   ├── components/
│   │   ├── Dashboard/
│   │   │   ├── StatsCards.jsx
│   │   │   ├── ProcessingQueue.jsx
│   │   │   └── SystemHealth.jsx
│   │   ├── Files/
│   │   │   ├── FileList.jsx
│   │   │   ├── FileItem.jsx
│   │   │   └── FileUpload.jsx
│   │   ├── Preview/
│   │   │   ├── PreviewPanel.jsx
│   │   │   ├── TranscriptionView.jsx
│   │   │   └── SummaryView.jsx
│   │   ├── Versions/
│   │   │   └── VersionHistory.jsx
│   │   └── Layout/
│   │       ├── Sidebar.jsx
│   │       ├── Header.jsx
│   │       └── Footer.jsx
│   ├── hooks/
│   │   ├── useWebSocket.js
│   │   ├── useFiles.js
│   │   └── useAuth.js
│   ├── services/
│   │   ├── api.js
│   │   └── socket.js
│   ├── store/
│   │   └── store.js (Redux/Zustand)
│   ├── App.jsx
│   └── index.jsx
└── package.json

🔗 ADVANCED NOTION INTEGRATION

Current State

The Notion integration is partially implemented in services/notion_service.py and document/generators.py. Currently:

  • PDF upload to a Notion database works
  • Page creation with title and status works
  • ⚠️ Upload uses base64 (limited to 5MB by the Notion API)
  • No bidirectional synchronization
  • Existing pages are never updated
  • Notion rate limits are not handled
  • No webhook for changes made in Notion

Proposed Improvements

1. Migrate to the Official Notion Client

Problem: direct use of requests with no rate-limit handling

Solution:

pip install notion-client
# services/notion_service.py (refactored)
from notion_client import Client
from notion_client.errors import APIResponseError
import time
from datetime import datetime  # needed for default timestamps in create_page
from typing import Optional, Dict, Any, List
from pathlib import Path
import logging

from config.settings import settings  # assumed location; used by _get_public_url

class NotionService:
    """Enhanced Notion integration service"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._client: Optional[Client] = None
        self._database_id: Optional[str] = None
        self._rate_limiter = RateLimiter(max_requests=3, time_window=1)  # 3 req/sec
    
    def configure(self, token: str, database_id: str) -> None:
        """Configure Notion with official SDK"""
        self._client = Client(auth=token)
        self._database_id = database_id
        self.logger.info("Notion service configured with official SDK")
    
    @property
    def is_configured(self) -> bool:
        return bool(self._client and self._database_id)
    
    def _rate_limited_request(self, func, *args, **kwargs):
        """Execute request with rate limiting and retry"""
        max_retries = 3
        base_delay = 1
        
        for attempt in range(max_retries):
            try:
                self._rate_limiter.wait()
                return func(*args, **kwargs)
            except APIResponseError as e:
                if e.code == 'rate_limited':
                    delay = base_delay * (2 ** attempt)  # Exponential backoff
                    self.logger.warning(f"Rate limited, waiting {delay}s")
                    time.sleep(delay)
                else:
                    raise
        
        raise Exception("Max retries exceeded")
    
    def create_page(self, title: str, content: str, metadata: Dict[str, Any]) -> Optional[str]:
        """Create a new page in Notion database"""
        if not self.is_configured:
            self.logger.warning("Notion not configured")
            return None
        
        try:
            # Prepare the page properties
            properties = {
                "Name": {
                    "title": [
                        {
                            "text": {
                                "content": title
                            }
                        }
                    ]
                },
                "Status": {
                    "select": {
                        "name": "Procesado"
                    }
                },
                "Tipo": {
                    "select": {
                        "name": metadata.get('file_type', 'Desconocido')
                    }
                },
                "Fecha Procesamiento": {
                    "date": {
                        "start": metadata.get('processed_at', datetime.utcnow().isoformat())
                    }
                }
            }
            
            # Add optional fields
            if metadata.get('duration'):
                properties["Duración (min)"] = {
                    "number": round(metadata['duration'] / 60, 2)
                }
            
            if metadata.get('page_count'):
                properties["Páginas"] = {
                    "number": metadata['page_count']
                }
            
            # Create the page
            page = self._rate_limited_request(
                self._client.pages.create,
                parent={"database_id": self._database_id},
                properties=properties
            )
            
            page_id = page['id']
            self.logger.info(f"Notion page created: {page_id}")
            
            # Add the content as blocks
            self._add_content_blocks(page_id, content)
            
            return page_id
            
        except Exception as e:
            self.logger.error(f"Error creating Notion page: {e}")
            return None
    
    def _add_content_blocks(self, page_id: str, content: str) -> bool:
        """Add content blocks to Notion page"""
        try:
            # Split the content into sections
            sections = self._parse_markdown_to_blocks(content)
            
            # The Notion API caps each request at 100 blocks
            for i in range(0, len(sections), 100):
                batch = sections[i:i+100]
                self._rate_limited_request(
                    self._client.blocks.children.append,
                    block_id=page_id,
                    children=batch
                )
            
            return True
            
        except Exception as e:
            self.logger.error(f"Error adding content blocks: {e}")
            return False
    
    def _parse_markdown_to_blocks(self, markdown: str) -> List[Dict]:
        """Convert markdown to Notion blocks"""
        blocks = []
        lines = markdown.split('\n')
        
        for line in lines:
            line = line.strip()
            
            if not line:
                continue
            
            # Headings
            if line.startswith('# '):
                blocks.append({
                    "object": "block",
                    "type": "heading_1",
                    "heading_1": {
                        "rich_text": [{"type": "text", "text": {"content": line[2:]}}]
                    }
                })
            elif line.startswith('## '):
                blocks.append({
                    "object": "block",
                    "type": "heading_2",
                    "heading_2": {
                        "rich_text": [{"type": "text", "text": {"content": line[3:]}}]
                    }
                })
            elif line.startswith('### '):
                blocks.append({
                    "object": "block",
                    "type": "heading_3",
                    "heading_3": {
                        "rich_text": [{"type": "text", "text": {"content": line[4:]}}]
                    }
                })
            # Bullet points
            elif line.startswith('- ') or line.startswith('* '):
                blocks.append({
                    "object": "block",
                    "type": "bulleted_list_item",
                    "bulleted_list_item": {
                        "rich_text": [{"type": "text", "text": {"content": line[2:]}}]
                    }
                })
            # Paragraph
            else:
                # Notion caps each rich_text at 2000 chars
                if len(line) > 2000:
                    chunks = [line[i:i+2000] for i in range(0, len(line), 2000)]
                    for chunk in chunks:
                        blocks.append({
                            "object": "block",
                            "type": "paragraph",
                            "paragraph": {
                                "rich_text": [{"type": "text", "text": {"content": chunk}}]
                            }
                        })
                else:
                    blocks.append({
                        "object": "block",
                        "type": "paragraph",
                        "paragraph": {
                            "rich_text": [{"type": "text", "text": {"content": line}}]
                        }
                    })
        
        return blocks
    
    def upload_file_to_page(self, page_id: str, file_path: Path, file_type: str = 'pdf') -> bool:
        """Upload file as external file to Notion page"""
        if not file_path.exists():
            self.logger.error(f"File not found: {file_path}")
            return False
        
        try:
            # Notion does not support direct binary upload; external hosting is needed
            # Option 1: upload to Nextcloud and get a public link
            # Option 2: use S3/MinIO
            # Option 3: use a dedicated hosting service
            
            # Assuming there is a public endpoint for the file
            file_url = self._get_public_url(file_path)
            
            if not file_url:
                self.logger.warning("Could not generate public URL for file")
                return False
            
            # Agregar como bloque de archivo
            self._rate_limited_request(
                self._client.blocks.children.append,
                block_id=page_id,
                children=[
                    {
                        "object": "block",
                        "type": "file",
                        "file": {
                            "type": "external",
                            "external": {
                                "url": file_url
                            }
                        }
                    }
                ]
            )
            
            return True
            
        except Exception as e:
            self.logger.error(f"Error uploading file to Notion: {e}")
            return False
    
    def _get_public_url(self, file_path: Path) -> Optional[str]:
        """Generate public URL for file (via Nextcloud or S3)"""
        # Implement according to your infrastructure
        # Option 1: Nextcloud share link
        from services.webdav_service import webdav_service
        
        # Upload to Nextcloud if it is not there yet
        remote_path = f"/cbcfacil/{file_path.name}"
        webdav_service.upload_file(file_path, remote_path)
        
        # Generate a share link (requires the extra Nextcloud share API)
        # return webdav_service.create_share_link(remote_path)
        
        # Option 2: use your API's downloads endpoint
        return f"{settings.PUBLIC_API_URL}/downloads/{file_path.name}"
    
    def update_page_status(self, page_id: str, status: str) -> bool:
        """Update page status"""
        try:
            self._rate_limited_request(
                self._client.pages.update,
                page_id=page_id,
                properties={
                    "Status": {
                        "select": {
                            "name": status
                        }
                    }
                }
            )
            return True
        except Exception as e:
            self.logger.error(f"Error updating page status: {e}")
            return False
    
    def search_pages(self, query: str) -> List[Dict]:
        """Search pages in database"""
        try:
            results = self._rate_limited_request(
                self._client.databases.query,
                database_id=self._database_id,
                filter={
                    "property": "Name",
                    "title": {
                        "contains": query
                    }
                }
            )
            return results.get('results', [])
        except Exception as e:
            self.logger.error(f"Error searching Notion pages: {e}")
            return []
    
    def get_page_content(self, page_id: str) -> Optional[str]:
        """Get page content as markdown"""
        try:
            blocks = self._rate_limited_request(
                self._client.blocks.children.list,
                block_id=page_id
            )
            
            markdown = self._blocks_to_markdown(blocks.get('results', []))
            return markdown
            
        except Exception as e:
            self.logger.error(f"Error getting page content: {e}")
            return None
    
    def _blocks_to_markdown(self, blocks: List[Dict]) -> str:
        """Convert Notion blocks to markdown"""
        markdown_lines = []
        
        for block in blocks:
            block_type = block.get('type')
            
            if block_type == 'heading_1':
                text = self._extract_text(block['heading_1'])
                markdown_lines.append(f"# {text}")
            elif block_type == 'heading_2':
                text = self._extract_text(block['heading_2'])
                markdown_lines.append(f"## {text}")
            elif block_type == 'heading_3':
                text = self._extract_text(block['heading_3'])
                markdown_lines.append(f"### {text}")
            elif block_type == 'bulleted_list_item':
                text = self._extract_text(block['bulleted_list_item'])
                markdown_lines.append(f"- {text}")
            elif block_type == 'paragraph':
                text = self._extract_text(block['paragraph'])
                markdown_lines.append(text)
        
        return '\n\n'.join(markdown_lines)
    
    def _extract_text(self, block_data: Dict) -> str:
        """Extract text from Notion rich_text"""
        rich_texts = block_data.get('rich_text', [])
        return ''.join(rt.get('text', {}).get('content', '') for rt in rich_texts)

# Rate limiter helper
class RateLimiter:
    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []
    
    def wait(self):
        """Wait if rate limit is reached"""
        now = time.time()
        
        # Drop requests that have left the window
        self.requests = [r for r in self.requests if now - r < self.time_window]
        
        # Wait until the oldest request leaves the window
        if len(self.requests) >= self.max_requests:
            sleep_time = self.time_window - (now - self.requests[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
            # Re-filter instead of clearing, so requests still inside the
            # window keep counting against the limit
            now = time.time()
            self.requests = [r for r in self.requests if now - r < self.time_window]
        
        self.requests.append(now)

# Global instance
notion_service = NotionService()
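To make the block shapes concrete, here is a trimmed, standalone version of the markdown parser above (headings, bullets, and paragraphs only; the 2000-char splitting is omitted):

```python
from typing import Dict, List

def md_to_blocks(markdown: str) -> List[Dict]:
    """Trimmed sketch of _parse_markdown_to_blocks: one Notion block per line."""
    # Longest prefixes first, so '### ' is not matched as '# '
    prefixes = [('### ', 'heading_3'), ('## ', 'heading_2'), ('# ', 'heading_1'),
                ('- ', 'bulleted_list_item'), ('* ', 'bulleted_list_item')]
    blocks = []
    for line in filter(None, (l.strip() for l in markdown.split('\n'))):
        block_type, text = 'paragraph', line
        for prefix, btype in prefixes:
            if line.startswith(prefix):
                block_type, text = btype, line[len(prefix):]
                break
        blocks.append({
            "object": "block",
            "type": block_type,
            block_type: {"rich_text": [{"type": "text", "text": {"content": text}}]},
        })
    return blocks
```

Each block nests its payload under a key equal to its own type, which is the shape the `blocks.children.append` endpoint expects.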

2. Bidirectional Synchronization

Implement webhooks to receive changes from Notion:

# api/webhooks.py (new file)
from flask import Blueprint, request, jsonify
from services.notion_service import notion_service
from tasks.sync import sync_notion_changes

webhooks_bp = Blueprint('webhooks', __name__)

@webhooks_bp.route('/webhooks/notion', methods=['POST'])
def notion_webhook():
    """Handle Notion webhook events"""
    # Verificar signature (si Notion lo soporta)
    # signature = request.headers.get('X-Notion-Signature')
    # if not verify_signature(request.data, signature):
    #     abort(403)
    
    data = request.json
    
    # Process the event
    event_type = data.get('type')
    
    if event_type == 'page.updated':
        page_id = data.get('page_id')
        # Queue a task to sync the changes
        sync_notion_changes.delay(page_id)
    
    return jsonify({'status': 'ok'}), 200

# tasks/sync.py (new file)
import logging
from datetime import datetime

from celery_app import celery_app
from services.notion_service import notion_service
from models.database import ProcessedFile, get_db

@celery_app.task
def sync_notion_changes(page_id: str):
    """Sync changes from Notion back to local database"""
    logger = logging.getLogger(__name__)
    
    try:
        # Fetch the updated content from Notion
        content = notion_service.get_page_content(page_id)
        
        if not content:
            logger.error(f"Could not fetch Notion page: {page_id}")
            return
        
        # Look up the local record
        with get_db() as db:
            file_record = db.query(ProcessedFile).filter_by(
                notion_page_id=page_id
            ).first()
            
            if file_record:
                file_record.summary_text = content
                file_record.updated_at = datetime.utcnow()
                db.commit()
                logger.info(f"Synced changes from Notion for {file_record.filename}")
            else:
                logger.warning(f"No local record found for Notion page {page_id}")
    
    except Exception as e:
        logger.error(f"Error syncing Notion changes: {e}")
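If the webhook source does supply a signature header, the commented-out verify_signature above could be an HMAC-SHA256 check over the raw body; this is a generic sketch (the shared secret and header name are assumptions, not part of Notion's API):

```python
import hashlib
import hmac

def verify_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Check a hex HMAC-SHA256 signature computed over the raw request body."""
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information through timing differences
    return hmac.compare_digest(expected, signature or '')
```

Always compute the HMAC over `request.data` (raw bytes), not re-serialized JSON, since key ordering would change the digest.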

Configure the webhook in Notion:

# Note: Notion currently has no native webhooks
# Alternatives:
# 1. Periodic polling (every 5 min)
# 2. Third-party services such as Zapier/Make
# 3. Polling driven by Celery beat

# tasks/sync.py
@celery_app.task
def poll_notion_changes():
    """Poll Notion for changes (scheduled task)"""
    # Find recently modified pages
    # ...
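The polling query can lean on Notion's `last_edited_time` timestamp filter. A sketch of building that filter; tracking the cutoff as "last N minutes" is an assumption about how the last poll time is stored:

```python
from datetime import datetime, timedelta, timezone
from typing import Dict

def recent_changes_filter(minutes: int = 5) -> Dict:
    """Build a databases.query filter for pages edited in the last N minutes."""
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)
    return {
        "timestamp": "last_edited_time",
        "last_edited_time": {"after": cutoff.isoformat()},
    }

# Inside poll_notion_changes, roughly (sketch, not wired up):
# results = client.databases.query(
#     database_id=database_id,
#     filter=recent_changes_filter(5),
# )
# for page in results.get('results', []):
#     sync_notion_changes.delay(page['id'])
```

Persisting the actual last-poll timestamp (e.g. in Redis) is safer than a fixed window, since a missed beat tick would otherwise drop edits.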

3. End-to-End Notion Integration Pipeline

Flow diagram:

┌─────────────────────────────────────────────────────────────┐
│                     CBCFacil Pipeline                        │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
          ┌─────────────────────────────────┐
          │  1. File detected in            │
          │     Nextcloud                   │
          └─────────────────────────────────┘
                            │
                            ▼
          ┌─────────────────────────────────┐
          │  2. Process (Audio/PDF)         │
          │     - Transcription             │
          │     - OCR                       │
          └─────────────────────────────────┘
                            │
                            ▼
          ┌─────────────────────────────────┐
          │  3. Generate AI summary         │
          │     - Claude/Gemini             │
          │     - Formatting                │
          └─────────────────────────────────┘
                            │
                            ▼
          ┌─────────────────────────────────┐
          │  4. Create documents            │
          │     - Markdown                  │
          │     - DOCX                      │
          │     - PDF                       │
          └─────────────────────────────────┘
                            │
                ┌───────────┴──────────┐
                ▼                      ▼
    ┌──────────────────┐   ┌──────────────────┐
    │  5a. Upload to   │   │  5b. Save to     │
    │      Notion      │   │      Database    │
    │  - Create page   │   │  - PostgreSQL    │
    │  - Add content   │   │  - Metadata      │
    │  - Attach PDF    │   │  - notion_page_id│
    └──────────────────┘   └──────────────────┘
                │                      │
                └───────────┬──────────┘
                            ▼
          ┌─────────────────────────────────┐
          │  6. Notify                      │
          │     - Telegram                  │
          │     - Email (optional)          │
          │     - WebSocket (dashboard)     │
          └─────────────────────────────────┘

Implementation:

# document/generators.py (improved)
def generate_summary(self, text: str, base_name: str, file_metadata: Dict[str, Any]) -> Tuple[bool, str, Dict[str, Any]]:
    """Generate summary with full Notion integration"""
    
    try:
        # Steps 1-4: Existing logic
        # ...
        
        # Step 5: Upload to Notion with rich metadata
        notion_page_id = None
        if settings.has_notion_config:
            try:
                title = base_name.replace('_', ' ').title()
                
                # Build enriched metadata
                metadata = {
                    'file_type': file_metadata.get('file_type', 'Desconocido'),
                    'processed_at': datetime.utcnow().isoformat(),
                    'duration': file_metadata.get('duration'),
                    'page_count': file_metadata.get('page_count'),
                    'file_size': file_metadata.get('file_size'),
                }
                
                # Create the page in Notion
                notion_page_id = notion_service.create_page(
                    title=title,
                    content=summary,
                    metadata=metadata
                )
                
                if notion_page_id:
                    self.logger.info(f"Notion page created: {notion_page_id}")
                    
                    # Upload PDF to Notion page
                    notion_service.upload_file_to_page(
                        page_id=notion_page_id,
                        file_path=pdf_path,
                        file_type='pdf'
                    )
                    
            except Exception as e:
                self.logger.warning(f"Notion integration failed: {e}")
        
        # Update response metadata
        metadata = {
            'markdown_path': str(markdown_path),
            'docx_path': str(docx_path),
            'pdf_path': str(pdf_path),
            'summary': summary,
            'notion_page_id': notion_page_id,
            'notion_uploaded': bool(notion_page_id),
        }
        
        return True, summary, metadata
        
    except Exception as e:
        self.logger.error(f"Document generation failed: {e}")
        return False, "", {}

4. Notion Database Configuration

Recommended schema for the Notion database:

| Property            | Type          | Description                        |
|---------------------|---------------|------------------------------------|
| Name                | Title         | Document name                      |
| Status              | Select        | Procesado / En Revisión / Aprobado |
| Tipo                | Select        | Audio / PDF / Texto                |
| Fecha Procesamiento | Date          | When the file was processed        |
| Duración (min)      | Number        | For audio files                    |
| Páginas             | Number        | For PDFs                           |
| Tamaño (MB)         | Number        | File size                          |
| Calidad             | Select        | Alta / Media / Baja                |
| Categoría           | Multi-select  | Tags/categories                    |
| Archivo Original    | Files & Media | Link to the original file          |
| Resumen PDF         | Files & Media | Generated PDF                      |

Script to create the database:

# scripts/setup_notion_database.py (new file)
from notion_client import Client
import os

def create_cbcfacil_database(token: str, parent_page_id: str):
    """Create Notion database for CBCFacil"""
    client = Client(auth=token)
    
    database = client.databases.create(
        parent={"type": "page_id", "page_id": parent_page_id},
        title=[
            {
                "type": "text",
                "text": {"content": "CBCFacil - Documentos Procesados"}
            }
        ],
        properties={
            "Name": {
                "title": {}
            },
            "Status": {
                "select": {
                    "options": [
                        {"name": "Procesado", "color": "green"},
                        {"name": "En Revisión", "color": "yellow"},
                        {"name": "Aprobado", "color": "blue"},
                        {"name": "Error", "color": "red"},
                    ]
                }
            },
            "Tipo": {
                "select": {
                    "options": [
                        {"name": "Audio", "color": "purple"},
                        {"name": "PDF", "color": "orange"},
                        {"name": "Texto", "color": "gray"},
                    ]
                }
            },
            "Fecha Procesamiento": {
                "date": {}
            },
            "Duración (min)": {
                "number": {
                    "format": "number_with_commas"
                }
            },
            "Páginas": {
                "number": {}
            },
            "Tamaño (MB)": {
                "number": {
                    "format": "number_with_commas"
                }
            },
            "Calidad": {
                "select": {
                    "options": [
                        {"name": "Alta", "color": "green"},
                        {"name": "Media", "color": "yellow"},
                        {"name": "Baja", "color": "red"},
                    ]
                }
            },
            "Categoría": {
                "multi_select": {
                    "options": [
                        {"name": "Historia", "color": "blue"},
                        {"name": "Ciencia", "color": "green"},
                        {"name": "Literatura", "color": "purple"},
                        {"name": "Política", "color": "red"},
                    ]
                }
            },
        }
    )
    
    print(f"Database created: {database['id']}")
    print(f"Add this to your .env: NOTION_DATABASE_ID={database['id']}")
    
    return database['id']

if __name__ == '__main__':
    token = input("Enter your Notion API token: ")
    parent_page_id = input("Enter the parent page ID: ")
    
    create_cbcfacil_database(token, parent_page_id)

Run:

python scripts/setup_notion_database.py

5. Advanced Notion Features

AI auto-categorization:

# services/notion_service.py
def auto_categorize(self, summary: str) -> List[str]:
    """Auto-categorize content using AI"""
    from services.ai import ai_provider_factory
    
    ai = ai_provider_factory.get_best_provider()
    
    prompt = f"""Analiza el siguiente resumen y asigna 1-3 categorías principales de esta lista:
    - Historia
    - Ciencia
    - Literatura
    - Política
    - Economía
    - Tecnología
    - Filosofía
    - Arte
    - Deporte
    
    Resumen: {summary[:500]}
    
    Devuelve solo las categorías separadas por comas."""
    
    categories_str = ai.generate_text(prompt)
    categories = [c.strip() for c in categories_str.split(',')]
    
    return categories[:3]

def create_page(self, title: str, content: str, metadata: Dict[str, Any]):
    # ...
    
    # Auto-categorize
    categories = self.auto_categorize(content)
    
    properties["Categoría"] = {
        "multi_select": [{"name": cat} for cat in categories]
    }
    
    # ...
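Since the AI reply is free text, constraining it to the known select options avoids creating stray tags in the Notion database. A hedged helper (the allowed list mirrors the prompt above, plus case normalization):

```python
from typing import List

# Must match the options defined in the Notion "Categoría" property
ALLOWED_CATEGORIES = ["Historia", "Ciencia", "Literatura", "Política",
                      "Economía", "Tecnología", "Filosofía", "Arte", "Deporte"]

def parse_categories(ai_reply: str, limit: int = 3) -> List[str]:
    """Split the comma-separated reply, keep only known options, cap at `limit`."""
    cleaned = [c.strip().title() for c in ai_reply.split(',')]
    return [c for c in cleaned if c in ALLOWED_CATEGORIES][:limit]
```

Feeding the filtered list (rather than the raw reply) into the `multi_select` property keeps the database's option set stable even when the model improvises.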

Quality assessment:

def assess_quality(self, transcription: str, summary: str) -> str:
    """Assess document quality based on metrics"""
    
    # Criteria:
    # - Summary length (600+ words = Alta)
    # - Coherence (could be scored with AI)
    # - Presence of key data (dates, names)
    
    word_count = len(summary.split())
    
    if word_count < 300:
        return "Baja"
    elif word_count < 600:
        return "Media"
    else:
        return "Alta"

🧪 TESTING PLAN

Test Structure

tests/
├── unit/
│   ├── test_settings.py
│   ├── test_validators.py
│   ├── test_webdav_service.py
│   ├── test_vram_manager.py
│   ├── test_ai_service.py
│   ├── test_notion_service.py
│   ├── test_audio_processor.py
│   ├── test_pdf_processor.py
│   ├── test_document_generator.py
│   └── test_processed_registry.py
├── integration/
│   ├── test_audio_pipeline.py
│   ├── test_pdf_pipeline.py
│   ├── test_notion_integration.py
│   └── test_api_endpoints.py
├── e2e/
│   └── test_full_workflow.py
├── conftest.py
└── fixtures/
    ├── sample_audio.mp3
    ├── sample_pdf.pdf
    └── mock_responses.json
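conftest.py can centralize the test doubles; a minimal sketch, where FakeNotionClient exposes only the surface the unit tests touch (its shape is an assumption, not the full notion_client API):

```python
# tests/conftest.py (sketch)
import pytest
from unittest.mock import MagicMock

class FakeNotionClient:
    """Just enough of notion_client.Client for the unit tests."""
    def __init__(self, auth=None):
        self.auth = auth
        self.pages = MagicMock()
        self.blocks = MagicMock()
        self.pages.create.return_value = {'id': 'page_123'}

@pytest.fixture
def fake_notion_client():
    """Inject in place of the real client when unit-testing NotionService."""
    return FakeNotionClient(auth='test_token')

@pytest.fixture
def sample_metadata():
    return {'file_type': 'pdf', 'page_count': 3, 'file_size': 2048}
```

Keeping the fake in conftest.py means every test module gets the same client surface without repeating mock setup.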

Test Examples

# tests/unit/test_notion_service.py
import pytest
from unittest.mock import Mock, patch
from services.notion_service import NotionService

@pytest.fixture
def notion_service():
    service = NotionService()
    service.configure(token="test_token", database_id="test_db")
    return service

def test_notion_service_configuration(notion_service):
    assert notion_service.is_configured
    assert notion_service._database_id == "test_db"

@patch('services.notion_service.Client')
def test_create_page_success(mock_client):
    # Patch the name the service actually imported (services.notion_service.Client)
    # and configure while the patch is active, so the mock client gets injected
    service = NotionService()
    service.configure(token="test_token", database_id="test_db")
    
    # Mock response
    mock_client.return_value.pages.create.return_value = {
        'id': 'page_123'
    }
    
    page_id = service.create_page(
        title="Test Page",
        content="# Test Content",
        metadata={'file_type': 'pdf'}
    )
    
    assert page_id == 'page_123'

def test_rate_limiter():
    from services.notion_service import RateLimiter
    import time
    
    limiter = RateLimiter(max_requests=3, time_window=1.0)
    
    # Should allow 3 requests immediately
    start = time.time()
    for _ in range(3):
        limiter.wait()
    elapsed = time.time() - start
    assert elapsed < 0.1
    
    # 4th request should wait
    start = time.time()
    limiter.wait()
    elapsed = time.time() - start
    assert elapsed >= 0.9

# tests/integration/test_notion_integration.py
@pytest.mark.integration
def test_full_notion_workflow(tmpdir):
    """Test complete workflow: process file -> create Notion page"""
    # Setup
    audio_file = tmpdir / "test_audio.mp3"
    # ... create test file
    
    # Process audio
    from processors.audio_processor import audio_processor
    result = audio_processor.process(audio_file)
    
    # Generate summary
    from document.generators import DocumentGenerator
    generator = DocumentGenerator()
    success, summary, metadata = generator.generate_summary(
        result.data['text'],
        'test_audio',
        file_metadata={'file_type': 'Audio'}
    )
    
    assert success
    assert metadata.get('notion_page_id')
    
    # Verify Notion page exists
    from services.notion_service import notion_service
    content = notion_service.get_page_content(metadata['notion_page_id'])
    assert content is not None

Coverage Goal

# Run the tests with coverage
pytest --cov=. --cov-report=html --cov-report=term

# Goal: 80% overall coverage
# - Unit tests: 90% coverage
# - Integration tests: 70% coverage
# - E2E tests: 60% coverage
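The thresholds can be enforced in configuration instead of remembered on the command line; a pyproject.toml sketch, assuming pytest-cov is installed (`--cov-fail-under` is its flag) and that the `integration` marker used above is registered:

```toml
[tool.pytest.ini_options]
addopts = "--cov=. --cov-report=term --cov-report=html --cov-fail-under=80"
markers = [
    "integration: tests that hit external services",
]

[tool.coverage.run]
omit = ["tests/*", "scripts/*"]
```

With this in place, CI fails automatically whenever coverage drops below the 80% goal.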

📅 IMPLEMENTATION ROADMAP

Sprint 1: Seguridad y Fixes Críticos (2 semanas)

Semana 1:

  • Cambiar Notion API token
  • Fix path traversal vulnerability
  • Fix SECRET_KEY generation
  • Mover imports a module level
  • Implementar API authentication (JWT)

Week 2:

  • Configure restrictive CORS
  • Add rate limiting (flask-limiter)
  • Implement CSP headers
  • Complete input sanitization
  • Filter sensitive information out of logs
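The CSP work can start from a pure helper that builds the header set; the values below are a conservative example policy (to be tuned to the app's actual assets and attached via a Flask `after_request` hook):

```python
def security_headers() -> dict:
    """Conservative security headers (example policy; adjust CSP to real assets)."""
    return {
        "Content-Security-Policy": (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data:"
        ),
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "DENY",
        "Referrer-Policy": "no-referrer",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    }
```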

Deliverables:

  • System with baseline security
  • Critical vulnerabilities resolved
  • Working authentication

Sprint 2: Testing and Performance (2 weeks)

Week 1:

  • Set up testing infrastructure
  • Unit tests for services (50% coverage)
  • Integration tests for pipelines
  • CI/CD with GitHub Actions

Week 2:

  • Implement Celery + Redis
  • Queue system for processing
  • Distributed cache with Redis
  • WebSockets for real-time updates

Deliverables:

  • 50% code coverage
  • Working asynchronous processing
  • Real-time dashboard updates

Sprint 3: Advanced Notion Integration (2 weeks)

Week 1:

  • Migrate to the official notion-client
  • Implement rate limiting for Notion
  • Markdown to Notion blocks parser
  • AI-based auto-categorization
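The Markdown-to-blocks parser can be sketched as a line-based converter producing Notion-style block dicts, covering only the shapes the current notion_service handles (headings, bullets, paragraphs); the dict layout follows the public Notion block schema:

```python
def markdown_to_blocks(md: str) -> list:
    """Convert simple Markdown (headings, bullets, paragraphs) to Notion block dicts."""
    def block(btype: str, text: str) -> dict:
        return {"object": "block", "type": btype,
                btype: {"rich_text": [{"type": "text", "text": {"content": text}}]}}

    blocks = []
    for raw in md.splitlines():
        line = raw.strip()
        if not line:
            continue  # blank lines produce no block
        if line.startswith("### "):
            blocks.append(block("heading_3", line[4:]))
        elif line.startswith("## "):
            blocks.append(block("heading_2", line[3:]))
        elif line.startswith("# "):
            blocks.append(block("heading_1", line[2:]))
        elif line.startswith(("- ", "* ")):
            blocks.append(block("bulleted_list_item", line[2:]))
        else:
            blocks.append(block("paragraph", line))
    return blocks
```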

Week 2:

  • Bidirectional synchronization system
  • Webhooks/polling for change detection
  • File hosting for attachments
  • Notion metrics dashboard

Deliverables:

  • Robust Notion integration
  • Bidirectional synchronization
  • Working auto-categorization

Sprint 4: Database and Scalability (2 weeks)

Week 1:

  • Set up PostgreSQL
  • Schema design and migrations (Alembic)
  • Migrate off processed_files.txt
  • Implement the repository pattern
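The repository pattern for the processed-files registry (replacing processed_files.txt) can be prototyped against sqlite3 before the PostgreSQL/SQLAlchemy migration; table and class names here are illustrative:

```python
import sqlite3


class ProcessedFileRepository:
    """Minimal repository over a processed_files table (sketch; swap for SQLAlchemy later)."""

    def __init__(self, db_path: str = ":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed_files ("
            " path TEXT PRIMARY KEY,"
            " processed_at TEXT DEFAULT CURRENT_TIMESTAMP)"
        )

    def mark_processed(self, path: str) -> None:
        # Idempotent: re-marking an already processed file is a no-op
        self.conn.execute(
            "INSERT OR IGNORE INTO processed_files (path) VALUES (?)", (path,))
        self.conn.commit()

    def is_processed(self, path: str) -> bool:
        row = self.conn.execute(
            "SELECT 1 FROM processed_files WHERE path = ?", (path,)).fetchone()
        return row is not None
```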

Week 2:

  • Advanced health checks
  • Prometheus metrics exporter
  • Rotating logs
  • Error tracking (Sentry)
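Advanced health checks usually aggregate per-dependency probes into one report; a stdlib-only sketch where Redis/PostgreSQL probes would be passed in as callables (names and shape are illustrative):

```python
import shutil


def health_report(min_free_bytes: int = 1024 ** 3, extra_checks=None) -> dict:
    """Aggregate health probes into {'status': ..., 'checks': {...}}."""
    checks = {}
    free = shutil.disk_usage("/").free
    checks["disk"] = {"ok": free >= min_free_bytes, "free_bytes": free}
    for name, probe in (extra_checks or {}).items():
        try:
            checks[name] = {"ok": bool(probe())}
        except Exception as exc:  # a failing probe must not crash the endpoint
            checks[name] = {"ok": False, "error": str(exc)}
    status = "healthy" if all(c["ok"] for c in checks.values()) else "degraded"
    return {"status": status, "checks": checks}
```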

Deliverables:

  • Production-ready database
  • Full observability
  • Scalable system

Sprint 5: Frontend Modernization (3 weeks)

Week 1:

  • Set up the React app
  • Componentize the UI
  • State management (Redux/Zustand)

Week 2:

  • WebSocket integration
  • Real-time updates
  • File upload with progress

Week 3:

  • Frontend testing (Jest)
  • Responsive design
  • Production deployment

Deliverables:

  • Modern, maintainable frontend
  • Improved UX
  • Frontend tests

Sprint 6: Advanced Features (2 weeks)

Week 1:

  • i18n (internationalization)
  • Plugin system
  • Video processor (new)

Week 2:

  • Customizable prompt editor
  • Advanced version history
  • Reports and analytics

Deliverables:

  • Extensible system
  • Premium features
  • Analytics dashboard

🎯 SUCCESS METRICS

Sprint 1-2 KPIs

  • 0 critical vulnerabilities
  • 50% code coverage
  • 100% of endpoints authenticated
  • < 100 ms API response time

Sprint 3-4 KPIs

  • 95% uptime
  • 80% code coverage
  • < 5 min processing time (1 h of audio)
  • 100% Notion synchronization success rate

Sprint 5-6 KPIs

  • < 2 s frontend load time
  • 90% user satisfaction
  • Support for 5+ languages
  • 100+ files processed per day without degradation

📚 RESOURCES AND DOCUMENTATION

Libraries to Add

# requirements.txt (additions)

# Security
PyJWT>=2.8.0
flask-jwt-extended>=4.5.3
flask-limiter>=3.5.0
werkzeug>=3.0.0

# Queue & Cache
celery>=5.3.4
redis>=5.0.0
hiredis>=2.2.3

# Database
psycopg2-binary>=2.9.9
sqlalchemy>=2.0.23
alembic>=1.13.0

# Notion
notion-client>=2.2.1

# WebSockets
flask-socketio>=5.3.5
python-socketio>=5.10.0
eventlet>=0.33.3

# Monitoring
prometheus-client>=0.19.0
sentry-sdk>=1.39.1

# Testing
pytest>=7.4.3
pytest-cov>=4.1.0
pytest-asyncio>=0.21.1
pytest-mock>=3.12.0
faker>=22.0.0

# Type checking
mypy>=1.7.1
types-requests>=2.31.0

Useful Scripts

#!/bin/bash
# scripts/deploy.sh
set -e

echo "Deploying CBCFacil..."

# Pull latest code
git pull origin main

# Activate venv
source .venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Run migrations
alembic upgrade head

# Restart services
sudo systemctl restart cbcfacil
sudo systemctl restart cbcfacil-worker
sudo systemctl restart nginx

echo "Deployment complete!"
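The `systemctl restart cbcfacil` step assumes a systemd unit exists for the service; a hypothetical minimal unit (paths, user, and entry point are placeholders to be adapted):

```ini
# /etc/systemd/system/cbcfacil.service (sketch -- adjust paths and user)
[Unit]
Description=CBCFacil service
After=network.target

[Service]
User=cbcfacil
WorkingDirectory=/opt/cbcfacil
ExecStart=/opt/cbcfacil/.venv/bin/python main.py
Restart=on-failure

[Install]
WantedBy=multi-user.target
```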

🏁 CONCLUSION

This document provides a complete roadmap for taking CBCFacil from a working prototype to a production-ready, enterprise-grade system.

Immediate Next Steps

  1. DAY 1: Rotate the Notion API token; fix critical vulnerabilities
  2. WEEK 1: Implement authentication and rate limiting
  3. WEEK 2: Set up the testing infrastructure
  4. MONTH 1: Complete Sprints 1-2

Implementation Priority

CRITICAL (now):
├── Baseline security
├── Bug fixes
└── Core tests

HIGH (2-4 weeks):
├── Performance (Celery + Redis)
├── Advanced Notion integration
└── Database migration

MEDIUM (1-2 months):
├── Frontend modernization
├── Full observability
└── Advanced features

Expected Final State: A production-ready system with 80%+ coverage, robust security, advanced Notion integration, and a scalable architecture.


Document generated on January 26, 2026
Version: 1.0
Author: CBCFacil Development Team