From f392ec0f061f8834761bd9f578f940572f9b338c Mon Sep 17 00:00:00 2001 From: bsiggel Date: Sun, 8 Mar 2026 21:24:12 +0000 Subject: [PATCH] refactor(typing): update handler signatures to use Dict and Any for improved type hinting --- .../calendar_sync_all_step.py | 4 ++-- .../calendar_sync_cron_step.py | 11 ++-------- .../calendar_sync_event_step.py | 5 +++-- steps/vmh/bankverbindungen_sync_event_step.py | 2 +- steps/vmh/beteiligte_sync_cron_step.py | 2 +- .../bankverbindungen_create_api_step.py | 13 +++++++++--- .../bankverbindungen_delete_api_step.py | 13 +++++++++--- .../bankverbindungen_update_api_step.py | 13 +++++++++--- .../vmh/webhook/beteiligte_delete_api_step.py | 13 +++++++++--- steps/vmh/webhook/document_create_api_step.py | 20 ++++++++++++++----- steps/vmh/webhook/document_delete_api_step.py | 19 ++++++++++++++---- 11 files changed, 79 insertions(+), 36 deletions(-) diff --git a/steps/advoware_cal_sync/calendar_sync_all_step.py b/steps/advoware_cal_sync/calendar_sync_all_step.py index 4c9344c..64cb302 100644 --- a/steps/advoware_cal_sync/calendar_sync_all_step.py +++ b/steps/advoware_cal_sync/calendar_sync_all_step.py @@ -17,7 +17,7 @@ from calendar_sync_utils import ( import math import time from datetime import datetime -from typing import Any +from typing import Any, Dict from motia import queue, FlowContext from pydantic import BaseModel, Field from services.advoware_service import AdvowareService @@ -33,7 +33,7 @@ config = { } -async def handler(input_data: dict, ctx: FlowContext): +async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: """ Handler that fetches all employees, sorts by last sync time, and emits calendar_sync_employee events for the oldest ones. diff --git a/steps/advoware_cal_sync/calendar_sync_cron_step.py b/steps/advoware_cal_sync/calendar_sync_cron_step.py index c2594ed..4e2aaeb 100644 --- a/steps/advoware_cal_sync/calendar_sync_cron_step.py +++ b/steps/advoware_cal_sync/calendar_sync_cron_step.py @@ -9,6 +9,7 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).parent)) from calendar_sync_utils import log_operation +from typing import Dict, Any from motia import cron, FlowContext @@ -23,7 +24,7 @@ config = { } -async def handler(input_data: dict, ctx: FlowContext): +async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: """Cron handler that triggers the calendar sync cascade.""" try: log_operation('info', "Calendar Sync Cron: Starting to emit sync-all event", context=ctx) @@ -37,14 +38,6 @@ async def handler(input_data: dict, ctx: FlowContext): }) log_operation('info', "Calendar Sync Cron: Emitted sync-all event", context=ctx) - return { - 'status': 'completed', - 'triggered_by': 'cron' - } except Exception as e: log_operation('error', f"Fehler beim Cron-Job: {e}", context=ctx) - return { - 'status': 'error', - 'error': str(e) - } diff --git a/steps/advoware_cal_sync/calendar_sync_event_step.py b/steps/advoware_cal_sync/calendar_sync_event_step.py index f93013d..6092468 100644 --- a/steps/advoware_cal_sync/calendar_sync_event_step.py +++ b/steps/advoware_cal_sync/calendar_sync_event_step.py @@ -14,6 +14,7 @@ import asyncio import os import datetime from datetime import timedelta +from typing import Dict, Any import pytz import backoff import time @@ -945,14 +946,14 @@ config = { } -async def handler(input_data: dict, ctx: FlowContext): +async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: """Main event handler for calendar sync.""" start_time = time.time() kuerzel = input_data.get('kuerzel') if not kuerzel: log_operation('error', "No kuerzel provided in event", context=ctx) - return {'status': 400, 'body': {'error': 'No kuerzel provided'}} + return log_operation('info', f"Starting calendar sync for employee {kuerzel}", context=ctx) diff --git a/steps/vmh/bankverbindungen_sync_event_step.py b/steps/vmh/bankverbindungen_sync_event_step.py index 2679e48..d449e33 100644 --- a/steps/vmh/bankverbindungen_sync_event_step.py +++ b/steps/vmh/bankverbindungen_sync_event_step.py @@ -34,7 +34,7 @@ config = { } -async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]): +async def handler(event_data: Dict[str, Any], ctx: FlowContext[Any]) -> None: """Zentraler Sync-Handler fΓΌr Bankverbindungen""" entity_id = event_data.get('entity_id') diff --git a/steps/vmh/beteiligte_sync_cron_step.py b/steps/vmh/beteiligte_sync_cron_step.py index 86b966e..2941789 100644 --- a/steps/vmh/beteiligte_sync_cron_step.py +++ b/steps/vmh/beteiligte_sync_cron_step.py @@ -25,7 +25,7 @@ config = { } -async def handler(input_data: Dict[str, Any], ctx: FlowContext): +async def handler(input_data: Dict[str, Any], ctx: FlowContext) -> None: """ Cron-Handler: Findet alle Beteiligte die Sync benΓΆtigen und emittiert Events """ diff --git a/steps/vmh/webhook/bankverbindungen_create_api_step.py b/steps/vmh/webhook/bankverbindungen_create_api_step.py index 74e453d..c7b594a 100644 --- a/steps/vmh/webhook/bankverbindungen_create_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_create_api_step.py @@ -23,8 +23,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Bankverbindungen Create empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("πŸ“₯ VMH WEBHOOK: BANKVERBINDUNGEN CREATE") + ctx.logger.info("=" * 80) ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + ctx.logger.info("=" * 80) # Sammle alle IDs aus dem Batch entity_ids = set() @@ -50,7 +53,8 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info(f"VMH Create Webhook verarbeitet: {len(entity_ids)} Events emittiert") + ctx.logger.info("βœ… VMH Create Webhook verarbeitet: " + f"{len(entity_ids)} Events emittiert") return ApiResponse( status=200, @@ -62,7 +66,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler beim Verarbeiten des VMH Create Webhooks: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: BANKVERBINDUNGEN CREATE WEBHOOK") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={'error': 'Internal server error', 'details': str(e)} diff --git a/steps/vmh/webhook/bankverbindungen_delete_api_step.py b/steps/vmh/webhook/bankverbindungen_delete_api_step.py index b02d716..0c06a06 100644 --- a/steps/vmh/webhook/bankverbindungen_delete_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_delete_api_step.py @@ -23,8 +23,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Bankverbindungen Delete empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("πŸ“₯ VMH WEBHOOK: BANKVERBINDUNGEN DELETE") + ctx.logger.info("=" * 80) ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + ctx.logger.info("=" * 80) # Sammle alle IDs entity_ids = set() @@ -50,7 +53,8 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info(f"VMH Delete Webhook verarbeitet: {len(entity_ids)} Events emittiert") + ctx.logger.info("βœ… VMH Delete Webhook verarbeitet: " + f"{len(entity_ids)} Events emittiert") return ApiResponse( status=200, @@ -62,7 +66,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler beim Verarbeiten des VMH Delete Webhooks: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: BANKVERBINDUNGEN DELETE WEBHOOK") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={'error': 'Internal server error', 'details': str(e)} diff --git a/steps/vmh/webhook/bankverbindungen_update_api_step.py b/steps/vmh/webhook/bankverbindungen_update_api_step.py index 1e2544c..11c24ba 100644 --- a/steps/vmh/webhook/bankverbindungen_update_api_step.py +++ b/steps/vmh/webhook/bankverbindungen_update_api_step.py @@ -23,8 +23,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Bankverbindungen Update empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("πŸ“₯ VMH WEBHOOK: BANKVERBINDUNGEN UPDATE") + ctx.logger.info("=" * 80) ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + ctx.logger.info("=" * 80) # Sammle alle IDs entity_ids = set() @@ -50,7 +53,8 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info(f"VMH Update Webhook verarbeitet: {len(entity_ids)} Events emittiert") + ctx.logger.info("βœ… VMH Update Webhook verarbeitet: " + f"{len(entity_ids)} Events emittiert") return ApiResponse( status=200, @@ -62,7 +66,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler beim Verarbeiten des VMH Update Webhooks: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: BANKVERBINDUNGEN UPDATE WEBHOOK") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={'error': 'Internal server error', 'details': str(e)} diff --git a/steps/vmh/webhook/beteiligte_delete_api_step.py b/steps/vmh/webhook/beteiligte_delete_api_step.py index cc277d3..d5fe8ad 100644 --- a/steps/vmh/webhook/beteiligte_delete_api_step.py +++ b/steps/vmh/webhook/beteiligte_delete_api_step.py @@ -23,8 +23,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Beteiligte Delete empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("πŸ“₯ VMH WEBHOOK: BETEILIGTE DELETE") + ctx.logger.info("=" * 80) ctx.logger.info(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") + ctx.logger.info("=" * 80) # Sammle alle IDs aus dem Batch entity_ids = set() @@ -50,7 +53,8 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: } }) - ctx.logger.info(f"VMH Delete Webhook verarbeitet: {len(entity_ids)} Events emittiert") + ctx.logger.info("βœ… VMH Delete Webhook verarbeitet: " + f"{len(entity_ids)} Events emittiert") return ApiResponse( status=200, @@ -62,7 +66,10 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler beim Delete-Webhook: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: BETEILIGTE DELETE WEBHOOK") + ctx.logger.error(f"Error: {e}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, body={'error': 'Internal server error', 'details': str(e)} diff --git a/steps/vmh/webhook/document_create_api_step.py b/steps/vmh/webhook/document_create_api_step.py index fafb71b..bbdac1b 100644 --- a/steps/vmh/webhook/document_create_api_step.py +++ b/steps/vmh/webhook/document_create_api_step.py @@ -25,18 +25,22 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Document Create empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("πŸ“₯ VMH WEBHOOK: DOCUMENT CREATE") + ctx.logger.info("=" * 80) ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") # Sammle alle IDs aus dem Batch entity_ids = set() + entity_type = 'CDokumente' # Default if isinstance(payload, list): for entity in payload: if isinstance(entity, dict) and 'id' in entity: entity_ids.add(entity['id']) - # Extrahiere entityType falls vorhanden - entity_type = entity.get('entityType', 'CDokumente') + # Take entityType from first entity if present + if entity_type == 'CDokumente': + entity_type = entity.get('entityType', 'CDokumente') elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) entity_type = payload.get('entityType', 'CDokumente') @@ -49,12 +53,15 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: 'topic': 'vmh.document.create', 'data': { 'entity_id': entity_id, - 'entity_type': entity_type if 'entity_type' in locals() else 'CDokumente', + 'entity_type': entity_type, 'action': 'create', 'timestamp': payload[0].get('modifiedAt') if isinstance(payload, list) and payload else None } }) + ctx.logger.info("βœ… Document Create Webhook verarbeitet: " + f"{len(entity_ids)} Events emittiert") + return ApiResponse( status=200, body={ @@ -65,8 +72,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler im Document Create Webhook: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: DOCUMENT CREATE WEBHOOK") + ctx.logger.error(f"Error: {e}") ctx.logger.error(f"Payload: {request.body}") + ctx.logger.error("=" * 80) return ApiResponse( status=500, diff --git a/steps/vmh/webhook/document_delete_api_step.py b/steps/vmh/webhook/document_delete_api_step.py index 4a7f59d..5c199e5 100644 --- a/steps/vmh/webhook/document_delete_api_step.py +++ b/steps/vmh/webhook/document_delete_api_step.py @@ -25,17 +25,22 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: try: payload = request.body or [] - ctx.logger.info("VMH Webhook Document Delete empfangen") + ctx.logger.info("=" * 80) + ctx.logger.info("πŸ“₯ VMH WEBHOOK: DOCUMENT DELETE") + ctx.logger.info("=" * 80) ctx.logger.debug(f"Payload: {json.dumps(payload, indent=2, ensure_ascii=False)}") # Sammle alle IDs aus dem Batch entity_ids = set() + entity_type = 'CDokumente' # Default if isinstance(payload, list): for entity in payload: if isinstance(entity, dict) and 'id' in entity: entity_ids.add(entity['id']) - entity_type = entity.get('entityType', 'CDokumente') + # Take entityType from first entity if present + if entity_type == 'CDokumente': + entity_type = entity.get('entityType', 'CDokumente') elif isinstance(payload, dict) and 'id' in payload: entity_ids.add(payload['id']) entity_type = payload.get('entityType', 'CDokumente') @@ -48,12 +53,15 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: 'topic': 'vmh.document.delete', 'data': { 'entity_id': entity_id, - 'entity_type': entity_type if 'entity_type' in locals() else 'CDokumente', + 'entity_type': entity_type, 'action': 'delete', 'timestamp': payload[0].get('deletedAt') if isinstance(payload, list) and payload else None } }) + ctx.logger.info("βœ… Document Delete Webhook verarbeitet: " + f"{len(entity_ids)} Events emittiert") + return ApiResponse( status=200, body={ @@ -64,8 +72,11 @@ async def handler(request: ApiRequest, ctx: FlowContext[Any]) -> ApiResponse: ) except Exception as e: - ctx.logger.error(f"Fehler im Document Delete Webhook: {e}") + ctx.logger.error("=" * 80) + ctx.logger.error("❌ FEHLER: DOCUMENT DELETE WEBHOOK") + ctx.logger.error(f"Error: {e}") ctx.logger.error(f"Payload: {request.body}") + ctx.logger.error("=" * 80) return ApiResponse( status=500,