"""Flask API routes for CBCFacil dashboard."""

import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from flask import Flask, render_template, request, jsonify, send_from_directory
from flask_cors import CORS

from config import settings
from storage import processed_registry
from services.webdav_service import webdav_service
from services import vram_manager

# The listing helpers below live at module level, where no ``app`` object is
# in scope, so they log through a module logger instead of ``app.logger``.
logger = logging.getLogger(__name__)


def _json_body() -> Dict[str, Any]:
    """Return the current request's JSON object, or ``{}`` if there is none.

    A missing, malformed or non-object body yields an empty dict so that the
    callers' "required field" checks answer with 400 instead of crashing.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _safe_child(base_dir: Any, filename: str) -> Optional[Path]:
    """Resolve ``filename`` under ``base_dir`` and verify it stays inside it.

    Args:
        base_dir: Directory the file must live in (path-like).
        filename: Client-supplied relative path.

    Returns:
        The resolved path when it is strictly inside ``base_dir``; ``None``
        when resolution (``..`` segments, absolute paths, symlinks) escapes it
        or points at the directory itself.
    """
    base = Path(str(base_dir)).resolve()
    candidate = (base / filename).resolve()
    if candidate != base and candidate.is_relative_to(base):
        return candidate
    return None


def create_app() -> Flask:
    """Create and configure the Flask application.

    Registers the dashboard page, the JSON API under ``/api``, the file
    download route and the health check.

    Returns:
        The configured ``Flask`` instance.
    """
    app = Flask(__name__)
    CORS(app)

    # Configure app. Without a configured key a random one is generated, so
    # it changes on every call to create_app().
    app.config['SECRET_KEY'] = settings.DASHBOARD_SECRET_KEY or os.urandom(24)
    app.config['DOWNLOADS_FOLDER'] = str(settings.LOCAL_DOWNLOADS_PATH)

    @app.route('/')
    def index():
        """Dashboard home page."""
        return render_template('index.html')

    @app.route('/api/files')
    def get_files():
        """Return the audio file list with processed/pending counters."""
        try:
            files = get_audio_files()
            processed_count = sum(1 for f in files if f['processed'])
            return jsonify({
                'success': True,
                'files': files,
                'total': len(files),
                'processed': processed_count,
                'pending': len(files) - processed_count
            })
        except Exception as e:
            app.logger.error("Error getting files: %s", e)
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    @app.route('/api/reprocess', methods=['POST'])
    def reprocess_file():
        """Acknowledge a reprocess request for the file given by ``path``.

        Responds 400 when ``path`` is missing from the JSON body.
        """
        try:
            data = _json_body()
            file_path = data.get('path')

            if not file_path:
                return jsonify({
                    'success': False,
                    'message': "Path del archivo es requerido"
                }), 400

            # TODO: Implement file reprocessing.
            # This would trigger the main processing loop. The optional
            # 'source' field of the request body is not used yet.
            return jsonify({
                'success': True,
                'message': f"Archivo {Path(file_path).name} enviado a reprocesamiento"
            })
        except Exception as e:
            app.logger.error("Error reprocessing file: %s", e)
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    @app.route('/api/mark-unprocessed', methods=['POST'])
    def mark_unprocessed():
        """Remove the file given by ``path`` from the processed registry.

        Responds 400 when ``path`` is missing and 500 when the registry
        reports that the removal did not succeed.
        """
        try:
            data = _json_body()
            file_path = data.get('path')

            if not file_path:
                return jsonify({
                    'success': False,
                    'message': "Path del archivo es requerido"
                }), 400

            if processed_registry.remove(file_path):
                return jsonify({
                    'success': True,
                    'message': "Archivo marcado como no procesado"
                })
            return jsonify({
                'success': False,
                'message': "No se pudo marcar como no procesado"
            }), 500
        except Exception as e:
            app.logger.error("Error marking unprocessed: %s", e)
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    @app.route('/api/refresh')
    def refresh_files():
        """Reload the processed registry and return the fresh file list."""
        try:
            processed_registry.load()
            files = get_audio_files()
            return jsonify({
                'success': True,
                'message': "Lista de archivos actualizada",
                'files': files
            })
        except Exception as e:
            app.logger.error("Error refreshing files: %s", e)
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    # The URL variable is required: the view takes ``filename``, and a route
    # without it makes every request fail with a TypeError.
    @app.route('/downloads/<path:filename>')
    def download_file(filename):
        """Serve ``filename`` from the downloads or the DOCX directory.

        Responds 400 for names containing ``..`` or starting with ``/``, and
        404 when the file is in neither directory or resolves outside both.
        """
        try:
            # Cheap up-front rejection of traversal attempts. The substring
            # test is deliberately strict (it also rejects names like
            # "a..b"), matching the historical behaviour of this endpoint.
            if '..' in filename or filename.startswith('/'):
                return jsonify({'error': 'Invalid filename'}), 400

            # Try the downloads directory first, then resumenes_docx. The
            # resolved target must stay inside the directory it is served
            # from, which also covers symlinks pointing elsewhere.
            for base_dir in (settings.LOCAL_DOWNLOADS_PATH, settings.LOCAL_DOCX):
                target = _safe_child(base_dir, filename)
                if target is not None and target.is_file():
                    return send_from_directory(str(base_dir), filename)

            return jsonify({'error': 'File not found'}), 404
        except Exception as e:
            app.logger.error("Error downloading file: %s", e)
            return jsonify({'error': 'File not found'}), 404

    @app.route('/health')
    def health_check():
        """Health check endpoint with registry, GPU and config summary."""
        gpu_info = vram_manager.get_usage()

        return jsonify({
            'status': 'healthy',
            'timestamp': datetime.now().isoformat(),
            'processed_files_count': processed_registry.count(),
            'gpu': gpu_info,
            'config': {
                'webdav_configured': settings.has_webdav_config,
                'ai_configured': settings.has_ai_config,
                'debug': settings.DEBUG
            }
        })

    return app


def get_audio_files() -> List[Dict[str, Any]]:
    """Get the list of audio files from WebDAV and the local downloads folder.

    Each source is best-effort: a failure while listing one source is logged
    and the other source is still reported. When the same filename exists in
    both, the WebDAV entry wins.

    Returns:
        One dict per unique filename, sorted by ``filename``, with the keys
        ``filename``, ``path``, ``source``, ``processed``, ``size``,
        ``last_modified`` and ``available_formats``.
    """
    files: List[Dict[str, Any]] = []
    audio_suffixes = tuple(settings.AUDIO_EXTENSIONS)

    # Get files from WebDAV
    if settings.has_webdav_config:
        try:
            for remote_path in webdav_service.list(settings.REMOTE_AUDIOS_FOLDER):
                normalized_path = webdav_service.normalize_path(remote_path)
                if not normalized_path.lower().endswith(audio_suffixes):
                    continue

                base_name = Path(normalized_path).name
                files.append({
                    'filename': base_name,
                    'path': normalized_path,
                    'source': 'webdav',
                    'processed': processed_registry.is_processed(normalized_path),
                    # Size and mtime are not read from the WebDAV listing.
                    'size': 'Unknown',
                    'last_modified': 'Unknown',
                    'available_formats': get_available_formats(base_name)
                })
        except Exception as e:
            logger.error("Error getting WebDAV files: %s", e)

    # Get local files
    try:
        if settings.LOCAL_DOWNLOADS_PATH.exists():
            for ext in settings.AUDIO_EXTENSIONS:
                for file_path in settings.LOCAL_DOWNLOADS_PATH.glob(f"*{ext}"):
                    stat = file_path.stat()
                    modified = datetime.fromtimestamp(stat.st_mtime)
                    files.append({
                        'filename': file_path.name,
                        'path': str(file_path),
                        'source': 'local',
                        # Local files are looked up in the registry by bare name.
                        'processed': processed_registry.is_processed(file_path.name),
                        'size': format_size(stat.st_size),
                        'last_modified': modified.strftime('%Y-%m-%d %H:%M:%S'),
                        'available_formats': get_available_formats(file_path.name)
                    })
    except Exception as e:
        logger.error("Error getting local files: %s", e)

    # Remove duplicates (WebDAV takes precedence)
    unique_files: Dict[str, Dict[str, Any]] = {}
    for entry in files:
        key = entry['filename']
        if key not in unique_files or entry['source'] == 'webdav':
            unique_files[key] = entry

    return sorted(unique_files.values(), key=lambda x: x['filename'])


def get_available_formats(audio_filename: str) -> Dict[str, bool]:
    """Check which output formats are available for an audio file.

    Looks in the downloads and DOCX directories for files named after the
    audio file's stem, with and without an ``_unificado`` suffix and with
    spaces optionally replaced by underscores.

    Args:
        audio_filename: Name (or path) of the audio file.

    Returns:
        Mapping of ``txt``, ``md``, ``pdf`` and ``docx`` to whether a matching
        file exists in any of the checked directories.
    """
    base_name = Path(audio_filename).stem
    underscored = base_name.replace(' ', '_')
    # dict.fromkeys drops duplicates (names without spaces) but keeps order.
    name_variants = list(dict.fromkeys([
        base_name,
        f"{base_name}_unificado",
        underscored,
        f"{underscored}_unificado",
    ]))

    formats = {
        'txt': False,
        'md': False,
        'pdf': False,
        'docx': False
    }

    for directory in (settings.LOCAL_DOWNLOADS_PATH, settings.LOCAL_DOCX):
        if not directory.exists():
            continue

        for ext in formats:
            if formats[ext]:
                # Already found in an earlier directory.
                continue
            formats[ext] = any(
                (directory / f"{variant}.{ext}").exists()
                for variant in name_variants
            )

    return formats


def format_size(size_bytes: int) -> str:
    """Format a byte count in human-readable form.

    Args:
        size_bytes: Size in bytes.

    Returns:
        The size with one decimal and a unit from B to TB using 1024 steps,
        e.g. ``"1.5 KB"``.
    """
    size = float(size_bytes)
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"