Files
twitch-highlight-detector/vlm_detector.py
renato97 00180d0b1c Sistema completo de detección de highlights con VLM y análisis de gameplay
- Implementación de detector híbrido (Whisper + Chat + Audio + VLM)
- Sistema de detección de gameplay real vs hablando
- Scene detection con FFmpeg
- Soporte para RTX 3050 y RX 6800 XT
- Guía completa en 6800xt.md para próxima IA
- Scripts de filtrado visual y análisis de contexto
- Pipeline automatizado de generación de videos
2026-02-19 17:38:14 +00:00

243 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
VLM GAMEPLAY DETECTOR
Usa modelo de visión para detectar cuándo REALMENTE está jugando LoL.
Compatible con RTX 3050 (4GB VRAM)
"""
import json
import subprocess
import tempfile
from pathlib import Path
from PIL import Image
import io
class VLMGameplayDetector:
"""Detecta gameplay usando modelo de visión local."""
def __init__(self, model_name="moondream-2b"):
self.model_name = model_name
self.vlm = None
def load_model(self):
"""Carga el modelo VLM en GPU."""
try:
# Moondream - muy ligero, ideal para RTX 3050
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
print("Cargando Moondream en GPU...")
model_id = "vikhyatk/moondream2"
self.model = AutoModelForCausalLM.from_pretrained(
model_id,
trust_remote_code=True,
torch_dtype=torch.float16,
device_map={"": "cuda"}, # Usar GPU
)
self.tokenizer = AutoTokenizer.from_pretrained(model_id)
print("✅ Modelo cargado en GPU")
return True
except Exception as e:
print(f"❌ Error cargando modelo: {e}")
print("Instala: pip install transformers torch")
return False
def extract_frame(self, video_path, timestamp):
"""Extrae un frame del video."""
cmd = [
"ffmpeg",
"-i",
video_path,
"-ss",
str(timestamp),
"-vframes",
"1",
"-f",
"image2pipe",
"-vcodec",
"png",
"pipe:1",
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode == 0:
return Image.open(io.BytesIO(result.stdout))
return None
def analyze_frame(self, image, timestamp):
"""Analiza un frame con el VLM."""
if self.vlm is None:
return None
# Prompt específico para detectar gameplay
prompt = """Analiza esta imagen de un stream de videojuegos.
Responde ÚNICAMENTE con UNA de estas opciones:
1. JUGANDO_LEAGUE - Si se ve gameplay de League of Legends (mapa, campeones, habilidades)
2. SELECCION_CAMPEONES - Si está en lobby/selección de personajes
3. HABLANDO - Si solo se ve al streamer hablando sin gameplay visible
4. MENU/ESPERA - Si está en menús, tienda, o esperando
5. OTRO_JUEGO - Si está jugando otro juego diferente
Respuesta:"""
try:
# Moondream inference
enc_image = self.model.encode_image(image)
answer = self.model.answer_question(enc_image, prompt, self.tokenizer)
return {
"timestamp": timestamp,
"classification": answer.strip(),
"is_gameplay": "JUGANDO_LEAGUE" in answer,
}
except Exception as e:
print(f"Error analizando frame en {timestamp}s: {e}")
return None
def scan_video(self, video_path, interval=30):
"""
Escanea el video cada X segundos para detectar gameplay.
Args:
video_path: Ruta al video
interval: Analizar cada N segundos (default 30s)
"""
print(f"\n🔍 Escaneando video cada {interval}s...")
# Obtener duración
result = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
video_path,
],
capture_output=True,
text=True,
)
duration = float(result.stdout.strip())
print(f"Duración: {duration / 60:.1f} minutos")
gameplay_segments = []
current_segment_start = None
# Analizar frames cada 30 segundos
for timestamp in range(455, int(duration), interval): # Saltar intro
print(f"\nAnalizando {timestamp // 60}m {timestamp % 60}s...")
frame = self.extract_frame(video_path, timestamp)
if frame is None:
continue
analysis = self.analyze_frame(frame, timestamp)
if analysis is None:
continue
print(f" Resultado: {analysis['classification']}")
# Detectar cambios de estado
if analysis["is_gameplay"]:
if current_segment_start is None:
current_segment_start = timestamp
print(f" ✅ INICIO de gameplay")
else:
if current_segment_start is not None:
# Fin de segmento de gameplay
gameplay_segments.append(
{
"start": current_segment_start,
"end": timestamp,
"duration": timestamp - current_segment_start,
}
)
print(
f" ❌ FIN de gameplay ({timestamp - current_segment_start}s)"
)
current_segment_start = None
# Cerrar último segmento si quedó abierto
if current_segment_start is not None:
gameplay_segments.append(
{
"start": current_segment_start,
"end": int(duration),
"duration": int(duration) - current_segment_start,
}
)
return gameplay_segments
def save_gameplay_map(self, segments, output_file):
"""Guarda el mapa de segmentos de gameplay."""
with open(output_file, "w") as f:
json.dump(segments, f, indent=2)
print(f"\n{'=' * 60}")
print(f"MAPA DE GAMEPLAY GUARDADO")
print(f"{'=' * 60}")
print(f"Total segmentos: {len(segments)}")
total_gameplay = sum(s["duration"] for s in segments)
print(
f"Tiempo total de gameplay: {total_gameplay // 60}m {total_gameplay % 60}s"
)
for i, seg in enumerate(segments, 1):
mins_s, secs_s = divmod(seg["start"], 60)
mins_e, secs_e = divmod(seg["end"], 60)
print(
f"{i}. {mins_s:02d}:{secs_s:02d} - {mins_e:02d}:{secs_e:02d} "
f"({seg['duration'] // 60}m {seg['duration'] % 60}s)"
)
def main():
import argparse
parser = argparse.ArgumentParser(description="Detect gameplay using VLM")
parser.add_argument("--video", required=True, help="Video file to analyze")
parser.add_argument(
"--output", default="gameplay_segments_vlm.json", help="Output JSON file"
)
parser.add_argument(
"--interval",
type=int,
default=30,
help="Analysis interval in seconds (default: 30)",
)
args = parser.parse_args()
detector = VLMGameplayDetector()
# Cargar modelo
if not detector.load_model():
print("No se pudo cargar el modelo VLM")
return
# Escanear video
segments = detector.scan_video(args.video, args.interval)
# Guardar resultado
detector.save_gameplay_map(segments, args.output)
print(f"\nGuardado en: {args.output}")
print("\nUsa este archivo para filtrar highlights:")
print(" Solo buscar momentos dentro de estos rangos de tiempo")
if __name__ == "__main__":
main()