#!/usr/bin/env python3 """ Dashboard Flask para gestión de archivos de audio Interfaz web simple para reprocesar archivos MP3 con 1 click """ import os import logging import sys from datetime import datetime from pathlib import Path from typing import List, Dict, Any from flask import Flask, render_template, request, jsonify, send_from_directory from flask_cors import CORS # Importar configuraciones del main.py sin rutas absolutas PROJECT_ROOT = Path(__file__).resolve().parent if str(PROJECT_ROOT) not in sys.path: sys.path.append(str(PROJECT_ROOT)) from main import ( AUDIO_EXTENSIONS, LOCAL_DOWNLOADS_PATH, PROCESSED_FILES_PATH, load_processed_files, save_processed_file, process_audio_file, REMOTE_AUDIOS_FOLDER, webdav_list, normalize_remote_path ) app = Flask(__name__) CORS(app) # Configuración app.config['SECRET_KEY'] = os.getenv('DASHBOARD_SECRET_KEY', 'dashboard-secret-key-change-in-production') app.config['DOWNLOADS_FOLDER'] = LOCAL_DOWNLOADS_PATH logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class FileManager: """Gestor de archivos para el dashboard""" def __init__(self): self.processed_files = set() self.load_processed_files() def load_processed_files(self): """Cargar archivos procesados desde el registro""" try: self.processed_files = load_processed_files() logger.info(f"Cargados {len(self.processed_files)} archivos procesados") except Exception as e: logger.error(f"Error cargando archivos procesados: {e}") self.processed_files = set() def get_audio_files(self) -> List[Dict[str, Any]]: """Obtener lista de archivos de audio disponibles""" files = [] # Obtener archivos de WebDAV try: webdav_files = webdav_list(REMOTE_AUDIOS_FOLDER) for file_path in webdav_files: normalized_path = normalize_remote_path(file_path) base_name = os.path.basename(normalized_path) if any(normalized_path.lower().endswith(ext) for ext in AUDIO_EXTENSIONS): available_formats = self._get_available_formats(base_name) # Considerar procesado si está en el registro O si tiene archivos de salida is_processed = (normalized_path in self.processed_files or base_name in self.processed_files or any(available_formats.values())) files.append({ 'filename': base_name, 'path': normalized_path, 'source': 'webdav', 'processed': is_processed, 'size': 'Unknown', 'last_modified': 'Unknown', 'available_formats': available_formats }) except Exception as e: logger.error(f"Error obteniendo archivos WebDAV: {e}") # Obtener archivos locales try: if os.path.exists(LOCAL_DOWNLOADS_PATH): local_files = [] for ext in AUDIO_EXTENSIONS: local_files.extend(Path(LOCAL_DOWNLOADS_PATH).glob(f"*{ext}")) for file_path in local_files: stat = file_path.stat() available_formats = self._get_available_formats(file_path.name) # Considerar procesado si está en el registro O si tiene archivos de salida is_processed = (file_path.name in self.processed_files or any(available_formats.values())) files.append({ 'filename': file_path.name, 'path': str(file_path), 'source': 'local', 'processed': is_processed, 'size': self._format_size(stat.st_size), 'last_modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'available_formats': available_formats }) except Exception as e: logger.error(f"Error obteniendo archivos locales: {e}") # Eliminar duplicados y ordenar unique_files = {} for file in files: key = file['filename'] if key not in unique_files or file['source'] == 'webdav': unique_files[key] = file return sorted(unique_files.values(), key=lambda x: x['filename']) def _get_available_formats(self, audio_filename: str) -> Dict[str, bool]: """Verificar qué formatos de salida existen para un archivo de audio""" # Obtener el nombre base sin extensión base_name = Path(audio_filename).stem # Extensiones a verificar formats = { 'txt': False, 'md': False, 'pdf': False, 'docx': False } # Verificar en directorio local y resumenes_docx directories_to_check = [LOCAL_DOWNLOADS_PATH, './resumenes_docx'] for directory in directories_to_check: if not os.path.exists(directory): continue for ext in formats.keys(): # Buscar variaciones del nombre del archivo name_variants = [ base_name, # Nombre exacto f"{base_name}_unificado", # Con sufijo _unificado f"{base_name.replace(' ', '_')}", # Espacios reemplazados por guiones bajos f"{base_name.replace(' ', '_')}_unificado", # Ambas variaciones ] # También verificar variantes con espacios originales pero _unificado if ' ' in base_name: name_variants.append(f"{base_name}_unificado") # Para cada variante, verificar si existe el archivo for name_variant in name_variants: file_path = os.path.join(directory, f"{name_variant}.{ext}") if os.path.exists(file_path): formats[ext] = True break # Encontrado, pasar al siguiente formato return formats def _format_size(self, size_bytes: int) -> str: """Formatear tamaño de archivo""" for unit in ['B', 'KB', 'MB', 'GB']: if size_bytes < 1024.0: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.1f} TB" def reprocess_file(self, file_path: str, source: str) -> Dict[str, Any]: """Reprocesar un archivo específico""" try: # Verificar formatos existentes antes de reprocesar filename = os.path.basename(file_path) existing_formats = self._get_available_formats(filename) has_existing_files = any(existing_formats.values()) if source == 'webdav': # Para archivos WebDAV, llamar directamente a process_audio_file logger.info(f"Iniciando reprocesamiento de WebDAV: {file_path}") process_audio_file(file_path) else: # Para archivos locales, procesar directamente logger.info(f"Iniciando reprocesamiento local: {file_path}") # Aquí podrías agregar lógica adicional para archivos locales return { 'success': True, 'message': f"Archivo {os.path.basename(file_path)} enviado a reprocesamiento", 'had_existing_files': has_existing_files, 'existing_formats': existing_formats } except Exception as e: logger.error(f"Error reprocesando {file_path}: {e}") return { 'success': False, 'message': f"Error: {str(e)}" } def mark_as_unprocessed(self, file_path: str) -> bool: """Marcar archivo como no procesado para forzar reprocesamiento""" try: # Eliminar del registro de procesados processed_files = load_processed_files() normalized_path = normalize_remote_path(file_path) base_name = os.path.basename(normalized_path) # Crear nuevo registro sin este archivo temp_path = PROCESSED_FILES_PATH + '.temp' with open(temp_path, 'w', encoding='utf-8') as f: for line in open(PROCESSED_FILES_PATH, 'r', encoding='utf-8'): if (line.strip() != normalized_path and os.path.basename(line.strip()) != base_name and line.strip() != file_path): f.write(line) # Reemplazar archivo original os.replace(temp_path, PROCESSED_FILES_PATH) self.load_processed_files() # Recargar return True except Exception as e: logger.error(f"Error marcando como no procesado {file_path}: {e}") return False # Instancia global del gestor de archivos file_manager = FileManager() @app.route('/') def index(): """Página principal del dashboard""" return render_template('index.html') @app.route('/api/files') def get_files(): """API endpoint para obtener lista de archivos""" try: files = file_manager.get_audio_files() return jsonify({ 'success': True, 'files': files, 'total': len(files), 'processed': sum(1 for f in files if f['processed']), 'pending': sum(1 for f in files if not f['processed']) }) except Exception as e: logger.error(f"Error obteniendo archivos: {e}") return jsonify({ 'success': False, 'message': f"Error: {str(e)}" }), 500 @app.route('/api/reprocess', methods=['POST']) def reprocess_file(): """API endpoint para reprocesar un archivo""" try: data = request.get_json() file_path = data.get('path') source = data.get('source', 'local') if not file_path: return jsonify({ 'success': False, 'message': "Path del archivo es requerido" }), 400 result = file_manager.reprocess_file(file_path, source) return jsonify(result) except Exception as e: logger.error(f"Error en endpoint reprocesar: {e}") return jsonify({ 'success': False, 'message': f"Error: {str(e)}" }), 500 @app.route('/api/mark-unprocessed', methods=['POST']) def mark_unprocessed(): """API endpoint para marcar archivo como no procesado""" try: data = request.get_json() file_path = data.get('path') if not file_path: return jsonify({ 'success': False, 'message': "Path del archivo es requerido" }), 400 success = file_manager.mark_as_unprocessed(file_path) if success: return jsonify({ 'success': True, 'message': "Archivo marcado como no procesado" }) else: return jsonify({ 'success': False, 'message': "No se pudo marcar como no procesado" }), 500 except Exception as e: logger.error(f"Error marcando como no procesado: {e}") return jsonify({ 'success': False, 'message': f"Error: {str(e)}" }), 500 @app.route('/api/refresh') def refresh_files(): """API endpoint para refrescar lista de archivos""" try: file_manager.load_processed_files() files = file_manager.get_audio_files() return jsonify({ 'success': True, 'message': "Lista de archivos actualizada", 'files': files }) except Exception as e: logger.error(f"Error refrescando archivos: {e}") return jsonify({ 'success': False, 'message': f"Error: {str(e)}" }), 500 @app.route('/downloads/find-file') def find_and_download_file(): """Buscar y servir archivos con diferentes variaciones de nombre""" try: from flask import request filename = request.args.get('filename') ext = request.args.get('ext') if not filename or not ext: return jsonify({'error': 'Missing parameters'}), 400 # Generar posibles variaciones del nombre del archivo from pathlib import Path base_name = Path(filename).stem possible_names = [ f"{base_name}.{ext}", f"{base_name}_unificado.{ext}", f"{base_name.replace(' ', '_')}.{ext}", f"{base_name.replace(' ', '_')}_unificado.{ext}" ] # Directorios donde buscar directories = [LOCAL_DOWNLOADS_PATH, './resumenes_docx'] # Intentar encontrar el archivo en cada directorio con cada variación for directory in directories: if not os.path.exists(directory): continue for name in possible_names: file_path = os.path.join(directory, name) if os.path.exists(file_path): return send_from_directory(directory, name) # Si no se encuentra el archivo return jsonify({'error': 'File not found'}), 404 except Exception as e: logger.error(f"Error buscando archivo: {e}") return jsonify({'error': 'File not found'}), 404 @app.route('/downloads/') def download_file(filename): """Servir archivos de descarga desde downloads o resumenes_docx""" try: # Primero intentar en downloads try: return send_from_directory(LOCAL_DOWNLOADS_PATH, filename) except FileNotFoundError: pass # Si no se encuentra en downloads, intentar en resumenes_docx try: return send_from_directory('./resumenes_docx', filename) except FileNotFoundError: pass # Si no se encuentra en ninguna ubicación return jsonify({'error': 'File not found'}), 404 except Exception as e: logger.error(f"Error sirviendo archivo {filename}: {e}") return jsonify({'error': 'File not found'}), 404 @app.route('/health') def health_check(): """Health check endpoint""" return jsonify({ 'status': 'healthy', 'timestamp': datetime.now().isoformat(), 'processed_files_count': len(file_manager.processed_files) }) if __name__ == '__main__': logger.info("🚀 Iniciando Dashboard de Gestión de Audio") logger.info(f"📁 Carpeta de descargas: {LOCAL_DOWNLOADS_PATH}") logger.info(f"📊 Servidor web en http://localhost:5000") try: app.run( host='0.0.0.0', port=5000, debug=False, threaded=True, use_reloader=False # Evitar problemas con threading ) except KeyboardInterrupt: logger.info("🛑 Dashboard detenido por el usuario") except Exception as e: logger.error(f"❌ Error en dashboard: {e}") raise