""" WebDAV service for Nextcloud integration """ import logging import os import time import unicodedata import re from pathlib import Path from typing import Optional, List, Dict, Tuple from contextlib import contextmanager from concurrent.futures import ThreadPoolExecutor, as_completed import requests from requests.auth import HTTPBasicAuth from requests.adapters import HTTPAdapter from config import settings from core import WebDAVError class WebDAVService: """Service for WebDAV operations with Nextcloud""" def __init__(self): self.session: Optional[requests.Session] = None self.logger = logging.getLogger(__name__) self._retry_delay = 1 self._max_retries = settings.WEBDAV_MAX_RETRIES def initialize(self) -> None: """Initialize WebDAV session""" if not settings.has_webdav_config: raise WebDAVError("WebDAV credentials not configured") self.session = requests.Session() self.session.auth = HTTPBasicAuth(settings.NEXTCLOUD_USER, settings.NEXTCLOUD_PASSWORD) # Configure HTTP adapter with retry strategy adapter = HTTPAdapter( max_retries=0, # We'll handle retries manually pool_connections=10, pool_maxsize=20 ) self.session.mount('https://', adapter) self.session.mount('http://', adapter) # Test connection try: self._request('GET', '', timeout=5) self.logger.info("WebDAV connection established") except Exception as e: raise WebDAVError(f"Failed to connect to WebDAV: {e}") def cleanup(self) -> None: """Cleanup WebDAV session""" if self.session: self.session.close() self.session = None @staticmethod def normalize_path(path: str) -> str: """Normalize remote paths to a consistent representation""" if not path: return "" normalized = unicodedata.normalize("NFC", str(path)).strip() if not normalized: return "" normalized = normalized.replace("\\", "/") normalized = re.sub(r"/+", "/", normalized) return normalized.lstrip("/") def _build_url(self, remote_path: str) -> str: """Build WebDAV URL""" path = self.normalize_path(remote_path) base_url = settings.WEBDAV_ENDPOINT.rstrip('/') return f"{base_url}/{path}" def _request(self, method: str, remote_path: str, **kwargs) -> requests.Response: """Make HTTP request to WebDAV with retries""" if not self.session: raise WebDAVError("WebDAV session not initialized") url = self._build_url(remote_path) timeout = kwargs.pop('timeout', settings.HTTP_TIMEOUT) for attempt in range(self._max_retries): try: response = self.session.request(method, url, timeout=timeout, **kwargs) if response.status_code < 400: return response elif response.status_code == 404: raise WebDAVError(f"Resource not found: {remote_path}") else: raise WebDAVError(f"HTTP {response.status_code}: {response.text}") except (requests.RequestException, requests.Timeout) as e: if attempt == self._max_retries - 1: raise WebDAVError(f"Request failed after {self._max_retries} retries: {e}") delay = self._retry_delay * (2 ** attempt) self.logger.warning(f"Request failed (attempt {attempt + 1}/{self._max_retries}), retrying in {delay}s...") time.sleep(delay) raise WebDAVError("Max retries exceeded") def list(self, remote_path: str = "") -> List[str]: """List files in remote directory""" self.logger.debug(f"Listing remote directory: {remote_path}") response = self._request('PROPFIND', remote_path, headers={'Depth': '1'}) return self._parse_propfind_response(response.text) def _parse_propfind_response(self, xml_response: str) -> List[str]: """Parse PROPFIND XML response and return only files (not directories)""" # Simple parser for PROPFIND response files = [] try: import xml.etree.ElementTree as ET from urllib.parse import urlparse, unquote root = ET.fromstring(xml_response) # Get the WebDAV path from settings parsed_url = urlparse(settings.NEXTCLOUD_URL) webdav_path = parsed_url.path.rstrip('/') # e.g. /remote.php/webdav # Find all response elements for response in root.findall('.//{DAV:}response'): href = response.find('.//{DAV:}href') if href is None or href.text is None: continue href_text = unquote(href.text) # Decode URL encoding # Check if this is a directory (has collection resourcetype) propstat = response.find('.//{DAV:}propstat') is_directory = False if propstat is not None: prop = propstat.find('.//{DAV:}prop') if prop is not None: resourcetype = prop.find('.//{DAV:}resourcetype') if resourcetype is not None and resourcetype.find('.//{DAV:}collection') is not None: is_directory = True # Skip directories if is_directory: continue # Also skip paths ending with / (another way to detect directories) if href_text.endswith('/'): continue # Remove base URL from href base_url = settings.NEXTCLOUD_URL.rstrip('/') if href_text.startswith(base_url): href_text = href_text[len(base_url):] # Also strip the webdav path if it's there if href_text.startswith(webdav_path): href_text = href_text[len(webdav_path):] # Clean up the path href_text = href_text.lstrip('/') if href_text: # Skip empty paths (root directory) files.append(href_text) except Exception as e: self.logger.error(f"Error parsing PROPFIND response: {e}") return files def download(self, remote_path: str, local_path: Path) -> None: """Download file from WebDAV""" self.logger.info(f"Downloading {remote_path} to {local_path}") # Ensure local directory exists local_path.parent.mkdir(parents=True, exist_ok=True) response = self._request('GET', remote_path, stream=True) # Use larger buffer size for better performance with open(local_path, 'wb', buffering=65536) as f: for chunk in response.iter_content(chunk_size=settings.DOWNLOAD_CHUNK_SIZE): if chunk: f.write(chunk) self.logger.debug(f"Download completed: {local_path}") def upload(self, local_path: Path, remote_path: str) -> None: """Upload file to WebDAV""" self.logger.info(f"Uploading {local_path} to {remote_path}") # Ensure remote directory exists remote_dir = self.normalize_path(remote_path) if '/' in remote_dir: dir_path = '/'.join(remote_dir.split('/')[:-1]) self.makedirs(dir_path) with open(local_path, 'rb') as f: self._request('PUT', remote_path, data=f) self.logger.debug(f"Upload completed: {remote_path}") def mkdir(self, remote_path: str) -> None: """Create directory on WebDAV""" self.makedirs(remote_path) def makedirs(self, remote_path: str) -> None: """Create directory and parent directories on WebDAV""" path = self.normalize_path(remote_path) if not path: return parts = path.split('/') current = "" for part in parts: current = f"{current}/{part}" if current else part try: self._request('MKCOL', current) self.logger.debug(f"Created directory: {current}") except WebDAVError as e: # Directory might already exist (409 Conflict or 405 MethodNotAllowed is OK) if '409' not in str(e) and '405' not in str(e): raise def delete(self, remote_path: str) -> None: """Delete file or directory from WebDAV""" self.logger.info(f"Deleting remote path: {remote_path}") self._request('DELETE', remote_path) def exists(self, remote_path: str) -> bool: """Check if remote path exists""" try: self._request('HEAD', remote_path) return True except WebDAVError: return False def upload_batch( self, files: List[Tuple[Path, str]], max_workers: int = 4, timeout: int = 120 ) -> Dict[str, bool]: """ Upload multiple files concurrently. Args: files: List of (local_path, remote_path) tuples max_workers: Maximum concurrent uploads timeout: Timeout per upload in seconds Returns: Dict mapping remote_path to success status """ if not files: return {} results: Dict[str, bool] = {} with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all upload tasks future_to_path = { executor.submit(self.upload, local, remote): remote for local, remote in files } # Collect results as they complete for future in as_completed(future_to_path, timeout=timeout): remote_path = future_to_path[future] try: future.result() results[remote_path] = True self.logger.info(f"Successfully uploaded: {remote_path}") except Exception as e: results[remote_path] = False self.logger.error(f"Failed to upload {remote_path}: {e}") failed_count = sum(1 for success in results.values() if not success) if failed_count > 0: self.logger.warning( f"Batch upload completed with {failed_count} failures " f"({len(results) - failed_count}/{len(results)} successful)" ) else: self.logger.info( f"Batch upload completed: {len(results)} files uploaded successfully" ) return results # Global instance webdav_service = WebDAVService()