- Fetch the video title before downloading
- Use real YouTube titles as file names
- Strip special characters for filesystem compatibility
- Keep UUID-based names as a fallback
- Improve the user experience with descriptive names

Before: files named by UUID (e.g. 16dc1717-25b4-40fb-9069-3a639e331a65.mp3)
Now: files named by title (e.g. Rick Astley - Never Gonna Give You Up.mp3)

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
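The title clean-up described in the commit message can be sketched as a small standalone helper. This is only an illustration: `safe_filename` and `_INVALID_CHARS` are hypothetical names, and the character set is assumed to match the one stripped in the source file.

```python
import re

# Characters stripped from titles (assumed to match the set used in the file).
_INVALID_CHARS = re.compile(r'[<>:"/\\|?*]')


def safe_filename(title, download_id):
    """Return a filesystem-friendly name, falling back to the download ID."""
    cleaned = _INVALID_CHARS.sub('', title or '').strip()
    # An empty result (missing title, or a title made only of stripped
    # characters) falls back to an ID-based name.
    return cleaned or f'video_{download_id}'


print(safe_filename('Rick Astley - Never Gonna Give You Up', '16dc1717'))
print(safe_filename('???', '16dc1717'))
```

Keeping the fallback inside the helper means callers always receive a usable name, whether or not the title lookup succeeded.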
343 lines
12 KiB
Python
from flask import Flask, render_template, request, jsonify, send_file, redirect, url_for
import yt_dlp
import os
import uuid
from datetime import datetime
import threading
import subprocess
import shutil
import time

app = Flask(__name__)
app.config['DOWNLOAD_FOLDER'] = 'static/downloads'

# Create the download folder on startup if it does not exist yet.
os.makedirs(app.config['DOWNLOAD_FOLDER'], exist_ok=True)

# Maps download_id -> DownloadProgress for every download started by this process.
download_status = {}

def check_ffmpeg():
    """Check whether FFmpeg is available on the system."""
    try:
        result = subprocess.run(['ffmpeg', '-version'],
                                capture_output=True, text=True, timeout=10)
        return result.returncode == 0
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False

def install_ffmpeg():
    """Try to install FFmpeg automatically."""
    try:
        # Detect the operating system
        import platform
        system = platform.system().lower()

        if system == 'linux':
            # Debian/Ubuntu-based systems
            commands = [
                ['sudo', 'apt-get', 'update'],
                ['sudo', 'apt-get', 'install', '-y', 'ffmpeg']
            ]
        elif system == 'darwin':
            # macOS, using Homebrew
            commands = [
                ['brew', 'update'],
                ['brew', 'install', 'ffmpeg']
            ]
        elif system == 'windows':
            # Windows (simplified: would require a manual install)
            return False
        else:
            return False

        for cmd in commands:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                return False

        return True
    except Exception:
        return False

class DownloadProgress:
    def __init__(self, download_id):
        self.download_id = download_id
        self.status = 'starting'
        self.progress = 0
        self.speed = ''
        self.eta = ''
        self.filename = ''
        self.error = None
        self.complete = False

    def update(self, d):
        if d['status'] == 'downloading':
            self.status = 'downloading'
            # The percent string can carry surrounding whitespace, so strip
            # that first and then drop the trailing '%'.
            self.progress = d.get('_percent_str', '0%').strip().rstrip('%')
            self.speed = d.get('_speed_str', '')
            self.eta = d.get('_eta_str', '')
            self.filename = d.get('filename', '')
        elif d['status'] == 'finished':
            self.status = 'finished'
            self.complete = True
            self.progress = 100
        elif d['status'] == 'error':
            self.status = 'error'
            self.error = d.get('error', 'Unknown error')

def hook(d, download_id):
    if download_id in download_status:
        download_status[download_id].update(d)

def download_video(url, download_id, format_type='mp4'):
    progress = DownloadProgress(download_id)
    download_status[download_id] = progress

    # Check for FFmpeg when the requested format needs it
    if format_type == 'mp3':
        if not check_ffmpeg():
            progress.status = 'error'
            progress.error = "FFmpeg is not available. MP3 conversion requires FFmpeg. Please install FFmpeg or contact the administrator."
            return

            # Optional: try to install FFmpeg automatically if it is not available
            # if install_ffmpeg():
            #     progress.status = 'info'
            #     progress.error = "FFmpeg was installed automatically. Try the download again."
            # else:
            #     progress.status = 'error'
            #     progress.error = "FFmpeg could not be installed automatically. MP3 conversion requires FFmpeg."
            #     return

    try:
        # First fetch the video information to get the title
        info_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'restrictfilenames': True,
            'no_check_certificate': True,
            'socket_timeout': 60,
            'retries': 5,
            'ignoreerrors': True,
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1'
            }
        }

        # Fetch the video information so the title can be used as the file name
        video_title = None
        try:
            with yt_dlp.YoutubeDL(info_opts) as ydl:
                info = ydl.extract_info(url, download=False)
                video_title = info.get('title', f'video_{download_id}')
                # Clean the title for the filesystem
                import re
                video_title = re.sub(r'[<>:"/\\|?*]', '', video_title)
                video_title = video_title.strip()
                if not video_title:
                    video_title = f'video_{download_id}'
        except Exception as e:
            # Any failure here (including a missing info dict) falls back
            # to an ID-based name.
            print(f"Error fetching video title: {e}")
            video_title = f'video_{download_id}'

        # Strategy 1: simple, robust configuration
        # Escape '%' so a literal percent sign in the title is not parsed
        # as part of the output template.
        safe_title = video_title.replace('%', '%%')
        ydl_opts = {
            'outtmpl': os.path.join(app.config['DOWNLOAD_FOLDER'], f'{safe_title}.%(ext)s'),
            'progress_hooks': [lambda d: hook(d, download_id)],
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            'restrictfilenames': True,
            'no_check_certificate': True,
            'socket_timeout': 180,  # Very generous timeout
            'retries': 20,  # Many retries
            'fragment_retries': 30,
            'extractor_retries': 20,
            'file_access_retries': 20,
            'ignoreerrors': True,
            # Updated, simplified headers
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1'
            }
        }

        if format_type == 'mp3':
            ydl_opts.update({
                'format': 'bestaudio[ext=m4a]/bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }],
                'writethumbnail': False,
                'embed_thumbnail': False,
                'addmetadata': True,
            })
        else:  # mp4
            ydl_opts.update({
                'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
                'merge_output_format': 'mp4',
            })

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            result = ydl.extract_info(url, download=True)

        # With 'ignoreerrors' enabled, extract_info returns None instead of
        # raising when extraction fails, so report that case explicitly.
        if result is None:
            progress.status = 'error'
            progress.error = "Download failed: no video information could be extracted."

    except yt_dlp.utils.DownloadError as e:
        progress.status = 'error'
        if "403" in str(e):
            progress.error = "Access error (403). The video may be restricted or unavailable. Try another video."
        else:
            progress.error = f"Download error: {e}"
    except Exception as e:
        progress.status = 'error'
        if "403" in str(e):
            progress.error = "Access error (403). YouTube blocked the request. Try again in a few minutes."
        else:
            progress.error = f"Unexpected error: {e}"

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/api/downloads', methods=['POST'])
def start_download():
    # silent=True returns None instead of raising on a missing or invalid
    # JSON body, so fall back to an empty dict and report the missing URL.
    data = request.get_json(silent=True) or {}
    url = data.get('url')
    format_type = data.get('format', 'mp4')

    if not url:
        return jsonify({'error': 'URL is required'}), 400

    download_id = str(uuid.uuid4())

    # Start download in background thread
    thread = threading.Thread(target=download_video, args=(url, download_id, format_type))
    thread.daemon = True
    thread.start()

    return jsonify({'download_id': download_id})

@app.route('/api/status/<download_id>')
def get_status(download_id):
    if download_id is None or download_id == 'null' or download_id not in download_status:
        return jsonify({'error': 'Download not found'}), 404

    progress = download_status[download_id]

    response = {
        'status': progress.status,
        'progress': progress.progress,
        'speed': progress.speed,
        'eta': progress.eta,
        'filename': progress.filename,
        'error': progress.error,
        'complete': progress.complete
    }

    return jsonify(response)

@app.route('/api/downloads')
def list_downloads():
    downloads = []
    try:
        for filename in os.listdir(app.config['DOWNLOAD_FOLDER']):
            if filename.endswith(('.mp4', '.mp3', '.webm', '.m4a')):
                file_path = os.path.join(app.config['DOWNLOAD_FOLDER'], filename)
                file_size = os.path.getsize(file_path)
                modified_time = os.path.getmtime(file_path)
                modified_time = datetime.fromtimestamp(modified_time).strftime('%Y-%m-%d %H:%M:%S')

                downloads.append({
                    'filename': filename,
                    'size': file_size,
                    'modified': modified_time,
                    'url': url_for('download_file', filename=filename)
                })
    except Exception as e:
        print(f"Error listing downloads: {e}")

    return jsonify(downloads)

@app.route('/api/ffmpeg/status')
def ffmpeg_status():
    """Report whether FFmpeg is available."""
    ffmpeg_available = check_ffmpeg()
    return jsonify({
        'available': ffmpeg_available,
        'can_install': not ffmpeg_available
    })

@app.route('/api/ffmpeg/install', methods=['POST'])
def install_ffmpeg_api():
    """Try to install FFmpeg automatically."""
    try:
        if check_ffmpeg():
            return jsonify({'success': True, 'message': 'FFmpeg is already available.'})

        # Try to install FFmpeg
        success = install_ffmpeg()
        if success:
            return jsonify({'success': True, 'message': 'FFmpeg installed successfully.'})
        else:
            return jsonify({'success': False, 'message': 'FFmpeg could not be installed automatically. You must install it manually.'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'Error during installation: {e}'}), 500

@app.route('/api/cleanup', methods=['POST'])
def cleanup_downloads():
    """Clean up failed and very old downloads."""
    try:
        cleaned_files = []
        current_time = time.time()

        # Remove temporary files (smaller than 1 KB and older than 1 hour)
        for filename in os.listdir(app.config['DOWNLOAD_FOLDER']):
            file_path = os.path.join(app.config['DOWNLOAD_FOLDER'], filename)

            if os.path.isfile(file_path):
                file_size = os.path.getsize(file_path)
                file_age = current_time - os.path.getmtime(file_path)

                # Delete files that are very small or very old
                if (file_size < 1024 and file_age > 3600) or file_age > 86400 * 7:  # 7 days
                    os.remove(file_path)
                    cleaned_files.append(filename)

        # Remove old download states. Iterate over a snapshot because
        # download threads may add entries while this loop runs.
        to_remove = []
        for download_id, progress in list(download_status.items()):
            if progress.complete or progress.status == 'error':
                to_remove.append(download_id)

        for download_id in to_remove:
            del download_status[download_id]

        return jsonify({
            'cleaned_files': cleaned_files,
            'cleaned_downloads': len(to_remove)
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/download/<filename>')
def download_file(filename):
    file_path = os.path.join(app.config['DOWNLOAD_FOLDER'], filename)
    if os.path.exists(file_path):
        return send_file(file_path, as_attachment=True)
    return "File not found", 404

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
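
The module keeps per-download state in the global `download_status` dict, which background download threads write while request handlers read and prune it. One way to make that sharing explicit is a small lock-guarded registry. The sketch below is only an illustration: `StatusRegistry` and its methods are hypothetical names that do not appear in the file, and plain dicts stand in for `DownloadProgress` objects.

```python
import threading


class StatusRegistry:
    """Hypothetical thread-safe stand-in for a plain status dict."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = {}

    def set(self, download_id, progress):
        with self._lock:
            self._items[download_id] = progress

    def get(self, download_id):
        with self._lock:
            return self._items.get(download_id)

    def remove_where(self, predicate):
        """Delete every entry whose value satisfies predicate; return the count."""
        with self._lock:
            doomed = [key for key, value in self._items.items() if predicate(value)]
            for key in doomed:
                del self._items[key]
            return len(doomed)


registry = StatusRegistry()
registry.set('a', {'complete': True})
registry.set('b', {'complete': False})
removed = registry.remove_where(lambda p: p['complete'])
print(removed, registry.get('a'), registry.get('b'))
```

Holding the lock for the whole of `remove_where` means a cleanup pass never observes the mapping changing size mid-iteration, at the cost of briefly blocking writers.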