Files
motia-iii/services/sync_utils_base.py
bsiggel 69a48f5f9a Implement central configuration, custom exceptions, logging utilities, Pydantic models, and Redis client for BitByLaw integration
- Added `config.py` for centralized configuration management including Sync, API, Advoware, EspoCRM, Redis, Logging, Calendar Sync, and Feature Flags.
- Created `exceptions.py` with a hierarchy of custom exceptions for integration errors, API errors, sync errors, and Redis errors.
- Developed `logging_utils.py` for a unified logging wrapper supporting structured logging and performance tracking.
- Defined Pydantic models in `models.py` for data validation of Advoware and EspoCRM entities, including sync operation models.
- Introduced `redis_client.py` for a centralized Redis client factory with connection pooling, automatic reconnection, and health checks.
2026-03-03 17:18:49 +00:00

140 lines
4.5 KiB
Python

"""
Base Sync Utilities
Gemeinsame Funktionalität für alle Sync-Operationen:
- Redis Distributed Locking
- Context-aware Logging
- EspoCRM API Helpers
"""
from typing import Dict, Any, Optional
from datetime import datetime
import pytz
from services.exceptions import RedisConnectionError, LockAcquisitionError
from services.redis_client import get_redis_client
from services.config import SYNC_CONFIG, get_lock_key
from services.logging_utils import get_logger
import redis
class BaseSyncUtils:
"""Base-Klasse mit gemeinsamer Sync-Funktionalität"""
def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None):
"""
Args:
espocrm_api: EspoCRM API client instance
redis_client: Optional Redis client (wird sonst über Factory initialisiert)
context: Optional Motia FlowContext für Logging
"""
self.espocrm = espocrm_api
self.context = context
self.logger = get_logger('sync_utils', context)
# Use provided Redis client or get from factory
self.redis = redis_client or get_redis_client(strict=False)
if not self.redis:
self.logger.error(
"⚠️ WARNUNG: Redis nicht verfügbar! "
"Distributed Locking deaktiviert - Race Conditions möglich!"
)
def _get_lock_key(self, entity_id: str) -> str:
"""
Erzeugt Redis Lock-Key für eine Entity
Muss in Subklassen überschrieben werden, um entity-spezifische Prefixes zu nutzen.
z.B. 'sync_lock:cbeteiligte:{entity_id}' oder 'sync_lock:document:{entity_id}'
"""
raise NotImplementedError("Subclass must implement _get_lock_key()")
def _acquire_redis_lock(self, lock_key: str) -> bool:
"""
Atomic Redis lock acquisition
Args:
lock_key: Redis key für den Lock
Returns:
True wenn Lock erfolgreich, False wenn bereits locked
Raises:
LockAcquisitionError: Bei kritischen Lock-Problemen (wenn strict mode)
"""
if not self.redis:
self.logger.error(
"CRITICAL: Distributed Locking deaktiviert - Redis nicht verfügbar!"
)
# In production: Dies könnte zu Race Conditions führen!
# Für jetzt erlauben wir Fortsetzung, aber mit Warning
return True
try:
acquired = self.redis.set(
lock_key,
"locked",
nx=True,
ex=SYNC_CONFIG.lock_ttl_seconds
)
return bool(acquired)
except redis.RedisError as e:
self.logger.error(f"Redis lock error: {e}")
# Bei Redis-Fehler: Lock erlauben, um Deadlocks zu vermeiden
return True
def _release_redis_lock(self, lock_key: str) -> None:
"""
Redis lock freigeben
Args:
lock_key: Redis key für den Lock
"""
if not self.redis:
return
try:
self.redis.delete(lock_key)
except redis.RedisError as e:
self.logger.error(f"Redis unlock error: {e}")
def _get_espocrm_datetime(self, dt: Optional[datetime] = None) -> str:
"""
Formatiert datetime für EspoCRM (ohne Timezone!)
Args:
dt: Optional datetime object (default: now UTC)
Returns:
String im Format 'YYYY-MM-DD HH:MM:SS'
"""
if dt is None:
dt = datetime.now(pytz.UTC)
elif dt.tzinfo is None:
dt = pytz.UTC.localize(dt)
return dt.strftime('%Y-%m-%d %H:%M:%S')
async def acquire_sync_lock(self, entity_id: str, **kwargs) -> bool:
"""
Erwirbt Sync-Lock für eine Entity
Muss in Subklassen implementiert werden, um entity-spezifische
Status-Updates durchzuführen.
Returns:
True wenn Lock erfolgreich, False wenn bereits locked
"""
raise NotImplementedError("Subclass must implement acquire_sync_lock()")
async def release_sync_lock(self, entity_id: str, **kwargs) -> None:
"""
Gibt Sync-Lock frei und setzt finalen Status
Muss in Subklassen implementiert werden, um entity-spezifische
Status-Updates durchzuführen.
"""
raise NotImplementedError("Subclass must implement release_sync_lock()")