356 lines
12 KiB
Python
356 lines
12 KiB
Python
"""
Health check endpoint for CBCFacil service monitoring
"""
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, Any, List, Optional
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HealthChecker:
    """Comprehensive health check for all service dependencies.

    Each ``check_*`` method returns a dict that always carries the keys
    ``service``, ``status`` and ``timestamp`` (naive UTC, ISO-8601) plus
    check-specific details.  ``run_full_health_check`` runs every check and
    aggregates the individual statuses into one report.

    Project modules (``config``, ``services.*``, ``storage.*``) are imported
    lazily inside the methods that need them.
    """

    # Status values that make the overall report "unhealthy" / "warning".
    # The summary counters use the same grouping so they agree with
    # ``overall_status``.
    _UNHEALTHY_STATUSES = ("unhealthy", "error")
    _WARNING_STATUSES = ("warning", "degraded")

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _new_result(service: str) -> Dict[str, Any]:
        """Return the result skeleton shared by every check."""
        return {
            "service": service,
            "status": "unknown",
            "timestamp": datetime.utcnow().isoformat(),
        }

    @staticmethod
    def _api_provider_entry(credential: Any) -> Dict[str, Any]:
        """Describe a key/token based provider from the truthiness of its credential."""
        if credential:
            return {"configured": True, "status": "unknown"}
        return {"configured": False, "status": "not_configured"}

    @staticmethod
    def _cli_provider_entry(cli_path: Any) -> Dict[str, Any]:
        """Describe a CLI based provider by checking that its path exists."""
        path_exists = Path(cli_path).exists()
        return {
            "configured": True,
            "path_exists": path_exists,
            "status": "available" if path_exists else "path_invalid",
        }

    def check_webdav_connection(self) -> Dict[str, Any]:
        """Check WebDAV service connectivity.

        Returns:
            Result dict with status ``not_configured`` when
            ``settings.has_webdav_config`` is falsy, ``healthy`` (plus
            ``message`` and ``endpoint``) when listing ``"."`` succeeds, or
            ``unhealthy`` (plus ``error``) when anything in the probe raises.

        Raises:
            ImportError: if ``config`` cannot be imported (the import happens
                before the guarded section).
        """
        from config import settings

        result = self._new_result("webdav")

        try:
            from services.webdav_service import webdav_service

            if not settings.has_webdav_config:
                result["status"] = "not_configured"
                result["message"] = "WebDAV credentials not configured"
                return result

            # Test connection with a simple list operation
            webdav_service.list(".")

            result["status"] = "healthy"
            result["message"] = "WebDAV connection successful"
            result["endpoint"] = settings.NEXTCLOUD_URL

        except Exception as e:
            result["status"] = "unhealthy"
            result["error"] = str(e)
            self.logger.error(f"WebDAV health check failed: {e}")

        return result

    def check_ai_providers(self) -> Dict[str, Any]:
        """Check AI provider configurations.

        Only configuration is inspected; no provider is contacted.

        Returns:
            Result dict with a ``providers`` mapping.  ``zai`` and ``gemini``
            are always reported; ``claude_cli`` and ``gemini_cli`` are reported
            only when their path setting is truthy.  The top-level status is
            ``healthy`` or ``not_configured`` according to
            ``settings.has_ai_config``, or ``error`` if inspection raises.

        Raises:
            ImportError: if ``config`` cannot be imported.
        """
        from config import settings

        result = self._new_result("ai_providers")
        result["providers"] = {}

        try:
            providers = result["providers"]

            # Key/token based providers
            providers["zai"] = self._api_provider_entry(settings.ZAI_AUTH_TOKEN)
            providers["gemini"] = self._api_provider_entry(settings.GEMINI_API_KEY)

            # CLI based providers
            if settings.CLAUDE_CLI_PATH:
                providers["claude_cli"] = self._cli_provider_entry(
                    settings.CLAUDE_CLI_PATH
                )

            if settings.GEMINI_CLI_PATH:
                providers["gemini_cli"] = self._cli_provider_entry(
                    settings.GEMINI_CLI_PATH
                )

            # Overall status
            if settings.has_ai_config:
                result["status"] = "healthy"
                result["message"] = "At least one AI provider configured"
            else:
                result["status"] = "not_configured"
                result["message"] = "No AI providers configured"

        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)
            self.logger.error(f"AI providers health check failed: {e}")

        return result

    def check_vram_manager(self) -> Dict[str, Any]:
        """Check VRAM manager status.

        Returns:
            Result dict with status ``healthy`` plus ``vram_info``
            (``total_gb``, ``free_gb``, ``allocated_gb``, each the reported
            value divided by 1024**3 and rounded to 2 decimals; missing values
            count as 0) and ``cuda_available``; or status ``unavailable`` plus
            ``error`` when the manager cannot be imported or queried.
        """
        result = self._new_result("vram_manager")

        try:
            from services.vram_manager import vram_manager

            vram_info = vram_manager.get_vram_info()
            gib = 1024**3

            result["status"] = "healthy"
            result["vram_info"] = {
                "total_gb": round(vram_info.get("total", 0) / gib, 2),
                "free_gb": round(vram_info.get("free", 0) / gib, 2),
                "allocated_gb": round(vram_info.get("allocated", 0) / gib, 2),
            }
            result["cuda_available"] = vram_info.get("cuda_available", False)

        except Exception as e:
            result["status"] = "unavailable"
            result["error"] = str(e)
            self.logger.error(f"VRAM manager health check failed: {e}")

        return result

    def check_telegram_service(self) -> Dict[str, Any]:
        """Check Telegram service status.

        Returns:
            Result dict with status ``healthy`` or ``not_configured`` according
            to ``telegram_service.is_configured``, or ``error`` plus ``error``
            text when the service cannot be imported or inspected.

        Raises:
            ImportError: if ``config`` cannot be imported.
        """
        from config import settings  # noqa: F401 - import kept from the original

        result = self._new_result("telegram")

        try:
            from services.telegram_service import telegram_service

            if telegram_service.is_configured:
                result["status"] = "healthy"
                result["message"] = "Telegram service configured"
            else:
                result["status"] = "not_configured"
                result["message"] = "Telegram credentials not configured"

        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)
            self.logger.error(f"Telegram service health check failed: {e}")

        return result

    def check_processed_registry(self) -> Dict[str, Any]:
        """Check processed files registry.

        Returns:
            Result dict with status ``healthy`` when the registry loads, plus
            ``registry_path``, ``registry_exists`` and, when the file exists,
            ``registry_writable``; or status ``unhealthy`` plus ``error`` when
            importing, loading or inspecting the registry raises.
        """
        # Imported locally: the module only imports ``os`` inside its
        # ``__main__`` section, so without this the writability probe raised
        # NameError (reported as "unhealthy") whenever this module was imported
        # instead of run as a script.
        import os

        result = self._new_result("processed_registry")

        try:
            from storage.processed_registry import processed_registry

            # Try to load registry
            processed_registry.load()

            result["status"] = "healthy"
            result["registry_path"] = str(processed_registry.registry_path)

            # Check if registry file is writable
            registry_file = Path(processed_registry.registry_path)
            if registry_file.exists():
                result["registry_exists"] = True
                result["registry_writable"] = registry_file.is_file() and os.access(
                    registry_file, os.W_OK
                )
            else:
                result["registry_exists"] = False

        except Exception as e:
            result["status"] = "unhealthy"
            result["error"] = str(e)
            self.logger.error(f"Processed registry health check failed: {e}")

        return result

    def check_disk_space(self) -> Dict[str, Any]:
        """Check available disk space.

        Measures the filesystem containing the grandparent directory of this
        source file.

        Returns:
            Result dict with ``total_gb``, ``free_gb`` and ``used_percent``
            (rounded to 2 decimals).  Status is ``warning`` below 1 GiB free,
            ``degraded`` below 5 GiB free, otherwise ``healthy``; ``error``
            plus ``error`` text when the measurement raises.
        """
        result = self._new_result("disk_space")

        try:
            import shutil

            # Check main directory
            usage = shutil.disk_usage(Path(__file__).parent.parent)

            gib = 1024**3
            total_gb = usage.total / gib
            free_gb = usage.free / gib
            used_percent = (usage.used / usage.total) * 100

            result["status"] = "healthy"
            result["total_gb"] = round(total_gb, 2)
            result["free_gb"] = round(free_gb, 2)
            result["used_percent"] = round(used_percent, 2)

            # Warning if low disk space
            if free_gb < 1:  # Less than 1GB
                result["status"] = "warning"
                result["message"] = "Low disk space"
            elif free_gb < 5:  # Less than 5GB
                result["status"] = "degraded"
                result["message"] = "Disk space running low"

        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)
            self.logger.error(f"Disk space health check failed: {e}")

        return result

    def check_configuration(self) -> Dict[str, Any]:
        """Check configuration validity.

        Returns:
            Result dict with a ``warnings`` list and ``environment``.  Status
            is ``healthy`` when no warning was collected, ``warning``
            otherwise, or ``error`` plus ``error`` text when reading the
            settings raises.

        Raises:
            ImportError: if ``config`` cannot be imported.
        """
        from config import settings

        result = self._new_result("configuration")

        try:
            warnings = []

            # Check for warnings
            if not settings.has_webdav_config:
                warnings.append("WebDAV not configured")

            if not settings.has_ai_config:
                warnings.append("AI providers not configured")

            if not settings.telegram_configured:
                warnings.append("Telegram not configured")

            if settings.DASHBOARD_SECRET_KEY == "":
                warnings.append("Dashboard secret key not set")

            if settings.DASHBOARD_SECRET_KEY == "dashboard-secret-key-change-in-production":
                warnings.append("Using default dashboard secret")

            result["status"] = "healthy" if not warnings else "warning"
            result["warnings"] = warnings
            result["environment"] = settings.environment_type

        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)
            self.logger.error(f"Configuration health check failed: {e}")

        return result

    def run_full_health_check(self) -> Dict[str, Any]:
        """Run all health checks and return comprehensive status.

        A check that raises is recorded as an ``error`` result instead of
        aborting the run.

        Returns:
            Dict with ``overall_status`` (``unhealthy`` if any check is
            ``unhealthy``/``error``, else ``warning`` if any is
            ``warning``/``degraded``, else ``healthy``), ``timestamp``,
            ``checks`` (results keyed by check name) and ``summary`` counters.
            The ``warning`` and ``unhealthy`` counters use the same status
            grouping as ``overall_status``; other statuses (for example
            ``not_configured``) are only reflected in ``total_checks``.
        """
        checks = [
            ("configuration", self.check_configuration),
            ("webdav", self.check_webdav_connection),
            ("ai_providers", self.check_ai_providers),
            ("vram_manager", self.check_vram_manager),
            ("telegram", self.check_telegram_service),
            ("processed_registry", self.check_processed_registry),
            ("disk_space", self.check_disk_space),
        ]

        results = {}
        overall_status = "healthy"

        for check_name, check_func in checks:
            try:
                result = check_func()
                results[check_name] = result

                # Track overall status; "unhealthy" is sticky.
                status = result["status"]
                if status in self._UNHEALTHY_STATUSES:
                    overall_status = "unhealthy"
                elif status in self._WARNING_STATUSES and overall_status == "healthy":
                    overall_status = "warning"

            except Exception as e:
                results[check_name] = {
                    "service": check_name,
                    "status": "error",
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat(),
                }
                overall_status = "unhealthy"
                self.logger.error(f"Health check {check_name} failed: {e}")

        statuses = [entry["status"] for entry in results.values()]

        return {
            "overall_status": overall_status,
            "timestamp": datetime.utcnow().isoformat(),
            "checks": results,
            "summary": {
                "total_checks": len(checks),
                "healthy": statuses.count("healthy"),
                "warning": sum(1 for s in statuses if s in self._WARNING_STATUSES),
                "unhealthy": sum(1 for s in statuses if s in self._UNHEALTHY_STATUSES),
            },
        }
|
|
|
|
|
|
# Convenience function for CLI usage
|
|
def get_health_status() -> Dict[str, Any]:
    """Create a ``HealthChecker`` and return the report of its full health check."""
    return HealthChecker().run_full_health_check()
|
|
|
|
|
|
if __name__ == "__main__":
    # CLI usage: python core/health_check.py
    import sys
    import os  # not used in this block; kept because it binds the module-level name when run as a script

    report = get_health_status()
    print(json.dumps(report, indent=2))

    # Exit code mirrors the overall status: 0 healthy, 1 warning, 2 anything else.
    exit_codes = {"healthy": 0, "warning": 1}
    sys.exit(exit_codes.get(report["overall_status"], 2))
|