diff --git a/steps/advoware_cal_sync/calendar_sync_api_step.py b/steps/advoware_cal_sync/calendar_sync_api_step.py index 899088a..b62c3b5 100644 --- a/steps/advoware_cal_sync/calendar_sync_api_step.py +++ b/steps/advoware_cal_sync/calendar_sync_api_step.py @@ -38,7 +38,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: kuerzel = body.get('kuerzel') if not kuerzel: return ApiResponse( - status_code=400, + status=400, body={ 'error': 'kuerzel required', 'message': 'Please provide kuerzel in body' @@ -57,7 +57,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: } }) return ApiResponse( - status_code=200, + status=200, body={ 'status': 'triggered', 'message': 'Calendar sync triggered for all employees', @@ -71,7 +71,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: if not set_employee_lock(redis_client, kuerzel_upper, 'api', ctx): ctx.logger.info(f"Calendar Sync API: Sync already active for {kuerzel_upper}, skipping") return ApiResponse( - status_code=409, + status=409, body={ 'status': 'conflict', 'message': f'Calendar sync already active for {kuerzel_upper}', @@ -92,7 +92,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: }) return ApiResponse( - status_code=200, + status=200, body={ 'status': 'triggered', 'message': f'Calendar sync triggered for {kuerzel_upper}', @@ -104,7 +104,7 @@ async def handler(request: ApiRequest, ctx: FlowContext) -> ApiResponse: except Exception as e: ctx.logger.error(f"Error in API trigger: {e}") return ApiResponse( - status_code=500, + status=500, body={ 'error': 'Internal server error', 'details': str(e) diff --git a/steps/advoware_cal_sync/calendar_sync_cron_step.py b/steps/advoware_cal_sync/calendar_sync_cron_step.py index 4e2aaeb..6f909a9 100644 --- a/steps/advoware_cal_sync/calendar_sync_cron_step.py +++ b/steps/advoware_cal_sync/calendar_sync_cron_step.py @@ -24,10 +24,13 @@ config = { } -async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: +async def handler(input_data: None, ctx: FlowContext) -> None: """Cron handler that triggers the calendar sync cascade.""" try: - log_operation('info', "Calendar Sync Cron: Starting to emit sync-all event", context=ctx) + ctx.logger.info("=" * 80) + ctx.logger.info("🕐 CALENDAR SYNC CRON: STARTING") + ctx.logger.info("=" * 80) + ctx.logger.info("Emitting sync-all event") # Enqueue sync-all event await ctx.enqueue({ @@ -37,7 +40,11 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: } }) - log_operation('info', "Calendar Sync Cron: Emitted sync-all event", context=ctx) + ctx.logger.info("✅ Calendar sync-all event emitted successfully") + ctx.logger.info("=" * 80) except Exception as e: - log_operation('error', f"Fehler beim Cron-Job: {e}", context=ctx) + ctx.logger.error("=" * 80) + ctx.logger.error("❌ ERROR: CALENDAR SYNC CRON") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) diff --git a/steps/vmh/bankverbindungen_sync_event_step.py b/steps/vmh/bankverbindungen_sync_event_step.py index d449e33..a13b145 100644 --- a/steps/vmh/bankverbindungen_sync_event_step.py +++ b/steps/vmh/bankverbindungen_sync_event_step.py @@ -11,24 +11,23 @@ Verarbeitet: """ from typing import Dict, Any, Optional -from motia import FlowContext +from motia import FlowContext, queue from services.advoware import AdvowareAPI from services.espocrm import EspoCRMAPI from services.bankverbindungen_mapper import BankverbindungenMapper from services.notification_utils import NotificationManager +from services.redis_client import get_redis_client import json -import redis -import os config = { "name": "VMH Bankverbindungen Sync Handler", "description": "Zentraler Sync-Handler für Bankverbindungen (Webhooks + Cron Events)", "flows": ["vmh-bankverbindungen"], "triggers": [ - {"type": "queue", "topic": "vmh.bankverbindungen.create"}, - {"type": "queue", "topic": "vmh.bankverbindungen.update"}, - {"type": "queue", "topic": "vmh.bankverbindungen.delete"}, - {"type": "queue", "topic": "vmh.bankverbindungen.sync_check"} + queue("vmh.bankverbindungen.create"), + queue("vmh.bankverbindungen.update"), + queue("vmh.bankverbindungen.delete"), + queue("vmh.bankverbindungen.sync_check") ], "enqueues": [] } @@ -47,20 +46,11 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: ctx.logger.info(f"🔄 Bankverbindungen Sync gestartet: {action.upper()} | Entity: {entity_id} | Source: {source}") - # Shared Redis client - redis_host = os.getenv('REDIS_HOST', 'localhost') - redis_port = int(os.getenv('REDIS_PORT', '6379')) - redis_db = int(os.getenv('REDIS_DB_ADVOWARE_CACHE', '1')) + # Shared Redis client (centralized factory) + redis_client = get_redis_client(strict=False) - redis_client = redis.Redis( - host=redis_host, - port=redis_port, - db=redis_db, - decode_responses=True - ) - - # APIs initialisieren - espocrm = EspoCRMAPI() + # APIs initialisieren (mit Context für besseres Logging) + espocrm = EspoCRMAPI(ctx) advoware = AdvowareAPI(ctx) mapper = BankverbindungenMapper() notification_mgr = NotificationManager(espocrm_api=espocrm, context=ctx) @@ -130,7 +120,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: pass -async def handle_create(entity_id, betnr, espo_entity, espocrm, advoware, mapper, ctx, redis_client, lock_key): +async def handle_create(entity_id, betnr, espo_entity, espocrm, advoware, mapper, ctx, redis_client, lock_key) -> None: """Erstellt neue Bankverbindung in Advoware""" try: ctx.logger.info(f"🔨 CREATE Bankverbindung in Advoware für Beteiligter {betnr}...") @@ -176,7 +166,7 @@ async def handle_create(entity_id, betnr, espo_entity, espocrm, advoware, mapper redis_client.delete(lock_key) -async def handle_update(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key): +async def handle_update(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key) -> None: """Update nicht möglich - Sendet Notification an User""" try: ctx.logger.warn(f"⚠️ UPDATE: Advoware API unterstützt kein PUT für Bankverbindungen") @@ -219,7 +209,7 @@ async def handle_update(entity_id, betnr, advoware_id, espo_entity, espocrm, not redis_client.delete(lock_key) -async def handle_delete(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key): +async def handle_delete(entity_id, betnr, advoware_id, espo_entity, espocrm, notification_mgr, ctx, redis_client, lock_key) -> None: """Delete nicht möglich - Sendet Notification an User""" try: ctx.logger.warn(f"⚠️ DELETE: Advoware API unterstützt kein DELETE für Bankverbindungen") diff --git a/steps/vmh/beteiligte_sync_cron_step.py b/steps/vmh/beteiligte_sync_cron_step.py index 2941789..34178e1 100644 --- a/steps/vmh/beteiligte_sync_cron_step.py +++ b/steps/vmh/beteiligte_sync_cron_step.py @@ -32,7 +32,7 @@ async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: ctx.logger.info("🕐 Beteiligte Sync Cron gestartet") try: - espocrm = EspoCRMAPI() + espocrm = EspoCRMAPI(ctx) # Berechne Threshold für "veraltete" Syncs (24 Stunden) threshold = datetime.datetime.now() - datetime.timedelta(hours=24) diff --git a/steps/vmh/beteiligte_sync_event_step.py b/steps/vmh/beteiligte_sync_event_step.py index 20cf719..30dbcf4 100644 --- a/steps/vmh/beteiligte_sync_event_step.py +++ b/steps/vmh/beteiligte_sync_event_step.py @@ -11,7 +11,7 @@ Verarbeitet: """ from typing import Dict, Any, Optional -from motia import FlowContext +from motia import FlowContext, queue from services.advoware import AdvowareAPI from services.advoware_service import AdvowareService from services.espocrm import EspoCRMAPI @@ -33,10 +33,10 @@ config = { "description": "Zentraler Sync-Handler für Beteiligte (Webhooks + Cron Events)", "flows": ["vmh-beteiligte"], "triggers": [ - {"type": "queue", "topic": "vmh.beteiligte.create"}, - {"type": "queue", "topic": "vmh.beteiligte.update"}, - {"type": "queue", "topic": "vmh.beteiligte.delete"}, - {"type": "queue", "topic": "vmh.beteiligte.sync_check"} + queue("vmh.beteiligte.create"), + queue("vmh.beteiligte.update"), + queue("vmh.beteiligte.delete"), + queue("vmh.beteiligte.sync_check") ], "enqueues": [] } @@ -174,7 +174,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: ctx.logger.error(traceback.format_exc()) -async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx): +async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, ctx) -> None: """Erstellt neuen Beteiligten in Advoware""" try: ctx.logger.info(f"🔨 CREATE in Advoware...") @@ -233,7 +233,7 @@ async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, m await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True) -async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx): +async def handle_update(entity_id, betnr, espo_entity, espocrm, advoware, sync_utils, mapper, ctx) -> None: """Synchronisiert existierenden Beteiligten""" try: ctx.logger.info(f"🔍 Fetch von Advoware betNr={betnr}...") diff --git a/steps/vmh/document_sync_event_step.py b/steps/vmh/document_sync_event_step.py index 6b00573..8a58be8 100644 --- a/steps/vmh/document_sync_event_step.py +++ b/steps/vmh/document_sync_event_step.py @@ -10,7 +10,7 @@ Verarbeitet: """ from typing import Dict, Any -from motia import FlowContext +from motia import FlowContext, queue from services.espocrm import EspoCRMAPI from services.document_sync_utils import DocumentSync from services.xai_service import XAIService @@ -23,9 +23,9 @@ config = { "description": "Zentraler Sync-Handler für Documents mit xAI Collections", "flows": ["vmh-documents"], "triggers": [ - {"type": "queue", "topic": "vmh.document.create"}, - {"type": "queue", "topic": "vmh.document.update"}, - {"type": "queue", "topic": "vmh.document.delete"} + queue("vmh.document.create"), + queue("vmh.document.update"), + queue("vmh.document.delete") ], "enqueues": [] } @@ -127,7 +127,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: ctx.logger.error(traceback.format_exc()) -async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente'): +async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente') -> None: """ Behandelt Create/Update von Documents @@ -316,7 +316,7 @@ async def handle_create_or_update(entity_id: str, document: Dict[str, Any], sync await sync_utils.release_sync_lock(entity_id, success=False, error_message=str(e)) -async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente'): +async def handle_delete(entity_id: str, document: Dict[str, Any], sync_utils: DocumentSync, xai_service: XAIService, ctx: FlowContext[Any], entity_type: str = 'CDokumente') -> None: """ Behandelt Delete von Documents diff --git a/steps/vmh/webhook/bankverbindungen_create_api_step.py b/steps/vmh/webhook/bankverbindungen_create_api_step.py index dc18557..512b281 100644 --- a/steps/vmh/webhook/bankverbindungen_create_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_create_api_step.py @@ -57,7 +57,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'status': 'received', 'action': 'create', @@ -71,6 +71,6 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/webhook/bankverbindungen_delete_api_step.py b/steps/vmh/webhook/bankverbindungen_delete_api_step.py index 847c486..1f5f110 100644 --- a/steps/vmh/webhook/bankverbindungen_delete_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_delete_api_step.py @@ -57,7 +57,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'status': 'received', 'action': 'delete', @@ -71,6 +71,6 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/webhook/bankverbindungen_update_api_step.py b/steps/vmh/webhook/bankverbindungen_update_api_step.py index 94dbf07..8f5a498 100644 --- a/steps/vmh/webhook/bankverbindungen_update_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_update_api_step.py @@ -57,7 +57,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'status': 'received', 'action': 'update', @@ -71,6 +71,6 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/webhook/beteiligte_create_api_step.py b/steps/vmh/webhook/beteiligte_create_api_step.py index b8dfbde..9c40c3e 100644 --- a/steps/vmh/webhook/beteiligte_create_api_step.py +++ b/steps/vmh/webhook/beteiligte_create_api_step.py @@ -60,7 +60,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'status': 'received', 'action': 'create', @@ -74,7 +74,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={ 'error': 'Internal server error', 'details': str(e) diff --git a/steps/vmh/webhook/beteiligte_delete_api_step.py b/steps/vmh/webhook/beteiligte_delete_api_step.py index 763f38c..60ccce4 100644 --- a/steps/vmh/webhook/beteiligte_delete_api_step.py +++ b/steps/vmh/webhook/beteiligte_delete_api_step.py @@ -57,7 +57,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'status': 'received', 'action': 'delete', @@ -71,6 +71,6 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={'error': 'Internal server error', 'details': str(e)} ) diff --git a/steps/vmh/webhook/beteiligte_update_api_step.py b/steps/vmh/webhook/beteiligte_update_api_step.py index b02e0a2..04be6db 100644 --- a/steps/vmh/webhook/beteiligte_update_api_step.py +++ b/steps/vmh/webhook/beteiligte_update_api_step.py @@ -60,7 +60,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'status': 'received', 'action': 'update', @@ -74,7 +74,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error(f"Error: {e}") ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={ 'error': 'Internal server error', 'details': str(e) diff --git a/steps/vmh/webhook/document_create_api_step.py b/steps/vmh/webhook/document_create_api_step.py index 5f58f26..58defcf 100644 --- a/steps/vmh/webhook/document_create_api_step.py +++ b/steps/vmh/webhook/document_create_api_step.py @@ -63,7 +63,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'success': True, 'message': f'{len(entity_ids)} document(s) enqueued for sync', @@ -79,7 +79,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={ 'success': False, 'error': str(e) diff --git a/steps/vmh/webhook/document_delete_api_step.py b/steps/vmh/webhook/document_delete_api_step.py index c74899e..22386c9 100644 --- a/steps/vmh/webhook/document_delete_api_step.py +++ b/steps/vmh/webhook/document_delete_api_step.py @@ -63,7 +63,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'success': True, 'message': f'{len(entity_ids)} document(s) enqueued for deletion', @@ -79,7 +79,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={ 'success': False, 'error': str(e) diff --git a/steps/vmh/webhook/document_update_api_step.py b/steps/vmh/webhook/document_update_api_step.py index 88b3c15..dcf8454 100644 --- a/steps/vmh/webhook/document_update_api_step.py +++ b/steps/vmh/webhook/document_update_api_step.py @@ -63,7 +63,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: f"{len(entity_ids)} events emitted") return ApiResponse( - status_code=200, + status=200, body={ 'success': True, 'message': f'{len(entity_ids)} document(s) enqueued for sync', @@ -79,7 +79,7 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ctx.logger.error("=" * 80) return ApiResponse( - status_code=500, + status=500, body={ 'success': False, 'error': str(e)