""" Servicio de notificaciones Telegram. Envía mensajes al chat configurado mediante la API de Telegram Bot. Silencioso si no está configurado (TELEGRAM_TOKEN y TELEGRAM_CHAT_ID). """ import logging import time from typing import Optional import requests from config.settings import settings logger = logging.getLogger(__name__) def _truncate_safely(text: str, max_length: int) -> str: """ Trunca texto sin romper entidades de formato HTML. Args: text: Texto a truncar. max_length: Longitud máxima. Returns: Texto truncado de forma segura. """ if len(text) <= max_length: return text # Dejar margen para el sufijo "..." safe_length = max_length - 10 # Buscar el último espacio o salto de línea antes del límite cut_point = text.rfind("\n", 0, safe_length) if cut_point == -1 or cut_point < safe_length - 100: cut_point = text.rfind(" ", 0, safe_length) if cut_point == -1 or cut_point < safe_length - 50: cut_point = safe_length return text[:cut_point] + "..." class TelegramService: """Servicio para enviar notificaciones a Telegram.""" def __init__(self) -> None: """Inicializa el servicio si hay configuración de Telegram.""" self._token: Optional[str] = settings.TELEGRAM_TOKEN self._chat_id: Optional[str] = settings.TELEGRAM_CHAT_ID self._configured: bool = settings.has_telegram_config # Rate limiting: mínimo tiempo entre mensajes (segundos) self._min_interval: float = 1.0 self._last_send_time: float = 0.0 if self._configured: logger.info( "TelegramService inicializado", extra={"chat_id": self._mask_chat_id()}, ) else: logger.debug("TelegramService deshabilitado (sin configuración)") def _mask_chat_id(self) -> str: """Oculta el chat_id para logging seguro.""" if self._chat_id and len(self._chat_id) > 4: return f"***{self._chat_id[-4:]}" return "****" def _wait_for_rate_limit(self) -> None: """Espera si es necesario para cumplir el rate limiting.""" now = time.monotonic() elapsed = now - self._last_send_time if elapsed < self._min_interval: sleep_time = self._min_interval - elapsed logger.debug(f"Rate limiting: esperando {sleep_time:.2f}s") time.sleep(sleep_time) self._last_send_time = time.monotonic() def _send_request(self, method: str, data: dict) -> bool: """Envía una request a la API de Telegram.""" if not self._configured: return False url = f"https://api.telegram.org/bot{self._token}/{method}" try: self._wait_for_rate_limit() response = requests.post(url, json=data, timeout=10) # Intentar parsear JSON para obtener detalles del error try: result = response.json() except ValueError: result = {"raw": response.text} if response.status_code == 200 and result.get("ok"): logger.debug( "Mensaje enviado exitosamente", extra={"message_id": result.get("result", {}).get("message_id")}, ) return True # Error detallado error_code = result.get("error_code", response.status_code) description = result.get("description", response.text) logger.error( f"Error de Telegram API: HTTP {response.status_code}", extra={ "method": method, "error_code": error_code, "description": description, "response_data": result, "request_data": { k: v if k != "text" else f"<{len(str(v))} chars>" for k, v in data.items() }, }, ) return False except requests.RequestException as e: logger.error( f"Error de conexión con Telegram: {e}", extra={"method": method, "data_keys": list(data.keys())}, ) return False def send_message(self, text: str, parse_mode: str = "HTML") -> bool: """ Envía un mensaje de texto al chat configurado. Args: text: Contenido del mensaje. parse_mode: Modo de parseo (HTML, Markdown o MarkdownV2). Returns: True si se envió correctamente, False en caso contrario. """ if not self._configured: logger.debug(f"Mensaje ignorado (sin configuración): {text[:50]}...") return False # Validar que el texto no esté vacío if not text or not text.strip(): logger.warning("Intento de enviar mensaje vacío, ignorando") return False # Eliminar espacios en blanco al inicio y final text = text.strip() # Telegram limita a 4096 caracteres MAX_LENGTH = 4096 text = _truncate_safely(text, MAX_LENGTH) data = { "chat_id": self._chat_id, "text": text, } # Solo incluir parse_mode si hay texto y no está vacío if parse_mode: data["parse_mode"] = parse_mode logger.info("Enviando mensaje a Telegram", extra={"length": len(text)}) return self._send_request("sendMessage", data) def send_start_notification(self, filename: str) -> bool: """ Envía notificación de inicio de procesamiento. Args: filename: Nombre del archivo que se está procesando. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" # Usar HTML para evitar problemas de escaping safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") text = f"▶️ Inicio de procesamiento\n\n📄 Archivo: {safe_filename}" return self.send_message(text, parse_mode="HTML") def send_error_notification(self, filename: str, error: str) -> bool: """ Envía notificación de error en procesamiento. Args: filename: Nombre del archivo que falló. error: Descripción del error. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" if not error: error = "(error desconocido)" # Usar HTML para evitar problemas de escaping safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") safe_error = error.replace("&", "&").replace("<", "<").replace(">", ">") text = f"❌ Error de procesamiento\n\n📄 Archivo: {safe_filename}\n⚠️ Error: {safe_error}" return self.send_message(text, parse_mode="HTML") def send_completion_notification( self, filename: str, duration: Optional[float] = None, output_path: Optional[str] = None, ) -> bool: """ Envía notificación de completado exitoso. Args: filename: Nombre del archivo procesado. duration: Duración del procesamiento en segundos (opcional). output_path: Ruta del archivo de salida (opcional). Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" # Usar HTML para evitar problemas de escaping safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") duration_text = "" if duration is not None: minutes = int(duration // 60) seconds = int(duration % 60) duration_text = f"\n⏱️ Duración: {minutes}m {seconds}s" output_text = "" if output_path: safe_output = output_path.replace("&", "&").replace("<", "<").replace(">", ">") output_text = f"\n📁 Salida: {safe_output}" text = f"✅ Procesamiento completado\n\n📄 Archivo: {safe_filename}{duration_text}{output_text}" return self.send_message(text, parse_mode="HTML") def send_download_complete(self, filename: str) -> bool: """ Envía notificación de descarga completada. Args: filename: Nombre del archivo descargado. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") text = f"📥 Archivo descargado\n\n📄 {safe_filename}" return self.send_message(text, parse_mode="HTML") def send_transcription_start(self, filename: str) -> bool: """ Envía notificación de inicio de transcripción. Args: filename: Nombre del archivo a transcribir. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") text = f"🎙️ Iniciando transcripción...\n\n📄 {safe_filename}" return self.send_message(text, parse_mode="HTML") def send_transcription_progress( self, filename: str, progress_percent: int, ) -> bool: """ Envía notificación de progreso de transcripción. Args: filename: Nombre del archivo. progress_percent: Porcentaje de progreso (0-100). Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") text = f"⏳ Transcribiendo...\n\n📄 {safe_filename}\n📊 Progreso: {progress_percent}%" return self.send_message(text, parse_mode="HTML") def send_transcription_complete( self, filename: str, text_length: int, ) -> bool: """ Envía notificación de transcripción completada. Args: filename: Nombre del archivo. text_length: Longitud del texto transcrito. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") # Formatear longitud del texto if text_length >= 1000: length_text = f"{text_length // 1000}k caracteres" else: length_text = f"{text_length} caracteres" text = f"✅ Transcripción completada\n\n📄 {safe_filename}\n📝 {length_text}" return self.send_message(text, parse_mode="HTML") def send_summary_start(self, filename: str) -> bool: """ Envía notificación de inicio de resumen con IA. Args: filename: Nombre del archivo. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") text = f"🤖 Generando resumen con IA...\n\n📄 {safe_filename}" return self.send_message(text, parse_mode="HTML") def send_summary_complete(self, filename: str, has_markdown: bool = True) -> bool: """ Envía notificación de resumen completado. Args: filename: Nombre del archivo. has_markdown: Si se creó el archivo markdown. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") status = "✅" if has_markdown else "⚠️" text = f"{status} Resumen completado\n\n📄 {safe_filename}" return self.send_message(text, parse_mode="HTML") def send_pdf_start(self, filename: str) -> bool: """ Envía notificación de inicio de generación de PDF. Args: filename: Nombre del archivo. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") text = f"📄 Creando PDF...\n\n📄 {safe_filename}" return self.send_message(text, parse_mode="HTML") def send_pdf_complete(self, filename: str, pdf_path: str) -> bool: """ Envía notificación de PDF completado. Args: filename: Nombre del archivo. pdf_path: Ruta del PDF generado. Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") safe_path = pdf_path.replace("&", "&").replace("<", "<").replace(">", ">") text = f"📄 PDF creado\n\n📄 {safe_filename}\n📁 {safe_path}" return self.send_message(text, parse_mode="HTML") def send_all_complete( self, filename: str, txt_path: Optional[str] = None, md_path: Optional[str] = None, pdf_path: Optional[str] = None, ) -> bool: """ Envía notificación final con todos los archivos generados. Args: filename: Nombre del archivo original. txt_path: Ruta del archivo de texto (opcional). md_path: Ruta del markdown (opcional). pdf_path: Ruta del PDF (opcional). Returns: True si se envió correctamente. """ if not filename: filename = "(desconocido)" safe_filename = filename.replace("&", "&").replace("<", "<").replace(">", ">") files_text = "" if txt_path: safe_txt = txt_path.replace("&", "&").replace("<", "<").replace(">", ">") files_text += f"\n📝 {safe_txt}" if md_path: safe_md = md_path.replace("&", "&").replace("<", "<").replace(">", ">") files_text += f"\n📋 {safe_md}" if pdf_path: safe_pdf = pdf_path.replace("&", "&").replace("<", "<").replace(">", ">") files_text += f"\n📄 {safe_pdf}" text = f"✅ ¡Proceso completado!\n\n📄 {safe_filename}\n📁 Archivos:{files_text}" return self.send_message(text, parse_mode="HTML") # Instancia global del servicio telegram_service = TelegramService()