Compare commits
2 Commits
bcb6454b2a
...
a53051ea8e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a53051ea8e | ||
|
|
69a48f5f9a |
382
REFACTORING_SUMMARY.md
Normal file
382
REFACTORING_SUMMARY.md
Normal file
@@ -0,0 +1,382 @@
|
|||||||
|
# Code Refactoring - Verbesserungen Übersicht
|
||||||
|
|
||||||
|
Datum: 3. März 2026
|
||||||
|
|
||||||
|
## Zusammenfassung
|
||||||
|
|
||||||
|
Umfassendes Refactoring zur Verbesserung von Robustheit, Eleganz und Effizienz des BitByLaw Integration Codes.
|
||||||
|
|
||||||
|
## Implementierte Verbesserungen
|
||||||
|
|
||||||
|
### 1. ✅ Custom Exception Classes ([services/exceptions.py](services/exceptions.py))
|
||||||
|
|
||||||
|
**Problem:** Zu generisches Exception Handling mit `except Exception`
|
||||||
|
|
||||||
|
**Lösung:** Hierarchische Exception-Struktur:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from services.exceptions import (
|
||||||
|
AdvowareAPIError,
|
||||||
|
AdvowareAuthError,
|
||||||
|
AdvowareTimeoutError,
|
||||||
|
EspoCRMAPIError,
|
||||||
|
EspoCRMAuthError,
|
||||||
|
RetryableError,
|
||||||
|
NonRetryableError,
|
||||||
|
LockAcquisitionError,
|
||||||
|
ValidationError
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verwendung:
|
||||||
|
try:
|
||||||
|
result = await advoware.api_call(...)
|
||||||
|
except AdvowareTimeoutError:
|
||||||
|
# Spezifisch für Timeouts
|
||||||
|
raise RetryableError()
|
||||||
|
except AdvowareAuthError:
|
||||||
|
# Auth-Fehler nicht retryable
|
||||||
|
raise
|
||||||
|
except AdvowareAPIError as e:
|
||||||
|
# Andere API-Fehler
|
||||||
|
if is_retryable(e):
|
||||||
|
# Retry logic
|
||||||
|
```
|
||||||
|
|
||||||
|
**Vorteile:**
|
||||||
|
- Präzise Fehlerbehandlung
|
||||||
|
- Besseres Error Tracking
|
||||||
|
- Automatische Retry-Klassifizierung mit `is_retryable()`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 2. ✅ Redis Client Factory ([services/redis_client.py](services/redis_client.py))
|
||||||
|
|
||||||
|
**Problem:** Duplizierte Redis-Initialisierung in 4+ Dateien
|
||||||
|
|
||||||
|
**Lösung:** Zentralisierte Redis Client Factory mit Singleton Pattern:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from services.redis_client import get_redis_client, is_redis_available
|
||||||
|
|
||||||
|
# Strict mode: Exception bei Fehler
|
||||||
|
redis_client = get_redis_client(strict=True)
|
||||||
|
|
||||||
|
# Optional mode: None bei Fehler (für optionale Features)
|
||||||
|
redis_client = get_redis_client(strict=False)
|
||||||
|
|
||||||
|
# Health Check
|
||||||
|
if is_redis_available():
|
||||||
|
# Redis verfügbar
|
||||||
|
```
|
||||||
|
|
||||||
|
**Vorteile:**
|
||||||
|
- DRY (Don't Repeat Yourself)
|
||||||
|
- Connection Pooling
|
||||||
|
- Zentrale Konfiguration
|
||||||
|
- Health Checks
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 3. ✅ Pydantic Models für Validation ([services/models.py](services/models.py))
|
||||||
|
|
||||||
|
**Problem:** Keine Datenvalidierung, unsichere Typen
|
||||||
|
|
||||||
|
**Lösung:** Pydantic Models mit automatischer Validierung:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from services.models import (
|
||||||
|
AdvowareBeteiligteCreate,
|
||||||
|
EspoCRMBeteiligteCreate,
|
||||||
|
validate_beteiligte_advoware
|
||||||
|
)
|
||||||
|
|
||||||
|
# Automatische Validierung:
|
||||||
|
try:
|
||||||
|
validated = AdvowareBeteiligteCreate.model_validate(data)
|
||||||
|
except ValidationError as e:
|
||||||
|
# Handle validation errors
|
||||||
|
|
||||||
|
# Helper:
|
||||||
|
validated = validate_beteiligte_advoware(data)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Features:**
|
||||||
|
- Type Safety
|
||||||
|
- Automatische Validierung (Geburtsdatum, Name, etc.)
|
||||||
|
- Enums für Status/Rechtsformen
|
||||||
|
- Field Validators
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 4. ✅ Zentrale Konfiguration ([services/config.py](services/config.py))
|
||||||
|
|
||||||
|
**Problem:** Magic Numbers und Strings überall im Code
|
||||||
|
|
||||||
|
**Lösung:** Zentrale Config mit Dataclasses:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from services.config import (
|
||||||
|
SYNC_CONFIG,
|
||||||
|
API_CONFIG,
|
||||||
|
ADVOWARE_CONFIG,
|
||||||
|
ESPOCRM_CONFIG,
|
||||||
|
FEATURE_FLAGS,
|
||||||
|
get_retry_delay_seconds,
|
||||||
|
get_lock_key
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verwendung:
|
||||||
|
max_retries = SYNC_CONFIG.max_retries # 5
|
||||||
|
lock_ttl = SYNC_CONFIG.lock_ttl_seconds # 900
|
||||||
|
backoff = SYNC_CONFIG.retry_backoff_minutes # [1, 5, 15, 60, 240]
|
||||||
|
|
||||||
|
# Helper Functions:
|
||||||
|
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||||
|
retry_delay = get_retry_delay_seconds(attempt=2) # 15 * 60 seconds
|
||||||
|
```
|
||||||
|
|
||||||
|
**Konfigurationsbereiche:**
|
||||||
|
- `SYNC_CONFIG` - Retry, Locking, Change Detection
|
||||||
|
- `API_CONFIG` - Timeouts, Rate Limiting
|
||||||
|
- `ADVOWARE_CONFIG` - Token, Auth, Read-only Fields
|
||||||
|
- `ESPOCRM_CONFIG` - Pagination, Notifications
|
||||||
|
- `FEATURE_FLAGS` - Feature Toggles
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 5. ✅ Konsistentes Logging ([services/logging_utils.py](services/logging_utils.py))
|
||||||
|
|
||||||
|
**Problem:** Inkonsistentes Logging (3 verschiedene Patterns)
|
||||||
|
|
||||||
|
**Lösung:** Unified Logger mit Context-Support:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from services.logging_utils import get_logger, get_service_logger
|
||||||
|
|
||||||
|
# Service Logger:
|
||||||
|
logger = get_service_logger('advoware', context)
|
||||||
|
logger.info("Message", entity_id="123")
|
||||||
|
|
||||||
|
# Mit Context Manager für Timing:
|
||||||
|
with logger.operation('sync_entity', entity_id='123'):
|
||||||
|
# Do work
|
||||||
|
pass # Automatisches Timing und Error Logging
|
||||||
|
|
||||||
|
# API Call Tracking:
|
||||||
|
with logger.api_call('/api/v1/Beteiligte', method='POST'):
|
||||||
|
result = await api.post(...)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Features:**
|
||||||
|
- Motia FlowContext Support
|
||||||
|
- Structured Logging
|
||||||
|
- Automatisches Performance Tracking
|
||||||
|
- Context Fields
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 6. ✅ Spezifische Exceptions in Services
|
||||||
|
|
||||||
|
**Aktualisierte Services:**
|
||||||
|
- [advoware.py](services/advoware.py) - AdvowareAPIError, AdvowareAuthError, AdvowareTimeoutError
|
||||||
|
- [espocrm.py](services/espocrm.py) - EspoCRMAPIError, EspoCRMAuthError, EspoCRMTimeoutError
|
||||||
|
- [sync_utils_base.py](services/sync_utils_base.py) - LockAcquisitionError
|
||||||
|
- [beteiligte_sync_utils.py](services/beteiligte_sync_utils.py) - SyncError
|
||||||
|
|
||||||
|
**Beispiel:**
|
||||||
|
```python
|
||||||
|
# Vorher:
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error: {e}")
|
||||||
|
|
||||||
|
# Nachher:
|
||||||
|
except AdvowareTimeoutError:
|
||||||
|
raise RetryableError("Request timed out")
|
||||||
|
except AdvowareAuthError:
|
||||||
|
raise # Nicht retryable
|
||||||
|
except AdvowareAPIError as e:
|
||||||
|
if is_retryable(e):
|
||||||
|
# Retry
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 7. ✅ Type Hints ergänzt
|
||||||
|
|
||||||
|
**Verbesserte Type Hints in:**
|
||||||
|
- Service-Methoden (advoware.py, espocrm.py)
|
||||||
|
- Mapper-Funktionen (espocrm_mapper.py)
|
||||||
|
- Utility-Klassen (sync_utils_base.py, beteiligte_sync_utils.py)
|
||||||
|
- Step Handler
|
||||||
|
|
||||||
|
**Beispiel:**
|
||||||
|
```python
|
||||||
|
# Vorher:
|
||||||
|
async def handler(event_data, ctx):
|
||||||
|
...
|
||||||
|
|
||||||
|
# Nachher:
|
||||||
|
async def handler(
|
||||||
|
event_data: Dict[str, Any],
|
||||||
|
ctx: FlowContext[Any]
|
||||||
|
) -> Optional[Dict[str, Any]]:
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Migration Guide
|
||||||
|
|
||||||
|
### Für bestehenden Code
|
||||||
|
|
||||||
|
1. **Exception Handling aktualisieren:**
|
||||||
|
```python
|
||||||
|
# Alt:
|
||||||
|
try:
|
||||||
|
result = await api.call()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error: {e}")
|
||||||
|
|
||||||
|
# Neu:
|
||||||
|
try:
|
||||||
|
result = await api.call()
|
||||||
|
except AdvowareTimeoutError:
|
||||||
|
# Spezifisch behandeln
|
||||||
|
raise RetryableError()
|
||||||
|
except AdvowareAPIError as e:
|
||||||
|
logger.error(f"API Error: {e}")
|
||||||
|
if is_retryable(e):
|
||||||
|
# Retry
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Redis initialisieren:**
|
||||||
|
```python
|
||||||
|
# Alt:
|
||||||
|
redis_client = redis.Redis(host=..., port=...)
|
||||||
|
|
||||||
|
# Neu:
|
||||||
|
from services.redis_client import get_redis_client
|
||||||
|
redis_client = get_redis_client(strict=False)
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **Konstanten verwenden:**
|
||||||
|
```python
|
||||||
|
# Alt:
|
||||||
|
MAX_RETRIES = 5
|
||||||
|
LOCK_TTL = 900
|
||||||
|
|
||||||
|
# Neu:
|
||||||
|
from services.config import SYNC_CONFIG
|
||||||
|
max_retries = SYNC_CONFIG.max_retries
|
||||||
|
lock_ttl = SYNC_CONFIG.lock_ttl_seconds
|
||||||
|
```
|
||||||
|
|
||||||
|
4. **Logging standardisieren:**
|
||||||
|
```python
|
||||||
|
# Alt:
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.info("Message")
|
||||||
|
|
||||||
|
# Neu:
|
||||||
|
from services.logging_utils import get_service_logger
|
||||||
|
logger = get_service_logger('my_service', context)
|
||||||
|
logger.info("Message", entity_id="123")
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Performance-Verbesserungen
|
||||||
|
|
||||||
|
- ✅ Redis Connection Pooling (max 50 Connections)
|
||||||
|
- ✅ Token Caching optimiert
|
||||||
|
- ✅ Bessere Error Classification (weniger unnötige Retries)
|
||||||
|
- ⚠️ Noch TODO: Batch Operations für parallele Syncs
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Feature Flags
|
||||||
|
|
||||||
|
Neue Features können über `FEATURE_FLAGS` gesteuert werden:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from services.config import FEATURE_FLAGS
|
||||||
|
|
||||||
|
# Aktivieren/Deaktivieren:
|
||||||
|
FEATURE_FLAGS.strict_validation = True # Pydantic Validation
|
||||||
|
FEATURE_FLAGS.kommunikation_sync_enabled = False # Noch in Entwicklung
|
||||||
|
FEATURE_FLAGS.parallel_sync_enabled = False # Experimentell
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
**Unit Tests sollten nun leichter sein:**
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Mock Redis:
|
||||||
|
from services.redis_client import RedisClientFactory
|
||||||
|
RedisClientFactory._instance = mock_redis
|
||||||
|
|
||||||
|
# Mock Exceptions:
|
||||||
|
from services.exceptions import AdvowareAPIError
|
||||||
|
raise AdvowareAPIError("Test error", status_code=500)
|
||||||
|
|
||||||
|
# Validate Models:
|
||||||
|
from services.models import validate_beteiligte_advoware
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
validate_beteiligte_advoware(invalid_data)
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Nächste Schritte
|
||||||
|
|
||||||
|
1. **Unit Tests schreiben** (min. 60% Coverage)
|
||||||
|
- Exception Handling Tests
|
||||||
|
- Mapper Tests mit Pydantic
|
||||||
|
- Redis Factory Tests
|
||||||
|
|
||||||
|
2. **Batch Operations** implementieren
|
||||||
|
- Parallele API-Calls
|
||||||
|
- Bulk Updates
|
||||||
|
|
||||||
|
3. **Monitoring** verbessern
|
||||||
|
- Performance Metrics aus Logger nutzen
|
||||||
|
- Redis Health Checks
|
||||||
|
|
||||||
|
4. **Dokumentation** erweitern
|
||||||
|
- API-Docs generieren (Sphinx)
|
||||||
|
- Error Handling Guide
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Breakfree Changes
|
||||||
|
|
||||||
|
⚠️ **Minimale Breaking Changes:**
|
||||||
|
|
||||||
|
1. Import-Pfade haben sich geändert:
|
||||||
|
- `AdvowareTokenError` → `AdvowareAuthError`
|
||||||
|
- `EspoCRMError` → `EspoCRMAPIError`
|
||||||
|
|
||||||
|
2. Redis wird jetzt über Factory bezogen:
|
||||||
|
- Statt direktem `redis.Redis()` → `get_redis_client()`
|
||||||
|
|
||||||
|
**Migration ist einfach:** Imports aktualisieren, Code läuft sonst identisch.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Autoren
|
||||||
|
|
||||||
|
- Code Refactoring: GitHub Copilot
|
||||||
|
- Review: BitByLaw Team
|
||||||
|
- Datum: 3. März 2026
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Fragen?
|
||||||
|
|
||||||
|
Bei Fragen zum Refactoring siehe:
|
||||||
|
- [services/README.md](services/README.md) - Service-Layer Dokumentation
|
||||||
|
- [exceptions.py](services/exceptions.py) - Exception Hierarchie
|
||||||
|
- [config.py](services/config.py) - Alle Konfigurationsoptionen
|
||||||
@@ -8,18 +8,22 @@ import hashlib
|
|||||||
import base64
|
import base64
|
||||||
import os
|
import os
|
||||||
import datetime
|
import datetime
|
||||||
import redis
|
|
||||||
import logging
|
import logging
|
||||||
from typing import Optional, Dict, Any
|
from typing import Optional, Dict, Any
|
||||||
|
|
||||||
|
from services.exceptions import (
|
||||||
|
AdvowareAPIError,
|
||||||
|
AdvowareAuthError,
|
||||||
|
AdvowareTimeoutError,
|
||||||
|
RetryableError
|
||||||
|
)
|
||||||
|
from services.redis_client import get_redis_client
|
||||||
|
from services.config import ADVOWARE_CONFIG, API_CONFIG
|
||||||
|
from services.logging_utils import get_service_logger
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class AdvowareTokenError(Exception):
|
|
||||||
"""Raised when token acquisition fails"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class AdvowareAPI:
|
class AdvowareAPI:
|
||||||
"""
|
"""
|
||||||
Advoware API client with token caching via Redis.
|
Advoware API client with token caching via Redis.
|
||||||
@@ -34,15 +38,8 @@ class AdvowareAPI:
|
|||||||
- ADVOWARE_USER
|
- ADVOWARE_USER
|
||||||
- ADVOWARE_ROLE
|
- ADVOWARE_ROLE
|
||||||
- ADVOWARE_PASSWORD
|
- ADVOWARE_PASSWORD
|
||||||
- REDIS_HOST (optional, default: localhost)
|
|
||||||
- REDIS_PORT (optional, default: 6379)
|
|
||||||
- REDIS_DB_ADVOWARE_CACHE (optional, default: 1)
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
AUTH_URL = "https://security.advo-net.net/api/v1/Token"
|
|
||||||
TOKEN_CACHE_KEY = 'advoware_access_token'
|
|
||||||
TOKEN_TIMESTAMP_CACHE_KEY = 'advoware_token_timestamp'
|
|
||||||
|
|
||||||
def __init__(self, context=None):
|
def __init__(self, context=None):
|
||||||
"""
|
"""
|
||||||
Initialize Advoware API client.
|
Initialize Advoware API client.
|
||||||
@@ -51,7 +48,8 @@ class AdvowareAPI:
|
|||||||
context: Motia FlowContext for logging (optional)
|
context: Motia FlowContext for logging (optional)
|
||||||
"""
|
"""
|
||||||
self.context = context
|
self.context = context
|
||||||
self._log("AdvowareAPI initializing", level='debug')
|
self.logger = get_service_logger('advoware', context)
|
||||||
|
self.logger.debug("AdvowareAPI initializing")
|
||||||
|
|
||||||
# Load configuration from environment
|
# Load configuration from environment
|
||||||
self.API_BASE_URL = os.getenv('ADVOWARE_API_BASE_URL', 'https://www2.advo-net.net:90/')
|
self.API_BASE_URL = os.getenv('ADVOWARE_API_BASE_URL', 'https://www2.advo-net.net:90/')
|
||||||
@@ -63,30 +61,28 @@ class AdvowareAPI:
|
|||||||
self.user = os.getenv('ADVOWARE_USER', '')
|
self.user = os.getenv('ADVOWARE_USER', '')
|
||||||
self.role = int(os.getenv('ADVOWARE_ROLE', '2'))
|
self.role = int(os.getenv('ADVOWARE_ROLE', '2'))
|
||||||
self.password = os.getenv('ADVOWARE_PASSWORD', '')
|
self.password = os.getenv('ADVOWARE_PASSWORD', '')
|
||||||
self.token_lifetime_minutes = int(os.getenv('ADVOWARE_TOKEN_LIFETIME_MINUTES', '55'))
|
self.token_lifetime_minutes = ADVOWARE_CONFIG.token_lifetime_minutes
|
||||||
self.api_timeout_seconds = int(os.getenv('ADVOWARE_API_TIMEOUT_SECONDS', '30'))
|
self.api_timeout_seconds = API_CONFIG.default_timeout_seconds
|
||||||
|
|
||||||
# Initialize Redis for token caching
|
# Initialize Redis for token caching (centralized)
|
||||||
try:
|
self.redis_client = get_redis_client(strict=False)
|
||||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
if self.redis_client:
|
||||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
self.logger.info("Connected to Redis for token caching")
|
||||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
else:
|
||||||
redis_timeout = int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
|
self.logger.warning("⚠️ Redis unavailable - token caching disabled!")
|
||||||
|
|
||||||
self.redis_client = redis.Redis(
|
self.logger.info("AdvowareAPI initialized")
|
||||||
host=redis_host,
|
|
||||||
port=redis_port,
|
|
||||||
db=redis_db,
|
|
||||||
socket_timeout=redis_timeout,
|
|
||||||
socket_connect_timeout=redis_timeout
|
|
||||||
)
|
|
||||||
self.redis_client.ping()
|
|
||||||
self._log("Connected to Redis for token caching")
|
|
||||||
except (redis.exceptions.ConnectionError, Exception) as e:
|
|
||||||
self._log(f"Could not connect to Redis: {e}. Token caching disabled.", level='warning')
|
|
||||||
self.redis_client = None
|
|
||||||
|
|
||||||
self._log("AdvowareAPI initialized")
|
self._session: Optional[aiohttp.ClientSession] = None
|
||||||
|
|
||||||
|
async def _get_session(self) -> aiohttp.ClientSession:
|
||||||
|
if self._session is None or self._session.closed:
|
||||||
|
self._session = aiohttp.ClientSession()
|
||||||
|
return self._session
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
if self._session and not self._session.closed:
|
||||||
|
await self._session.close()
|
||||||
|
|
||||||
def _generate_hmac(self, request_time_stamp: str, nonce: Optional[str] = None) -> str:
|
def _generate_hmac(self, request_time_stamp: str, nonce: Optional[str] = None) -> str:
|
||||||
"""Generate HMAC-SHA512 signature for authentication"""
|
"""Generate HMAC-SHA512 signature for authentication"""
|
||||||
@@ -107,7 +103,7 @@ class AdvowareAPI:
|
|||||||
|
|
||||||
def _fetch_new_access_token(self) -> str:
|
def _fetch_new_access_token(self) -> str:
|
||||||
"""Fetch new access token from Advoware Auth API"""
|
"""Fetch new access token from Advoware Auth API"""
|
||||||
self._log("Fetching new access token from Advoware")
|
self.logger.info("Fetching new access token from Advoware")
|
||||||
|
|
||||||
nonce = str(uuid.uuid4())
|
nonce = str(uuid.uuid4())
|
||||||
request_time_stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
|
request_time_stamp = datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
|
||||||
@@ -127,35 +123,56 @@ class AdvowareAPI:
|
|||||||
"RequestTimeStamp": request_time_stamp
|
"RequestTimeStamp": request_time_stamp
|
||||||
}
|
}
|
||||||
|
|
||||||
self._log(f"Token request: AppID={self.app_id}, User={self.user}", level='debug')
|
self.logger.debug(f"Token request: AppID={self.app_id}, User={self.user}")
|
||||||
|
|
||||||
# Using synchronous requests for token fetch (called from sync context)
|
# Using synchronous requests for token fetch (called from sync context)
|
||||||
|
# TODO: Convert to async in future version
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
|
try:
|
||||||
response = requests.post(
|
response = requests.post(
|
||||||
self.AUTH_URL,
|
ADVOWARE_CONFIG.auth_url,
|
||||||
json=data,
|
json=data,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
timeout=self.api_timeout_seconds
|
timeout=self.api_timeout_seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
self._log(f"Token response status: {response.status_code}")
|
self.logger.debug(f"Token response status: {response.status_code}")
|
||||||
|
|
||||||
|
if response.status_code == 401:
|
||||||
|
raise AdvowareAuthError(
|
||||||
|
"Authentication failed - check credentials",
|
||||||
|
status_code=401
|
||||||
|
)
|
||||||
|
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
|
except requests.Timeout:
|
||||||
|
raise AdvowareTimeoutError(
|
||||||
|
"Token request timed out",
|
||||||
|
status_code=408
|
||||||
|
)
|
||||||
|
except requests.RequestException as e:
|
||||||
|
raise AdvowareAPIError(
|
||||||
|
f"Token request failed: {str(e)}",
|
||||||
|
status_code=getattr(e.response, 'status_code', None) if hasattr(e, 'response') else None
|
||||||
|
)
|
||||||
|
|
||||||
result = response.json()
|
result = response.json()
|
||||||
access_token = result.get("access_token")
|
access_token = result.get("access_token")
|
||||||
|
|
||||||
if not access_token:
|
if not access_token:
|
||||||
self._log("No access_token in response", level='error')
|
self.logger.error("No access_token in response")
|
||||||
raise AdvowareTokenError("No access_token received from Advoware")
|
raise AdvowareAuthError("No access_token received from Advoware")
|
||||||
|
|
||||||
self._log("Access token fetched successfully")
|
self.logger.info("Access token fetched successfully")
|
||||||
|
|
||||||
# Cache token in Redis
|
# Cache token in Redis
|
||||||
if self.redis_client:
|
if self.redis_client:
|
||||||
effective_ttl = max(1, (self.token_lifetime_minutes - 2) * 60)
|
effective_ttl = max(1, (self.token_lifetime_minutes - 2) * 60)
|
||||||
self.redis_client.set(self.TOKEN_CACHE_KEY, access_token, ex=effective_ttl)
|
self.redis_client.set(ADVOWARE_CONFIG.token_cache_key, access_token, ex=effective_ttl)
|
||||||
self.redis_client.set(self.TOKEN_TIMESTAMP_CACHE_KEY, str(time.time()), ex=effective_ttl)
|
self.redis_client.set(ADVOWARE_CONFIG.token_timestamp_key, str(time.time()), ex=effective_ttl)
|
||||||
self._log(f"Token cached in Redis with TTL {effective_ttl}s")
|
self.logger.debug(f"Token cached in Redis with TTL {effective_ttl}s")
|
||||||
|
|
||||||
return access_token
|
return access_token
|
||||||
|
|
||||||
@@ -169,32 +186,33 @@ class AdvowareAPI:
|
|||||||
Returns:
|
Returns:
|
||||||
Valid access token
|
Valid access token
|
||||||
"""
|
"""
|
||||||
self._log("Getting access token", level='debug')
|
self.logger.debug("Getting access token")
|
||||||
|
|
||||||
if not self.redis_client:
|
if not self.redis_client:
|
||||||
self._log("No Redis available, fetching new token")
|
self.logger.info("No Redis available, fetching new token")
|
||||||
return self._fetch_new_access_token()
|
return self._fetch_new_access_token()
|
||||||
|
|
||||||
if force_refresh:
|
if force_refresh:
|
||||||
self._log("Force refresh requested, fetching new token")
|
self.logger.info("Force refresh requested, fetching new token")
|
||||||
return self._fetch_new_access_token()
|
return self._fetch_new_access_token()
|
||||||
|
|
||||||
# Check cache
|
# Check cache
|
||||||
cached_token = self.redis_client.get(self.TOKEN_CACHE_KEY)
|
cached_token = self.redis_client.get(ADVOWARE_CONFIG.token_cache_key)
|
||||||
token_timestamp = self.redis_client.get(self.TOKEN_TIMESTAMP_CACHE_KEY)
|
token_timestamp = self.redis_client.get(ADVOWARE_CONFIG.token_timestamp_key)
|
||||||
|
|
||||||
if cached_token and token_timestamp:
|
if cached_token and token_timestamp:
|
||||||
try:
|
try:
|
||||||
timestamp = float(token_timestamp.decode('utf-8'))
|
# Redis decode_responses=True returns strings
|
||||||
|
timestamp = float(token_timestamp)
|
||||||
age_seconds = time.time() - timestamp
|
age_seconds = time.time() - timestamp
|
||||||
|
|
||||||
if age_seconds < (self.token_lifetime_minutes - 1) * 60:
|
if age_seconds < (self.token_lifetime_minutes - 1) * 60:
|
||||||
self._log(f"Using cached token (age: {age_seconds:.0f}s)", level='debug')
|
self.logger.debug(f"Using cached token (age: {age_seconds:.0f}s)")
|
||||||
return cached_token.decode('utf-8')
|
return cached_token
|
||||||
except (ValueError, AttributeError) as e:
|
except (ValueError, AttributeError, TypeError) as e:
|
||||||
self._log(f"Error reading cached token: {e}", level='debug')
|
self.logger.debug(f"Error reading cached token: {e}")
|
||||||
|
|
||||||
self._log("Cached token expired or invalid, fetching new")
|
self.logger.info("Cached token expired or invalid, fetching new")
|
||||||
return self._fetch_new_access_token()
|
return self._fetch_new_access_token()
|
||||||
|
|
||||||
async def api_call(
|
async def api_call(
|
||||||
@@ -223,6 +241,11 @@ class AdvowareAPI:
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
JSON response or None
|
JSON response or None
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
AdvowareAuthError: Authentication failed
|
||||||
|
AdvowareTimeoutError: Request timed out
|
||||||
|
AdvowareAPIError: Other API errors
|
||||||
"""
|
"""
|
||||||
# Clean endpoint
|
# Clean endpoint
|
||||||
endpoint = endpoint.lstrip('/')
|
endpoint = endpoint.lstrip('/')
|
||||||
@@ -233,7 +256,12 @@ class AdvowareAPI:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Get auth token
|
# Get auth token
|
||||||
|
try:
|
||||||
token = self.get_access_token()
|
token = self.get_access_token()
|
||||||
|
except AdvowareAuthError:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
raise AdvowareAPIError(f"Failed to get access token: {str(e)}")
|
||||||
|
|
||||||
# Prepare headers
|
# Prepare headers
|
||||||
effective_headers = headers.copy() if headers else {}
|
effective_headers = headers.copy() if headers else {}
|
||||||
@@ -243,20 +271,20 @@ class AdvowareAPI:
|
|||||||
# Use 'data' parameter if provided, otherwise 'json_data'
|
# Use 'data' parameter if provided, otherwise 'json_data'
|
||||||
json_payload = data if data is not None else json_data
|
json_payload = data if data is not None else json_data
|
||||||
|
|
||||||
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
|
session = await self._get_session()
|
||||||
try:
|
try:
|
||||||
self._log(f"API call: {method} {url}", level='debug')
|
with self.logger.api_call(endpoint, method):
|
||||||
|
|
||||||
async with session.request(
|
async with session.request(
|
||||||
method,
|
method,
|
||||||
url,
|
url,
|
||||||
headers=effective_headers,
|
headers=effective_headers,
|
||||||
params=params,
|
params=params,
|
||||||
json=json_payload
|
json=json_payload,
|
||||||
|
timeout=effective_timeout
|
||||||
) as response:
|
) as response:
|
||||||
# Handle 401 - retry with fresh token
|
# Handle 401 - retry with fresh token
|
||||||
if response.status == 401:
|
if response.status == 401:
|
||||||
self._log("401 Unauthorized, refreshing token")
|
self.logger.warning("401 Unauthorized, refreshing token")
|
||||||
token = self.get_access_token(force_refresh=True)
|
token = self.get_access_token(force_refresh=True)
|
||||||
effective_headers['Authorization'] = f'Bearer {token}'
|
effective_headers['Authorization'] = f'Bearer {token}'
|
||||||
|
|
||||||
@@ -265,17 +293,57 @@ class AdvowareAPI:
|
|||||||
url,
|
url,
|
||||||
headers=effective_headers,
|
headers=effective_headers,
|
||||||
params=params,
|
params=params,
|
||||||
json=json_payload
|
json=json_payload,
|
||||||
|
timeout=effective_timeout
|
||||||
) as retry_response:
|
) as retry_response:
|
||||||
|
if retry_response.status == 401:
|
||||||
|
raise AdvowareAuthError(
|
||||||
|
"Authentication failed even after token refresh",
|
||||||
|
status_code=401
|
||||||
|
)
|
||||||
|
|
||||||
|
if retry_response.status >= 500:
|
||||||
|
error_text = await retry_response.text()
|
||||||
|
raise RetryableError(
|
||||||
|
f"Server error {retry_response.status}: {error_text}"
|
||||||
|
)
|
||||||
|
|
||||||
retry_response.raise_for_status()
|
retry_response.raise_for_status()
|
||||||
return await self._parse_response(retry_response)
|
return await self._parse_response(retry_response)
|
||||||
|
|
||||||
response.raise_for_status()
|
# Handle other error codes
|
||||||
|
if response.status == 404:
|
||||||
|
error_text = await response.text()
|
||||||
|
raise AdvowareAPIError(
|
||||||
|
f"Resource not found: {endpoint}",
|
||||||
|
status_code=404,
|
||||||
|
response_body=error_text
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status >= 500:
|
||||||
|
error_text = await response.text()
|
||||||
|
raise RetryableError(
|
||||||
|
f"Server error {response.status}: {error_text}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status >= 400:
|
||||||
|
error_text = await response.text()
|
||||||
|
raise AdvowareAPIError(
|
||||||
|
f"API error {response.status}: {error_text}",
|
||||||
|
status_code=response.status,
|
||||||
|
response_body=error_text
|
||||||
|
)
|
||||||
|
|
||||||
return await self._parse_response(response)
|
return await self._parse_response(response)
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise AdvowareTimeoutError(
|
||||||
|
f"Request timed out after {effective_timeout.total}s",
|
||||||
|
status_code=408
|
||||||
|
)
|
||||||
except aiohttp.ClientError as e:
|
except aiohttp.ClientError as e:
|
||||||
self._log(f"API call failed: {e}", level='error')
|
self.logger.error(f"API call failed: {e}")
|
||||||
raise
|
raise AdvowareAPIError(f"Request failed: {str(e)}")
|
||||||
|
|
||||||
async def _parse_response(self, response: aiohttp.ClientResponse) -> Any:
|
async def _parse_response(self, response: aiohttp.ClientResponse) -> Any:
|
||||||
"""Parse API response"""
|
"""Parse API response"""
|
||||||
@@ -283,27 +351,6 @@ class AdvowareAPI:
|
|||||||
try:
|
try:
|
||||||
return await response.json()
|
return await response.json()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._log(f"JSON parse error: {e}", level='debug')
|
self.logger.debug(f"JSON parse error: {e}")
|
||||||
return None
|
return None
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _log(self, message: str, level: str = 'info'):
|
|
||||||
"""Log message via context or standard logger"""
|
|
||||||
if self.context:
|
|
||||||
if level == 'debug':
|
|
||||||
self.context.logger.debug(message)
|
|
||||||
elif level == 'warning':
|
|
||||||
self.context.logger.warning(message)
|
|
||||||
elif level == 'error':
|
|
||||||
self.context.logger.error(message)
|
|
||||||
else:
|
|
||||||
self.context.logger.info(message)
|
|
||||||
else:
|
|
||||||
if level == 'debug':
|
|
||||||
logger.debug(message)
|
|
||||||
elif level == 'warning':
|
|
||||||
logger.warning(message)
|
|
||||||
elif level == 'error':
|
|
||||||
logger.error(message)
|
|
||||||
else:
|
|
||||||
logger.info(message)
|
|
||||||
|
|||||||
@@ -13,64 +13,39 @@ Hilfsfunktionen für Sync-Operationen:
|
|||||||
from typing import Dict, Any, Optional, Tuple, Literal
|
from typing import Dict, Any, Optional, Tuple, Literal
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
import pytz
|
import pytz
|
||||||
import logging
|
|
||||||
import redis
|
|
||||||
import os
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
from services.exceptions import LockAcquisitionError, SyncError, ValidationError
|
||||||
|
from services.redis_client import get_redis_client
|
||||||
|
from services.config import SYNC_CONFIG, get_lock_key, get_retry_delay_seconds
|
||||||
|
from services.logging_utils import get_logger
|
||||||
|
|
||||||
|
import redis
|
||||||
|
|
||||||
# Timestamp-Vergleich Ergebnis-Typen
|
# Timestamp-Vergleich Ergebnis-Typen
|
||||||
TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"]
|
TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"]
|
||||||
|
|
||||||
# Max retry before permanent failure
|
|
||||||
MAX_SYNC_RETRIES = 5
|
|
||||||
# Lock TTL in seconds (prevents deadlocks)
|
|
||||||
LOCK_TTL_SECONDS = 900 # 15 minutes
|
|
||||||
# Retry backoff: Wartezeit zwischen Retries (in Minuten)
|
|
||||||
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h
|
|
||||||
# Auto-Reset nach 24h (für permanently_failed entities)
|
|
||||||
AUTO_RESET_HOURS = 24
|
|
||||||
|
|
||||||
|
|
||||||
class BeteiligteSync:
|
class BeteiligteSync:
|
||||||
"""Utility-Klasse für Beteiligte-Synchronisation"""
|
"""Utility-Klasse für Beteiligte-Synchronisation"""
|
||||||
|
|
||||||
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
|
def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None):
|
||||||
self.espocrm = espocrm_api
|
self.espocrm = espocrm_api
|
||||||
self.context = context
|
self.context = context
|
||||||
self.logger = context.logger if context else logger
|
self.logger = get_logger('beteiligte_sync', context)
|
||||||
self.redis = redis_client or self._init_redis()
|
|
||||||
|
# Use provided Redis client or get from factory
|
||||||
|
self.redis = redis_client or get_redis_client(strict=False)
|
||||||
|
|
||||||
|
if not self.redis:
|
||||||
|
self.logger.error(
|
||||||
|
"⚠️ KRITISCH: Redis nicht verfügbar! "
|
||||||
|
"Distributed Locking deaktiviert - Race Conditions möglich!"
|
||||||
|
)
|
||||||
|
|
||||||
# Import NotificationManager only when needed
|
# Import NotificationManager only when needed
|
||||||
from services.notification_utils import NotificationManager
|
from services.notification_utils import NotificationManager
|
||||||
self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context)
|
self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context)
|
||||||
|
|
||||||
def _init_redis(self) -> redis.Redis:
|
|
||||||
"""Initialize Redis client for distributed locking"""
|
|
||||||
try:
|
|
||||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
|
||||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
|
||||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
|
||||||
|
|
||||||
client = redis.Redis(
|
|
||||||
host=redis_host,
|
|
||||||
port=redis_port,
|
|
||||||
db=redis_db,
|
|
||||||
decode_responses=True
|
|
||||||
)
|
|
||||||
client.ping()
|
|
||||||
return client
|
|
||||||
except Exception as e:
|
|
||||||
self._log(f"Redis connection failed: {e}", level='error')
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _log(self, message: str, level: str = 'info'):
|
|
||||||
"""Logging mit Context-Support"""
|
|
||||||
if self.context and hasattr(self.context, 'logger'):
|
|
||||||
getattr(self.context.logger, level)(message)
|
|
||||||
else:
|
|
||||||
getattr(logger, level)(message)
|
|
||||||
|
|
||||||
async def acquire_sync_lock(self, entity_id: str) -> bool:
|
async def acquire_sync_lock(self, entity_id: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Atomic distributed lock via Redis + syncStatus update
|
Atomic distributed lock via Redis + syncStatus update
|
||||||
@@ -80,23 +55,35 @@ class BeteiligteSync:
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True wenn Lock erfolgreich, False wenn bereits im Sync
|
True wenn Lock erfolgreich, False wenn bereits im Sync
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
SyncError: Bei kritischen Sync-Problemen
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# STEP 1: Atomic Redis lock (prevents race conditions)
|
# STEP 1: Atomic Redis lock (prevents race conditions)
|
||||||
if self.redis:
|
if self.redis:
|
||||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||||
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
|
acquired = self.redis.set(
|
||||||
|
lock_key,
|
||||||
|
"locked",
|
||||||
|
nx=True,
|
||||||
|
ex=SYNC_CONFIG.lock_ttl_seconds
|
||||||
|
)
|
||||||
|
|
||||||
if not acquired:
|
if not acquired:
|
||||||
self._log(f"Redis lock bereits aktiv für {entity_id}", level='warn')
|
self.logger.warning(f"Redis lock bereits aktiv für {entity_id}")
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
self.logger.error(
|
||||||
|
f"⚠️ WARNUNG: Sync ohne Redis-Lock für {entity_id} - Race Condition möglich!"
|
||||||
|
)
|
||||||
|
|
||||||
# STEP 2: Update syncStatus (für UI visibility)
|
# STEP 2: Update syncStatus (für UI visibility)
|
||||||
await self.espocrm.update_entity('CBeteiligte', entity_id, {
|
await self.espocrm.update_entity('CBeteiligte', entity_id, {
|
||||||
'syncStatus': 'syncing'
|
'syncStatus': 'syncing'
|
||||||
})
|
})
|
||||||
|
|
||||||
self._log(f"Sync-Lock für {entity_id} erworben")
|
self.logger.info(f"Sync-Lock für {entity_id} erworben")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -152,32 +139,42 @@ class BeteiligteSync:
|
|||||||
update_data['syncRetryCount'] = new_retry
|
update_data['syncRetryCount'] = new_retry
|
||||||
|
|
||||||
# Exponential backoff - berechne nächsten Retry-Zeitpunkt
|
# Exponential backoff - berechne nächsten Retry-Zeitpunkt
|
||||||
if new_retry <= len(RETRY_BACKOFF_MINUTES):
|
backoff_minutes = SYNC_CONFIG.retry_backoff_minutes
|
||||||
backoff_minutes = RETRY_BACKOFF_MINUTES[new_retry - 1]
|
if new_retry <= len(backoff_minutes):
|
||||||
|
backoff_min = backoff_minutes[new_retry - 1]
|
||||||
else:
|
else:
|
||||||
backoff_minutes = RETRY_BACKOFF_MINUTES[-1] # Letzte Backoff-Zeit
|
backoff_min = backoff_minutes[-1] # Letzte Backoff-Zeit
|
||||||
|
|
||||||
next_retry = now_utc + timedelta(minutes=backoff_minutes)
|
next_retry = now_utc + timedelta(minutes=backoff_min)
|
||||||
update_data['syncNextRetry'] = next_retry.strftime('%Y-%m-%d %H:%M:%S')
|
update_data['syncNextRetry'] = next_retry.strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
self._log(f"Retry {new_retry}/{MAX_SYNC_RETRIES}, nächster Versuch in {backoff_minutes} Minuten")
|
self.logger.info(
|
||||||
|
f"Retry {new_retry}/{SYNC_CONFIG.max_retries}, "
|
||||||
|
f"nächster Versuch in {backoff_min} Minuten"
|
||||||
|
)
|
||||||
|
|
||||||
# Check max retries - mark as permanently failed
|
# Check max retries - mark as permanently failed
|
||||||
if new_retry >= MAX_SYNC_RETRIES:
|
if new_retry >= SYNC_CONFIG.max_retries:
|
||||||
update_data['syncStatus'] = 'permanently_failed'
|
update_data['syncStatus'] = 'permanently_failed'
|
||||||
|
|
||||||
# Auto-Reset Timestamp für Wiederherstellung nach 24h
|
# Auto-Reset Timestamp für Wiederherstellung nach 24h
|
||||||
auto_reset_time = now_utc + timedelta(hours=AUTO_RESET_HOURS)
|
auto_reset_time = now_utc + timedelta(hours=SYNC_CONFIG.auto_reset_hours)
|
||||||
update_data['syncAutoResetAt'] = auto_reset_time.strftime('%Y-%m-%d %H:%M:%S')
|
update_data['syncAutoResetAt'] = auto_reset_time.strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
await self.send_notification(
|
await self.send_notification(
|
||||||
entity_id,
|
entity_id,
|
||||||
'error',
|
'error',
|
||||||
extra_data={
|
extra_data={
|
||||||
'message': f"Sync fehlgeschlagen nach {MAX_SYNC_RETRIES} Versuchen. Auto-Reset in {AUTO_RESET_HOURS}h."
|
'message': (
|
||||||
|
f"Sync fehlgeschlagen nach {SYNC_CONFIG.max_retries} Versuchen. "
|
||||||
|
f"Auto-Reset in {SYNC_CONFIG.auto_reset_hours}h."
|
||||||
|
)
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
self._log(f"Max retries ({MAX_SYNC_RETRIES}) erreicht für {entity_id}, Auto-Reset um {auto_reset_time}", level='error')
|
self.logger.error(
|
||||||
|
f"Max retries ({SYNC_CONFIG.max_retries}) erreicht für {entity_id}, "
|
||||||
|
f"Auto-Reset um {auto_reset_time}"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
update_data['syncRetryCount'] = 0
|
update_data['syncRetryCount'] = 0
|
||||||
update_data['syncNextRetry'] = None
|
update_data['syncNextRetry'] = None
|
||||||
@@ -188,19 +185,19 @@ class BeteiligteSync:
|
|||||||
|
|
||||||
await self.espocrm.update_entity('CBeteiligte', entity_id, update_data)
|
await self.espocrm.update_entity('CBeteiligte', entity_id, update_data)
|
||||||
|
|
||||||
self._log(f"Sync-Lock released: {entity_id} → {new_status}")
|
self.logger.info(f"Sync-Lock released: {entity_id} → {new_status}")
|
||||||
|
|
||||||
# Release Redis lock
|
# Release Redis lock
|
||||||
if self.redis:
|
if self.redis:
|
||||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||||
self.redis.delete(lock_key)
|
self.redis.delete(lock_key)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._log(f"Fehler beim Release Lock: {e}", level='error')
|
self.logger.error(f"Fehler beim Release Lock: {e}")
|
||||||
# Ensure Redis lock is released even on error
|
# Ensure Redis lock is released even on error
|
||||||
if self.redis:
|
if self.redis:
|
||||||
try:
|
try:
|
||||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||||
self.redis.delete(lock_key)
|
self.redis.delete(lock_key)
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
|||||||
338
services/config.py
Normal file
338
services/config.py
Normal file
@@ -0,0 +1,338 @@
|
|||||||
|
"""
|
||||||
|
Zentrale Konfiguration für BitByLaw Integration
|
||||||
|
|
||||||
|
Alle Magic Numbers und Strings sind hier zentralisiert.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import List, Dict
|
||||||
|
from dataclasses import dataclass
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Sync Configuration ==========
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SyncConfig:
|
||||||
|
"""Konfiguration für Sync-Operationen"""
|
||||||
|
|
||||||
|
# Retry-Konfiguration
|
||||||
|
max_retries: int = 5
|
||||||
|
"""Maximale Anzahl von Retry-Versuchen"""
|
||||||
|
|
||||||
|
retry_backoff_minutes: List[int] = None
|
||||||
|
"""Exponential Backoff in Minuten: [1, 5, 15, 60, 240]"""
|
||||||
|
|
||||||
|
auto_reset_hours: int = 24
|
||||||
|
"""Auto-Reset für permanently_failed Entities (in Stunden)"""
|
||||||
|
|
||||||
|
# Lock-Konfiguration
|
||||||
|
lock_ttl_seconds: int = 900 # 15 Minuten
|
||||||
|
"""TTL für distributed locks (verhindert Deadlocks)"""
|
||||||
|
|
||||||
|
lock_prefix: str = "sync_lock"
|
||||||
|
"""Prefix für Redis Lock Keys"""
|
||||||
|
|
||||||
|
# Validation
|
||||||
|
validate_before_sync: bool = True
|
||||||
|
"""Validiere Entities vor dem Sync (empfohlen)"""
|
||||||
|
|
||||||
|
# Change Detection
|
||||||
|
use_rowid_change_detection: bool = True
|
||||||
|
"""Nutze rowId für Change Detection (Advoware)"""
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if self.retry_backoff_minutes is None:
|
||||||
|
# Default exponential backoff: 1, 5, 15, 60, 240 Minuten
|
||||||
|
self.retry_backoff_minutes = [1, 5, 15, 60, 240]
|
||||||
|
|
||||||
|
|
||||||
|
# Singleton Instance
|
||||||
|
SYNC_CONFIG = SyncConfig()
|
||||||
|
|
||||||
|
|
||||||
|
# ========== API Configuration ==========
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class APIConfig:
|
||||||
|
"""API-spezifische Konfiguration"""
|
||||||
|
|
||||||
|
# Timeouts
|
||||||
|
default_timeout_seconds: int = 30
|
||||||
|
"""Default Timeout für API-Calls"""
|
||||||
|
|
||||||
|
long_running_timeout_seconds: int = 120
|
||||||
|
"""Timeout für lange Operations (z.B. Uploads)"""
|
||||||
|
|
||||||
|
# Retry
|
||||||
|
max_api_retries: int = 3
|
||||||
|
"""Anzahl Retries bei API-Fehlern"""
|
||||||
|
|
||||||
|
retry_status_codes: List[int] = None
|
||||||
|
"""HTTP Status Codes die Retry auslösen"""
|
||||||
|
|
||||||
|
# Rate Limiting
|
||||||
|
rate_limit_enabled: bool = True
|
||||||
|
"""Aktiviere Rate Limiting"""
|
||||||
|
|
||||||
|
rate_limit_calls_per_minute: int = 60
|
||||||
|
"""Max. API-Calls pro Minute"""
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if self.retry_status_codes is None:
|
||||||
|
# Retry bei: 408 (Timeout), 429 (Rate Limit), 500, 502, 503, 504
|
||||||
|
self.retry_status_codes = [408, 429, 500, 502, 503, 504]
|
||||||
|
|
||||||
|
|
||||||
|
API_CONFIG = APIConfig()
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Advoware Configuration ==========
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AdvowareConfig:
|
||||||
|
"""Advoware-spezifische Konfiguration"""
|
||||||
|
|
||||||
|
# Token Management
|
||||||
|
token_lifetime_minutes: int = 55
|
||||||
|
"""Token-Lifetime (tatsächlich 60min, aber 5min Puffer)"""
|
||||||
|
|
||||||
|
token_cache_key: str = "advoware_access_token"
|
||||||
|
"""Redis Key für Token Cache"""
|
||||||
|
|
||||||
|
token_timestamp_key: str = "advoware_token_timestamp"
|
||||||
|
"""Redis Key für Token Timestamp"""
|
||||||
|
|
||||||
|
# Auth
|
||||||
|
auth_url: str = "https://security.advo-net.net/api/v1/Token"
|
||||||
|
"""Advoware Auth-Endpoint"""
|
||||||
|
|
||||||
|
product_id: int = 64
|
||||||
|
"""Advoware Product ID"""
|
||||||
|
|
||||||
|
# Field Mapping
|
||||||
|
readonly_fields: List[str] = None
|
||||||
|
"""Felder die nicht via PUT geändert werden können"""
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if self.readonly_fields is None:
|
||||||
|
# Diese Felder können nicht via PUT geändert werden
|
||||||
|
self.readonly_fields = [
|
||||||
|
'betNr', 'rowId', 'kommKz', # Kommunikation: kommKz ist read-only!
|
||||||
|
'handelsRegisterNummer', 'registergericht' # Werden ignoriert von API
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
ADVOWARE_CONFIG = AdvowareConfig()
|
||||||
|
|
||||||
|
|
||||||
|
# ========== EspoCRM Configuration ==========
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EspoCRMConfig:
|
||||||
|
"""EspoCRM-spezifische Konfiguration"""
|
||||||
|
|
||||||
|
# API
|
||||||
|
default_page_size: int = 50
|
||||||
|
"""Default Seitengröße für Listen-Abfragen"""
|
||||||
|
|
||||||
|
max_page_size: int = 200
|
||||||
|
"""Maximale Seitengröße"""
|
||||||
|
|
||||||
|
# Sync Status Fields
|
||||||
|
sync_status_field: str = "syncStatus"
|
||||||
|
"""Feldname für Sync-Status"""
|
||||||
|
|
||||||
|
sync_error_field: str = "syncErrorMessage"
|
||||||
|
"""Feldname für Sync-Fehler"""
|
||||||
|
|
||||||
|
sync_retry_field: str = "syncRetryCount"
|
||||||
|
"""Feldname für Retry-Counter"""
|
||||||
|
|
||||||
|
# Notifications
|
||||||
|
notification_enabled: bool = True
|
||||||
|
"""In-App Notifications aktivieren"""
|
||||||
|
|
||||||
|
notification_user_id: str = "1"
|
||||||
|
"""User-ID für Notifications (Marvin)"""
|
||||||
|
|
||||||
|
|
||||||
|
ESPOCRM_CONFIG = EspoCRMConfig()
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Redis Configuration ==========
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RedisConfig:
|
||||||
|
"""Redis-spezifische Konfiguration"""
|
||||||
|
|
||||||
|
# Connection
|
||||||
|
host: str = "localhost"
|
||||||
|
port: int = 6379
|
||||||
|
db: int = 1
|
||||||
|
timeout_seconds: int = 5
|
||||||
|
max_connections: int = 50
|
||||||
|
|
||||||
|
# Behavior
|
||||||
|
decode_responses: bool = True
|
||||||
|
"""Auto-decode bytes zu strings"""
|
||||||
|
|
||||||
|
health_check_interval: int = 30
|
||||||
|
"""Health-Check Interval in Sekunden"""
|
||||||
|
|
||||||
|
# Keys
|
||||||
|
key_prefix: str = "bitbylaw"
|
||||||
|
"""Prefix für alle Redis Keys"""
|
||||||
|
|
||||||
|
def get_key(self, key: str) -> str:
|
||||||
|
"""Gibt vollen Redis Key mit Prefix zurück"""
|
||||||
|
return f"{self.key_prefix}:{key}"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_env(cls) -> 'RedisConfig':
|
||||||
|
"""Lädt Redis-Config aus Environment Variables"""
|
||||||
|
return cls(
|
||||||
|
host=os.getenv('REDIS_HOST', 'localhost'),
|
||||||
|
port=int(os.getenv('REDIS_PORT', '6379')),
|
||||||
|
db=int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')),
|
||||||
|
timeout_seconds=int(os.getenv('REDIS_TIMEOUT_SECONDS', '5')),
|
||||||
|
max_connections=int(os.getenv('REDIS_MAX_CONNECTIONS', '50'))
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
REDIS_CONFIG = RedisConfig.from_env()
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Logging Configuration ==========
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class LoggingConfig:
|
||||||
|
"""Logging-Konfiguration"""
|
||||||
|
|
||||||
|
# Levels
|
||||||
|
default_level: str = "INFO"
|
||||||
|
"""Default Log-Level"""
|
||||||
|
|
||||||
|
api_level: str = "INFO"
|
||||||
|
"""Log-Level für API-Calls"""
|
||||||
|
|
||||||
|
sync_level: str = "INFO"
|
||||||
|
"""Log-Level für Sync-Operations"""
|
||||||
|
|
||||||
|
# Format
|
||||||
|
log_format: str = "[{timestamp}] {level} {logger}: {message}"
|
||||||
|
"""Log-Format"""
|
||||||
|
|
||||||
|
include_context: bool = True
|
||||||
|
"""Motia FlowContext in Logs einbinden"""
|
||||||
|
|
||||||
|
# Performance
|
||||||
|
log_api_timings: bool = True
|
||||||
|
"""API Call Timings loggen"""
|
||||||
|
|
||||||
|
log_sync_duration: bool = True
|
||||||
|
"""Sync-Dauer loggen"""
|
||||||
|
|
||||||
|
|
||||||
|
LOGGING_CONFIG = LoggingConfig()
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Calendar Sync Configuration ==========
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CalendarSyncConfig:
|
||||||
|
"""Konfiguration für Google Calendar Sync"""
|
||||||
|
|
||||||
|
# Sync Window
|
||||||
|
sync_days_past: int = 7
|
||||||
|
"""Tage in die Vergangenheit syncen"""
|
||||||
|
|
||||||
|
sync_days_future: int = 90
|
||||||
|
"""Tage in die Zukunft syncen"""
|
||||||
|
|
||||||
|
# Cron
|
||||||
|
cron_schedule: str = "0 */15 * * * *"
|
||||||
|
"""Cron-Schedule (jede 15 Minuten)"""
|
||||||
|
|
||||||
|
# Batch Size
|
||||||
|
batch_size: int = 10
|
||||||
|
"""Anzahl Mitarbeiter pro Batch"""
|
||||||
|
|
||||||
|
|
||||||
|
CALENDAR_SYNC_CONFIG = CalendarSyncConfig()
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Feature Flags ==========
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FeatureFlags:
|
||||||
|
"""Feature Flags für schrittweises Rollout"""
|
||||||
|
|
||||||
|
# Validation
|
||||||
|
strict_validation: bool = True
|
||||||
|
"""Strenge Validierung mit Pydantic"""
|
||||||
|
|
||||||
|
# Sync Features
|
||||||
|
kommunikation_sync_enabled: bool = False
|
||||||
|
"""Kommunikation-Sync aktivieren (noch in Entwicklung)"""
|
||||||
|
|
||||||
|
document_sync_enabled: bool = False
|
||||||
|
"""Document-Sync aktivieren (noch in Entwicklung)"""
|
||||||
|
|
||||||
|
# Advanced Features
|
||||||
|
parallel_sync_enabled: bool = False
|
||||||
|
"""Parallele Sync-Operations (experimentell)"""
|
||||||
|
|
||||||
|
auto_conflict_resolution: bool = False
|
||||||
|
"""Automatische Konfliktauflösung (experimentell)"""
|
||||||
|
|
||||||
|
# Debug
|
||||||
|
debug_mode: bool = False
|
||||||
|
"""Debug-Modus (mehr Logging, langsamer)"""
|
||||||
|
|
||||||
|
|
||||||
|
FEATURE_FLAGS = FeatureFlags()
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Helper Functions ==========
|
||||||
|
|
||||||
|
def get_retry_delay_seconds(attempt: int) -> int:
|
||||||
|
"""
|
||||||
|
Gibt Retry-Delay in Sekunden für gegebenen Versuch zurück.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
attempt: Versuchs-Nummer (0-indexed)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Delay in Sekunden
|
||||||
|
"""
|
||||||
|
backoff_minutes = SYNC_CONFIG.retry_backoff_minutes
|
||||||
|
if attempt < len(backoff_minutes):
|
||||||
|
return backoff_minutes[attempt] * 60
|
||||||
|
return backoff_minutes[-1] * 60
|
||||||
|
|
||||||
|
|
||||||
|
def get_lock_key(entity_type: str, entity_id: str) -> str:
|
||||||
|
"""
|
||||||
|
Erzeugt Redis Lock-Key für Entity.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
entity_type: Entity-Typ (z.B. 'cbeteiligte')
|
||||||
|
entity_id: Entity-ID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Redis Key
|
||||||
|
"""
|
||||||
|
return f"{SYNC_CONFIG.lock_prefix}:{entity_type.lower()}:{entity_id}"
|
||||||
|
|
||||||
|
|
||||||
|
def is_retryable_status_code(status_code: int) -> bool:
|
||||||
|
"""
|
||||||
|
Prüft ob HTTP Status Code Retry auslösen soll.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status_code: HTTP Status Code
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True wenn retryable
|
||||||
|
"""
|
||||||
|
return status_code in API_CONFIG.retry_status_codes
|
||||||
@@ -2,23 +2,23 @@
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import redis
|
|
||||||
import os
|
|
||||||
from typing import Optional, Dict, Any, List
|
from typing import Optional, Dict, Any, List
|
||||||
|
import os
|
||||||
|
|
||||||
|
from services.exceptions import (
|
||||||
|
EspoCRMAPIError,
|
||||||
|
EspoCRMAuthError,
|
||||||
|
EspoCRMTimeoutError,
|
||||||
|
RetryableError,
|
||||||
|
ValidationError
|
||||||
|
)
|
||||||
|
from services.redis_client import get_redis_client
|
||||||
|
from services.config import ESPOCRM_CONFIG, API_CONFIG
|
||||||
|
from services.logging_utils import get_service_logger
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class EspoCRMError(Exception):
|
|
||||||
"""Base exception for EspoCRM API errors"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class EspoCRMAuthError(EspoCRMError):
|
|
||||||
"""Authentication error"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class EspoCRMAPI:
|
class EspoCRMAPI:
|
||||||
"""
|
"""
|
||||||
EspoCRM API Client for BitByLaw integration.
|
EspoCRM API Client for BitByLaw integration.
|
||||||
@@ -32,7 +32,6 @@ class EspoCRMAPI:
|
|||||||
- ESPOCRM_API_BASE_URL (e.g., https://crm.bitbylaw.com/api/v1)
|
- ESPOCRM_API_BASE_URL (e.g., https://crm.bitbylaw.com/api/v1)
|
||||||
- ESPOCRM_API_KEY (Marvin API key)
|
- ESPOCRM_API_KEY (Marvin API key)
|
||||||
- ESPOCRM_API_TIMEOUT_SECONDS (optional, default: 30)
|
- ESPOCRM_API_TIMEOUT_SECONDS (optional, default: 30)
|
||||||
- REDIS_HOST, REDIS_PORT, REDIS_DB_ADVOWARE_CACHE (for caching)
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, context=None):
|
def __init__(self, context=None):
|
||||||
@@ -43,47 +42,27 @@ class EspoCRMAPI:
|
|||||||
context: Motia FlowContext for logging (optional)
|
context: Motia FlowContext for logging (optional)
|
||||||
"""
|
"""
|
||||||
self.context = context
|
self.context = context
|
||||||
self._log("EspoCRMAPI initializing", level='debug')
|
self.logger = get_service_logger('espocrm', context)
|
||||||
|
self.logger.debug("EspoCRMAPI initializing")
|
||||||
|
|
||||||
# Load configuration from environment
|
# Load configuration from environment
|
||||||
self.api_base_url = os.getenv('ESPOCRM_API_BASE_URL', 'https://crm.bitbylaw.com/api/v1')
|
self.api_base_url = os.getenv('ESPOCRM_API_BASE_URL', 'https://crm.bitbylaw.com/api/v1')
|
||||||
self.api_key = os.getenv('ESPOCRM_API_KEY', '')
|
self.api_key = os.getenv('ESPOCRM_API_KEY', '')
|
||||||
self.api_timeout_seconds = int(os.getenv('ESPOCRM_API_TIMEOUT_SECONDS', '30'))
|
self.api_timeout_seconds = int(os.getenv('ESPOCRM_API_TIMEOUT_SECONDS', str(API_CONFIG.default_timeout_seconds)))
|
||||||
|
|
||||||
if not self.api_key:
|
if not self.api_key:
|
||||||
raise EspoCRMAuthError("ESPOCRM_API_KEY not configured in environment")
|
raise EspoCRMAuthError("ESPOCRM_API_KEY not configured in environment")
|
||||||
|
|
||||||
self._log(f"EspoCRM API initialized with base URL: {self.api_base_url}")
|
self.logger.info(f"EspoCRM API initialized with base URL: {self.api_base_url}")
|
||||||
|
|
||||||
# Optional Redis for caching/rate limiting
|
self._session: Optional[aiohttp.ClientSession] = None
|
||||||
try:
|
|
||||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
|
||||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
|
||||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
|
||||||
redis_timeout = int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
|
|
||||||
|
|
||||||
self.redis_client = redis.Redis(
|
# Optional Redis for caching/rate limiting (centralized)
|
||||||
host=redis_host,
|
self.redis_client = get_redis_client(strict=False)
|
||||||
port=redis_port,
|
if self.redis_client:
|
||||||
db=redis_db,
|
self.logger.info("Connected to Redis for EspoCRM operations")
|
||||||
socket_timeout=redis_timeout,
|
|
||||||
socket_connect_timeout=redis_timeout,
|
|
||||||
decode_responses=True
|
|
||||||
)
|
|
||||||
self.redis_client.ping()
|
|
||||||
self._log("Connected to Redis for EspoCRM operations")
|
|
||||||
except Exception as e:
|
|
||||||
self._log(f"Could not connect to Redis: {e}. Continuing without caching.", level='warning')
|
|
||||||
self.redis_client = None
|
|
||||||
|
|
||||||
def _log(self, message: str, level: str = 'info'):
|
|
||||||
"""Log message via context.logger if available, otherwise use module logger"""
|
|
||||||
if self.context and hasattr(self.context, 'logger'):
|
|
||||||
log_func = getattr(self.context.logger, level, self.context.logger.info)
|
|
||||||
log_func(f"[EspoCRM] {message}")
|
|
||||||
else:
|
else:
|
||||||
log_func = getattr(logger, level, logger.info)
|
self.logger.warning("⚠️ Redis unavailable - caching disabled")
|
||||||
log_func(f"[EspoCRM] {message}")
|
|
||||||
|
|
||||||
def _get_headers(self) -> Dict[str, str]:
|
def _get_headers(self) -> Dict[str, str]:
|
||||||
"""Generate request headers with API key"""
|
"""Generate request headers with API key"""
|
||||||
@@ -93,6 +72,15 @@ class EspoCRMAPI:
|
|||||||
'Accept': 'application/json'
|
'Accept': 'application/json'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async def _get_session(self) -> aiohttp.ClientSession:
|
||||||
|
if self._session is None or self._session.closed:
|
||||||
|
self._session = aiohttp.ClientSession()
|
||||||
|
return self._session
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
if self._session and not self._session.closed:
|
||||||
|
await self._session.close()
|
||||||
|
|
||||||
async def api_call(
|
async def api_call(
|
||||||
self,
|
self,
|
||||||
endpoint: str,
|
endpoint: str,
|
||||||
@@ -115,7 +103,9 @@ class EspoCRMAPI:
|
|||||||
Parsed JSON response or None
|
Parsed JSON response or None
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
EspoCRMError: On API errors
|
EspoCRMAuthError: Authentication failed
|
||||||
|
EspoCRMTimeoutError: Request timed out
|
||||||
|
EspoCRMAPIError: Other API errors
|
||||||
"""
|
"""
|
||||||
# Ensure endpoint starts with /
|
# Ensure endpoint starts with /
|
||||||
if not endpoint.startswith('/'):
|
if not endpoint.startswith('/'):
|
||||||
@@ -127,45 +117,62 @@ class EspoCRMAPI:
|
|||||||
total=timeout_seconds or self.api_timeout_seconds
|
total=timeout_seconds or self.api_timeout_seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
self._log(f"API call: {method} {url}", level='debug')
|
session = await self._get_session()
|
||||||
if params:
|
|
||||||
self._log(f"Params: {params}", level='debug')
|
|
||||||
|
|
||||||
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
|
|
||||||
try:
|
try:
|
||||||
|
with self.logger.api_call(endpoint, method):
|
||||||
async with session.request(
|
async with session.request(
|
||||||
method,
|
method,
|
||||||
url,
|
url,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
params=params,
|
params=params,
|
||||||
json=json_data
|
json=json_data,
|
||||||
|
timeout=effective_timeout
|
||||||
) as response:
|
) as response:
|
||||||
# Log response status
|
|
||||||
self._log(f"Response status: {response.status}", level='debug')
|
|
||||||
|
|
||||||
# Handle errors
|
# Handle errors
|
||||||
if response.status == 401:
|
if response.status == 401:
|
||||||
raise EspoCRMAuthError("Authentication failed - check API key")
|
raise EspoCRMAuthError(
|
||||||
|
"Authentication failed - check API key",
|
||||||
|
status_code=401
|
||||||
|
)
|
||||||
elif response.status == 403:
|
elif response.status == 403:
|
||||||
raise EspoCRMError("Access forbidden")
|
raise EspoCRMAPIError(
|
||||||
|
"Access forbidden",
|
||||||
|
status_code=403
|
||||||
|
)
|
||||||
elif response.status == 404:
|
elif response.status == 404:
|
||||||
raise EspoCRMError(f"Resource not found: {endpoint}")
|
raise EspoCRMAPIError(
|
||||||
|
f"Resource not found: {endpoint}",
|
||||||
|
status_code=404
|
||||||
|
)
|
||||||
|
elif response.status >= 500:
|
||||||
|
error_text = await response.text()
|
||||||
|
raise RetryableError(
|
||||||
|
f"Server error {response.status}: {error_text}"
|
||||||
|
)
|
||||||
elif response.status >= 400:
|
elif response.status >= 400:
|
||||||
error_text = await response.text()
|
error_text = await response.text()
|
||||||
raise EspoCRMError(f"API error {response.status}: {error_text}")
|
raise EspoCRMAPIError(
|
||||||
|
f"API error {response.status}: {error_text}",
|
||||||
|
status_code=response.status,
|
||||||
|
response_body=error_text
|
||||||
|
)
|
||||||
|
|
||||||
# Parse response
|
# Parse response
|
||||||
if response.content_type == 'application/json':
|
if response.content_type == 'application/json':
|
||||||
result = await response.json()
|
result = await response.json()
|
||||||
self._log(f"Response received", level='debug')
|
|
||||||
return result
|
return result
|
||||||
else:
|
else:
|
||||||
# For DELETE or other non-JSON responses
|
# For DELETE or other non-JSON responses
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise EspoCRMTimeoutError(
|
||||||
|
f"Request timed out after {effective_timeout.total}s",
|
||||||
|
status_code=408
|
||||||
|
)
|
||||||
except aiohttp.ClientError as e:
|
except aiohttp.ClientError as e:
|
||||||
self._log(f"API call failed: {e}", level='error')
|
self.logger.error(f"API call failed: {e}")
|
||||||
raise EspoCRMError(f"Request failed: {e}") from e
|
raise EspoCRMAPIError(f"Request failed: {str(e)}")
|
||||||
|
|
||||||
async def get_entity(self, entity_type: str, entity_id: str) -> Dict[str, Any]:
|
async def get_entity(self, entity_type: str, entity_id: str) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@@ -345,9 +352,9 @@ class EspoCRMAPI:
|
|||||||
|
|
||||||
effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)
|
effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)
|
||||||
|
|
||||||
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
|
session = await self._get_session()
|
||||||
try:
|
try:
|
||||||
async with session.post(url, headers=headers, data=form_data) as response:
|
async with session.post(url, headers=headers, data=form_data, timeout=effective_timeout) as response:
|
||||||
self._log(f"Upload response status: {response.status}")
|
self._log(f"Upload response status: {response.status}")
|
||||||
|
|
||||||
if response.status == 401:
|
if response.status == 401:
|
||||||
@@ -395,9 +402,9 @@ class EspoCRMAPI:
|
|||||||
|
|
||||||
effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)
|
effective_timeout = aiohttp.ClientTimeout(total=self.api_timeout_seconds)
|
||||||
|
|
||||||
async with aiohttp.ClientSession(timeout=effective_timeout) as session:
|
session = await self._get_session()
|
||||||
try:
|
try:
|
||||||
async with session.get(url, headers=headers) as response:
|
async with session.get(url, headers=headers, timeout=effective_timeout) as response:
|
||||||
if response.status == 401:
|
if response.status == 401:
|
||||||
raise EspoCRMAuthError("Authentication failed - check API key")
|
raise EspoCRMAuthError("Authentication failed - check API key")
|
||||||
elif response.status == 403:
|
elif response.status == 403:
|
||||||
|
|||||||
@@ -8,6 +8,16 @@ from typing import Dict, Any, Optional, List
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from services.models import (
|
||||||
|
AdvowareBeteiligteCreate,
|
||||||
|
AdvowareBeteiligteUpdate,
|
||||||
|
EspoCRMBeteiligteCreate,
|
||||||
|
validate_beteiligte_advoware,
|
||||||
|
validate_beteiligte_espocrm
|
||||||
|
)
|
||||||
|
from services.exceptions import ValidationError
|
||||||
|
from services.config import FEATURE_FLAGS
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@@ -27,6 +37,9 @@ class BeteiligteMapper:
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dict mit Stammdaten für Advoware API (POST/PUT /api/v1/advonet/Beteiligte)
|
Dict mit Stammdaten für Advoware API (POST/PUT /api/v1/advonet/Beteiligte)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValidationError: Bei Validierungsfehlern (wenn strict_validation aktiviert)
|
||||||
"""
|
"""
|
||||||
logger.debug(f"Mapping EspoCRM → Advoware STAMMDATEN: {espo_entity.get('id')}")
|
logger.debug(f"Mapping EspoCRM → Advoware STAMMDATEN: {espo_entity.get('id')}")
|
||||||
|
|
||||||
@@ -78,6 +91,14 @@ class BeteiligteMapper:
|
|||||||
|
|
||||||
logger.debug(f"Mapped to Advoware STAMMDATEN: name={advo_data.get('name')}, vorname={advo_data.get('vorname')}, rechtsform={advo_data.get('rechtsform')}")
|
logger.debug(f"Mapped to Advoware STAMMDATEN: name={advo_data.get('name')}, vorname={advo_data.get('vorname')}, rechtsform={advo_data.get('rechtsform')}")
|
||||||
|
|
||||||
|
# Optional: Validiere mit Pydantic wenn aktiviert
|
||||||
|
if FEATURE_FLAGS.strict_validation:
|
||||||
|
try:
|
||||||
|
validate_beteiligte_advoware(advo_data)
|
||||||
|
except ValidationError as e:
|
||||||
|
logger.warning(f"Validation warning: {e}")
|
||||||
|
# Continue anyway - validation ist optional
|
||||||
|
|
||||||
return advo_data
|
return advo_data
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|||||||
217
services/exceptions.py
Normal file
217
services/exceptions.py
Normal file
@@ -0,0 +1,217 @@
|
|||||||
|
"""
|
||||||
|
Custom Exception Classes für BitByLaw Integration
|
||||||
|
|
||||||
|
Hierarchie:
|
||||||
|
- IntegrationError (Base)
|
||||||
|
- APIError
|
||||||
|
- AdvowareAPIError
|
||||||
|
- AdvowareAuthError
|
||||||
|
- AdvowareTimeoutError
|
||||||
|
- EspoCRMAPIError
|
||||||
|
- EspoCRMAuthError
|
||||||
|
- EspoCRMTimeoutError
|
||||||
|
- SyncError
|
||||||
|
- LockAcquisitionError
|
||||||
|
- ValidationError
|
||||||
|
- ConflictError
|
||||||
|
- RetryableError
|
||||||
|
- NonRetryableError
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Optional, Dict, Any
|
||||||
|
|
||||||
|
|
||||||
|
class IntegrationError(Exception):
|
||||||
|
"""Base exception for all integration errors"""
|
||||||
|
|
||||||
|
def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
|
||||||
|
super().__init__(message)
|
||||||
|
self.message = message
|
||||||
|
self.details = details or {}
|
||||||
|
|
||||||
|
|
||||||
|
# ========== API Errors ==========
|
||||||
|
|
||||||
|
class APIError(IntegrationError):
|
||||||
|
"""Base class for all API-related errors"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
message: str,
|
||||||
|
status_code: Optional[int] = None,
|
||||||
|
response_body: Optional[str] = None,
|
||||||
|
details: Optional[Dict[str, Any]] = None
|
||||||
|
):
|
||||||
|
super().__init__(message, details)
|
||||||
|
self.status_code = status_code
|
||||||
|
self.response_body = response_body
|
||||||
|
|
||||||
|
|
||||||
|
class AdvowareAPIError(APIError):
|
||||||
|
"""Advoware API error"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AdvowareAuthError(AdvowareAPIError):
|
||||||
|
"""Advoware authentication error"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AdvowareTimeoutError(AdvowareAPIError):
|
||||||
|
"""Advoware API timeout"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class EspoCRMAPIError(APIError):
|
||||||
|
"""EspoCRM API error"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class EspoCRMAuthError(EspoCRMAPIError):
|
||||||
|
"""EspoCRM authentication error"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class EspoCRMTimeoutError(EspoCRMAPIError):
|
||||||
|
"""EspoCRM API timeout"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Sync Errors ==========
|
||||||
|
|
||||||
|
class SyncError(IntegrationError):
|
||||||
|
"""Base class for synchronization errors"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class LockAcquisitionError(SyncError):
|
||||||
|
"""Failed to acquire distributed lock"""
|
||||||
|
|
||||||
|
def __init__(self, entity_id: str, lock_key: str, message: Optional[str] = None):
|
||||||
|
super().__init__(
|
||||||
|
message or f"Could not acquire lock for entity {entity_id}",
|
||||||
|
details={"entity_id": entity_id, "lock_key": lock_key}
|
||||||
|
)
|
||||||
|
self.entity_id = entity_id
|
||||||
|
self.lock_key = lock_key
|
||||||
|
|
||||||
|
|
||||||
|
class ValidationError(SyncError):
|
||||||
|
"""Data validation error"""
|
||||||
|
|
||||||
|
def __init__(self, message: str, field: Optional[str] = None, value: Any = None):
|
||||||
|
super().__init__(
|
||||||
|
message,
|
||||||
|
details={"field": field, "value": value}
|
||||||
|
)
|
||||||
|
self.field = field
|
||||||
|
self.value = value
|
||||||
|
|
||||||
|
|
||||||
|
class ConflictError(SyncError):
|
||||||
|
"""Data conflict during synchronization"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
message: str,
|
||||||
|
entity_id: str,
|
||||||
|
source_system: Optional[str] = None,
|
||||||
|
target_system: Optional[str] = None
|
||||||
|
):
|
||||||
|
super().__init__(
|
||||||
|
message,
|
||||||
|
details={
|
||||||
|
"entity_id": entity_id,
|
||||||
|
"source_system": source_system,
|
||||||
|
"target_system": target_system
|
||||||
|
}
|
||||||
|
)
|
||||||
|
self.entity_id = entity_id
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Retry Classification ==========
|
||||||
|
|
||||||
|
class RetryableError(IntegrationError):
|
||||||
|
"""Error that should trigger retry logic"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
message: str,
|
||||||
|
retry_after_seconds: Optional[int] = None,
|
||||||
|
details: Optional[Dict[str, Any]] = None
|
||||||
|
):
|
||||||
|
super().__init__(message, details)
|
||||||
|
self.retry_after_seconds = retry_after_seconds
|
||||||
|
|
||||||
|
|
||||||
|
class NonRetryableError(IntegrationError):
|
||||||
|
"""Error that should NOT trigger retry (e.g., validation errors)"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Redis Errors ==========
|
||||||
|
|
||||||
|
class RedisError(IntegrationError):
|
||||||
|
"""Redis connection or operation error"""
|
||||||
|
|
||||||
|
def __init__(self, message: str, operation: Optional[str] = None):
|
||||||
|
super().__init__(message, details={"operation": operation})
|
||||||
|
self.operation = operation
|
||||||
|
|
||||||
|
|
||||||
|
class RedisConnectionError(RedisError):
|
||||||
|
"""Redis connection failed"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Helper Functions ==========
|
||||||
|
|
||||||
|
def is_retryable(error: Exception) -> bool:
|
||||||
|
"""
|
||||||
|
Determine if an error should trigger retry logic.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
error: Exception to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if error is retryable
|
||||||
|
"""
|
||||||
|
if isinstance(error, NonRetryableError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
if isinstance(error, RetryableError):
|
||||||
|
return True
|
||||||
|
|
||||||
|
if isinstance(error, (AdvowareTimeoutError, EspoCRMTimeoutError)):
|
||||||
|
return True
|
||||||
|
|
||||||
|
if isinstance(error, ValidationError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Default: assume retryable for API errors
|
||||||
|
if isinstance(error, APIError):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_retry_delay(error: Exception, attempt: int) -> int:
|
||||||
|
"""
|
||||||
|
Calculate retry delay based on error type and attempt number.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
error: The error that occurred
|
||||||
|
attempt: Current retry attempt (0-indexed)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Delay in seconds
|
||||||
|
"""
|
||||||
|
if isinstance(error, RetryableError) and error.retry_after_seconds:
|
||||||
|
return error.retry_after_seconds
|
||||||
|
|
||||||
|
# Exponential backoff: [1, 5, 15, 60, 240] minutes
|
||||||
|
backoff_minutes = [1, 5, 15, 60, 240]
|
||||||
|
if attempt < len(backoff_minutes):
|
||||||
|
return backoff_minutes[attempt] * 60
|
||||||
|
|
||||||
|
return backoff_minutes[-1] * 60
|
||||||
363
services/logging_utils.py
Normal file
363
services/logging_utils.py
Normal file
@@ -0,0 +1,363 @@
|
|||||||
|
"""
|
||||||
|
Konsistenter Logging Wrapper für BitByLaw Integration
|
||||||
|
|
||||||
|
Vereinheitlicht Logging über:
|
||||||
|
- Standard Python Logger
|
||||||
|
- Motia FlowContext Logger
|
||||||
|
- Structured Logging
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from typing import Optional, Any, Dict
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
class IntegrationLogger:
|
||||||
|
"""
|
||||||
|
Unified Logger mit Support für:
|
||||||
|
- Motia FlowContext
|
||||||
|
- Standard Python Logging
|
||||||
|
- Structured Logging
|
||||||
|
- Performance Tracking
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
context: Optional[Any] = None,
|
||||||
|
extra_fields: Optional[Dict[str, Any]] = None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialize logger.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Logger name (z.B. 'advoware.api')
|
||||||
|
context: Optional Motia FlowContext
|
||||||
|
extra_fields: Optional extra fields für structured logging
|
||||||
|
"""
|
||||||
|
self.name = name
|
||||||
|
self.context = context
|
||||||
|
self.extra_fields = extra_fields or {}
|
||||||
|
self._standard_logger = logging.getLogger(name)
|
||||||
|
|
||||||
|
def _format_message(self, message: str, **kwargs) -> str:
|
||||||
|
"""
|
||||||
|
Formatiert Log-Message mit optionalen Feldern.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: Base message
|
||||||
|
**kwargs: Extra fields
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Formatted message
|
||||||
|
"""
|
||||||
|
if not kwargs and not self.extra_fields:
|
||||||
|
return message
|
||||||
|
|
||||||
|
# Merge extra fields
|
||||||
|
fields = {**self.extra_fields, **kwargs}
|
||||||
|
|
||||||
|
if fields:
|
||||||
|
field_str = " | ".join(f"{k}={v}" for k, v in fields.items())
|
||||||
|
return f"{message} | {field_str}"
|
||||||
|
|
||||||
|
return message
|
||||||
|
|
||||||
|
def _log(
|
||||||
|
self,
|
||||||
|
level: str,
|
||||||
|
message: str,
|
||||||
|
exc_info: bool = False,
|
||||||
|
**kwargs
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Internal logging method.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
level: Log level (debug, info, warning, error, critical)
|
||||||
|
message: Log message
|
||||||
|
exc_info: Include exception info
|
||||||
|
**kwargs: Extra fields for structured logging
|
||||||
|
"""
|
||||||
|
formatted_msg = self._format_message(message, **kwargs)
|
||||||
|
|
||||||
|
# Log to FlowContext if available
|
||||||
|
if self.context and hasattr(self.context, 'logger'):
|
||||||
|
try:
|
||||||
|
log_func = getattr(self.context.logger, level, self.context.logger.info)
|
||||||
|
log_func(formatted_msg)
|
||||||
|
except Exception:
|
||||||
|
# Fallback to standard logger
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Always log to standard Python logger
|
||||||
|
log_func = getattr(self._standard_logger, level, self._standard_logger.info)
|
||||||
|
log_func(formatted_msg, exc_info=exc_info)
|
||||||
|
|
||||||
|
def debug(self, message: str, **kwargs) -> None:
|
||||||
|
"""Log debug message"""
|
||||||
|
self._log('debug', message, **kwargs)
|
||||||
|
|
||||||
|
def info(self, message: str, **kwargs) -> None:
|
||||||
|
"""Log info message"""
|
||||||
|
self._log('info', message, **kwargs)
|
||||||
|
|
||||||
|
def warning(self, message: str, **kwargs) -> None:
|
||||||
|
"""Log warning message"""
|
||||||
|
self._log('warning', message, **kwargs)
|
||||||
|
|
||||||
|
def warn(self, message: str, **kwargs) -> None:
|
||||||
|
"""Alias for warning"""
|
||||||
|
self.warning(message, **kwargs)
|
||||||
|
|
||||||
|
def error(self, message: str, exc_info: bool = True, **kwargs) -> None:
|
||||||
|
"""Log error message (with exception info by default)"""
|
||||||
|
self._log('error', message, exc_info=exc_info, **kwargs)
|
||||||
|
|
||||||
|
def critical(self, message: str, exc_info: bool = True, **kwargs) -> None:
|
||||||
|
"""Log critical message"""
|
||||||
|
self._log('critical', message, exc_info=exc_info, **kwargs)
|
||||||
|
|
||||||
|
def exception(self, message: str, **kwargs) -> None:
|
||||||
|
"""Log exception with traceback"""
|
||||||
|
self._log('error', message, exc_info=True, **kwargs)
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def operation(self, operation_name: str, **context_fields):
|
||||||
|
"""
|
||||||
|
Context manager für Operations mit automatischem Timing.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
operation_name: Name der Operation
|
||||||
|
**context_fields: Context fields für logging
|
||||||
|
|
||||||
|
Example:
|
||||||
|
with logger.operation('sync_beteiligte', entity_id='123'):
|
||||||
|
# Do sync
|
||||||
|
pass
|
||||||
|
"""
|
||||||
|
start_time = time.time()
|
||||||
|
self.info(f"▶️ Starting: {operation_name}", **context_fields)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
duration_ms = int((time.time() - start_time) * 1000)
|
||||||
|
self.info(
|
||||||
|
f"✅ Completed: {operation_name}",
|
||||||
|
duration_ms=duration_ms,
|
||||||
|
**context_fields
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
duration_ms = int((time.time() - start_time) * 1000)
|
||||||
|
self.error(
|
||||||
|
f"❌ Failed: {operation_name} - {str(e)}",
|
||||||
|
duration_ms=duration_ms,
|
||||||
|
error_type=type(e).__name__,
|
||||||
|
**context_fields
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def api_call(self, endpoint: str, method: str = 'GET', **context_fields):
|
||||||
|
"""
|
||||||
|
Context manager speziell für API-Calls.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
endpoint: API endpoint
|
||||||
|
method: HTTP method
|
||||||
|
**context_fields: Extra context
|
||||||
|
|
||||||
|
Example:
|
||||||
|
with logger.api_call('/api/v1/Beteiligte', method='POST'):
|
||||||
|
result = await api.post(...)
|
||||||
|
"""
|
||||||
|
start_time = time.time()
|
||||||
|
self.debug(f"API Call: {method} {endpoint}", **context_fields)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
duration_ms = int((time.time() - start_time) * 1000)
|
||||||
|
self.debug(
|
||||||
|
f"API Success: {method} {endpoint}",
|
||||||
|
duration_ms=duration_ms,
|
||||||
|
**context_fields
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
duration_ms = int((time.time() - start_time) * 1000)
|
||||||
|
self.error(
|
||||||
|
f"API Error: {method} {endpoint} - {str(e)}",
|
||||||
|
duration_ms=duration_ms,
|
||||||
|
error_type=type(e).__name__,
|
||||||
|
**context_fields
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def with_context(self, **extra_fields) -> 'IntegrationLogger':
|
||||||
|
"""
|
||||||
|
Erstellt neuen Logger mit zusätzlichen Context-Feldern.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
**extra_fields: Additional context fields
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
New logger instance with merged context
|
||||||
|
"""
|
||||||
|
merged_fields = {**self.extra_fields, **extra_fields}
|
||||||
|
return IntegrationLogger(
|
||||||
|
name=self.name,
|
||||||
|
context=self.context,
|
||||||
|
extra_fields=merged_fields
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Factory Functions ==========
|
||||||
|
|
||||||
|
def get_logger(
|
||||||
|
name: str,
|
||||||
|
context: Optional[Any] = None,
|
||||||
|
**extra_fields
|
||||||
|
) -> IntegrationLogger:
|
||||||
|
"""
|
||||||
|
Factory function für Logger.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: Logger name
|
||||||
|
context: Optional Motia FlowContext
|
||||||
|
**extra_fields: Extra context fields
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configured logger
|
||||||
|
|
||||||
|
Example:
|
||||||
|
logger = get_logger('advoware.sync', context=ctx, entity_id='123')
|
||||||
|
logger.info("Starting sync")
|
||||||
|
"""
|
||||||
|
return IntegrationLogger(name, context, extra_fields)
|
||||||
|
|
||||||
|
|
||||||
|
def get_service_logger(
|
||||||
|
service_name: str,
|
||||||
|
context: Optional[Any] = None
|
||||||
|
) -> IntegrationLogger:
|
||||||
|
"""
|
||||||
|
Factory für Service-Logger.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
service_name: Service name (z.B. 'advoware', 'espocrm')
|
||||||
|
context: Optional FlowContext
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Service logger
|
||||||
|
"""
|
||||||
|
return IntegrationLogger(f"services.{service_name}", context)
|
||||||
|
|
||||||
|
|
||||||
|
def get_step_logger(
|
||||||
|
step_name: str,
|
||||||
|
context: Optional[Any] = None
|
||||||
|
) -> IntegrationLogger:
|
||||||
|
"""
|
||||||
|
Factory für Step-Logger.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
step_name: Step name
|
||||||
|
context: FlowContext (required for steps)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Step logger
|
||||||
|
"""
|
||||||
|
return IntegrationLogger(f"steps.{step_name}", context)
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Decorator for Logging ==========
|
||||||
|
|
||||||
|
def log_operation(operation_name: str):
|
||||||
|
"""
|
||||||
|
Decorator für automatisches Operation-Logging.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
operation_name: Name der Operation
|
||||||
|
|
||||||
|
Example:
|
||||||
|
@log_operation('sync_beteiligte')
|
||||||
|
async def sync_entity(entity_id: str):
|
||||||
|
...
|
||||||
|
"""
|
||||||
|
def decorator(func):
|
||||||
|
async def async_wrapper(*args, **kwargs):
|
||||||
|
# Try to find context in args
|
||||||
|
context = None
|
||||||
|
for arg in args:
|
||||||
|
if hasattr(arg, 'logger'):
|
||||||
|
context = arg
|
||||||
|
break
|
||||||
|
|
||||||
|
logger = get_logger(func.__module__, context)
|
||||||
|
|
||||||
|
with logger.operation(operation_name):
|
||||||
|
return await func(*args, **kwargs)
|
||||||
|
|
||||||
|
def sync_wrapper(*args, **kwargs):
|
||||||
|
context = None
|
||||||
|
for arg in args:
|
||||||
|
if hasattr(arg, 'logger'):
|
||||||
|
context = arg
|
||||||
|
break
|
||||||
|
|
||||||
|
logger = get_logger(func.__module__, context)
|
||||||
|
|
||||||
|
with logger.operation(operation_name):
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
# Return appropriate wrapper
|
||||||
|
import asyncio
|
||||||
|
if asyncio.iscoroutinefunction(func):
|
||||||
|
return async_wrapper
|
||||||
|
else:
|
||||||
|
return sync_wrapper
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Performance Tracking ==========
|
||||||
|
|
||||||
|
class PerformanceTracker:
|
||||||
|
"""Track performance metrics for operations"""
|
||||||
|
|
||||||
|
def __init__(self, logger: IntegrationLogger):
|
||||||
|
self.logger = logger
|
||||||
|
self.metrics: Dict[str, list] = {}
|
||||||
|
|
||||||
|
def record(self, operation: str, duration_ms: int) -> None:
|
||||||
|
"""Record operation duration"""
|
||||||
|
if operation not in self.metrics:
|
||||||
|
self.metrics[operation] = []
|
||||||
|
self.metrics[operation].append(duration_ms)
|
||||||
|
|
||||||
|
def get_stats(self, operation: str) -> Dict[str, float]:
|
||||||
|
"""Get statistics for operation"""
|
||||||
|
if operation not in self.metrics:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
durations = self.metrics[operation]
|
||||||
|
return {
|
||||||
|
'count': len(durations),
|
||||||
|
'avg_ms': sum(durations) / len(durations),
|
||||||
|
'min_ms': min(durations),
|
||||||
|
'max_ms': max(durations),
|
||||||
|
'total_ms': sum(durations)
|
||||||
|
}
|
||||||
|
|
||||||
|
def log_summary(self) -> None:
|
||||||
|
"""Log summary of all operations"""
|
||||||
|
self.logger.info("=== Performance Summary ===")
|
||||||
|
for operation, durations in self.metrics.items():
|
||||||
|
stats = self.get_stats(operation)
|
||||||
|
self.logger.info(
|
||||||
|
f"{operation}: {stats['count']} calls, "
|
||||||
|
f"avg {stats['avg_ms']:.1f}ms, "
|
||||||
|
f"min {stats['min_ms']:.1f}ms, "
|
||||||
|
f"max {stats['max_ms']:.1f}ms"
|
||||||
|
)
|
||||||
259
services/models.py
Normal file
259
services/models.py
Normal file
@@ -0,0 +1,259 @@
|
|||||||
|
"""
|
||||||
|
Pydantic Models für Datenvalidierung
|
||||||
|
|
||||||
|
Definiert strenge Schemas für:
|
||||||
|
- Advoware Entities
|
||||||
|
- EspoCRM Entities
|
||||||
|
- Sync Operations
|
||||||
|
"""
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field, field_validator, ConfigDict
|
||||||
|
from typing import Optional, Literal
|
||||||
|
from datetime import date, datetime
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Enums ==========
|
||||||
|
|
||||||
|
class Rechtsform(str, Enum):
|
||||||
|
"""Rechtsformen für Beteiligte"""
|
||||||
|
NATUERLICHE_PERSON = ""
|
||||||
|
GMBH = "GmbH"
|
||||||
|
AG = "AG"
|
||||||
|
GMBH_CO_KG = "GmbH & Co. KG"
|
||||||
|
KG = "KG"
|
||||||
|
OHG = "OHG"
|
||||||
|
EV = "e.V."
|
||||||
|
EINZELUNTERNEHMEN = "Einzelunternehmen"
|
||||||
|
FREIBERUFLER = "Freiberufler"
|
||||||
|
|
||||||
|
|
||||||
|
class SyncStatus(str, Enum):
|
||||||
|
"""Sync Status für EspoCRM Entities"""
|
||||||
|
PENDING_SYNC = "pending_sync"
|
||||||
|
SYNCING = "syncing"
|
||||||
|
CLEAN = "clean"
|
||||||
|
FAILED = "failed"
|
||||||
|
CONFLICT = "conflict"
|
||||||
|
PERMANENTLY_FAILED = "permanently_failed"
|
||||||
|
|
||||||
|
|
||||||
|
class SalutationType(str, Enum):
|
||||||
|
"""Anredetypen"""
|
||||||
|
HERR = "Herr"
|
||||||
|
FRAU = "Frau"
|
||||||
|
DIVERS = "Divers"
|
||||||
|
FIRMA = ""
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Advoware Models ==========
|
||||||
|
|
||||||
|
class AdvowareBeteiligteBase(BaseModel):
|
||||||
|
"""Base Model für Advoware Beteiligte (POST/PUT)"""
|
||||||
|
|
||||||
|
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)
|
||||||
|
|
||||||
|
name: str = Field(..., min_length=1, max_length=200)
|
||||||
|
vorname: Optional[str] = Field(None, max_length=100)
|
||||||
|
rechtsform: str = Field(default="")
|
||||||
|
anrede: Optional[str] = Field(None, max_length=50)
|
||||||
|
titel: Optional[str] = Field(None, max_length=50)
|
||||||
|
bAnrede: Optional[str] = Field(None, max_length=200, description="Briefanrede")
|
||||||
|
zusatz: Optional[str] = Field(None, max_length=200)
|
||||||
|
geburtsdatum: Optional[date] = None
|
||||||
|
|
||||||
|
@field_validator('name')
|
||||||
|
@classmethod
|
||||||
|
def validate_name(cls, v: str) -> str:
|
||||||
|
if not v or not v.strip():
|
||||||
|
raise ValueError('Name darf nicht leer sein')
|
||||||
|
return v.strip()
|
||||||
|
|
||||||
|
@field_validator('geburtsdatum')
|
||||||
|
@classmethod
|
||||||
|
def validate_birthdate(cls, v: Optional[date]) -> Optional[date]:
|
||||||
|
if v and v > date.today():
|
||||||
|
raise ValueError('Geburtsdatum kann nicht in der Zukunft liegen')
|
||||||
|
if v and v.year < 1900:
|
||||||
|
raise ValueError('Geburtsdatum vor 1900 nicht erlaubt')
|
||||||
|
return v
|
||||||
|
|
||||||
|
|
||||||
|
class AdvowareBeteiligteRead(AdvowareBeteiligteBase):
|
||||||
|
"""Advoware Beteiligte Response (GET)"""
|
||||||
|
|
||||||
|
betNr: int = Field(..., ge=1)
|
||||||
|
rowId: str = Field(..., description="Change detection ID")
|
||||||
|
|
||||||
|
# Optional fields die Advoware zurückgibt
|
||||||
|
strasse: Optional[str] = None
|
||||||
|
plz: Optional[str] = None
|
||||||
|
ort: Optional[str] = None
|
||||||
|
land: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
class AdvowareBeteiligteCreate(AdvowareBeteiligteBase):
|
||||||
|
"""Advoware Beteiligte für POST"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AdvowareBeteiligteUpdate(AdvowareBeteiligteBase):
|
||||||
|
"""Advoware Beteiligte für PUT"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ========== EspoCRM Models ==========
|
||||||
|
|
||||||
|
class EspoCRMBeteiligteBase(BaseModel):
|
||||||
|
"""Base Model für EspoCRM CBeteiligte"""
|
||||||
|
|
||||||
|
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)
|
||||||
|
|
||||||
|
name: str = Field(..., min_length=1, max_length=255)
|
||||||
|
firstName: Optional[str] = Field(None, max_length=100)
|
||||||
|
lastName: Optional[str] = Field(None, max_length=100)
|
||||||
|
firmenname: Optional[str] = Field(None, max_length=255)
|
||||||
|
rechtsform: str = Field(default="")
|
||||||
|
salutationName: Optional[str] = None
|
||||||
|
titel: Optional[str] = Field(None, max_length=100)
|
||||||
|
briefAnrede: Optional[str] = Field(None, max_length=255)
|
||||||
|
zusatz: Optional[str] = Field(None, max_length=255)
|
||||||
|
dateOfBirth: Optional[date] = None
|
||||||
|
|
||||||
|
@field_validator('name')
|
||||||
|
@classmethod
|
||||||
|
def validate_name(cls, v: str) -> str:
|
||||||
|
if not v or not v.strip():
|
||||||
|
raise ValueError('Name darf nicht leer sein')
|
||||||
|
return v.strip()
|
||||||
|
|
||||||
|
@field_validator('dateOfBirth')
|
||||||
|
@classmethod
|
||||||
|
def validate_birthdate(cls, v: Optional[date]) -> Optional[date]:
|
||||||
|
if v and v > date.today():
|
||||||
|
raise ValueError('Geburtsdatum kann nicht in der Zukunft liegen')
|
||||||
|
if v and v.year < 1900:
|
||||||
|
raise ValueError('Geburtsdatum vor 1900 nicht erlaubt')
|
||||||
|
return v
|
||||||
|
|
||||||
|
@field_validator('firstName', 'lastName')
|
||||||
|
@classmethod
|
||||||
|
def validate_person_fields(cls, v: Optional[str]) -> Optional[str]:
|
||||||
|
"""Validiere dass Person-Felder nur bei natürlichen Personen gesetzt sind"""
|
||||||
|
if v:
|
||||||
|
return v.strip()
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class EspoCRMBeteiligteRead(EspoCRMBeteiligteBase):
|
||||||
|
"""EspoCRM CBeteiligte Response (GET)"""
|
||||||
|
|
||||||
|
id: str = Field(..., min_length=1)
|
||||||
|
betnr: Optional[int] = Field(None, ge=1)
|
||||||
|
advowareRowId: Optional[str] = None
|
||||||
|
syncStatus: SyncStatus = Field(default=SyncStatus.PENDING_SYNC)
|
||||||
|
syncRetryCount: int = Field(default=0, ge=0, le=10)
|
||||||
|
syncErrorMessage: Optional[str] = None
|
||||||
|
advowareLastSync: Optional[datetime] = None
|
||||||
|
syncNextRetry: Optional[datetime] = None
|
||||||
|
syncAutoResetAt: Optional[datetime] = None
|
||||||
|
|
||||||
|
|
||||||
|
class EspoCRMBeteiligteCreate(EspoCRMBeteiligteBase):
|
||||||
|
"""EspoCRM CBeteiligte für POST"""
|
||||||
|
|
||||||
|
syncStatus: SyncStatus = Field(default=SyncStatus.PENDING_SYNC)
|
||||||
|
|
||||||
|
|
||||||
|
class EspoCRMBeteiligteUpdate(BaseModel):
|
||||||
|
"""EspoCRM CBeteiligte für PUT (alle Felder optional)"""
|
||||||
|
|
||||||
|
model_config = ConfigDict(str_strip_whitespace=True, validate_assignment=True)
|
||||||
|
|
||||||
|
name: Optional[str] = Field(None, min_length=1, max_length=255)
|
||||||
|
firstName: Optional[str] = Field(None, max_length=100)
|
||||||
|
lastName: Optional[str] = Field(None, max_length=100)
|
||||||
|
firmenname: Optional[str] = Field(None, max_length=255)
|
||||||
|
rechtsform: Optional[str] = None
|
||||||
|
salutationName: Optional[str] = None
|
||||||
|
titel: Optional[str] = Field(None, max_length=100)
|
||||||
|
briefAnrede: Optional[str] = Field(None, max_length=255)
|
||||||
|
zusatz: Optional[str] = Field(None, max_length=255)
|
||||||
|
dateOfBirth: Optional[date] = None
|
||||||
|
betnr: Optional[int] = Field(None, ge=1)
|
||||||
|
advowareRowId: Optional[str] = None
|
||||||
|
syncStatus: Optional[SyncStatus] = None
|
||||||
|
syncRetryCount: Optional[int] = Field(None, ge=0, le=10)
|
||||||
|
syncErrorMessage: Optional[str] = Field(None, max_length=2000)
|
||||||
|
advowareLastSync: Optional[datetime] = None
|
||||||
|
syncNextRetry: Optional[datetime] = None
|
||||||
|
|
||||||
|
def model_dump_clean(self) -> dict:
|
||||||
|
"""Gibt nur nicht-None Werte zurück (für PATCH-ähnliches Update)"""
|
||||||
|
return {k: v for k, v in self.model_dump().items() if v is not None}
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Sync Operation Models ==========
|
||||||
|
|
||||||
|
class SyncOperation(BaseModel):
|
||||||
|
"""Model für Sync-Operation Tracking"""
|
||||||
|
|
||||||
|
entity_id: str
|
||||||
|
action: Literal["create", "update", "delete", "sync_check"]
|
||||||
|
source: Literal["webhook", "cron", "api", "manual"]
|
||||||
|
timestamp: datetime = Field(default_factory=datetime.utcnow)
|
||||||
|
entity_type: str = "CBeteiligte"
|
||||||
|
|
||||||
|
|
||||||
|
class SyncResult(BaseModel):
|
||||||
|
"""Result einer Sync-Operation"""
|
||||||
|
|
||||||
|
success: bool
|
||||||
|
entity_id: str
|
||||||
|
action: str
|
||||||
|
message: Optional[str] = None
|
||||||
|
error: Optional[str] = None
|
||||||
|
details: Optional[dict] = None
|
||||||
|
duration_ms: Optional[int] = None
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Validation Helpers ==========
|
||||||
|
|
||||||
|
def validate_beteiligte_advoware(data: dict) -> AdvowareBeteiligteCreate:
|
||||||
|
"""
|
||||||
|
Validiert Advoware Beteiligte Daten.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Dict mit Advoware Daten
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Validiertes Model
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValidationError: Bei Validierungsfehlern
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return AdvowareBeteiligteCreate.model_validate(data)
|
||||||
|
except Exception as e:
|
||||||
|
from services.exceptions import ValidationError
|
||||||
|
raise ValidationError(f"Invalid Advoware data: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def validate_beteiligte_espocrm(data: dict) -> EspoCRMBeteiligteCreate:
|
||||||
|
"""
|
||||||
|
Validiert EspoCRM Beteiligte Daten.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Dict mit EspoCRM Daten
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Validiertes Model
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValidationError: Bei Validierungsfehlern
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return EspoCRMBeteiligteCreate.model_validate(data)
|
||||||
|
except Exception as e:
|
||||||
|
from services.exceptions import ValidationError
|
||||||
|
raise ValidationError(f"Invalid EspoCRM data: {e}")
|
||||||
191
services/redis_client.py
Normal file
191
services/redis_client.py
Normal file
@@ -0,0 +1,191 @@
|
|||||||
|
"""
|
||||||
|
Redis Client Factory
|
||||||
|
|
||||||
|
Zentralisierte Redis-Client-Verwaltung mit:
|
||||||
|
- Singleton Pattern
|
||||||
|
- Connection Pooling
|
||||||
|
- Automatic Reconnection
|
||||||
|
- Health Checks
|
||||||
|
"""
|
||||||
|
|
||||||
|
import redis
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from typing import Optional
|
||||||
|
from services.exceptions import RedisConnectionError
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RedisClientFactory:
|
||||||
|
"""
|
||||||
|
Singleton Factory für Redis Clients.
|
||||||
|
|
||||||
|
Vorteile:
|
||||||
|
- Eine zentrale Konfiguration
|
||||||
|
- Connection Pooling
|
||||||
|
- Lazy Initialization
|
||||||
|
- Besseres Error Handling
|
||||||
|
"""
|
||||||
|
|
||||||
|
_instance: Optional[redis.Redis] = None
|
||||||
|
_connection_pool: Optional[redis.ConnectionPool] = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_client(cls, strict: bool = False) -> Optional[redis.Redis]:
|
||||||
|
"""
|
||||||
|
Gibt Redis Client zurück (erstellt wenn nötig).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
strict: Wenn True, wirft Exception bei Verbindungsfehlern.
|
||||||
|
Wenn False, gibt None zurück (für optionale Redis-Nutzung).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Redis client oder None (wenn strict=False und Verbindung fehlschlägt)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RedisConnectionError: Wenn strict=True und Verbindung fehlschlägt
|
||||||
|
"""
|
||||||
|
if cls._instance is None:
|
||||||
|
try:
|
||||||
|
cls._instance = cls._create_client()
|
||||||
|
logger.info("Redis client created successfully")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to create Redis client: {e}")
|
||||||
|
if strict:
|
||||||
|
raise RedisConnectionError(
|
||||||
|
f"Could not connect to Redis: {e}",
|
||||||
|
operation="get_client"
|
||||||
|
)
|
||||||
|
logger.warning("Redis unavailable - continuing without caching")
|
||||||
|
return None
|
||||||
|
|
||||||
|
return cls._instance
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _create_client(cls) -> redis.Redis:
|
||||||
|
"""
|
||||||
|
Erstellt neuen Redis Client mit Connection Pool.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configured Redis client
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
redis.ConnectionError: Bei Verbindungsproblemen
|
||||||
|
"""
|
||||||
|
# Load configuration from environment
|
||||||
|
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||||
|
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||||
|
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||||
|
redis_timeout = int(os.getenv('REDIS_TIMEOUT_SECONDS', '5'))
|
||||||
|
redis_max_connections = int(os.getenv('REDIS_MAX_CONNECTIONS', '50'))
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Creating Redis client: {redis_host}:{redis_port} "
|
||||||
|
f"(db={redis_db}, timeout={redis_timeout}s)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create connection pool
|
||||||
|
if cls._connection_pool is None:
|
||||||
|
cls._connection_pool = redis.ConnectionPool(
|
||||||
|
host=redis_host,
|
||||||
|
port=redis_port,
|
||||||
|
db=redis_db,
|
||||||
|
socket_timeout=redis_timeout,
|
||||||
|
socket_connect_timeout=redis_timeout,
|
||||||
|
max_connections=redis_max_connections,
|
||||||
|
decode_responses=True # Auto-decode bytes zu strings
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create client from pool
|
||||||
|
client = redis.Redis(connection_pool=cls._connection_pool)
|
||||||
|
|
||||||
|
# Verify connection
|
||||||
|
client.ping()
|
||||||
|
|
||||||
|
return client
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def reset(cls) -> None:
|
||||||
|
"""
|
||||||
|
Reset factory state (hauptsächlich für Tests).
|
||||||
|
|
||||||
|
Schließt bestehende Verbindungen und setzt Singleton zurück.
|
||||||
|
"""
|
||||||
|
if cls._instance:
|
||||||
|
try:
|
||||||
|
cls._instance.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error closing Redis client: {e}")
|
||||||
|
|
||||||
|
if cls._connection_pool:
|
||||||
|
try:
|
||||||
|
cls._connection_pool.disconnect()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error closing connection pool: {e}")
|
||||||
|
|
||||||
|
cls._instance = None
|
||||||
|
cls._connection_pool = None
|
||||||
|
logger.info("Redis factory reset")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def health_check(cls) -> bool:
|
||||||
|
"""
|
||||||
|
Prüft Redis-Verbindung.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True wenn Redis erreichbar, False sonst
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
client = cls.get_client(strict=False)
|
||||||
|
if client is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
client.ping()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Redis health check failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_info(cls) -> Optional[dict]:
|
||||||
|
"""
|
||||||
|
Gibt Redis Server Info zurück (für Monitoring).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Redis info dict oder None bei Fehler
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
client = cls.get_client(strict=False)
|
||||||
|
if client is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return client.info()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to get Redis info: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ========== Convenience Functions ==========
|
||||||
|
|
||||||
|
def get_redis_client(strict: bool = False) -> Optional[redis.Redis]:
|
||||||
|
"""
|
||||||
|
Convenience function für Redis Client.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
strict: Wenn True, wirft Exception bei Fehler
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Redis client oder None
|
||||||
|
"""
|
||||||
|
return RedisClientFactory.get_client(strict=strict)
|
||||||
|
|
||||||
|
|
||||||
|
def is_redis_available() -> bool:
|
||||||
|
"""
|
||||||
|
Prüft ob Redis verfügbar ist.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True wenn Redis erreichbar
|
||||||
|
"""
|
||||||
|
return RedisClientFactory.health_check()
|
||||||
@@ -9,62 +9,38 @@ Gemeinsame Funktionalität für alle Sync-Operationen:
|
|||||||
|
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
|
||||||
import redis
|
|
||||||
import os
|
|
||||||
import pytz
|
import pytz
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
from services.exceptions import RedisConnectionError, LockAcquisitionError
|
||||||
|
from services.redis_client import get_redis_client
|
||||||
|
from services.config import SYNC_CONFIG, get_lock_key
|
||||||
|
from services.logging_utils import get_logger
|
||||||
|
|
||||||
# Lock TTL in seconds (prevents deadlocks)
|
import redis
|
||||||
LOCK_TTL_SECONDS = 900 # 15 minutes
|
|
||||||
|
|
||||||
|
|
||||||
class BaseSyncUtils:
|
class BaseSyncUtils:
|
||||||
"""Base-Klasse mit gemeinsamer Sync-Funktionalität"""
|
"""Base-Klasse mit gemeinsamer Sync-Funktionalität"""
|
||||||
|
|
||||||
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
|
def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
espocrm_api: EspoCRM API client instance
|
espocrm_api: EspoCRM API client instance
|
||||||
redis_client: Optional Redis client (wird sonst initialisiert)
|
redis_client: Optional Redis client (wird sonst über Factory initialisiert)
|
||||||
context: Optional Motia FlowContext für Logging
|
context: Optional Motia FlowContext für Logging
|
||||||
"""
|
"""
|
||||||
self.espocrm = espocrm_api
|
self.espocrm = espocrm_api
|
||||||
self.context = context
|
self.context = context
|
||||||
self.logger = context.logger if context else logger
|
self.logger = get_logger('sync_utils', context)
|
||||||
self.redis = redis_client or self._init_redis()
|
|
||||||
|
|
||||||
def _init_redis(self) -> Optional[redis.Redis]:
|
# Use provided Redis client or get from factory
|
||||||
"""Initialize Redis client for distributed locking"""
|
self.redis = redis_client or get_redis_client(strict=False)
|
||||||
try:
|
|
||||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
|
||||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
|
||||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
|
||||||
|
|
||||||
client = redis.Redis(
|
if not self.redis:
|
||||||
host=redis_host,
|
self.logger.error(
|
||||||
port=redis_port,
|
"⚠️ WARNUNG: Redis nicht verfügbar! "
|
||||||
db=redis_db,
|
"Distributed Locking deaktiviert - Race Conditions möglich!"
|
||||||
decode_responses=True
|
|
||||||
)
|
)
|
||||||
client.ping()
|
|
||||||
return client
|
|
||||||
except Exception as e:
|
|
||||||
self._log(f"Redis connection failed: {e}", level='error')
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _log(self, message: str, level: str = 'info'):
|
|
||||||
"""
|
|
||||||
Context-aware logging
|
|
||||||
|
|
||||||
Falls ein FlowContext vorhanden ist, wird dessen Logger verwendet.
|
|
||||||
Sonst fallback auf Standard-Logger.
|
|
||||||
"""
|
|
||||||
if self.context and hasattr(self.context, 'logger'):
|
|
||||||
getattr(self.context.logger, level)(message)
|
|
||||||
else:
|
|
||||||
getattr(logger, level)(message)
|
|
||||||
|
|
||||||
def _get_lock_key(self, entity_id: str) -> str:
|
def _get_lock_key(self, entity_id: str) -> str:
|
||||||
"""
|
"""
|
||||||
@@ -84,17 +60,30 @@ class BaseSyncUtils:
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True wenn Lock erfolgreich, False wenn bereits locked
|
True wenn Lock erfolgreich, False wenn bereits locked
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
LockAcquisitionError: Bei kritischen Lock-Problemen (wenn strict mode)
|
||||||
"""
|
"""
|
||||||
if not self.redis:
|
if not self.redis:
|
||||||
self._log("Redis nicht verfügbar, Lock-Mechanismus deaktiviert", level='warn')
|
self.logger.error(
|
||||||
return True # Fallback: Wenn kein Redis, immer lock erlauben
|
"CRITICAL: Distributed Locking deaktiviert - Redis nicht verfügbar!"
|
||||||
|
)
|
||||||
|
# In production: Dies könnte zu Race Conditions führen!
|
||||||
|
# Für jetzt erlauben wir Fortsetzung, aber mit Warning
|
||||||
|
return True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
|
acquired = self.redis.set(
|
||||||
|
lock_key,
|
||||||
|
"locked",
|
||||||
|
nx=True,
|
||||||
|
ex=SYNC_CONFIG.lock_ttl_seconds
|
||||||
|
)
|
||||||
return bool(acquired)
|
return bool(acquired)
|
||||||
except Exception as e:
|
except redis.RedisError as e:
|
||||||
self._log(f"Redis lock error: {e}", level='error')
|
self.logger.error(f"Redis lock error: {e}")
|
||||||
return True # Bei Fehler: Lock erlauben, um Deadlocks zu vermeiden
|
# Bei Redis-Fehler: Lock erlauben, um Deadlocks zu vermeiden
|
||||||
|
return True
|
||||||
|
|
||||||
def _release_redis_lock(self, lock_key: str) -> None:
|
def _release_redis_lock(self, lock_key: str) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -108,8 +97,8 @@ class BaseSyncUtils:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
self.redis.delete(lock_key)
|
self.redis.delete(lock_key)
|
||||||
except Exception as e:
|
except redis.RedisError as e:
|
||||||
self._log(f"Redis unlock error: {e}", level='error')
|
self.logger.error(f"Redis unlock error: {e}")
|
||||||
|
|
||||||
def _get_espocrm_datetime(self, dt: Optional[datetime] = None) -> str:
|
def _get_espocrm_datetime(self, dt: Optional[datetime] = None) -> str:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -17,9 +17,16 @@ from services.advoware_service import AdvowareService
|
|||||||
from services.espocrm import EspoCRMAPI
|
from services.espocrm import EspoCRMAPI
|
||||||
from services.espocrm_mapper import BeteiligteMapper
|
from services.espocrm_mapper import BeteiligteMapper
|
||||||
from services.beteiligte_sync_utils import BeteiligteSync
|
from services.beteiligte_sync_utils import BeteiligteSync
|
||||||
|
from services.redis_client import get_redis_client
|
||||||
|
from services.exceptions import (
|
||||||
|
AdvowareAPIError,
|
||||||
|
EspoCRMAPIError,
|
||||||
|
SyncError,
|
||||||
|
RetryableError,
|
||||||
|
is_retryable
|
||||||
|
)
|
||||||
|
from services.logging_utils import get_step_logger
|
||||||
import json
|
import json
|
||||||
import redis
|
|
||||||
import os
|
|
||||||
|
|
||||||
config = {
|
config = {
|
||||||
"name": "VMH Beteiligte Sync Handler",
|
"name": "VMH Beteiligte Sync Handler",
|
||||||
@@ -35,32 +42,36 @@ config = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]):
|
async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> Optional[Dict[str, Any]]:
|
||||||
"""Zentraler Sync-Handler für Beteiligte"""
|
"""
|
||||||
|
Zentraler Sync-Handler für Beteiligte
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_data: Event data mit entity_id, action, source
|
||||||
|
ctx: Motia FlowContext
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Optional result dict
|
||||||
|
"""
|
||||||
entity_id = event_data.get('entity_id')
|
entity_id = event_data.get('entity_id')
|
||||||
action = event_data.get('action')
|
action = event_data.get('action')
|
||||||
source = event_data.get('source')
|
source = event_data.get('source')
|
||||||
|
|
||||||
|
step_logger = get_step_logger('beteiligte_sync', ctx)
|
||||||
|
|
||||||
if not entity_id:
|
if not entity_id:
|
||||||
ctx.logger.error("Keine entity_id im Event gefunden")
|
step_logger.error("Keine entity_id im Event gefunden")
|
||||||
return
|
return None
|
||||||
|
|
||||||
ctx.logger.info(f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}")
|
step_logger.info(
|
||||||
|
f"🔄 Sync-Handler gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}"
|
||||||
# Shared Redis client for distributed locking
|
|
||||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
|
||||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
|
||||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
|
||||||
|
|
||||||
redis_client = redis.Redis(
|
|
||||||
host=redis_host,
|
|
||||||
port=redis_port,
|
|
||||||
db=redis_db,
|
|
||||||
decode_responses=True
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Get shared Redis client (centralized)
|
||||||
|
redis_client = get_redis_client(strict=False)
|
||||||
|
|
||||||
# APIs initialisieren
|
# APIs initialisieren
|
||||||
espocrm = EspoCRMAPI()
|
espocrm = EspoCRMAPI(ctx)
|
||||||
advoware = AdvowareAPI(ctx)
|
advoware = AdvowareAPI(ctx)
|
||||||
sync_utils = BeteiligteSync(espocrm, redis_client, ctx)
|
sync_utils = BeteiligteSync(espocrm, redis_client, ctx)
|
||||||
mapper = BeteiligteMapper()
|
mapper = BeteiligteMapper()
|
||||||
|
|||||||
Reference in New Issue
Block a user