- Installed the official notion-client SDK for a robust integration
- Refactored services/notion_service.py to use the official Notion SDK
- Rate limiting with retry and exponential backoff
- Markdown → Notion blocks parser (headings, bullets, paragraphs)
- Support for pages and databases
- Robust error handling
- Automatic integration in document/generators.py
- PDFs are uploaded to Notion automatically after generation
- Full summary content formatted as blocks
- Rich metadata (file type, path, date)
- Notion configuration in main.py
- Automatic initialization on service startup
- Credential validation
- Updated config/settings.py
- Added load_dotenv() to load variables from .env
- Notion configuration (NOTION_API, NOTION_DATABASE_ID)
- Utility scripts created:
  - test_notion_integration.py: Notion upload test
  - test_pipeline_notion.py: full-pipeline test
  - verify_notion_permissions.py: permission verification
  - list_notion_pages.py: list accessible pages
  - diagnose_notion.py: full diagnostics
  - create_notion_database.py: create the database automatically
  - restart_service.sh: service restart script
- Complete documentation in opus.md:
  - Exhaustive codebase analysis (42 Python files)
  - Critical bugs identified, with fixes
  - Security improvements (authentication, rate limiting, CORS, CSP)
  - Performance optimizations (Celery, Redis, PostgreSQL, WebSockets)
  - Testing plan (structure, examples, 80% coverage goal)
  - Implementation roadmap (6 detailed sprints)
  - Advanced Notion integration documented

Status: Notion working correctly; PDFs upload automatically
🚀 CBCFacil - Improvement and Optimization Plan
Date: January 26, 2026
Project: CBCFacil v9
Documentation: Improvements, Bug Fixes, Recommendations, and Notion Integration
📋 TABLE OF CONTENTS
- Executive Summary
- Critical Bugs to Fix
- Security Improvements
- Performance Optimizations
- Code and Maintainability Improvements
- Advanced Notion Integration
- Testing Plan
- Implementation Roadmap
📊 EXECUTIVE SUMMARY
CBCFacil is a well-architected AI document-processing system, but it requires critical improvements in security, testing, and scalability before it can be considered production-ready.
Overall Rating
Architecture:  ████████░░ 8/10
Code:          ███████░░░ 7/10
Security:      ████░░░░░░ 4/10
Testing:       ░░░░░░░░░░ 0/10
Documentation: █████████░ 9/10
Performance:   ██████░░░░ 6/10
TOTAL:         ██████░░░░ 5.7/10
Priorities
- 🔴 CRITICAL: Basic security + fundamental tests (Sprint 1)
- 🟡 HIGH: Performance and scalability (Sprint 2)
- 🟢 MEDIUM: Frontend modernization and advanced features (Sprints 3-4)
🐛 CRITICAL BUGS TO FIX
1. 🔴 Notion API Token Exposed in .env.example
Location: config/settings.py:47, .env.example
Problem:
# .env.example contains a real Notion token
NOTION_API_TOKEN=secret_XXX...REAL_TOKEN...XXX
Risk: High - token publicly exposed in the repository
Solution:
# .env.example
NOTION_API_TOKEN=secret_YOUR_NOTION_TOKEN_HERE_replace_this
NOTION_DATABASE_ID=your_database_id_here
Immediate Actions:
- Rotate the Notion token from the Notion console
- Update .env.example with a placeholder
- Verify that .env is listed in .gitignore
- Scan the Git history for exposed tokens
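A quick way to triage dumps of repository history (e.g. `git log -p` output) for leaked tokens is a small pattern check. This is an illustrative sketch, not a replacement for a dedicated scanner such as gitleaks; the `secret_` prefix shape is an assumption about Notion-style tokens, not an official format.

```python
import re

# Assumed token shape: "secret_" followed by a long alphanumeric run.
# Real secret scanners (gitleaks, trufflehog) ship curated rule sets instead.
TOKEN_RE = re.compile(r'secret_[A-Za-z0-9]{20,}')

def find_token_leaks(text: str) -> list[str]:
    """Return candidate leaked tokens found in the given text blob."""
    return TOKEN_RE.findall(text)
```

Note that the placeholder recommended above does not match the pattern, so a scan distinguishes it from a real token.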
2. 🔴 Path Traversal Vulnerability in /downloads
Location: api/routes.py:142-148
Problem:
@app.route('/downloads/<path:filepath>')
def serve_file(filepath):
    safe_path = os.path.normpath(filepath)
    # Insufficient validation - can be bypassed with symlinks
    if '..' in filepath or filepath.startswith('/'):
        abort(403)
Risk: High - unauthorized access to files on the system
Solution:
from pathlib import Path

from werkzeug.security import safe_join
from werkzeug.utils import secure_filename

@app.route('/downloads/<path:filepath>')
def serve_file(filepath):
    # Sanitize the filename
    safe_filename = secure_filename(filepath)
    # Use safe_join to prevent path traversal
    base_dir = settings.LOCAL_DOWNLOADS_PATH
    safe_path = safe_join(str(base_dir), safe_filename)
    if safe_path is None:
        abort(403, "Access denied")
    # Verify that the resolved path stays inside the allowed directory.
    # Path.is_relative_to (Python 3.9+) avoids the string-prefix pitfall of startswith.
    resolved_path = Path(safe_path).resolve()
    if not resolved_path.is_relative_to(base_dir.resolve()):
        abort(403, "Access denied")
    if not resolved_path.exists() or not resolved_path.is_file():
        abort(404)
    return send_file(resolved_path)
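The containment check at the end is the critical piece. A minimal standalone version of that idea (function and path names here are illustrative) makes the behavior easy to verify:

```python
from pathlib import Path

def is_within(base: Path, candidate: str) -> bool:
    """True if base/candidate resolves to a location inside base."""
    resolved = (base / candidate).resolve()
    base_resolved = base.resolve()
    # Comparing Path objects avoids the string-prefix pitfall
    # ("/srv/app2".startswith("/srv/app") is True).
    return resolved == base_resolved or base_resolved in resolved.parents
```

`resolve()` also follows symlinks, which is what defeats the symlink bypass mentioned in the problem description.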
3. 🔴 SECRET_KEY Generated Randomly
Location: api/routes.py:30
Problem:
# A random SECRET_KEY is generated if none is set
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', os.urandom(24).hex())
Risk: Medium - sessions are invalidated on every restart; insecure in production
Solution:
# config/settings.py
@property
def SECRET_KEY(self) -> str:
    key = os.getenv('SECRET_KEY')
    if not key:
        raise ValueError(
            "SECRET_KEY is required in production. "
            "Generate one with: python -c 'import secrets; print(secrets.token_hex(32))'"
        )
    return key

# api/routes.py
app.config['SECRET_KEY'] = settings.SECRET_KEY
Action:
# Generate a secure secret key
python -c 'import secrets; print(secrets.token_hex(32))' >> .env
# Then prefix the appended line in .env with the variable name:
SECRET_KEY=<generated_key>
4. 🔴 Imports Inside Functions
Location: main.py:306-342
Problem:
def process_audio_file(audio_path: Path):
    from processors.audio_processor import audio_processor  # Imports inside
    from document.generators import DocumentGenerator       # the function
    # ...
Risk: Medium - performance hit and potential circular-import problems
Solution:
# main.py (top level)
from processors.audio_processor import audio_processor
from processors.pdf_processor import pdf_processor
from document.generators import DocumentGenerator

# Remove all imports from inside functions
def process_audio_file(audio_path: Path):
    # Use the module-level imports
    result = audio_processor.process(audio_path)
    # ...
5. 🔴 No API Authentication
Location: api/routes.py (all endpoints)
Problem: Any user can hit every endpoint without authenticating
Risk: Critical - data exposure and unauthorized control
API-Key Solution:
# config/settings.py
@property
def API_KEY(self) -> Optional[str]:
    return os.getenv('API_KEY')

@property
def REQUIRE_AUTH(self) -> bool:
    return os.getenv('REQUIRE_AUTH', 'true').lower() == 'true'

# api/auth.py (new file)
from functools import wraps
from flask import request, abort, jsonify
from config import settings

def require_api_key(f):
    """Decorator to require API key authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not settings.REQUIRE_AUTH:
            return f(*args, **kwargs)
        api_key = request.headers.get('X-API-Key')
        if not api_key:
            abort(401, description='API key required')
        if api_key != settings.API_KEY:
            abort(403, description='Invalid API key')
        return f(*args, **kwargs)
    return decorated_function

# api/routes.py
from api.auth import require_api_key

@app.route('/api/files')
@require_api_key
def get_files():
    # ...
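One caveat with the decorator above: comparing keys with `!=` can leak timing information. A hedged sketch of a constant-time comparison using only the standard library (helper name is illustrative):

```python
import hmac

def api_key_matches(provided: str, expected: str) -> bool:
    """Constant-time string comparison to blunt timing attacks."""
    return hmac.compare_digest(provided.encode(), expected.encode())
```

Inside the decorator, `if not api_key_matches(api_key, settings.API_KEY):` would replace the direct comparison.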
JWT Solution (more robust):
# requirements.txt
PyJWT>=2.8.0
flask-jwt-extended>=4.5.3

# api/auth.py
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity

jwt = JWTManager(app)

@app.route('/api/login', methods=['POST'])
def login():
    username = request.json.get('username')
    password = request.json.get('password')
    # Validate credentials (use bcrypt in production)
    if username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD:
        access_token = create_access_token(identity=username)
        return jsonify(access_token=access_token)
    abort(401)

@app.route('/api/files')
@jwt_required()
def get_files():
    current_user = get_jwt_identity()
    # ...
6. 🟡 Text Truncation in Summaries
Location: document/generators.py:38, 61
Problem:
bullet_prompt = f"""...\nTexto:\n{text[:15000]}"""  # Truncates at 15k chars
summary_prompt = f"""...\n{text[:20000]}\n..."""    # Truncates at 20k chars
Risk: Medium - information loss on long documents
Solution - Intelligent Chunking:
def _chunk_text(self, text: str, max_chunk_size: int = 15000) -> List[str]:
    """Split text into intelligent chunks by paragraphs"""
    if len(text) <= max_chunk_size:
        return [text]
    chunks = []
    current_chunk = []
    current_size = 0
    # Split by double newlines (paragraphs)
    paragraphs = text.split('\n\n')
    for para in paragraphs:
        para_size = len(para)
        if current_size + para_size > max_chunk_size:
            if current_chunk:
                chunks.append('\n\n'.join(current_chunk))
            current_chunk = []
            current_size = 0
        current_chunk.append(para)
        current_size += para_size
    if current_chunk:
        chunks.append('\n\n'.join(current_chunk))
    return chunks

def generate_summary(self, text: str, base_name: str):
    """Generate summary with intelligent chunking"""
    chunks = self._chunk_text(text, max_chunk_size=15000)
    # Process each chunk and combine
    all_bullets = []
    for i, chunk in enumerate(chunks):
        self.logger.info(f"Processing chunk {i+1}/{len(chunks)}")
        bullet_prompt = f"""Analyze the following text (part {i+1} of {len(chunks)})...\n{chunk}"""
        bullets = self.ai_provider.generate_text(bullet_prompt)
        all_bullets.append(bullets)
    # Combine all bullets
    combined_bullets = '\n'.join(all_bullets)
    # Generate a unified summary from the combined bullets
    # ...
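The chunking logic can be exercised in isolation. This is the `_chunk_text` method pulled out as a plain function for testing; like the original, it deliberately ignores the two joining newline characters when counting sizes, so chunks may slightly exceed the nominal limit:

```python
def chunk_text(text: str, max_chunk_size: int = 15000) -> list[str]:
    """Split text into paragraph-aligned chunks of roughly max_chunk_size chars."""
    if len(text) <= max_chunk_size:
        return [text]
    chunks, current, size = [], [], 0
    for para in text.split('\n\n'):
        if size + len(para) > max_chunk_size and current:
            # Flush the accumulated paragraphs before starting a new chunk
            chunks.append('\n\n'.join(current))
            current, size = [], 0
        current.append(para)
        size += len(para)
    if current:
        chunks.append('\n\n'.join(current))
    return chunks
```

Joining the chunks back with `'\n\n'` reconstructs the original text, which is a useful invariant to assert in tests.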
7. 🟡 Cache Key Uses Only 500 Characters
Location: services/ai_service.py:111
Problem:
def _get_cache_key(self, prompt: str, model: str = "default") -> str:
    content = f"{model}:{prompt[:500]}"  # Only the first 500 chars
    return hashlib.sha256(content.encode()).hexdigest()
Risk: Medium - cache collisions between similar prompts
Solution:
def _get_cache_key(self, prompt: str, model: str = "default") -> str:
    """Generate cache key from full prompt hash"""
    content = f"{model}:{prompt}"  # Hash the full prompt
    return hashlib.sha256(content.encode()).hexdigest()
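The difference is easy to demonstrate. A small sketch contrasting the truncated and full-prompt keys (parameter names are illustrative):

```python
import hashlib

def cache_key(prompt: str, model: str = "default", truncate: bool = False) -> str:
    """SHA-256 cache key; truncate=True mimics the buggy 500-char version."""
    body = prompt[:500] if truncate else prompt
    return hashlib.sha256(f"{model}:{body}".encode()).hexdigest()

# Two prompts that only differ after character 500
a = "x" * 500 + "summarize in English"
b = "x" * 500 + "summarize in Spanish"
```

With truncation, both prompts map to the same key, so the second request silently gets the first prompt's cached answer; hashing the full prompt keeps them distinct.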
8. 🟡 Bloom Filter Uses MD5
Location: storage/processed_registry.py:24
Problem:
import hashlib

def _hash(self, item: str) -> int:
    return int(hashlib.md5(item.encode()).hexdigest(), 16)  # MD5 is not secure
Risk: Low - MD5 is obsolete; collisions are possible
Solution:
def _hash(self, item: str) -> int:
    """Use SHA256 instead of MD5 for better collision resistance"""
    return int(hashlib.sha256(item.encode()).hexdigest(), 16) % (2**64)
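If the registry ever needs the k independent hash functions of a classic Bloom filter, they can be derived from a single SHA-256 digest via double hashing (the Kirsch-Mitzenmacher construction). A sketch with assumed parameters (k, m and the function name are not from the original code):

```python
import hashlib

def bloom_positions(item: str, k: int = 4, m: int = 1 << 20) -> list[int]:
    """Derive k bit positions in a filter of m bits from one SHA-256 digest."""
    digest = hashlib.sha256(item.encode()).digest()
    h1 = int.from_bytes(digest[:8], 'big')
    h2 = int.from_bytes(digest[8:16], 'big') | 1  # force odd so positions vary with i
    return [(h1 + i * h2) % m for i in range(k)]
```

This avoids hashing the item k separate times while preserving the filter's false-positive guarantees.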
🔒 SECURITY IMPROVEMENTS
1. Implement Rate Limiting
Install flask-limiter:
pip install flask-limiter
Implementation:
# api/routes.py
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
    storage_uri="redis://localhost:6379"  # Or memory:// for testing
)

@app.route('/api/files')
@limiter.limit("30 per minute")
@require_api_key
def get_files():
    # ...

@app.route('/api/regenerate-summary', methods=['POST'])
@limiter.limit("5 per minute")  # Stricter for expensive operations
@require_api_key
def regenerate_summary():
    # ...
2. Configure Restrictive CORS
Location: api/routes.py:25
Problem:
CORS(app)  # Allows every origin (*)
Solution:
# config/settings.py
@property
def CORS_ORIGINS(self) -> List[str]:
    origins_str = os.getenv('CORS_ORIGINS', 'http://localhost:5000')
    return [o.strip() for o in origins_str.split(',')]

# api/routes.py
from flask_cors import CORS

CORS(app, resources={
    r"/api/*": {
        "origins": settings.CORS_ORIGINS,
        "methods": ["GET", "POST", "DELETE"],
        "allow_headers": ["Content-Type", "X-API-Key", "Authorization"],
        "expose_headers": ["Content-Type"],
        "supports_credentials": True,
        "max_age": 3600
    }
})
.env configuration:
# Production
CORS_ORIGINS=https://cbcfacil.com,https://app.cbcfacil.com
# Development
CORS_ORIGINS=http://localhost:5000,http://localhost:3000
3. Implement a Content Security Policy (CSP)
New functionality:
# api/security.py (new file)
def add_security_headers(response):
    """Add security headers to all responses"""
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        "script-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
        "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; "
        "font-src 'self' https://fonts.gstatic.com; "
        "img-src 'self' data: https:; "
        "connect-src 'self'"
    )
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    response.headers['X-XSS-Protection'] = '1; mode=block'
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    return response

# api/routes.py
from api.security import add_security_headers

@app.after_request
def apply_security_headers(response):
    return add_security_headers(response)
4. Sanitize Inputs and Outputs
New functionality:
# core/sanitizer.py (new file)
import html
from pathlib import Path

class InputSanitizer:
    """Sanitize user inputs"""

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Remove dangerous characters from filename"""
        # Remove path separators
        filename = filename.replace('/', '_').replace('\\', '_')
        # Remove null bytes
        filename = filename.replace('\x00', '')
        # Limit length
        filename = filename[:255]
        # Remove leading/trailing dots and spaces
        filename = filename.strip('. ')
        return filename

    @staticmethod
    def sanitize_html(text: str) -> str:
        """Escape HTML to prevent XSS"""
        return html.escape(text)

    @staticmethod
    def sanitize_path(path: str, base_dir: Path) -> Path:
        """Ensure path is within base directory"""
        from werkzeug.security import safe_join
        safe_path = safe_join(str(base_dir), path)
        if safe_path is None:
            raise ValueError("Invalid path")
        resolved = Path(safe_path).resolve()
        if not resolved.is_relative_to(base_dir.resolve()):
            raise ValueError("Path traversal attempt")
        return resolved

# Usage in api/routes.py
from core.sanitizer import InputSanitizer

@app.route('/api/transcription/<filename>')
@require_api_key
def get_transcription(filename):
    # Sanitize the filename
    safe_filename = InputSanitizer.sanitize_filename(filename)
    # ...
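The filename rules are worth pinning down with examples. This is a standalone copy of the `sanitize_filename` logic, extracted as a plain function for testing:

```python
def sanitize_filename(filename: str) -> str:
    """Mirror of InputSanitizer.sanitize_filename, extracted for testing."""
    filename = filename.replace('/', '_').replace('\\', '_')  # path separators
    filename = filename.replace('\x00', '')                   # null bytes
    filename = filename[:255]                                 # length cap
    return filename.strip('. ')                               # leading/trailing dots, spaces
```

Note that a traversal attempt like `../etc/passwd` comes out as a harmless flat name, since the separators are replaced before the leading dots are stripped.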
5. Filter Sensitive Information from Logs
Implementation:
# core/logging_filter.py (new file)
import logging
import re

class SensitiveDataFilter(logging.Filter):
    """Filter sensitive data from logs"""

    PATTERNS = [
        (re.compile(r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I), r'\1***REDACTED***\3'),
        (re.compile(r'(token["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I), r'\1***REDACTED***\3'),
        (re.compile(r'(password["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I), r'\1***REDACTED***\3'),
        (re.compile(r'(secret["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I), r'\1***REDACTED***\3'),
    ]

    def filter(self, record):
        message = record.getMessage()
        for pattern, replacement in self.PATTERNS:
            message = pattern.sub(replacement, message)
        record.msg = message
        record.args = ()
        return True

# main.py
from core.logging_filter import SensitiveDataFilter

# Attach the filter to every handler
for handler in logging.root.handlers:
    handler.addFilter(SensitiveDataFilter())
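The redaction patterns can be sanity-checked on their own. This sketch reuses one of the regexes above outside the logging machinery:

```python
import re

# Same token pattern as in SensitiveDataFilter.PATTERNS
TOKEN_PATTERN = re.compile(r'(token["\']?\s*[:=]\s*["\']?)([^"\']+)(["\']?)', re.I)

def redact(message: str) -> str:
    """Replace the value following token= / token: with a placeholder."""
    return TOKEN_PATTERN.sub(r'\1***REDACTED***\3', message)
```

Keeping the patterns testable in isolation makes it easy to add cases (e.g. Bearer headers) without touching the filter class.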
6. Use HTTPS Behind a Reverse Proxy
nginx configuration:
# /etc/nginx/sites-available/cbcfacil
# Note: the limit_req_zone directive must be declared in the http {} context
# (e.g. in nginx.conf): limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

server {
    listen 80;
    server_name cbcfacil.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name cbcfacil.com;

    # SSL configuration
    ssl_certificate /etc/letsencrypt/live/cbcfacil.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/cbcfacil.com/privkey.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;
    ssl_prefer_server_ciphers on;

    # Security headers
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    add_header X-Frame-Options "DENY" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-XSS-Protection "1; mode=block" always;

    # Rate limiting (zone declared in the http {} context, see note above)
    limit_req zone=api burst=20 nodelay;

    # Proxy to Flask
    location / {
        proxy_pass http://127.0.0.1:5000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # Static file caching
    location /static/ {
        alias /home/app/cbcfacil/static/;
        expires 1y;
        add_header Cache-Control "public, immutable";
    }
}
⚡ PERFORMANCE OPTIMIZATIONS
1. Implement a Queue System with Celery
Current Problem: Synchronous processing blocks the main loop
Installation:
pip install celery redis
Configuration:
# celery_app.py (new file)
from celery import Celery
from config import settings

celery_app = Celery(
    'cbcfacil',
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,       # 1 hour
    task_soft_time_limit=3300,  # 55 minutes
)

# tasks/processing.py (new file)
from pathlib import Path

from celery_app import celery_app
from processors.audio_processor import audio_processor
from processors.pdf_processor import pdf_processor
from document.generators import DocumentGenerator

@celery_app.task(bind=True, max_retries=3)
def process_audio_task(self, audio_path: str):
    """Process audio file asynchronously"""
    try:
        result = audio_processor.process(Path(audio_path))
        if result.success:
            generator = DocumentGenerator()
            generator.generate_summary(result.data['text'], result.data['base_name'])
        return {'success': True, 'file': audio_path}
    except Exception as e:
        self.retry(exc=e, countdown=60)

@celery_app.task(bind=True, max_retries=3)
def process_pdf_task(self, pdf_path: str):
    """Process PDF file asynchronously"""
    try:
        result = pdf_processor.process(Path(pdf_path))
        if result.success:
            generator = DocumentGenerator()
            generator.generate_summary(result.data['text'], result.data['base_name'])
        return {'success': True, 'file': pdf_path}
    except Exception as e:
        self.retry(exc=e, countdown=60)

# main.py
from tasks.processing import process_audio_task, process_pdf_task

def process_new_files(files: List[Path]):
    """Queue files for processing"""
    for file in files:
        if file.suffix.lower() in ['.mp3', '.wav', '.m4a']:
            task = process_audio_task.delay(str(file))
            logger.info(f"Queued audio processing: {file.name} (task_id={task.id})")
        elif file.suffix.lower() == '.pdf':
            task = process_pdf_task.delay(str(file))
            logger.info(f"Queued PDF processing: {file.name} (task_id={task.id})")

# config/settings.py
@property
def CELERY_BROKER_URL(self) -> str:
    return os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')

@property
def CELERY_RESULT_BACKEND(self) -> str:
    return os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
Run the workers:
# Terminal 1: Flask app
python main.py
# Terminal 2: Celery worker
celery -A celery_app worker --loglevel=info --concurrency=2
# Terminal 3: Celery beat (for scheduled tasks)
celery -A celery_app beat --loglevel=info
2. Implement Redis for Distributed Caching
Problem: The in-memory LRU cache is lost on every restart
Installation:
pip install redis hiredis
Implementation:
# services/cache_service.py (new file)
import json
import logging
from typing import Optional, Any

import redis

from config import settings

logger = logging.getLogger(__name__)

class CacheService:
    """Distributed cache with Redis"""

    def __init__(self):
        self.redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB,
            decode_responses=True,
            socket_connect_timeout=5,
            socket_timeout=5
        )
        self.default_ttl = 3600  # 1 hour

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        try:
            value = self.redis_client.get(key)
            if value:
                return json.loads(value)
            return None
        except Exception as e:
            logger.error(f"Cache get error: {e}")
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Set value in cache"""
        try:
            ttl = ttl or self.default_ttl
            serialized = json.dumps(value)
            return self.redis_client.setex(key, ttl, serialized)
        except Exception as e:
            logger.error(f"Cache set error: {e}")
            return False

    def delete(self, key: str) -> bool:
        """Delete key from cache"""
        try:
            return bool(self.redis_client.delete(key))
        except Exception as e:
            logger.error(f"Cache delete error: {e}")
            return False

    def get_or_compute(self, key: str, compute_fn, ttl: Optional[int] = None):
        """Get from cache or compute and store"""
        cached = self.get(key)
        if cached is not None:
            return cached
        value = compute_fn()
        self.set(key, value, ttl)
        return value

cache_service = CacheService()

# services/ai_service.py
from services.cache_service import cache_service

class AIService:
    def generate_text(self, prompt: str, model: str = "default") -> str:
        cache_key = self._get_cache_key(prompt, model)

        # Use the Redis cache
        def compute():
            return self.ai_provider.generate_text(prompt)

        return cache_service.get_or_compute(cache_key, compute, ttl=3600)

# config/settings.py
@property
def REDIS_HOST(self) -> str:
    return os.getenv('REDIS_HOST', 'localhost')

@property
def REDIS_PORT(self) -> int:
    return int(os.getenv('REDIS_PORT', '6379'))

@property
def REDIS_DB(self) -> int:
    return int(os.getenv('REDIS_DB', '0'))
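The `get_or_compute` contract (compute once, serve from cache afterwards) is the part callers rely on. A minimal in-memory stand-in (class name is illustrative, not part of the codebase) demonstrates it without a Redis server:

```python
class MemoryCache:
    """In-memory stand-in mirroring CacheService.get_or_compute semantics."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        return self._store.get(key)

    def set(self, key, value, ttl=None):
        # TTL is ignored here; Redis setex handles expiry in the real service.
        self._store[key] = value
        return True

    def get_or_compute(self, key, compute_fn, ttl=None):
        cached = self.get(key)
        if cached is not None:
            return cached
        value = compute_fn()       # only reached on a cache miss
        self.set(key, value, ttl)
        return value
```

A stand-in like this is also handy for unit-testing AIService without a live Redis.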
3. Migrate Metadata to PostgreSQL
Problem: processed_files.txt does not scale and lacks ACID guarantees
Installation:
pip install psycopg2-binary sqlalchemy alembic
Schema:
# models/database.py (new file)
from contextlib import contextmanager
from datetime import datetime

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, JSON, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from config import settings

Base = declarative_base()

class ProcessedFile(Base):
    __tablename__ = 'processed_files'

    id = Column(Integer, primary_key=True)
    filename = Column(String(255), unique=True, nullable=False, index=True)
    filepath = Column(String(512), nullable=False)
    file_type = Column(String(50), nullable=False)  # audio, pdf, text
    status = Column(String(50), default='pending')  # pending, processing, completed, failed

    # Timestamps
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    processed_at = Column(DateTime)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Processing results
    transcription_text = Column(Text)
    summary_text = Column(Text)

    # Generated files
    markdown_path = Column(String(512))
    docx_path = Column(String(512))
    pdf_path = Column(String(512))

    # Metadata
    file_size = Column(Integer)
    duration = Column(Integer)    # For audio files
    page_count = Column(Integer)  # For PDFs

    # Notion integration
    notion_uploaded = Column(Boolean, default=False)
    notion_page_id = Column(String(255))

    # Metrics
    processing_time = Column(Integer)  # seconds
    error_message = Column(Text)
    retry_count = Column(Integer, default=0)

    # Additional metadata. "metadata" is a reserved attribute on SQLAlchemy
    # declarative models, so the attribute is renamed while the column keeps it.
    extra_metadata = Column('metadata', JSON)

# Database session
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)

@contextmanager
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# storage/processed_registry.py (refactor)
from models.database import ProcessedFile, get_db

class ProcessedRegistry:
    def is_processed(self, filename: str) -> bool:
        with get_db() as db:
            return db.query(ProcessedFile).filter_by(
                filename=filename,
                status='completed'
            ).first() is not None

    def mark_processed(self, filename: str, metadata: dict):
        with get_db() as db:
            file_record = ProcessedFile(
                filename=filename,
                filepath=metadata.get('filepath'),
                file_type=metadata.get('file_type'),
                status='completed',
                processed_at=datetime.utcnow(),
                transcription_text=metadata.get('transcription'),
                summary_text=metadata.get('summary'),
                markdown_path=metadata.get('markdown_path'),
                docx_path=metadata.get('docx_path'),
                pdf_path=metadata.get('pdf_path'),
                notion_uploaded=metadata.get('notion_uploaded', False),
                processing_time=metadata.get('processing_time'),
                extra_metadata=metadata
            )
            db.add(file_record)
            db.commit()

# config/settings.py
@property
def DATABASE_URL(self) -> str:
    return os.getenv(
        'DATABASE_URL',
        'postgresql://cbcfacil:password@localhost/cbcfacil'
    )
Migrations with Alembic:
# Initialize Alembic
alembic init migrations
# Create a migration
alembic revision --autogenerate -m "Create processed_files table"
# Apply it
alembic upgrade head
4. WebSockets for Real-Time Updates
Installation:
pip install flask-socketio python-socketio eventlet
Implementation:
# api/routes.py
from flask_socketio import SocketIO, emit, join_room

socketio = SocketIO(app, cors_allowed_origins=settings.CORS_ORIGINS, async_mode='eventlet')

@socketio.on('connect')
def handle_connect():
    emit('connected', {'message': 'Connected to CBCFacil'})

@socketio.on('subscribe_file')
def handle_subscribe(data):
    filename = data.get('filename')
    # Join a room to receive updates for this file
    join_room(filename)

# tasks/processing.py
# Note: emitting from a separate Celery worker process requires configuring
# SocketIO with a message_queue (e.g. SocketIO(app, message_queue='redis://...')).
from api.routes import socketio

@celery_app.task(bind=True)
def process_audio_task(self, audio_path: str):
    filename = Path(audio_path).name

    # Notify start
    socketio.emit('processing_started', {
        'filename': filename,
        'status': 'processing'
    }, room=filename)
    try:
        # Progress updates
        socketio.emit('processing_progress', {
            'filename': filename,
            'progress': 25,
            'stage': 'transcription'
        }, room=filename)

        result = audio_processor.process(Path(audio_path))

        socketio.emit('processing_progress', {
            'filename': filename,
            'progress': 75,
            'stage': 'summary_generation'
        }, room=filename)

        generator = DocumentGenerator()
        generator.generate_summary(result.data['text'], result.data['base_name'])

        # Notify completion
        socketio.emit('processing_completed', {
            'filename': filename,
            'status': 'completed',
            'progress': 100
        }, room=filename)
    except Exception as e:
        socketio.emit('processing_failed', {
            'filename': filename,
            'status': 'failed',
            'error': str(e)
        }, room=filename)
        raise

# templates/index.html (JavaScript)
const socket = io('http://localhost:5000');

socket.on('connect', () => {
    console.log('Connected to server');
});

socket.on('processing_started', (data) => {
    showNotification(`Processing started: ${data.filename}`);
});

socket.on('processing_progress', (data) => {
    updateProgressBar(data.filename, data.progress, data.stage);
});

socket.on('processing_completed', (data) => {
    showNotification(`Completed: ${data.filename}`, 'success');
    refreshFileList();
});

socket.on('processing_failed', (data) => {
    showNotification(`Failed: ${data.filename} - ${data.error}`, 'error');
});

// Subscribe to a specific file
function subscribeToFile(filename) {
    socket.emit('subscribe_file', { filename: filename });
}
📝 CODE AND MAINTAINABILITY IMPROVEMENTS
1. Add Complete Type Hints
Problem: Not every method has type hints
Solution:
# Use mypy to verify
pip install mypy

# pyproject.toml
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true

# Run it
mypy cbcfacil/
2. Implement Log Rotation
Problem: main.log can grow without bound
Solution:
# main.py
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

# Rotate by size (max 10MB, 5 backups)
file_handler = RotatingFileHandler(
    'main.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5
)

# Or rotate daily
file_handler = TimedRotatingFileHandler(
    'main.log',
    when='midnight',
    interval=1,
    backupCount=30  # Keep 30 days
)

file_handler.setFormatter(formatter)
logging.root.addHandler(file_handler)
3. Add Advanced Health Checks
# core/health_check.py (improved)
class HealthCheckService:
    def get_full_status(self) -> Dict[str, Any]:
        """Get comprehensive health status"""
        return {
            'status': 'healthy',
            'timestamp': datetime.utcnow().isoformat(),
            'version': settings.APP_VERSION,
            'checks': {
                'database': self._check_database(),
                'redis': self._check_redis(),
                'celery': self._check_celery(),
                'gpu': self._check_gpu(),
                'disk_space': self._check_disk_space(),
                'external_apis': {
                    'nextcloud': self._check_nextcloud(),
                    'notion': self._check_notion(),
                    'telegram': self._check_telegram(),
                    'claude': self._check_claude(),
                    'gemini': self._check_gemini(),
                }
            },
            'metrics': {
                'processed_files_today': self._count_processed_today(),
                'queue_size': self._get_queue_size(),
                'avg_processing_time': self._get_avg_processing_time(),
                'error_rate': self._get_error_rate(),
            }
        }

    def _check_database(self) -> Dict[str, Any]:
        try:
            from sqlalchemy import text
            from models.database import engine
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            return {'status': 'healthy'}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}

    def _check_redis(self) -> Dict[str, Any]:
        try:
            from services.cache_service import cache_service
            cache_service.redis_client.ping()
            return {'status': 'healthy'}
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}

    def _check_celery(self) -> Dict[str, Any]:
        try:
            from celery_app import celery_app
            stats = celery_app.control.inspect().stats()
            active = celery_app.control.inspect().active()
            return {
                'status': 'healthy' if stats else 'unhealthy',
                'workers': len(stats) if stats else 0,
                'active_tasks': sum(len(tasks) for tasks in active.values()) if active else 0
            }
        except Exception as e:
            return {'status': 'unhealthy', 'error': str(e)}
4. Modularize the Frontend
Problem: index.html is 2,500+ lines long
Solution - Migrate to React:
# Create a modern frontend
npx create-react-app frontend
cd frontend
npm install axios socket.io-client recharts date-fns
Proposed structure:
frontend/
├── src/
│ ├── components/
│ │ ├── Dashboard/
│ │ │ ├── StatsCards.jsx
│ │ │ ├── ProcessingQueue.jsx
│ │ │ └── SystemHealth.jsx
│ │ ├── Files/
│ │ │ ├── FileList.jsx
│ │ │ ├── FileItem.jsx
│ │ │ └── FileUpload.jsx
│ │ ├── Preview/
│ │ │ ├── PreviewPanel.jsx
│ │ │ ├── TranscriptionView.jsx
│ │ │ └── SummaryView.jsx
│ │ ├── Versions/
│ │ │ └── VersionHistory.jsx
│ │ └── Layout/
│ │ ├── Sidebar.jsx
│ │ ├── Header.jsx
│ │ └── Footer.jsx
│ ├── hooks/
│ │ ├── useWebSocket.js
│ │ ├── useFiles.js
│ │ └── useAuth.js
│ ├── services/
│ │ ├── api.js
│ │ └── socket.js
│ ├── store/
│ │ └── store.js (Redux/Zustand)
│ ├── App.jsx
│ └── index.jsx
└── package.json
🔗 ADVANCED NOTION INTEGRATION
Current State
The Notion integration is partially implemented in services/notion_service.py and document/generators.py. Currently:
- ✅ PDF upload to a Notion database
- ✅ Page creation with title and status
- ⚠️ Upload via base64 (limited to 5MB by the Notion API)
- ❌ No bidirectional synchronization
- ❌ Existing pages are not updated
- ❌ Notion rate limits are not handled
- ❌ No webhook for changes made in Notion
Proposed Improvements
1. Migrate to the Official Notion Client
Problem: Direct use of requests with no rate-limit handling
Solution:
pip install notion-client
# services/notion_service.py (refactored)
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List

from notion_client import Client
from notion_client.errors import APIResponseError

class NotionService:
    """Enhanced Notion integration service"""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._client: Optional[Client] = None
        self._database_id: Optional[str] = None
        self._rate_limiter = RateLimiter(max_requests=3, time_window=1)  # 3 req/sec

    def configure(self, token: str, database_id: str) -> None:
        """Configure Notion with official SDK"""
        self._client = Client(auth=token)
        self._database_id = database_id
        self.logger.info("Notion service configured with official SDK")

    @property
    def is_configured(self) -> bool:
        return bool(self._client and self._database_id)

    def _rate_limited_request(self, func, *args, **kwargs):
        """Execute request with rate limiting and retry"""
        max_retries = 3
        base_delay = 1
        for attempt in range(max_retries):
            try:
                self._rate_limiter.wait()
                return func(*args, **kwargs)
            except APIResponseError as e:
                if e.code == 'rate_limited':
                    delay = base_delay * (2 ** attempt)  # Exponential backoff
                    self.logger.warning(f"Rate limited, waiting {delay}s")
                    time.sleep(delay)
                else:
                    raise
        raise Exception("Max retries exceeded")
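The `RateLimiter` used in `__init__` is referenced but never defined in this document. A plausible sliding-window sketch, with the interface assumed from the `wait()` call and the `max_requests`/`time_window` constructor arguments above:

```python
import time

class RateLimiter:
    """Allow at most max_requests calls per time_window seconds (sliding window)."""

    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self._timestamps = []

    def wait(self) -> None:
        """Block until another request is allowed, then record it."""
        now = time.monotonic()
        # Drop timestamps that have aged out of the window
        self._timestamps = [t for t in self._timestamps if now - t < self.time_window]
        if len(self._timestamps) >= self.max_requests:
            # Sleep until the oldest in-window request expires
            time.sleep(self.time_window - (now - self._timestamps[0]))
        self._timestamps.append(time.monotonic())
```

This local limiter smooths bursts; the exponential-backoff retry above still handles the authoritative `rate_limited` responses from the Notion API.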
    def create_page(self, title: str, content: str, metadata: Dict[str, Any]) -> Optional[str]:
        """Create a new page in Notion database"""
        if not self.is_configured:
            self.logger.warning("Notion not configured")
            return None
        try:
            # Prepare the page properties
            properties = {
                "Name": {
                    "title": [
                        {
                            "text": {
                                "content": title
                            }
                        }
                    ]
                },
                "Status": {
                    "select": {
                        "name": "Procesado"
                    }
                },
                "Tipo": {
                    "select": {
                        "name": metadata.get('file_type', 'Desconocido')
                    }
                },
                "Fecha Procesamiento": {
                    "date": {
                        "start": metadata.get('processed_at', datetime.utcnow().isoformat())
                    }
                }
            }
            # Add optional fields
            if metadata.get('duration'):
                properties["Duración (min)"] = {
                    "number": round(metadata['duration'] / 60, 2)
                }
            if metadata.get('page_count'):
                properties["Páginas"] = {
                    "number": metadata['page_count']
                }
            # Create the page
            page = self._rate_limited_request(
                self._client.pages.create,
                parent={"database_id": self._database_id},
                properties=properties
            )
            page_id = page['id']
            self.logger.info(f"Notion page created: {page_id}")
            # Append the content as blocks
            self._add_content_blocks(page_id, content)
            return page_id
        except Exception as e:
            self.logger.error(f"Error creating Notion page: {e}")
            return None
def _add_content_blocks(self, page_id: str, content: str) -> bool:
"""Add content blocks to Notion page"""
try:
# Dividir contenido en secciones
sections = self._parse_markdown_to_blocks(content)
# Notion API limita a 100 bloques por request
for i in range(0, len(sections), 100):
batch = sections[i:i+100]
self._rate_limited_request(
self._client.blocks.children.append,
block_id=page_id,
children=batch
)
return True
except Exception as e:
self.logger.error(f"Error adding content blocks: {e}")
return False
def _parse_markdown_to_blocks(self, markdown: str) -> List[Dict]:
"""Convert markdown to Notion blocks"""
blocks = []
lines = markdown.split('\n')
for line in lines:
line = line.strip()
if not line:
continue
# Headings
if line.startswith('# '):
blocks.append({
"object": "block",
"type": "heading_1",
"heading_1": {
"rich_text": [{"type": "text", "text": {"content": line[2:]}}]
}
})
elif line.startswith('## '):
blocks.append({
"object": "block",
"type": "heading_2",
"heading_2": {
"rich_text": [{"type": "text", "text": {"content": line[3:]}}]
}
})
elif line.startswith('### '):
blocks.append({
"object": "block",
"type": "heading_3",
"heading_3": {
"rich_text": [{"type": "text", "text": {"content": line[4:]}}]
}
})
# Bullet points
elif line.startswith('- ') or line.startswith('* '):
blocks.append({
"object": "block",
"type": "bulleted_list_item",
"bulleted_list_item": {
"rich_text": [{"type": "text", "text": {"content": line[2:]}}]
}
})
# Paragraph
else:
# Notion limita rich_text a 2000 chars
if len(line) > 2000:
chunks = [line[i:i+2000] for i in range(0, len(line), 2000)]
for chunk in chunks:
blocks.append({
"object": "block",
"type": "paragraph",
"paragraph": {
"rich_text": [{"type": "text", "text": {"content": chunk}}]
}
})
else:
blocks.append({
"object": "block",
"type": "paragraph",
"paragraph": {
"rich_text": [{"type": "text", "text": {"content": line}}]
}
})
return blocks
def upload_file_to_page(self, page_id: str, file_path: Path, file_type: str = 'pdf') -> bool:
"""Upload file as external file to Notion page"""
if not file_path.exists():
self.logger.error(f"File not found: {file_path}")
return False
try:
# Notion no soporta upload directo, necesitas hosting externo
# Opción 1: Subir a Nextcloud y obtener link público
# Opción 2: Usar S3/MinIO
# Opción 3: Usar servicio de hosting dedicado
# Asumiendo que tienes un endpoint público para el archivo
file_url = self._get_public_url(file_path)
if not file_url:
self.logger.warning("Could not generate public URL for file")
return False
# Agregar como bloque de archivo
self._rate_limited_request(
self._client.blocks.children.append,
block_id=page_id,
children=[
{
"object": "block",
"type": "file",
"file": {
"type": "external",
"external": {
"url": file_url
}
}
}
]
)
return True
except Exception as e:
self.logger.error(f"Error uploading file to Notion: {e}")
return False
def _get_public_url(self, file_path: Path) -> Optional[str]:
"""Generate public URL for file (via Nextcloud or S3)"""
# Implementar según tu infraestructura
# Opción 1: Nextcloud share link
from services.webdav_service import webdav_service
# Subir a Nextcloud si no está
remote_path = f"/cbcfacil/{file_path.name}"
webdav_service.upload_file(file_path, remote_path)
# Generar share link (requiere Nextcloud API adicional)
# return webdav_service.create_share_link(remote_path)
# Opción 2: Usar el endpoint de downloads de tu API
return f"{settings.PUBLIC_API_URL}/downloads/{file_path.name}"
def update_page_status(self, page_id: str, status: str) -> bool:
"""Update page status"""
try:
self._rate_limited_request(
self._client.pages.update,
page_id=page_id,
properties={
"Status": {
"select": {
"name": status
}
}
}
)
return True
except Exception as e:
self.logger.error(f"Error updating page status: {e}")
return False
def search_pages(self, query: str) -> List[Dict]:
"""Search pages in database"""
try:
results = self._rate_limited_request(
self._client.databases.query,
database_id=self._database_id,
filter={
"property": "Name",
"title": {
"contains": query
}
}
)
return results.get('results', [])
except Exception as e:
self.logger.error(f"Error searching Notion pages: {e}")
return []
def get_page_content(self, page_id: str) -> Optional[str]:
"""Get page content as markdown"""
try:
blocks = self._rate_limited_request(
self._client.blocks.children.list,
block_id=page_id
)
markdown = self._blocks_to_markdown(blocks.get('results', []))
return markdown
except Exception as e:
self.logger.error(f"Error getting page content: {e}")
return None
def _blocks_to_markdown(self, blocks: List[Dict]) -> str:
"""Convert Notion blocks to markdown"""
markdown_lines = []
for block in blocks:
block_type = block.get('type')
if block_type == 'heading_1':
text = self._extract_text(block['heading_1'])
markdown_lines.append(f"# {text}")
elif block_type == 'heading_2':
text = self._extract_text(block['heading_2'])
markdown_lines.append(f"## {text}")
elif block_type == 'heading_3':
text = self._extract_text(block['heading_3'])
markdown_lines.append(f"### {text}")
elif block_type == 'bulleted_list_item':
text = self._extract_text(block['bulleted_list_item'])
markdown_lines.append(f"- {text}")
elif block_type == 'paragraph':
text = self._extract_text(block['paragraph'])
markdown_lines.append(text)
return '\n\n'.join(markdown_lines)
def _extract_text(self, block_data: Dict) -> str:
"""Extract text from Notion rich_text"""
rich_texts = block_data.get('rich_text', [])
return ''.join(rt.get('text', {}).get('content', '') for rt in rich_texts)
# Rate limiter helper
class RateLimiter:
def __init__(self, max_requests: int, time_window: float):
self.max_requests = max_requests
self.time_window = time_window
self.requests = []
def wait(self):
"""Wait if rate limit is reached"""
now = time.time()
# Remove old requests
self.requests = [r for r in self.requests if now - r < self.time_window]
# Wait if limit reached
if len(self.requests) >= self.max_requests:
sleep_time = self.time_window - (now - self.requests[0])
if sleep_time > 0:
time.sleep(sleep_time)
self.requests = []
self.requests.append(now)
# Global instance
notion_service = NotionService()
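With the global instance in place, the service can be wired up at service startup. A minimal sketch of the `main.py` initialization, assuming the `NOTION_API` and `NOTION_DATABASE_ID` values are exposed as attributes on the `settings` object (attribute names are assumptions, not verified project API):

```python
# main.py (fragment) — illustrative wiring; attribute names are assumptions
from config.settings import settings
from services.notion_service import notion_service


def init_notion() -> None:
    """Configure Notion at startup if credentials are present."""
    if settings.NOTION_API and settings.NOTION_DATABASE_ID:
        notion_service.configure(
            token=settings.NOTION_API,
            database_id=settings.NOTION_DATABASE_ID,
        )
    else:
        # Degrade gracefully: run without Notion rather than failing at boot
        print("Notion credentials missing; integration disabled")
```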
#### 2. Bidirectional Synchronization

Implement webhooks to receive changes made in Notion:
```python
# api/webhooks.py (new file)
from flask import Blueprint, request, jsonify

from tasks.sync import sync_notion_changes

webhooks_bp = Blueprint('webhooks', __name__)


@webhooks_bp.route('/webhooks/notion', methods=['POST'])
def notion_webhook():
    """Handle Notion webhook events"""
    # Verify the signature (if Notion supports it)
    # signature = request.headers.get('X-Notion-Signature')
    # if not verify_signature(request.data, signature):
    #     abort(403)
    data = request.json

    # Process the event
    event_type = data.get('type')
    if event_type == 'page.updated':
        page_id = data.get('page_id')
        # Queue a task to sync the changes
        sync_notion_changes.delay(page_id)

    return jsonify({'status': 'ok'}), 200
```

```python
# tasks/sync.py (new file)
import logging
from datetime import datetime

from celery_app import celery_app
from services.notion_service import notion_service
from models.database import ProcessedFile, get_db


@celery_app.task
def sync_notion_changes(page_id: str):
    """Sync changes from Notion back to the local database"""
    logger = logging.getLogger(__name__)
    try:
        # Fetch the updated content from Notion
        content = notion_service.get_page_content(page_id)
        if not content:
            logger.error(f"Could not fetch Notion page: {page_id}")
            return

        # Look up the local record
        with get_db() as db:
            file_record = db.query(ProcessedFile).filter_by(
                notion_page_id=page_id
            ).first()
            if file_record:
                file_record.summary_text = content
                file_record.updated_at = datetime.utcnow()
                db.commit()
                logger.info(f"Synced changes from Notion for {file_record.filename}")
            else:
                logger.warning(f"No local record found for Notion page {page_id}")
    except Exception as e:
        logger.error(f"Error syncing Notion changes: {e}")
```
Configuring the webhook in Notion:

```python
# Note: Notion currently has no native webhooks.
# Alternatives:
#   1. Periodic polling (every 5 min)
#   2. Third-party services such as Zapier/Make
#   3. Polling scheduled with Celery beat

# tasks/sync.py
@celery_app.task
def poll_notion_changes():
    """Poll Notion for changes (scheduled task)"""
    # Query pages edited recently
    # ...
```
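For alternative 3, the polling task can be registered with Celery beat. A minimal sketch, assuming the `celery_app` instance and the `tasks.sync` module path used above; the broker URL and the 5-minute interval are illustrative:

```python
# celery_app.py (fragment) — schedule sketch, not existing project config
from celery import Celery

celery_app = Celery('cbcfacil', broker='redis://localhost:6379/0')

# Poll Notion for recent edits every 5 minutes
celery_app.conf.beat_schedule = {
    'poll-notion-changes': {
        'task': 'tasks.sync.poll_notion_changes',
        'schedule': 300.0,  # seconds
    },
}
```

In development the scheduler can run inside the worker process with `celery -A celery_app worker -B`; in production, run `celery -A celery_app beat` as a separate process.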
#### 3. Full Notion Integration Pipeline

Flow diagram:
```
┌─────────────────────────────────────────────────────────────┐
│                      CBCFacil Pipeline                      │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
            ┌─────────────────────────────────┐
            │ 1. File detected in Nextcloud   │
            └─────────────────────────────────┘
                              │
                              ▼
            ┌─────────────────────────────────┐
            │ 2. Process (audio/PDF)          │
            │    - Transcription              │
            │    - OCR                        │
            └─────────────────────────────────┘
                              │
                              ▼
            ┌─────────────────────────────────┐
            │ 3. Generate AI summary          │
            │    - Claude/Gemini              │
            │    - Formatting                 │
            └─────────────────────────────────┘
                              │
                              ▼
            ┌─────────────────────────────────┐
            │ 4. Create documents             │
            │    - Markdown                   │
            │    - DOCX                       │
            │    - PDF                        │
            └─────────────────────────────────┘
                              │
                  ┌───────────┴──────────┐
                  ▼                      ▼
        ┌──────────────────┐   ┌──────────────────┐
        │ 5a. Upload to    │   │ 5b. Save to      │
        │     Notion       │   │     database     │
        │  - Create page   │   │  - PostgreSQL    │
        │  - Add content   │   │  - Metadata      │
        │  - Attach PDF    │   │  - notion_page_id│
        └──────────────────┘   └──────────────────┘
                  │                      │
                  └───────────┬──────────┘
                              ▼
            ┌─────────────────────────────────┐
            │ 6. Notify                       │
            │    - Telegram                   │
            │    - Email (optional)           │
            │    - WebSocket (dashboard)      │
            └─────────────────────────────────┘
```
Implementation:

```python
# document/generators.py (improved)
def generate_summary(self, text: str, base_name: str,
                     file_metadata: Dict[str, Any]) -> Tuple[bool, str, Dict[str, Any]]:
    """Generate the summary with full Notion integration"""
    try:
        # Steps 1-4: existing logic
        # ...

        # Step 5: upload to Notion with rich metadata
        notion_page_id = None
        if settings.has_notion_config:
            try:
                title = base_name.replace('_', ' ').title()
                # Build the enriched metadata
                notion_metadata = {
                    'file_type': file_metadata.get('file_type', 'Desconocido'),
                    'processed_at': datetime.utcnow().isoformat(),
                    'duration': file_metadata.get('duration'),
                    'page_count': file_metadata.get('page_count'),
                    'file_size': file_metadata.get('file_size'),
                }
                # Create the Notion page
                notion_page_id = notion_service.create_page(
                    title=title,
                    content=summary,
                    metadata=notion_metadata,
                )
                if notion_page_id:
                    self.logger.info(f"Notion page created: {notion_page_id}")
                    # Attach the generated PDF to the Notion page
                    notion_service.upload_file_to_page(
                        page_id=notion_page_id,
                        file_path=pdf_path,
                        file_type='pdf',
                    )
            except Exception as e:
                self.logger.warning(f"Notion integration failed: {e}")

        # Build the response metadata
        metadata = {
            'markdown_path': str(markdown_path),
            'docx_path': str(docx_path),
            'pdf_path': str(pdf_path),
            'summary': summary,
            'notion_page_id': notion_page_id,
            'notion_uploaded': bool(notion_page_id),
        }
        return True, summary, metadata
    except Exception as e:
        self.logger.error(f"Document generation failed: {e}")
        return False, "", {}
```
#### 4. Notion Database Configuration

Recommended schema for the Notion database:
| Property | Type | Description |
|---|---|---|
| Name | Title | Document name |
| Status | Select | Procesado / En Revisión / Aprobado |
| Tipo | Select | Audio / PDF / Texto |
| Fecha Procesamiento | Date | When the file was processed |
| Duración (min) | Number | For audio files |
| Páginas | Number | For PDFs |
| Tamaño (MB) | Number | File size |
| Calidad | Select | Alta / Media / Baja |
| Categoría | Multi-select | Tags/categories |
| Archivo Original | Files & Media | Link to the original file |
| Resumen PDF | Files & Media | Generated PDF |
Script to create the database:

```python
# scripts/setup_notion_database.py (new file)
from notion_client import Client


def create_cbcfacil_database(token: str, parent_page_id: str):
    """Create the Notion database for CBCFacil"""
    client = Client(auth=token)
    database = client.databases.create(
        parent={"type": "page_id", "page_id": parent_page_id},
        title=[
            {
                "type": "text",
                "text": {"content": "CBCFacil - Documentos Procesados"},
            }
        ],
        properties={
            "Name": {"title": {}},
            "Status": {
                "select": {
                    "options": [
                        {"name": "Procesado", "color": "green"},
                        {"name": "En Revisión", "color": "yellow"},
                        {"name": "Aprobado", "color": "blue"},
                        {"name": "Error", "color": "red"},
                    ]
                }
            },
            "Tipo": {
                "select": {
                    "options": [
                        {"name": "Audio", "color": "purple"},
                        {"name": "PDF", "color": "orange"},
                        {"name": "Texto", "color": "gray"},
                    ]
                }
            },
            "Fecha Procesamiento": {"date": {}},
            "Duración (min)": {"number": {"format": "number_with_commas"}},
            "Páginas": {"number": {}},
            "Tamaño (MB)": {"number": {"format": "number_with_commas"}},
            "Calidad": {
                "select": {
                    "options": [
                        {"name": "Alta", "color": "green"},
                        {"name": "Media", "color": "yellow"},
                        {"name": "Baja", "color": "red"},
                    ]
                }
            },
            "Categoría": {
                "multi_select": {
                    "options": [
                        {"name": "Historia", "color": "blue"},
                        {"name": "Ciencia", "color": "green"},
                        {"name": "Literatura", "color": "purple"},
                        {"name": "Política", "color": "red"},
                    ]
                }
            },
        },
    )
    print(f"Database created: {database['id']}")
    print(f"Add this to your .env: NOTION_DATABASE_ID={database['id']}")
    return database['id']


if __name__ == '__main__':
    token = input("Enter your Notion API token: ")
    parent_page_id = input("Enter the parent page ID: ")
    create_cbcfacil_database(token, parent_page_id)
```
Run it:

```bash
python scripts/setup_notion_database.py
```
#### 5. Advanced Notion Features

AI auto-categorization:

```python
# services/notion_service.py
def auto_categorize(self, summary: str) -> List[str]:
    """Auto-categorize content using AI"""
    from services.ai import ai_provider_factory

    ai = ai_provider_factory.get_best_provider()
    # The prompt (and category names) stay in Spanish to match the database options
    prompt = f"""Analiza el siguiente resumen y asigna 1-3 categorías principales de esta lista:
- Historia
- Ciencia
- Literatura
- Política
- Economía
- Tecnología
- Filosofía
- Arte
- Deporte

Resumen: {summary[:500]}

Devuelve solo las categorías separadas por comas."""
    categories_str = ai.generate_text(prompt)
    categories = [c.strip() for c in categories_str.split(',')]
    return categories[:3]


def create_page(self, title: str, content: str, metadata: Dict[str, Any]):
    # ...
    # Auto-categorize
    categories = self.auto_categorize(content)
    properties["Categoría"] = {
        "multi_select": [{"name": cat} for cat in categories]
    }
    # ...
```
Quality assessment:

```python
def assess_quality(self, transcription: str, summary: str) -> str:
    """Assess document quality based on simple metrics"""
    # Criteria:
    #   - Summary length (500-700 words = Alta)
    #   - Coherence (evaluated with AI)
    #   - Presence of key data (dates, names)
    word_count = len(summary.split())
    if word_count < 300:
        return "Baja"
    elif word_count < 600:
        return "Media"
    else:
        return "Alta"
```
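The heuristic above only covers the length criterion. The "presence of key data" criterion can be approximated with regular expressions; a sketch (the patterns, thresholds, and function names are illustrative, not existing project code):

```python
import re


def has_key_data(summary: str) -> bool:
    """Crude signal for dates and proper names in a Spanish-language summary."""
    # Four-digit years between 1000 and 2099
    has_dates = re.search(r'\b(1\d{3}|20\d{2})\b', summary) is not None
    # Two consecutive capitalized words as a rough proper-name signal
    has_names = re.search(
        r'\b[A-ZÁÉÍÓÚÑ][a-záéíóúñ]+\s+[A-ZÁÉÍÓÚÑ][a-záéíóúñ]+', summary
    ) is not None
    return has_dates and has_names


def assess_quality_v2(summary: str) -> str:
    """Length heuristic, demoted to 'Media' when key data is missing."""
    word_count = len(summary.split())
    if word_count < 300:
        return "Baja"
    if word_count >= 600 and has_key_data(summary):
        return "Alta"
    return "Media"
```

The regexes are deliberately coarse; the AI-based coherence check listed in the criteria would still need its own prompt.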
## ✅ TESTING PLAN

### Test Structure
```
tests/
├── unit/
│   ├── test_settings.py
│   ├── test_validators.py
│   ├── test_webdav_service.py
│   ├── test_vram_manager.py
│   ├── test_ai_service.py
│   ├── test_notion_service.py
│   ├── test_audio_processor.py
│   ├── test_pdf_processor.py
│   ├── test_document_generator.py
│   └── test_processed_registry.py
├── integration/
│   ├── test_audio_pipeline.py
│   ├── test_pdf_pipeline.py
│   ├── test_notion_integration.py
│   └── test_api_endpoints.py
├── e2e/
│   └── test_full_workflow.py
├── conftest.py
└── fixtures/
    ├── sample_audio.mp3
    ├── sample_pdf.pdf
    └── mock_responses.json
```
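The shared fixtures in `conftest.py` are not shown in the examples below; a possible sketch, where the mock response shapes mirror the Notion SDK calls made by `NotionService` (the helper and fixture names are illustrative):

```python
# tests/conftest.py (sketch) — fixture names are assumptions
from pathlib import Path
from unittest.mock import MagicMock

import pytest


def make_mock_notion_client(page_id: str = "page_123") -> MagicMock:
    """Stand-in for notion_client.Client with minimal canned responses."""
    client = MagicMock()
    client.pages.create.return_value = {"id": page_id}
    client.blocks.children.append.return_value = {"results": []}
    client.blocks.children.list.return_value = {"results": []}
    return client


@pytest.fixture
def mock_notion_client():
    return make_mock_notion_client()


@pytest.fixture
def sample_pdf(tmp_path) -> Path:
    """Tiny placeholder PDF file for upload tests."""
    pdf = tmp_path / "sample_pdf.pdf"
    pdf.write_bytes(b"%PDF-1.4\n%%EOF\n")
    return pdf
```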
### Test Examples
```python
# tests/unit/test_notion_service.py
import time
from unittest.mock import patch

import pytest

from services.notion_service import NotionService, RateLimiter


@pytest.fixture
def notion_service():
    service = NotionService()
    service.configure(token="test_token", database_id="test_db")
    return service


def test_notion_service_configuration(notion_service):
    assert notion_service.is_configured
    assert notion_service._database_id == "test_db"


@patch('services.notion_service.Client')
def test_create_page_success(mock_client_cls):
    # Patch where the SDK is used, then configure so the mock client is picked up
    mock_client_cls.return_value.pages.create.return_value = {'id': 'page_123'}
    service = NotionService()
    service.configure(token="test_token", database_id="test_db")

    page_id = service.create_page(
        title="Test Page",
        content="# Test Content",
        metadata={'file_type': 'pdf'},
    )
    assert page_id == 'page_123'


def test_rate_limiter():
    limiter = RateLimiter(max_requests=3, time_window=1.0)

    # Should allow 3 requests immediately
    start = time.time()
    for _ in range(3):
        limiter.wait()
    assert time.time() - start < 0.1

    # The 4th request should wait
    start = time.time()
    limiter.wait()
    assert time.time() - start >= 0.9
```

```python
# tests/integration/test_notion_integration.py
import pytest


@pytest.mark.integration
def test_full_notion_workflow(tmp_path):
    """Full workflow: process a file -> create a Notion page"""
    # Setup
    audio_file = tmp_path / "test_audio.mp3"
    # ... create the test file

    # Process the audio
    from processors.audio_processor import audio_processor
    result = audio_processor.process(audio_file)

    # Generate the summary
    from document.generators import DocumentGenerator
    generator = DocumentGenerator()
    success, summary, metadata = generator.generate_summary(
        result.data['text'],
        'test_audio',
        file_metadata={'file_type': 'Audio'},
    )
    assert success
    assert metadata.get('notion_page_id')

    # Verify the Notion page exists
    from services.notion_service import notion_service
    content = notion_service.get_page_content(metadata['notion_page_id'])
    assert content is not None
```
### Coverage Goal

```bash
# Run tests with coverage
pytest --cov=. --cov-report=html --cov-report=term

# Target: 80% overall coverage
#   - Unit tests: 90%
#   - Integration tests: 70%
#   - E2E tests: 60%
```
## 📅 IMPLEMENTATION ROADMAP

### Sprint 1: Security and Critical Fixes (2 weeks)

**Week 1:**
- Rotate the Notion API token
- Fix the path traversal vulnerability
- Fix SECRET_KEY generation
- Move imports to module level
- Implement API authentication (JWT)

**Week 2:**
- Configure restrictive CORS
- Add rate limiting (flask-limiter)
- Implement CSP headers
- Complete input sanitization
- Filter sensitive info out of logs

**Deliverables:**
- System with baseline security
- Critical vulnerabilities resolved
- Working authentication

### Sprint 2: Testing and Performance (2 weeks)

**Week 1:**
- Set up the testing infrastructure
- Unit tests for services (50% coverage)
- Integration tests for the pipelines
- CI/CD with GitHub Actions

**Week 2:**
- Implement Celery + Redis
- Queue system for processing
- Distributed cache with Redis
- WebSockets for real-time updates

**Deliverables:**
- 50% code coverage
- Working asynchronous processing
- Real-time dashboard updates

### Sprint 3: Advanced Notion Integration (2 weeks)

**Week 1:**
- Migrate to the official notion-client
- Implement rate limiting for Notion
- Markdown-to-Notion-blocks parser
- AI auto-categorization

**Week 2:**
- Bidirectional synchronization
- Webhooks/polling for changes
- File hosting for attachments
- Notion metrics dashboard

**Deliverables:**
- Robust Notion integration
- Bidirectional synchronization
- Working auto-categorization

### Sprint 4: Database and Scalability (2 weeks)

**Week 1:**
- Set up PostgreSQL
- Schema design and migrations (Alembic)
- Migrate off processed_files.txt
- Implement the repository pattern

**Week 2:**
- Advanced health checks
- Prometheus metrics exporter
- Rotating logs
- Error tracking (Sentry)

**Deliverables:**
- Production-ready database
- Full observability
- Scalable system

### Sprint 5: Frontend Modernization (3 weeks)

**Week 1:**
- Set up the React app
- Componentize the UI
- State management (Redux/Zustand)

**Week 2:**
- WebSocket integration
- Real-time updates
- File upload with progress

**Week 3:**
- Frontend testing (Jest)
- Responsive design
- Production deployment

**Deliverables:**
- Modern, maintainable frontend
- Improved UX
- Frontend tests

### Sprint 6: Advanced Features (2 weeks)

**Week 1:**
- i18n (internationalization)
- Plugin system
- Video processor (new)

**Week 2:**
- Customizable prompt editor
- Advanced version history
- Reports and analytics

**Deliverables:**
- Extensible system
- Premium features
- Analytics dashboard
## 🎯 SUCCESS METRICS

### Sprint 1-2 KPIs
- ✅ 0 critical vulnerabilities
- ✅ 50% code coverage
- ✅ 100% of endpoints authenticated
- ✅ API response time < 100 ms

### Sprint 3-4 KPIs
- ✅ 95% uptime
- ✅ 80% code coverage
- ✅ Processing time < 5 min (1 h of audio)
- ✅ 100% Notion sync success rate

### Sprint 5-6 KPIs
- ✅ Frontend load time < 2 s
- ✅ 90% user satisfaction
- ✅ Support for 5+ languages
- ✅ 100+ files processed per day without degradation
## 📚 RESOURCES AND DOCUMENTATION

### Libraries to Add

```
# requirements.txt (additions)

# Security
PyJWT>=2.8.0
flask-jwt-extended>=4.5.3
flask-limiter>=3.5.0
werkzeug>=3.0.0

# Queue & cache
celery>=5.3.4
redis>=5.0.0
hiredis>=2.2.3

# Database
psycopg2-binary>=2.9.9
sqlalchemy>=2.0.23
alembic>=1.13.0

# Notion
notion-client>=2.2.1

# WebSockets
flask-socketio>=5.3.5
python-socketio>=5.10.0
eventlet>=0.33.3

# Monitoring
prometheus-client>=0.19.0
sentry-sdk>=1.39.1

# Testing
pytest>=7.4.3
pytest-cov>=4.1.0
pytest-asyncio>=0.21.1
pytest-mock>=3.12.0
faker>=22.0.0

# Type checking
mypy>=1.7.1
types-requests>=2.31.0
```
### Useful Scripts

```bash
#!/bin/bash
# scripts/deploy.sh
set -e

echo "Deploying CBCFacil..."

# Pull latest code
git pull origin main

# Activate the virtualenv
source .venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Run migrations
alembic upgrade head

# Restart services
sudo systemctl restart cbcfacil
sudo systemctl restart cbcfacil-worker
sudo systemctl restart nginx

echo "Deployment complete!"
```
## 🏁 CONCLUSION

This document provides a complete roadmap for taking CBCFacil from a working prototype to a production-ready, enterprise-grade system.

### Immediate Next Steps

- **DAY 1:** rotate the Notion API token, fix the critical vulnerabilities
- **WEEK 1:** implement authentication and rate limiting
- **WEEK 2:** set up the testing infrastructure
- **MONTH 1:** complete Sprints 1-2

### Implementation Priority
```
CRITICAL (now):
├── Baseline security
├── Bug fixes
└── Core tests

HIGH (2-4 weeks):
├── Performance (Celery + Redis)
├── Advanced Notion integration
└── Database migration

MEDIUM (1-2 months):
├── Frontend modernization
├── Full observability
└── Advanced features
```
**Expected final state:** a production-ready system with 80%+ test coverage, robust security, advanced Notion integration, and a scalable architecture.

*Document generated on January 26, 2026*
*Version: 1.0*
*Author: CBCFacil Development Team*