"""Gemini AI provider with rate limiting, circuit breaking and retry.

The module defines two small resilience helpers (``TokenBucket`` and
``CircuitBreaker``) and ``GeminiProvider``, which talks to Gemini through
either a local CLI executable or the HTTP API.
"""

import logging
import random
import shutil
import subprocess
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import requests

from config import settings
from core import AIProcessingError
from .base_provider import AIProvider


class TokenBucket:
    """Thread-safe token bucket rate limiter.

    The bucket refills continuously at ``rate`` tokens per second up to
    ``capacity``. ``acquire`` never blocks; it returns how long the caller
    should sleep before proceeding.
    """

    def __init__(self, rate: float = 10, capacity: int = 20):
        self.rate = rate  # tokens added per second
        self.capacity = capacity  # maximum burst size
        # Current balance. May go negative: a negative value is the number
        # of tokens already promised to callers that are still waiting.
        self.tokens = capacity
        self.last_update = time.monotonic()
        # Created eagerly: lazy creation is a check-then-set race that can
        # hand two threads two different locks.
        self._lock = threading.Lock()

    def _get_lock(self):
        """Return the instance lock, creating it if it is missing."""
        if self._lock is None:
            self._lock = threading.Lock()
        return self._lock

    def acquire(self, tokens: int = 1) -> float:
        """Reserve ``tokens`` and return the required wait in seconds.

        Args:
            tokens: Number of tokens to reserve.

        Returns:
            ``0.0`` when the tokens were available immediately, otherwise the
            number of seconds the caller must sleep before proceeding.
        """
        with self._get_lock():
            # Monotonic clock: wall-clock adjustments must not produce a
            # negative elapsed time and drain the bucket.
            now = time.monotonic()
            elapsed = now - self.last_update
            self.last_update = now
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)

            # Always charge the bucket. When the balance goes negative the
            # deficit is a reservation, so the next caller waits behind this
            # one instead of being handed the same wait time.
            self.tokens -= tokens
            if self.tokens >= 0:
                return 0.0
            return -self.tokens / self.rate


class CircuitBreaker:
    """Circuit breaker for API calls.

    States: ``"closed"`` (calls pass through), ``"open"`` (calls are rejected
    until ``recovery_timeout`` seconds have passed since the last failure) and
    ``"half-open"`` (a trial call is allowed; success closes the circuit,
    failure re-opens it).
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0  # consecutive failures since the last success
        self.last_failure: Optional[datetime] = None  # timezone-aware UTC
        self.state = "closed"  # closed, open, half-open
        self._lock = threading.Lock()

    def _get_lock(self):
        """Return the instance lock, creating it if it is missing."""
        if self._lock is None:
            self._lock = threading.Lock()
        return self._lock

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` under circuit-breaker control.

        Returns:
            Whatever ``func`` returns.

        Raises:
            AIProcessingError: If the circuit is open and the recovery
                timeout has not elapsed yet.
            Exception: Any exception raised by ``func`` is recorded as a
                failure and re-raised unchanged.
        """
        with self._get_lock():
            if self.state == "open":
                recovered = (
                    self.last_failure is not None
                    and (datetime.now(timezone.utc) - self.last_failure).total_seconds()
                    > self.recovery_timeout
                )
                if not recovered:
                    raise AIProcessingError("Circuit breaker is open")
                self.state = "half-open"

        # The wrapped call runs outside the lock so that slow requests do
        # not serialize every other caller.
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self._get_lock():
                self.failures += 1
                self.last_failure = datetime.now(timezone.utc)
                # A failed trial call re-opens immediately; otherwise open
                # once the consecutive-failure threshold is reached.
                if self.state == "half-open" or self.failures >= self.failure_threshold:
                    self.state = "open"
            raise

        with self._get_lock():
            # Do not close a circuit that another thread opened while this
            # call was in flight.
            if self.state != "open":
                self.state = "closed"
                self.failures = 0
        return result


class GeminiProvider(AIProvider):
    """Gemini AI provider with rate limiting and retry."""

    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger(__name__)
        self._cli_path = settings.GEMINI_CLI_PATH or shutil.which("gemini")
        self._api_key = settings.GEMINI_API_KEY
        self._flash_model = settings.GEMINI_FLASH_MODEL
        self._pro_model = settings.GEMINI_PRO_MODEL
        self._session = None  # created on first API call
        self._rate_limiter = TokenBucket(rate=15, capacity=30)
        self._circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
        self._retry_config = {
            "max_attempts": 3,
            "base_delay": 1.0,
            "max_delay": 30.0,
            "exponential_base": 2,
        }

    @property
    def name(self) -> str:
        return "Gemini"

    def is_available(self) -> bool:
        """Check if Gemini CLI or API is available."""
        return bool(self._cli_path or self._api_key)

    def _init_session(self) -> None:
        """Initialize the HTTP session with connection pooling (once)."""
        if self._session is None:
            self._session = requests.Session()
            adapter = requests.adapters.HTTPAdapter(
                pool_connections=10,
                pool_maxsize=20,
                max_retries=0,  # We handle retries manually
            )
            self._session.mount('https://', adapter)

    @staticmethod
    def _is_retryable(exc: Exception) -> bool:
        """Return True when retrying the failed request could succeed.

        Errors without an HTTP response (connection errors, timeouts) are
        retried, as are 408, 429 and 5xx responses. Other 4xx responses are
        caller errors and will fail identically on every attempt.
        """
        response = getattr(exc, "response", None)
        if response is None:
            return True
        status = response.status_code
        return status in (408, 429) or status >= 500

    def _run_with_retry(self, func, *args, **kwargs):
        """Execute ``func`` through the circuit breaker with backoff retry.

        Returns:
            The return value of ``func``.

        Raises:
            AIProcessingError: When the request fails with a non-retryable
                error, when all attempts are exhausted, or when the circuit
                breaker is open.
        """
        config = self._retry_config
        max_attempts = config["max_attempts"]
        last_exception = None

        for attempt in range(max_attempts):
            try:
                return self._circuit_breaker.call(func, *args, **kwargs)
            except requests.exceptions.RequestException as e:
                last_exception = e
                if not self._is_retryable(e):
                    raise AIProcessingError(f"Gemini API request failed: {e}") from e
                if attempt < max_attempts - 1:
                    delay = min(
                        config["base_delay"] * (config["exponential_base"] ** attempt),
                        config["max_delay"],
                    )
                    # Up to 10% jitter so concurrent callers do not retry in
                    # lockstep.
                    delay += delay * 0.1 * random.random()
                    self.logger.warning(
                        "Attempt %d failed: %s, retrying in %.2fs",
                        attempt + 1, e, delay,
                    )
                    time.sleep(delay)

        raise AIProcessingError(
            f"Max retries exceeded: {last_exception}"
        ) from last_exception

    def _run_cli(self, prompt: str, use_flash: bool = True, timeout: int = 300) -> str:
        """Run the Gemini CLI with ``prompt`` and return its stdout.

        Args:
            prompt: Prompt text passed to the CLI.
            use_flash: Use the flash model when True, the pro model otherwise.
            timeout: Maximum seconds to wait for the process.

        Raises:
            AIProcessingError: If the CLI is not configured, cannot be
                started, times out, or exits with a non-zero status.
        """
        if not self._cli_path:
            raise AIProcessingError("Gemini CLI not available")

        model = self._flash_model if use_flash else self._pro_model
        # NOTE(review): model and prompt are passed as bare positional
        # arguments; confirm this matches the CLI (or wrapper) behind
        # GEMINI_CLI_PATH.
        cmd = [self._cli_path, model, prompt]

        # Apply rate limiting
        wait_time = self._rate_limiter.acquire()
        if wait_time > 0:
            time.sleep(wait_time)

        # Only the process launch is guarded, so the "Gemini CLI failed"
        # error raised below is not re-wrapped into a second message.
        try:
            process = subprocess.run(
                cmd,
                text=True,
                capture_output=True,
                timeout=timeout,
                shell=False,
            )
        except subprocess.TimeoutExpired as e:
            raise AIProcessingError(f"Gemini CLI timed out after {timeout}s") from e
        except Exception as e:
            # Boundary: callers expect every launch failure to surface as
            # AIProcessingError.
            raise AIProcessingError(f"Gemini CLI error: {e}") from e

        if process.returncode != 0:
            error_msg = process.stderr or "Unknown error"
            raise AIProcessingError(f"Gemini CLI failed: {error_msg}")

        return process.stdout.strip()

    def _call_api(self, prompt: str, use_flash: bool = True, timeout: int = 180) -> str:
        """Call the Gemini HTTP API with rate limiting and retry.

        Args:
            prompt: Prompt text sent as the single content part.
            use_flash: Use the flash model when True, the pro model otherwise.
            timeout: Per-request timeout in seconds.

        Returns:
            The text of the first part of the first candidate, stripped.

        Raises:
            AIProcessingError: If no API key is configured, the request
                fails, or the response is empty or malformed.
        """
        if not self._api_key:
            raise AIProcessingError("Gemini API key not configured")

        self._init_session()

        model = self._flash_model if use_flash else self._pro_model
        url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"

        payload = {
            "contents": [{
                "parts": [{"text": prompt}]
            }]
        }

        # The key goes in a header rather than the query string: requests
        # puts the full URL in its exception messages, which are logged.
        headers = {"x-goog-api-key": self._api_key}

        def api_call():
            # Apply rate limiting
            wait_time = self._rate_limiter.acquire()
            if wait_time > 0:
                time.sleep(wait_time)

            response = self._session.post(
                url,
                json=payload,
                headers=headers,
                timeout=timeout,
            )
            response.raise_for_status()
            return response

        response = self._run_with_retry(api_call)

        try:
            data = response.json()
        except ValueError as e:
            raise AIProcessingError("Invalid response format from Gemini API") from e

        candidates = data.get("candidates") if isinstance(data, dict) else None
        if not candidates:
            raise AIProcessingError("Empty response from Gemini API")

        try:
            result = candidates[0]["content"]["parts"][0]["text"]
        except (KeyError, IndexError, TypeError) as e:
            raise AIProcessingError("Invalid response format from Gemini API") from e

        return result.strip()

    def _run(self, prompt: str, use_flash: bool = True, timeout: int = 300) -> str:
        """Run Gemini, trying the CLI first and falling back to the API.

        Raises:
            AIProcessingError: If neither backend is configured, or the API
                fallback fails.
        """
        cli_error = None

        # Try CLI first if available
        if self._cli_path:
            try:
                return self._run_cli(prompt, use_flash, timeout)
            except Exception as e:
                cli_error = e
                self.logger.warning("Gemini CLI failed, trying API: %s", e)

        # Fallback to API
        if self._api_key:
            api_timeout = min(timeout, 180)
            return self._call_api(prompt, use_flash, api_timeout)

        # Chain the CLI failure (if any) so the root cause is not lost.
        raise AIProcessingError(
            "No Gemini provider available (CLI or API)"
        ) from cli_error

    def summarize(self, text: str, **kwargs) -> str:
        """Generate a Spanish summary of ``text`` using Gemini."""
        prompt = f"""Summarize the following text:

{text}

Provide a clear, concise summary in Spanish."""
        return self._run(prompt, use_flash=True)

    def correct_text(self, text: str, **kwargs) -> str:
        """Correct grammar, spelling and clarity of ``text`` using Gemini."""
        prompt = f"""Correct the following text for grammar, spelling, and clarity:

{text}

Return only the corrected text, nothing else."""
        return self._run(prompt, use_flash=True)

    def classify_content(self, text: str, **kwargs) -> Dict[str, Any]:
        """Classify ``text`` into one of the known categories.

        Returns:
            A dict with keys ``category``, ``confidence`` and ``provider``.
            Any answer outside the known categories maps to
            ``"otras_clases"``.
        """
        categories = ["historia", "analisis_contable", "instituciones_gobierno", "otras_clases"]

        prompt = f"""Classify the following text into one of these categories:
- historia
- analisis_contable
- instituciones_gobierno
- otras_clases

Text: {text}

Return only the category name, nothing else."""
        result = self._run(prompt, use_flash=True).strip().lower()

        # Validate result
        if result not in categories:
            result = "otras_clases"

        return {
            "category": result,
            "confidence": 0.9,
            "provider": self.name
        }

    def generate_text(self, prompt: str, **kwargs) -> str:
        """Generate text using Gemini, preferring the API over the CLI.

        Keyword Args:
            use_flash: Use the flash model when True (default), else pro.

        Raises:
            AIProcessingError: If the selected backend fails or none is
                configured.
        """
        use_flash = kwargs.get('use_flash', True)
        if self._api_key:
            return self._call_api(prompt, use_flash=use_flash)
        # Was a call to a non-existent ``_call_cli`` (AttributeError).
        return self._run_cli(prompt, use_flash=use_flash)

    def get_stats(self) -> Dict[str, Any]:
        """Get provider statistics.

        ``rate_limiter.tokens`` can be negative while callers are waiting on
        reserved tokens.
        """
        return {
            "rate_limiter": {
                "tokens": round(self._rate_limiter.tokens, 2),
                "capacity": self._rate_limiter.capacity,
                "rate": self._rate_limiter.rate
            },
            "circuit_breaker": {
                "state": self._circuit_breaker.state,
                "failures": self._circuit_breaker.failures,
                "failure_threshold": self._circuit_breaker.failure_threshold
            },
            "cli_available": bool(self._cli_path),
            "api_available": bool(self._api_key)
        }


# Global instance is created in __init__.py