# File: motia-iii/services/advoware.py (360 lines, 14 KiB, Python)

"""Advoware API client for Motia III"""
import aiohttp
import asyncio
import time
import uuid
import hmac
import hashlib
import base64
import os
import datetime
from typing import Optional, Dict, Any
from services.exceptions import (
AdvowareAPIError,
AdvowareAuthError,
AdvowareTimeoutError,
RetryableError
)
from services.redis_client import get_redis_client
from services.config import ADVOWARE_CONFIG, API_CONFIG
from services.logging_utils import get_service_logger
class AdvowareAPI:
    """
    Advoware API client with token caching via Redis.

    Fetches bearer tokens from the Advoware auth endpoint using an
    HMAC-SHA512 signed request, caches them in Redis when a client is
    available, and performs authenticated API calls over a lazily created,
    shared aiohttp session.

    The client may be used as an async context manager (``async with``) so
    the HTTP session is closed on exit; otherwise call ``close()`` explicitly.

    Environment variables read (see ``__init__`` for the defaults):
    - ADVOWARE_API_BASE_URL
    - ADVOWARE_PRODUCT_ID
    - ADVOWARE_APP_ID
    - ADVOWARE_API_KEY (base64 encoded)
    - ADVOWARE_KANZLEI
    - ADVOWARE_DATABASE
    - ADVOWARE_USER
    - ADVOWARE_ROLE
    - ADVOWARE_PASSWORD
    """

    def __init__(self, context=None):
        """
        Initialize Advoware API client.

        Args:
            context: Motia FlowContext for logging (optional)
        """
        self.context = context
        self.logger = get_service_logger('advoware', context)
        self.logger.debug("AdvowareAPI initializing")

        # HTTP session is created lazily by _get_session().
        self._session: Optional[aiohttp.ClientSession] = None

        # Load configuration from environment
        self.API_BASE_URL = os.getenv('ADVOWARE_API_BASE_URL', 'https://www2.advo-net.net:90/')
        self.product_id = int(os.getenv('ADVOWARE_PRODUCT_ID', '64'))
        self.app_id = os.getenv('ADVOWARE_APP_ID', '')
        self.api_key = os.getenv('ADVOWARE_API_KEY', '')
        self.kanzlei = os.getenv('ADVOWARE_KANZLEI', '')
        self.database = os.getenv('ADVOWARE_DATABASE', '')
        self.user = os.getenv('ADVOWARE_USER', '')
        self.role = int(os.getenv('ADVOWARE_ROLE', '2'))
        self.password = os.getenv('ADVOWARE_PASSWORD', '')
        self.token_lifetime_minutes = ADVOWARE_CONFIG.token_lifetime_minutes
        self.api_timeout_seconds = API_CONFIG.default_timeout_seconds

        # Initialize Redis for token caching (centralized)
        self.redis_client = get_redis_client(strict=False)
        if self.redis_client:
            self.logger.info("Connected to Redis for token caching")
        else:
            self.logger.warning("⚠️ Redis unavailable - token caching disabled!")

        self.logger.info("AdvowareAPI initialized")

    async def __aenter__(self) -> "AdvowareAPI":
        """Enter ``async with``; returns the client itself."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Leave ``async with``; closes the HTTP session if one is open."""
        await self.close()

    def _log(self, message: str, level: str = 'info') -> None:
        """Internal logging helper; unknown levels fall back to ``info``."""
        log_func = getattr(self.logger, level, self.logger.info)
        log_func(message)

    async def _get_session(self) -> "aiohttp.ClientSession":
        """Return the shared aiohttp session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self) -> None:
        """Close the shared aiohttp session if it exists and is still open."""
        if self._session and not self._session.closed:
            await self._session.close()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the rendered string is identical (second precision, trailing "Z").
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    def _generate_hmac(self, request_time_stamp: str, nonce: Optional[str] = None) -> str:
        """
        Generate HMAC-SHA512 signature for authentication.

        Args:
            request_time_stamp: Timestamp string included in the signed message
            nonce: Nonce included in the signed message; a random UUID4 is
                generated when omitted

        Returns:
            Base64-encoded HMAC-SHA512 digest of
            ``product_id:app_id:nonce:request_time_stamp``
        """
        if not nonce:
            nonce = str(uuid.uuid4())
        message = f"{self.product_id}:{self.app_id}:{nonce}:{request_time_stamp}".encode('utf-8')
        try:
            api_key_bytes = base64.b64decode(self.api_key)
            self.logger.debug("API Key decoded from base64")
        except (ValueError, TypeError) as e:
            # binascii.Error is a ValueError subclass; fall back to the raw key.
            self._log(f"API Key not base64-encoded, using as-is: {e}", level='debug')
            api_key_bytes = self.api_key.encode('utf-8') if isinstance(self.api_key, str) else self.api_key
        signature = hmac.new(api_key_bytes, message, hashlib.sha512)
        return base64.b64encode(signature.digest()).decode('utf-8')

    def _cache_token(self, access_token: str) -> None:
        """Best-effort: store the token and its fetch time in Redis."""
        if not self.redis_client:
            return
        effective_ttl = max(1, (self.token_lifetime_minutes - 2) * 60)
        try:
            self.redis_client.set(ADVOWARE_CONFIG.token_cache_key, access_token, ex=effective_ttl)
            self.redis_client.set(ADVOWARE_CONFIG.token_timestamp_key, str(time.time()), ex=effective_ttl)
        except Exception as e:
            # Caching is optional: a cache failure must not discard a token
            # that was fetched successfully.
            self.logger.warning(f"Failed to cache token in Redis: {e}")
            return
        self.logger.debug(f"Token cached in Redis with TTL {effective_ttl}s")

    def _read_cached_token(self) -> Optional[str]:
        """Return a still-fresh cached token from Redis, or None."""
        try:
            cached_token = self.redis_client.get(ADVOWARE_CONFIG.token_cache_key)
            token_timestamp = self.redis_client.get(ADVOWARE_CONFIG.token_timestamp_key)
        except Exception as e:
            # Treat an unreadable cache like a cache miss.
            self.logger.warning(f"Failed to read token cache from Redis: {e}")
            return None

        if not (cached_token and token_timestamp):
            return None

        try:
            age_seconds = time.time() - float(token_timestamp)
        except (ValueError, TypeError) as e:
            self.logger.debug(f"Error reading cached token: {e}")
            return None

        if age_seconds >= (self.token_lifetime_minutes - 1) * 60:
            return None

        # A client without decode_responses returns bytes; normalise so the
        # Authorization header is not rendered as "Bearer b'...'".
        if isinstance(cached_token, bytes):
            cached_token = cached_token.decode('utf-8')
        self.logger.debug(f"Using cached token (age: {age_seconds:.0f}s)")
        return cached_token

    async def _fetch_new_access_token(self) -> str:
        """
        Fetch new access token from Advoware Auth API (async).

        Returns:
            The access token from the auth response

        Raises:
            AdvowareAuthError: Auth endpoint answered 401 or sent no token
            AdvowareTimeoutError: Token request timed out
            AdvowareAPIError: Other HTTP errors (status >= 400) or client errors
        """
        self.logger.info("Fetching new access token from Advoware")
        nonce = str(uuid.uuid4())
        request_time_stamp = self._utc_timestamp()
        hmac_signature = self._generate_hmac(request_time_stamp, nonce)
        headers = {'Content-Type': 'application/json'}
        data = {
            "AppID": self.app_id,
            "Kanzlei": self.kanzlei,
            "Database": self.database,
            "User": self.user,
            "Role": self.role,
            "Product": self.product_id,
            "Password": self.password,
            "Nonce": nonce,
            "HMAC512Signature": hmac_signature,
            "RequestTimeStamp": request_time_stamp,
        }
        self.logger.debug(f"Token request: AppID={self.app_id}, User={self.user}")

        # Async token fetch using aiohttp
        session = await self._get_session()
        try:
            async with session.post(
                ADVOWARE_CONFIG.auth_url,
                json=data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=self.api_timeout_seconds),
            ) as response:
                self.logger.debug(f"Token response status: {response.status}")
                if response.status == 401:
                    raise AdvowareAuthError(
                        "Authentication failed - check credentials",
                        status_code=401
                    )
                if response.status >= 400:
                    error_text = await response.text()
                    raise AdvowareAPIError(
                        f"Token request failed ({response.status}): {error_text}",
                        status_code=response.status
                    )
                result = await response.json()
        except asyncio.TimeoutError as e:
            raise AdvowareTimeoutError(
                "Token request timed out",
                status_code=408
            ) from e
        except aiohttp.ClientError as e:
            raise AdvowareAPIError(f"Token request failed: {str(e)}") from e

        # Guard against a JSON body that is not an object (e.g. a list).
        access_token = result.get("access_token") if isinstance(result, dict) else None
        if not access_token:
            self.logger.error("No access_token in response")
            raise AdvowareAuthError("No access_token received from Advoware")
        self.logger.info("Access token fetched successfully")

        # Cache token in Redis (best-effort)
        self._cache_token(access_token)
        return access_token

    async def get_access_token(self, force_refresh: bool = False) -> str:
        """
        Get valid access token (from cache or fetch new).

        Args:
            force_refresh: Force token refresh even if cached

        Returns:
            Valid access token
        """
        self.logger.debug("Getting access token")
        if not self.redis_client:
            self.logger.info("No Redis available, fetching new token")
            return await self._fetch_new_access_token()
        if force_refresh:
            self.logger.info("Force refresh requested, fetching new token")
            return await self._fetch_new_access_token()

        # Check cache
        cached_token = self._read_cached_token()
        if cached_token:
            return cached_token

        self.logger.info("Cached token expired or invalid, fetching new")
        return await self._fetch_new_access_token()

    async def _handle_response(self, response: "aiohttp.ClientResponse", endpoint: str) -> Any:
        """
        Map a non-401 response to its parsed body or the matching exception.

        Used for both the first attempt and the post-refresh retry so that
        both report errors identically.

        Raises:
            RetryableError: Status >= 500
            AdvowareAPIError: Status 404 or any other status >= 400
        """
        status = response.status
        if status == 404:
            error_text = await response.text()
            raise AdvowareAPIError(
                f"Resource not found: {endpoint}",
                status_code=404,
                response_body=error_text
            )
        if status >= 500:
            error_text = await response.text()
            raise RetryableError(
                f"Server error {status}: {error_text}"
            )
        if status >= 400:
            error_text = await response.text()
            raise AdvowareAPIError(
                f"API error {status}: {error_text}",
                status_code=status,
                response_body=error_text
            )
        return await self._parse_response(response)

    async def api_call(
        self,
        endpoint: str,
        method: str = 'GET',
        headers: Optional[Dict] = None,
        params: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        files: Optional[Any] = None,
        data: Optional[Any] = None,
        timeout_seconds: Optional[int] = None
    ) -> Any:
        """
        Make async API call to Advoware.

        On a 401 response the token is force-refreshed and the request is
        retried exactly once.

        Args:
            endpoint: API endpoint (without base URL)
            method: HTTP method
            headers: Optional headers (Authorization is always overwritten)
            params: Optional query parameters
            json_data: Optional JSON body
            files: Optional files (not implemented; ignored with a warning)
            data: Optional body, sent as JSON; takes precedence over json_data
            timeout_seconds: Optional timeout override

        Returns:
            Parsed JSON response, or None when the response is not JSON or
            cannot be parsed

        Raises:
            AdvowareAuthError: Authentication failed
            AdvowareTimeoutError: Request timed out
            RetryableError: Server answered with status >= 500
            AdvowareAPIError: Other API errors
        """
        # Clean endpoint
        endpoint = endpoint.lstrip('/')
        url = self.API_BASE_URL.rstrip('/') + '/' + endpoint
        effective_timeout = aiohttp.ClientTimeout(
            total=timeout_seconds or self.api_timeout_seconds
        )

        if files is not None:
            self.logger.warning("'files' parameter is not supported and will be ignored")

        # Get auth token
        try:
            token = await self.get_access_token()
        except AdvowareAuthError:
            raise
        except Exception as e:
            raise AdvowareAPIError(f"Failed to get access token: {str(e)}") from e

        # Prepare headers
        effective_headers = headers.copy() if headers else {}
        effective_headers['Authorization'] = f'Bearer {token}'
        effective_headers.setdefault('Content-Type', 'application/json')

        # Use 'data' parameter if provided, otherwise 'json_data'
        json_payload = data if data is not None else json_data

        session = await self._get_session()
        try:
            with self.logger.api_call(endpoint, method):
                async with session.request(
                    method,
                    url,
                    headers=effective_headers,
                    params=params,
                    json=json_payload,
                    timeout=effective_timeout
                ) as response:
                    if response.status != 401:
                        return await self._handle_response(response, endpoint)

                # The first response is released before refreshing, so its
                # connection is not held open during the retry.
                self.logger.warning("401 Unauthorized, refreshing token")
                token = await self.get_access_token(force_refresh=True)
                effective_headers['Authorization'] = f'Bearer {token}'
                async with session.request(
                    method,
                    url,
                    headers=effective_headers,
                    params=params,
                    json=json_payload,
                    timeout=effective_timeout
                ) as retry_response:
                    if retry_response.status == 401:
                        raise AdvowareAuthError(
                            "Authentication failed even after token refresh",
                            status_code=401
                        )
                    return await self._handle_response(retry_response, endpoint)
        except asyncio.TimeoutError as e:
            raise AdvowareTimeoutError(
                f"Request timed out after {effective_timeout.total}s",
                status_code=408
            ) from e
        except aiohttp.ClientError as e:
            self.logger.error(f"API call failed: {e}")
            raise AdvowareAPIError(f"Request failed: {str(e)}") from e

    async def _parse_response(self, response: "aiohttp.ClientResponse") -> Any:
        """Parse API response; returns None for non-JSON or unparsable bodies."""
        if response.content_type == 'application/json':
            try:
                return await response.json()
            except Exception as e:
                # Deliberate best-effort: an unparsable body yields None.
                self.logger.debug(f"JSON parse error: {e}")
                return None
        return None