Files
cbc2027/api/routes.py

281 lines
9.5 KiB
Python

"""
Flask API routes for CBCFacil dashboard
"""
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List
from flask import Flask, render_template, request, jsonify, send_from_directory
from flask_cors import CORS
from ..config import settings
from ..storage import processed_registry
from ..services.webdav_service import webdav_service
from ..services import vram_manager
def create_app() -> Flask:
"""Create and configure Flask application"""
app = Flask(__name__)
CORS(app)
# Configure app
app.config['SECRET_KEY'] = settings.DASHBOARD_SECRET_KEY or os.urandom(24)
app.config['DOWNLOADS_FOLDER'] = str(settings.LOCAL_DOWNLOADS_PATH)
@app.route('/')
def index():
"""Dashboard home page"""
return render_template('index.html')
@app.route('/api/files')
def get_files():
"""Get list of audio files"""
try:
files = get_audio_files()
return jsonify({
'success': True,
'files': files,
'total': len(files),
'processed': sum(1 for f in files if f['processed']),
'pending': sum(1 for f in files if not f['processed'])
})
except Exception as e:
app.logger.error(f"Error getting files: {e}")
return jsonify({
'success': False,
'message': f"Error: {str(e)}"
}), 500
@app.route('/api/reprocess', methods=['POST'])
def reprocess_file():
"""Reprocess a file"""
try:
data = request.get_json()
file_path = data.get('path')
source = data.get('source', 'local')
if not file_path:
return jsonify({
'success': False,
'message': "Path del archivo es requerido"
}), 400
# TODO: Implement file reprocessing
# This would trigger the main processing loop
return jsonify({
'success': True,
'message': f"Archivo {Path(file_path).name} enviado a reprocesamiento"
})
except Exception as e:
app.logger.error(f"Error reprocessing file: {e}")
return jsonify({
'success': False,
'message': f"Error: {str(e)}"
}), 500
@app.route('/api/mark-unprocessed', methods=['POST'])
def mark_unprocessed():
"""Mark file as unprocessed"""
try:
data = request.get_json()
file_path = data.get('path')
if not file_path:
return jsonify({
'success': False,
'message': "Path del archivo es requerido"
}), 400
success = processed_registry.remove(file_path)
if success:
return jsonify({
'success': True,
'message': "Archivo marcado como no procesado"
})
else:
return jsonify({
'success': False,
'message': "No se pudo marcar como no procesado"
}), 500
except Exception as e:
app.logger.error(f"Error marking unprocessed: {e}")
return jsonify({
'success': False,
'message': f"Error: {str(e)}"
}), 500
@app.route('/api/refresh')
def refresh_files():
"""Refresh file list"""
try:
processed_registry.load()
files = get_audio_files()
return jsonify({
'success': True,
'message': "Lista de archivos actualizada",
'files': files
})
except Exception as e:
app.logger.error(f"Error refreshing files: {e}")
return jsonify({
'success': False,
'message': f"Error: {str(e)}"
}), 500
@app.route('/downloads/<path:filename>')
def download_file(filename):
"""Download file"""
try:
# Validate path to prevent traversal and injection attacks
normalized = Path(filename).resolve()
base_downloads = Path(str(settings.LOCAL_DOWNLOADS_PATH)).resolve()
base_docx = Path(str(settings.LOCAL_DOCX)).resolve()
if '..' in filename or filename.startswith('/') or \
normalized.parts[0] in ['..', '...'] if len(normalized.parts) > 0 else False or \
not (normalized == base_downloads or normalized.is_relative_to(base_downloads) or
normalized == base_docx or normalized.is_relative_to(base_docx)):
return jsonify({'error': 'Invalid filename'}), 400
# Try downloads directory
downloads_path = settings.LOCAL_DOWNLOADS_PATH / filename
if downloads_path.exists():
return send_from_directory(str(settings.LOCAL_DOWNLOADS_PATH), filename)
# Try resumenes_docx directory
docx_path = settings.LOCAL_DOCX / filename
if docx_path.exists():
return send_from_directory(str(settings.LOCAL_DOCX), filename)
return jsonify({'error': 'File not found'}), 404
except Exception as e:
app.logger.error(f"Error downloading file: {e}")
return jsonify({'error': 'File not found'}), 404
@app.route('/health')
def health_check():
"""Health check endpoint"""
gpu_info = vram_manager.get_usage()
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'processed_files_count': processed_registry.count(),
'gpu': gpu_info,
'config': {
'webdav_configured': settings.has_webdav_config,
'ai_configured': settings.has_ai_config,
'debug': settings.DEBUG
}
})
return app
def get_audio_files() -> List[Dict[str, Any]]:
"""Get list of audio files from WebDAV and local"""
files = []
# Get files from WebDAV
if settings.has_webdav_config:
try:
webdav_files = webdav_service.list(settings.REMOTE_AUDIOS_FOLDER)
for file_path in webdav_files:
normalized_path = webdav_service.normalize_path(file_path)
base_name = Path(normalized_path).name
if any(normalized_path.lower().endswith(ext) for ext in settings.AUDIO_EXTENSIONS):
is_processed = processed_registry.is_processed(normalized_path)
files.append({
'filename': base_name,
'path': normalized_path,
'source': 'webdav',
'processed': is_processed,
'size': 'Unknown',
'last_modified': 'Unknown',
'available_formats': get_available_formats(base_name)
})
except Exception as e:
app.logger.error(f"Error getting WebDAV files: {e}")
# Get local files
try:
if settings.LOCAL_DOWNLOADS_PATH.exists():
for ext in settings.AUDIO_EXTENSIONS:
for file_path in settings.LOCAL_DOWNLOADS_PATH.glob(f"*{ext}"):
stat = file_path.stat()
is_processed = processed_registry.is_processed(file_path.name)
files.append({
'filename': file_path.name,
'path': str(file_path),
'source': 'local',
'processed': is_processed,
'size': format_size(stat.st_size),
'last_modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'available_formats': get_available_formats(file_path.name)
})
except Exception as e:
app.logger.error(f"Error getting local files: {e}")
# Remove duplicates (WebDAV takes precedence)
unique_files = {}
for file in files:
key = file['filename']
if key not in unique_files or file['source'] == 'webdav':
unique_files[key] = file
return sorted(unique_files.values(), key=lambda x: x['filename'])
def get_available_formats(audio_filename: str) -> Dict[str, bool]:
"""Check which output formats are available for an audio file"""
base_name = Path(audio_filename).stem
formats = {
'txt': False,
'md': False,
'pdf': False,
'docx': False
}
directories_to_check = [
settings.LOCAL_DOWNLOADS_PATH,
settings.LOCAL_DOCX
]
for directory in directories_to_check:
if not directory.exists():
continue
for ext in formats.keys():
name_variants = [
base_name,
f"{base_name}_unificado",
base_name.replace(' ', '_'),
f"{base_name.replace(' ', '_')}_unificado",
]
for name_variant in name_variants:
file_path = directory / f"{name_variant}.{ext}"
if file_path.exists():
formats[ext] = True
break
return formats
def format_size(size_bytes: int) -> str:
"""Format size in human-readable format"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"