"""Advoware API client for Motia III"""

import aiohttp
import asyncio
import time
import uuid
import hmac
import hashlib
import base64
import os
import datetime
from typing import Optional, Dict, Any

from services.exceptions import (
    AdvowareAPIError,
    AdvowareAuthError,
    AdvowareTimeoutError,
    RetryableError
)
from services.redis_client import get_redis_client
from services.config import ADVOWARE_CONFIG, API_CONFIG
from services.logging_utils import get_service_logger


class AdvowareAPI:
    """
    Advoware API client with token caching via Redis.

    Environment variables read (each has a default, see ``__init__``):
    - ADVOWARE_API_BASE_URL
    - ADVOWARE_PRODUCT_ID
    - ADVOWARE_APP_ID
    - ADVOWARE_API_KEY (base64 encoded; a non-base64 value is used as-is)
    - ADVOWARE_KANZLEI
    - ADVOWARE_DATABASE
    - ADVOWARE_USER
    - ADVOWARE_ROLE
    - ADVOWARE_PASSWORD
    """

    def __init__(self, context=None):
        """
        Initialize Advoware API client.

        Args:
            context: Motia FlowContext for logging (optional)
        """
        self.context = context
        self.logger = get_service_logger('advoware', context)
        self.logger.debug("AdvowareAPI initializing")

        # Load configuration from environment
        self.API_BASE_URL = os.getenv('ADVOWARE_API_BASE_URL', 'https://www2.advo-net.net:90/')
        self.product_id = int(os.getenv('ADVOWARE_PRODUCT_ID', '64'))
        self.app_id = os.getenv('ADVOWARE_APP_ID', '')
        self.api_key = os.getenv('ADVOWARE_API_KEY', '')
        self.kanzlei = os.getenv('ADVOWARE_KANZLEI', '')
        self.database = os.getenv('ADVOWARE_DATABASE', '')
        self.user = os.getenv('ADVOWARE_USER', '')
        self.role = int(os.getenv('ADVOWARE_ROLE', '2'))
        self.password = os.getenv('ADVOWARE_PASSWORD', '')
        self.token_lifetime_minutes = ADVOWARE_CONFIG.token_lifetime_minutes
        self.api_timeout_seconds = API_CONFIG.default_timeout_seconds

        # Initialize Redis for token caching (centralized)
        self.redis_client = get_redis_client(strict=False)
        if self.redis_client:
            self.logger.info("Connected to Redis for token caching")
        else:
            self.logger.warning("⚠️ Redis unavailable - token caching disabled!")

        self.logger.info("AdvowareAPI initialized")

        # HTTP session is created lazily on first use (see _get_session)
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared aiohttp session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self) -> None:
        """Close the shared aiohttp session if it is open."""
        if self._session and not self._session.closed:
            await self._session.close()

    def _generate_hmac(self, request_time_stamp: str, nonce: Optional[str] = None) -> str:
        """
        Generate HMAC-SHA512 signature for authentication.

        The signed message is ``product_id:app_id:nonce:request_time_stamp``.

        Args:
            request_time_stamp: Timestamp string included in the signed message
            nonce: Nonce included in the signed message; a UUID4 is generated
                when omitted or empty

        Returns:
            Base64-encoded signature
        """
        if not nonce:
            nonce = str(uuid.uuid4())

        message = f"{self.product_id}:{self.app_id}:{nonce}:{request_time_stamp}".encode('utf-8')

        try:
            api_key_bytes = base64.b64decode(self.api_key)
            self.logger.debug("API Key decoded from base64")
        except (ValueError, TypeError) as e:
            # binascii.Error is a ValueError subclass. Fall back to the raw key.
            # (Previously this branch called an undefined self._log helper and
            # raised AttributeError instead of falling back.)
            self.logger.debug(f"API Key not base64-encoded, using as-is: {e}")
            api_key_bytes = self.api_key.encode('utf-8') if isinstance(self.api_key, str) else self.api_key

        signature = hmac.new(api_key_bytes, message, hashlib.sha512)
        return base64.b64encode(signature.digest()).decode('utf-8')

    def _fetch_new_access_token(self) -> str:
        """
        Fetch new access token from Advoware Auth API.

        On success the token (and the fetch time) is stored in Redis when a
        Redis client is available.

        Returns:
            The access token from the auth response

        Raises:
            AdvowareAuthError: Auth endpoint returned 401, or the response
                contained no access_token
            AdvowareTimeoutError: Token request timed out
            AdvowareAPIError: Any other request failure or a non-JSON response
        """
        self.logger.info("Fetching new access token from Advoware")

        nonce = str(uuid.uuid4())
        # Naive-UTC ISO timestamp with a trailing "Z" (second precision).
        # Built from an aware "now" because datetime.utcnow() is deprecated.
        request_time_stamp = (
            datetime.datetime.now(datetime.timezone.utc)
            .replace(microsecond=0, tzinfo=None)
            .isoformat() + "Z"
        )
        hmac_signature = self._generate_hmac(request_time_stamp, nonce)

        headers = {'Content-Type': 'application/json'}
        data = {
            "AppID": self.app_id,
            "Kanzlei": self.kanzlei,
            "Database": self.database,
            "User": self.user,
            "Role": self.role,
            "Product": self.product_id,
            "Password": self.password,
            "Nonce": nonce,
            "HMAC512Signature": hmac_signature,
            "RequestTimeStamp": request_time_stamp
        }

        self.logger.debug(f"Token request: AppID={self.app_id}, User={self.user}")

        # Using synchronous requests for token fetch (called from sync context)
        # TODO: Convert to async in future version
        import requests

        try:
            response = requests.post(
                ADVOWARE_CONFIG.auth_url,
                json=data,
                headers=headers,
                timeout=self.api_timeout_seconds
            )

            self.logger.debug(f"Token response status: {response.status_code}")

            if response.status_code == 401:
                raise AdvowareAuthError(
                    "Authentication failed - check credentials",
                    status_code=401
                )

            response.raise_for_status()

        except requests.Timeout as e:
            raise AdvowareTimeoutError(
                "Token request timed out",
                status_code=408
            ) from e
        except requests.RequestException as e:
            raise AdvowareAPIError(
                f"Token request failed: {str(e)}",
                status_code=getattr(e.response, 'status_code', None) if hasattr(e, 'response') else None
            ) from e

        try:
            result = response.json()
        except ValueError as e:
            # A 2xx response with a non-JSON body would otherwise surface as a
            # bare ValueError from the JSON decoder.
            raise AdvowareAPIError(f"Token response was not valid JSON: {e}") from e

        access_token = result.get("access_token") if isinstance(result, dict) else None

        if not access_token:
            self.logger.error("No access_token in response")
            raise AdvowareAuthError("No access_token received from Advoware")

        self.logger.info("Access token fetched successfully")

        # Cache token in Redis
        if self.redis_client:
            # Expire the cache entry two minutes before the configured lifetime
            effective_ttl = max(1, (self.token_lifetime_minutes - 2) * 60)
            self.redis_client.set(ADVOWARE_CONFIG.token_cache_key, access_token, ex=effective_ttl)
            self.redis_client.set(ADVOWARE_CONFIG.token_timestamp_key, str(time.time()), ex=effective_ttl)
            self.logger.debug(f"Token cached in Redis with TTL {effective_ttl}s")

        return access_token

    def get_access_token(self, force_refresh: bool = False) -> str:
        """
        Get valid access token (from cache or fetch new).

        Args:
            force_refresh: Force token refresh even if cached

        Returns:
            Valid access token
        """
        self.logger.debug("Getting access token")

        if not self.redis_client:
            self.logger.info("No Redis available, fetching new token")
            return self._fetch_new_access_token()

        if force_refresh:
            self.logger.info("Force refresh requested, fetching new token")
            return self._fetch_new_access_token()

        # Check cache
        cached_token = self.redis_client.get(ADVOWARE_CONFIG.token_cache_key)
        token_timestamp = self.redis_client.get(ADVOWARE_CONFIG.token_timestamp_key)

        if cached_token and token_timestamp:
            try:
                # Redis decode_responses=True returns strings
                timestamp = float(token_timestamp)
                age_seconds = time.time() - timestamp

                # Treat the token as stale one minute before its lifetime ends
                if age_seconds < (self.token_lifetime_minutes - 1) * 60:
                    self.logger.debug(f"Using cached token (age: {age_seconds:.0f}s)")
                    # Defensive: a client without decode_responses returns bytes,
                    # which would otherwise be formatted as "Bearer b'...'".
                    if isinstance(cached_token, bytes):
                        cached_token = cached_token.decode('utf-8')
                    return cached_token
            except (ValueError, AttributeError, TypeError) as e:
                self.logger.debug(f"Error reading cached token: {e}")

        self.logger.info("Cached token expired or invalid, fetching new")
        return self._fetch_new_access_token()

    async def _raise_for_error_status(self, response: aiohttp.ClientResponse, endpoint: str) -> None:
        """Raise the matching client exception for a >= 400 response; no-op otherwise."""
        status = response.status
        if status < 400:
            return

        error_text = await response.text()

        if status == 404:
            raise AdvowareAPIError(
                f"Resource not found: {endpoint}",
                status_code=404,
                response_body=error_text
            )

        if status >= 500:
            raise RetryableError(
                f"Server error {status}: {error_text}"
            )

        raise AdvowareAPIError(
            f"API error {status}: {error_text}",
            status_code=status,
            response_body=error_text
        )

    async def api_call(
        self,
        endpoint: str,
        method: str = 'GET',
        headers: Optional[Dict] = None,
        params: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        files: Optional[Any] = None,
        data: Optional[Any] = None,
        timeout_seconds: Optional[int] = None
    ) -> Any:
        """
        Make async API call to Advoware.

        A 401 response triggers one token refresh and a single retry of the
        same request.

        Args:
            endpoint: API endpoint (without base URL)
            method: HTTP method
            headers: Optional headers
            params: Optional query parameters
            json_data: Optional JSON body
            files: Optional files (not implemented)
            data: Optional raw data (overrides json_data)
            timeout_seconds: Optional timeout override

        Returns:
            JSON response or None

        Raises:
            AdvowareAuthError: Authentication failed
            AdvowareTimeoutError: Request timed out
            RetryableError: Server responded with a 5xx status
            AdvowareAPIError: Other API errors
        """
        # Clean endpoint
        endpoint = endpoint.lstrip('/')
        url = self.API_BASE_URL.rstrip('/') + '/' + endpoint

        effective_timeout = aiohttp.ClientTimeout(
            total=timeout_seconds or self.api_timeout_seconds
        )

        # Get auth token
        # NOTE(review): get_access_token is synchronous (requests + Redis), so a
        # cache miss blocks the event loop for the duration of the token fetch.
        try:
            token = self.get_access_token()
        except AdvowareAuthError:
            raise
        except Exception as e:
            raise AdvowareAPIError(f"Failed to get access token: {str(e)}") from e

        # Prepare headers
        effective_headers = headers.copy() if headers else {}
        effective_headers['Authorization'] = f'Bearer {token}'
        effective_headers.setdefault('Content-Type', 'application/json')

        # Use 'data' parameter if provided, otherwise 'json_data'
        json_payload = data if data is not None else json_data

        session = await self._get_session()
        try:
            with self.logger.api_call(endpoint, method):
                async with session.request(
                    method,
                    url,
                    headers=effective_headers,
                    params=params,
                    json=json_payload,
                    timeout=effective_timeout
                ) as response:
                    # Handle 401 - retry with fresh token
                    if response.status == 401:
                        self.logger.warning("401 Unauthorized, refreshing token")
                        token = self.get_access_token(force_refresh=True)
                        effective_headers['Authorization'] = f'Bearer {token}'

                        async with session.request(
                            method,
                            url,
                            headers=effective_headers,
                            params=params,
                            json=json_payload,
                            timeout=effective_timeout
                        ) as retry_response:
                            if retry_response.status == 401:
                                raise AdvowareAuthError(
                                    "Authentication failed even after token refresh",
                                    status_code=401
                                )

                            # Same status handling as the first attempt, so
                            # 404/4xx keep their status code and response body.
                            await self._raise_for_error_status(retry_response, endpoint)
                            return await self._parse_response(retry_response)

                    # Handle other error codes
                    await self._raise_for_error_status(response, endpoint)
                    return await self._parse_response(response)

        except asyncio.TimeoutError as e:
            raise AdvowareTimeoutError(
                f"Request timed out after {effective_timeout.total}s",
                status_code=408
            ) from e
        except aiohttp.ClientError as e:
            self.logger.error(f"API call failed: {e}")
            raise AdvowareAPIError(f"Request failed: {str(e)}") from e

    async def _parse_response(self, response: aiohttp.ClientResponse) -> Any:
        """
        Parse API response.

        Returns:
            Decoded JSON when the response content type is application/json
            and the body decodes; otherwise None
        """
        if response.content_type == 'application/json':
            try:
                return await response.json()
            except (aiohttp.ClientError, ValueError) as e:
                # Covers payload/content-type errors and JSON/Unicode decode
                # errors. Timeouts are deliberately not swallowed here so that
                # api_call can report them.
                self.logger.debug(f"JSON parse error: {e}")
                return None
        return None