Files
twitch-highlight-detector/detect_gameplay.py
renato97 00180d0b1c Sistema completo de detección de highlights con VLM y análisis de gameplay
- Implementación de detector híbrido (Whisper + Chat + Audio + VLM)
- Sistema de detección de gameplay real vs hablando
- Scene detection con FFmpeg
- Soporte para RTX 3050 y RX 6800 XT
- Guía completa en 6800xt.md para próxima IA
- Scripts de filtrado visual y análisis de contexto
- Pipeline automatizado de generación de videos
2026-02-19 17:38:14 +00:00

265 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
VLM GAMEPLAY DETECTOR - Standalone version
No requiere instalación de transformers, usa Moondream directamente
"""
import json
import subprocess
import sys
import os
from pathlib import Path
import urllib.request
import tarfile
def download_moondream():
    """Download the Moondream model if it is not already present.

    Returns:
        Path: the directory that contains (or will contain) ``model.mf``.

    Raises:
        subprocess.CalledProcessError: if the urllib download fails AND the
            ``wget`` fallback also exits non-zero (``check=True``).
    """
    model_dir = Path("moondream_model")
    model_file = model_dir / "model.mf"
    # BUGFIX: test for the model *file*, not the directory. A previous run
    # that failed mid-download leaves an empty directory behind, which made
    # the old `model_dir.exists()` check skip the download forever.
    if model_file.exists():
        print("✅ Modelo Moondream ya descargado")
        return model_dir
    print("📥 Descargando Moondream...")
    model_dir.mkdir(exist_ok=True)
    # INT8-quantized build of the model, chosen to save VRAM.
    url = "https://huggingface.co/vikhyatk/moondream2/resolve/main/moondream-2b-int8.mf"
    try:
        urllib.request.urlretrieve(url, model_file)
        print("✅ Modelo descargado")
        return model_dir
    except Exception as e:
        # Best-effort fallback: some environments block urllib (proxy/TLS)
        # but still have a working wget on PATH.
        print(f"❌ Error descargando: {e}")
        print("Intentando con wget...")
        subprocess.run(
            ["wget", "-q", "-O", str(model_file), url], check=True
        )
        return model_dir
def _probe_duration(video_path):
    """Return the duration of *video_path* in seconds, via ffprobe.

    Raises ValueError if ffprobe produced no parseable number (e.g. the
    file does not exist) — same failure mode as the original inline code.
    """
    result = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            video_path,
        ],
        capture_output=True,
        text=True,
    )
    return float(result.stdout.strip())


def _has_predicted_frames(video_path, timestamp):
    """Return True if any P (predicted) frame lies within ~0.1s of *timestamp*.

    Heuristic: P frames imply inter-frame motion (gameplay); a stretch of
    only I frames suggests a static image (menu / talking head).

    Raises:
        ValueError (json.JSONDecodeError): if ffprobe emitted no/invalid JSON.
    """
    result = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "frame=pkt_pts_time,pict_type",
            "-of",
            "json",
            "-i",
            video_path,
            "-read_intervals",
            f"{timestamp}%+0.1",
        ],
        capture_output=True,
        text=True,
    )
    frames = json.loads(result.stdout).get("frames", [])
    return any(f.get("pict_type") == "P" for f in frames)


def _count_scene_changes(video_path, timestamp):
    """Count scene-cut mentions in ffmpeg's log for the 5s before *timestamp*.

    NOTE(review): this counts occurrences of the word "scene" in ffmpeg's
    stderr text, which is a rough proxy for detected cuts — the echoed
    filter graph itself can contribute to the count. Kept as-is to preserve
    the original heuristic.
    """
    scene_check = subprocess.run(
        [
            "ffmpeg",
            "-i",
            video_path,
            "-ss",
            str(max(0, timestamp - 5)),
            "-t",
            "5",
            "-vf",
            "select=gt(scene\,0.3)",
            "-vsync",
            "vfr",
            "-f",
            "null",
            "-",
        ],
        capture_output=True,
    )
    return scene_check.stderr.decode().count("scene")


def simple_gameplay_detector(video_path, check_interval=60, start_offset=455):
    """Detect gameplay segments in a video using simple visual heuristics.

    Samples the video every *check_interval* seconds starting at
    *start_offset* and classifies each sample as gameplay (motion / scene
    cuts present) or not (static image: menu, talking). Consecutive
    gameplay samples are merged into segments.

    Args:
        video_path: path to the video to analyze.
        check_interval: seconds between samples (default 60, as before).
        start_offset: first sampled timestamp in seconds. Defaults to 455,
            matching the previously hard-coded value (skips the intro).

    Returns:
        list[dict]: segments with ``start``, ``end`` and ``duration`` keys
        (all in whole seconds).
    """
    print(f"\n🔍 Analizando {video_path}...")
    duration = _probe_duration(video_path)
    print(f"Duración: {duration / 60:.1f} minutos")

    gameplay_segments = []
    print(f"\nAnalizando frames cada {check_interval}s...")
    print("(Esto detecta cuando hay gameplay real vs hablando)")

    last_was_gameplay = False
    segment_start = None
    for timestamp in range(start_offset, int(duration), check_interval):
        # Extract one frame; it is only used as a cheap "is this timestamp
        # decodable" probe (the image itself is never analyzed).
        frame_file = f"/tmp/frame_{timestamp}.jpg"
        subprocess.run(
            [
                "ffmpeg",
                "-y",
                "-i",
                video_path,
                "-ss",
                str(timestamp),
                "-vframes",
                "1",
                "-q:v",
                "2",
                frame_file,
            ],
            capture_output=True,
        )
        if not Path(frame_file).exists():
            continue
        try:
            has_movement = _has_predicted_frames(video_path, timestamp)
            scene_changes = _count_scene_changes(video_path, timestamp)
            is_likely_gameplay = has_movement or scene_changes > 0
            status = "🎮" if is_likely_gameplay else "🗣️"
            print(f" {timestamp // 60:02d}:{timestamp % 60:02d} {status}", end="")
            if is_likely_gameplay:
                if not last_was_gameplay:
                    segment_start = timestamp
                    last_was_gameplay = True
                    print(" INICIO")
                else:
                    print("")
            else:
                if last_was_gameplay and segment_start:
                    gameplay_segments.append(
                        {
                            "start": segment_start,
                            "end": timestamp,
                            "duration": timestamp - segment_start,
                        }
                    )
                    print(f" FIN ({timestamp - segment_start}s)")
                    segment_start = None
                last_was_gameplay = False
        except ValueError:
            # ffprobe returned unparseable JSON for this sample; narrowed
            # from the original bare `except:` so Ctrl-C still works.
            print(f" {timestamp // 60:02d}:{timestamp % 60:02d}")
        # Remove the temporary probe frame.
        Path(frame_file).unlink(missing_ok=True)

    # Close the trailing segment if the video ends mid-gameplay.
    if last_was_gameplay and segment_start:
        gameplay_segments.append(
            {
                "start": segment_start,
                "end": int(duration),
                "duration": int(duration) - segment_start,
            }
        )
    return gameplay_segments
def filter_moments_by_gameplay(rage_moments_file, gameplay_segments):
    """Keep only the rage moments that fall inside a gameplay segment.

    Args:
        rage_moments_file: path to a JSON file containing a list of moment
            dicts; each moment's timestamp is read from ``time`` or,
            failing that, ``start`` (defaulting to 0).
        gameplay_segments: list of dicts with ``start``/``end`` bounds.

    Returns:
        list: the moments whose timestamp lies within any segment.
    """
    with open(rage_moments_file, "r") as fh:
        all_moments = json.load(fh)

    def _in_gameplay(moment):
        # Timestamp lookup mirrors the detector's output conventions.
        when = moment.get("time", moment.get("start", 0))
        return any(
            seg["start"] <= when <= seg["end"] for seg in gameplay_segments
        )

    return [moment for moment in all_moments if _in_gameplay(moment)]
def main():
    """CLI entry point: detect gameplay segments and persist them as JSON."""
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--video", default="nuevo_stream_360p.mp4")
    parser.add_argument("--output", default="gameplay_segments.json")
    args = parser.parse_args()

    banner = "=" * 60
    print(banner)
    print("GAMEPLAY DETECTOR (Heurísticas Visuales)")
    print(banner)
    print("Analizando movimiento en video para detectar gameplay...")

    # Run the heuristic detector over the whole video.
    segments = simple_gameplay_detector(args.video)

    # Persist the segment list for downstream pipeline steps.
    with open(args.output, "w") as fh:
        json.dump(segments, fh, indent=2)

    print("\n" + banner)
    print("RESULTADO")
    print(banner)
    print(f"Segmentos de gameplay: {len(segments)}")
    total = sum(seg["duration"] for seg in segments)
    total_min, total_sec = divmod(total, 60)
    print(f"Tiempo total: {total_min}m {total_sec}s")
    for idx, seg in enumerate(segments, start=1):
        s_min, s_sec = divmod(seg["start"], 60)
        e_min, e_sec = divmod(seg["end"], 60)
        d_min, d_sec = divmod(seg["duration"], 60)
        print(
            f"{idx}. {s_min:02d}:{s_sec:02d} - {e_min:02d}:{e_sec:02d} "
            f"({d_min}m {d_sec}s)"
        )
    print(f"\nGuardado en: {args.output}")


if __name__ == "__main__":
    main()