Files
motia-iii/services/advoware_watcher_service.py
bsiggel 1ffc37b0b7 feat: Add Advoware History and Watcher services for document synchronization
- Implement AdvowareHistoryService for fetching and creating history entries.
- Implement AdvowareWatcherService for file operations including listing, downloading, and uploading with Blake3 hash verification.
- Introduce Blake3 utility functions for hash computation and verification.
- Create document sync cron step to poll Redis for pending Aktennummern and emit sync events.
- Develop document sync event handler to manage 3-way merge synchronization for Akten, including metadata updates and error handling.
2026-03-25 21:24:31 +00:00

276 lines
11 KiB
Python

"""
Advoware Filesystem Watcher API Client
API client for Windows Watcher service that provides:
- File list retrieval with USN tracking
- File download from Windows
- File upload to Windows with Blake3 hash verification
"""
from typing import Dict, Any, List, Optional
import aiohttp
import asyncio
import os
from services.logging_utils import get_service_logger
from services.exceptions import ExternalAPIError
class AdvowareWatcherService:
    """
    API client for the Advoware Filesystem Watcher (Windows service).

    Provides methods to:
    - Get the file list of an Akte, including USNs
    - Download files
    - Upload files with Blake3 verification

    Every request is attempted up to ``MAX_ATTEMPTS`` times with exponential
    backoff; a final failure is reported as ``ExternalAPIError``.
    """

    # Total number of tries per request (first try + retries).
    MAX_ATTEMPTS = 3
    # Minimum total timeouts for the transfer endpoints; the configured
    # ADVOWARE_WATCHER_TIMEOUT_SECONDS is used instead when it is larger.
    DOWNLOAD_TIMEOUT_SECONDS = 60
    UPLOAD_TIMEOUT_SECONDS = 120

    def __init__(self, ctx):
        """
        Initialize service with context.

        Reads ADVOWARE_WATCHER_BASE_URL, ADVOWARE_WATCHER_AUTH_TOKEN and
        ADVOWARE_WATCHER_TIMEOUT_SECONDS from the environment.

        Args:
            ctx: Motia context for logging and config
        """
        self.ctx = ctx
        self.logger = get_service_logger(__name__, ctx)
        self.base_url = os.getenv('ADVOWARE_WATCHER_BASE_URL', 'http://192.168.1.12:8765')
        self.auth_token = os.getenv('ADVOWARE_WATCHER_AUTH_TOKEN', '')
        self.timeout = int(os.getenv('ADVOWARE_WATCHER_TIMEOUT_SECONDS', '30'))
        if not self.auth_token:
            self.logger.warning("⚠️ ADVOWARE_WATCHER_AUTH_TOKEN not configured")
        # Created lazily by _get_session() so construction does no I/O.
        self._session: Optional[aiohttp.ClientSession] = None
        self.logger.info(f"AdvowareWatcherService initialized: {self.base_url}")

    async def _get_session(self) -> "aiohttp.ClientSession":
        """Return the shared HTTP session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            headers = {}
            if self.auth_token:
                headers['Authorization'] = f'Bearer {self.auth_token}'
            self._session = aiohttp.ClientSession(headers=headers)
        return self._session

    async def close(self) -> None:
        """Close the HTTP session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()
        # Drop the reference; _get_session() creates a fresh session on demand.
        self._session = None

    def _log(self, message: str, level: str = 'info') -> None:
        """Log ``message`` with the service prefix at the given logger level."""
        getattr(self.logger, level)(f"[AdvowareWatcherService] {message}")

    def _url(self, path: str) -> str:
        """Join ``path`` (starting with '/') onto the configured base URL."""
        return f"{self.base_url.rstrip('/')}{path}"

    @staticmethod
    def _filename_from_path(rel_path: str) -> str:
        """Return the last component of a path using '/' or '\\' separators."""
        # The watcher runs on Windows, so accept backslashes as well as '/'.
        return rel_path.replace('\\', '/').rsplit('/', 1)[-1]

    @staticmethod
    def _upload_path(aktennummer: str, filename: str) -> str:
        """Build the percent-encoded URL path of the upload endpoint."""
        from urllib.parse import quote

        # '/' stays unescaped (quote's default) so nested paths keep working;
        # characters such as '#', '?' and '%' must be escaped or they would be
        # parsed as fragment/query/escape markers instead of file name text.
        return f"/files/{quote(str(aktennummer))}/{quote(filename)}"

    async def _request_with_retry(
        self,
        method: str,
        url: str,
        *,
        timeout_seconds: float,
        handle_response,
        timeout_label: str,
        error_label: str,
        **request_kwargs: Any,
    ) -> Any:
        """
        Send one HTTP request, retrying on timeouts and aiohttp client errors.

        Args:
            method: HTTP method name (e.g. 'GET', 'PUT')
            url: Absolute request URL
            timeout_seconds: Total timeout applied to each attempt
            handle_response: Async callable that receives the open response
                and returns the final result; it runs inside the retry scope,
                so aiohttp errors it raises are retried as well
            timeout_label: Log prefix used when an attempt times out
            error_label: Log prefix used when an attempt fails with a
                client error
            **request_kwargs: Passed through to ``session.request``

        Returns:
            Whatever ``handle_response`` returns.

        Raises:
            asyncio.TimeoutError / aiohttp.ClientError: After the last attempt.
            Any other exception from ``handle_response`` immediately.
        """
        session = await self._get_session()
        request_timeout = aiohttp.ClientTimeout(total=timeout_seconds)
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                async with session.request(
                    method, url, timeout=request_timeout, **request_kwargs
                ) as response:
                    return await handle_response(response)
            except asyncio.TimeoutError:
                if attempt >= self.MAX_ATTEMPTS:
                    raise
                delay = 2 ** attempt  # 2, 4 seconds
                self._log(
                    f"{timeout_label} on attempt {attempt}, retrying in {delay}s...",
                    level='warning',
                )
            except aiohttp.ClientError as exc:
                if attempt >= self.MAX_ATTEMPTS:
                    raise
                delay = 2 ** attempt
                self._log(
                    f"{error_label} on attempt {attempt}: {exc}, retrying in {delay}s...",
                    level='warning',
                )
            # Only reached after a handled failure: back off before retrying.
            await asyncio.sleep(delay)

    async def get_akte_files(self, aktennummer: str) -> List[Dict[str, Any]]:
        """
        Get file list for Akte with USNs.

        Args:
            aktennummer: Akte number (e.g., "12345")

        Returns:
            List of file info dicts with:
            - filename: str
            - path: str (relative to the V: drive root)
            - usn: int (Windows USN)
            - size: int (bytes)
            - modified: str (ISO timestamp)
            - blake3Hash: str (hex)
            An empty list is returned when the watcher answers 404.

        Raises:
            ExternalAPIError: If API call fails
        """
        self._log(f"Fetching file list for Akte {aktennummer}")

        async def parse_listing(response) -> List[Dict[str, Any]]:
            if response.status == 404:
                self._log(f"Akte {aktennummer} not found on Windows", level='warning')
                return []
            response.raise_for_status()
            data = await response.json()
            files = data.get('files', [])
            # Add a 'filename' field derived from 'relative_path' when missing.
            for entry in files:
                rel_path = entry.get('relative_path', '')
                if rel_path and 'filename' not in entry:
                    entry['filename'] = self._filename_from_path(rel_path)
            self._log(f"Successfully fetched {len(files)} files for Akte {aktennummer}")
            return files

        try:
            return await self._request_with_retry(
                'GET',
                self._url('/akte-details'),
                params={'akte': aktennummer},
                timeout_seconds=self.timeout,
                handle_response=parse_listing,
                timeout_label="Timeout",
                error_label="Network error",
            )
        except Exception as e:
            self._log(f"Failed to fetch file list for Akte {aktennummer}: {e}", level='error')
            raise ExternalAPIError(f"Watcher API error: {e}") from e

    async def download_file(self, aktennummer: str, filename: str) -> bytes:
        """
        Download file from Windows.

        Args:
            aktennummer: Akte number
            filename: Filename (e.g., "document.pdf")

        Returns:
            File content as bytes

        Raises:
            ExternalAPIError: If download fails or the file does not exist
        """
        self._log(f"Downloading file: {aktennummer}/{filename}")

        async def read_body(response) -> bytes:
            if response.status == 404:
                raise ExternalAPIError(f"File not found: {aktennummer}/{filename}")
            response.raise_for_status()
            content = await response.read()
            self._log(f"Successfully downloaded {len(content)} bytes from {aktennummer}/{filename}")
            return content

        try:
            return await self._request_with_retry(
                'GET',
                self._url('/file'),
                params={'akte': aktennummer, 'path': filename},
                # Longer timeout for downloads, never below the configured one.
                timeout_seconds=max(self.timeout, self.DOWNLOAD_TIMEOUT_SECONDS),
                handle_response=read_body,
                timeout_label="Download timeout",
                error_label="Download error",
            )
        except ExternalAPIError as e:
            # Already a domain error (e.g. 404): log it but do not re-wrap.
            self._log(f"Failed to download file {aktennummer}/{filename}: {e}", level='error')
            raise
        except Exception as e:
            self._log(f"Failed to download file {aktennummer}/{filename}: {e}", level='error')
            raise ExternalAPIError(f"File download failed: {e}") from e

    async def upload_file(
        self,
        aktennummer: str,
        filename: str,
        content: bytes,
        blake3_hash: str
    ) -> Dict[str, Any]:
        """
        Upload file to Windows with Blake3 verification.

        Args:
            aktennummer: Akte number
            filename: Filename
            content: File content
            blake3_hash: Blake3 hash (hex) for verification, sent in the
                X-Blake3-Hash header

        Returns:
            Upload result dict with:
            - success: bool
            - message: str
            - usn: int (new USN)
            - blake3Hash: str (computed hash)

        Raises:
            ExternalAPIError: If upload fails or the watcher reports
                ``success`` as falsy
        """
        self._log(f"Uploading file: {aktennummer}/{filename} ({len(content)} bytes)")

        async def parse_result(response) -> Dict[str, Any]:
            response.raise_for_status()
            result = await response.json()
            if not result.get('success'):
                error_msg = result.get('message', 'Unknown error')
                raise ExternalAPIError(f"Upload failed: {error_msg}")
            self._log(f"Successfully uploaded {aktennummer}/{filename}, new USN: {result.get('usn')}")
            return result

        try:
            return await self._request_with_retry(
                'PUT',
                self._url(self._upload_path(aktennummer, filename)),
                data=content,
                headers={
                    'X-Blake3-Hash': blake3_hash,
                    'Content-Type': 'application/octet-stream',
                },
                # Long timeout for uploads, never below the configured one.
                timeout_seconds=max(self.timeout, self.UPLOAD_TIMEOUT_SECONDS),
                handle_response=parse_result,
                timeout_label="Upload timeout",
                error_label="Upload error",
            )
        except ExternalAPIError as e:
            # Already a domain error (watcher rejected the upload): log it
            # but do not wrap it in a second ExternalAPIError.
            self._log(f"Failed to upload file {aktennummer}/{filename}: {e}", level='error')
            raise
        except Exception as e:
            self._log(f"Failed to upload file {aktennummer}/{filename}: {e}", level='error')
            raise ExternalAPIError(f"File upload failed: {e}") from e