import sys
import json
import logging
import shutil
import subprocess
from collections import Counter
from pathlib import Path

import numpy as np
import torch

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Duration (in seconds) assumed when ffprobe cannot report one.
_FALLBACK_DURATION = 3600


def get_device():
    """Return the CUDA device when one is available, otherwise the CPU device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")


def _run_tool(command, description):
    """Run an external command quietly; log its stderr and return False on failure."""
    completed = subprocess.run(
        command, capture_output=True, text=True, errors="replace"
    )
    if completed.returncode != 0:
        logger.error(
            "%s falló (código %d): %s",
            description,
            completed.returncode,
            completed.stderr.strip()[-500:],
        )
        return False
    return True


def _zscore_peaks(values, threshold):
    """Return ``(peaks, mean, std)`` where ``peaks`` maps index -> z-score.

    Only entries whose z-score is strictly above ``threshold`` are included.
    Empty or zero-variance input yields no peaks instead of NaN statistics.
    """
    data = np.asarray(values, dtype=np.float64)
    if data.size == 0:
        return {}, 0.0, 0.0
    mean = float(data.mean())
    std = float(data.std())
    if not std > 0:
        return {}, mean, std
    z_scores = (data - mean) / std
    peaks = {int(i): float(z_scores[i]) for i in np.flatnonzero(z_scores > threshold)}
    return peaks, mean, std


def _strong_seconds(scores):
    """Return the seconds whose score exceeds half of the maximum score."""
    if not scores:
        return set()
    top = max(scores.values())
    if top <= 0:
        return set()
    return {second for second, value in scores.items() if value / top > 0.5}


def _build_intervals(seconds, min_duration):
    """Merge sorted, consecutive seconds into ``(start, end)`` intervals.

    An interval is kept only when ``end - start >= min_duration``.
    """
    intervals = []
    if not seconds:
        return intervals
    start = prev = seconds[0]
    for second in seconds[1:]:
        if second - prev > 1:
            # Measure the run that just ended (prev), not the gap to the
            # next highlight second.
            if prev - start >= min_duration:
                intervals.append((start, prev))
            start = second
        prev = second
    if prev - start >= min_duration:
        intervals.append((start, prev))
    return intervals


def _probe_duration(video_file):
    """Return the video duration in whole seconds, or a fallback if unknown."""
    result = subprocess.run(
        [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            video_file,
        ],
        capture_output=True,
        text=True,
    )
    raw = result.stdout.strip()
    try:
        return int(float(raw))
    except ValueError:
        # Empty output or a non-numeric value such as "N/A".
        logger.warning(
            "No se pudo obtener la duración; usando %d s", _FALLBACK_DURATION
        )
        return _FALLBACK_DURATION


def extract_audio_gpu(video_file, output_wav="audio.wav"):
    """Extract the audio track of ``video_file`` as 16 kHz mono PCM WAV via ffmpeg.

    Args:
        video_file: Path of the input video.
        output_wav: Path of the WAV file to write (overwritten if present).

    Returns:
        ``output_wav``.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    logger.info(f"Extrayendo audio de {video_file}...")
    extracted = _run_tool(
        [
            "ffmpeg", "-y", "-i", video_file,
            "-vn", "-acodec", "pcm_s16le",
            "-ar", "16000", "-ac", "1",
            output_wav,
        ],
        "Extracción de audio",
    )
    if not extracted:
        raise RuntimeError(f"ffmpeg no pudo extraer el audio de {video_file}")
    return output_wav


def detect_audio_peaks_gpu(audio_file, threshold=1.5, window_seconds=5, device="cpu"):
    """Detect loud windows in a WAV file using RMS energy z-scores.

    The RMS energy is computed over windows of ``window_seconds`` seconds
    that advance one second at a time, so the window index equals the
    second at which the window starts.

    Args:
        audio_file: Path of the WAV file (read with ``scipy.io.wavfile``).
        threshold: Minimum z-score (exclusive) for a window to count as a peak.
        window_seconds: Window length in seconds.
        device: Accepted for interface compatibility; the computation runs
            in NumPy and does not use it.

    Returns:
        Dict mapping window index to z-score for every peak window.
    """
    logger.info("Analizando picos de audio con GPU...")

    import scipy.io.wavfile as wavfile

    sr, waveform = wavfile.read(audio_file)
    # Scale by the int16 full-scale value (the extractor writes pcm_s16le).
    waveform = waveform.astype(np.float32) / 32768.0

    frame_length = int(sr * window_seconds)
    hop_length = sr  # one second between window starts

    # "+ 1" so the final full window (ending exactly at the last sample)
    # is included.
    energies = [
        np.sqrt(np.mean(waveform[begin:begin + frame_length] ** 2))
        for begin in range(0, len(waveform) - frame_length + 1, hop_length)
    ]

    audio_scores, mean_e, std_e = _zscore_peaks(energies, threshold)
    logger.info(f"Audio stats: media={mean_e:.4f}, std={std_e:.4f}")
    logger.info(f"Picos de audio detectados: {len(audio_scores)}")
    return audio_scores


def detect_video_peaks_fast(video_file, threshold=1.5, window_seconds=5):
    """Detect brightness/saturation peaks by sampling one frame per window.

    The video is first re-encoded at 360p, then one frame every
    ``window_seconds`` seconds is exported and scored as
    ``mean(V)/255 + 0.5 * mean(S)/255`` in HSV space. Temporary files are
    removed even when processing fails.

    Args:
        video_file: Path of the input video.
        threshold: Minimum z-score (exclusive) for a frame to count as a peak.
        window_seconds: Seconds between sampled frames.

    Returns:
        Dict mapping ``frame_index * window_seconds`` to z-score; empty when
        no frames could be extracted.
    """
    logger.info("Analizando picos de color...")

    import cv2

    source = Path(video_file)
    # Derive the temp name from the stem so it can never equal the input
    # path, whatever the input extension is.
    video_360 = source.with_name(f"{source.stem}_temp_360.mp4")
    frames_dir = Path("frames_temp")
    # Drop frames left behind by an earlier interrupted run.
    shutil.rmtree(frames_dir, ignore_errors=True)
    frames_dir.mkdir(exist_ok=True)

    try:
        logger.info("Convirtiendo a 360p para análisis...")
        converted = _run_tool(
            [
                "ffmpeg", "-y", "-i", video_file,
                "-vf", "scale=-2:360",
                "-c:v", "libx264", "-preset", "fast", "-crf", "28",
                "-an",  # audio is not needed for frame analysis
                str(video_360),
            ],
            "Conversión a 360p",
        )
        if converted:
            _run_tool(
                [
                    "ffmpeg", "-y", "-i", str(video_360),
                    "-vf", f"fps=1/{window_seconds}",
                    f"{frames_dir}/frame_%04d.png",
                ],
                "Extracción de frames",
            )

        frame_files = sorted(frames_dir.glob("frame_*.png"))
        if not frame_files:
            logger.warning("No se pudieron extraer frames")
            return {}

        logger.info(f"Procesando {len(frame_files)} frames...")

        # Keep the original frame index next to each score so unreadable
        # frames do not shift the timestamps of the ones that follow.
        frame_indices = []
        frame_scores = []
        for index, frame_file in enumerate(frame_files):
            img = cv2.imread(str(frame_file))
            if img is None:
                logger.warning("No se pudo leer el frame %s", frame_file)
                continue
            hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
            brightness = hsv[:, :, 2].mean()  # V channel
            saturation = hsv[:, :, 1].mean()  # S channel
            frame_indices.append(index)
            frame_scores.append((brightness / 255) + (saturation / 255) * 0.5)

        peaks, mean_b, std_b = _zscore_peaks(frame_scores, threshold)
        logger.info(f"Brillo stats: media={mean_b:.3f}, std={std_b:.3f}")

        color_scores = {
            frame_indices[position] * window_seconds: z_score
            for position, z_score in peaks.items()
        }
        logger.info(f"Picos de color detectados: {len(color_scores)}")
        return color_scores
    finally:
        shutil.rmtree(frames_dir, ignore_errors=True)
        video_360.unlink(missing_ok=True)


def main():
    """Command-line entry point: combine chat, audio and colour peaks.

    A second is a highlight candidate when at least two of the three
    signals are strong there; consecutive candidates are merged into
    intervals and written as JSON to ``--output``.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--video", required=True, help="Video file")
    parser.add_argument("--chat", required=True, help="Chat JSON file")
    parser.add_argument("--output", default="highlights.json", help="Output JSON")
    parser.add_argument("--threshold", type=float, default=1.5, help="Threshold for peaks")
    parser.add_argument("--min-duration", type=int, default=10, help="Min highlight duration")
    parser.add_argument("--device", default="auto", help="Device: auto, cuda, cpu")
    args = parser.parse_args()

    device = get_device() if args.device == "auto" else torch.device(args.device)
    logger.info(f"Usando device: {device}")

    # Chat: messages per second, z-scored over the seconds that have messages.
    logger.info("Cargando chat...")
    with open(args.chat, "r", encoding="utf-8") as f:
        chat_data = json.load(f)

    chat_times = Counter(
        int(comment["content_offset_seconds"]) for comment in chat_data["comments"]
    )
    chat_seconds = list(chat_times)
    chat_peaks, mean_c, std_c = _zscore_peaks(
        [chat_times[second] for second in chat_seconds], args.threshold
    )
    logger.info(f"Chat stats: media={mean_c:.1f}, std={std_c:.1f}")
    chat_scores = {chat_seconds[i]: z_score for i, z_score in chat_peaks.items()}
    logger.info(f"Picos de chat: {len(chat_scores)}")

    # Audio: the temporary WAV is removed even if analysis fails.
    audio_file = "temp_audio.wav"
    try:
        extract_audio_gpu(args.video, audio_file)
        audio_scores = detect_audio_peaks_gpu(
            audio_file, args.threshold, device=str(device)
        )
    except RuntimeError as err:
        # E.g. a video without an audio track: continue with chat + colour.
        logger.warning("Sin análisis de audio: %s", err)
        audio_scores = {}
    finally:
        Path(audio_file).unlink(missing_ok=True)

    video_scores = detect_video_peaks_fast(args.video, args.threshold)

    logger.info("Combinando scores (2 de 3)...")
    duration = _probe_duration(args.video)

    # NOTE(review): audio peaks are keyed by window start and colour peaks by
    # sampled second, so the vote only matches on those exact seconds.
    # Confirm whether peaks should be spread across their whole window.
    strong_signals = [
        _strong_seconds(chat_scores),
        _strong_seconds(audio_scores),
        _strong_seconds(video_scores),
    ]
    highlights = [
        second
        for second in range(duration)
        if sum(second in signal for signal in strong_signals) >= 2
    ]

    intervals = _build_intervals(highlights, args.min_duration)
    logger.info(f"Highlights encontrados: {len(intervals)}")

    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(intervals, f)
    logger.info(f"Guardado en {args.output}")

    print(f"\nHighlights ({len(intervals)} total):")
    for i, (s, e) in enumerate(intervals[:10]):
        print(f" {i+1}. {s}s - {e}s (duración: {e-s}s)")


if __name__ == "__main__":
    main()