Compare commits


2 Commits

Author: root
Commit: 9d40f47e19
Date: 2025-10-24 19:13:41 +00:00
Calendar sync refactoring: event-driven design, fixes for multi-day appointments, logging, and locking

- Refactored to an event-driven approach without the PostgreSQL hub
- Fixes for multi-day appointments: correct use of datumBis, removal of the 24h limit
- Per-employee locking with Redis
- Logging via context.logger for the Motia Workbench
- New steps: calendar_sync_all_step.py, calendar_sync_cron_step.py
- Updated README.md to reflect the current architecture
- Workbench grouping: advoware-calendar-sync

Author: root
Commit: f4490f21cb
Date: 2025-10-24 07:04:57 +00:00
Fix logging to appear in Motia workbench

- Updated log_operation to use context.logger.warn for warnings
- Added context parameter to all functions with logging
- Replaced all direct logger calls with log_operation calls
- Ensured all logging goes through context.logger for workbench visibility
- Adjusted backoff base from 4 to 3 for faster retries
- Added debug kuerzel list support in cron step
8 changed files with 359 additions and 459 deletions

View File

@@ -37,5 +37,5 @@ class Config:
# Calendar Sync settings
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = os.getenv('CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS', 'true').lower() == 'true'
CALENDAR_SYNC_DEBUG_KUERZEL = [k.strip().upper() for k in os.getenv('CALENDAR_SYNC_DEBUG_KUERZEL', 'SB,AI,RO').split(',')]
CALENDAR_SYNC_DEBUG_KUERZEL = [k.strip().upper() for k in os.getenv('CALENDAR_SYNC_DEBUG_KUERZEL', 'SB,AI,RO,OK,BI,ST,UR,PB,VB').split(',')]
ADVOWARE_WRITE_PROTECTION = True

View File

@@ -74,24 +74,24 @@
"id": "advoware",
"config": {
"steps/advoware_proxy/advoware_api_proxy_put_step.py": {
"x": 168,
"y": -54
"x": -7,
"y": 7
},
"steps/advoware_proxy/advoware_api_proxy_post_step.py": {
"x": -340,
"y": -2
},
"steps/advoware_proxy/advoware_api_proxy_get_step.py": {
"x": 12,
"y": 406
"x": -334,
"y": 193
},
"steps/advoware_proxy/advoware_api_proxy_delete_step.py": {
"x": 600,
"y": 0
"x": 18,
"y": 204
},
"steps/advoware_cal_sync/calendar_sync_event_step.py": {
"x": 395,
"y": 893
"x": 732,
"y": 1014
},
"steps/advoware_cal_sync/calendar_sync_cron_step.py": {
"x": -78,
@@ -100,6 +100,10 @@
"steps/advoware_cal_sync/calendar_sync_api_step.py": {
"x": -77,
"y": 990
},
"steps/advoware_cal_sync/calendar_sync_all_step.py": {
"x": 343,
"y": 904
}
}
}

View File

@@ -1,101 +1,26 @@
# Advoware Calendar Sync - Hub-Based Design
# Advoware Calendar Sync - Event-Driven Design
This section implements the bidirectional synchronization between Advoware appointments and Google Calendar, using PostgreSQL as the central hub (single source of truth). The system ensures that appointments stay consistent, with configurable conflict-resolution strategies, write permissions, and privacy features such as anonymization. The sync runs in four strict phases to guarantee maximum robustness and atomicity.
This section implements the bidirectional synchronization between Advoware appointments and Google Calendar. The system was refactored to a simple, event-driven approach based on direct API calls, with Redis for locking and deduplication. It ensures that appointments stay consistent, with a focus on robustness, error handling, and correct handling of multi-day appointments.
## Overview
The system synchronizes appointments between:
- **Advoware**: Central appointment management with detailed information (but many API bugs).
- **Advoware**: Central appointment management with detailed information.
- **Google Calendar**: User-friendly calendar view for each employee.
- **PostgreSQL Hub**: Central data store for state, policies, and audit logs.
## Architecture
### Hub Design
- **Single source of truth**: All sync information is stored in PostgreSQL.
- **Policies**: Enums for sync strategies (`source_system_wins`, `last_change_wins`) and flags for write permission (`advoware_write_allowed`).
- **Status tracking**: `sync_status` ('pending', 'synced', 'failed') for monitoring and retries.
- **Transactions**: Each DB operation runs in its own transaction; errors affect only the current entry.
- **Soft deletes**: Deleted appointments are marked, not removed.
- **Phase-based processing**: Sync runs in 4 phases to separate new entries, deletes, and updates.
- **Timestamp-based updates**: Updates are triggered exclusively from `last_sync` (set to the source's API timestamp), not from data comparisons, to avoid race conditions.
- **Anonymization**: Optional anonymization of sensitive data (text, note, location) on Advoware → Google sync to protect privacy.
### Event-Driven Design
- **Direct API synchronization**: No central hub; sync runs directly between the APIs.
- **Redis locking**: Per-employee locking prevents race conditions.
- **Event emission**: Cron → all step → employee step for scalable processing.
- **Error resilience**: Individual failures do not stop the overall sync.
- **Logging**: All logs appear in the Motia Workbench via context.logger.
### Sync Phases
1. **Phase 1: New entries Advoware → Google** - Create Google events for new Advoware appointments, then insert into the DB.
2. **Phase 2: New entries Google → Advoware** - Create Advoware appointments for new Google events, then insert into the DB.
3. **Phase 3: Identify deleted entries** - Handle deletes/recreates based on the strategy.
4. **Phase 4: Update existing entries** - Update on changes, based on timestamps (API timestamp > `last_sync`).
### Database Schema
```sql
-- Main table
CREATE TABLE calendar_sync (
    sync_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    employee_kuerzel VARCHAR(10) NOT NULL,
    advoware_frnr INTEGER,
    google_event_id VARCHAR(255),
    source_system source_system_enum NOT NULL,
    sync_strategy sync_strategy_enum NOT NULL DEFAULT 'source_system_wins',
    sync_status sync_status_enum NOT NULL DEFAULT 'synced',
    advoware_write_allowed BOOLEAN NOT NULL DEFAULT FALSE,
    deleted BOOLEAN NOT NULL DEFAULT FALSE,
    last_sync TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Enums
CREATE TYPE source_system_enum AS ENUM ('advoware', 'google');
CREATE TYPE sync_strategy_enum AS ENUM ('source_system_wins', 'last_change_wins');
CREATE TYPE sync_status_enum AS ENUM ('pending', 'synced', 'failed');
-- Audit table
CREATE TABLE calendar_sync_audit (
    id SERIAL PRIMARY KEY,
    sync_id UUID NOT NULL,
    action VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Partial unique indexes (adjusted for soft deletes)
CREATE UNIQUE INDEX idx_calendar_sync_advoware ON calendar_sync (employee_kuerzel, advoware_frnr) WHERE advoware_frnr IS NOT NULL AND deleted = FALSE;
CREATE UNIQUE INDEX idx_calendar_sync_google ON calendar_sync (employee_kuerzel, google_event_id) WHERE google_event_id IS NOT NULL AND deleted = FALSE;
```
## Functionality
### Automatic Calendar Creation
- For each Advoware employee, a Google Calendar named `AW-{Kuerzel}` is created.
- Example: an employee with the abbreviation "SB" → calendar "AW-SB".
- The calendar is shared with the main Google account (`lehmannundpartner@gmail.com`) as owner.
### Phase Details
#### Phase 1: New Entries Advoware → Google
- Fetch Advoware appointments.
- For each frNr not present in the DB (deleted=FALSE): standardize the data (with anonymization if enabled), create the Google event, then INSERT into the DB with `sync_status = 'synced'` and `last_sync` set to the Advoware timestamp.
- On errors: log a warning and continue (do not abort).
#### Phase 2: New Entries Google → Advoware
- Fetch Google events.
- For each event_id not present in the DB: standardize the data, create the Advoware appointment, then INSERT into the DB with `sync_status = 'synced'` and `last_sync` set to the Google timestamp.
- If frNr is None (API bug): skip with a warning.
- On errors: log a warning and continue.
#### Phase 3: Identify Deleted Entries
- For each DB entry: check whether the appointment is missing from the API.
- Missing in both systems: soft delete.
- Missing in one system: recreate or propagate the delete based on the strategy.
- On errors: `sync_status = 'failed'`, warning.
#### Phase 4: Update Existing Entries
- For existing entries: check API timestamp > `last_sync`.
- With `source_system_wins`: update based on `source_system`, set `last_sync` to the source's API timestamp.
- With `last_change_wins`: compare timestamps, update the system with the newer data, set `last_sync` to the newer timestamp.
- Anonymization: on Advoware → Google, text/note/location are anonymized when `CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = True`.
- On errors: `sync_status = 'failed'`, warning.
1. **Cron step**: Triggers the sync daily.
2. **All step**: Fetches all employees and emits one event per employee.
3. **Employee step**: Synchronizes appointments for a single employee.
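The cron → all → employee chain above can be sketched with a minimal in-memory event bus. This is purely illustrative: Motia's real `context.emit` wiring and the sample employee abbreviations are assumptions, not the project's actual runtime.

```python
import asyncio
from collections import defaultdict

# Minimal in-memory stand-in for Motia's emit/subscribe wiring.
subscribers = defaultdict(list)

def subscribes(topic):
    def register(fn):
        subscribers[topic].append(fn)
        return fn
    return register

async def emit(topic, data):
    for handler in subscribers[topic]:
        await handler(data)

synced = []

@subscribes('calendar_sync_all')
async def all_step(data):
    # Fetch employees (hypothetical sample data) and fan out one event each.
    for kuerzel in ['SB', 'AI', 'RO']:
        await emit('calendar_sync_employee',
                   {'kuerzel': kuerzel, 'triggered_by': data['triggered_by']})

@subscribes('calendar_sync_employee')
async def employee_step(data):
    # The real step would sync Advoware <-> Google for this employee here.
    synced.append(data['kuerzel'])

async def cron_step():
    await emit('calendar_sync_all', {'triggered_by': 'cron'})

asyncio.run(cron_step())
print(synced)  # ['SB', 'AI', 'RO']
```

The fan-out keeps each employee's sync independent: a failure in one `calendar_sync_employee` handler does not affect the others.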
### Data Mapping and Standardization
Both systems are normalized to a common format (Berlin TZ):
@@ -118,7 +43,6 @@ Both systems are normalized to a common format (Berlin TZ):
- End: `datumBis` + `uhrzeitBis` (fallback 10:00), or `datum` + 1h.
- All-day: `dauertermin=1` or duration >1 day.
- Recurring: `turnus`/`turnusArt` (simplified, no RRULE).
- Anonymization: if `CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS`, set `text='Advoware blocked'`, `notiz=''`, `ort=''`.
#### Google → Standard
- Start/End: `dateTime` or `date` (all-day).
@@ -133,71 +57,70 @@ Both systems are normalized to a common format (Berlin TZ):
- All-day: `date` instead of `dateTime`, end +1 day.
- Recurring: RRULE from `recurrence`.
## API Weaknesses and Screw-ups
## Functionality
### Advoware API (Buggy and Inconsistent)
- **Case sensitivity in responses**: Field names vary, sometimes `'frNr'`, sometimes `'frnr'` (e.g. POST response: `{'frnr': 123}`). The code checks both (`result.get('frNr') or result.get('frnr')`) to avoid None.
- **Time formats**: `datum`/`datumBis` come as `'YYYY-MM-DD'` or `'YYYY-MM-DDTHH:MM:SS'`. `uhrzeitVon`/`uhrzeitBis` are separate (e.g. `'09:00:00'`). If `uhrzeitVon` is missing, fall back to 09:00; if `uhrzeitBis` is missing, 10:00. Parsing must handle both formats.
- **Defaults and missing fields**: Many fields are optional; the code sets fallbacks (e.g. `uhrzeitVon='09:00:00'`).
- **Recurring support**: No RRULE; only `turnus` (0/1) and `turnusArt` (0-?). The mapping to Google RRULE is simplified and incomplete.
- **API reliability**: Sometimes a POST succeeds but returns `frNr: None` (despite a valid response). 500 errors on bad requests. No timestamp details in responses.
- **Time zones**: Everything is implicitly Berlin time; the code converts explicitly.
- **Other bugs**: `zuletztGeaendertAm` is used for timestamps, but the format is unreliable.
- **DELETE responses**: DELETE requests sometimes return an empty body, causing a `JSONDecodeError`. The code catches this with try/except and returns `None` so the sync does not break.
- **frNr reuse**: frNr values are sequential and never reused. Verified by create/delete/create: e.g. 85861, 85862, delete 85861, next create yields 85863. No risk of DB conflicts from ID reuse.
- **Timestamp-based updates**: To avoid race conditions and redundant syncs, phase 4 updates run only when the source's API timestamp > `last_sync` (set to the API timestamp after a successful write).
- **Soft deletes and partial unique indexes**: Deleted appointments are marked with `deleted = TRUE`, not removed. Partial unique indexes (e.g. `WHERE deleted = FALSE`) prevent duplicates among active entries.
- **Anonymization**: Optional anonymization of sensitive data (text, note, location) on Advoware → Google sync to protect privacy (e.g. `text='Advoware blocked'`).
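The date/time quirks above can be absorbed by a small normalization helper. The following is an illustrative sketch only (field names come from the API, fallback values from the rules above; `parse_advoware_start` is a hypothetical name, not the project's actual parser):

```python
import datetime

def parse_advoware_start(data):
    """Combine 'datum' (date or datetime string) with 'uhrzeitVon' (fallback 09:00)."""
    # 'datum' may arrive as 'YYYY-MM-DD' or 'YYYY-MM-DDTHH:MM:SS';
    # fromisoformat accepts both, and .date() discards any time part.
    day = datetime.datetime.fromisoformat(data.get('datum', '')).date()
    # Missing or empty 'uhrzeitVon' falls back to 09:00 per the rules above.
    time_str = data.get('uhrzeitVon') or '09:00:00'
    t = datetime.time.fromisoformat(time_str)
    return datetime.datetime.combine(day, t)

print(parse_advoware_start({'datum': '2025-10-24', 'uhrzeitVon': '06:00:00'}))
print(parse_advoware_start({'datum': '2025-10-24'}))  # falls back to 09:00
```

The same pattern (parse permissively, apply a documented fallback) covers `uhrzeitBis` with its 10:00 default.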
### Automatic Calendar Creation
- For each Advoware employee, a Google Calendar named `AW-{Kuerzel}` is created.
- Example: an employee with the abbreviation "SB" → calendar "AW-SB".
- The calendar is shared with the main Google account (`lehmannundpartner@gmail.com`) as owner.
### Google Calendar API (Reliable)
- **Time formats**: `dateTime` as ISO with TZ (e.g. `'2025-01-01T10:00:00+01:00'`), `date` for all-day. The code parses with `fromisoformat` and `.rstrip('Z')`.
- **Time zones**: Explicit (e.g. `'Europe/Berlin'`); the code converts to the Berlin TZ.
- **Recurring**: RRULE in `recurrence`; fully supported.
- **Updates**: `updated` timestamp for last change.
- **No known bugs**: Reliable, but rate limits are possible.
### Sync Details
#### Cron Step (calendar_sync_cron_step.py)
- Runs daily and emits "calendar_sync_all".
#### All Step (calendar_sync_all_step.py)
- Fetches all employees from Advoware.
- Filters by the debug list (if configured).
- Sets a Redis lock per employee.
- Emits "calendar_sync_employee" per employee.
#### Employee Step (calendar_sync_event_step.py)
- Fetches Advoware appointments for the employee.
- Fetches Google events for the employee.
- Synchronizes: creates new entries, applies updates, handles deletes.
- Uses locking to prevent parallel syncs.
#### API Step (calendar_sync_api_step.py)
- Manual trigger for a single employee or "ALL".
- For "ALL": emits "calendar_sync_all".
- For an employee: sets the lock and emits "calendar_sync_employee".
## API Weaknesses and Fixes
### Advoware API
- **Multi-day appointments**: `datumBis` is used correctly for the end date; '00:00:00' is interpreted as '23:59:59'.
- **Time formats**: Robust parsing with fallbacks.
- **No 24h limit**: Appointments can be longer than 24h; Google Calendar supports this.
### Google Calendar API
- **Time ranges**: Accepts events >24h without problems.
- **Rate limits**: Backoff retry implemented.
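The retry behavior mentioned above (the code uses the `backoff` library with `backoff.expo`, base 3, up to 4 tries) can be illustrated with a hand-rolled sketch. Assumption: `backoff.expo(base=3)` yields delays 1, 3, 9 seconds; this stand-in only mimics that schedule and is not the project's implementation:

```python
def expo_delays(base=3, max_tries=4):
    """Delays before each retry: base**0 .. base**(max_tries-2), like backoff.expo."""
    return [base ** n for n in range(max_tries - 1)]

def call_with_retry(fn, retryable=(429, 500, 502, 503, 504),
                    base=3, max_tries=4, sleep=lambda s: None):
    """Retry fn() on retryable HTTP-like status codes with exponential backoff."""
    for attempt, delay in enumerate([0] + expo_delays(base, max_tries)):
        sleep(delay)  # no wait before the first attempt
        try:
            return fn()
        except RuntimeError as e:
            status = e.args[0]
            if status not in retryable or attempt == max_tries - 1:
                raise  # non-retryable status, or tries exhausted

# Example: fails twice with 503, then succeeds on the third attempt.
attempts = {'n': 0}
def flaky():
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise RuntimeError(503)
    return 'ok'

print(call_with_retry(flaky))  # 'ok'
```

Lowering the base from 4 to 3 (as in the second commit) shortens the total worst-case wait from 1+4+16 to 1+3+9 seconds.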
## Step Configuration
### calendar_sync_cron_step.py
- **Type:** cron
- **Flows:** advoware-calendar-sync
### calendar_sync_all_step.py
- **Type:** event
- **Subscribes:** calendar_sync_all
- **Flows:** advoware-calendar-sync
### calendar_sync_event_step.py
- **Type:** event
- **Subscribes:** calendar.sync.triggered
- **Flows:** advoware
- **Subscribes:** calendar_sync_employee
- **Flows:** advoware-calendar-sync
**Event Data:**
```json
{
  "data": {
    "body": {
      // No employee_kuerzel required; all employees are synchronized automatically
    }
  }
}
```
### calendar_sync_api_step.py
- **Type:** api
- **Flows:** advoware-calendar-sync
## Setup
### PostgreSQL
1. Install and start PostgreSQL 17 (localhost-only).
2. Create the database: `sudo -u postgres psql -f /tmp/create_db.sql`
3. Set up the user and permissions.
### Google API Credentials
1. Create a Google Cloud Console project.
2. Enable the Google Calendar API.
3. Create a service account.
4. Provide `service-account.json` in the project.
### Advoware API Credentials
OAuth-like authentication.
### Environment Variables
```env
# PostgreSQL
POSTGRES_HOST=localhost
POSTGRES_USER=calendar_sync_user
POSTGRES_PASSWORD=your_password
POSTGRES_DB_NAME=calendar_sync_db
# Google Calendar
GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH=service-account.json
@@ -220,8 +143,8 @@ REDIS_PORT=6379
REDIS_DB_CALENDAR_SYNC=1
REDIS_TIMEOUT_SECONDS=5
# Anonymization
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS=true # Optional, default false
# Debug
CALENDAR_SYNC_DEBUG_KUERZEL=PB,AI # Optional, filter employees
```
## Usage
@@ -230,137 +153,37 @@ CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS=true # Optional, default false
```bash
curl -X POST "http://localhost:3000/advoware/calendar/sync" \
  -H "Content-Type: application/json" \
  -d '{"full_content": true}'
  -d '{"kuerzel": "PB"}'
```
### Automatic Sync
Cron step for regular execution.
The cron step runs daily.
## Error Handling and Logging
- **Transactions**: Separate per operation; a rollback affects only that operation.
- **Logging**: Detailed (info/debug for API calls, warnings for errors).
- **API errors**: Retry with backoff for Google; robust against Advoware bugs.
- **Data errors**: Fallbacks on parsing errors.
- **Locking**: Redis NX/EX prevents parallel syncs.
- **Logging**: context.logger for Workbench visibility.
- **API errors**: Retry with backoff.
- **Parsing errors**: Robust fallbacks.
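The NX/EX locking pattern referenced above can be illustrated with an in-memory stand-in. In the actual steps the call is redis-py's `redis_client.set(key, value, nx=True, ex=1800)`; the `FakeRedis` class here is purely illustrative and only mimics those semantics:

```python
import time

class FakeRedis:
    """In-memory stand-in mimicking Redis SET with NX (set-if-absent) and EX (TTL)."""
    def __init__(self):
        self.store = {}  # key -> (value, expires_at)

    def set(self, key, value, ex=None, nx=False):
        now = time.monotonic()
        current = self.store.get(key)
        if current and current[1] > now and nx:
            return None  # lock already held; matches redis-py returning None
        self.store[key] = (value, now + (ex if ex is not None else float('inf')))
        return True

r = FakeRedis()
# First caller acquires the per-employee lock for 30 minutes:
assert r.set('calendar_sync_lock_SB', 'cron', ex=1800, nx=True) is True
# A second caller is rejected while the lock is held:
assert r.set('calendar_sync_lock_SB', 'api', ex=1800, nx=True) is None
```

The EX expiry acts as a safety net: if a sync crashes without releasing its lock, the lock disappears after 1800 seconds and the next run can proceed.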
## Security and Privacy
## Security
- DB user with minimal permissions.
- Write-permission flags prevent unauthorized changes.
- Anonymization: prevents leakage of sensitive data into Google Calendar.
- Audit logs for compliance.
- Service account for Google.
- HMAC for Advoware.
- Redis for locking.
## Known Issues
- Recurring events: limited support; Advoware has no RRULE.
- Timestamps: missing ones in Google can trigger the fallback.
- Performance: with many appointments, pagination could help.
- **Single-events expansion**: `singleEvents=true` in `fetch_google_events()` expands recurring events into individual instances, which leads to duplication problems if not handled correctly.
- **Advoware API time filtering**: The Advoware API may not fully respect the `from`/`to` parameters and can return all appointments regardless of the requested range. The audit script checks this and warns on deviations. As a workaround, the ranges were widened (Advoware: -1 to +9 years, Google: -2 to +10 years) to cover all potential data.
- Recurring events: limited support.
- Performance: with many appointments, consider pagination.
## Critical Bugfix: Duplication of Recurring Appointments
## Recent Changes
### Problem
For recurring appointments (`dauertermin=1`), appointments were duplicated on every sync because `fetch_google_events()` runs with `singleEvents=true`:
1. **Google Calendar creates a master event** with an RRULE and an `event_id` (e.g. `"abc123"`)
2. **`fetch_google_events()` expands** the event into individual instances with IDs such as `"abc123_20251024"`, `"abc123_20251031"`, etc.
3. **Each instance is treated as "new"** and creates a separate Advoware appointment
4. **Result:** 1 recurring Advoware appointment → N duplicated Advoware appointments
### Solution
**RecurringEventId-based detection** in all phases:
- **DB indexes:** continue to use the stored `event_id` (master ID)
- **Phase 2:** check both `event_id` and `recurringEventId` against the DB index
- **Phase 3:** take `recurringEventId` into account in existence checks
- **Phase 4:** process only master events, once each, not every instance
**Code changes:**
```python
# Phase 2: check the master event
recurring_master_id = evt.get('recurringEventId')
is_already_synced = event_id in db_google_index or (recurring_master_id and recurring_master_id in db_google_index)
# Phase 4: process only master events, once each
master_event_id = google_data.get('recurringEventId') or event_id
if master_event_id in processed_master_events:
    continue
```
### Impact
- Recurring appointments are no longer duplicated
- Bidirectional sync works correctly for all event types
- Performance improvement through less redundant processing
## Correct Handling of Advoware Timestamps
### Problem
Advoware timestamps (e.g. `'zuletztGeaendertAm'`) are delivered in Berlin time, but parsing them with `datetime.datetime.fromisoformat(...).replace(tzinfo=BERLIN_TZ)` produced wrong offsets (e.g. a 53-minute deviation), because `replace(tzinfo=...)` on a naive datetime does not work correctly with pytz timezone objects. This caused endless loops in phase 4, since `adv_ts` was advanced incorrectly.
### Solution
Use `BERLIN_TZ.localize(naive_datetime)` instead of `.replace(tzinfo=BERLIN_TZ)`:
- `localize()` applies the timezone correctly for pytz timezone objects.
- Example:
```python
naive = datetime.datetime.fromisoformat('2025-10-23T14:18:36.245')
adv_ts = BERLIN_TZ.localize(naive)  # result: 2025-10-23 14:18:36.245+02:00
```
- This ensures that timestamps are converted to UTC correctly (e.g. 12:18 UTC) and that phase-4 comparisons work.
### Implementation
- In `calendar_sync_event_step.py`, phase 4:
```python
adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm']))
```
- For Google timestamps, `.astimezone(BERLIN_TZ)` remains correct.
- All timestamps are normalized to UTC for DB storage and comparisons.
### Avoiding Mistakes
- Never use `.replace(tzinfo=pytz_tz)`; always use `tz.localize(naive)`.
- Test the parsing: `BERLIN_TZ.localize(datetime.datetime.fromisoformat(ts)).astimezone(pytz.utc)` should yield the correct UTC time.
- For other time zones: apply the same rule.
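The bug can be reproduced in a few lines (requires `pytz`; the odd offset comes from pytz attaching Berlin's historical local mean time when `replace()` is used, since `replace()` never consults the DST transition table):

```python
import datetime
import pytz

BERLIN_TZ = pytz.timezone('Europe/Berlin')
naive = datetime.datetime.fromisoformat('2025-10-23T14:18:36.245')

# Wrong: replace() attaches the zone's default (LMT) offset, not CEST.
wrong = naive.replace(tzinfo=BERLIN_TZ)
# Correct: localize() resolves the proper UTC offset for that date.
right = BERLIN_TZ.localize(naive)

print(wrong.utcoffset())              # roughly 0:53:00 (local mean time, the bug)
print(right.utcoffset())              # 2:00:00 (CEST)
print(right.astimezone(pytz.utc))     # 12:18 UTC, as expected
```

With `zoneinfo` (Python 3.9+ stdlib) `replace(tzinfo=...)` would be fine; the `localize()` requirement is specific to pytz.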
## Extensions
The sync now works for all employees, without the previous restriction to 'AI'. Update loops were fixed by setting `last_sync` correctly to the time after the update.
## Critical Bugfix: End Date for Recurring Appointments
### Problem
For recurring appointments (`dauertermin=1`), `datumBis` was wrongly used as the end date of the event's duration. `datumBis`, however, is the **end of the recurrence series**, not the end of the individual appointment!
**Example (frNr 85909):**
- `datum`: "2025-10-24T06:00:00" (appointment start)
- `datumBis`: "2025-11-24T00:00:00" (series end: 2025-11-24)
- `uhrzeitBis`: "06:30:00" (appointment end)
**Wrong calculation:**
- End date = `datumBis` = 2025-11-24
- Event end = 2025-11-24T06:30:00 (a month later!)
- After preparation/travel times: duration >30 days
- Google Calendar limit: capped at 24h → event from 03:40 to 03:40+24h
### Solution
For recurring appointments (`dauertermin=1`), the end date must come from the **same day** as `datum`:
```python
# CORRECT: always use datum as the basis for the end date
end_date_str = data.get('datum', '')  # not datumBis!
```
**Correct calculation:**
- End date = `datum` = 2025-10-24
- Event end = 2025-10-24T06:30:00
- After preparation/travel times: 03:40 - 08:20 (4:40h) ✅
### Implementation
- In `standardize_appointment_data()`: `end_date_str = data.get('datum', '')`
- `datumBis` is now only used for RRULE generation
- Same logic for dauertermin=0 and dauertermin=1
### Impact
- Events now have the correct duration (no 24h capping for short appointments)
- The time breakdown in descriptions is precise
- Google Calendar shows events with realistic time ranges
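The corrected end-time computation for the example above can be sketched as follows. This is an illustrative helper under the field semantics described in this section; `compute_event_end` is a hypothetical name, not the project's actual function:

```python
import datetime

def compute_event_end(data):
    """End of the single occurrence: same day as 'datum', time from 'uhrzeitBis' (fallback 10:00)."""
    # Correct: base the end date on 'datum'; 'datumBis' only bounds the recurrence series.
    day = datetime.datetime.fromisoformat(data.get('datum', '')).date()
    end_time = datetime.time.fromisoformat(data.get('uhrzeitBis') or '10:00:00')
    return datetime.datetime.combine(day, end_time)

appt = {  # frNr 85909 from the example above
    'datum': '2025-10-24T06:00:00',
    'datumBis': '2025-11-24T00:00:00',  # series end; must NOT become the event end
    'uhrzeitBis': '06:30:00',
}
print(compute_event_end(appt))  # 2025-10-24 06:30:00
```

Using `datumBis` here would have yielded 2025-11-24T06:30:00, exactly the month-long event described in the wrong calculation.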
- Refactored to an event-driven design without the PostgreSQL hub.
- Fixes for multi-day appointments: correct use of `datumBis`.
- Removed the 24h limit; Google Calendar supports long events.
- Per-employee locking with Redis.
- Logging via context.logger for the Workbench.
- New steps: calendar_sync_all_step.py, calendar_sync_cron_step.py.
- Workbench grouping: "advoware-calendar-sync".

View File

@@ -0,0 +1,86 @@
import json
import redis
from config import Config
from services.advoware import AdvowareAPI
config = {
'type': 'event',
'name': 'Calendar Sync All Step',
'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für jeden Mitarbeiter',
'subscribes': ['calendar_sync_all'],
'emits': ['calendar_sync_employee'],
'flows': ['advoware']
}
async def get_advoware_employees(context, advoware):
"""Fetch list of employees from Advoware."""
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
context.logger.info(f"Fetched {len(employees)} Advoware employees")
return employees
except Exception as e:
context.logger.error(f"Failed to fetch Advoware employees: {e}")
raise
async def handler(event_data, context):
try:
triggered_by = event_data.get('triggered_by', 'unknown')
context.logger.info(f"Calendar Sync All: Starting to emit events for all employees, triggered by {triggered_by}")
# Initialize Advoware API
advoware = AdvowareAPI(context)
# Fetch employees
employees = await get_advoware_employees(context, advoware)
if not employees:
context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.")
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
# Emit event for each employee
for employee in employees:
kuerzel = employee.get('kuerzel')
if not kuerzel:
context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
continue
# # DEBUG: Nur für konfigurierte Nutzer syncen
# if kuerzel not in Config.CALENDAR_SYNC_DEBUG_KUERZEL:
# context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur {Config.CALENDAR_SYNC_DEBUG_KUERZEL} werden gesynct")
# continue
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_CALENDAR_SYNC),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
)
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe")
continue
# Emit event for this employee
await context.emit({
"topic": "calendar_sync_employee",
"data": {
"kuerzel": kuerzel,
"triggered_by": triggered_by
}
})
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel}")
context.logger.info("Calendar Sync All: Completed emitting events for employees")
return {
'status': 'completed',
'triggered_by': triggered_by
}
except Exception as e:
context.logger.error(f"Fehler beim All-Sync: {e}")
return {
'status': 'error',
'error': str(e)
}

View File

@@ -5,10 +5,10 @@ from config import Config
config = {
'type': 'api',
'name': 'Calendar Sync API Trigger',
'description': 'API-Endpunkt zum manuellen Auslösen des Calendar Sync für einen Mitarbeiter',
'description': 'API-Endpunkt zum manuellen Auslösen des Calendar Sync für einen Mitarbeiter oder ALL',
'path': '/advoware/calendar/sync',
'method': 'POST',
'emits': ['calendar_sync_employee'],
'emits': ['calendar_sync_employee', 'calendar_sync_all'],
'flows': ['advoware']
}
@@ -26,7 +26,28 @@ async def handler(req, context):
}
}
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
kuerzel_upper = kuerzel.upper()
if kuerzel_upper == 'ALL':
# Emit sync-all event
context.logger.info("Calendar Sync API: Emitting sync-all event")
await context.emit({
"topic": "calendar_sync_all",
"data": {
"triggered_by": "api"
}
})
return {
'status': 200,
'body': {
'status': 'triggered',
'message': 'Calendar sync wurde für alle Mitarbeiter ausgelöst',
'triggered_by': 'api'
}
}
else:
# Einzelnes Kürzel
employee_lock_key = f'calendar_sync_lock_{kuerzel_upper}'
# Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft
redis_client = redis.Redis(
@@ -37,18 +58,18 @@ async def handler(req, context):
)
if redis_client.set(employee_lock_key, 'api', ex=1800, nx=True) is None:
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel}, überspringe")
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe")
return {
'status': 409,
'body': {
'status': 'conflict',
'message': f'Calendar sync bereits aktiv für {kuerzel}',
'kuerzel': kuerzel,
'message': f'Calendar sync bereits aktiv für {kuerzel_upper}',
'kuerzel': kuerzel_upper,
'triggered_by': 'api'
}
}
context.logger.info(f"Calendar Sync API aufgerufen für {kuerzel}")
context.logger.info(f"Calendar Sync API aufgerufen für {kuerzel_upper}")
# Lock erfolgreich gesetzt, jetzt emittieren
@@ -56,7 +77,7 @@ async def handler(req, context):
await context.emit({
"topic": "calendar_sync_employee",
"data": {
"kuerzel": kuerzel,
"kuerzel": kuerzel_upper,
"triggered_by": "api"
}
})
@@ -65,8 +86,8 @@ async def handler(req, context):
'status': 200,
'body': {
'status': 'triggered',
'message': f'Calendar sync wurde ausgelöst für {kuerzel}',
'kuerzel': kuerzel,
'message': f'Calendar sync wurde ausgelöst für {kuerzel_upper}',
'kuerzel': kuerzel_upper,
'triggered_by': 'api'
}
}

View File

@@ -10,7 +10,7 @@ config = {
'name': 'Calendar Sync Cron Job',
'description': 'Führt den Calendar Sync alle 5 Minuten automatisch aus',
'cron': '*/5 * * * *', # Alle 5 Minuten
'emits': ['calendar_sync_employee'],
'emits': ['calendar_sync_all'],
'flows': ['advoware']
}
@@ -27,53 +27,17 @@ async def get_advoware_employees(context, advoware):
async def handler(context):
try:
context.logger.info("Calendar Sync Cron: Starting to fetch employees and emit events")
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
# Initialize Advoware API
advoware = AdvowareAPI(context)
# Fetch employees
employees = await get_advoware_employees(context, advoware)
if not employees:
context.logger.error("Keine Mitarbeiter gefunden. Cron abgebrochen.")
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
# Emit event for each employee (DEBUG: only for SB)
for employee in employees:
kuerzel = employee.get('kuerzel')
if not kuerzel:
context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
continue
# DEBUG: Nur für konfigurierte Nutzer syncen
if kuerzel not in Config.CALENDAR_SYNC_DEBUG_KUERZEL:
context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur {Config.CALENDAR_SYNC_DEBUG_KUERZEL} werden gesynct")
continue
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_CALENDAR_SYNC),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
)
if redis_client.set(employee_lock_key, 'cron', ex=1800, nx=True) is None:
context.logger.info(f"Calendar Sync Cron: Sync bereits aktiv für {kuerzel}, überspringe")
continue
# Emit event for this employee
# Emit sync-all event
await context.emit({
"topic": "calendar_sync_employee",
"topic": "calendar_sync_all",
"data": {
"kuerzel": kuerzel,
"triggered_by": "cron"
}
})
context.logger.info(f"Calendar Sync Cron: Emitted event for employee {kuerzel}")
context.logger.info("Calendar Sync Cron: Completed emitting events for employees")
context.logger.info("Calendar Sync Cron: Emitted sync-all event")
return {
'status': 'completed',
'triggered_by': 'cron'

View File

@@ -38,11 +38,14 @@ def log_operation(level, message, context=None, **context_vars):
if level == 'info':
context.logger.info(full_message)
elif level == 'warning':
if hasattr(context.logger, 'warn'):
context.logger.warn(full_message)
else:
context.logger.warning(full_message)
elif level == 'error':
context.logger.error(full_message)
elif level == 'debug':
context.logger.debug(full_message)
# elif level == 'debug':
# context.logger.debug(full_message)
else:
if level == 'info':
logger.info(full_message)
@@ -53,7 +56,7 @@ def log_operation(level, message, context=None, **context_vars):
elif level == 'debug':
logger.debug(full_message)
async def connect_db():
async def connect_db(context=None):
"""Connect to Postgres DB from Config."""
try:
conn = await asyncpg.connect(
@@ -65,10 +68,10 @@ async def connect_db():
)
return conn
except Exception as e:
logger.error(f"Failed to connect to DB: {e}")
log_operation('error', f"Failed to connect to DB: {e}", context=context)
raise
async def get_google_service():
async def get_google_service(context=None):
"""Initialize Google Calendar service."""
try:
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
@@ -80,11 +83,11 @@ async def get_google_service():
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
logger.error(f"Failed to initialize Google service: {e}")
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel, context=None):
"""Ensure Google Calendar exists for employee."""
calendar_name = f"AW-{employee_kuerzel}"
try:
@@ -107,13 +110,13 @@ async def ensure_google_calendar(service, employee_kuerzel):
service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
return calendar_id
except HttpError as e:
logger.error(f"Google API error for calendar {employee_kuerzel}: {e}")
log_operation('error', f"Google API error for calendar {employee_kuerzel}: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}")
log_operation('error', f"Failed to ensure Google calendar for {employee_kuerzel}: {e}", context=context)
raise
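The decorator change above lowers `base` from 4 to 3 for faster retries. Assuming `backoff.expo`'s documented behaviour (it yields `factor * base**n` for n = 0, 1, 2, …), the waits between the four attempts shrink accordingly:

```python
def expo_delays(base, factor=1, max_tries=4):
    # max_tries=4 means at most 3 waits separate the 4 attempts
    return [factor * base ** n for n in range(max_tries - 1)]

print(expo_delays(4))  # [1, 4, 16] -> old schedule, up to 21s of waiting
print(expo_delays(3))  # [1, 3, 9]  -> new schedule, up to 13s
```

Note that `backoff` applies full jitter by default, so the actual waits are randomized up to these values.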
async def fetch_advoware_appointments(advoware, employee_kuerzel):
async def fetch_advoware_appointments(advoware, employee_kuerzel, context=None):
"""Fetch Advoware appointments in range."""
try:
params = {
@@ -122,16 +125,16 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel):
'to': FETCH_TO
}
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params)
logger.debug(f"Raw Advoware API response: {result}")
log_operation('debug', f"Raw Advoware API response: {result}", context=context)
appointments = result if isinstance(result, list) else []
logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}")
log_operation('info', f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}", context=context)
return appointments
except Exception as e:
logger.error(f"Failed to fetch Advoware appointments: {e}")
log_operation('error', f"Failed to fetch Advoware appointments: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def fetch_google_events(service, calendar_id):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def fetch_google_events(service, calendar_id, context=None):
"""Fetch Google events in range."""
try:
time_min = f"{current_year - 2}-01-01T00:00:00Z"
@@ -156,16 +159,16 @@ async def fetch_google_events(service, calendar_id):
break
events = [evt for evt in all_events if evt.get('status') != 'cancelled']
logger.info(f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}")
log_operation('info', f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}", context=context)
return events
except HttpError as e:
logger.error(f"Google API error fetching events: {e}")
log_operation('error', f"Google API error fetching events: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to fetch Google events: {e}")
log_operation('error', f"Failed to fetch Google events: {e}", context=context)
raise
def generate_rrule(turnus, turnus_art, datum_bis):
def generate_rrule(turnus, turnus_art, datum_bis, context=None):
"""Generate RRULE string from Advoware turnus and turnusArt."""
freq_map = {
1: 'DAILY',
@@ -188,11 +191,11 @@ def generate_rrule(turnus, turnus_art, datum_bis):
max_until = datetime.datetime.now() + timedelta(days=730) # 2 years
if bis_dt > max_until:
bis_dt = max_until
logger.info(f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits")
log_operation('info', f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits", context=context)
until_date = bis_dt.strftime('%Y%m%d')
except ValueError:
logger.warning(f"Invalid datum_bis: {datum_bis}, skipping recurrence")
log_operation('warning', f"Invalid datum_bis: {datum_bis}, skipping recurrence", context=context)
return None
rrule = f"RRULE:FREQ={freq};INTERVAL={turnus};UNTIL={until_date}"
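Put together, the function emits an RFC 5545 recurrence string. A minimal sketch, assuming `freq_map` is keyed by `turnusArt` as the hunk above suggests (the real function also caps `UNTIL` at two years out):

```python
import datetime

FREQ_MAP = {1: 'DAILY', 2: 'WEEKLY', 3: 'MONTHLY', 4: 'YEARLY'}  # mapping assumed from the hunk above

def rrule_for(turnus, turnus_art, datum_bis):
    # UNTIL must be formatted as YYYYMMDD for Google Calendar recurrence rules
    until = datetime.date.fromisoformat(datum_bis).strftime('%Y%m%d')
    return f"RRULE:FREQ={FREQ_MAP[turnus_art]};INTERVAL={turnus};UNTIL={until}"

print(rrule_for(2, 2, '2026-06-30'))  # RRULE:FREQ=WEEKLY;INTERVAL=2;UNTIL=20260630
```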
@@ -208,12 +211,17 @@ def parse_times(data, source):
start_time = data.get('uhrzeitVon') or '09:00:00'
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{start_str}T{start_time}"))
end_date_str = data.get('datum', '')
# Use datumBis for end date if available, otherwise datum
end_date_str = data.get('datumBis', data.get('datum', ''))
if 'T' in end_date_str:
base_end_date = end_date_str.split('T')[0]
else:
base_end_date = end_date_str
end_time = data.get('uhrzeitBis', '10:00:00')
# Special handling: if end_time is '00:00:00' and it's multi-day, interpret as end of day
start_date_str = data.get('datum', '').split('T')[0] if 'T' in data.get('datum', '') else data.get('datum', '')
if end_time == '00:00:00' and base_end_date != start_date_str:
end_time = '23:59:59'
try:
end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{base_end_date}T{end_time}"))
except ValueError:
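The multi-day fix above has two parts: prefer `datumBis` over `datum` for the end date, and reinterpret a `00:00:00` end time on a multi-day entry as end of day. A self-contained sketch (it uses `or` so an empty `datumBis` also falls back to `datum`, a slight variation on the source):

```python
def resolve_end(data):
    # Prefer datumBis for the end date; treat a midnight end time
    # on a multi-day entry as end of day.
    start_date = data.get('datum', '').split('T')[0]
    end_date = (data.get('datumBis') or data.get('datum', '')).split('T')[0]
    end_time = data.get('uhrzeitBis', '10:00:00')
    if end_time == '00:00:00' and end_date != start_date:
        end_time = '23:59:59'
    return f"{end_date}T{end_time}"

print(resolve_end({'datum': '2025-10-01', 'datumBis': '2025-10-03', 'uhrzeitBis': '00:00:00'}))
# 2025-10-03T23:59:59
print(resolve_end({'datum': '2025-10-01', 'uhrzeitBis': '00:00:00'}))
# 2025-10-01T00:00:00
```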
@@ -274,7 +282,7 @@ def build_notiz(original_notiz, time_breakdown, duration_capped):
notiz_parts.append("\nHinweis: Ereignisdauer wurde auf 24 Stunden begrenzt (Google Calendar Limit)")
return "\n".join(notiz_parts)
def standardize_appointment_data(data, source):
def standardize_appointment_data(data, source, context=None):
"""Standardize data from Advoware or Google to comparable dict, with TZ handling."""
duration_capped = False
start_dt, end_dt = parse_times(data, source)
@@ -283,7 +291,7 @@ def standardize_appointment_data(data, source):
adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td = adjust_times(start_dt, end_dt, data)
if Config.CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS:
text = 'Advoware blocked'
text = f'Advoware (frNr: {data.get("frNr", "unknown")})'
ort = ''
original_notiz = ''
else:
@@ -308,22 +316,16 @@ def standardize_appointment_data(data, source):
return_end = adjusted_end
time_breakdown.append(f"{return_start.strftime('%H:%M')}-{return_end.strftime('%H:%M')} Rückfahrt")
notiz = build_notiz(original_notiz, time_breakdown, duration_capped)
notiz = build_notiz(original_notiz, time_breakdown, False) # No duration capping
start_dt, end_dt = adjusted_start, adjusted_end
duration = end_dt - start_dt
max_duration = timedelta(hours=24)
if duration > max_duration:
end_dt = start_dt + max_duration
duration_capped = True
recurrence = None
if data.get('dauertermin', 0) == 1:
turnus = data.get('turnus', 1)
turnus_art = data.get('turnusArt', 1)
datum_bis = data.get('datumBis', '')
if datum_bis:
recurrence = generate_rrule(turnus, turnus_art, datum_bis)
recurrence = generate_rrule(turnus, turnus_art, datum_bis, context)
if recurrence:
recurrence = [recurrence]
@@ -360,7 +362,7 @@ def standardize_appointment_data(data, source):
'recurrence': recurrence
}
async def create_advoware_appointment(advoware, data, employee_kuerzel):
async def create_advoware_appointment(advoware, data, employee_kuerzel, context=None):
"""Create Advoware appointment from standardized data."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
@@ -379,15 +381,15 @@ async def create_advoware_appointment(advoware, data, employee_kuerzel):
}
try:
result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data)
logger.debug(f"Raw Advoware POST response: {result}")
log_operation('debug', f"Raw Advoware POST response: {result}", context=context)
frnr = str(result.get('frNr') or result.get('frnr'))
logger.info(f"Created Advoware appointment frNr: {frnr}")
log_operation('info', f"Created Advoware appointment frNr: {frnr}", context=context)
return frnr
except Exception as e:
logger.error(f"Failed to create Advoware appointment: {e}")
log_operation('error', f"Failed to create Advoware appointment: {e}", context=context)
raise
async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel):
async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context=None):
"""Update Advoware appointment."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
@@ -407,22 +409,22 @@ async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel):
}
try:
await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=appointment_data)
logger.info(f"Updated Advoware appointment frNr: {frnr}")
log_operation('info', f"Updated Advoware appointment frNr: {frnr}", context=context)
except Exception as e:
logger.error(f"Failed to update Advoware appointment {frnr}: {e}")
log_operation('error', f"Failed to update Advoware appointment {frnr}: {e}", context=context)
raise
async def delete_advoware_appointment(advoware, frnr):
async def delete_advoware_appointment(advoware, frnr, context=None):
"""Delete Advoware appointment."""
try:
await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params={'frnr': frnr})
logger.info(f"Deleted Advoware appointment frNr: {frnr}")
log_operation('info', f"Deleted Advoware appointment frNr: {frnr}", context=context)
except Exception as e:
logger.error(f"Failed to delete Advoware appointment {frnr}: {e}")
log_operation('error', f"Failed to delete Advoware appointment {frnr}: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def create_google_event(service, calendar_id, data):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def create_google_event(service, calendar_id, data, context=None):
"""Create Google event from standardized data."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
@@ -446,17 +448,17 @@ async def create_google_event(service, calendar_id, data):
try:
created = service.events().insert(calendarId=calendar_id, body=event_body).execute()
event_id = created['id']
logger.info(f"Created Google event ID: {event_id}")
log_operation('info', f"Created Google event ID: {event_id}", context=context)
return event_id
except HttpError as e:
logger.error(f"Google API error creating event: {e}")
log_operation('error', f"Google API error creating event: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to create Google event: {e}")
log_operation('error', f"Failed to create Google event: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def update_google_event(service, calendar_id, event_id, data):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def update_google_event(service, calendar_id, event_id, data, context=None):
"""Update Google event."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
@@ -479,79 +481,79 @@ async def update_google_event(service, calendar_id, event_id, data):
}
try:
service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute()
logger.info(f"Updated Google event ID: {event_id}")
log_operation('info', f"Updated Google event ID: {event_id}", context=context)
except HttpError as e:
logger.error(f"Google API error updating event {event_id}: {e}")
log_operation('error', f"Google API error updating event {event_id}: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to update Google event {event_id}: {e}")
log_operation('error', f"Failed to update Google event {event_id}: {e}", context=context)
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def delete_google_event(service, calendar_id, event_id):
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def delete_google_event(service, calendar_id, event_id, context=None):
"""Delete Google event."""
try:
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
logger.info(f"Deleted Google event ID: {event_id}")
log_operation('info', f"Deleted Google event ID: {event_id}", context=context)
except HttpError as e:
logger.error(f"Google API error deleting event {event_id}: {e}")
log_operation('error', f"Google API error deleting event {event_id}: {e}", context=context)
raise
except Exception as e:
logger.error(f"Failed to delete Google event {event_id}: {e}")
log_operation('error', f"Failed to delete Google event {event_id}: {e}", context=context)
raise
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed):
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed, context=None):
"""Safe wrapper for creating Advoware appointments with write permission check and global protection."""
if Config.ADVOWARE_WRITE_PROTECTION:
logger.warning("Global write protection active, skipping Advoware create")
log_operation('warning', "Global write protection active, skipping Advoware create", context=context)
return None
if not write_allowed:
logger.warning("Cannot create in Advoware, write not allowed")
log_operation('warning', "Cannot create in Advoware, write not allowed", context=context)
return None
return await create_advoware_appointment(advoware, data, employee_kuerzel)
return await create_advoware_appointment(advoware, data, employee_kuerzel, context)
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed):
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed, context=None):
"""Safe wrapper for deleting Advoware appointments with write permission check and global protection."""
if Config.ADVOWARE_WRITE_PROTECTION:
logger.warning("Global write protection active, skipping Advoware delete")
log_operation('warning', "Global write protection active, skipping Advoware delete", context=context)
return
if not write_allowed:
logger.warning("Cannot delete in Advoware, write not allowed")
log_operation('warning', "Cannot delete in Advoware, write not allowed", context=context)
return
await delete_advoware_appointment(advoware, frnr)
await delete_advoware_appointment(advoware, frnr, context)
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel):
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel, context=None):
"""Safe wrapper for updating Advoware appointments with write permission check and global protection."""
if Config.ADVOWARE_WRITE_PROTECTION:
logger.warning("Global write protection active, skipping Advoware update")
log_operation('warning', "Global write protection active, skipping Advoware update", context=context)
return
if not write_allowed:
logger.warning("Cannot update in Advoware, write not allowed")
log_operation('warning', "Cannot update in Advoware, write not allowed", context=context)
return
await update_advoware_appointment(advoware, frnr, data, employee_kuerzel)
await update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context)
async def safe_advoware_operation(operation, write_allowed, *args, **kwargs):
async def safe_advoware_operation(operation, write_allowed, *args, context=None, **kwargs):
"""Generic safe wrapper for Advoware operations with write permission check."""
if Config.ADVOWARE_WRITE_PROTECTION:
log_operation('warning', "Global write protection active, skipping Advoware operation")
log_operation('warning', "Global write protection active, skipping Advoware operation", context=context)
return None
if not write_allowed:
log_operation('warning', "Cannot perform operation in Advoware, write not allowed")
log_operation('warning', "Cannot perform operation in Advoware, write not allowed", context=context)
return None
return await operation(*args, **kwargs)
async def get_advoware_employees(advoware):
async def get_advoware_employees(advoware, context=None):
"""Fetch list of employees from Advoware."""
try:
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
employees = result if isinstance(result, list) else []
logger.info(f"Fetched {len(employees)} Advoware employees")
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
return employees
except Exception as e:
logger.error(f"Failed to fetch Advoware employees: {e}")
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
raise
async def get_advoware_timestamp(advoware, frnr):
async def get_advoware_timestamp(advoware, frnr, context=None):
"""Fetch the last modified timestamp for an Advoware appointment."""
try:
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr})
@@ -562,7 +564,7 @@ async def get_advoware_timestamp(advoware, frnr):
return BERLIN_TZ.localize(datetime.datetime.fromisoformat(timestamp_str))
return None
except Exception as e:
logger.error(f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}")
log_operation('error', f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}", context=context)
return None
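The timestamp Advoware returns is naive local time, so the function localizes it to Europe/Berlin before any comparison. A sketch using the stdlib `zoneinfo` (the source uses pytz's `BERLIN_TZ.localize`, which serves the same purpose):

```python
import datetime
from zoneinfo import ZoneInfo

BERLIN = ZoneInfo('Europe/Berlin')

# Attach the Berlin zone to a naive ISO timestamp
ts = datetime.datetime.fromisoformat('2025-10-24T09:15:00').replace(tzinfo=BERLIN)
print(ts.isoformat())  # 2025-10-24T09:15:00+02:00 (CEST)
```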
def get_timestamps(adv_data, google_data):
@@ -587,7 +589,7 @@ async def process_new_from_advoware(state, conn, service, calendar_id, kuerzel,
for frnr, app in state['adv_map'].items():
if frnr not in state['db_adv_index']:
try:
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware'))
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware', context), context)
async with conn.transaction():
await conn.execute(
"""
@@ -613,7 +615,7 @@ async def process_new_from_google(state, conn, service, calendar_id, kuerzel, ad
if not is_already_synced:
try:
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True)
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google', context), kuerzel, True, context)
if frnr and str(frnr) != 'None':
async with conn.transaction():
await conn.execute(
@@ -664,7 +666,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
if row['source_system'] == 'advoware':
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
@@ -676,7 +678,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
# Recreate in Advoware
try:
new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel, row['advoware_write_allowed'])
new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google', context), kuerzel, row['advoware_write_allowed'], context)
if new_frnr and str(new_frnr) != 'None':
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ))
@@ -693,7 +695,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
else:
# For other cases, propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
@@ -705,7 +707,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
else:
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
await delete_google_event(service, calendar_id, event_id, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
@@ -722,7 +724,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
# Delete in Advoware
if row['advoware_write_allowed']:
try:
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context)
@@ -737,7 +739,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
elif row['source_system'] == 'advoware':
# Recreate in Google
try:
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware'))
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware', context), context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}", context=context)
@@ -750,7 +752,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
else:
# last_change_wins or other, propagate delete to Advoware
try:
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context)
@@ -794,8 +796,8 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
processed_master_events.add(master_event_id)
if adv_data and google_data:
adv_std = standardize_appointment_data(adv_data, 'advoware')
google_std = standardize_appointment_data(google_data, 'google')
adv_std = standardize_appointment_data(adv_data, 'advoware', context)
google_std = standardize_appointment_data(google_data, 'google', context)
strategy = row['sync_strategy']
try:
if strategy == 'source_system_wins':
@@ -805,7 +807,7 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
if adv_ts > row['last_sync']:
await update_google_event(service, calendar_id, event_id, adv_std)
await update_google_event(service, calendar_id, event_id, adv_std, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}", context=context)
@@ -813,7 +815,7 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
await asyncio.sleep(0.1) # Small delay to avoid rate limits
elif google_ts and google_ts > row['last_sync']:
log_operation('warning', f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}", context=context)
await update_google_event(service, calendar_id, event_id, adv_std)
await update_google_event(service, calendar_id, event_id, adv_std, context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}", context=context)
@@ -825,29 +827,28 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm']))
log_operation('debug', f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}", context=context)
if google_ts and google_ts > row['last_sync']:
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}", context=context)
elif adv_ts > row['last_sync']:
log_operation('warning', f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}", context=context)
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}", context=context)
elif strategy == 'last_change_wins':
adv_ts = await get_advoware_timestamp(advoware, frnr)
adv_ts = await get_advoware_timestamp(advoware, frnr, context)
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
if adv_ts and google_ts:
if adv_ts > google_ts:
await update_google_event(service, calendar_id, event_id, adv_std)
await update_google_event(service, calendar_id, event_id, adv_std, context)
elif row['advoware_write_allowed']:
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
log_operation('info', f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}", context=context)
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context)
async with conn.transaction():
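The `last_change_wins` branch above reduces to a timestamp comparison: the newer side overwrites the older one. A sketch (direction labels are illustrative; in the real code the Google-to-Advoware direction additionally requires `advoware_write_allowed`):

```python
import datetime

def last_change_wins(adv_ts, google_ts):
    # Newer side overwrites the older one
    return 'advoware->google' if adv_ts > google_ts else 'google->advoware'

a = datetime.datetime(2025, 10, 24, 10, 0)
g = datetime.datetime(2025, 10, 24, 9, 30)
print(last_change_wins(a, g))  # advoware->google
```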
@@ -878,11 +879,11 @@ async def handler(event_data, context):
advoware = AdvowareAPI(context)
log_operation('debug', "Initializing Google service", context=context)
service = await get_google_service()
service = await get_google_service(context)
log_operation('debug', f"Ensuring Google calendar for {kuerzel}", context=context)
calendar_id = await ensure_google_calendar(service, kuerzel)
calendar_id = await ensure_google_calendar(service, kuerzel, context)
conn = await connect_db()
conn = await connect_db(context)
try:
# Initialize state
state = {
@@ -920,9 +921,9 @@ async def handler(event_data, context):
async def reload_api_maps():
"""Reload API maps after creating new events in phases."""
state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel)
state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel, context)
state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')}
state['google_events'] = await fetch_google_events(service, calendar_id)
state['google_events'] = await fetch_google_events(service, calendar_id, context)
state['google_map'] = {evt['id']: evt for evt in state['google_events']}
log_operation('debug', "Reloaded API maps", context=context, adv=len(state['adv_map']), google=len(state['google_map']))

bitbylaw/types.d.ts

@@ -25,7 +25,8 @@ declare module 'motia' {
'Advoware Proxy GET': ApiRouteHandler<Record<string, unknown>, unknown, never>
'Advoware Proxy DELETE': ApiRouteHandler<Record<string, unknown>, unknown, never>
'Calendar Sync Event Step': EventHandler<never, never>
'Calendar Sync Cron Job': CronHandler<{ topic: 'calendar_sync_employee'; data: never }>
'Calendar Sync API Trigger': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'calendar_sync_employee'; data: never }>
'Calendar Sync Cron Job': CronHandler<{ topic: 'calendar_sync_all'; data: never }>
'Calendar Sync API Trigger': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'calendar_sync_employee'; data: never } | { topic: 'calendar_sync_all'; data: never }>
'Calendar Sync All Step': EventHandler<never, { topic: 'calendar_sync_employee'; data: never }>
}
}