"""
Flask API routes for CBCFacil dashboard
"""
import logging
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from flask import Flask, render_template, request, jsonify, send_from_directory
from flask_cors import CORS

from config import settings
from storage.processed_registry import processed_registry
from services.webdav_service import webdav_service
from services import vram_manager
from document.generators import DocumentGenerator

logger = logging.getLogger(__name__)


def _is_unsafe_name(name: str) -> bool:
    """Return True when a client-supplied name looks like a traversal attempt."""
    return '..' in name or name.startswith('/')


def _json_payload() -> Dict[str, Any]:
    """Return the request's JSON object, or an empty dict when there is none.

    ``silent=True`` keeps a missing or malformed JSON body from raising, so the
    endpoints can answer with their own 400 "field required" responses instead
    of falling through to the generic 500 handler.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _resolve_inside(base_dir: Path, filename: str) -> Optional[Path]:
    """Resolve ``filename`` under ``base_dir``; return None if it escapes it."""
    root = Path(str(base_dir)).resolve()
    candidate = (root / filename).resolve()
    if candidate.is_relative_to(root):
        return candidate
    return None


def create_app() -> Flask:
    """Create and configure Flask application.

    Returns:
        The Flask app with CORS enabled and every dashboard route registered.
    """
    # Templates live in the project root (the parent of api/).
    project_root = Path(__file__).parent.parent
    template_dir = project_root / 'templates'

    app = Flask(__name__, template_folder=str(template_dir))
    CORS(app)

    # Configure app
    app.config['SECRET_KEY'] = settings.DASHBOARD_SECRET_KEY or os.urandom(24)
    app.config['DOWNLOADS_FOLDER'] = str(settings.LOCAL_DOWNLOADS_PATH)

    @app.route('/')
    def index():
        """Dashboard home page"""
        return render_template('index.html')

    @app.route('/api/files')
    def get_files():
        """Get list of audio files"""
        try:
            files = get_audio_files()
            processed = sum(1 for f in files if f['processed'])
            return jsonify({
                'success': True,
                'files': files,
                'total': len(files),
                'processed': processed,
                'pending': len(files) - processed
            })
        except Exception as e:
            app.logger.error(f"Error getting files: {e}")
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    @app.route('/api/reprocess', methods=['POST'])
    def reprocess_file():
        """Reprocess a file"""
        try:
            data = _json_payload()
            file_path = data.get('path')

            if not file_path:
                return jsonify({
                    'success': False,
                    'message': "Path del archivo es requerido"
                }), 400

            # TODO: Implement file reprocessing
            # This would trigger the main processing loop
            # (the optional 'source' payload field is not used yet).
            return jsonify({
                'success': True,
                'message': f"Archivo {Path(file_path).name} enviado a reprocesamiento"
            })
        except Exception as e:
            app.logger.error(f"Error reprocessing file: {e}")
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    @app.route('/api/mark-unprocessed', methods=['POST'])
    def mark_unprocessed():
        """Mark file as unprocessed"""
        try:
            data = _json_payload()
            file_path = data.get('path')

            if not file_path:
                return jsonify({
                    'success': False,
                    'message': "Path del archivo es requerido"
                }), 400

            if processed_registry.remove(file_path):
                return jsonify({
                    'success': True,
                    'message': "Archivo marcado como no procesado"
                })
            return jsonify({
                'success': False,
                'message': "No se pudo marcar como no procesado"
            }), 500
        except Exception as e:
            app.logger.error(f"Error marking unprocessed: {e}")
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    @app.route('/api/refresh')
    def refresh_files():
        """Refresh file list"""
        try:
            processed_registry.load()
            files = get_audio_files()
            return jsonify({
                'success': True,
                'message': "Lista de archivos actualizada",
                'files': files
            })
        except Exception as e:
            app.logger.error(f"Error refreshing files: {e}")
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    # The view needs `filename`, so the rule must declare it; `path` lets the
    # value contain slashes, which the validation below then inspects.
    @app.route('/downloads/<path:filename>')
    def download_file(filename):
        """Download file from the downloads or DOCX directory"""
        try:
            # Validate path to prevent traversal and injection attacks
            if _is_unsafe_name(filename):
                return jsonify({'error': 'Invalid filename'}), 400

            # Resolve against each served directory (not the process CWD) and
            # keep only candidates that stay inside their directory.
            contained = []
            for directory in (settings.LOCAL_DOWNLOADS_PATH, settings.LOCAL_DOCX):
                resolved = _resolve_inside(directory, filename)
                if resolved is not None:
                    contained.append((directory, resolved))

            if not contained:
                return jsonify({'error': 'Invalid filename'}), 400

            # Downloads directory is tried first, then resumenes_docx.
            for directory, resolved in contained:
                if resolved.exists():
                    return send_from_directory(str(directory), filename)

            return jsonify({'error': 'File not found'}), 404
        except Exception as e:
            app.logger.error(f"Error downloading file: {e}")
            return jsonify({'error': 'File not found'}), 404

    @app.route('/downloads/find-file')
    def find_and_download_file():
        """Find and download file with various name variants"""
        try:
            filename = request.args.get('filename', '')
            ext = request.args.get('ext', 'txt')

            if not filename:
                return jsonify({'error': 'Filename required'}), 400

            # Validate to prevent path traversal
            if _is_unsafe_name(filename):
                return jsonify({'error': 'Invalid filename'}), 400

            # `ext` ends up inside the file name too, so it gets the same care.
            if '..' in ext or '/' in ext or '\\' in ext:
                return jsonify({'error': 'Invalid filename'}), 400

            # Try various name variants
            base_name = filename.replace('_unificado', '').replace('_unified', '')
            underscored = base_name.replace(' ', '_')
            name_variants = [
                f"{base_name}.{ext}",
                f"{base_name}_unificado.{ext}",
                f"{base_name}_unified.{ext}",
                f"{underscored}.{ext}",
                f"{underscored}_unificado.{ext}",
            ]

            # Pick a mimetype so the browser displays the file inline.
            mimetype = {
                'pdf': 'application/pdf',
                'md': 'text/markdown',
                'txt': 'text/plain',
            }.get(ext)

            # Directories to search
            for directory in (settings.LOCAL_DOWNLOADS_PATH, settings.LOCAL_DOCX):
                if not directory.exists():
                    continue
                for variant in name_variants:
                    if (directory / variant).exists():
                        # as_attachment=False opens in the browser instead of
                        # forcing a download.
                        return send_from_directory(
                            str(directory),
                            variant,
                            as_attachment=False,
                            mimetype=mimetype
                        )

            return jsonify({'error': f'File not found: {filename}.{ext}'}), 404
        except Exception as e:
            app.logger.error(f"Error finding file: {e}")
            return jsonify({'error': 'File not found'}), 404

    @app.route('/health')
    def health_check():
        """Health check endpoint"""
        gpu_info = vram_manager.get_usage()
        return jsonify({
            'status': 'healthy',
            'timestamp': datetime.now().isoformat(),
            'processed_files_count': processed_registry.count(),
            'gpu': gpu_info,
            'config': {
                'webdav_configured': settings.has_webdav_config,
                'ai_configured': settings.has_ai_config,
                'debug': settings.DEBUG
            }
        })

    @app.route('/api/transcription/<filename>')
    def get_transcription(filename: str):
        """Get transcription content for a specific file"""
        try:
            # Validate filename to prevent path traversal
            if _is_unsafe_name(filename):
                return jsonify({
                    'success': False,
                    'message': 'Invalid filename'
                }), 400

            # Extract base name without extension (handle .mp3, .wav, .txt, etc.)
            base_name = Path(filename).stem
            file_path = settings.LOCAL_DOWNLOADS_PATH / f"{base_name}.txt"

            if not file_path.exists():
                return jsonify({
                    'success': False,
                    'message': f'Transcription file not found: {base_name}.txt'
                }), 404

            with open(file_path, 'r', encoding='utf-8') as f:
                transcription_text = f.read()

            return jsonify({
                'success': True,
                'filename': filename,
                'transcription': transcription_text,
                'file_path': str(file_path),
                'word_count': len(transcription_text.split()),
                'char_count': len(transcription_text)
            })
        except Exception as e:
            app.logger.error(f"Error reading transcription: {e}")
            return jsonify({
                'success': False,
                'message': f"Error reading transcription: {str(e)}"
            }), 500

    @app.route('/api/summary/<filename>')
    def get_summary(filename: str):
        """Get summary content for a specific file"""
        try:
            # Validate filename to prevent path traversal
            if _is_unsafe_name(filename):
                return jsonify({
                    'success': False,
                    'message': 'Invalid filename'
                }), 400

            # Extract base name without extension (handle .mp3, .wav, etc.)
            # and drop _unificado/_unified suffixes if present.
            base_name = Path(filename).stem
            base_name = base_name.replace('_unificado', '').replace('_unified', '')

            # Try different file path variants, first match wins.
            possible_paths = [
                settings.LOCAL_DOWNLOADS_PATH / f"{base_name}_unificado.md",
                settings.LOCAL_DOWNLOADS_PATH / f"{base_name}_unified.md",
                settings.LOCAL_DOWNLOADS_PATH / f"{base_name}.md",
            ]
            file_path = next((p for p in possible_paths if p.exists()), None)

            if not file_path:
                return jsonify({
                    'success': False,
                    'message': f'Summary file not found for: {filename}'
                }), 404

            with open(file_path, 'r', encoding='utf-8') as f:
                summary_text = f.read()

            formats_available = get_available_formats(base_name)

            return jsonify({
                'success': True,
                'filename': base_name,
                'summary': summary_text,
                'file_path': str(file_path),
                'formats_available': formats_available
            })
        except Exception as e:
            app.logger.error(f"Error reading summary: {e}")
            return jsonify({
                'success': False,
                'message': f"Error reading summary: {str(e)}"
            }), 500

    @app.route('/api/versions/<filename>')
    def get_versions(filename: str):
        """Get all summary versions for a file"""
        try:
            # Validate filename
            if _is_unsafe_name(filename):
                return jsonify({'success': False, 'message': 'Invalid filename'}), 400

            # Extract base name
            base_name = Path(filename).stem
            base_name = base_name.replace('_unificado', '').replace('_unified', '')

            versions = []
            downloads_path = settings.LOCAL_DOWNLOADS_PATH
            docx_path = settings.LOCAL_DOCX

            # Check for transcription (original)
            txt_path = downloads_path / f"{base_name}.txt"
            if txt_path.exists():
                stat = txt_path.stat()
                versions.append({
                    'type': 'transcription',
                    'label': '📝 Transcripción Original',
                    'filename': txt_path.name,
                    'path': f"/downloads/find-file?filename={base_name}&ext=txt",
                    'date': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M'),
                    'size': f"{stat.st_size / 1024:.1f} KB"
                })

            # Check for summary versions (md, docx, pdf)
            summary_patterns = [
                (f"{base_name}_unificado.md", "📋 Resumen MD"),
                (f"{base_name}_unificado.docx", "📄 Documento DOCX"),
                (f"{base_name}_unificado.pdf", "📑 PDF"),
            ]

            for pattern, label in summary_patterns:
                # Downloads directory first, DOCX directory as fallback.
                file_path = downloads_path / pattern
                if not file_path.exists():
                    file_path = docx_path / pattern
                if not file_path.exists():
                    continue

                stat = file_path.stat()
                ext = file_path.suffix[1:]  # Remove the dot
                versions.append({
                    'type': 'summary',
                    'label': label,
                    'filename': pattern,
                    'path': f"/downloads/find-file?filename={base_name}&ext={ext}",
                    'date': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M'),
                    'size': f"{stat.st_size / 1024:.1f} KB"
                })

            # Sort by date descending
            versions.sort(key=lambda x: x['date'], reverse=True)

            return jsonify({
                'success': True,
                'base_name': base_name,
                'versions': versions,
                'count': len(versions)
            })
        except Exception as e:
            app.logger.error(f"Error getting versions: {e}")
            return jsonify({'success': False, 'message': str(e)}), 500

    @app.route('/api/regenerate-summary', methods=['POST'])
    def regenerate_summary():
        """Regenerate summary from existing transcription"""
        start_time = time.time()

        try:
            data = _json_payload()
            filename = data.get('filename')
            # NOTE: a 'custom_prompt' field may be sent by clients, but it is
            # not forwarded to the generator here.

            if not filename:
                return jsonify({
                    'success': False,
                    'message': 'Filename is required'
                }), 400

            # Validate filename to prevent path traversal
            if _is_unsafe_name(filename):
                return jsonify({
                    'success': False,
                    'message': 'Invalid filename'
                }), 400

            # Get base name (remove extension if present)
            base_name = Path(filename).stem

            # Read transcription from .txt file
            transcription_path = settings.LOCAL_DOWNLOADS_PATH / f"{base_name}.txt"
            if not transcription_path.exists():
                # Fall back to the name exactly as given.
                transcription_path = settings.LOCAL_DOWNLOADS_PATH / filename
                if not transcription_path.exists():
                    return jsonify({
                        'success': False,
                        'message': f'Transcription file not found for: {filename}'
                    }), 404

            with open(transcription_path, 'r', encoding='utf-8') as f:
                transcription_text = f.read()

            # Generate new summary using DocumentGenerator
            doc_generator = DocumentGenerator()
            success, new_summary, metadata = doc_generator.generate_summary(
                transcription_text,
                base_name
            )

            if not success:
                return jsonify({
                    'success': False,
                    'message': 'Failed to generate summary'
                }), 500

            # Upload to WebDAV if configured
            files_updated = []
            if settings.has_webdav_config:
                # (metadata key, settings attribute naming the remote folder);
                # the attribute is only read when that artifact exists.
                upload_targets = (
                    ('markdown_path', 'REMOTE_TXT_FOLDER'),
                    ('docx_path', 'DOCX_FOLDER'),
                    ('pdf_path', 'REMOTE_PDF_FOLDER'),
                )
                try:
                    for meta_key, folder_attr in upload_targets:
                        if meta_key not in metadata:
                            continue
                        local_path = Path(metadata[meta_key])
                        if not local_path.exists():
                            continue
                        remote_path = f"{getattr(settings, folder_attr)}/{local_path.name}"
                        webdav_service.upload(str(local_path), remote_path)
                        files_updated.append(remote_path)
                except Exception as e:
                    # Continue even if upload fails
                    app.logger.warning(f"WebDAV upload failed: {e}")

            processing_time = time.time() - start_time

            return jsonify({
                'success': True,
                'message': 'Summary regenerated successfully',
                'new_summary': new_summary,
                'files_updated': files_updated,
                'processing_time': f"{processing_time:.2f}s",
                'metadata': metadata
            })
        except Exception as e:
            app.logger.error(f"Error regenerating summary: {e}")
            return jsonify({
                'success': False,
                'message': f"Error regenerating summary: {str(e)}"
            }), 500

    @app.route('/api/files-detailed')
    def get_files_detailed():
        """Get detailed list of files with transcription and summary info"""
        try:
            files = get_audio_files_detailed()

            return jsonify({
                'success': True,
                'files': files,
                'total': len(files),
                'with_transcription': sum(1 for f in files if f['has_transcription']),
                'with_summary': sum(1 for f in files if f['has_summary'])
            })
        except Exception as e:
            app.logger.error(f"Error getting detailed files: {e}")
            return jsonify({
                'success': False,
                'message': f"Error: {str(e)}"
            }), 500

    return app


def get_audio_files() -> List[Dict[str, Any]]:
    """Get list of audio files from WebDAV and local.

    Both sources are best-effort: a failure in one is logged and the other is
    still returned. Entries are unique per (filename, source) and sorted by
    source, then filename.
    """
    files = []

    # Get files from WebDAV
    if settings.has_webdav_config:
        try:
            for remote_path in webdav_service.list(settings.REMOTE_AUDIOS_FOLDER):
                normalized_path = webdav_service.normalize_path(remote_path)
                base_name = Path(normalized_path).name

                if not any(normalized_path.lower().endswith(ext) for ext in settings.AUDIO_EXTENSIONS):
                    continue

                files.append({
                    'filename': base_name,
                    'path': normalized_path,
                    'source': 'webdav',
                    'processed': processed_registry.is_processed(normalized_path),
                    'size': 'Unknown',
                    'last_modified': 'Unknown',
                    'available_formats': get_available_formats(base_name)
                })
        except Exception as e:
            logger.warning(f"Error getting WebDAV files: {e}")

    # Get local files
    try:
        if settings.LOCAL_DOWNLOADS_PATH.exists():
            for ext in settings.AUDIO_EXTENSIONS:
                for file_path in settings.LOCAL_DOWNLOADS_PATH.glob(f"*{ext}"):
                    stat = file_path.stat()
                    files.append({
                        'filename': file_path.name,
                        'path': str(file_path),
                        'source': 'local',
                        'processed': processed_registry.is_processed(file_path.name),
                        'size': format_size(stat.st_size),
                        'last_modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                        'available_formats': get_available_formats(file_path.name)
                    })
    except Exception as e:
        logger.error(f"Error getting local files: {e}")

    # Remove duplicates; keying on (filename, source) keeps both the local and
    # the WebDAV copy of a file (a later entry replaces an earlier one).
    unique_files = {(entry['filename'], entry['source']): entry for entry in files}

    return sorted(unique_files.values(), key=lambda x: (x['source'], x['filename']))


def get_audio_files_detailed() -> List[Dict[str, Any]]:
    """Get detailed list of audio files with transcription and summary information.

    Local files are listed first; WebDAV files are added only when no entry
    with the same base name exists yet. Each source is best-effort: errors are
    logged and whatever was collected so far is returned, sorted by filename.
    """
    files = []
    downloads = settings.LOCAL_DOWNLOADS_PATH

    # Local audio files
    try:
        if downloads.exists():
            for ext in settings.AUDIO_EXTENSIONS:
                for file_path in downloads.glob(f"*{ext}"):
                    stat = file_path.stat()
                    filename = file_path.name
                    base_name = file_path.stem

                    # Check for transcription
                    transcription_path = downloads / f"{base_name}.txt"
                    has_transcription = transcription_path.exists()
                    transcription_words = 0

                    if has_transcription:
                        try:
                            with open(transcription_path, 'r', encoding='utf-8') as f:
                                transcription_words = len(f.read().split())
                        except Exception as e:
                            # The word count is optional; keep listing the file.
                            logger.warning(f"Error reading transcription {transcription_path}: {e}")

                    # Check for summary and formats
                    formats = get_available_formats(filename)
                    has_summary = formats.get('md', False)

                    # Get summary path (first existing variant)
                    summary_path = None
                    if has_summary:
                        summary_variants = [
                            downloads / f"{base_name}_unificado.md",
                            downloads / f"{base_name}_unified.md",
                            downloads / f"{base_name}.md",
                        ]
                        for variant in summary_variants:
                            if variant.exists():
                                summary_path = str(variant)
                                break

                    files.append({
                        'filename': filename,
                        'base_name': base_name,
                        'audio_path': str(file_path),
                        'has_transcription': has_transcription,
                        'transcription_path': str(transcription_path) if has_transcription else None,
                        'transcription_words': transcription_words,
                        'has_summary': has_summary,
                        'summary_path': summary_path,
                        'formats': formats,
                        'processed': processed_registry.is_processed(filename),
                        'last_modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                        'size': format_size(stat.st_size)
                    })
    except Exception as e:
        # Nothing propagates to the endpoint from here, so log it now.
        logger.error(f"Error getting local files: {e}")

    # Get WebDAV files
    if settings.has_webdav_config:
        # Base names already listed; a set avoids rescanning `files` per entry.
        known_base_names = {entry['base_name'] for entry in files}
        try:
            for remote_path in webdav_service.list(settings.REMOTE_AUDIOS_FOLDER):
                normalized_path = webdav_service.normalize_path(remote_path)
                remote_name = Path(normalized_path).name
                base_name = Path(normalized_path).stem

                if not any(normalized_path.lower().endswith(ext) for ext in settings.AUDIO_EXTENSIONS):
                    continue
                if base_name in known_base_names:
                    continue

                # Pass the name WITH its extension: get_available_formats
                # strips one suffix itself, and stripping twice would truncate
                # names that contain dots.
                formats = get_available_formats(remote_name)
                files.append({
                    'filename': remote_name,
                    'base_name': base_name,
                    'audio_path': normalized_path,
                    'has_transcription': formats.get('txt', False),
                    'transcription_path': None,
                    'transcription_words': 0,
                    'has_summary': formats.get('md', False),
                    'summary_path': None,
                    'formats': formats,
                    'processed': processed_registry.is_processed(normalized_path),
                    'last_modified': 'Unknown',
                    'size': 'Unknown'
                })
                known_base_names.add(base_name)
        except Exception as e:
            logger.warning(f"Error getting WebDAV files: {e}")

    # Remove duplicates (first entry per base name wins) and sort
    unique_files = {}
    for entry in files:
        unique_files.setdefault(entry['base_name'], entry)

    return sorted(unique_files.values(), key=lambda x: x['filename'])


def get_available_formats(audio_filename: str) -> Dict[str, bool]:
    """Check which output formats are available for an audio file.

    Args:
        audio_filename: Audio file name; its last suffix is stripped to obtain
            the base name used for the lookup.

    Returns:
        Mapping of 'txt', 'md', 'pdf' and 'docx' to whether a matching file
        exists in the downloads or DOCX directory.
    """
    base_name = Path(audio_filename).stem
    underscored = base_name.replace(' ', '_')

    # Same naming variants the summary and find-file endpoints accept.
    name_variants = [
        base_name,
        f"{base_name}_unificado",
        f"{base_name}_unified",
        underscored,
        f"{underscored}_unificado",
        f"{underscored}_unified",
    ]

    formats = {
        'txt': False,
        'md': False,
        'pdf': False,
        'docx': False
    }

    for directory in (settings.LOCAL_DOWNLOADS_PATH, settings.LOCAL_DOCX):
        if not directory.exists():
            continue

        for ext in formats:
            if formats[ext]:
                continue  # already found in an earlier directory
            if any((directory / f"{variant}.{ext}").exists() for variant in name_variants):
                formats[ext] = True

    return formats


def format_size(size_bytes: int) -> str:
    """Format size in human-readable format.

    Divides by 1024 until the value drops below 1024, using the units B, KB,
    MB and GB; anything larger is reported in TB.
    """
    size = size_bytes
    for unit in ('B', 'KB', 'MB', 'GB'):
        if size < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} TB"