#!/usr/bin/env python3
"""
Twitch Highlight Generator - ULTIMATE HYBRID VERSION

Combines Whisper (transcription) + MiniMax (AI scoring) + chat activity +
audio energy + keyword analysis into a single highlight detector.

Usage:
    python3 hybrid_detector.py --video stream.mp4 --chat chat.json \
        --output highlights.mp4
"""

import argparse
import io
import json
import logging
import os
import re
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import List, Tuple, Dict

import cv2
import numpy as np
import torch
import torch.nn.functional as F
from openai import OpenAI

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)


class HybridHighlightDetector:
    """Hybrid detector that combines multiple data sources.

    Sources: chat message peaks, audio-energy peaks, Whisper transcription
    keywords, and optional MiniMax LLM content scoring.  All per-second
    score dictionaries produced by the ``detect_*`` methods are keyed by
    ABSOLUTE video seconds so they can be merged in
    :meth:`combine_all_sources`.
    """

    def __init__(self, device="cuda", api_key=None):
        """Initialize the detector.

        Args:
            device: Preferred torch device name ("cuda" or "cpu"); falls
                back to CPU when CUDA is unavailable.
            api_key: MiniMax/OpenAI-compatible API key.  Falls back to the
                ``OPENAI_API_KEY`` environment variable; when absent, the
                MiniMax analysis step is skipped.
        """
        self.device = (
            torch.device(device) if torch.cuda.is_available() else torch.device("cpu")
        )
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.client = None
        if self.api_key:
            # OPENAI_BASE_URL lets the same client talk to any
            # OpenAI-compatible endpoint; default is MiniMax's API.
            base_url = os.environ.get("OPENAI_BASE_URL", "https://api.minimax.io/v1")
            self.client = OpenAI(base_url=base_url, api_key=self.api_key)
        logger.info(f"Detector híbrido inicializado en {self.device}")

    def get_device(self):
        """Return the compute device (GPU when available, else CPU)."""
        if torch.cuda.is_available():
            logger.info(f"GPU detectada: {torch.cuda.get_device_name(0)}")
            return torch.device("cuda")
        return torch.device("cpu")

    def transcribe_with_whisper(self, video_file, model_size="base"):
        """Transcribe the video with Whisper.

        Args:
            video_file: Path to the input video.
            model_size: Whisper model size ("tiny", "base", "small", ...).

        Returns:
            The Whisper result dict (with a ``"segments"`` list), or
            ``None`` when transcription fails (missing package, bad file).
        """
        try:
            import whisper

            logger.info(f"Cargando Whisper ({model_size})...")
            model = whisper.load_model(model_size, device=self.device)
            logger.info(f"Transcribiendo {video_file}...")
            result = model.transcribe(
                video_file, language="es", word_timestamps=True, verbose=False
            )
            logger.info(
                f"Transcripción completada: {len(result['segments'])} segmentos"
            )
            return result
        except Exception as e:
            logger.error(f"Error en Whisper: {e}")
            return None

    def analyze_with_minimax(self, segments: List[Dict]) -> List[Dict]:
        """Score transcription segments for content quality via MiniMax.

        Segments shorter than 10 characters are skipped; the rest are sent
        in batches of up to 20 (at most 10 batches total to bound API
        cost).  On any API error the whole failing batch receives a
        neutral score of 5.

        Args:
            segments: Whisper segment dicts with "start"/"end"/"text".

        Returns:
            A list of dicts with "start", "end", "text" and
            "minimax_score" (1-10).  Empty when no API client is set up.
        """
        if not self.client:
            logger.warning("No hay API key de MiniMax, saltando análisis de IA")
            return []

        logger.info("Analizando contenido con MiniMax...")

        # Build batches of (segments, formatted prompt text).
        batches = []
        batch = []
        batch_lines = []
        for i, seg in enumerate(segments):
            text = seg["text"].strip()
            if len(text) > 10:  # skip very short segments
                batch.append(seg)
                start_mins = int(seg["start"]) // 60
                start_secs = int(seg["start"]) % 60
                batch_lines.append(f"[{start_mins:02d}:{start_secs:02d}] {text[:100]}")
            if len(batch) >= 20 or i == len(segments) - 1:
                if batch:
                    batches.append((batch, "\n".join(batch_lines)))
                    batch = []
                    batch_lines = []

        scored_segments = []
        for batch_idx, (batch_segments, batch_text) in enumerate(
            batches[:10]
        ):  # cap at 10 batches
            try:
                prompt = f"""Analiza estos segmentos de un stream de League of Legends y puntúalos del 1 al 10 según:

CRITERIOS DE PUNTUACIÓN:
10 = Momentos épicos: Pentakill, Baron steal, teamfight ganada, rage extremo, insultos graciosos
8-9 = Muy buenos: Buenas jugadas, kills importantes, reacciones fuertes
6-7 = Buenos: Jugadas decentes, comentarios interesantes
4-5 = Regulares: Gameplay normal, nada especial
1-3 = Malos: Silencio, repetición, aburrimiento

SEGMENTOS A ANALIZAR:
{batch_text}

Responde SOLO con números del 1-10 separados por comas, uno por cada segmento.
Ejemplo: 8,3,9,7,2,10,5,8,6,9

Puntuaciones:"""

                response = self.client.chat.completions.create(
                    model="MiniMax-M2.5",
                    messages=[
                        {
                            "role": "system",
                            "content": "Eres un experto editor de videos de gaming.",
                        },
                        {"role": "user", "content": prompt},
                    ],
                    temperature=0.1,
                    max_tokens=100,
                )
                scores_text = response.choices[0].message.content.strip()
                scores = [
                    int(s.strip())
                    for s in scores_text.split(",")
                    if s.strip().isdigit()
                ]
                # Pad/trim so the score list matches the batch length.
                while len(scores) < len(batch_segments):
                    scores.append(5)
                scores = scores[: len(batch_segments)]

                for seg, score in zip(batch_segments, scores):
                    scored_segments.append(
                        {
                            "start": seg["start"],
                            "end": seg["end"],
                            "text": seg["text"],
                            "minimax_score": score,
                        }
                    )
                logger.info(
                    f"Batch {batch_idx + 1}/{len(batches)}: {len(scores)} segmentos puntuados"
                )
            except Exception as e:
                logger.error(f"Error en MiniMax batch {batch_idx}: {e}")
                # Neutral score for the whole failed batch.
                for seg in batch_segments:
                    scored_segments.append(
                        {
                            "start": seg["start"],
                            "end": seg["end"],
                            "text": seg["text"],
                            "minimax_score": 5,
                        }
                    )

        return scored_segments

    def detect_chat_peaks(self, chat_data, skip_intro=0):
        """Detect activity peaks in the chat.

        Args:
            chat_data: Parsed chat JSON; expects ``chat_data["comments"]``
                with per-comment ``"content_offset_seconds"``.
            skip_intro: Seconds at the start of the VOD to ignore.

        Returns:
            Dict mapping absolute video second -> z-score, for seconds
            whose message count is more than 1 std dev above the mean.
        """
        chat_times = {}
        for comment in chat_data["comments"]:
            second = int(comment["content_offset_seconds"])
            if second >= skip_intro:
                chat_times[second] = chat_times.get(second, 0) + 1

        if not chat_times:
            return {}

        values = list(chat_times.values())
        mean_val = np.mean(values)
        std_val = np.std(values)

        chat_scores = {}
        for second, count in chat_times.items():
            z_score = (count - mean_val) / (std_val + 1e-8)
            if z_score > 1.0:  # permissive threshold
                chat_scores[second] = z_score

        logger.info(f"Picos de chat: {len(chat_scores)}")
        return chat_scores

    def detect_audio_peaks(self, video_file, skip_intro=0):
        """Detect loudness peaks in the audio track.

        Extracts 16 kHz mono PCM via ffmpeg, computes per-second RMS
        energy, and flags seconds whose energy z-score exceeds 1.5.

        Args:
            video_file: Path to the input video.
            skip_intro: Seconds at the start of the VOD to ignore.

        Returns:
            Dict mapping ABSOLUTE video second -> z-score.  (Keys are
            offset by ``skip_intro`` so they align with the chat/keyword
            scores in :meth:`combine_all_sources`.)
        """
        import soundfile as sf

        cmd = [
            "ffmpeg",
            "-i",
            video_file,
            "-vn",
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-f",
            "wav",
            "pipe:1",
            "-y",
            "-threads",
            "4",
        ]
        result = subprocess.run(cmd, capture_output=True)
        if not result.stdout:
            # ffmpeg produced no audio (bad file / no audio stream).
            logger.error("ffmpeg no produjo audio")
            return {}
        waveform, sr = sf.read(io.BytesIO(result.stdout), dtype="float32")

        # Skip the intro.
        skip_samples = int(skip_intro * sr)
        if len(waveform) > skip_samples:
            waveform = waveform[skip_samples:]

        # RMS energy over non-overlapping 1-second windows.
        window_size = sr
        energies = []
        for i in range(0, len(waveform) - window_size, window_size):
            window = waveform[i : i + window_size]
            energy = np.sqrt(np.mean(window**2))
            energies.append(energy)

        if not energies:
            return {}

        # Peak detection via z-score.
        mean_e = np.mean(energies)
        std_e = np.std(energies)
        audio_scores = {}
        for i, energy in enumerate(energies):
            z_score = (energy - mean_e) / (std_e + 1e-8)
            if z_score > 1.5:
                # BUGFIX: window index i is relative to the post-intro
                # waveform; add skip_intro to get absolute video seconds
                # so audio peaks line up with the other score sources.
                audio_scores[skip_intro + i] = z_score

        logger.info(f"Picos de audio: {len(audio_scores)}")
        return audio_scores

    def detect_keyword_moments(self, transcription):
        """Detect moments containing keywords (rage, epic plays, etc.).

        Args:
            transcription: Whisper result dict (may be ``None``).

        Returns:
            Dict mapping absolute segment-start second -> dict with
            "score" (weighted keyword hits) and "text" (50-char preview).
        """
        if not transcription:
            return {}

        keywords = {
            "rage_extreme": [
                "puta madre",
                "retrasado",
                "imbecil",
                "estupido",
                "mongolo",
                "inutil",
            ],
            "epic_plays": [
                "pentakill",
                "baron steal",
                "drag steal",
                "triple",
                "quadra",
                "ace",
            ],
            "laughter": ["jajaja", "jejeje", "risa", "carcajada"],
            "death": ["me mataron", "me mori", "muerto", "kill", "mate"],
            "skills": ["ulti", "flash", "ignite", "exhaust"],
        }

        keyword_scores = {}
        for seg in transcription.get("segments", []):
            text = seg["text"].lower()
            score = 0
            for category, words in keywords.items():
                for word in words:
                    if word in text:
                        # Category weights: rage/epic 3, laughter 2, rest 1.
                        if category == "rage_extreme":
                            score += 3
                        elif category == "epic_plays":
                            score += 3
                        elif category == "laughter":
                            score += 2
                        else:
                            score += 1
            if score > 0:
                keyword_scores[int(seg["start"])] = {
                    "score": score,
                    "text": seg["text"][:50],
                }

        logger.info(f"Momentos con keywords: {len(keyword_scores)}")
        return keyword_scores

    def extend_clip_smart(self, start, end, transcription, min_extend=5, max_extend=15):
        """Extend a clip's end based on the transcription.

        Scans segments starting within ``max_extend`` seconds after the
        clip; when they contain reaction words, the clip is stretched to
        cover them.  The clip always grows by at least ``min_extend``
        seconds and at most ``max_extend``.

        Args:
            start: Clip start second.
            end: Clip end second.
            transcription: Whisper result dict (``None`` -> no change).
            min_extend: Minimum seconds added to the end.
            max_extend: Maximum seconds added to the end.

        Returns:
            Tuple ``(start, new_end)``; start is never moved.
        """
        if not transcription:
            return start, end

        original_duration = end - start

        # Look for interesting content right after the clip.
        extend_end = 0
        for seg in transcription.get("segments", []):
            seg_start = seg["start"]
            seg_end = seg["end"]
            # Segment begins just after the clip ends?
            if end <= seg_start <= end + max_extend:
                text = seg["text"].lower()
                # Words suggesting the action is still going on.
                if any(
                    word in text
                    for word in ["joder", "puta", "mierda", "no", "omg", "dios"]
                ):
                    extend_end = max(extend_end, int(seg_end - end))

        # Clamp the extension to [min_extend, max_extend].
        extend_end = min(extend_end, max_extend)
        extend_end = max(extend_end, min_extend)  # at least min_extend

        new_end = end + extend_end
        return start, new_end

    def combine_all_sources(
        self,
        chat_scores,
        audio_scores,
        keyword_scores,
        minimax_scores,
        duration,
        min_duration=8,
    ):
        """Fuse all per-second scores into ranked highlight intervals.

        Weighted average per second (chat 1.0, audio 0.8, keywords 1.5,
        minimax 1.2), then seconds are merged into intervals allowing
        5-second gaps, requiring >= 5 s length, ranked by mean score, and
        the top 30 are returned in chronological order.

        Args:
            chat_scores: second -> z-score from :meth:`detect_chat_peaks`.
            audio_scores: second -> z-score from :meth:`detect_audio_peaks`.
            keyword_scores: second -> {"score", "text"} dict.
            minimax_scores: list of scored segments (1-10 scale).
            duration: Total video duration in seconds.  (Kept for API
                compatibility; not currently used.)
            min_duration: Kept for API compatibility; not currently used —
                the effective minimum interval length is 5 seconds.

        Returns:
            List of (start, end) second tuples, sorted chronologically.
        """
        # Per-second score map: second -> {source: score}.
        all_scores = {}

        # Source weights for the combined score.
        weights = {"chat": 1.0, "audio": 0.8, "keywords": 1.5, "minimax": 1.2}

        for sec, score in chat_scores.items():
            all_scores.setdefault(sec, {})["chat"] = score

        for sec, score in audio_scores.items():
            all_scores.setdefault(sec, {})["audio"] = score

        for sec, data in keyword_scores.items():
            all_scores.setdefault(sec, {})["keywords"] = data["score"]

        # MiniMax segment scores, normalized from 1-10 to 0-1, attributed
        # to the segment's start second.
        for seg in minimax_scores:
            sec = int(seg["start"])
            all_scores.setdefault(sec, {})["minimax"] = seg["minimax_score"] / 10.0

        # Weighted average over the sources present at each second.
        combined_scores = {}
        for sec, scores in all_scores.items():
            total = 0
            total_weight = 0
            for source, weight in weights.items():
                if source in scores:
                    total += scores[source] * weight
                    total_weight += weight
            if total_weight > 0:
                combined_scores[sec] = total / total_weight

        # Merge scored seconds into intervals (gap <= 5 s, length >= 5 s).
        intervals = []
        sorted_seconds = sorted(combined_scores.keys())
        if not sorted_seconds:
            return []

        start = sorted_seconds[0]
        prev = sorted_seconds[0]
        for sec in sorted_seconds[1:]:
            if sec - prev > 5:  # tolerate gaps up to 5 seconds
                if prev - start >= 5:  # minimum 5 seconds
                    intervals.append((start, prev))
                start = sec
            prev = sec
        if prev - start >= 5:
            intervals.append((start, prev))

        # Rank intervals by mean combined score.
        intervals_with_score = []
        for s, e in intervals:
            avg_score = np.mean([combined_scores.get(i, 0) for i in range(s, e)])
            intervals_with_score.append((s, e, avg_score, e - s))
        intervals_with_score.sort(key=lambda x: -x[2])

        # Keep the top 30 and restore chronological order.
        top_intervals = [(s, e) for s, e, _, _ in intervals_with_score[:30]]
        top_intervals.sort()
        return top_intervals

    def detect(
        self,
        video_file,
        chat_file,
        skip_intro=455,
        use_whisper=True,
        use_minimax=False,
        whisper_model="base",
    ):
        """Run the full detection pipeline.

        Phases: (1) gather chat/audio/transcription/keyword/MiniMax data,
        (2) combine and filter into intervals, (3) extend clips using the
        transcription.

        Args:
            video_file: Path to the input video.
            chat_file: Path to the chat JSON dump.
            skip_intro: Seconds at the start of the VOD to ignore.
            use_whisper: Enable Whisper transcription (and keywords).
            use_minimax: Enable MiniMax scoring (needs transcription).
            whisper_model: Whisper model size.

        Returns:
            List of (start, end) highlight intervals in seconds.
        """
        logger.info("=" * 60)
        logger.info("DETECCIÓN HÍBRIDA - FASE 1: RECOPILACIÓN DE DATOS")
        logger.info("=" * 60)

        # 1. Chat peaks.
        with open(chat_file, "r") as f:
            chat_data = json.load(f)
        chat_scores = self.detect_chat_peaks(chat_data, skip_intro)

        # 2. Audio peaks.
        audio_scores = self.detect_audio_peaks(video_file, skip_intro)

        # 3. Whisper (optional).
        transcription = None
        if use_whisper:
            transcription = self.transcribe_with_whisper(video_file, whisper_model)

        # 4. Keywords.
        keyword_scores = {}
        if transcription:
            keyword_scores = self.detect_keyword_moments(transcription)

        # 5. MiniMax (optional).
        minimax_scores = []
        if use_minimax and transcription:
            minimax_scores = self.analyze_with_minimax(transcription["segments"])

        logger.info("=" * 60)
        logger.info("FASE 2: COMBINACIÓN Y FILTRADO")
        logger.info("=" * 60)

        # Probe the video duration (fallback: one hour).
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                video_file,
            ],
            capture_output=True,
            text=True,
        )
        duration = int(float(result.stdout.strip())) if result.stdout.strip() else 3600

        intervals = self.combine_all_sources(
            chat_scores,
            audio_scores,
            keyword_scores,
            minimax_scores,
            duration,
            min_duration=8,
        )
        logger.info(f"Highlights base: {len(intervals)}")

        # 6. Smart extension of each clip.
        logger.info("=" * 60)
        logger.info("FASE 3: EXTENSIÓN INTELIGENTE")
        logger.info("=" * 60)

        extended_intervals = []
        for start, end in intervals:
            new_start, new_end = self.extend_clip_smart(start, end, transcription)
            extended_intervals.append((new_start, new_end))
            original_dur = end - start
            new_dur = new_end - new_start
            if new_dur > original_dur:
                logger.info(
                    f"Clip extendido: {start}s-{end}s → {new_start}s-{new_end}s "
                    f"(+{new_dur - original_dur}s)"
                )

        return extended_intervals


def create_video(video_file, highlights, output_file, padding=3):
    """Concatenate highlight clips into the final video via ffmpeg.

    Uses the concat demuxer with inpoint/outpoint per clip and stream
    copy (``-c copy``, no re-encode).

    Args:
        video_file: Path to the source video.
        highlights: List of (start, end) second tuples.
        output_file: Path of the output video.
        padding: Seconds added before/after each clip (clamped to the
            video bounds).
    """
    if not highlights:
        logger.error("No hay highlights para generar video")
        return

    logger.info("=" * 60)
    logger.info("GENERANDO VIDEO FINAL")
    logger.info("=" * 60)

    # Probe duration so end padding never exceeds the video length.
    result = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            video_file,
        ],
        capture_output=True,
        text=True,
    )
    duration = float(result.stdout.strip()) if result.stdout.strip() else 3600

    # Write the concat list.  NamedTemporaryFile(delete=False) replaces
    # the deprecated, race-prone tempfile.mktemp().
    with tempfile.NamedTemporaryFile(
        "w", suffix=".txt", delete=False
    ) as f:
        concat_file = f.name
        for start, end in highlights:
            start_pad = max(0, start - padding)
            end_pad = min(duration, end + padding)
            f.write(f"file '{video_file}'\n")
            f.write(f"inpoint {start_pad}\n")
            f.write(f"outpoint {end_pad}\n")

    cmd = [
        "ffmpeg",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        concat_file,
        "-c",
        "copy",
        "-y",
        output_file,
    ]
    try:
        subprocess.run(cmd, capture_output=True)
    finally:
        # Always remove the temp list, even if ffmpeg fails.
        Path(concat_file).unlink(missing_ok=True)
    logger.info(f"Video generado: {output_file}")


def main():
    """CLI entry point: parse args, detect highlights, render the video."""
    parser = argparse.ArgumentParser(description="Hybrid Highlight Detector")
    parser.add_argument("--video", required=True, help="Video file")
    parser.add_argument("--chat", required=True, help="Chat JSON file")
    parser.add_argument(
        "--output", default="highlights_hybrid.mp4", help="Output video"
    )
    parser.add_argument(
        "--skip-intro", type=int, default=455, help="Skip intro seconds"
    )
    parser.add_argument("--whisper-model", default="base", help="Whisper model size")
    parser.add_argument("--use-whisper", action="store_true", help="Enable Whisper")
    parser.add_argument("--use-minimax", action="store_true", help="Enable MiniMax")
    parser.add_argument("--api-key", help="MiniMax API key")
    parser.add_argument("--padding", type=int, default=3, help="Video padding")
    args = parser.parse_args()

    # Build the detector.
    detector = HybridHighlightDetector(
        device="cuda" if torch.cuda.is_available() else "cpu", api_key=args.api_key
    )

    # Detect highlights.
    highlights = detector.detect(
        args.video,
        args.chat,
        skip_intro=args.skip_intro,
        use_whisper=args.use_whisper,
        use_minimax=args.use_minimax,
        whisper_model=args.whisper_model,
    )

    # Persist the intervals next to the video output.
    json_file = args.output.replace(".mp4", ".json")
    with open(json_file, "w") as f:
        json.dump(highlights, f)
    logger.info(f"Highlights guardados: {json_file}")

    # Print a summary table.
    print(f"\n{'=' * 60}")
    print(f"HIGHLIGHTS DETECTADOS ({len(highlights)} clips)")
    print(f"{'=' * 60}")
    for i, (s, e) in enumerate(highlights, 1):
        mins = s // 60
        secs = s % 60
        dur = e - s
        print(f"{i:2d}. {mins:02d}:{secs:02d} - {dur}s")
    print(f"{'=' * 60}")

    # Render the final video.
    create_video(args.video, highlights, args.output, args.padding)


if __name__ == "__main__":
    main()