refactor(logging): remove unused logger instances and enhance error logging in webhook steps
This commit is contained in:
@@ -7,9 +7,6 @@ Basierend auf ADRESSEN_SYNC_ANALYSE.md Abschnitt 12.
|
|||||||
|
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class AdressenMapper:
|
class AdressenMapper:
|
||||||
|
|||||||
@@ -26,8 +26,6 @@ from services.espocrm import EspoCRMAPI
|
|||||||
from services.adressen_mapper import AdressenMapper
|
from services.adressen_mapper import AdressenMapper
|
||||||
from services.notification_utils import NotificationManager
|
from services.notification_utils import NotificationManager
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class AdressenSync:
|
class AdressenSync:
|
||||||
"""Sync-Klasse für Adressen zwischen EspoCRM und Advoware"""
|
"""Sync-Klasse für Adressen zwischen EspoCRM und Advoware"""
|
||||||
|
|||||||
@@ -72,6 +72,11 @@ class AdvowareAPI:
|
|||||||
|
|
||||||
self._session: Optional[aiohttp.ClientSession] = None
|
self._session: Optional[aiohttp.ClientSession] = None
|
||||||
|
|
||||||
|
def _log(self, message: str, level: str = 'info') -> None:
|
||||||
|
"""Internal logging helper"""
|
||||||
|
log_func = getattr(self.logger, level, self.logger.info)
|
||||||
|
log_func(message)
|
||||||
|
|
||||||
async def _get_session(self) -> aiohttp.ClientSession:
|
async def _get_session(self) -> aiohttp.ClientSession:
|
||||||
if self._session is None or self._session.closed:
|
if self._session is None or self._session.closed:
|
||||||
self._session = aiohttp.ClientSession()
|
self._session = aiohttp.ClientSession()
|
||||||
@@ -98,8 +103,8 @@ class AdvowareAPI:
|
|||||||
signature = hmac.new(api_key_bytes, message, hashlib.sha512)
|
signature = hmac.new(api_key_bytes, message, hashlib.sha512)
|
||||||
return base64.b64encode(signature.digest()).decode('utf-8')
|
return base64.b64encode(signature.digest()).decode('utf-8')
|
||||||
|
|
||||||
def _fetch_new_access_token(self) -> str:
|
async def _fetch_new_access_token(self) -> str:
|
||||||
"""Fetch new access token from Advoware Auth API"""
|
"""Fetch new access token from Advoware Auth API (async)"""
|
||||||
self.logger.info("Fetching new access token from Advoware")
|
self.logger.info("Fetching new access token from Advoware")
|
||||||
|
|
||||||
nonce = str(uuid.uuid4())
|
nonce = str(uuid.uuid4())
|
||||||
@@ -122,40 +127,41 @@ class AdvowareAPI:
|
|||||||
|
|
||||||
self.logger.debug(f"Token request: AppID={self.app_id}, User={self.user}")
|
self.logger.debug(f"Token request: AppID={self.app_id}, User={self.user}")
|
||||||
|
|
||||||
# Using synchronous requests for token fetch (called from sync context)
|
# Async token fetch using aiohttp
|
||||||
# TODO: Convert to async in future version
|
session = await self._get_session()
|
||||||
import requests
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = requests.post(
|
async with session.post(
|
||||||
ADVOWARE_CONFIG.auth_url,
|
ADVOWARE_CONFIG.auth_url,
|
||||||
json=data,
|
json=data,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
timeout=self.api_timeout_seconds
|
timeout=aiohttp.ClientTimeout(total=self.api_timeout_seconds)
|
||||||
)
|
) as response:
|
||||||
|
self.logger.debug(f"Token response status: {response.status}")
|
||||||
self.logger.debug(f"Token response status: {response.status_code}")
|
|
||||||
|
if response.status == 401:
|
||||||
if response.status_code == 401:
|
raise AdvowareAuthError(
|
||||||
raise AdvowareAuthError(
|
"Authentication failed - check credentials",
|
||||||
"Authentication failed - check credentials",
|
status_code=401
|
||||||
status_code=401
|
)
|
||||||
)
|
|
||||||
|
if response.status >= 400:
|
||||||
response.raise_for_status()
|
error_text = await response.text()
|
||||||
|
raise AdvowareAPIError(
|
||||||
except requests.Timeout:
|
f"Token request failed ({response.status}): {error_text}",
|
||||||
|
status_code=response.status
|
||||||
|
)
|
||||||
|
|
||||||
|
result = await response.json()
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
raise AdvowareTimeoutError(
|
raise AdvowareTimeoutError(
|
||||||
"Token request timed out",
|
"Token request timed out",
|
||||||
status_code=408
|
status_code=408
|
||||||
)
|
)
|
||||||
except requests.RequestException as e:
|
except aiohttp.ClientError as e:
|
||||||
raise AdvowareAPIError(
|
raise AdvowareAPIError(f"Token request failed: {str(e)}")
|
||||||
f"Token request failed: {str(e)}",
|
|
||||||
status_code=getattr(e.response, 'status_code', None) if hasattr(e, 'response') else None
|
|
||||||
)
|
|
||||||
|
|
||||||
result = response.json()
|
|
||||||
access_token = result.get("access_token")
|
access_token = result.get("access_token")
|
||||||
|
|
||||||
if not access_token:
|
if not access_token:
|
||||||
@@ -173,7 +179,7 @@ class AdvowareAPI:
|
|||||||
|
|
||||||
return access_token
|
return access_token
|
||||||
|
|
||||||
def get_access_token(self, force_refresh: bool = False) -> str:
|
async def get_access_token(self, force_refresh: bool = False) -> str:
|
||||||
"""
|
"""
|
||||||
Get valid access token (from cache or fetch new).
|
Get valid access token (from cache or fetch new).
|
||||||
|
|
||||||
@@ -187,11 +193,11 @@ class AdvowareAPI:
|
|||||||
|
|
||||||
if not self.redis_client:
|
if not self.redis_client:
|
||||||
self.logger.info("No Redis available, fetching new token")
|
self.logger.info("No Redis available, fetching new token")
|
||||||
return self._fetch_new_access_token()
|
return await self._fetch_new_access_token()
|
||||||
|
|
||||||
if force_refresh:
|
if force_refresh:
|
||||||
self.logger.info("Force refresh requested, fetching new token")
|
self.logger.info("Force refresh requested, fetching new token")
|
||||||
return self._fetch_new_access_token()
|
return await self._fetch_new_access_token()
|
||||||
|
|
||||||
# Check cache
|
# Check cache
|
||||||
cached_token = self.redis_client.get(ADVOWARE_CONFIG.token_cache_key)
|
cached_token = self.redis_client.get(ADVOWARE_CONFIG.token_cache_key)
|
||||||
@@ -210,7 +216,7 @@ class AdvowareAPI:
|
|||||||
self.logger.debug(f"Error reading cached token: {e}")
|
self.logger.debug(f"Error reading cached token: {e}")
|
||||||
|
|
||||||
self.logger.info("Cached token expired or invalid, fetching new")
|
self.logger.info("Cached token expired or invalid, fetching new")
|
||||||
return self._fetch_new_access_token()
|
return await self._fetch_new_access_token()
|
||||||
|
|
||||||
async def api_call(
|
async def api_call(
|
||||||
self,
|
self,
|
||||||
@@ -254,7 +260,7 @@ class AdvowareAPI:
|
|||||||
|
|
||||||
# Get auth token
|
# Get auth token
|
||||||
try:
|
try:
|
||||||
token = self.get_access_token()
|
token = await self.get_access_token()
|
||||||
except AdvowareAuthError:
|
except AdvowareAuthError:
|
||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -282,7 +288,7 @@ class AdvowareAPI:
|
|||||||
# Handle 401 - retry with fresh token
|
# Handle 401 - retry with fresh token
|
||||||
if response.status == 401:
|
if response.status == 401:
|
||||||
self.logger.warning("401 Unauthorized, refreshing token")
|
self.logger.warning("401 Unauthorized, refreshing token")
|
||||||
token = self.get_access_token(force_refresh=True)
|
token = await self.get_access_token(force_refresh=True)
|
||||||
effective_headers['Authorization'] = f'Bearer {token}'
|
effective_headers['Authorization'] = f'Bearer {token}'
|
||||||
|
|
||||||
async with session.request(
|
async with session.request(
|
||||||
|
|||||||
@@ -6,9 +6,6 @@ Transformiert Bankverbindungen zwischen den beiden Systemen
|
|||||||
|
|
||||||
from typing import Dict, Any, Optional, List
|
from typing import Dict, Any, Optional, List
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class BankverbindungenMapper:
|
class BankverbindungenMapper:
|
||||||
|
|||||||
@@ -20,6 +20,12 @@ MAX_SYNC_RETRIES = 5
|
|||||||
# Retry backoff: Wartezeit zwischen Retries (in Minuten)
|
# Retry backoff: Wartezeit zwischen Retries (in Minuten)
|
||||||
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h
|
RETRY_BACKOFF_MINUTES = [1, 5, 15, 60, 240] # 1min, 5min, 15min, 1h, 4h
|
||||||
|
|
||||||
|
# Legacy file status values (for backward compatibility)
|
||||||
|
# These are old German and English status values that may still exist in the database
|
||||||
|
LEGACY_NEW_STATUS_VALUES = {'neu', 'Neu', 'New'}
|
||||||
|
LEGACY_CHANGED_STATUS_VALUES = {'geändert', 'Geändert', 'Changed'}
|
||||||
|
LEGACY_SYNCED_STATUS_VALUES = {'synced', 'Synced', 'synchronized', 'Synchronized'}
|
||||||
|
|
||||||
|
|
||||||
class DocumentSync(BaseSyncUtils):
|
class DocumentSync(BaseSyncUtils):
|
||||||
"""Utility class for document synchronization with xAI"""
|
"""Utility class for document synchronization with xAI"""
|
||||||
@@ -185,16 +191,11 @@ class DocumentSync(BaseSyncUtils):
|
|||||||
# ═══════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════
|
||||||
# PRIORITY CHECK 2: fileStatus "new" or "changed"
|
# PRIORITY CHECK 2: fileStatus "new" or "changed"
|
||||||
# ═══════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════
|
||||||
if datei_status in [
|
# Check for standard enum values and legacy values
|
||||||
FileStatus.NEW.value,
|
is_new = (datei_status == FileStatus.NEW.value or datei_status in LEGACY_NEW_STATUS_VALUES)
|
||||||
FileStatus.CHANGED.value,
|
is_changed = (datei_status == FileStatus.CHANGED.value or datei_status in LEGACY_CHANGED_STATUS_VALUES)
|
||||||
'neu', # Legacy German values
|
|
||||||
'geändert', # Legacy German values
|
if is_new or is_changed:
|
||||||
'Neu', # Case variations
|
|
||||||
'Geändert',
|
|
||||||
'New',
|
|
||||||
'Changed'
|
|
||||||
]:
|
|
||||||
self._log(f"🆕 fileStatus: '{datei_status}' → xAI sync REQUIRED")
|
self._log(f"🆕 fileStatus: '{datei_status}' → xAI sync REQUIRED")
|
||||||
|
|
||||||
if target_collections:
|
if target_collections:
|
||||||
|
|||||||
@@ -17,8 +17,6 @@ from services.redis_client import get_redis_client
|
|||||||
from services.config import ESPOCRM_CONFIG, API_CONFIG
|
from services.config import ESPOCRM_CONFIG, API_CONFIG
|
||||||
from services.logging_utils import get_service_logger
|
from services.logging_utils import get_service_logger
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class EspoCRMAPI:
|
class EspoCRMAPI:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -18,8 +18,6 @@ from services.models import (
|
|||||||
from services.exceptions import ValidationError
|
from services.exceptions import ValidationError
|
||||||
from services.config import FEATURE_FLAGS
|
from services.config import FEATURE_FLAGS
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class BeteiligteMapper:
|
class BeteiligteMapper:
|
||||||
"""Mapper für CBeteiligte (EspoCRM) ↔ Beteiligte (Advoware)"""
|
"""Mapper für CBeteiligte (EspoCRM) ↔ Beteiligte (Advoware)"""
|
||||||
|
|||||||
@@ -24,8 +24,6 @@ from services.kommunikation_mapper import (
|
|||||||
from services.advoware_service import AdvowareService
|
from services.advoware_service import AdvowareService
|
||||||
from services.espocrm import EspoCRMAPI
|
from services.espocrm import EspoCRMAPI
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class KommunikationSyncManager:
|
class KommunikationSyncManager:
|
||||||
"""Manager für Kommunikation-Synchronisation"""
|
"""Manager für Kommunikation-Synchronisation"""
|
||||||
|
|||||||
@@ -69,4 +69,3 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
|||||||
status=500,
|
status=500,
|
||||||
body={'error': 'Internal server error', 'details': str(e)}
|
body={'error': 'Internal server error', 'details': str(e)}
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|||||||
@@ -71,7 +71,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error("❌ ERROR: VMH CREATE WEBHOOK")
|
ctx.logger.error("❌ ERROR: VMH CREATE WEBHOOK")
|
||||||
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error(f"Error: {e}")
|
ctx.logger.error(f"Error: {e}")
|
||||||
|
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
|
||||||
|
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
|
||||||
|
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
return ApiResponse(
|
return ApiResponse(
|
||||||
status=500,
|
status=500,
|
||||||
|
|||||||
@@ -71,7 +71,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error("❌ ERROR: VMH UPDATE WEBHOOK")
|
ctx.logger.error("❌ ERROR: VMH UPDATE WEBHOOK")
|
||||||
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error(f"Error: {e}")
|
ctx.logger.error(f"Error: {e}")
|
||||||
|
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
|
||||||
|
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
|
||||||
|
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
return ApiResponse(
|
return ApiResponse(
|
||||||
status=500,
|
status=500,
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
"""VMH Webhook - Document Create"""
|
"""VMH Webhook - Document Create"""
|
||||||
import json
|
import json
|
||||||
|
import datetime
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from motia import FlowContext, http, ApiRequest, ApiResponse
|
from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||||
|
|
||||||
@@ -74,8 +75,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error("❌ ERROR: DOCUMENT CREATE WEBHOOK")
|
ctx.logger.error("❌ ERROR: DOCUMENT CREATE WEBHOOK")
|
||||||
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error(f"Error: {e}")
|
ctx.logger.error(f"Error: {e}")
|
||||||
ctx.logger.error(f"Payload: {request.body}")
|
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
|
||||||
|
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
|
||||||
|
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
|
|
||||||
return ApiResponse(
|
return ApiResponse(
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
"""VMH Webhook - Document Delete"""
|
"""VMH Webhook - Document Delete"""
|
||||||
import json
|
import json
|
||||||
|
import datetime
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from motia import FlowContext, http, ApiRequest, ApiResponse
|
from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||||
|
|
||||||
@@ -74,8 +75,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error("❌ ERROR: DOCUMENT DELETE WEBHOOK")
|
ctx.logger.error("❌ ERROR: DOCUMENT DELETE WEBHOOK")
|
||||||
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error(f"Error: {e}")
|
ctx.logger.error(f"Error: {e}")
|
||||||
ctx.logger.error(f"Payload: {request.body}")
|
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
|
||||||
|
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
|
||||||
|
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
|
|
||||||
return ApiResponse(
|
return ApiResponse(
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
"""VMH Webhook - Document Update"""
|
"""VMH Webhook - Document Update"""
|
||||||
import json
|
import json
|
||||||
|
import datetime
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from motia import FlowContext, http, ApiRequest, ApiResponse
|
from motia import FlowContext, http, ApiRequest, ApiResponse
|
||||||
|
|
||||||
@@ -74,8 +75,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error("❌ ERROR: DOCUMENT UPDATE WEBHOOK")
|
ctx.logger.error("❌ ERROR: DOCUMENT UPDATE WEBHOOK")
|
||||||
|
ctx.logger.error("=" * 80)
|
||||||
ctx.logger.error(f"Error: {e}")
|
ctx.logger.error(f"Error: {e}")
|
||||||
ctx.logger.error(f"Payload: {request.body}")
|
ctx.logger.error(f"Entity IDs attempted: {list(entity_ids) if 'entity_ids' in locals() else 'N/A'}")
|
||||||
|
ctx.logger.error(f"Full Payload: {json.dumps(request.body, indent=2, ensure_ascii=False)}")
|
||||||
|
ctx.logger.error(f"Timestamp: {datetime.datetime.now().isoformat()}")
|
||||||
ctx.logger.error("=" * 80)
|
ctx.logger.error("=" * 80)
|
||||||
|
|
||||||
return ApiResponse(
|
return ApiResponse(
|
||||||
|
|||||||
Reference in New Issue
Block a user