Files
cbcren2026/services/notion_service_old.py
2026-03-31 01:28:25 -03:00

204 lines
6.6 KiB
Python

"""
Notion integration service
"""
import logging
import base64
from typing import Optional
from pathlib import Path
try:
import requests
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
requests = None
from config import settings
class NotionService:
    """Service for Notion API integration.

    Creates pages in a Notion database for generated PDF documents.
    Credentials are supplied after construction via :meth:`configure`; until
    then every upload method logs a warning and returns ``False``.

    The upload methods are best-effort: failures are logged and reported
    through a ``False`` return value instead of an exception.
    """

    # Single-part uploads through Notion's File Upload API are limited to
    # 20 MB; larger files need the multi-part mode, which is not implemented.
    _MAX_SINGLE_PART_BYTES = 20 * 1024 * 1024

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._token: Optional[str] = None
        self._database_id: Optional[str] = None
        self._base_url = "https://api.notion.com/v1"

    def configure(self, token: str, database_id: str) -> None:
        """Store the integration token and the target database id."""
        self._token = token
        self._database_id = database_id
        self.logger.info("Notion service configured")

    @property
    def is_configured(self) -> bool:
        """Return True once both a token and a database id are set."""
        return bool(self._token and self._database_id)

    def _get_headers(self) -> dict:
        """Return the headers for JSON requests to the Notion API."""
        return {
            "Authorization": f"Bearer {self._token}",
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",
        }

    def _ready_for_upload(self, pdf_path) -> bool:
        """Check the preconditions shared by both upload methods.

        Logs the reason and returns False when the service is unconfigured,
        ``requests`` is unavailable, or ``pdf_path`` is not an existing file.
        """
        if not self.is_configured:
            self.logger.warning("Notion not configured, skipping upload")
            return False
        if not REQUESTS_AVAILABLE:
            self.logger.error("requests library not available for Notion upload")
            return False
        if not Path(pdf_path).is_file():
            self.logger.error("PDF file not found: %s", pdf_path)
            return False
        return True

    def _build_page_payload(self, title: str, file_upload_id: Optional[str] = None,
                            file_path=None) -> dict:
        """Build the JSON body for creating a page in the configured database.

        Args:
            title: Value for the ``Name`` title property.
            file_upload_id: Id of a completed Notion file upload. When given,
                the page body gets a description paragraph and a file block
                referencing that upload.
            file_path: When given, recorded as text in the ``File Path``
                property.

        Returns:
            The request body as a dict.
        """
        properties = {
            "Name": {"title": [{"text": {"content": title}}]},
            "Status": {"select": {"name": "Procesado"}},
        }
        if file_path is not None:
            properties["File Path"] = {
                "rich_text": [{"text": {"content": str(file_path)}}]
            }

        payload = {
            "parent": {"database_id": self._database_id},
            "properties": properties,
        }
        if file_upload_id is not None:
            payload["children"] = [
                {
                    "object": "block",
                    "type": "paragraph",
                    "paragraph": {
                        "rich_text": [
                            {
                                "type": "text",
                                "text": {
                                    "content": f"Documento generado automáticamente: {title}"
                                },
                            }
                        ]
                    },
                },
                {
                    "object": "block",
                    "type": "file",
                    "file": {
                        "type": "file_upload",
                        "file_upload": {"id": file_upload_id},
                    },
                },
            ]
        return payload

    def _upload_file(self, pdf_path: Path) -> Optional[str]:
        """Send the PDF bytes to Notion and return the file upload id.

        Uses the two-step File Upload API: create an upload object, then POST
        the file contents to its ``send`` endpoint as multipart form data.

        Returns:
            The upload id on success, or None after logging the failure.
        """
        size = pdf_path.stat().st_size
        if size > self._MAX_SINGLE_PART_BYTES:
            self.logger.error(
                "PDF too large for single-part Notion upload (%d bytes): %s",
                size, pdf_path,
            )
            return None

        created = requests.post(
            f"{self._base_url}/file_uploads",
            headers=self._get_headers(),
            json={},
            timeout=30,
        )
        if created.status_code != 200:
            self.logger.error("Notion API error: %s - %s", created.status_code, created.text)
            return None
        upload_id = created.json()["id"]

        # The multipart request must not carry the JSON Content-Type: requests
        # has to generate its own header including the form boundary.
        multipart_headers = {
            key: value
            for key, value in self._get_headers().items()
            if key != "Content-Type"
        }
        with open(pdf_path, "rb") as pdf_file:
            sent = requests.post(
                f"{self._base_url}/file_uploads/{upload_id}/send",
                headers=multipart_headers,
                files={"file": (pdf_path.name, pdf_file, "application/pdf")},
                timeout=30,
            )
        if sent.status_code != 200:
            self.logger.error("Notion API error: %s - %s", sent.status_code, sent.text)
            return None
        return upload_id

    def _create_page(self, page_data: dict, title: str) -> bool:
        """POST ``page_data`` to the pages endpoint and log the outcome."""
        response = requests.post(
            f"{self._base_url}/pages",
            headers=self._get_headers(),
            json=page_data,
            timeout=30,
        )
        if response.status_code == 200:
            self.logger.info("PDF uploaded to Notion successfully: %s", title)
            return True
        self.logger.error("Notion API error: %s - %s", response.status_code, response.text)
        return False

    def upload_pdf(self, pdf_path: Path, title: str) -> bool:
        """Upload a PDF and attach it to a new page in the Notion database.

        The file bytes go through Notion's File Upload API and the new page
        references the resulting upload. Inlining the PDF as a ``data:`` URI
        in an ``external`` file block does not work: that block type expects
        a hosted URL.

        Args:
            pdf_path: Path to the PDF on disk (``Path`` or ``str``).
            title: Page title; also used in the description paragraph.

        Returns:
            True if the page was created, False on any failure (logged).
        """
        if not self._ready_for_upload(pdf_path):
            return False
        pdf_path = Path(pdf_path)
        try:
            file_upload_id = self._upload_file(pdf_path)
            if file_upload_id is None:
                return False
            page_data = self._build_page_payload(title, file_upload_id=file_upload_id)
            return self._create_page(page_data, title)
        except Exception as e:
            # Deliberate best-effort boundary: never propagate to the caller,
            # but keep the traceback in the log.
            self.logger.exception("Error uploading PDF to Notion: %s", e)
            return False

    def upload_pdf_as_file(self, pdf_path: Path, title: str) -> bool:
        """Create a page that records the PDF's local path (alternative method).

        No file content is sent to Notion; the page only stores the title,
        the status and the path in its ``File Path`` property.

        Args:
            pdf_path: Path to the PDF on disk (``Path`` or ``str``).
            title: Page title.

        Returns:
            True if the page was created, False on any failure (logged).
        """
        if not self._ready_for_upload(pdf_path):
            return False
        try:
            page_data = self._build_page_payload(title, file_path=pdf_path)
            return self._create_page(page_data, title)
        except Exception as e:
            # Deliberate best-effort boundary, same as upload_pdf.
            self.logger.exception("Error uploading PDF to Notion: %s", e)
            return False
# Module-level shared instance, created unconfigured at import time.
# Its configure() must be called with a token and database id before any
# upload through it can succeed.
notion_service = NotionService()
def upload_to_notion(pdf_path: Path, title: str) -> bool:
    """Upload a PDF via the shared module-level ``notion_service``.

    Legacy entry point kept for backward compatibility; it forwards its
    arguments unchanged to ``notion_service.upload_pdf``.

    Args:
        pdf_path: Path to the PDF file to upload.
        title: Title for the created Notion page.

    Returns:
        The boolean result reported by ``notion_service.upload_pdf``.
    """
    uploaded = notion_service.upload_pdf(pdf_path, title)
    return uploaded