"""Advoware API client for Motia III""" import aiohttp import asyncio import time import uuid import hmac import hashlib import base64 import os import datetime import redis import logging from typing import Optional, Dict, Any logger = logging.getLogger(__name__) class AdvowareTokenError(Exception): """Raised when token acquisition fails""" pass class AdvowareAPI: """ Advoware API client with token caching via Redis. Environment variables required: - ADVOWARE_API_BASE_URL - ADVOWARE_PRODUCT_ID - ADVOWARE_APP_ID - ADVOWARE_API_KEY (base64 encoded) - ADVOWARE_KANZLEI - ADVOWARE_DATABASE - ADVOWARE_USER - ADVOWARE_ROLE - ADVOWARE_PASSWORD - REDIS_HOST (optional, default: localhost) - REDIS_PORT (optional, default: 6379) - REDIS_DB_ADVOWARE_CACHE (optional, default: 1) """ AUTH_URL = "https://security.advo-net.net/api/v1/Token" TOKEN_CACHE_KEY = 'advoware_access_token' TOKEN_TIMESTAMP_CACHE_KEY = 'advoware_token_timestamp' def __init__(self, context=None): """ Initialize Advoware API client. Args: context: Motia FlowContext for logging (optional) """ self.context = context self._log("AdvowareAPI initializing", level='debug') # Load configuration from environment self.API_BASE_URL = os.getenv('ADVOWARE_API_BASE_URL', 'https://www2.advo-net.net:90/') self.product_id = int(os.getenv('ADVOWARE_PRODUCT_ID', '64')) self.app_id = os.getenv('ADVOWARE_APP_ID', '') self.api_key = os.getenv('ADVOWARE_API_KEY', '') self.kanzlei = os.getenv('ADVOWARE_KANZLEI', '') self.database = os.getenv('ADVOWARE_DATABASE', '') self.user = os.getenv('ADVOWARE_USER', '') self.role = int(os.getenv('ADVOWARE_ROLE', '2')) self.password = os.getenv('ADVOWARE_PASSWORD', '') self.token_lifetime_minutes = int(os.getenv('ADVOWARE_TOKEN_LIFETIME_MINUTES', '55')) self.api_timeout_seconds = int(os.getenv('ADVOWARE_API_TIMEOUT_SECONDS', '30')) # Initialize Redis for token caching try: redis_host = os.getenv('REDIS_HOST', 'localhost') redis_port = int(os.getenv('REDIS_PORT', '6379')) redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) redis_timeout = int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')) self.redis_client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, socket_timeout=redis_timeout, socket_connect_timeout=redis_timeout ) self.redis_client.ping() self._log("Connected to Redis for token caching") except (redis.exceptions.ConnectionError, Exception) as e: self._log(f"Could not connect to Redis: {e}. Token caching disabled.", level='warning') self.redis_client = None self._log("AdvowareAPI initialized") def _generate_hmac(self, request_time_stamp: str, nonce: Optional[str] = None) -> str: """Generate HMAC-SHA512 signature for authentication""" if not nonce: nonce = str(uuid.uuid4()) message = f"{self.product_id}:{self.app_id}:{nonce}:{request_time_stamp}".encode('utf-8') try: api_key_bytes = base64.b64decode(self.api_key) logger.debug("API Key decoded from base64") except Exception as e: self._log(f"API Key not base64-encoded, using as-is: {e}", level='debug') api_key_bytes = self.api_key.encode('utf-8') if isinstance(self.api_key, str) else self.api_key signature = hmac.new(api_key_bytes, message, hashlib.sha512) return base64.b64encode(signature.digest()).decode('utf-8') def _fetch_new_access_token(self) -> str: """Fetch new access token from Advoware Auth API""" self._log("Fetching new access token from Advoware") nonce = str(uuid.uuid4()) request_time_stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z" hmac_signature = self._generate_hmac(request_time_stamp, nonce) headers = {'Content-Type': 'application/json'} data = { "AppID": self.app_id, "Kanzlei": self.kanzlei, "Database": self.database, "User": self.user, "Role": self.role, "Product": self.product_id, "Password": self.password, "Nonce": nonce, "HMAC512Signature": hmac_signature, "RequestTimeStamp": request_time_stamp } self._log(f"Token request: AppID={self.app_id}, User={self.user}", level='debug') # Using synchronous requests for token fetch (called from sync context) import requests response = requests.post( self.AUTH_URL, json=data, headers=headers, timeout=self.api_timeout_seconds ) self._log(f"Token response status: {response.status_code}") response.raise_for_status() result = response.json() access_token = result.get("access_token") if not access_token: self._log("No access_token in response", level='error') raise AdvowareTokenError("No access_token received from Advoware") self._log("Access token fetched successfully") # Cache token in Redis if self.redis_client: effective_ttl = max(1, (self.token_lifetime_minutes - 2) * 60) self.redis_client.set(self.TOKEN_CACHE_KEY, access_token, ex=effective_ttl) self.redis_client.set(self.TOKEN_TIMESTAMP_CACHE_KEY, str(time.time()), ex=effective_ttl) self._log(f"Token cached in Redis with TTL {effective_ttl}s") return access_token def get_access_token(self, force_refresh: bool = False) -> str: """ Get valid access token (from cache or fetch new). Args: force_refresh: Force token refresh even if cached Returns: Valid access token """ self._log("Getting access token", level='debug') if not self.redis_client: self._log("No Redis available, fetching new token") return self._fetch_new_access_token() if force_refresh: self._log("Force refresh requested, fetching new token") return self._fetch_new_access_token() # Check cache cached_token = self.redis_client.get(self.TOKEN_CACHE_KEY) token_timestamp = self.redis_client.get(self.TOKEN_TIMESTAMP_CACHE_KEY) if cached_token and token_timestamp: try: timestamp = float(token_timestamp.decode('utf-8')) age_seconds = time.time() - timestamp if age_seconds < (self.token_lifetime_minutes - 1) * 60: self._log(f"Using cached token (age: {age_seconds:.0f}s)", level='debug') return cached_token.decode('utf-8') except (ValueError, AttributeError) as e: self._log(f"Error reading cached token: {e}", level='debug') self._log("Cached token expired or invalid, fetching new") return self._fetch_new_access_token() async def api_call( self, endpoint: str, method: str = 'GET', headers: Optional[Dict] = None, params: Optional[Dict] = None, json_data: Optional[Dict] = None, files: Optional[Any] = None, data: Optional[Any] = None, timeout_seconds: Optional[int] = None ) -> Any: """ Make async API call to Advoware. Args: endpoint: API endpoint (without base URL) method: HTTP method headers: Optional headers params: Optional query parameters json_data: Optional JSON body files: Optional files (not implemented) data: Optional raw data (overrides json_data) timeout_seconds: Optional timeout override Returns: JSON response or None """ # Clean endpoint endpoint = endpoint.lstrip('/') url = self.API_BASE_URL.rstrip('/') + '/' + endpoint effective_timeout = aiohttp.ClientTimeout( total=timeout_seconds or self.api_timeout_seconds ) # Get auth token token = self.get_access_token() # Prepare headers effective_headers = headers.copy() if headers else {} effective_headers['Authorization'] = f'Bearer {token}' effective_headers.setdefault('Content-Type', 'application/json') # Use 'data' parameter if provided, otherwise 'json_data' json_payload = data if data is not None else json_data async with aiohttp.ClientSession(timeout=effective_timeout) as session: try: self._log(f"API call: {method} {url}", level='debug') async with session.request( method, url, headers=effective_headers, params=params, json=json_payload ) as response: # Handle 401 - retry with fresh token if response.status == 401: self._log("401 Unauthorized, refreshing token") token = self.get_access_token(force_refresh=True) effective_headers['Authorization'] = f'Bearer {token}' async with session.request( method, url, headers=effective_headers, params=params, json=json_payload ) as retry_response: retry_response.raise_for_status() return await self._parse_response(retry_response) response.raise_for_status() return await self._parse_response(response) except aiohttp.ClientError as e: self._log(f"API call failed: {e}", level='error') raise async def _parse_response(self, response: aiohttp.ClientResponse) -> Any: """Parse API response""" if response.content_type == 'application/json': try: return await response.json() except Exception as e: self._log(f"JSON parse error: {e}", level='debug') return None return None def _log(self, message: str, level: str = 'info'): """Log message via context or standard logger""" if self.context: if level == 'debug': self.context.logger.debug(message) elif level == 'warning': self.context.logger.warning(message) elif level == 'error': self.context.logger.error(message) else: self.context.logger.info(message) else: if level == 'debug': logger.debug(message) elif level == 'warning': logger.warning(message) elif level == 'error': logger.error(message) else: logger.info(message)