281 lines
9.5 KiB
Python
281 lines
9.5 KiB
Python
"""
|
|
Flask API routes for CBCFacil dashboard
|
|
"""
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
from flask import Flask, render_template, request, jsonify, send_from_directory
|
|
from flask_cors import CORS
|
|
|
|
from config import settings
|
|
from storage import processed_registry
|
|
from services.webdav_service import webdav_service
|
|
from services import vram_manager
|
|
|
|
|
|
def create_app() -> Flask:
|
|
"""Create and configure Flask application"""
|
|
app = Flask(__name__)
|
|
CORS(app)
|
|
|
|
# Configure app
|
|
app.config['SECRET_KEY'] = settings.DASHBOARD_SECRET_KEY or os.urandom(24)
|
|
app.config['DOWNLOADS_FOLDER'] = str(settings.LOCAL_DOWNLOADS_PATH)
|
|
|
|
@app.route('/')
|
|
def index():
|
|
"""Dashboard home page"""
|
|
return render_template('index.html')
|
|
|
|
@app.route('/api/files')
|
|
def get_files():
|
|
"""Get list of audio files"""
|
|
try:
|
|
files = get_audio_files()
|
|
return jsonify({
|
|
'success': True,
|
|
'files': files,
|
|
'total': len(files),
|
|
'processed': sum(1 for f in files if f['processed']),
|
|
'pending': sum(1 for f in files if not f['processed'])
|
|
})
|
|
except Exception as e:
|
|
app.logger.error(f"Error getting files: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'message': f"Error: {str(e)}"
|
|
}), 500
|
|
|
|
@app.route('/api/reprocess', methods=['POST'])
|
|
def reprocess_file():
|
|
"""Reprocess a file"""
|
|
try:
|
|
data = request.get_json()
|
|
file_path = data.get('path')
|
|
source = data.get('source', 'local')
|
|
|
|
if not file_path:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': "Path del archivo es requerido"
|
|
}), 400
|
|
|
|
# TODO: Implement file reprocessing
|
|
# This would trigger the main processing loop
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': f"Archivo {Path(file_path).name} enviado a reprocesamiento"
|
|
})
|
|
|
|
except Exception as e:
|
|
app.logger.error(f"Error reprocessing file: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'message': f"Error: {str(e)}"
|
|
}), 500
|
|
|
|
@app.route('/api/mark-unprocessed', methods=['POST'])
|
|
def mark_unprocessed():
|
|
"""Mark file as unprocessed"""
|
|
try:
|
|
data = request.get_json()
|
|
file_path = data.get('path')
|
|
|
|
if not file_path:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': "Path del archivo es requerido"
|
|
}), 400
|
|
|
|
success = processed_registry.remove(file_path)
|
|
|
|
if success:
|
|
return jsonify({
|
|
'success': True,
|
|
'message': "Archivo marcado como no procesado"
|
|
})
|
|
else:
|
|
return jsonify({
|
|
'success': False,
|
|
'message': "No se pudo marcar como no procesado"
|
|
}), 500
|
|
|
|
except Exception as e:
|
|
app.logger.error(f"Error marking unprocessed: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'message': f"Error: {str(e)}"
|
|
}), 500
|
|
|
|
@app.route('/api/refresh')
|
|
def refresh_files():
|
|
"""Refresh file list"""
|
|
try:
|
|
processed_registry.load()
|
|
files = get_audio_files()
|
|
return jsonify({
|
|
'success': True,
|
|
'message': "Lista de archivos actualizada",
|
|
'files': files
|
|
})
|
|
except Exception as e:
|
|
app.logger.error(f"Error refreshing files: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'message': f"Error: {str(e)}"
|
|
}), 500
|
|
|
|
@app.route('/downloads/<path:filename>')
|
|
def download_file(filename):
|
|
"""Download file"""
|
|
try:
|
|
# Validate path to prevent traversal and injection attacks
|
|
normalized = Path(filename).resolve()
|
|
base_downloads = Path(str(settings.LOCAL_DOWNLOADS_PATH)).resolve()
|
|
base_docx = Path(str(settings.LOCAL_DOCX)).resolve()
|
|
if '..' in filename or filename.startswith('/') or \
|
|
normalized.parts[0] in ['..', '...'] if len(normalized.parts) > 0 else False or \
|
|
not (normalized == base_downloads or normalized.is_relative_to(base_downloads) or
|
|
normalized == base_docx or normalized.is_relative_to(base_docx)):
|
|
return jsonify({'error': 'Invalid filename'}), 400
|
|
|
|
# Try downloads directory
|
|
downloads_path = settings.LOCAL_DOWNLOADS_PATH / filename
|
|
if downloads_path.exists():
|
|
return send_from_directory(str(settings.LOCAL_DOWNLOADS_PATH), filename)
|
|
|
|
# Try resumenes_docx directory
|
|
docx_path = settings.LOCAL_DOCX / filename
|
|
if docx_path.exists():
|
|
return send_from_directory(str(settings.LOCAL_DOCX), filename)
|
|
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
except Exception as e:
|
|
app.logger.error(f"Error downloading file: {e}")
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
@app.route('/health')
|
|
def health_check():
|
|
"""Health check endpoint"""
|
|
gpu_info = vram_manager.get_usage()
|
|
|
|
return jsonify({
|
|
'status': 'healthy',
|
|
'timestamp': datetime.now().isoformat(),
|
|
'processed_files_count': processed_registry.count(),
|
|
'gpu': gpu_info,
|
|
'config': {
|
|
'webdav_configured': settings.has_webdav_config,
|
|
'ai_configured': settings.has_ai_config,
|
|
'debug': settings.DEBUG
|
|
}
|
|
})
|
|
|
|
return app
|
|
|
|
|
|
def get_audio_files() -> List[Dict[str, Any]]:
|
|
"""Get list of audio files from WebDAV and local"""
|
|
files = []
|
|
|
|
# Get files from WebDAV
|
|
if settings.has_webdav_config:
|
|
try:
|
|
webdav_files = webdav_service.list(settings.REMOTE_AUDIOS_FOLDER)
|
|
for file_path in webdav_files:
|
|
normalized_path = webdav_service.normalize_path(file_path)
|
|
base_name = Path(normalized_path).name
|
|
|
|
if any(normalized_path.lower().endswith(ext) for ext in settings.AUDIO_EXTENSIONS):
|
|
is_processed = processed_registry.is_processed(normalized_path)
|
|
|
|
files.append({
|
|
'filename': base_name,
|
|
'path': normalized_path,
|
|
'source': 'webdav',
|
|
'processed': is_processed,
|
|
'size': 'Unknown',
|
|
'last_modified': 'Unknown',
|
|
'available_formats': get_available_formats(base_name)
|
|
})
|
|
except Exception as e:
|
|
app.logger.error(f"Error getting WebDAV files: {e}")
|
|
|
|
# Get local files
|
|
try:
|
|
if settings.LOCAL_DOWNLOADS_PATH.exists():
|
|
for ext in settings.AUDIO_EXTENSIONS:
|
|
for file_path in settings.LOCAL_DOWNLOADS_PATH.glob(f"*{ext}"):
|
|
stat = file_path.stat()
|
|
is_processed = processed_registry.is_processed(file_path.name)
|
|
|
|
files.append({
|
|
'filename': file_path.name,
|
|
'path': str(file_path),
|
|
'source': 'local',
|
|
'processed': is_processed,
|
|
'size': format_size(stat.st_size),
|
|
'last_modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
|
|
'available_formats': get_available_formats(file_path.name)
|
|
})
|
|
except Exception as e:
|
|
app.logger.error(f"Error getting local files: {e}")
|
|
|
|
# Remove duplicates (WebDAV takes precedence)
|
|
unique_files = {}
|
|
for file in files:
|
|
key = file['filename']
|
|
if key not in unique_files or file['source'] == 'webdav':
|
|
unique_files[key] = file
|
|
|
|
return sorted(unique_files.values(), key=lambda x: x['filename'])
|
|
|
|
|
|
def get_available_formats(audio_filename: str) -> Dict[str, bool]:
|
|
"""Check which output formats are available for an audio file"""
|
|
base_name = Path(audio_filename).stem
|
|
|
|
formats = {
|
|
'txt': False,
|
|
'md': False,
|
|
'pdf': False,
|
|
'docx': False
|
|
}
|
|
|
|
directories_to_check = [
|
|
settings.LOCAL_DOWNLOADS_PATH,
|
|
settings.LOCAL_DOCX
|
|
]
|
|
|
|
for directory in directories_to_check:
|
|
if not directory.exists():
|
|
continue
|
|
|
|
for ext in formats.keys():
|
|
name_variants = [
|
|
base_name,
|
|
f"{base_name}_unificado",
|
|
base_name.replace(' ', '_'),
|
|
f"{base_name.replace(' ', '_')}_unificado",
|
|
]
|
|
|
|
for name_variant in name_variants:
|
|
file_path = directory / f"{name_variant}.{ext}"
|
|
if file_path.exists():
|
|
formats[ext] = True
|
|
break
|
|
|
|
return formats
|
|
|
|
|
|
def format_size(size_bytes: int) -> str:
|
|
"""Format size in human-readable format"""
|
|
for unit in ['B', 'KB', 'MB', 'GB']:
|
|
if size_bytes < 1024.0:
|
|
return f"{size_bytes:.1f} {unit}"
|
|
size_bytes /= 1024.0
|
|
return f"{size_bytes:.1f} TB"
|