Implement central configuration, custom exceptions, logging utilities, Pydantic models, and Redis client for BitByLaw integration
- Added `config.py` for centralized configuration management including Sync, API, Advoware, EspoCRM, Redis, Logging, Calendar Sync, and Feature Flags. - Created `exceptions.py` with a hierarchy of custom exceptions for integration errors, API errors, sync errors, and Redis errors. - Developed `logging_utils.py` for a unified logging wrapper supporting structured logging and performance tracking. - Defined Pydantic models in `models.py` for data validation of Advoware and EspoCRM entities, including sync operation models. - Introduced `redis_client.py` for a centralized Redis client factory with connection pooling, automatic reconnection, and health checks.
This commit is contained in:
@@ -13,64 +13,39 @@ Hilfsfunktionen für Sync-Operationen:
|
||||
from typing import Dict, Any, Optional, Tuple, Literal
|
||||
from datetime import datetime, timedelta
|
||||
import pytz
|
||||
import logging
|
||||
import redis
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from services.exceptions import LockAcquisitionError, SyncError, ValidationError
|
||||
from services.redis_client import get_redis_client
|
||||
from services.config import SYNC_CONFIG, get_lock_key, get_retry_delay_seconds
|
||||
from services.logging_utils import get_logger
|
||||
|
||||
import redis
|
||||
|
||||
# Timestamp-Vergleich Ergebnis-Typen
|
||||
TimestampResult = Literal["espocrm_newer", "advoware_newer", "conflict", "no_change"]
|
||||
|
||||
# Max retry before permanent failure
|
||||
MAX_SYNC_RETRIES = 5
|
||||
# Lock TTL in seconds (prevents deadlocks)
|
||||
LOCK_TTL_SECONDS = 900 # 15 minutes
|
||||
# Retry backoff: Wartezeit zwischen Retries (in Minuten)
|
||||
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h
|
||||
# Auto-Reset nach 24h (für permanently_failed entities)
|
||||
AUTO_RESET_HOURS = 24
|
||||
|
||||
|
||||
class BeteiligteSync:
|
||||
"""Utility-Klasse für Beteiligte-Synchronisation"""
|
||||
|
||||
def __init__(self, espocrm_api, redis_client: redis.Redis = None, context=None):
|
||||
def __init__(self, espocrm_api, redis_client: Optional[redis.Redis] = None, context=None):
|
||||
self.espocrm = espocrm_api
|
||||
self.context = context
|
||||
self.logger = context.logger if context else logger
|
||||
self.redis = redis_client or self._init_redis()
|
||||
self.logger = get_logger('beteiligte_sync', context)
|
||||
|
||||
# Use provided Redis client or get from factory
|
||||
self.redis = redis_client or get_redis_client(strict=False)
|
||||
|
||||
if not self.redis:
|
||||
self.logger.error(
|
||||
"⚠️ KRITISCH: Redis nicht verfügbar! "
|
||||
"Distributed Locking deaktiviert - Race Conditions möglich!"
|
||||
)
|
||||
|
||||
# Import NotificationManager only when needed
|
||||
from services.notification_utils import NotificationManager
|
||||
self.notification_manager = NotificationManager(espocrm_api=self.espocrm, context=context)
|
||||
|
||||
def _init_redis(self) -> redis.Redis:
|
||||
"""Initialize Redis client for distributed locking"""
|
||||
try:
|
||||
redis_host = os.getenv('REDIS_HOST', 'localhost')
|
||||
redis_port = int(os.getenv('REDIS_PORT', '6379'))
|
||||
redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1'))
|
||||
|
||||
client = redis.Redis(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
db=redis_db,
|
||||
decode_responses=True
|
||||
)
|
||||
client.ping()
|
||||
return client
|
||||
except Exception as e:
|
||||
self._log(f"Redis connection failed: {e}", level='error')
|
||||
return None
|
||||
|
||||
def _log(self, message: str, level: str = 'info'):
|
||||
"""Logging mit Context-Support"""
|
||||
if self.context and hasattr(self.context, 'logger'):
|
||||
getattr(self.context.logger, level)(message)
|
||||
else:
|
||||
getattr(logger, level)(message)
|
||||
|
||||
async def acquire_sync_lock(self, entity_id: str) -> bool:
|
||||
"""
|
||||
Atomic distributed lock via Redis + syncStatus update
|
||||
@@ -80,23 +55,35 @@ class BeteiligteSync:
|
||||
|
||||
Returns:
|
||||
True wenn Lock erfolgreich, False wenn bereits im Sync
|
||||
|
||||
Raises:
|
||||
SyncError: Bei kritischen Sync-Problemen
|
||||
"""
|
||||
try:
|
||||
# STEP 1: Atomic Redis lock (prevents race conditions)
|
||||
if self.redis:
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
|
||||
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||
acquired = self.redis.set(
|
||||
lock_key,
|
||||
"locked",
|
||||
nx=True,
|
||||
ex=SYNC_CONFIG.lock_ttl_seconds
|
||||
)
|
||||
|
||||
if not acquired:
|
||||
self._log(f"Redis lock bereits aktiv für {entity_id}", level='warn')
|
||||
self.logger.warning(f"Redis lock bereits aktiv für {entity_id}")
|
||||
return False
|
||||
else:
|
||||
self.logger.error(
|
||||
f"⚠️ WARNUNG: Sync ohne Redis-Lock für {entity_id} - Race Condition möglich!"
|
||||
)
|
||||
|
||||
# STEP 2: Update syncStatus (für UI visibility)
|
||||
await self.espocrm.update_entity('CBeteiligte', entity_id, {
|
||||
'syncStatus': 'syncing'
|
||||
})
|
||||
|
||||
self._log(f"Sync-Lock für {entity_id} erworben")
|
||||
self.logger.info(f"Sync-Lock für {entity_id} erworben")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
@@ -152,32 +139,42 @@ class BeteiligteSync:
|
||||
update_data['syncRetryCount'] = new_retry
|
||||
|
||||
# Exponential backoff - berechne nächsten Retry-Zeitpunkt
|
||||
if new_retry <= len(RETRY_BACKOFF_MINUTES):
|
||||
backoff_minutes = RETRY_BACKOFF_MINUTES[new_retry - 1]
|
||||
backoff_minutes = SYNC_CONFIG.retry_backoff_minutes
|
||||
if new_retry <= len(backoff_minutes):
|
||||
backoff_min = backoff_minutes[new_retry - 1]
|
||||
else:
|
||||
backoff_minutes = RETRY_BACKOFF_MINUTES[-1] # Letzte Backoff-Zeit
|
||||
backoff_min = backoff_minutes[-1] # Letzte Backoff-Zeit
|
||||
|
||||
next_retry = now_utc + timedelta(minutes=backoff_minutes)
|
||||
next_retry = now_utc + timedelta(minutes=backoff_min)
|
||||
update_data['syncNextRetry'] = next_retry.strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
self._log(f"Retry {new_retry}/{MAX_SYNC_RETRIES}, nächster Versuch in {backoff_minutes} Minuten")
|
||||
self.logger.info(
|
||||
f"Retry {new_retry}/{SYNC_CONFIG.max_retries}, "
|
||||
f"nächster Versuch in {backoff_min} Minuten"
|
||||
)
|
||||
|
||||
# Check max retries - mark as permanently failed
|
||||
if new_retry >= MAX_SYNC_RETRIES:
|
||||
if new_retry >= SYNC_CONFIG.max_retries:
|
||||
update_data['syncStatus'] = 'permanently_failed'
|
||||
|
||||
# Auto-Reset Timestamp für Wiederherstellung nach 24h
|
||||
auto_reset_time = now_utc + timedelta(hours=AUTO_RESET_HOURS)
|
||||
auto_reset_time = now_utc + timedelta(hours=SYNC_CONFIG.auto_reset_hours)
|
||||
update_data['syncAutoResetAt'] = auto_reset_time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
await self.send_notification(
|
||||
entity_id,
|
||||
'error',
|
||||
extra_data={
|
||||
'message': f"Sync fehlgeschlagen nach {MAX_SYNC_RETRIES} Versuchen. Auto-Reset in {AUTO_RESET_HOURS}h."
|
||||
'message': (
|
||||
f"Sync fehlgeschlagen nach {SYNC_CONFIG.max_retries} Versuchen. "
|
||||
f"Auto-Reset in {SYNC_CONFIG.auto_reset_hours}h."
|
||||
)
|
||||
}
|
||||
)
|
||||
self._log(f"Max retries ({MAX_SYNC_RETRIES}) erreicht für {entity_id}, Auto-Reset um {auto_reset_time}", level='error')
|
||||
self.logger.error(
|
||||
f"Max retries ({SYNC_CONFIG.max_retries}) erreicht für {entity_id}, "
|
||||
f"Auto-Reset um {auto_reset_time}"
|
||||
)
|
||||
else:
|
||||
update_data['syncRetryCount'] = 0
|
||||
update_data['syncNextRetry'] = None
|
||||
@@ -188,19 +185,19 @@ class BeteiligteSync:
|
||||
|
||||
await self.espocrm.update_entity('CBeteiligte', entity_id, update_data)
|
||||
|
||||
self._log(f"Sync-Lock released: {entity_id} → {new_status}")
|
||||
self.logger.info(f"Sync-Lock released: {entity_id} → {new_status}")
|
||||
|
||||
# Release Redis lock
|
||||
if self.redis:
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||
self.redis.delete(lock_key)
|
||||
|
||||
except Exception as e:
|
||||
self._log(f"Fehler beim Release Lock: {e}", level='error')
|
||||
self.logger.error(f"Fehler beim Release Lock: {e}")
|
||||
# Ensure Redis lock is released even on error
|
||||
if self.redis:
|
||||
try:
|
||||
lock_key = f"sync_lock:cbeteiligte:{entity_id}"
|
||||
lock_key = get_lock_key('cbeteiligte', entity_id)
|
||||
self.redis.delete(lock_key)
|
||||
except:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user