Compare commits
2 Commits
bcb6454b2a
...
a53051ea8e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a53051ea8e | ||
|
|
69a48f5f9a |
382
REFACTORING_SUMMARY.md
Normal file
382
REFACTORING_SUMMARY.md
Normal file
@@ -0,0 +1,382 @@
|
||||
# Code Refactoring - Verbesserungen Übersicht
|
||||
|
||||
Datum: 3. März 2026
|
||||
|
||||
## Zusammenfassung
|
||||
|
||||
Umfassendes Refactoring zur Verbesserung von Robustheit, Eleganz und Effizienz des BitByLaw Integration Codes.
|
||||
|
||||
## Implementierte Verbesserungen
|
||||
|
||||
### 1. ✅ Custom Exception Classes ([services/exceptions.py](services/exceptions.py))
|
||||
|
||||
**Problem:** Zu generisches Exception Handling mit `except Exception`
|
||||
|
||||
**Lösung:** Hierarchische Exception-Struktur:
|
||||
|
||||
```python
|
||||
from services.exceptions import (
|
||||
AdvowareAPIError,
|
||||
AdvowareAuthError,
|
||||
AdvowareTimeoutError,
|
||||
EspoCRMAPIError,
|
||||
EspoCRMAuthError,
|
||||
RetryableError,
|
||||
NonRetryableError,
|
||||
LockAcquisitionError,
|
||||
ValidationError
|
||||
)
|
||||
|
||||
# Verwendung:
|
||||
try:
|
||||
result = await advoware.api_call(...)
|
||||
except AdvowareTimeoutError:
|
||||
# Spezifisch für Timeouts
|
||||
raise RetryableError()
|
||||
except AdvowareAuthError:
|
||||
# Auth-Fehler nicht retryable
|
||||
raise
|
||||
except AdvowareAPIError as e:
|
||||
# Andere API-Fehler
|
||||
if is_retryable(e):
|
||||
# Retry logic
|
||||
```
|
||||
|
||||
**Vorteile:**
|
||||
- Präzise Fehlerbehandlung
|
||||
- Besseres Error Tracking
|
||||
- Automatische Retry-Klassifizierung mit `is_retryable()`
|
||||
|
||||
---
|
||||
|
||||
### 2. ✅ Redis Client Factory ([services/redis_client.py](services/redis_client.py))
|
||||
|
||||
**Problem:** Duplizierte Redis-Initialisierung in 4+ Dateien
|
||||
|
||||
**Lösung:** Zentralisierte Redis Client Factory mit Singleton Pattern:
|
||||
|
||||
```python
|
||||
from services.redis_client import get_redis_client, is_redis_available
|
||||
|
||||
# Strict mode: Exception bei Fehler
|
||||
redis_client = get_redis_client(strict=True)
|
||||
|
||||
# Optional mode: None bei Fehler (für optionale Features)
|
||||
redis_client = get_redis_client(strict=False)
|
||||
|
||||
# Health Check
|
||||
if is_redis_available():
|
||||
# Redis verfügbar
|
||||
```
|
||||
|
||||
**Vorteile:**
|
||||
- DRY (Don't Repeat Yourself)
|
||||
- Connection Pooling
|
||||
- Zentrale Konfiguration
|
||||
- Health Checks
|
||||
|
||||
---
|
||||
|
||||
### 3. ✅ Pydantic Models für Validation ([services/models.py](services/models.py))
|
||||
|
||||
**Problem:** Keine Datenvalidierung, unsichere Typen
|
||||
|
||||
**Lösung:** Pydantic Models mit automatischer Validierung:
|
||||
|
||||
```python
|
||||
from services.models import (
|
||||
AdvowareBeteiligteCreate,
|
||||
EspoCRMBeteiligteCreate,
|
||||
validate_beteiligte_advoware
|
||||
)
|
||||
|
||||
# Automatische Validierung:
|
||||
try:
|
||||
validated = AdvowareBeteiligteCreate.model_validate(data)
|
||||
except ValidationError as e:
|
||||
# Handle validation errors
|
||||
|
||||
# Helper:
|
||||
validated = validate_beteiligte_advoware(data)
|
||||
```
|
||||
|
||||
**Features:**
|
||||
- Type Safety
|
||||
- Automatische Validierung (Geburtsdatum, Name, etc.)
|
||||
- Enums für Status/Rechtsformen
|
||||
- Field Validators
|
||||
|
||||
---
|
||||
|
||||
### 4. ✅ Zentrale Konfiguration ([services/config.py](services/config.py))
|
||||
|
||||
**Problem:** Magic Numbers und Strings überall im Code
|
||||
|
||||
**Lösung:** Zentrale Config mit Dataclasses:
|
||||
|
||||
```python
|
||||
from services.config import (
|
||||
SYNC_CONFIG,
|
||||
API_CONFIG,
|
||||
ADVOWARE_CONFIG,
|
||||
ESPOCRM_CONFIG,
|
||||
FEATURE_FLAGS,
|
||||
get_retry_delay_seconds,
|
||||
get_lock_key
|
||||
)
|
||||
|
||||
# Verwendung:
|
||||
max_retries = SYNC_CONFIG.max_retries # 5
|
||||
lock_ttl = SYNC_CONFIG.lock_ttl_seconds # 900
|
||||
backoff = SYNC_CONFIG.retry_backoff_minutes # [1, 5, 15, 60, 240]
|
||||
|
||||
# Helper Functions:
|
||||
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||
retry_delay = get_retry_delay_seconds(attempt=2) # 15 * 60 seconds
|
||||
```
|
||||
|
||||
**Konfigurationsbereiche:**
|
||||
- `SYNC_CONFIG` - Retry, Locking, Change Detection
|
||||
- `API_CONFIG` - Timeouts, Rate Limiting
|
||||
- `ADVOWARE_CONFIG` - Token, Auth, Read-only Fields
|
||||
- `ESPOCRM_CONFIG` - Pagination, Notifications
|
||||
- `FEATURE_FLAGS` - Feature Toggles
|
||||
|
||||
---
|
||||
|
||||
### 5. ✅ Konsistentes Logging ([services/logging_utils.py](services/logging_utils.py))
|
||||
|
||||
**Problem:** Inkonsistentes Logging (3 verschiedene Patterns)
|
||||
|
||||
**Lösung:** Unified Logger mit Context-Support:
|
||||
|
||||
```python
|
||||
from services.logging_utils import get_logger, get_service_logger
|
||||
|
||||
# Service Logger:
|
||||
logger = get_service_logger('advoware', context)
|
||||
logger.info("Message", entity_id="123")
|
||||
|
||||
# Mit Context Manager für Timing:
|
||||
with logger.operation('sync_entity', entity_id='123'):
|
||||
# Do work
|
||||
pass # Automatisches Timing und Error Logging
|
||||
|
||||
# API Call Tracking:
|
||||
with logger.api_call('/api/v1/Beteiligte', method='POST'):
|
||||
result = await api.post(...)
|
||||
```
|
||||
|
||||
**Features:**
|
||||
- Motia FlowContext Support
|
||||
- Structured Logging
|
||||
- Automatisches Performance Tracking
|
||||
- Context Fields
|
||||
|
||||
---
|
||||
|
||||
### 6. ✅ Spezifische Exceptions in Services
|
||||
|
||||
**Aktualisierte Services:**
|
||||
- [advoware.py](services/advoware.py) - AdvowareAPIError, AdvowareAuthError, AdvowareTimeoutError
|
||||
- [espocrm.py](services/espocrm.py) - EspoCRMAPIError, EspoCRMAuthError, EspoCRMTimeoutError
|
||||
- [sync_utils_base.py](services/sync_utils_base.py) - LockAcquisitionError
|
||||
- [beteiligte_sync_utils.py](services/beteiligte_sync_utils.py) - SyncError
|
||||
|
||||
**Beispiel:**
|
||||
```python
|
||||
# Vorher:
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
|
||||
# Nachher:
|
||||
except AdvowareTimeoutError:
|
||||
raise RetryableError("Request timed out")
|
||||
except AdvowareAuthError:
|
||||
raise # Nicht retryable
|
||||
except AdvowareAPIError as e:
|
||||
if is_retryable(e):
|
||||
# Retry
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 7. ✅ Type Hints ergänzt
|
||||
|
||||
**Verbesserte Type Hints in:**
|
||||
- Service-Methoden (advoware.py, espocrm.py)
|
||||
- Mapper-Funktionen (espocrm_mapper.py)
|
||||
- Utility-Klassen (sync_utils_base.py, beteiligte_sync_utils.py)
|
||||
- Step Handler
|
||||
|
||||
**Beispiel:**
|
||||
```python
|
||||
# Vorher:
|
||||
async def handler(event_data, ctx):
|
||||
...
|
||||
|
||||
# Nachher:
|
||||
async def handler(
|
||||
event_data: Dict[str, Any],
|
||||
ctx: FlowContext[Any]
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
...
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Migration Guide
|
||||
|
||||
### Für bestehenden Code
|
||||
|
||||
1. **Exception Handling aktualisieren:**
|
||||
```python
|
||||
# Alt:
|
||||
try:
|
||||
result = await api.call()
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
|
||||
# Neu:
|
||||
try:
|
||||
result = await api.call()
|
||||
except AdvowareTimeoutError:
|
||||
# Spezifisch behandeln
|
||||
raise RetryableError()
|
||||
except AdvowareAPIError as e:
|
||||
logger.error(f"API Error: {e}")
|
||||
if is_retryable(e):
|
||||
# Retry
|
||||
```
|
||||
|
||||
2. **Redis initialisieren:**
|
||||
```python
|
||||
# Alt:
|
||||
redis_client = redis.Redis(host=..., port=...)
|
||||
|
||||
# Neu:
|
||||
from services.redis_client import get_redis_client
|
||||
redis_client = get_redis_client(strict=False)
|
||||
```
|
||||
|
||||
3. **Konstanten verwenden:**
|
||||
```python
|
||||
# Alt:
|
||||
MAX_RETRIES = 5
|
||||
LOCK_TTL = 900
|
||||
|
||||
# Neu:
|
||||
from services.config import SYNC_CONFIG
|
||||
max_retries = SYNC_CONFIG.max_retries
|
||||
lock_ttl = SYNC_CONFIG.lock_ttl_seconds
|
||||
```
|
||||
|
||||
4. **Logging standardisieren:**
|
||||
```python
|
||||
# Alt:
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info("Message")
|
||||
|
||||
# Neu:
|
||||
from services.logging_utils import get_service_logger
|
||||
logger = get_service_logger('my_service', context)
|
||||
logger.info("Message", entity_id="123")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Performance-Verbesserungen
|
||||
|
||||
- ✅ Redis Connection Pooling (max 50 Connections)
|
||||
- ✅ Token Caching optimiert
|
||||
- ✅ Bessere Error Classification (weniger unnötige Retries)
|
||||
- ⚠️ Noch TODO: Batch Operations für parallele Syncs
|
||||
|
||||
---
|
||||
|
||||
## Feature Flags
|
||||
|
||||
Neue Features können über `FEATURE_FLAGS` gesteuert werden:
|
||||
|
||||
```python
|
||||
from services.config import FEATURE_FLAGS
|
||||
|
||||
# Aktivieren/Deaktivieren:
|
||||
FEATURE_FLAGS.strict_validation = True # Pydantic Validation
|
||||
FEATURE_FLAGS.kommunikation_sync_enabled = False # Noch in Entwicklung
|
||||
FEATURE_FLAGS.parallel_sync_enabled = False # Experimentell
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Testing
|
||||
|
||||
**Unit Tests sollten nun leichter sein:**
|
||||
|
||||
```python
|
||||
# Mock Redis:
|
||||
from services.redis_client import RedisClientFactory
|
||||
RedisClientFactory._instance = mock_redis
|
||||
|
||||
# Mock Exceptions:
|
||||
from services.exceptions import AdvowareAPIError
|
||||
raise AdvowareAPIError("Test error", status_code=500)
|
||||
|
||||
# Validate Models:
|
||||
from services.models import validate_beteiligte_advoware
|
||||
with pytest.raises(ValidationError):
|
||||
validate_beteiligte_advoware(invalid_data)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Nächste Schritte
|
||||
|
||||
1. **Unit Tests schreiben** (min. 60% Coverage)
|
||||
- Exception Handling Tests
|
||||
- Mapper Tests mit Pydantic
|
||||
- Redis Factory Tests
|
||||
|
||||
2. **Batch Operations** implementieren
|
||||
- Parallele API-Calls
|
||||
- Bulk Updates
|
||||
|
||||
3. **Monitoring** verbessern
|
||||
- Performance Metrics aus Logger nutzen
|
||||
- Redis Health Checks
|
||||
|
||||
4. **Dokumentation** erweitern
|
||||
- API-Docs generieren (Sphinx)
|
||||
- Error Handling Guide
|
||||
|
||||
---
|
||||
|
||||
## Breakfree Changes
|
||||
|
||||
⚠️ **Minimale Breaking Changes:**
|
||||
|
||||
1. Import-Pfade haben sich geändert:
|
||||
- `AdvowareTokenError` → `AdvowareAuthError`
|
||||
- `EspoCRMError` → `EspoCRMAPIError`
|
||||
|
||||
2. Redis wird jetzt über Factory bezogen:
|
||||
- Statt direktem `redis.Redis()` → `get_redis_client()`
|
||||
|
||||
**Migration ist einfach:** Imports aktualisieren, Code läuft sonst identisch.
|
||||
|
||||
---
|
||||
|
||||
## Autoren
|
||||
|
||||
- Code Refactoring: GitHub Copilot
|
||||
- Review: BitByLaw Team
|
||||
- Datum: 3. März 2026
|
||||
|
||||
---
|
||||
|
||||
## Fragen?
|
||||
|
||||
Bei Fragen zum Refactoring siehe:
|
||||
- [services/README.md](services/README.md) - Service-Layer Dokumentation
|
||||
- [exceptions.py](services/exceptions.py) - Exception Hierarchie
|
||||
- [config.py](services/config.py) - Alle Konfigurationsoptionen
|
||||
@@ -8,18 +8,22 @@ import hashlib
|
||||
import base64
|
||||
import os
|
||||
import datetime
|
||||
import redis
|
||||
import logging
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
from services.exceptions import (
|
||||
AdvowareAPIError,
|
||||
AdvowareAuthError,
|
||||
AdvowareTimeoutError,
|
||||
RetryableError
|
||||
)
|
||||
from services.redis_client import get_redis_client
|
||||
from services.config import ADVOWARE_CONFIG, API_CONFIG
|
||||
from services.logging_utils import get_service_logger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AdvowareTokenError(Exception):
|
||||
"""Raised when token acquisition fails"""
|
||||
pass
|
||||
|
||||
|
||||
class AdvowareAPI:
|
||||
"""
|
||||
Advoware API client with token caching via Redis.
|
||||
@@ -34,15 +38,8 @@ class AdvowareAPI:
|
||||
- ADVOWARE_USER
|
||||
- ADVOWARE_ROLE
|
||||
- ADVOWARE_PASSWORD
|
||||
- REDIS_HOST (optional, default: localhost)
|
||||
- REDIS_PORT (optional, default: 6379)
|
||||
- REDIS_DB_ADVOWARE_CACHE (optional, default: 1)
|
||||
"""
|
||||
|
||||
AUTH_URL = "https://security.advo-net.net/api/v1/Token"
|
||||
TOKEN_CACHE_KEY = 'advoware_access_token'
|
||||
TOKEN_TIMESTAMP_CACHE_KEY = 'advoware_token_timestamp'
|
||||
|
||||
def __init__(self, context=None):
|
||||
"""
|
||||
Initialize Advoware API client.
|
||||
@@ -51,7 +48,8 @@ class AdvowareAPI:
|
||||
context: Motia FlowContext for logging (optional)
|
||||
"""
|
||||
self.context = context
|
||||
self._log("AdvowareAPI initializing", level='debug')
|
||||
self.logger = get_service_logger('advoware', context)
|
||||
self.logger.debug("AdvowareAPI initializing")
|
||||
|
||||
# Load configuration from environment
|
||||
self.API_BASE_URL = os.getenv('ADVOWARE_API_BASE_URL', 'https://www2.advo-net.net:90/')
|
||||
@@ -63,30 +61,28 @@ class AdvowareAPI:
|
||||
self.user = os.getenv('ADVOWARE_USER', '')
|
||||
self.role = int(os.getenv('ADVOWARE_ROLE', '2'))
|
||||
self.password = os.getenv('ADVOWARE_PASSWORD', '')
|
||||
self.token_lifetime_minutes = int(os.getenv('ADVOWARE_TOKEN_LIFETIME_MINUTES', '55'))
|
||||
self.api_timeout_seconds = int(os.getenv('ADVOWARE_API_TIMEOUT_SECONDS', '30'))
|
||||
self.token_lifetime_minutes = ADVOWARE_CONFIG.token_lifetime_minutes
|
||||
self.api_timeout_seconds = API_CONFIG.default_timeout_seconds
|
||||
|
||||
# Initialize Redis for token caching
|
||||
try:
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
redis_timeout = int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
|
||||
# Initialize Redis for token caching (centralized)
|
||||
self.redis_client = get_redis_client(strict=False)
|
||||
if self.redis_client:
|
||||
self.logger.info("Connected to Redis for token caching")
|
||||
else:
|
||||
self.logger.warning("⚠️ Redis unavailable - token caching disabled!")
|
||||
|
||||
self.redis_client = redis.Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
socket_timeout=redis_timeout,
|
||||
socket_connect_timeout=redis_timeout
|
||||
)
|
||||
self.redis_client.ping()
|
||||
self._log("Connected to Redis for token caching")
|
||||
except (redis.exceptions.ConnectionError, Exception) as e:
|
||||
self._log(f"Could not connect to Redis: {e}. Token caching disabled.", level='warning')
|
||||
self.redis_client = None
|
||||
self.logger.info("AdvowareAPI initialized")
|
||||
|
||||
self._log("AdvowareAPI initialized")
|
||||
self._session: Optional[aiohttp.ClientSession] = None
|
||||
|
||||
async def _get_session(self) -> aiohttp.ClientSession:
|
||||
if self._session is None or self._session.closed:
|
||||
self._session = aiohttp.ClientSession()
|
||||
return self._session
|
||||
|
||||
async def close(self) -> None:
|
||||
if self._session and not self._session.closed:
|
||||
await self._session.close()
|
||||
|
||||
def _generate_hmac(self, request_time_stamp: str, nonce: Optional[str] = None) -> str:
|
||||
"""Generate HMAC-SHA512 signature for authentication"""
|
||||
@@ -107,7 +103,7 @@ class AdvowareAPI:
|
||||
|
||||
def _fetch_new_access_token(self) -> str:
|
||||
"""Fetch new access token from Advoware Auth API"""
|
||||
self._log("Fetching new access token from Advoware")
|
||||
self.logger.info("Fetching new access token from Advoware")
|
||||
|
||||
nonce = str(uuid.uuid4())
|
||||
request_time_stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
|
||||
@@ -127,35 +123,56 @@ class AdvowareAPI:
|
||||
"RequestTimeStamp": request_time_stamp
|
||||
}
|
||||
|
||||
self._log(f"Token request: AppID={self.app_id}, User={self.user}", level='debug')
|
||||
self.logger.debug(f"Token request: AppID={self.app_id}, User={self.user}")
|
||||
|
||||
# Using synchronous requests for token fetch (called from sync context)
|
||||
# TODO: Convert to async in future version
|
||||
import requests
|
||||
response = requests.post(
|
||||
self.AUTH_URL,
|
||||
json=data,
|
||||
headers=headers,
|
||||
timeout=self.api_timeout_seconds
|
||||
)
|
||||
|
||||
self._log(f"Token response status: {response.status_code}")
|
||||
response.raise_for_status()
|
||||
try:
|
||||
response = requests.post(
|
||||
ADVOWARE_CONFIG.auth_url,
|
||||
json=data,
|
||||
headers=headers,
|
||||
timeout=self.api_timeout_seconds
|
||||
)
|
||||
|
||||
self.logger.debug(f"Token response status: {response.status_code}")
|
||||
|
||||
if response.status_code == 401:
|
||||
raise AdvowareAuthError(
|
||||
"Authentication failed - check credentials",
|
||||
status_code=401
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
except requests.Timeout:
|
||||
raise AdvowareTimeoutError(
|
||||
"Token request timed out",
|
||||
status_code=408
|
||||
)
|
||||
except requests.RequestException as e:
|
||||
raise AdvowareAPIError(
|
||||
f"Token request failed: {str(e)}",
|
||||
status_code=getattr(e.response, 'status_code', None) if hasattr(e, 'response') else None
|
||||
)
|
||||
|
||||
result = response.json()
|
||||
access_token = result.get("access_token")
|
||||
|
||||
if not access_token:
|
||||
self._log("No access_token in response", level='error')
|
||||
raise AdvowareTokenError("No access_token received from Advoware")
|
||||
self.logger.error("No access_token in response")
|
||||
raise AdvowareAuthError("No access_token received from Advoware")
|
||||
|
||||
self._log("Access token fetched successfully")
|
||||
self.logger.info("Access token fetched successfully")
|
||||
|
||||
# Cache token in Redis
|
||||
if self.redis_client:
|
||||
effective_ttl = max(1, (self.token_lifetime_minutes - 2) * 60)
|
||||
self.redis_client.set(self.TOKEN_CACHE_KEY, access_token, ex=effective_ttl)
|
||||
self.redis_client.set(self.TOKEN_TIMESTAMP_CACHE_KEY, str(time.time()), ex=effective_ttl)
|
||||
self._log(f"Token cached in Redis with TTL {effective_ttl}s")
|
||||
self.redis_client.set(ADVOWARE_CONFIG.token_cache_key, access_token, ex=effective_ttl)
|
||||
self.redis_client.set(ADVOWARE_CONFIG.token_timestamp_key, str(time.time()), ex=effective_ttl)
|
||||
self.logger.debug(f"Token cached in Redis with TTL {effective_ttl}s")
|
||||
|
||||
return access_token
|
||||
|
||||
@@ -169,32 +186,33 @@ class AdvowareAPI:
|
||||
Returns:
|
||||
Valid access token
|
||||
"""
|
||||
self._log("Getting access token", level='debug')
|
||||
self.logger.debug("Getting access token")
|
||||
|
||||
if not self.redis_client:
|
||||
self._log("No Redis available, fetching new token")
|
||||
self.logger.info("No Redis available, fetching new token")
|
||||
return self._fetch_new_access_token()
|
||||
|
||||
if force_refresh:
|
||||
self._log("Force refresh requested, fetching new token")
|
||||
self.logger.info("Force refresh requested, fetching new token")
|
||||
return self._fetch_new_access_token()
|
||||
|
||||
# Check cache
|
||||
cached_token = self.redis_client.get(self.TOKEN_CACHE_KEY)
|
||||
token_timestamp = self.redis_client.get(self.TOKEN_TIMESTAMP_CACHE_KEY)
|
||||
cached_token = self.redis_client.get(ADVOWARE_CONFIG.token_cache_key)
|
||||
token_timestamp = self.redis_client.get(ADVOWARE_CONFIG.token_timestamp_key)
|
||||
|
||||
if cached_token and token_timestamp:
|
||||
try:
|
||||
timestamp = float(token_timestamp.decode('utf-8'))
|
||||
# Redis decode_responses=True returns strings
|
||||
timestamp = float(token_timestamp)
|
||||
age_seconds = time.time() - timestamp
|
||||
|
||||
if age_seconds < (self.token_lifetime_minutes - 1) * 60:
|
||||
self._log(f"Using cached token (age: {age_seconds:.0f}s)", level='debug')
|
||||
return cached_token.decode('utf-8')
|
||||
except (ValueError, AttributeError) as e:
|
||||
self._log(f"Error reading cached token: {e}", level='debug')
|
||||
self.logger.debug(f"Using cached token (age: {age_seconds:.0f}s)")
|
||||
return cached_token
|
||||
except (ValueError, AttributeError, TypeError) as e:
|
||||
self.logger.debug(f"Error reading cached token: {e}")
|
||||
|
||||
self._log("Cached token expired or invalid, fetching new")
|
||||
self.logger.info("Cached token expired or invalid, fetching new")
|
||||
return self._fetch_new_access_token()
|
||||
|
||||
async def api_call(
|
||||
@@ -223,6 +241,11 @@ class AdvowareAPI:
|
||||
|
||||
Returns:
|
||||
JSON response or None
|
||||
|
||||
Raises:
|
||||
AdvowareAuthError: Authentication failed
|
||||
AdvowareTimeoutError: Request timed out
|
||||
AdvowareAPIError: Other API errors
|
||||
"""
|
||||
# Clean endpoint
|
||||
endpoint = endpoint.lstrip('/')
|
||||
@@ -233,7 +256,12 @@ class AdvowareAPI:
|
||||
)
|
||||
|
||||
# Get auth token
|
||||
token = self.get_access_token()
|
||||
try:
|
||||
token = self.get_access_token()
|
||||
except AdvowareAuthError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise AdvowareAPIError(f"Failed to get access token: {str(e)}")
|
||||
|
||||
# Prepare headers
|
||||
effective_headers = headers.copy() if headers else {}
|
||||
@@ -243,20 +271,20 @@ class AdvowareAPI:
|
||||
# Use 'data' parameter if provided, otherwise 'json_data'
|
||||
json_payload = data if data is not None else json_data
|
||||
|
||||
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
|
||||
try:
|
||||
self._log(f"API call: {method} {url}", level='debug')
|
||||
|
||||
session = await self._get_session()
|
||||
try:
|
||||
with self.logger.api_call(endpoint, method):
|
||||
async with session.request(
|
||||
method,
|
||||
url,
|
||||
headers=effective_headers,
|
||||
params=params,
|
||||
json=json_payload
|
||||
json=json_payload,
|
||||
timeout=effective_timeout
|
||||
) as response:
|
||||
# Handle 401 - retry with fresh token
|
||||
if response.status == 401:
|
||||
self._log("401 Unauthorized, refreshing token")
|
||||
self.logger.warning("401 Unauthorized, refreshing token")
|
||||
token = self.get_access_token(force_refresh=True)
|
||||
effective_headers['Authorization'] = f'Bearer {token}'
|
||||
|
||||
@@ -265,17 +293,57 @@ class AdvowareAPI:
|
||||
url,
|
||||
headers=effective_headers,
|
||||
params=params,
|
||||
json=json_payload
|
||||
json=json_payload,
|
||||
timeout=effective_timeout
|
||||
) as retry_response:
|
||||
if retry_response.status == 401:
|
||||
raise AdvowareAuthError(
|
||||
"Authentication failed even after token refresh",
|
||||
status_code=401
|
||||
)
|
||||
|
||||
if retry_response.status >= 500:
|
||||
error_text = await retry_response.text()
|
||||
raise RetryableError(
|
||||
f"Server error {retry_response.status}: {error_text}"
|
||||
)
|
||||
|
||||
retry_response.raise_for_status()
|
||||
return await self._parse_response(retry_response)
|
||||
|
||||
response.raise_for_status()
|
||||
# Handle other error codes
|
||||
if response.status == 404:
|
||||
error_text = await response.text()
|
||||
raise AdvowareAPIError(
|
||||
f"Resource not found: {endpoint}",
|
||||
status_code=404,
|
||||
response_body=error_text
|
||||
)
|
||||
|
||||
if response.status >= 500:
|
||||
error_text = await response.text()
|
||||
raise RetryableError(
|
||||
f"Server error {response.status}: {error_text}"
|
||||
)
|
||||
|
||||
if response.status >= 400:
|
||||
error_text = await response.text()
|
||||
raise AdvowareAPIError(
|
||||
f"API error {response.status}: {error_text}",
|
||||
status_code=response.status,
|
||||
response_body=error_text
|
||||
)
|
||||
|
||||
return await self._parse_response(response)
|
||||
|
||||
except aiohttp.ClientError as e:
|
||||
self._log(f"API call failed: {e}", level='error')
|
||||
raise
|
||||
except asyncio.TimeoutError:
|
||||
raise AdvowareTimeoutError(
|
||||
f"Request timed out after {effective_timeout.total}s",
|
||||
status_code=408
|
||||
)
|
||||
except aiohttp.ClientError as e:
|
||||
self.logger.error(f"API call failed: {e}")
|
||||
raise AdvowareAPIError(f"Request failed: {str(e)}")
|
||||
|
||||
async def _parse_response(self, response: aiohttp.ClientResponse) -> Any:
|
||||
"""Parse API response"""
|
||||
@@ -283,27 +351,6 @@ class AdvowareAPI:
|
||||
try:
|
||||
return await response.json()
|
||||
except Exception as e:
|
||||
self._log(f"JSON parse error: {e}", level='debug')
|
||||
self.logger.debug(f"JSON parse error: {e}")
|
||||
return None
|
||||
return None
|
||||
|
||||
def _log(self, message: str, level: str = 'info'):
|
||||
"""Log message via context or standard logger"""
|
||||
if self.context:
|
||||
if level == 'debug':
|
||||
self.context.logger.debug(message)
|
||||
elif level == 'warning':
|
||||
self.context.logger.warning(message)
|
||||
elif level == 'error':
|
||||
self.context.logger.error(message)
|
||||
else:
|
||||
self.context.logger.info(message)
|
||||
else:
|
||||
if level == 'debug':
|
||||
logger.debug(message)
|
||||
elif level == 'warning':
|
||||
logger.warning(message)
|
||||
elif level == 'error':
|
||||
logger.error(message)
|
||||
else:
|
||||
logger.info(message)
|
||||
|
||||
@@ -13,64 +13,39 @@ Hilfsfunktionen für Sync-Operationen:
|
||||
from typing import Dict, Any, Optional, Tuple, Literal
|
||||
from datetime import datetime, timedelta
|
||||
import pytz
|
||||
import logging
|
||||
import redis
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from services.exceptions import LockAcquisitionError, SyncError, ValidationError
|
||||
from services.redis_client import get_redis_client
|
||||
from services.config import SYNC_CONFIG, get_lock_key, get_retry_delay_seconds
|
||||
from services.logging_utils import get_logger
|
||||
|
||||
import redis
|
||||
|
||||
# Timestamp-Vergleich Ergebnis-Typen
|
||||
TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"]
|
||||
|
||||
# Max retry before permanent failure
|
||||
MAX_SYNC_RETRIES = 5
|
||||
# Lock TTL in seconds (prevents deadlocks)
|
||||
LOCK_TTL_SECONDS = 900 # 15 minutes
|
||||
# Retry backoff: Wartezeit zwischen Retries (in Minuten)
|
||||
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h
|
||||
# Auto-Reset nach 24h (für permanently_failed entities)
|
||||
AUTO_RESET_HOURS = 24
|
||||
|
||||
|
||||
class BeteiligteSync:
|
||||
"""Utility-Klasse für Beteiligte-Synchronisation"""
|
||||
|
||||
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
|
||||
def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None):
|
||||
self.espocrm = espocrm_api
|
||||
self.context = context
|
||||
self.logger = context.logger if context else logger
|
||||
self.redis = redis_client or self._init_redis()
|
||||
self.logger = get_logger('beteiligte_sync', context)
|
||||
|
||||
# Use provided Redis client or get from factory
|
||||
self.redis = redis_client or get_redis_client(strict=False)
|
||||
|
||||
if not self.redis:
|
||||
self.logger.error(
|
||||
"⚠️ KRITISCH: Redis nicht verfügbar! "
|
||||
"Distributed Locking deaktiviert - Race Conditions möglich!"
|
||||
)
|
||||
|
||||
# Import NotificationManager only when needed
|
||||
from services.notification_utils import NotificationManager
|
||||
self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context)
|
||||
|
||||
def _init_redis(self) -> redis.Redis:
|
||||
"""Initialize Redis client for distributed locking"""
|
||||
try:
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
|
||||
client = redis.Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
decode_responses=True
|
||||
)
|
||||
client.ping()
|
||||
return client
|
||||
except Exception as e:
|
||||
self._log(f"Redis connection failed: {e}", level='error')
|
||||
return None
|
||||
|
||||
def _log(self, message: str, level: str = 'info'):
|
||||
"""Logging mit Context-Support"""
|
||||
if self.context and hasattr(self.context, 'logger'):
|
||||
getattr(self.context.logger, level)(message)
|
||||
else:
|
||||
getattr(logger, level)(message)
|
||||
|
||||
async def acquire_sync_lock(self, entity_id: str) -> bool:
|
||||
"""
|
||||
Atomic distributed lock via Redis + syncStatus update
|
||||
@@ -80,23 +55,35 @@ class BeteiligteSync:
|
||||
|
||||
Returns:
|
||||
True wenn Lock erfolgreich, False wenn bereits im Sync
|
||||
|
||||
Raises:
|
||||
SyncError: Bei kritischen Sync-Problemen
|
||||
"""
|
||||
try:
|
||||
# STEP 1: Atomic Redis lock (prevents race conditions)
|
||||
if self.redis:
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
|
||||
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||
acquired = self.redis.set(
|
||||
lock_key,
|
||||
"locked",
|
||||
nx=True,
|
||||
ex=SYNC_CONFIG.lock_ttl_seconds
|
||||
)
|
||||
|
||||
if not acquired:
|
||||
self._log(f"Redis lock bereits aktiv für {entity_id}", level='warn')
|
||||
self.logger.warning(f"Redis lock bereits aktiv für {entity_id}")
|
||||
return False
|
||||
else:
|
||||
self.logger.error(
|
||||
f"⚠️ WARNUNG: Sync ohne Redis-Lock für {entity_id} - Race Condition möglich!"
|
||||
)
|
||||
|
||||
# STEP 2: Update syncStatus (für UI visibility)
|
||||
await self.espocrm.update_entity('CBeteiligte', entity_id, {
|
||||
'syncStatus': 'syncing'
|
||||
})
|
||||
|
||||
self._log(f"Sync-Lock für {entity_id} erworben")
|
||||
self.logger.info(f"Sync-Lock für {entity_id} erworben")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
@@ -152,32 +139,42 @@ class BeteiligteSync:
|
||||
update_data['syncRetryCount'] = new_retry
|
||||
|
||||
# Exponential backoff - berechne nächsten Retry-Zeitpunkt
|
||||
if new_retry <= len(RETRY_BACKOFF_MINUTES):
|
||||
backoff_minutes = RETRY_BACKOFF_MINUTES[new_retry - 1]
|
||||
backoff_minutes = SYNC_CONFIG.retry_backoff_minutes
|
||||
if new_retry <= len(backoff_minutes):
|
||||
backoff_min = backoff_minutes[new_retry - 1]
|
||||
else:
|
||||
backoff_minutes = RETRY_BACKOFF_MINUTES[-1] # Letzte Backoff-Zeit
|
||||
backoff_min = backoff_minutes[-1] # Letzte Backoff-Zeit
|
||||
|
||||
next_retry = now_utc + timedelta(minutes=backoff_minutes)
|
||||
next_retry = now_utc + timedelta(minutes=backoff_min)
|
||||
update_data['syncNextRetry'] = next_retry.strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
self._log(f"Retry {new_retry}/{MAX_SYNC_RETRIES}, nächster Versuch in {backoff_minutes} Minuten")
|
||||
self.logger.info(
|
||||
f"Retry {new_retry}/{SYNC_CONFIG.max_retries}, "
|
||||
f"nächster Versuch in {backoff_min} Minuten"
|
||||
)
|
||||
|
||||
# Check max retries - mark as permanently failed
|
||||
if new_retry >= MAX_SYNC_RETRIES:
|
||||
if new_retry >= SYNC_CONFIG.max_retries:
|
||||
update_data['syncStatus'] = 'permanently_failed'
|
||||
|
||||
# Auto-Reset Timestamp für Wiederherstellung nach 24h
|
||||
auto_reset_time = now_utc + timedelta(hours=AUTO_RESET_HOURS)
|
||||
auto_reset_time = now_utc + timedelta(hours=SYNC_CONFIG.auto_reset_hours)
|
||||
update_data['syncAutoResetAt'] = auto_reset_time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
await self.send_notification(
|
||||
entity_id,
|
||||
'error',
|
||||
extra_data={
|
||||
'message': f"Sync fehlgeschlagen nach {MAX_SYNC_RETRIES} Versuchen. Auto-Reset in {AUTO_RESET_HOURS}h."
|
||||
'message': (
|
||||
f"Sync fehlgeschlagen nach {SYNC_CONFIG.max_retries} Versuchen. "
|
||||
f"Auto-Reset in {SYNC_CONFIG.auto_reset_hours}h."
|
||||
)
|
||||
}
|
||||
)
|
||||
self._log(f"Max retries ({MAX_SYNC_RETRIES}) erreicht für {entity_id}, Auto-Reset um {auto_reset_time}", level='error')
|
||||
self.logger.error(
|
||||
f"Max retries ({SYNC_CONFIG.max_retries}) erreicht für {entity_id}, "
|
||||
f"Auto-Reset um {auto_reset_time}"
|
||||
)
|
||||
else:
|
||||
update_data['syncRetryCount'] = 0
|
||||
update_data['syncNextRetry'] = None
|
||||
@@ -188,19 +185,19 @@ class BeteiligteSync:
|
||||
|
||||
await self.espocrm.update_entity('CBeteiligte', entity_id, update_data)
|
||||
|
||||
self._log(f"Sync-Lock released: {entity_id} → {new_status}")
|
||||
self.logger.info(f"Sync-Lock released: {entity_id} → {new_status}")
|
||||
|
||||
# Release Redis lock
|
||||
if self.redis:
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||
self.redis.delete(lock_key)
|
||||
|
||||
except Exception as e:
|
||||
self._log(f"Fehler beim Release Lock: {e}", level='error')
|
||||
self.logger.error(f"Fehler beim Release Lock: {e}")
|
||||
# Ensure Redis lock is released even on error
|
||||
if self.redis:
|
||||
try:
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||
self.redis.delete(lock_key)
|
||||
except:
|
||||
pass
|
||||
|
||||
338
services/config.py
Normal file
338
services/config.py
Normal file
@@ -0,0 +1,338 @@
|
||||
"""
|
||||
Zentrale Konfiguration für BitByLaw Integration
|
||||
|
||||
Alle Magic Numbers und Strings sind hier zentralisiert.
|
||||
"""
|
||||
|
||||
from typing import List, Dict
|
||||
from dataclasses import dataclass
|
||||
import os
|
||||
|
||||
|
||||
# ========== Sync Configuration ==========
|
||||
|
||||
@dataclass
|
||||
class SyncConfig:
|
||||
"""Konfiguration für Sync-Operationen"""
|
||||
|
||||
# Retry-Konfiguration
|
||||
max_retries: int = 5
|
||||
"""Maximale Anzahl von Retry-Versuchen"""
|
||||
|
||||
retry_backoff_minutes: List[int] = None
|
||||
"""Exponential Backoff in Minuten: [1, 5, 15, 60, 240]"""
|
||||
|
||||
auto_reset_hours: int = 24
|
||||
"""Auto-Reset für permanently_failed Entities (in Stunden)"""
|
||||
|
||||
# Lock-Konfiguration
|
||||
lock_ttl_seconds: int = 900 # 15 Minuten
|
||||
"""TTL für distributed locks (verhindert Deadlocks)"""
|
||||
|
||||
lock_prefix: str = "sync_lock"
|
||||
"""Prefix für Redis Lock Keys"""
|
||||
|
||||
# Validation
|
||||
validate_before_sync: bool = True
|
||||
"""Validiere Entities vor dem Sync (empfohlen)"""
|
||||
|
||||
# Change Detection
|
||||
use_rowid_change_detection: bool = True
|
||||
"""Nutze rowId für Change Detection (Advoware)"""
|
||||
|
||||
def __post_init__(self):
|
||||
if self.retry_backoff_minutes is None:
|
||||
# Default exponential backoff: 1, 5, 15, 60, 240 Minuten
|
||||
self.retry_backoff_minutes = [1, 5, 15, 60, 240]
|
||||
|
||||
|
||||
# Singleton Instance
|
||||
SYNC_CONFIG = SyncConfig()
|
||||
|
||||
|
||||
# ========== API Configuration ==========
|
||||
|
||||
@dataclass
|
||||
class APIConfig:
|
||||
"""API-spezifische Konfiguration"""
|
||||
|
||||
# Timeouts
|
||||
default_timeout_seconds: int = 30
|
||||
"""Default Timeout für API-Calls"""
|
||||
|
||||
long_running_timeout_seconds: int = 120
|
||||
"""Timeout für lange Operations (z.B. Uploads)"""
|
||||
|
||||
# Retry
|
||||
max_api_retries: int = 3
|
||||
"""Anzahl Retries bei API-Fehlern"""
|
||||
|
||||
retry_status_codes: List[int] = None
|
||||
"""HTTP Status Codes die Retry auslösen"""
|
||||
|
||||
# Rate Limiting
|
||||
rate_limit_enabled: bool = True
|
||||
"""Aktiviere Rate Limiting"""
|
||||
|
||||
rate_limit_calls_per_minute: int = 60
|
||||
"""Max. API-Calls pro Minute"""
|
||||
|
||||
def __post_init__(self):
|
||||
if self.retry_status_codes is None:
|
||||
# Retry bei: 408 (Timeout), 429 (Rate Limit), 500, 502, 503, 504
|
||||
self.retry_status_codes = [408, 429, 500, 502, 503, 504]
|
||||
|
||||
|
||||
API_CONFIG = APIConfig()
|
||||
|
||||
|
||||
# ========== Advoware Configuration ==========
|
||||
|
||||
@dataclass
|
||||
class AdvowareConfig:
|
||||
"""Advoware-spezifische Konfiguration"""
|
||||
|
||||
# Token Management
|
||||
token_lifetime_minutes: int = 55
|
||||
"""Token-Lifetime (tatsächlich 60min, aber 5min Puffer)"""
|
||||
|
||||
token_cache_key: str = "advoware_access_token"
|
||||
"""Redis Key für Token Cache"""
|
||||
|
||||
token_timestamp_key: str = "advoware_token_timestamp"
|
||||
"""Redis Key für Token Timestamp"""
|
||||
|
||||
# Auth
|
||||
auth_url: str = "https://security.advo-net.net/api/v1/Token"
|
||||
"""Advoware Auth-Endpoint"""
|
||||
|
||||
product_id: int = 64
|
||||
"""Advoware Product ID"""
|
||||
|
||||
# Field Mapping
|
||||
readonly_fields: List[str] = None
|
||||
"""Felder die nicht via PUT geändert werden können"""
|
||||
|
||||
def __post_init__(self):
|
||||
if self.readonly_fields is None:
|
||||
# Diese Felder können nicht via PUT geändert werden
|
||||
self.readonly_fields = [
|
||||
'betNr', 'rowId', 'kommKz', # Kommunikation: kommKz ist read-only!
|
||||
'handelsRegisterNummer', 'registergericht' # Werden ignoriert von API
|
||||
]
|
||||
|
||||
|
||||
ADVOWARE_CONFIG = AdvowareConfig()
|
||||
|
||||
|
||||
# ========== EspoCRM Configuration ==========
|
||||
|
||||
@dataclass
|
||||
class EspoCRMConfig:
|
||||
"""EspoCRM-spezifische Konfiguration"""
|
||||
|
||||
# API
|
||||
default_page_size: int = 50
|
||||
"""Default Seitengröße für Listen-Abfragen"""
|
||||
|
||||
max_page_size: int = 200
|
||||
"""Maximale Seitengröße"""
|
||||
|
||||
# Sync Status Fields
|
||||
sync_status_field: str = "syncStatus"
|
||||
"""Feldname für Sync-Status"""
|
||||
|
||||
sync_error_field: str = "syncErrorMessage"
|
||||
"""Feldname für Sync-Fehler"""
|
||||
|
||||
sync_retry_field: str = "syncRetryCount"
|
||||
"""Feldname für Retry-Counter"""
|
||||
|
||||
# Notifications
|
||||
notification_enabled: bool = True
|
||||
"""In-App Notifications aktivieren"""
|
||||
|
||||
notification_user_id: str = "1"
|
||||
"""User-ID für Notifications (Marvin)"""
|
||||
|
||||
|
||||
ESPOCRM_CONFIG = EspoCRMConfig()
|
||||
|
||||
|
||||
# ========== Redis Configuration ==========
|
||||
|
||||
@dataclass
|
||||
class RedisConfig:
|
||||
"""Redis-spezifische Konfiguration"""
|
||||
|
||||
# Connection
|
||||
host: str = "localhost"
|
||||
port: int = 6379
|
||||
db: int = 1
|
||||
timeout_seconds: int = 5
|
||||
max_connections: int = 50
|
||||
|
||||
# Behavior
|
||||
decode_responses: bool = True
|
||||
"""Auto-decode bytes zu strings"""
|
||||
|
||||
health_check_interval: int = 30
|
||||
"""Health-Check Interval in Sekunden"""
|
||||
|
||||
# Keys
|
||||
key_prefix: str = "bitbylaw"
|
||||
"""Prefix für alle Redis Keys"""
|
||||
|
||||
def get_key(self, key: str) -> str:
|
||||
"""Gibt vollen Redis Key mit Prefix zurück"""
|
||||
return f"{self.key_prefix}:{key}"
|
||||
|
||||
@classmethod
|
||||
def from_env(cls) -> 'RedisConfig':
|
||||
"""Lädt Redis-Config aus Environment Variables"""
|
||||
return cls(
|
||||
host=os.getenv('REDIS_HOST', 'localhost'),
|
||||
port=int(os.getenv('REDIS_PORT', '6379')),
|
||||
db=int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')),
|
||||
timeout_seconds=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')),
|
||||
max_connections=int(os.getenv('REDIS_MAX_CONNECTIONS', '50'))
|
||||
)
|
||||
|
||||
|
||||
REDIS_CONFIG = RedisConfig.from_env()
|
||||
|
||||
|
||||
# ========== Logging Configuration ==========
|
||||
|
||||
@dataclass
|
||||
class LoggingConfig:
|
||||
"""Logging-Konfiguration"""
|
||||
|
||||
# Levels
|
||||
default_level: str = "INFO"
|
||||
"""Default Log-Level"""
|
||||
|
||||
api_level: str = "INFO"
|
||||
"""Log-Level für API-Calls"""
|
||||
|
||||
sync_level: str = "INFO"
|
||||
"""Log-Level für Sync-Operations"""
|
||||
|
||||
# Format
|
||||
log_format: str = "[{timestamp}] {level} {logger}: {message}"
|
||||
"""Log-Format"""
|
||||
|
||||
include_context: bool = True
|
||||
"""Motia FlowContext in Logs einbinden"""
|
||||
|
||||
# Performance
|
||||
log_api_timings: bool = True
|
||||
"""API Call Timings loggen"""
|
||||
|
||||
log_sync_duration: bool = True
|
||||
"""Sync-Dauer loggen"""
|
||||
|
||||
|
||||
LOGGING_CONFIG = LoggingConfig()
|
||||
|
||||
|
||||
# ========== Calendar Sync Configuration ==========
|
||||
|
||||
@dataclass
|
||||
class CalendarSyncConfig:
|
||||
"""Konfiguration für Google Calendar Sync"""
|
||||
|
||||
# Sync Window
|
||||
sync_days_past: int = 7
|
||||
"""Tage in die Vergangenheit syncen"""
|
||||
|
||||
sync_days_future: int = 90
|
||||
"""Tage in die Zukunft syncen"""
|
||||
|
||||
# Cron
|
||||
cron_schedule: str = "0 */15 * * * *"
|
||||
"""Cron-Schedule (jede 15 Minuten)"""
|
||||
|
||||
# Batch Size
|
||||
batch_size: int = 10
|
||||
"""Anzahl Mitarbeiter pro Batch"""
|
||||
|
||||
|
||||
CALENDAR_SYNC_CONFIG = CalendarSyncConfig()
|
||||
|
||||
|
||||
# ========== Feature Flags ==========
|
||||
|
||||
@dataclass
|
||||
class FeatureFlags:
|
||||
"""Feature Flags für schrittweises Rollout"""
|
||||
|
||||
# Validation
|
||||
strict_validation: bool = True
|
||||
"""Strenge Validierung mit Pydantic"""
|
||||
|
||||
# Sync Features
|
||||
kommunikation_sync_enabled: bool = False
|
||||
"""Kommunikation-Sync aktivieren (noch in Entwicklung)"""
|
||||
|
||||
document_sync_enabled: bool = False
|
||||
"""Document-Sync aktivieren (noch in Entwicklung)"""
|
||||
|
||||
# Advanced Features
|
||||
parallel_sync_enabled: bool = False
|
||||
"""Parallele Sync-Operations (experimentell)"""
|
||||
|
||||
auto_conflict_resolution: bool = False
|
||||
"""Automatische Konfliktauflösung (experimentell)"""
|
||||
|
||||
# Debug
|
||||
debug_mode: bool = False
|
||||
"""Debug-Modus (mehr Logging, langsamer)"""
|
||||
|
||||
|
||||
FEATURE_FLAGS = FeatureFlags()
|
||||
|
||||
|
||||
# ========== Helper Functions ==========
|
||||
|
||||
def get_retry_delay_seconds(attempt: int) -> int:
|
||||
"""
|
||||
Gibt Retry-Delay in Sekunden für gegebenen Versuch zurück.
|
||||
|
||||
Args:
|
||||
attempt: Versuchs-Nummer (0-indexed)
|
||||
|
||||
Returns:
|
||||
Delay in Sekunden
|
||||
"""
|
||||
backoff_minutes = SYNC_CONFIG.retry_backoff_minutes
|
||||
if attempt < len(backoff_minutes):
|
||||
return backoff_minutes[attempt] * 60
|
||||
return backoff_minutes[-1] * 60
|
||||
|
||||
|
||||
def get_lock_key(entity_type: str, entity_id: str) -> str:
|
||||
"""
|
||||
Erzeugt Redis Lock-Key für Entity.
|
||||
|
||||
Args:
|
||||
entity_type: Entity-Typ (z.B. 'cbeteiligte')
|
||||
entity_id: Entity-ID
|
||||
|
||||
Returns:
|
||||
Redis Key
|
||||
"""
|
||||
return f"{SYNC_CONFIG.lock_prefix}:{entity_type.lower()}:{entity_id}"
|
||||
|
||||
|
||||
def is_retryable_status_code(status_code: int) -> bool:
|
||||
"""
|
||||
Prüft ob HTTP Status Code Retry auslösen soll.
|
||||
|
||||
Args:
|
||||
status_code: HTTP Status Code
|
||||
|
||||
Returns:
|
||||
True wenn retryable
|
||||
"""
|
||||
return status_code in API_CONFIG.retry_status_codes
|
||||
@@ -2,23 +2,23 @@
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import logging
|
||||
import redis
|
||||
import os
|
||||
from typing import Optional, Dict, Any, List
|
||||
import os
|
||||
|
||||
from services.exceptions import (
|
||||
EspoCRMAPIError,
|
||||
EspoCRMAuthError,
|
||||
EspoCRMTimeoutError,
|
||||
RetryableError,
|
||||
ValidationError
|
||||
)
|
||||
from services.redis_client import get_redis_client
|
||||
from services.config import ESPOCRM_CONFIG, API_CONFIG
|
||||
from services.logging_utils import get_service_logger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EspoCRMError(Exception):
|
||||
"""Base exception for EspoCRM API errors"""
|
||||
pass
|
||||
|
||||
|
||||
class EspoCRMAuthError(EspoCRMError):
|
||||
"""Authentication error"""
|
||||
pass
|
||||
|
||||
|
||||
class EspoCRMAPI:
|
||||
"""
|
||||
EspoCRM API Client for BitByLaw integration.
|
||||
@@ -32,7 +32,6 @@ class EspoCRMAPI:
|
||||
- ESPOCRM_API_BASE_URL (e.g., https://crm.bitbylaw.com/api/v1)
|
||||
- ESPOCRM_API_KEY (Marvin API key)
|
||||
- ESPOCRM_API_TIMEOUT_SECONDS (optional, default: 30)
|
||||
- REDIS_HOST, REDIS_PORT, REDIS_DB_ADVOWARE_CACHE (for caching)
|
||||
"""
|
||||
|
||||
def __init__(self, context=None):
|
||||
@@ -43,47 +42,27 @@ class EspoCRMAPI:
|
||||
context: Motia FlowContext for logging (optional)
|
||||
"""
|
||||
self.context = context
|
||||
self._log("EspoCRMAPI initializing", level='debug')
|
||||
self.logger = get_service_logger('espocrm', context)
|
||||
self.logger.debug("EspoCRMAPI initializing")
|
||||
|
||||
# Load configuration from environment
|
||||
self.api_base_url = os.getenv('ESPOCRM_API_BASE_URL', 'https://crm.bitbylaw.com/api/v1')
|
||||
self.api_key = os.getenv('ESPOCRM_API_KEY', '')
|
||||
self.api_timeout_seconds = int(os.getenv('ESPOCRM_API_TIMEOUT_SECONDS', '30'))
|
||||
self.api_timeout_seconds = int(os.getenv('ESPOCRM_API_TIMEOUT_SECONDS', str(API_CONFIG.default_timeout_seconds)))
|
||||
|
||||
if not self.api_key:
|
||||
raise EspoCRMAuthError("ESPOCRM_API_KEY not configured in environment")
|
||||
|
||||
self._log(f"EspoCRM API initialized with base URL: {self.api_base_url}")
|
||||
self.logger.info(f"EspoCRM API initialized with base URL: {self.api_base_url}")
|
||||
|
||||
# Optional Redis for caching/rate limiting
|
||||
try:
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
redis_timeout = int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
|
||||
self._session: Optional[aiohttp.ClientSession] = None
|
||||
|
||||
self.redis_client = redis.Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
socket_timeout=redis_timeout,
|
||||
socket_connect_timeout=redis_timeout,
|
||||
decode_responses=True
|
||||
)
|
||||
self.redis_client.ping()
|
||||
self._log("Connected to Redis for EspoCRM operations")
|
||||
except Exception as e:
|
||||
self._log(f"Could not connect to Redis: {e}. Continuing without caching.", level='warning')
|
||||
self.redis_client = None
|
||||
|
||||
def _log(self, message: str, level: str = 'info'):
|
||||
"""Log message via context.logger if available, otherwise use module logger"""
|
||||
if self.context and hasattr(self.context, 'logger'):
|
||||
log_func = getattr(self.context.logger, level, self.context.logger.info)
|
||||
log_func(f"[EspoCRM] {message}")
|
||||
# Optional Redis for caching/rate limiting (centralized)
|
||||
self.redis_client = get_redis_client(strict=False)
|
||||
if self.redis_client:
|
||||
self.logger.info("Connected to Redis for EspoCRM operations")
|
||||
else:
|
||||
log_func = getattr(logger, level, logger.info)
|
||||
log_func(f"[EspoCRM] {message}")
|
||||
self.logger.warning("⚠️ Redis unavailable - caching disabled")
|
||||
|
||||
def _get_headers(self) -> Dict[str, str]:
|
||||
"""Generate request headers with API key"""
|
||||
@@ -93,6 +72,15 @@ class EspoCRMAPI:
|
||||
'Accept': 'application/json'
|
||||
}
|
||||
|
||||
async def _get_session(self) -> aiohttp.ClientSession:
|
||||
if self._session is None or self._session.closed:
|
||||
self._session = aiohttp.ClientSession()
|
||||
return self._session
|
||||
|
||||
async def close(self) -> None:
|
||||
if self._session and not self._session.closed:
|
||||
await self._session.close()
|
||||
|
||||
async def api_call(
|
||||
self,
|
||||
endpoint: str,
|
||||
@@ -115,7 +103,9 @@ class EspoCRMAPI:
|
||||
Parsed JSON response or None
|
||||
|
||||
Raises:
|
||||
EspoCRMError: On API errors
|
||||
EspoCRMAuthError: Authentication failed
|
||||
EspoCRMTimeoutError: Request timed out
|
||||
EspoCRMAPIError: Other API errors
|
||||
"""
|
||||
# Ensure endpoint starts with /
|
||||
if not endpoint.startswith('/'):
|
||||
@@ -127,45 +117,62 @@ class EspoCRMAPI:
|
||||
total=timeout_seconds or self.api_timeout_seconds
|
||||
)
|
||||
|
||||
self._log(f"API call: {method} {url}", level='debug')
|
||||
if params:
|
||||
self._log(f"Params: {params}", level='debug')
|
||||
|
||||
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
|
||||
try:
|
||||
session = await self._get_session()
|
||||
try:
|
||||
with self.logger.api_call(endpoint, method):
|
||||
async with session.request(
|
||||
method,
|
||||
url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
json=json_data
|
||||
json=json_data,
|
||||
timeout=effective_timeout
|
||||
) as response:
|
||||
# Log response status
|
||||
self._log(f"Response status: {response.status}", level='debug')
|
||||
|
||||
# Handle errors
|
||||
if response.status == 401:
|
||||
raise EspoCRMAuthError("Authentication failed - check API key")
|
||||
raise EspoCRMAuthError(
|
||||
"Authentication failed - check API key",
|
||||
status_code=401
|
||||
)
|
||||
elif response.status == 403:
|
||||
raise EspoCRMError("Access forbidden")
|
||||
raise EspoCRMAPIError(
|
||||
"Access forbidden",
|
||||
status_code=403
|
||||
)
|
||||
elif response.status == 404:
|
||||
raise EspoCRMError(f"Resource not found: {endpoint}")
|
||||
raise EspoCRMAPIError(
|
||||
f"Resource not found: {endpoint}",
|
||||
status_code=404
|
||||
)
|
||||
elif response.status >= 500:
|
||||
error_text = await response.text()
|
||||
raise RetryableError(
|
||||
f"Server error {response.status}: {error_text}"
|
||||
)
|
||||
elif response.status >= 400:
|
||||
error_text = await response.text()
|
||||
raise EspoCRMError(f"API error {response.status}: {error_text}")
|
||||
raise EspoCRMAPIError(
|
||||
f"API error {response.status}: {error_text}",
|
||||
status_code=response.status,
|
||||
response_body=error_text
|
||||
)
|
||||
|
||||
# Parse response
|
||||
if response.content_type == 'application/json':
|
||||
result = await response.json()
|
||||
self._log(f"Response received", level='debug')
|
||||
return result
|
||||
else:
|
||||
# For DELETE or other non-JSON responses
|
||||
return None
|
||||
|
||||
except aiohttp.ClientError as e:
|
||||
self._log(f"API call failed: {e}", level='error')
|
||||
raise EspoCRMError(f"Request failed: {e}") from e
|
||||
except asyncio.TimeoutError:
|
||||
raise EspoCRMTimeoutError(
|
||||
f"Request timed out after {effective_timeout.total}s",
|
||||
status_code=408
|
||||
)
|
||||
except aiohttp.ClientError as e:
|
||||
self.logger.error(f"API call failed: {e}")
|
||||
raise EspoCRMAPIError(f"Request failed: {str(e)}")
|
||||
|
||||
async def get_entity(self, entity_type: str, entity_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
@@ -345,36 +352,36 @@ class EspoCRMAPI:
|
||||
|
||||
effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)
|
||||
|
||||
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
|
||||
try:
|
||||
async with session.post(url, headers=headers, data=form_data) as response:
|
||||
self._log(f"Upload response status: {response.status}")
|
||||
session = await self._get_session()
|
||||
try:
|
||||
async with session.post(url, headers=headers, data=form_data, timeout=effective_timeout) as response:
|
||||
self._log(f"Upload response status: {response.status}")
|
||||
|
||||
if response.status == 401:
|
||||
raise EspoCRMAuthError("Authentication failed - check API key")
|
||||
elif response.status == 403:
|
||||
raise EspoCRMError("Access forbidden")
|
||||
elif response.status == 404:
|
||||
raise EspoCRMError(f"Attachment endpoint not found")
|
||||
elif response.status >= 400:
|
||||
error_text = await response.text()
|
||||
self._log(f"❌ Upload failed with {response.status}. Response: {error_text}", level='error')
|
||||
raise EspoCRMError(f"Upload error {response.status}: {error_text}")
|
||||
if response.status == 401:
|
||||
raise EspoCRMAuthError("Authentication failed - check API key")
|
||||
elif response.status == 403:
|
||||
raise EspoCRMError("Access forbidden")
|
||||
elif response.status == 404:
|
||||
raise EspoCRMError(f"Attachment endpoint not found")
|
||||
elif response.status >= 400:
|
||||
error_text = await response.text()
|
||||
self._log(f"❌ Upload failed with {response.status}. Response: {error_text}", level='error')
|
||||
raise EspoCRMError(f"Upload error {response.status}: {error_text}")
|
||||
|
||||
# Parse response
|
||||
if response.content_type == 'application/json':
|
||||
result = await response.json()
|
||||
attachment_id = result.get('id')
|
||||
self._log(f"✅ Attachment uploaded successfully: {attachment_id}")
|
||||
return result
|
||||
else:
|
||||
response_text = await response.text()
|
||||
self._log(f"⚠️ Non-JSON response: {response_text[:200]}", level='warn')
|
||||
return {'success': True, 'response': response_text}
|
||||
# Parse response
|
||||
if response.content_type == 'application/json':
|
||||
result = await response.json()
|
||||
attachment_id = result.get('id')
|
||||
self._log(f"✅ Attachment uploaded successfully: {attachment_id}")
|
||||
return result
|
||||
else:
|
||||
response_text = await response.text()
|
||||
self._log(f"⚠️ Non-JSON response: {response_text[:200]}", level='warn')
|
||||
return {'success': True, 'response': response_text}
|
||||
|
||||
except aiohttp.ClientError as e:
|
||||
self._log(f"Upload failed: {e}", level='error')
|
||||
raise EspoCRMError(f"Upload request failed: {e}") from e
|
||||
except aiohttp.ClientError as e:
|
||||
self._log(f"Upload failed: {e}", level='error')
|
||||
raise EspoCRMError(f"Upload request failed: {e}") from e
|
||||
|
||||
async def download_attachment(self, attachment_id: str) -> bytes:
|
||||
"""
|
||||
@@ -395,23 +402,23 @@ class EspoCRMAPI:
|
||||
|
||||
effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)
|
||||
|
||||
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
|
||||
try:
|
||||
async with session.get(url, headers=headers) as response:
|
||||
if response.status == 401:
|
||||
raise EspoCRMAuthError("Authentication failed - check API key")
|
||||
elif response.status == 403:
|
||||
raise EspoCRMError("Access forbidden")
|
||||
elif response.status == 404:
|
||||
raise EspoCRMError(f"Attachment not found: {attachment_id}")
|
||||
elif response.status >= 400:
|
||||
error_text = await response.text()
|
||||
raise EspoCRMError(f"Download error {response.status}: {error_text}")
|
||||
session = await self._get_session()
|
||||
try:
|
||||
async with session.get(url, headers=headers, timeout=effective_timeout) as response:
|
||||
if response.status == 401:
|
||||
raise EspoCRMAuthError("Authentication failed - check API key")
|
||||
elif response.status == 403:
|
||||
raise EspoCRMError("Access forbidden")
|
||||
elif response.status == 404:
|
||||
raise EspoCRMError(f"Attachment not found: {attachment_id}")
|
||||
elif response.status >= 400:
|
||||
error_text = await response.text()
|
||||
raise EspoCRMError(f"Download error {response.status}: {error_text}")
|
||||
|
||||
content = await response.read()
|
||||
self._log(f"✅ Downloaded {len(content)} bytes")
|
||||
return content
|
||||
content = await response.read()
|
||||
self._log(f"✅ Downloaded {len(content)} bytes")
|
||||
return content
|
||||
|
||||
except aiohttp.ClientError as e:
|
||||
self._log(f"Download failed: {e}", level='error')
|
||||
raise EspoCRMError(f"Download request failed: {e}") from e
|
||||
except aiohttp.ClientError as e:
|
||||
self._log(f"Download failed: {e}", level='error')
|
||||
raise EspoCRMError(f"Download request failed: {e}") from e
|
||||
|
||||
@@ -8,6 +8,16 @@ from typing import Dict, Any, Optional, List
|
||||
from datetime import datetime
|
||||
import logging
|
||||
|
||||
from services.models import (
|
||||
AdvowareBeteiligteCreate,
|
||||
AdvowareBeteiligteUpdate,
|
||||
EspoCRMBeteiligteCreate,
|
||||
validate_beteiligte_advoware,
|
||||
validate_beteiligte_espocrm
|
||||
)
|
||||
from services.exceptions import ValidationError
|
||||
from services.config import FEATURE_FLAGS
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -27,6 +37,9 @@ class BeteiligteMapper:
|
||||
|
||||
Returns:
|
||||
Dict mit Stammdaten für Advoware API (POST/PUT /api/v1/advonet/Beteiligte)
|
||||
|
||||
Raises:
|
||||
ValidationError: Bei Validierungsfehlern (wenn strict_validation aktiviert)
|
||||
"""
|
||||
logger.debug(f"Mapping EspoCRM → Advoware STAMMDATEN: {espo_entity.get('id')}")
|
||||
|
||||
@@ -78,6 +91,14 @@ class BeteiligteMapper:
|
||||
|
||||
logger.debug(f"Mapped to Advoware STAMMDATEN: name={advo_data.get('name')}, vorname={advo_data.get('vorname')}, rechtsform={advo_data.get('rechtsform')}")
|
||||
|
||||
# Optional: Validiere mit Pydantic wenn aktiviert
|
||||
if FEATURE_FLAGS.strict_validation:
|
||||
try:
|
||||
validate_beteiligte_advoware(advo_data)
|
||||
except ValidationError as e:
|
||||
logger.warning(f"Validation warning: {e}")
|
||||
# Continue anyway - validation ist optional
|
||||
|
||||
return advo_data
|
||||
|
||||
@staticmethod
|
||||
|
||||
217
services/exceptions.py
Normal file
217
services/exceptions.py
Normal file
@@ -0,0 +1,217 @@
|
||||
"""
|
||||
Custom Exception Classes für BitByLaw Integration
|
||||
|
||||
Hierarchie:
|
||||
- IntegrationError (Base)
|
||||
- APIError
|
||||
- AdvowareAPIError
|
||||
- AdvowareAuthError
|
||||
- AdvowareTimeoutError
|
||||
- EspoCRMAPIError
|
||||
- EspoCRMAuthError
|
||||
- EspoCRMTimeoutError
|
||||
- SyncError
|
||||
- LockAcquisitionError
|
||||
- ValidationError
|
||||
- ConflictError
|
||||
- RetryableError
|
||||
- NonRetryableError
|
||||
"""
|
||||
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
|
||||
class IntegrationError(Exception):
|
||||
"""Base exception for all integration errors"""
|
||||
|
||||
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
|
||||
super().__init__(message)
|
||||
self.message = message
|
||||
self.details = details or {}
|
||||
|
||||
|
||||
# ========== API Errors ==========
|
||||
|
||||
class APIError(IntegrationError):
|
||||
"""Base class for all API-related errors"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
status_code: Optional[int] = None,
|
||||
response_body: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None
|
||||
):
|
||||
super().__init__(message, details)
|
||||
self.status_code = status_code
|
||||
self.response_body = response_body
|
||||
|
||||
|
||||
class AdvowareAPIError(APIError):
|
||||
"""Advoware API error"""
|
||||
pass
|
||||
|
||||
|
||||
class AdvowareAuthError(AdvowareAPIError):
|
||||
"""Advoware authentication error"""
|
||||
pass
|
||||
|
||||
|
||||
class AdvowareTimeoutError(AdvowareAPIError):
|
||||
"""Advoware API timeout"""
|
||||
pass
|
||||
|
||||
|
||||
class EspoCRMAPIError(APIError):
|
||||
"""EspoCRM API error"""
|
||||
pass
|
||||
|
||||
|
||||
class EspoCRMAuthError(EspoCRMAPIError):
|
||||
"""EspoCRM authentication error"""
|
||||
pass
|
||||
|
||||
|
||||
class EspoCRMTimeoutError(EspoCRMAPIError):
|
||||
"""EspoCRM API timeout"""
|
||||
pass
|
||||
|
||||
|
||||
# ========== Sync Errors ==========
|
||||
|
||||
class SyncError(IntegrationError):
|
||||
"""Base class for synchronization errors"""
|
||||
pass
|
||||
|
||||
|
||||
class LockAcquisitionError(SyncError):
|
||||
"""Failed to acquire distributed lock"""
|
||||
|
||||
def __init__(self, entity_id: str, lock_key: str, message: Optional[str] = None):
|
||||
super().__init__(
|
||||
message or f"Could not acquire lock for entity {entity_id}",
|
||||
details={"entity_id": entity_id, "lock_key": lock_key}
|
||||
)
|
||||
self.entity_id = entity_id
|
||||
self.lock_key = lock_key
|
||||
|
||||
|
||||
class ValidationError(SyncError):
|
||||
"""Data validation error"""
|
||||
|
||||
def __init__(self, message: str, field: Optional[str] = None, value: Any = None):
|
||||
super().__init__(
|
||||
message,
|
||||
details={"field": field, "value": value}
|
||||
)
|
||||
self.field = field
|
||||
self.value = value
|
||||
|
||||
|
||||
class ConflictError(SyncError):
|
||||
"""Data conflict during synchronization"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
entity_id: str,
|
||||
source_system: Optional[str] = None,
|
||||
target_system: Optional[str] = None
|
||||
):
|
||||
super().__init__(
|
||||
message,
|
||||
details={
|
||||
"entity_id": entity_id,
|
||||
"source_system": source_system,
|
||||
"target_system": target_system
|
||||
}
|
||||
)
|
||||
self.entity_id = entity_id
|
||||
|
||||
|
||||
# ========== Retry Classification ==========
|
||||
|
||||
class RetryableError(IntegrationError):
|
||||
"""Error that should trigger retry logic"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
retry_after_seconds: Optional[int] = None,
|
||||
details: Optional[Dict[str, Any]] = None
|
||||
):
|
||||
super().__init__(message, details)
|
||||
self.retry_after_seconds = retry_after_seconds
|
||||
|
||||
|
||||
class NonRetryableError(IntegrationError):
|
||||
"""Error that should NOT trigger retry (e.g., validation errors)"""
|
||||
pass
|
||||
|
||||
|
||||
# ========== Redis Errors ==========
|
||||
|
||||
class RedisError(IntegrationError):
|
||||
"""Redis connection or operation error"""
|
||||
|
||||
def __init__(self, message: str, operation: Optional[str] = None):
|
||||
super().__init__(message, details={"operation": operation})
|
||||
self.operation = operation
|
||||
|
||||
|
||||
class RedisConnectionError(RedisError):
|
||||
"""Redis connection failed"""
|
||||
pass
|
||||
|
||||
|
||||
# ========== Helper Functions ==========
|
||||
|
||||
def is_retryable(error: Exception) -> bool:
|
||||
"""
|
||||
Determine if an error should trigger retry logic.
|
||||
|
||||
Args:
|
||||
error: Exception to check
|
||||
|
||||
Returns:
|
||||
True if error is retryable
|
||||
"""
|
||||
if isinstance(error, NonRetryableError):
|
||||
return False
|
||||
|
||||
if isinstance(error, RetryableError):
|
||||
return True
|
||||
|
||||
if isinstance(error, (AdvowareTimeoutError, EspoCRMTimeoutError)):
|
||||
return True
|
||||
|
||||
if isinstance(error, ValidationError):
|
||||
return False
|
||||
|
||||
# Default: assume retryable for API errors
|
||||
if isinstance(error, APIError):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_retry_delay(error: Exception, attempt: int) -> int:
|
||||
"""
|
||||
Calculate retry delay based on error type and attempt number.
|
||||
|
||||
Args:
|
||||
error: The error that occurred
|
||||
attempt: Current retry attempt (0-indexed)
|
||||
|
||||
Returns:
|
||||
Delay in seconds
|
||||
"""
|
||||
if isinstance(error, RetryableError) and error.retry_after_seconds:
|
||||
return error.retry_after_seconds
|
||||
|
||||
# Exponential backoff: [1, 5, 15, 60, 240] minutes
|
||||
backoff_minutes = [1, 5, 15, 60, 240]
|
||||
if attempt < len(backoff_minutes):
|
||||
return backoff_minutes[attempt] * 60
|
||||
|
||||
return backoff_minutes[-1] * 60
|
||||
363
services/logging_utils.py
Normal file
363
services/logging_utils.py
Normal file
@@ -0,0 +1,363 @@
|
||||
"""
|
||||
Konsistenter Logging Wrapper für BitByLaw Integration
|
||||
|
||||
Vereinheitlicht Logging über:
|
||||
- Standard Python Logger
|
||||
- Motia FlowContext Logger
|
||||
- Structured Logging
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from typing import Optional, Any, Dict
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class IntegrationLogger:
|
||||
"""
|
||||
Unified Logger mit Support für:
|
||||
- Motia FlowContext
|
||||
- Standard Python Logging
|
||||
- Structured Logging
|
||||
- Performance Tracking
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
context: Optional[Any] = None,
|
||||
extra_fields: Optional[Dict[str, Any]] = None
|
||||
):
|
||||
"""
|
||||
Initialize logger.
|
||||
|
||||
Args:
|
||||
name: Logger name (z.B. 'advoware.api')
|
||||
context: Optional Motia FlowContext
|
||||
extra_fields: Optional extra fields für structured logging
|
||||
"""
|
||||
self.name = name
|
||||
self.context = context
|
||||
self.extra_fields = extra_fields or {}
|
||||
self._standard_logger = logging.getLogger(name)
|
||||
|
||||
def _format_message(self, message: str, **kwargs) -> str:
|
||||
"""
|
||||
Formatiert Log-Message mit optionalen Feldern.
|
||||
|
||||
Args:
|
||||
message: Base message
|
||||
**kwargs: Extra fields
|
||||
|
||||
Returns:
|
||||
Formatted message
|
||||
"""
|
||||
if not kwargs and not self.extra_fields:
|
||||
return message
|
||||
|
||||
# Merge extra fields
|
||||
fields = {**self.extra_fields, **kwargs}
|
||||
|
||||
if fields:
|
||||
field_str = " | ".join(f"{k}={v}" for k, v in fields.items())
|
||||
return f"{message} | {field_str}"
|
||||
|
||||
return message
|
||||
|
||||
def _log(
|
||||
self,
|
||||
level: str,
|
||||
message: str,
|
||||
exc_info: bool = False,
|
||||
**kwargs
|
||||
) -> None:
|
||||
"""
|
||||
Internal logging method.
|
||||
|
||||
Args:
|
||||
level: Log level (debug, info, warning, error, critical)
|
||||
message: Log message
|
||||
exc_info: Include exception info
|
||||
**kwargs: Extra fields for structured logging
|
||||
"""
|
||||
formatted_msg = self._format_message(message, **kwargs)
|
||||
|
||||
# Log to FlowContext if available
|
||||
if self.context and hasattr(self.context, 'logger'):
|
||||
try:
|
||||
log_func = getattr(self.context.logger, level, self.context.logger.info)
|
||||
log_func(formatted_msg)
|
||||
except Exception:
|
||||
# Fallback to standard logger
|
||||
pass
|
||||
|
||||
# Always log to standard Python logger
|
||||
log_func = getattr(self._standard_logger, level, self._standard_logger.info)
|
||||
log_func(formatted_msg, exc_info=exc_info)
|
||||
|
||||
def debug(self, message: str, **kwargs) -> None:
|
||||
"""Log debug message"""
|
||||
self._log('debug', message, **kwargs)
|
||||
|
||||
def info(self, message: str, **kwargs) -> None:
|
||||
"""Log info message"""
|
||||
self._log('info', message, **kwargs)
|
||||
|
||||
def warning(self, message: str, **kwargs) -> None:
|
||||
"""Log warning message"""
|
||||
self._log('warning', message, **kwargs)
|
||||
|
||||
def warn(self, message: str, **kwargs) -> None:
|
||||
"""Alias for warning"""
|
||||
self.warning(message, **kwargs)
|
||||
|
||||
def error(self, message: str, exc_info: bool = True, **kwargs) -> None:
|
||||
"""Log error message (with exception info by default)"""
|
||||
self._log('error', message, exc_info=exc_info, **kwargs)
|
||||
|
||||
def critical(self, message: str, exc_info: bool = True, **kwargs) -> None:
|
||||
"""Log critical message"""
|
||||
self._log('critical', message, exc_info=exc_info, **kwargs)
|
||||
|
||||
def exception(self, message: str, **kwargs) -> None:
|
||||
"""Log exception with traceback"""
|
||||
self._log('error', message, exc_info=True, **kwargs)
|
||||
|
||||
@contextmanager
|
||||
def operation(self, operation_name: str, **context_fields):
|
||||
"""
|
||||
Context manager für Operations mit automatischem Timing.
|
||||
|
||||
Args:
|
||||
operation_name: Name der Operation
|
||||
**context_fields: Context fields für logging
|
||||
|
||||
Example:
|
||||
with logger.operation('sync_beteiligte', entity_id='123'):
|
||||
# Do sync
|
||||
pass
|
||||
"""
|
||||
start_time = time.time()
|
||||
self.info(f"▶️ Starting: {operation_name}", **context_fields)
|
||||
|
||||
try:
|
||||
yield
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
self.info(
|
||||
f"✅ Completed: {operation_name}",
|
||||
duration_ms=duration_ms,
|
||||
**context_fields
|
||||
)
|
||||
except Exception as e:
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
self.error(
|
||||
f"❌ Failed: {operation_name} - {str(e)}",
|
||||
duration_ms=duration_ms,
|
||||
error_type=type(e).__name__,
|
||||
**context_fields
|
||||
)
|
||||
raise
|
||||
|
||||
@contextmanager
|
||||
def api_call(self, endpoint: str, method: str = 'GET', **context_fields):
|
||||
"""
|
||||
Context manager speziell für API-Calls.
|
||||
|
||||
Args:
|
||||
endpoint: API endpoint
|
||||
method: HTTP method
|
||||
**context_fields: Extra context
|
||||
|
||||
Example:
|
||||
with logger.api_call('/api/v1/Beteiligte', method='POST'):
|
||||
result = await api.post(...)
|
||||
"""
|
||||
start_time = time.time()
|
||||
self.debug(f"API Call: {method} {endpoint}", **context_fields)
|
||||
|
||||
try:
|
||||
yield
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
self.debug(
|
||||
f"API Success: {method} {endpoint}",
|
||||
duration_ms=duration_ms,
|
||||
**context_fields
|
||||
)
|
||||
except Exception as e:
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
self.error(
|
||||
f"API Error: {method} {endpoint} - {str(e)}",
|
||||
duration_ms=duration_ms,
|
||||
error_type=type(e).__name__,
|
||||
**context_fields
|
||||
)
|
||||
raise
|
||||
|
||||
def with_context(self, **extra_fields) -> 'IntegrationLogger':
|
||||
"""
|
||||
Erstellt neuen Logger mit zusätzlichen Context-Feldern.
|
||||
|
||||
Args:
|
||||
**extra_fields: Additional context fields
|
||||
|
||||
Returns:
|
||||
New logger instance with merged context
|
||||
"""
|
||||
merged_fields = {**self.extra_fields, **extra_fields}
|
||||
return IntegrationLogger(
|
||||
name=self.name,
|
||||
context=self.context,
|
||||
extra_fields=merged_fields
|
||||
)
|
||||
|
||||
|
||||
# ========== Factory Functions ==========
|
||||
|
||||
def get_logger(
|
||||
name: str,
|
||||
context: Optional[Any] = None,
|
||||
**extra_fields
|
||||
) -> IntegrationLogger:
|
||||
"""
|
||||
Factory function für Logger.
|
||||
|
||||
Args:
|
||||
name: Logger name
|
||||
context: Optional Motia FlowContext
|
||||
**extra_fields: Extra context fields
|
||||
|
||||
Returns:
|
||||
Configured logger
|
||||
|
||||
Example:
|
||||
logger = get_logger('advoware.sync', context=ctx, entity_id='123')
|
||||
logger.info("Starting sync")
|
||||
"""
|
||||
return IntegrationLogger(name, context, extra_fields)
|
||||
|
||||
|
||||
def get_service_logger(
|
||||
service_name: str,
|
||||
context: Optional[Any] = None
|
||||
) -> IntegrationLogger:
|
||||
"""
|
||||
Factory für Service-Logger.
|
||||
|
||||
Args:
|
||||
service_name: Service name (z.B. 'advoware', 'espocrm')
|
||||
context: Optional FlowContext
|
||||
|
||||
Returns:
|
||||
Service logger
|
||||
"""
|
||||
return IntegrationLogger(f"services.{service_name}", context)
|
||||
|
||||
|
||||
def get_step_logger(
|
||||
step_name: str,
|
||||
context: Optional[Any] = None
|
||||
) -> IntegrationLogger:
|
||||
"""
|
||||
Factory für Step-Logger.
|
||||
|
||||
Args:
|
||||
step_name: Step name
|
||||
context: FlowContext (required for steps)
|
||||
|
||||
Returns:
|
||||
Step logger
|
||||
"""
|
||||
return IntegrationLogger(f"steps.{step_name}", context)
|
||||
|
||||
|
||||
# ========== Decorator for Logging ==========
|
||||
|
||||
def log_operation(operation_name: str):
|
||||
"""
|
||||
Decorator für automatisches Operation-Logging.
|
||||
|
||||
Args:
|
||||
operation_name: Name der Operation
|
||||
|
||||
Example:
|
||||
@log_operation('sync_beteiligte')
|
||||
async def sync_entity(entity_id: str):
|
||||
...
|
||||
"""
|
||||
def decorator(func):
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
# Try to find context in args
|
||||
context = None
|
||||
for arg in args:
|
||||
if hasattr(arg, 'logger'):
|
||||
context = arg
|
||||
break
|
||||
|
||||
logger = get_logger(func.__module__, context)
|
||||
|
||||
with logger.operation(operation_name):
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
context = None
|
||||
for arg in args:
|
||||
if hasattr(arg, 'logger'):
|
||||
context = arg
|
||||
break
|
||||
|
||||
logger = get_logger(func.__module__, context)
|
||||
|
||||
with logger.operation(operation_name):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
# Return appropriate wrapper
|
||||
import asyncio
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
else:
|
||||
return sync_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
# ========== Performance Tracking ==========
|
||||
|
||||
class PerformanceTracker:
|
||||
"""Track performance metrics for operations"""
|
||||
|
||||
def __init__(self, logger: IntegrationLogger):
|
||||
self.logger = logger
|
||||
self.metrics: Dict[str, list] = {}
|
||||
|
||||
def record(self, operation: str, duration_ms: int) -> None:
|
||||
"""Record operation duration"""
|
||||
if operation not in self.metrics:
|
||||
self.metrics[operation] = []
|
||||
self.metrics[operation].append(duration_ms)
|
||||
|
||||
def get_stats(self, operation: str) -> Dict[str, float]:
|
||||
"""Get statistics for operation"""
|
||||
if operation not in self.metrics:
|
||||
return {}
|
||||
|
||||
durations = self.metrics[operation]
|
||||
return {
|
||||
'count': len(durations),
|
||||
'avg_ms': sum(durations) / len(durations),
|
||||
'min_ms': min(durations),
|
||||
'max_ms': max(durations),
|
||||
'total_ms': sum(durations)
|
||||
}
|
||||
|
||||
def log_summary(self) -> None:
|
||||
"""Log summary of all operations"""
|
||||
self.logger.info("=== Performance Summary ===")
|
||||
for operation, durations in self.metrics.items():
|
||||
stats = self.get_stats(operation)
|
||||
self.logger.info(
|
||||
f"{operation}: {stats['count']} calls, "
|
||||
f"avg {stats['avg_ms']:.1f}ms, "
|
||||
f"min {stats['min_ms']:.1f}ms, "
|
||||
f"max {stats['max_ms']:.1f}ms"
|
||||
)
|
||||
259
services/models.py
Normal file
259
services/models.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""
|
||||
Pydantic Models für Datenvalidierung
|
||||
|
||||
Definiert strenge Schemas für:
|
||||
- Advoware Entities
|
||||
- EspoCRM Entities
|
||||
- Sync Operations
|
||||
"""
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator, ConfigDict
|
||||
from typing import Optional, Literal
|
||||
from datetime import date, datetime
|
||||
from enum import Enum
|
||||
|
||||
|
||||
# ========== Enums ==========
|
||||
|
||||
class Rechtsform(str, Enum):
|
||||
"""Rechtsformen für Beteiligte"""
|
||||
NATUERLICHE_PERSON = ""
|
||||
GMBH = "GmbH"
|
||||
AG = "AG"
|
||||
GMBH_CO_KG = "GmbH & Co. KG"
|
||||
KG = "KG"
|
||||
OHG = "OHG"
|
||||
EV = "e.V."
|
||||
EINZELUNTERNEHMEN = "Einzelunternehmen"
|
||||
FREIBERUFLER = "Freiberufler"
|
||||
|
||||
|
||||
class SyncStatus(str, Enum):
|
||||
"""Sync Status für EspoCRM Entities"""
|
||||
PENDING_SYNC = "pending_sync"
|
||||
SYNCING = "syncing"
|
||||
CLEAN = "clean"
|
||||
FAILED = "failed"
|
||||
CONFLICT = "conflict"
|
||||
PERMANENTLY_FAILED = "permanently_failed"
|
||||
|
||||
|
||||
class SalutationType(str, Enum):
|
||||
"""Anredetypen"""
|
||||
HERR = "Herr"
|
||||
FRAU = "Frau"
|
||||
DIVERS = "Divers"
|
||||
FIRMA = ""
|
||||
|
||||
|
||||
# ========== Advoware Models ==========
|
||||
|
||||
class AdvowareBeteiligteBase(BaseModel):
|
||||
"""Base Model für Advoware Beteiligte (POST/PUT)"""
|
||||
|
||||
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)
|
||||
|
||||
name: str = Field(..., min_length=1, max_length=200)
|
||||
vorname: Optional[str] = Field(None, max_length=100)
|
||||
rechtsform: str = Field(default="")
|
||||
anrede: Optional[str] = Field(None, max_length=50)
|
||||
titel: Optional[str] = Field(None, max_length=50)
|
||||
bAnrede: Optional[str] = Field(None, max_length=200, description="Briefanrede")
|
||||
zusatz: Optional[str] = Field(None, max_length=200)
|
||||
geburtsdatum: Optional[date] = None
|
||||
|
||||
@field_validator('name')
|
||||
@classmethod
|
||||
def validate_name(cls, v: str) -> str:
|
||||
if not v or not v.strip():
|
||||
raise ValueError('Name darf nicht leer sein')
|
||||
return v.strip()
|
||||
|
||||
@field_validator('geburtsdatum')
|
||||
@classmethod
|
||||
def validate_birthdate(cls, v: Optional[date]) -> Optional[date]:
|
||||
if v and v > date.today():
|
||||
raise ValueError('Geburtsdatum kann nicht in der Zukunft liegen')
|
||||
if v and v.year < 1900:
|
||||
raise ValueError('Geburtsdatum vor 1900 nicht erlaubt')
|
||||
return v
|
||||
|
||||
|
||||
class AdvowareBeteiligteRead(AdvowareBeteiligteBase):
|
||||
"""Advoware Beteiligte Response (GET)"""
|
||||
|
||||
betNr: int = Field(..., ge=1)
|
||||
rowId: str = Field(..., description="Change detection ID")
|
||||
|
||||
# Optional fields die Advoware zurückgibt
|
||||
strasse: Optional[str] = None
|
||||
plz: Optional[str] = None
|
||||
ort: Optional[str] = None
|
||||
land: Optional[str] = None
|
||||
|
||||
|
||||
class AdvowareBeteiligteCreate(AdvowareBeteiligteBase):
|
||||
"""Advoware Beteiligte für POST"""
|
||||
pass
|
||||
|
||||
|
||||
class AdvowareBeteiligteUpdate(AdvowareBeteiligteBase):
|
||||
"""Advoware Beteiligte für PUT"""
|
||||
pass
|
||||
|
||||
|
||||
# ========== EspoCRM Models ==========
|
||||
|
||||
class EspoCRMBeteiligteBase(BaseModel):
|
||||
"""Base Model für EspoCRM CBeteiligte"""
|
||||
|
||||
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)
|
||||
|
||||
name: str = Field(..., min_length=1, max_length=255)
|
||||
firstName: Optional[str] = Field(None, max_length=100)
|
||||
lastName: Optional[str] = Field(None, max_length=100)
|
||||
firmenname: Optional[str] = Field(None, max_length=255)
|
||||
rechtsform: str = Field(default="")
|
||||
salutationName: Optional[str] = None
|
||||
titel: Optional[str] = Field(None, max_length=100)
|
||||
briefAnrede: Optional[str] = Field(None, max_length=255)
|
||||
zusatz: Optional[str] = Field(None, max_length=255)
|
||||
dateOfBirth: Optional[date] = None
|
||||
|
||||
@field_validator('name')
|
||||
@classmethod
|
||||
def validate_name(cls, v: str) -> str:
|
||||
if not v or not v.strip():
|
||||
raise ValueError('Name darf nicht leer sein')
|
||||
return v.strip()
|
||||
|
||||
@field_validator('dateOfBirth')
|
||||
@classmethod
|
||||
def validate_birthdate(cls, v: Optional[date]) -> Optional[date]:
|
||||
if v and v > date.today():
|
||||
raise ValueError('Geburtsdatum kann nicht in der Zukunft liegen')
|
||||
if v and v.year < 1900:
|
||||
raise ValueError('Geburtsdatum vor 1900 nicht erlaubt')
|
||||
return v
|
||||
|
||||
@field_validator('firstName', 'lastName')
|
||||
@classmethod
|
||||
def validate_person_fields(cls, v: Optional[str]) -> Optional[str]:
|
||||
"""Validiere dass Person-Felder nur bei natürlichen Personen gesetzt sind"""
|
||||
if v:
|
||||
return v.strip()
|
||||
return None
|
||||
|
||||
|
||||
class EspoCRMBeteiligteRead(EspoCRMBeteiligteBase):
|
||||
"""EspoCRM CBeteiligte Response (GET)"""
|
||||
|
||||
id: str = Field(..., min_length=1)
|
||||
betnr: Optional[int] = Field(None, ge=1)
|
||||
advowareRowId: Optional[str] = None
|
||||
syncStatus: SyncStatus = Field(default=SyncStatus.PENDING_SYNC)
|
||||
syncRetryCount: int = Field(default=0, ge=0, le=10)
|
||||
syncErrorMessage: Optional[str] = None
|
||||
advowareLastSync: Optional[datetime] = None
|
||||
syncNextRetry: Optional[datetime] = None
|
||||
syncAutoResetAt: Optional[datetime] = None
|
||||
|
||||
|
||||
class EspoCRMBeteiligteCreate(EspoCRMBeteiligteBase):
|
||||
"""EspoCRM CBeteiligte für POST"""
|
||||
|
||||
syncStatus: SyncStatus = Field(default=SyncStatus.PENDING_SYNC)
|
||||
|
||||
|
||||
class EspoCRMBeteiligteUpdate(BaseModel):
|
||||
"""EspoCRM CBeteiligte für PUT (alle Felder optional)"""
|
||||
|
||||
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)
|
||||
|
||||
name: Optional[str] = Field(None, min_length=1, max_length=255)
|
||||
firstName: Optional[str] = Field(None, max_length=100)
|
||||
lastName: Optional[str] = Field(None, max_length=100)
|
||||
firmenname: Optional[str] = Field(None, max_length=255)
|
||||
rechtsform: Optional[str] = None
|
||||
salutationName: Optional[str] = None
|
||||
titel: Optional[str] = Field(None, max_length=100)
|
||||
briefAnrede: Optional[str] = Field(None, max_length=255)
|
||||
zusatz: Optional[str] = Field(None, max_length=255)
|
||||
dateOfBirth: Optional[date] = None
|
||||
betnr: Optional[int] = Field(None, ge=1)
|
||||
advowareRowId: Optional[str] = None
|
||||
syncStatus: Optional[SyncStatus] = None
|
||||
syncRetryCount: Optional[int] = Field(None, ge=0, le=10)
|
||||
syncErrorMessage: Optional[str] = Field(None, max_length=2000)
|
||||
advowareLastSync: Optional[datetime] = None
|
||||
syncNextRetry: Optional[datetime] = None
|
||||
|
||||
def model_dump_clean(self) -> dict:
|
||||
"""Gibt nur nicht-None Werte zurück (für PATCH-ähnliches Update)"""
|
||||
return {k: v for k, v in self.model_dump().items() if v is not None}
|
||||
|
||||
|
||||
# ========== Sync Operation Models ==========
|
||||
|
||||
class SyncOperation(BaseModel):
|
||||
"""Model für Sync-Operation Tracking"""
|
||||
|
||||
entity_id: str
|
||||
action: Literal["create", "update", "delete", "sync_check"]
|
||||
source: Literal["webhook", "cron", "api", "manual"]
|
||||
timestamp: datetime = Field(default_factory=datetime.utcnow)
|
||||
entity_type: str = "CBeteiligte"
|
||||
|
||||
|
||||
class SyncResult(BaseModel):
|
||||
"""Result einer Sync-Operation"""
|
||||
|
||||
success: bool
|
||||
entity_id: str
|
||||
action: str
|
||||
message: Optional[str] = None
|
||||
error: Optional[str] = None
|
||||
details: Optional[dict] = None
|
||||
duration_ms: Optional[int] = None
|
||||
|
||||
|
||||
# ========== Validation Helpers ==========
|
||||
|
||||
def validate_beteiligte_advoware(data: dict) -> AdvowareBeteiligteCreate:
|
||||
"""
|
||||
Validiert Advoware Beteiligte Daten.
|
||||
|
||||
Args:
|
||||
data: Dict mit Advoware Daten
|
||||
|
||||
Returns:
|
||||
Validiertes Model
|
||||
|
||||
Raises:
|
||||
ValidationError: Bei Validierungsfehlern
|
||||
"""
|
||||
try:
|
||||
return AdvowareBeteiligteCreate.model_validate(data)
|
||||
except Exception as e:
|
||||
from services.exceptions import ValidationError
|
||||
raise ValidationError(f"Invalid Advoware data: {e}")
|
||||
|
||||
|
||||
def validate_beteiligte_espocrm(data: dict) -> EspoCRMBeteiligteCreate:
|
||||
"""
|
||||
Validiert EspoCRM Beteiligte Daten.
|
||||
|
||||
Args:
|
||||
data: Dict mit EspoCRM Daten
|
||||
|
||||
Returns:
|
||||
Validiertes Model
|
||||
|
||||
Raises:
|
||||
ValidationError: Bei Validierungsfehlern
|
||||
"""
|
||||
try:
|
||||
return EspoCRMBeteiligteCreate.model_validate(data)
|
||||
except Exception as e:
|
||||
from services.exceptions import ValidationError
|
||||
raise ValidationError(f"Invalid EspoCRM data: {e}")
|
||||
191
services/redis_client.py
Normal file
191
services/redis_client.py
Normal file
@@ -0,0 +1,191 @@
|
||||
"""
|
||||
Redis Client Factory
|
||||
|
||||
Zentralisierte Redis-Client-Verwaltung mit:
|
||||
- Singleton Pattern
|
||||
- Connection Pooling
|
||||
- Automatic Reconnection
|
||||
- Health Checks
|
||||
"""
|
||||
|
||||
import redis
|
||||
import os
|
||||
import logging
|
||||
from typing import Optional
|
||||
from services.exceptions import RedisConnectionError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RedisClientFactory:
|
||||
"""
|
||||
Singleton Factory für Redis Clients.
|
||||
|
||||
Vorteile:
|
||||
- Eine zentrale Konfiguration
|
||||
- Connection Pooling
|
||||
- Lazy Initialization
|
||||
- Besseres Error Handling
|
||||
"""
|
||||
|
||||
_instance: Optional[redis.Redis] = None
|
||||
_connection_pool: Optional[redis.ConnectionPool] = None
|
||||
|
||||
@classmethod
|
||||
def get_client(cls, strict: bool = False) -> Optional[redis.Redis]:
|
||||
"""
|
||||
Gibt Redis Client zurück (erstellt wenn nötig).
|
||||
|
||||
Args:
|
||||
strict: Wenn True, wirft Exception bei Verbindungsfehlern.
|
||||
Wenn False, gibt None zurück (für optionale Redis-Nutzung).
|
||||
|
||||
Returns:
|
||||
Redis client oder None (wenn strict=False und Verbindung fehlschlägt)
|
||||
|
||||
Raises:
|
||||
RedisConnectionError: Wenn strict=True und Verbindung fehlschlägt
|
||||
"""
|
||||
if cls._instance is None:
|
||||
try:
|
||||
cls._instance = cls._create_client()
|
||||
logger.info("Redis client created successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create Redis client: {e}")
|
||||
if strict:
|
||||
raise RedisConnectionError(
|
||||
f"Could not connect to Redis: {e}",
|
||||
operation="get_client"
|
||||
)
|
||||
logger.warning("Redis unavailable - continuing without caching")
|
||||
return None
|
||||
|
||||
return cls._instance
|
||||
|
||||
@classmethod
|
||||
def _create_client(cls) -> redis.Redis:
|
||||
"""
|
||||
Erstellt neuen Redis Client mit Connection Pool.
|
||||
|
||||
Returns:
|
||||
Configured Redis client
|
||||
|
||||
Raises:
|
||||
redis.ConnectionError: Bei Verbindungsproblemen
|
||||
"""
|
||||
# Load configuration from environment
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
redis_timeout = int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
|
||||
redis_max_connections = int(os.getenv('REDIS_MAX_CONNECTIONS', '50'))
|
||||
|
||||
logger.info(
|
||||
f"Creating Redis client: {redis_host}:{redis_port} "
|
||||
f"(db={redis_db}, timeout={redis_timeout}s)"
|
||||
)
|
||||
|
||||
# Create connection pool
|
||||
if cls._connection_pool is None:
|
||||
cls._connection_pool = redis.ConnectionPool(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
socket_timeout=redis_timeout,
|
||||
socket_connect_timeout=redis_timeout,
|
||||
max_connections=redis_max_connections,
|
||||
decode_responses=True # Auto-decode bytes zu strings
|
||||
)
|
||||
|
||||
# Create client from pool
|
||||
client = redis.Redis(connection_pool=cls._connection_pool)
|
||||
|
||||
# Verify connection
|
||||
client.ping()
|
||||
|
||||
return client
|
||||
|
||||
@classmethod
|
||||
def reset(cls) -> None:
|
||||
"""
|
||||
Reset factory state (hauptsächlich für Tests).
|
||||
|
||||
Schließt bestehende Verbindungen und setzt Singleton zurück.
|
||||
"""
|
||||
if cls._instance:
|
||||
try:
|
||||
cls._instance.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing Redis client: {e}")
|
||||
|
||||
if cls._connection_pool:
|
||||
try:
|
||||
cls._connection_pool.disconnect()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing connection pool: {e}")
|
||||
|
||||
cls._instance = None
|
||||
cls._connection_pool = None
|
||||
logger.info("Redis factory reset")
|
||||
|
||||
@classmethod
|
||||
def health_check(cls) -> bool:
|
||||
"""
|
||||
Prüft Redis-Verbindung.
|
||||
|
||||
Returns:
|
||||
True wenn Redis erreichbar, False sonst
|
||||
"""
|
||||
try:
|
||||
client = cls.get_client(strict=False)
|
||||
if client is None:
|
||||
return False
|
||||
|
||||
client.ping()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"Redis health check failed: {e}")
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get_info(cls) -> Optional[dict]:
|
||||
"""
|
||||
Gibt Redis Server Info zurück (für Monitoring).
|
||||
|
||||
Returns:
|
||||
Redis info dict oder None bei Fehler
|
||||
"""
|
||||
try:
|
||||
client = cls.get_client(strict=False)
|
||||
if client is None:
|
||||
return None
|
||||
|
||||
return client.info()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get Redis info: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# ========== Convenience Functions ==========
|
||||
|
||||
def get_redis_client(strict: bool = False) -> Optional[redis.Redis]:
|
||||
"""
|
||||
Convenience function für Redis Client.
|
||||
|
||||
Args:
|
||||
strict: Wenn True, wirft Exception bei Fehler
|
||||
|
||||
Returns:
|
||||
Redis client oder None
|
||||
"""
|
||||
return RedisClientFactory.get_client(strict=strict)
|
||||
|
||||
|
||||
def is_redis_available() -> bool:
|
||||
"""
|
||||
Prüft ob Redis verfügbar ist.
|
||||
|
||||
Returns:
|
||||
True wenn Redis erreichbar
|
||||
"""
|
||||
return RedisClientFactory.health_check()
|
||||
@@ -9,62 +9,38 @@ Gemeinsame Funktionalität für alle Sync-Operationen:
|
||||
|
||||
from typing import Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import redis
|
||||
import os
|
||||
import pytz
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from services.exceptions import RedisConnectionError, LockAcquisitionError
|
||||
from services.redis_client import get_redis_client
|
||||
from services.config import SYNC_CONFIG, get_lock_key
|
||||
from services.logging_utils import get_logger
|
||||
|
||||
# Lock TTL in seconds (prevents deadlocks)
|
||||
LOCK_TTL_SECONDS = 900 # 15 minutes
|
||||
import redis
|
||||
|
||||
|
||||
class BaseSyncUtils:
|
||||
"""Base-Klasse mit gemeinsamer Sync-Funktionalität"""
|
||||
|
||||
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
|
||||
def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None):
|
||||
"""
|
||||
Args:
|
||||
espocrm_api: EspoCRM API client instance
|
||||
redis_client: Optional Redis client (wird sonst initialisiert)
|
||||
redis_client: Optional Redis client (wird sonst über Factory initialisiert)
|
||||
context: Optional Motia FlowContext für Logging
|
||||
"""
|
||||
self.espocrm = espocrm_api
|
||||
self.context = context
|
||||
self.logger = context.logger if context else logger
|
||||
self.redis = redis_client or self._init_redis()
|
||||
self.logger = get_logger('sync_utils', context)
|
||||
|
||||
def _init_redis(self) -> Optional[redis.Redis]:
|
||||
"""Initialize Redis client for distributed locking"""
|
||||
try:
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
# Use provided Redis client or get from factory
|
||||
self.redis = redis_client or get_redis_client(strict=False)
|
||||
|
||||
client = redis.Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
decode_responses=True
|
||||
if not self.redis:
|
||||
self.logger.error(
|
||||
"⚠️ WARNUNG: Redis nicht verfügbar! "
|
||||
"Distributed Locking deaktiviert - Race Conditions möglich!"
|
||||
)
|
||||
client.ping()
|
||||
return client
|
||||
except Exception as e:
|
||||
self._log(f"Redis connection failed: {e}", level='error')
|
||||
return None
|
||||
|
||||
def _log(self, message: str, level: str = 'info'):
|
||||
"""
|
||||
Context-aware logging
|
||||
|
||||
Falls ein FlowContext vorhanden ist, wird dessen Logger verwendet.
|
||||
Sonst fallback auf Standard-Logger.
|
||||
"""
|
||||
if self.context and hasattr(self.context, 'logger'):
|
||||
getattr(self.context.logger, level)(message)
|
||||
else:
|
||||
getattr(logger, level)(message)
|
||||
|
||||
def _get_lock_key(self, entity_id: str) -> str:
|
||||
"""
|
||||
@@ -84,17 +60,30 @@ class BaseSyncUtils:
|
||||
|
||||
Returns:
|
||||
True wenn Lock erfolgreich, False wenn bereits locked
|
||||
|
||||
Raises:
|
||||
LockAcquisitionError: Bei kritischen Lock-Problemen (wenn strict mode)
|
||||
"""
|
||||
if not self.redis:
|
||||
self._log("Redis nicht verfügbar, Lock-Mechanismus deaktiviert", level='warn')
|
||||
return True # Fallback: Wenn kein Redis, immer lock erlauben
|
||||
self.logger.error(
|
||||
"CRITICAL: Distributed Locking deaktiviert - Redis nicht verfügbar!"
|
||||
)
|
||||
# In production: Dies könnte zu Race Conditions führen!
|
||||
# Für jetzt erlauben wir Fortsetzung, aber mit Warning
|
||||
return True
|
||||
|
||||
try:
|
||||
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
|
||||
acquired = self.redis.set(
|
||||
lock_key,
|
||||
"locked",
|
||||
nx=True,
|
||||
ex=SYNC_CONFIG.lock_ttl_seconds
|
||||
)
|
||||
return bool(acquired)
|
||||
except Exception as e:
|
||||
self._log(f"Redis lock error: {e}", level='error')
|
||||
return True # Bei Fehler: Lock erlauben, um Deadlocks zu vermeiden
|
||||
except redis.RedisError as e:
|
||||
self.logger.error(f"Redis lock error: {e}")
|
||||
# Bei Redis-Fehler: Lock erlauben, um Deadlocks zu vermeiden
|
||||
return True
|
||||
|
||||
def _release_redis_lock(self, lock_key: str) -> None:
|
||||
"""
|
||||
@@ -108,8 +97,8 @@ class BaseSyncUtils:
|
||||
|
||||
try:
|
||||
self.redis.delete(lock_key)
|
||||
except Exception as e:
|
||||
self._log(f"Redis unlock error: {e}", level='error')
|
||||
except redis.RedisError as e:
|
||||
self.logger.error(f"Redis unlock error: {e}")
|
||||
|
||||
def _get_espocrm_datetime(self, dt: Optional[datetime] = None) -> str:
|
||||
"""
|
||||
|
||||
@@ -17,9 +17,16 @@ from services.advoware_service import AdvowareService
|
||||
from services.espocrm import EspoCRMAPI
|
||||
from services.espocrm_mapper import BeteiligteMapper
|
||||
from services.beteiligte_sync_utils import BeteiligteSync
|
||||
from services.redis_client import get_redis_client
|
||||
from services.exceptions import (
|
||||
AdvowareAPIError,
|
||||
EspoCRMAPIError,
|
||||
SyncError,
|
||||
RetryableError,
|
||||
is_retryable
|
||||
)
|
||||
from services.logging_utils import get_step_logger
|
||||
import json
|
||||
import redis
|
||||
import os
|
||||
|
||||
config = {
|
||||
"name": "VMH Beteiligte Sync Handler",
|
||||
@@ -35,32 +42,36 @@ config = {
|
||||
}
|
||||
|
||||
|
||||
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
|
||||
"""Zentraler Sync-Handler für Beteiligte"""
|
||||
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Zentraler Sync-Handler für Beteiligte
|
||||
|
||||
Args:
|
||||
event_data: Event data mit entity_id, action, source
|
||||
ctx: Motia FlowContext
|
||||
|
||||
Returns:
|
||||
Optional result dict
|
||||
"""
|
||||
entity_id = event_data.get('entity_id')
|
||||
action = event_data.get('action')
|
||||
source = event_data.get('source')
|
||||
|
||||
step_logger = get_step_logger('beteiligte_sync', ctx)
|
||||
|
||||
if not entity_id:
|
||||
ctx.logger.error("Keine entity_id im Event gefunden")
|
||||
return
|
||||
step_logger.error("Keine entity_id im Event gefunden")
|
||||
return None
|
||||
|
||||
ctx.logger.info(f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}")
|
||||
|
||||
# Shared Redis client for distributed locking
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
|
||||
redis_client = redis.Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
decode_responses=True
|
||||
step_logger.info(
|
||||
f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}"
|
||||
)
|
||||
|
||||
# Get shared Redis client (centralized)
|
||||
redis_client = get_redis_client(strict=False)
|
||||
|
||||
# APIs initialisieren
|
||||
espocrm = EspoCRMAPI()
|
||||
espocrm = EspoCRMAPI(ctx)
|
||||
advoware = AdvowareAPI(ctx)
|
||||
sync_utils = BeteiligteSync(espocrm, redis_client, ctx)
|
||||
mapper = BeteiligteMapper()
|
||||
|
||||
Reference in New Issue
Block a user