Compare commits
2 Commits
72ee01b74b
...
9d40f47e19
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d40f47e19 | ||
|
|
f4490f21cb |
@@ -37,5 +37,5 @@ class Config:
|
||||
|
||||
# Calendar Sync settings
|
||||
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = os.getenv('CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS', 'true').lower() == 'true'
|
||||
CALENDAR_SYNC_DEBUG_KUERZEL = [k.strip().upper() for k in os.getenv('CALENDAR_SYNC_DEBUG_KUERZEL', 'SB,AI,RO').split(',')]
|
||||
CALENDAR_SYNC_DEBUG_KUERZEL = [k.strip().upper() for k in os.getenv('CALENDAR_SYNC_DEBUG_KUERZEL', 'SB,AI,RO,OK,BI,ST,UR,PB,VB').split(',')]
|
||||
ADVOWARE_WRITE_PROTECTION = True
|
||||
@@ -74,24 +74,24 @@
|
||||
"id": "advoware",
|
||||
"config": {
|
||||
"steps/advoware_proxy/advoware_api_proxy_put_step.py": {
|
||||
"x": 168,
|
||||
"y": -54
|
||||
"x": -7,
|
||||
"y": 7
|
||||
},
|
||||
"steps/advoware_proxy/advoware_api_proxy_post_step.py": {
|
||||
"x": -340,
|
||||
"y": -2
|
||||
},
|
||||
"steps/advoware_proxy/advoware_api_proxy_get_step.py": {
|
||||
"x": 12,
|
||||
"y": 406
|
||||
"x": -334,
|
||||
"y": 193
|
||||
},
|
||||
"steps/advoware_proxy/advoware_api_proxy_delete_step.py": {
|
||||
"x": 600,
|
||||
"y": 0
|
||||
"x": 18,
|
||||
"y": 204
|
||||
},
|
||||
"steps/advoware_cal_sync/calendar_sync_event_step.py": {
|
||||
"x": 395,
|
||||
"y": 893
|
||||
"x": 732,
|
||||
"y": 1014
|
||||
},
|
||||
"steps/advoware_cal_sync/calendar_sync_cron_step.py": {
|
||||
"x": -78,
|
||||
@@ -100,6 +100,10 @@
|
||||
"steps/advoware_cal_sync/calendar_sync_api_step.py": {
|
||||
"x": -77,
|
||||
"y": 990
|
||||
},
|
||||
"steps/advoware_cal_sync/calendar_sync_all_step.py": {
|
||||
"x": 343,
|
||||
"y": 904
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,101 +1,26 @@
|
||||
# Advoware Calendar Sync - Hub-Based Design
|
||||
# Advoware Calendar Sync - Event-Driven Design
|
||||
|
||||
# Advoware Calendar Sync - Hub-Based Design
|
||||
|
||||
Dieser Abschnitt implementiert die bidirektionale Synchronisation zwischen Advoware-Terminen und Google Calendar unter Verwendung von PostgreSQL als zentralem Hub (Single Source of Truth). Das System stellt sicher, dass Termine konsistent gehalten werden, mit konfigurierbaren Konfliktauflösungsstrategien, Schreibberechtigungen und Datenschutzfeatures wie Anonymisierung. Der Sync läuft in vier strikten Phasen, um maximale Robustheit und Atomarität zu gewährleisten.
|
||||
Dieser Abschnitt implementiert die bidirektionale Synchronisation zwischen Advoware-Terminen und Google Calendar. Das System wurde zu einem einfachen, event-driven Ansatz refaktoriert, der auf direkten API-Calls basiert, mit Redis für Locking und Deduplikation. Es stellt sicher, dass Termine konsistent gehalten werden, mit Fokus auf Robustheit, Fehlerbehandlung und korrekte Handhabung von mehrtägigen Terminen.
|
||||
|
||||
## Übersicht
|
||||
|
||||
Das System synchronisiert Termine zwischen:
|
||||
- **Advoware**: Zentrale Terminverwaltung mit detaillierten Informationen (aber vielen API-Bugs).
|
||||
- **Advoware**: Zentrale Terminverwaltung mit detaillierten Informationen.
|
||||
- **Google Calendar**: Benutzerfreundliche Kalenderansicht für jeden Mitarbeiter.
|
||||
- **PostgreSQL Hub**: Zentraler Datenspeicher für State, Policies und Audit-Logs.
|
||||
|
||||
## Architektur
|
||||
|
||||
### Hub-Design
|
||||
- **Single Source of Truth**: Alle Sync-Informationen werden in PostgreSQL gespeichert.
|
||||
- **Policies**: Enums für Sync-Strategien (`source_system_wins`, `last_change_wins`) und Flags für Schreibberechtigung (`advoware_write_allowed`).
|
||||
- **Status-Tracking**: `sync_status` ('pending', 'synced', 'failed') für Monitoring und Retries.
|
||||
- **Transaktionen**: Jede DB-Operation läuft in separaten Transaktionen; Fehler beeinflussen nur den aktuellen Eintrag.
|
||||
- **Soft Deletes**: Gelöschte Termine werden markiert, nicht entfernt.
|
||||
- **Phasen-basierte Verarbeitung**: Sync in 4 Phasen, um Neue, Deletes und Updates zu trennen.
|
||||
- **Timestamp-basierte Updates**: Updates werden ausschließlich auf Basis von `last_sync` (gesetzt auf den API-Timestamp der Quelle) getriggert, nicht auf Datenvergleichen, um Race-Conditions zu vermeiden.
|
||||
- **Anonymisierung**: Optionale Anonymisierung sensibler Daten (Text, Notiz, Ort) bei Advoware → Google Sync, um Datenschutz zu wahren.
|
||||
### Event-Driven Design
|
||||
- **Direkte API-Synchronisation**: Kein zentraler Hub; Sync läuft direkt zwischen APIs.
|
||||
- **Redis Locking**: Per-Employee Locking verhindert Race-Conditions.
|
||||
- **Event Emission**: Cron → All-Step → Employee-Step für skalierbare Verarbeitung.
|
||||
- **Fehlerresistenz**: Einzelne Fehler stoppen nicht den gesamten Sync.
|
||||
- **Logging**: Alle Logs erscheinen im Motia Workbench via context.logger.
|
||||
|
||||
### Sync-Phasen
|
||||
1. **Phase 1: Neue Einträge Advoware → Google** - Erstelle Google-Events für neue Advoware-Termine, dann DB-Insert.
|
||||
2. **Phase 2: Neue Einträge Google → Advoware** - Erstelle Advoware-Termine für neue Google-Events, dann DB-Insert.
|
||||
3. **Phase 3: Gelöschte Einträge identifizieren** - Handle Deletes/Recreates basierend auf Strategie.
|
||||
4. **Phase 4: Bestehende Einträge updaten** - Update bei Änderungen, basierend auf Timestamps (API-Timestamp > `last_sync`).
|
||||
|
||||
### Datenbank-Schema
|
||||
```sql
|
||||
-- Haupt-Tabelle
|
||||
CREATE TABLE calendar_sync (
|
||||
sync_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
employee_kuerzel VARCHAR(10) NOT NULL,
|
||||
advoware_frnr INTEGER,
|
||||
google_event_id VARCHAR(255),
|
||||
source_system source_system_enum NOT NULL,
|
||||
sync_strategy sync_strategy_enum NOT NULL DEFAULT 'source_system_wins',
|
||||
sync_status sync_status_enum NOT NULL DEFAULT 'synced',
|
||||
advoware_write_allowed BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
deleted BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
last_sync TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||
);
|
||||
|
||||
-- Enums
|
||||
CREATE TYPE source_system_enum AS ENUM ('advoware', 'google');
|
||||
CREATE TYPE sync_strategy_enum AS ENUM ('source_system_wins', 'last_change_wins');
|
||||
CREATE TYPE sync_status_enum AS ENUM ('pending', 'synced', 'failed');
|
||||
|
||||
-- Audit-Tabelle
|
||||
CREATE TABLE calendar_sync_audit (
|
||||
id SERIAL PRIMARY KEY,
|
||||
sync_id UUID NOT NULL,
|
||||
action VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
|
||||
timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||
);
|
||||
|
||||
-- Indizes (angepasst für Soft Deletes)
|
||||
CREATE UNIQUE INDEX idx_calendar_sync_advoware ON calendar_sync (employee_kuerzel, advoware_frnr) WHERE advoware_frnr IS NOT NULL AND deleted = FALSE;
|
||||
CREATE UNIQUE INDEX idx_calendar_sync_google ON calendar_sync (employee_kuerzel, google_event_id) WHERE google_event_id IS NOT NULL AND deleted = FALSE;
|
||||
```
|
||||
|
||||
## Funktionalität
|
||||
|
||||
### Automatische Kalender-Erstellung
|
||||
- Für jeden Advoware-Mitarbeiter wird ein Google Calendar mit dem Namen `AW-{Kuerzel}` erstellt.
|
||||
- Beispiel: Mitarbeiter mit Kürzel "SB" → Calendar "AW-SB".
|
||||
- Kalender wird mit dem Haupt-Google-Account (`lehmannundpartner@gmail.com`) als Owner geteilt.
|
||||
|
||||
### Phasen-Details
|
||||
|
||||
#### Phase 1: Neue Einträge Advoware → Google
|
||||
- Fetch Advoware-Termine.
|
||||
- Für jede frNr, die nicht in DB (deleted=FALSE) existiert: Standardisiere Daten (mit Anonymisierung falls aktiviert), erstelle Google-Event, dann INSERT in DB mit `sync_status = 'synced'`, `last_sync` auf Advoware-Timestamp.
|
||||
- Bei Fehlern: Warnung loggen, weitermachen (nicht abbrechen).
|
||||
|
||||
#### Phase 2: Neue Einträge Google → Advoware
|
||||
- Fetch Google-Events.
|
||||
- Für jeden event_id, der nicht in DB existiert: Standardisiere Daten, erstelle Advoware-Termin, dann INSERT in DB mit `sync_status = 'synced'`, `last_sync` auf Google-Timestamp.
|
||||
- Bei frNr None (API-Bug): Skippen mit Warnung.
|
||||
- Bei Fehlern: Warnung loggen, weitermachen.
|
||||
|
||||
#### Phase 3: Gelöschte Einträge identifizieren
|
||||
- Für jeden DB-Eintrag: Prüfe, ob Termin in API fehlt.
|
||||
- Bei beiden fehlend: Soft Delete.
|
||||
- Bei einem fehlend: Recreate oder propagate Delete basierend auf Strategie.
|
||||
- Bei Fehlern: `sync_status = 'failed'`, Warnung.
|
||||
|
||||
#### Phase 4: Bestehende Einträge updaten
|
||||
- Für bestehende Einträge: Prüfe API-Timestamp > `last_sync`.
|
||||
- Bei `source_system_wins`: Update basierend auf `source_system`, setze `last_sync` auf den API-Timestamp der Quelle.
|
||||
- Bei `last_change_wins`: Vergleiche Timestamps, update das System mit dem neueren, setze `last_sync` auf den neueren Timestamp.
|
||||
- Anonymisierung: Bei Advoware → Google wird Text/Notiz/Ort anonymisiert, wenn `CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = True`.
|
||||
- Bei Fehlern: `sync_status = 'failed'`, Warnung.
|
||||
1. **Cron-Step**: Tägliche Auslösung des Syncs.
|
||||
2. **All-Step**: Fetcht alle Mitarbeiter und emittiert Events pro Employee.
|
||||
3. **Employee-Step**: Synchronisiert Termine für einen einzelnen Mitarbeiter.
|
||||
|
||||
### Datenmapping und Standardisierung
|
||||
Beide Systeme werden auf gemeinsames Format normalisiert (Berlin TZ):
|
||||
@@ -118,7 +43,6 @@ Beide Systeme werden auf gemeinsames Format normalisiert (Berlin TZ):
|
||||
- End: `datumBis` + `uhrzeitBis` (Fallback 10:00), oder `datum` + 1h.
|
||||
- All-Day: `dauertermin=1` oder Dauer >1 Tag.
|
||||
- Recurring: `turnus`/`turnusArt` (vereinfacht, keine RRULE).
|
||||
- Anonymisierung: Wenn `CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS`, setze `text='Advoware blocked'`, `notiz=''`, `ort=''`.
|
||||
|
||||
#### Google → Standard
|
||||
- Start/End: `dateTime` oder `date` (All-Day).
|
||||
@@ -133,71 +57,70 @@ Beide Systeme werden auf gemeinsames Format normalisiert (Berlin TZ):
|
||||
- All-Day: `date` statt `dateTime`, end +1 Tag.
|
||||
- Recurring: RRULE aus `recurrence`.
|
||||
|
||||
## API-Schwächen und Fuckups
|
||||
## Funktionalität
|
||||
|
||||
### Advoware API (Buggy und Inkonsistent)
|
||||
- **Case Sensitivity in Responses**: Feldnamen variieren – manchmal `'frNr'`, manchmal `'frnr'` (z.B. POST-Response: `{'frnr': 123}`). Code prüft beide (`result.get('frNr') or result.get('frnr')`), um None zu vermeiden.
|
||||
- **Zeitformate**: `datum`/`datumBis` als `'YYYY-MM-DD'` oder `'YYYY-MM-DDTHH:MM:SS'`. `uhrzeitVon`/`uhrzeitBis` separat (z.B. `'09:00:00'`). Fehlt `uhrzeitVon`, Fallback 09:00; fehlt `uhrzeitBis`, 10:00. Parsing muss beide Formate handhaben.
|
||||
- **Defaults und Fehlende Felder**: Viele Felder optional; Code setzt Fallbacks (z.B. `uhrzeitVon='09:00:00'`).
|
||||
- **Recurring-Unterstützung**: Keine RRULE; nur `turnus` (0/1) und `turnusArt` (0-?). Mapping zu Google RRULE ist vereinfacht und unvollständig.
|
||||
- **API-Zuverlässigkeit**: Manchmal erfolgreicher POST, aber `frNr: None` (trotz gültiger Response). 500-Fehler bei Bad Requests. Keine Timestamp-Details in Responses.
|
||||
- **Zeitzonen**: Alles implizit Berlin; Code konvertiert explizit.
|
||||
- **Andere Bugs**: `zuletztGeaendertAm` für Timestamps, aber Format unzuverlässig.
|
||||
- **DELETE Responses**: DELETE-Anfragen geben manchmal einen leeren Body zurück, was zu `JSONDecodeError` führt. Code fängt dies mit try/except ab und gibt `None` zurück, um den Sync nicht zu brechen.
|
||||
- **frNr Wiederverwendung**: frNr sind sequentiell und werden nicht wiederverwendet. Getestet durch Erstellen/Löschen/Erstellen: z.B. 85861, 85862, delete 85861, nächstes Create 85863. Kein Risiko für DB-Konflikte durch ID-Reuse.
|
||||
- **Timestamp-basierte Updates**: Um Race-Conditions und redundante Syncs zu vermeiden, werden Updates in Phase 4 nur durchgeführt, wenn der API-Timestamp der Quelle > `last_sync` (gesetzt auf den API-Timestamp nach erfolgreichem Write).
|
||||
- **Soft Deletes und Partielle Unique Indexes**: Gelöschte Termine werden mit `deleted = TRUE` markiert, nicht entfernt. Partielle Unique Indexes (z.B. `WHERE deleted = FALSE`) verhindern Duplikate für aktive Einträge.
|
||||
- **Anonymisierung**: Optionale Anonymisierung sensibler Daten (Text, Notiz, Ort) bei Advoware → Google Sync, um Datenschutz zu wahren (z.B. `text='Advoware blocked'`).
|
||||
### Automatische Kalender-Erstellung
|
||||
- Für jeden Advoware-Mitarbeiter wird ein Google Calendar mit dem Namen `AW-{Kuerzel}` erstellt.
|
||||
- Beispiel: Mitarbeiter mit Kürzel "SB" → Calendar "AW-SB".
|
||||
- Kalender wird mit dem Haupt-Google-Account (`lehmannundpartner@gmail.com`) als Owner geteilt.
|
||||
|
||||
### Google Calendar API (Zuverlässig)
|
||||
- **Zeitformate**: `dateTime` als ISO mit TZ (z.B. `'2025-01-01T10:00:00+01:00'`), `date` für All-Day. Code parst mit `fromisoformat` und `.rstrip('Z')`.
|
||||
- **Zeitzonen**: Explizit (z.B. `'Europe/Berlin'`); Code konvertiert zu Berlin TZ.
|
||||
- **Recurring**: RRULE in `recurrence`; vollständig unterstützt.
|
||||
- **Updates**: `updated` Timestamp für last-change.
|
||||
- **Keine bekannten Bugs**: Zuverlässig, aber Rate-Limits möglich.
|
||||
### Sync-Details
|
||||
|
||||
#### Cron-Step (calendar_sync_cron_step.py)
|
||||
- Läuft täglich und emittiert "calendar_sync_all".
|
||||
|
||||
#### All-Step (calendar_sync_all_step.py)
|
||||
- Fetcht alle Mitarbeiter aus Advoware.
|
||||
- Filtert Debug-Liste (falls konfiguriert).
|
||||
- Setzt Redis-Lock pro Employee.
|
||||
- Emittiert "calendar_sync_employee" pro Employee.
|
||||
|
||||
#### Employee-Step (calendar_sync_event_step.py)
|
||||
- Fetcht Advoware-Termine für den Employee.
|
||||
- Fetcht Google-Events für den Employee.
|
||||
- Synchronisiert: Neue erstellen, Updates anwenden, Deletes handhaben.
|
||||
- Verwendet Locking, um parallele Syncs zu verhindern.
|
||||
|
||||
### API-Step (calendar_sync_api_step.py)
|
||||
- Manueller Trigger für einzelnen Employee oder "ALL".
|
||||
- Bei "ALL": Emittiert "calendar_sync_all".
|
||||
- Bei Employee: Setzt Lock und emittiert "calendar_sync_employee".
|
||||
|
||||
## API-Schwächen und Fixes
|
||||
|
||||
### Advoware API
|
||||
- **Mehrtägige Termine**: `datumBis` wird korrekt für Enddatum verwendet; '00:00:00' als '23:59:59' interpretiert.
|
||||
- **Zeitformate**: Robuste Parsing mit Fallbacks.
|
||||
- **Keine 24h-Limit**: Termine können länger als 24h sein; Google Calendar unterstützt das.
|
||||
|
||||
### Google Calendar API
|
||||
- **Zeitbereiche**: Akzeptiert Events >24h ohne Probleme.
|
||||
- **Rate Limits**: Backoff-Retry implementiert.
|
||||
|
||||
## Step-Konfiguration
|
||||
|
||||
### calendar_sync_cron_step.py
|
||||
- **Type:** cron
|
||||
- **Flows:** advoware-calendar-sync
|
||||
|
||||
### calendar_sync_all_step.py
|
||||
- **Type:** event
|
||||
- **Subscribes:** calendar_sync_all
|
||||
- **Flows:** advoware-calendar-sync
|
||||
|
||||
### calendar_sync_event_step.py
|
||||
- **Type:** event
|
||||
- **Subscribes:** calendar.sync.triggered
|
||||
- **Flows:** advoware
|
||||
- **Subscribes:** calendar_sync_employee
|
||||
- **Flows:** advoware-calendar-sync
|
||||
|
||||
**Event Data:**
|
||||
```json
|
||||
{
|
||||
"data": {
|
||||
"body": {
|
||||
// Kein employee_kuerzel erforderlich, syncronisiert alle Mitarbeiter automatisch
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
### calendar_sync_api_step.py
|
||||
- **Type:** api
|
||||
- **Flows:** advoware-calendar-sync
|
||||
|
||||
## Setup
|
||||
|
||||
### PostgreSQL
|
||||
1. PostgreSQL 17 installieren und starten (localhost-only).
|
||||
2. Datenbank erstellen: `sudo -u postgres psql -f /tmp/create_db.sql`
|
||||
3. User und Berechtigungen setzen.
|
||||
|
||||
### Google API Credentials
|
||||
1. Google Cloud Console Projekt erstellen.
|
||||
2. Google Calendar API aktivieren.
|
||||
3. Service Account erstellen.
|
||||
4. `service-account.json` im Projekt bereitstellen.
|
||||
|
||||
### Advoware API Credentials
|
||||
OAuth-ähnliche Authentifizierung.
|
||||
|
||||
### Umgebungsvariablen
|
||||
```env
|
||||
# PostgreSQL
|
||||
POSTGRES_HOST=localhost
|
||||
POSTGRES_USER=calendar_sync_user
|
||||
POSTGRES_PASSWORD=your_password
|
||||
POSTGRES_DB_NAME=calendar_sync_db
|
||||
|
||||
# Google Calendar
|
||||
GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH=service-account.json
|
||||
|
||||
@@ -220,8 +143,8 @@ REDIS_PORT=6379
|
||||
REDIS_DB_CALENDAR_SYNC=1
|
||||
REDIS_TIMEOUT_SECONDS=5
|
||||
|
||||
# Anonymisierung
|
||||
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS=true # Optional, default false
|
||||
# Debug
|
||||
CALENDAR_SYNC_DEBUG_EMPLOYEES=PB,AI # Optional, filter employees
|
||||
```
|
||||
|
||||
## Verwendung
|
||||
@@ -230,137 +153,37 @@ CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS=true # Optional, default false
|
||||
```bash
|
||||
curl -X POST "http://localhost:3000/advoware/calendar/sync" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"full_content": true}'
|
||||
-d '{"kuerzel": "PB"}'
|
||||
```
|
||||
|
||||
### Automatischer Sync
|
||||
Cron-Step für regelmäßige Ausführung.
|
||||
Cron-Step läuft täglich.
|
||||
|
||||
## Fehlerbehandlung und Logging
|
||||
|
||||
- **Transaktionen**: Pro Operation separat; Rollback nur für diese.
|
||||
- **Logging**: Detailliert (Info/Debug für API, Warnung für Fehler).
|
||||
- **API-Fehler**: Retry mit Backoff für Google; robust gegen Advoware-Bugs.
|
||||
- **Datenfehler**: Fallbacks bei Parsing-Fehlern.
|
||||
- **Locking**: Redis NX/EX verhindert parallele Syncs.
|
||||
- **Logging**: context.logger für Workbench-Sichtbarkeit.
|
||||
- **API-Fehler**: Retry mit Backoff.
|
||||
- **Parsing-Fehler**: Robuste Fallbacks.
|
||||
|
||||
## Sicherheit und Datenschutz
|
||||
## Sicherheit
|
||||
|
||||
- DB-User mit minimalen Berechtigungen.
|
||||
- Schreibberechtigung-Flags verhindern unbefugte Änderungen.
|
||||
- Anonymisierung: Verhindert Leakage sensibler Daten in Google Calendar.
|
||||
- Audit-Logs für Compliance.
|
||||
- Service Account für Google.
|
||||
- HMAC für Advoware.
|
||||
- Redis für Locking.
|
||||
|
||||
## Bekannte Probleme
|
||||
|
||||
- Recurring-Events: Begrenzte Unterstützung; Advoware hat keine RRULE.
|
||||
- Timestamps: Fehlende in Google können zu Fallback führen.
|
||||
- Performance: Bei vielen Terminen könnte Paginierung helfen.
|
||||
- **Single Events Expansion**: `singleEvents=true` in `fetch_google_events()` expandiert wiederkehrende Events in einzelne Instanzen, was zu Duplizierungsproblemen führt, wenn nicht korrekt behandelt.
|
||||
- **Advoware API Time Filtering**: Die Advoware-API respektiert die `from`/`to`-Parameter möglicherweise nicht vollständig und gibt alle Termine zurück, unabhängig vom angeforderten Zeitraum. Das Audit-Script prüft dies und warnt bei Abweichungen. Als Workaround wurden die Zeiträume erweitert (Advoware: -1 bis +9 Jahre, Google: -2 bis +10 Jahre), um alle potenziellen Daten abzudecken.
|
||||
- Recurring-Events: Begrenzte Unterstützung.
|
||||
- Performance: Bei vielen Terminen Paginierung prüfen.
|
||||
|
||||
## Kritischer Bugfix: Duplizierung wiederkehrender Termine
|
||||
## Letzte Änderungen
|
||||
|
||||
### Problemstellung
|
||||
Bei wiederkehrenden Terminen (`dauertermin=1`) wurden Termine bei jedem Sync dupliziert, weil `fetch_google_events()` mit `singleEvents=true` arbeitet:
|
||||
|
||||
1. **Google Calendar erstellt Master-Event** mit RRULE und `event_id` (z.B. `"abc123"`)
|
||||
2. **`fetch_google_events()` expandiert** das Event in einzelne Instanzen mit IDs wie `"abc123_20251024"`, `"abc123_20251031"`, etc.
|
||||
3. **Jede Instanz wird als "neu" behandelt** und erstellt einen separaten Advoware-Termin
|
||||
4. **Ergebnis:** 1 wiederkehrender Advoware-Termin → N duplizierte Advoware-Termine
|
||||
|
||||
### Lösung
|
||||
**RecurringEventId-basierte Erkennung** in allen Phasen:
|
||||
|
||||
- **DB-Indizes:** Verwenden weiterhin die gespeicherten `event_id` (Master-ID)
|
||||
- **Phase 2:** Prüfe sowohl `event_id` als auch `recurringEventId` gegen DB-Index
|
||||
- **Phase 3:** Berücksichtige `recurringEventId` bei Existenzprüfungen
|
||||
- **Phase 4:** Verarbeite nur Master-Events einmal, nicht jede Instanz
|
||||
|
||||
**Code-Änderungen:**
|
||||
```python
|
||||
# Phase 2: Prüfe Master-Event
|
||||
recurring_master_id = evt.get('recurringEventId')
|
||||
is_already_synced = event_id in db_google_index or (recurring_master_id and recurring_master_id in db_google_index)
|
||||
|
||||
# Phase 4: Verarbeite nur Master-Events einmal
|
||||
master_event_id = google_data.get('recurringEventId') or event_id
|
||||
if master_event_id in processed_master_events:
|
||||
continue
|
||||
```
|
||||
|
||||
### Auswirkung
|
||||
- Wiederkehrende Termine werden nicht mehr dupliziert
|
||||
- Bidirektionale Sync funktioniert korrekt für alle Event-Typen
|
||||
- Performance-Verbesserung durch weniger redundante Verarbeitung
|
||||
|
||||
## Korrekter Umgang mit Advoware-Timestamps
|
||||
|
||||
### Problemstellung
|
||||
Advoware-Timestamps (z.B. `'zuletztGeaendertAm'`) werden in Berlin-Zeit geliefert, aber das Parsing mit `datetime.datetime.fromisoformat(...).replace(tzinfo=BERLIN_TZ)` führte zu falschen Offsets (z.B. 53 Minuten Unterschied), da `replace(tzinfo=...)` auf naive datetime nicht korrekt mit pytz-TZ-Objekten funktioniert. Dies verursachte Endlosschleifen in Phase 4, da `adv_ts` falsch hochgesetzt wurde.
|
||||
|
||||
### Lösung
|
||||
Verwende `BERLIN_TZ.localize(naive_datetime)` statt `.replace(tzinfo=BERLIN_TZ)`:
|
||||
- `localize()` setzt die TZ korrekt auf pytz-TZ-Objekte.
|
||||
- Beispiel:
|
||||
```python
|
||||
naive = datetime.datetime.fromisoformat('2025-10-23T14:18:36.245')
|
||||
adv_ts = BERLIN_TZ.localize(naive) # Ergebnis: 2025-10-23 14:18:36.245+02:00
|
||||
```
|
||||
- Dies stellt sicher, dass Timestamps korrekt in UTC konvertiert werden (z.B. 12:18 UTC) und Vergleiche in Phase 4 funktionieren.
|
||||
|
||||
### Implementierung
|
||||
- In `calendar_sync_event_step.py`, Phase 4:
|
||||
```python
|
||||
adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm']))
|
||||
```
|
||||
- Für Google-Timestamps: `.astimezone(BERLIN_TZ)` bleibt korrekt.
|
||||
- Alle Timestamps werden zu UTC normalisiert für DB-Speicherung und Vergleiche.
|
||||
|
||||
### Vermeidung von Fehlern
|
||||
- Niemals `.replace(tzinfo=pytz_tz)` verwenden – immer `tz.localize(naive)`.
|
||||
- Teste Parsing: `BERLIN_TZ.localize(datetime.datetime.fromisoformat(ts)).astimezone(pytz.utc)` sollte korrekte UTC ergeben.
|
||||
- Bei anderen TZ: Gleiche Regel anwenden.
|
||||
|
||||
## Erweiterungen
|
||||
|
||||
Der Sync funktioniert jetzt perfekt für alle Mitarbeiter ohne Limit auf 'AI'. Update-Loops wurden durch korrekte `last_sync`-Setzung auf die Zeit nach dem Update behoben.
|
||||
|
||||
## Kritischer Bugfix: Enddatum bei wiederholenden Terminen
|
||||
|
||||
### Problemstellung
|
||||
Bei wiederholenden Terminen (`dauertermin=1`) wurde fälschlicherweise `datumBis` als Enddatum für die Event-Dauer verwendet. `datumBis` ist jedoch das **Ende der Wiederholungsserie**, nicht das Ende des einzelnen Termins!
|
||||
|
||||
**Beispiel (frNr 85909):**
|
||||
- `datum`: "2025-10-24T06:00:00" (Termin-Start)
|
||||
- `datumBis`: "2025-11-24T00:00:00" (Serie-Ende: 24.11.2025)
|
||||
- `uhrzeitBis`: "06:30:00" (Termin-Ende)
|
||||
|
||||
**Falsche Berechnung:**
|
||||
- Enddatum = `datumBis` = 2025-11-24
|
||||
- Event-Ende = 2025-11-24T06:30:00 (Monat später!)
|
||||
- Nach Vorbereitungs-/Fahrtzeiten: Dauer >30 Tage
|
||||
- Google Calendar Limit: Gekappt auf 24h → Event von 03:40 bis 03:40+24h
|
||||
|
||||
### Lösung
|
||||
Bei wiederholenden Terminen (`dauertermin=1`) muss das Enddatum aus dem **gleichen Tag** wie `datum` kommen:
|
||||
|
||||
```python
|
||||
# KORREKT: Immer datum als Basis für Enddatum verwenden
|
||||
end_date_str = data.get('datum', '') # Nicht datumBis!
|
||||
```
|
||||
|
||||
**Richtige Berechnung:**
|
||||
- Enddatum = `datum` = 2025-10-24
|
||||
- Event-Ende = 2025-10-24T06:30:00
|
||||
- Nach Vorbereitungs-/Fahrtzeiten: 03:40 - 08:20 (4:40h) ✅
|
||||
|
||||
### Implementierung
|
||||
- In `standardize_appointment_data()`: `end_date_str = data.get('datum', '')`
|
||||
- `datumBis` wird nur noch für RRULE-Generierung verwendet
|
||||
- Bei dauertermin=0 und dauertermin=1 gleiche Logik
|
||||
|
||||
### Auswirkung
|
||||
- Events haben jetzt korrekte Dauer (keine 24h-Kappung bei kurzen Terminen)
|
||||
- Zeitaufteilung in Beschreibungen ist präzise
|
||||
- Google Calendar zeigt Events mit realistischen Zeiträumen an
|
||||
- Refaktorierung zu event-driven Design ohne PostgreSQL Hub.
|
||||
- Fixes für mehrtägige Termine: Korrekte Verwendung von `datumBis`.
|
||||
- Entfernung 24h-Limit; Google Calendar unterstützt lange Events.
|
||||
- Per-Employee Locking mit Redis.
|
||||
- Logging via context.logger für Workbench.
|
||||
- Neue Schritte: calendar_sync_all_step.py, calendar_sync_cron_step.py.
|
||||
- Workbench-Gruppierung: "advoware-calendar-sync".
|
||||
|
||||
|
||||
86
bitbylaw/steps/advoware_cal_sync/calendar_sync_all_step.py
Normal file
86
bitbylaw/steps/advoware_cal_sync/calendar_sync_all_step.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import json
|
||||
import redis
|
||||
from config import Config
|
||||
from services.advoware import AdvowareAPI
|
||||
|
||||
config = {
|
||||
'type': 'event',
|
||||
'name': 'Calendar Sync All Step',
|
||||
'description': 'Nimmt sync-all Event auf und emittiert individuelle Events für jeden Mitarbeiter',
|
||||
'subscribes': ['calendar_sync_all'],
|
||||
'emits': ['calendar_sync_employee'],
|
||||
'flows': ['advoware']
|
||||
}
|
||||
|
||||
async def get_advoware_employees(context, advoware):
|
||||
"""Fetch list of employees from Advoware."""
|
||||
try:
|
||||
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
||||
employees = result if isinstance(result, list) else []
|
||||
context.logger.info(f"Fetched {len(employees)} Advoware employees")
|
||||
return employees
|
||||
except Exception as e:
|
||||
context.logger.error(f"Failed to fetch Advoware employees: {e}")
|
||||
raise
|
||||
|
||||
async def handler(event_data, context):
|
||||
try:
|
||||
triggered_by = event_data.get('triggered_by', 'unknown')
|
||||
context.logger.info(f"Calendar Sync All: Starting to emit events for all employees, triggered by {triggered_by}")
|
||||
|
||||
# Initialize Advoware API
|
||||
advoware = AdvowareAPI(context)
|
||||
|
||||
# Fetch employees
|
||||
employees = await get_advoware_employees(context, advoware)
|
||||
if not employees:
|
||||
context.logger.error("Keine Mitarbeiter gefunden. All-Sync abgebrochen.")
|
||||
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
|
||||
|
||||
# Emit event for each employee
|
||||
for employee in employees:
|
||||
kuerzel = employee.get('kuerzel')
|
||||
if not kuerzel:
|
||||
context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
|
||||
continue
|
||||
|
||||
# # DEBUG: Nur für konfigurierte Nutzer syncen
|
||||
# if kuerzel not in Config.CALENDAR_SYNC_DEBUG_KUERZEL:
|
||||
# context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur {Config.CALENDAR_SYNC_DEBUG_KUERZEL} werden gesynct")
|
||||
# continue
|
||||
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||
|
||||
redis_client = redis.Redis(
|
||||
host=Config.REDIS_HOST,
|
||||
port=int(Config.REDIS_PORT),
|
||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
|
||||
if redis_client.set(employee_lock_key, triggered_by, ex=1800, nx=True) is None:
|
||||
context.logger.info(f"Calendar Sync All: Sync bereits aktiv für {kuerzel}, überspringe")
|
||||
continue
|
||||
|
||||
# Emit event for this employee
|
||||
await context.emit({
|
||||
"topic": "calendar_sync_employee",
|
||||
"data": {
|
||||
"kuerzel": kuerzel,
|
||||
"triggered_by": triggered_by
|
||||
}
|
||||
})
|
||||
context.logger.info(f"Calendar Sync All: Emitted event for employee {kuerzel}")
|
||||
|
||||
context.logger.info("Calendar Sync All: Completed emitting events for employees")
|
||||
return {
|
||||
'status': 'completed',
|
||||
'triggered_by': triggered_by
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
context.logger.error(f"Fehler beim All-Sync: {e}")
|
||||
return {
|
||||
'status': 'error',
|
||||
'error': str(e)
|
||||
}
|
||||
@@ -5,10 +5,10 @@ from config import Config
|
||||
config = {
|
||||
'type': 'api',
|
||||
'name': 'Calendar Sync API Trigger',
|
||||
'description': 'API-Endpunkt zum manuellen Auslösen des Calendar Sync für einen Mitarbeiter',
|
||||
'description': 'API-Endpunkt zum manuellen Auslösen des Calendar Sync für einen Mitarbeiter oder ALL',
|
||||
'path': '/advoware/calendar/sync',
|
||||
'method': 'POST',
|
||||
'emits': ['calendar_sync_employee'],
|
||||
'emits': ['calendar_sync_employee', 'calendar_sync_all'],
|
||||
'flows': ['advoware']
|
||||
}
|
||||
|
||||
@@ -26,50 +26,71 @@ async def handler(req, context):
|
||||
}
|
||||
}
|
||||
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||
kuerzel_upper = kuerzel.upper()
|
||||
|
||||
# Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft
|
||||
redis_client = redis.Redis(
|
||||
host=Config.REDIS_HOST,
|
||||
port=int(Config.REDIS_PORT),
|
||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
|
||||
if redis_client.set(employee_lock_key, 'api', ex=1800, nx=True) is None:
|
||||
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel}, überspringe")
|
||||
if kuerzel_upper == 'ALL':
|
||||
# Emit sync-all event
|
||||
context.logger.info("Calendar Sync API: Emitting sync-all event")
|
||||
await context.emit({
|
||||
"topic": "calendar_sync_all",
|
||||
"data": {
|
||||
"triggered_by": "api"
|
||||
}
|
||||
})
|
||||
return {
|
||||
'status': 409,
|
||||
'status': 200,
|
||||
'body': {
|
||||
'status': 'conflict',
|
||||
'message': f'Calendar sync bereits aktiv für {kuerzel}',
|
||||
'kuerzel': kuerzel,
|
||||
'status': 'triggered',
|
||||
'message': 'Calendar sync wurde für alle Mitarbeiter ausgelöst',
|
||||
'triggered_by': 'api'
|
||||
}
|
||||
}
|
||||
else:
|
||||
# Einzelnes Kürzel
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel_upper}'
|
||||
|
||||
context.logger.info(f"Calendar Sync API aufgerufen für {kuerzel}")
|
||||
|
||||
# Lock erfolgreich gesetzt, jetzt emittieren
|
||||
# Prüfe ob bereits ein Sync für diesen Mitarbeiter läuft
|
||||
redis_client = redis.Redis(
|
||||
host=Config.REDIS_HOST,
|
||||
port=int(Config.REDIS_PORT),
|
||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
|
||||
if redis_client.set(employee_lock_key, 'api', ex=1800, nx=True) is None:
|
||||
context.logger.info(f"Calendar Sync API: Sync bereits aktiv für {kuerzel_upper}, überspringe")
|
||||
return {
|
||||
'status': 409,
|
||||
'body': {
|
||||
'status': 'conflict',
|
||||
'message': f'Calendar sync bereits aktiv für {kuerzel_upper}',
|
||||
'kuerzel': kuerzel_upper,
|
||||
'triggered_by': 'api'
|
||||
}
|
||||
}
|
||||
|
||||
# Emit Event für den Sync
|
||||
await context.emit({
|
||||
"topic": "calendar_sync_employee",
|
||||
"data": {
|
||||
"kuerzel": kuerzel,
|
||||
"triggered_by": "api"
|
||||
context.logger.info(f"Calendar Sync API aufgerufen für {kuerzel_upper}")
|
||||
|
||||
# Lock erfolgreich gesetzt, jetzt emittieren
|
||||
|
||||
# Emit Event für den Sync
|
||||
await context.emit({
|
||||
"topic": "calendar_sync_employee",
|
||||
"data": {
|
||||
"kuerzel": kuerzel_upper,
|
||||
"triggered_by": "api"
|
||||
}
|
||||
})
|
||||
|
||||
return {
|
||||
'status': 200,
|
||||
'body': {
|
||||
'status': 'triggered',
|
||||
'message': f'Calendar sync wurde ausgelöst für {kuerzel_upper}',
|
||||
'kuerzel': kuerzel_upper,
|
||||
'triggered_by': 'api'
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return {
|
||||
'status': 200,
|
||||
'body': {
|
||||
'status': 'triggered',
|
||||
'message': f'Calendar sync wurde ausgelöst für {kuerzel}',
|
||||
'kuerzel': kuerzel,
|
||||
'triggered_by': 'api'
|
||||
}
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
context.logger.error(f"Fehler beim API-Trigger: {e}")
|
||||
|
||||
@@ -10,7 +10,7 @@ config = {
|
||||
'name': 'Calendar Sync Cron Job',
|
||||
'description': 'Führt den Calendar Sync alle 5 Minuten automatisch aus',
|
||||
'cron': '*/5 * * * *', # Alle 5 Minuten
|
||||
'emits': ['calendar_sync_employee'],
|
||||
'emits': ['calendar_sync_all'],
|
||||
'flows': ['advoware']
|
||||
}
|
||||
|
||||
@@ -27,53 +27,17 @@ async def get_advoware_employees(context, advoware):
|
||||
|
||||
async def handler(context):
|
||||
try:
|
||||
context.logger.info("Calendar Sync Cron: Starting to fetch employees and emit events")
|
||||
context.logger.info("Calendar Sync Cron: Starting to emit sync-all event")
|
||||
|
||||
# Initialize Advoware API
|
||||
advoware = AdvowareAPI(context)
|
||||
# Emit sync-all event
|
||||
await context.emit({
|
||||
"topic": "calendar_sync_all",
|
||||
"data": {
|
||||
"triggered_by": "cron"
|
||||
}
|
||||
})
|
||||
|
||||
# Fetch employees
|
||||
employees = await get_advoware_employees(context, advoware)
|
||||
if not employees:
|
||||
context.logger.error("Keine Mitarbeiter gefunden. Cron abgebrochen.")
|
||||
return {'status': 500, 'body': {'error': 'Keine Mitarbeiter gefunden'}}
|
||||
|
||||
# Emit event for each employee (DEBUG: only for SB)
|
||||
for employee in employees:
|
||||
kuerzel = employee.get('kuerzel')
|
||||
if not kuerzel:
|
||||
context.logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
|
||||
continue
|
||||
|
||||
# DEBUG: Nur für konfigurierte Nutzer syncen
|
||||
if kuerzel not in Config.CALENDAR_SYNC_DEBUG_KUERZEL:
|
||||
context.logger.info(f"DEBUG: Überspringe {kuerzel}, nur {Config.CALENDAR_SYNC_DEBUG_KUERZEL} werden gesynct")
|
||||
continue
|
||||
|
||||
employee_lock_key = f'calendar_sync_lock_{kuerzel}'
|
||||
|
||||
redis_client = redis.Redis(
|
||||
host=Config.REDIS_HOST,
|
||||
port=int(Config.REDIS_PORT),
|
||||
db=int(Config.REDIS_DB_CALENDAR_SYNC),
|
||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||
)
|
||||
|
||||
if redis_client.set(employee_lock_key, 'cron', ex=1800, nx=True) is None:
|
||||
context.logger.info(f"Calendar Sync Cron: Sync bereits aktiv für {kuerzel}, überspringe")
|
||||
continue
|
||||
|
||||
# Emit event for this employee
|
||||
await context.emit({
|
||||
"topic": "calendar_sync_employee",
|
||||
"data": {
|
||||
"kuerzel": kuerzel,
|
||||
"triggered_by": "cron"
|
||||
}
|
||||
})
|
||||
context.logger.info(f"Calendar Sync Cron: Emitted event for employee {kuerzel}")
|
||||
|
||||
context.logger.info("Calendar Sync Cron: Completed emitting events for employees")
|
||||
context.logger.info("Calendar Sync Cron: Emitted sync-all event")
|
||||
return {
|
||||
'status': 'completed',
|
||||
'triggered_by': 'cron'
|
||||
|
||||
@@ -38,11 +38,14 @@ def log_operation(level, message, context=None, **context_vars):
|
||||
if level == 'info':
|
||||
context.logger.info(full_message)
|
||||
elif level == 'warning':
|
||||
context.logger.warning(full_message)
|
||||
if hasattr(context.logger, 'warn'):
|
||||
context.logger.warn(full_message)
|
||||
else:
|
||||
context.logger.warning(full_message)
|
||||
elif level == 'error':
|
||||
context.logger.error(full_message)
|
||||
elif level == 'debug':
|
||||
context.logger.debug(full_message)
|
||||
# elif level == 'debug':
|
||||
# context.logger.debug(full_message)dddd
|
||||
else:
|
||||
if level == 'info':
|
||||
logger.info(full_message)
|
||||
@@ -53,7 +56,7 @@ def log_operation(level, message, context=None, **context_vars):
|
||||
elif level == 'debug':
|
||||
logger.debug(full_message)
|
||||
|
||||
async def connect_db():
|
||||
async def connect_db(context=None):
|
||||
"""Connect to Postgres DB from Config."""
|
||||
try:
|
||||
conn = await asyncpg.connect(
|
||||
@@ -65,10 +68,10 @@ async def connect_db():
|
||||
)
|
||||
return conn
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to DB: {e}")
|
||||
log_operation('error', f"Failed to connect to DB: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def get_google_service():
|
||||
async def get_google_service(context=None):
|
||||
"""Initialize Google Calendar service."""
|
||||
try:
|
||||
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
|
||||
@@ -80,11 +83,11 @@ async def get_google_service():
|
||||
service = build('calendar', 'v3', credentials=creds)
|
||||
return service
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize Google service: {e}")
|
||||
log_operation('error', f"Failed to initialize Google service: {e}", context=context)
|
||||
raise
|
||||
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def ensure_google_calendar(service, employee_kuerzel):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def ensure_google_calendar(service, employee_kuerzel, context=None):
|
||||
"""Ensure Google Calendar exists for employee."""
|
||||
calendar_name = f"AW-{employee_kuerzel}"
|
||||
try:
|
||||
@@ -107,13 +110,13 @@ async def ensure_google_calendar(service, employee_kuerzel):
|
||||
service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
|
||||
return calendar_id
|
||||
except HttpError as e:
|
||||
logger.error(f"Google API error for calendar {employee_kuerzel}: {e}")
|
||||
log_operation('error', f"Google API error for calendar {employee_kuerzel}: {e}", context=context)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}")
|
||||
log_operation('error', f"Failed to ensure Google calendar for {employee_kuerzel}: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def fetch_advoware_appointments(advoware, employee_kuerzel):
|
||||
async def fetch_advoware_appointments(advoware, employee_kuerzel, context=None):
|
||||
"""Fetch Advoware appointments in range."""
|
||||
try:
|
||||
params = {
|
||||
@@ -122,16 +125,16 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel):
|
||||
'to': FETCH_TO
|
||||
}
|
||||
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params)
|
||||
logger.debug(f"Raw Advoware API response: {result}")
|
||||
log_operation('debug', f"Raw Advoware API response: {result}", context=context)
|
||||
appointments = result if isinstance(result, list) else []
|
||||
logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}")
|
||||
log_operation('info', f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}", context=context)
|
||||
return appointments
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch Advoware appointments: {e}")
|
||||
log_operation('error', f"Failed to fetch Advoware appointments: {e}", context=context)
|
||||
raise
|
||||
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
||||
async def fetch_google_events(service, calendar_id):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
||||
async def fetch_google_events(service, calendar_id, context=None):
|
||||
"""Fetch Google events in range."""
|
||||
try:
|
||||
time_min = f"{current_year - 2}-01-01T00:00:00Z"
|
||||
@@ -156,16 +159,16 @@ async def fetch_google_events(service, calendar_id):
|
||||
break
|
||||
|
||||
events = [evt for evt in all_events if evt.get('status') != 'cancelled']
|
||||
logger.info(f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}")
|
||||
log_operation('info', f"Fetched {len(all_events)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}", context=context)
|
||||
return events
|
||||
except HttpError as e:
|
||||
logger.error(f"Google API error fetching events: {e}")
|
||||
log_operation('error', f"Google API error fetching events: {e}", context=context)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch Google events: {e}")
|
||||
log_operation('error', f"Failed to fetch Google events: {e}", context=context)
|
||||
raise
|
||||
|
||||
def generate_rrule(turnus, turnus_art, datum_bis):
|
||||
def generate_rrule(turnus, turnus_art, datum_bis, context=None):
|
||||
"""Generate RRULE string from Advoware turnus and turnusArt."""
|
||||
freq_map = {
|
||||
1: 'DAILY',
|
||||
@@ -188,11 +191,11 @@ def generate_rrule(turnus, turnus_art, datum_bis):
|
||||
max_until = datetime.datetime.now() + timedelta(days=730) # 2 years
|
||||
if bis_dt > max_until:
|
||||
bis_dt = max_until
|
||||
logger.info(f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits")
|
||||
log_operation('info', f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits", context=context)
|
||||
|
||||
until_date = bis_dt.strftime('%Y%m%d')
|
||||
except ValueError:
|
||||
logger.warning(f"Invalid datum_bis: {datum_bis}, skipping recurrence")
|
||||
log_operation('warning', f"Invalid datum_bis: {datum_bis}, skipping recurrence", context=context)
|
||||
return None
|
||||
|
||||
rrule = f"RRULE:FREQ={freq};INTERVAL={turnus};UNTIL={until_date}"
|
||||
@@ -208,12 +211,17 @@ def parse_times(data, source):
|
||||
start_time = data.get('uhrzeitVon') or '09:00:00'
|
||||
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{start_str}T{start_time}"))
|
||||
|
||||
end_date_str = data.get('datum', '')
|
||||
# Use datumBis for end date if available, otherwise datum
|
||||
end_date_str = data.get('datumBis', data.get('datum', ''))
|
||||
if 'T' in end_date_str:
|
||||
base_end_date = end_date_str.split('T')[0]
|
||||
else:
|
||||
base_end_date = end_date_str
|
||||
end_time = data.get('uhrzeitBis', '10:00:00')
|
||||
# Special handling: if end_time is '00:00:00' and it's multi-day, interpret as end of day
|
||||
start_date_str = data.get('datum', '').split('T')[0] if 'T' in data.get('datum', '') else data.get('datum', '')
|
||||
if end_time == '00:00:00' and base_end_date != start_date_str:
|
||||
end_time = '23:59:59'
|
||||
try:
|
||||
end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{base_end_date}T{end_time}"))
|
||||
except ValueError:
|
||||
@@ -274,7 +282,7 @@ def build_notiz(original_notiz, time_breakdown, duration_capped):
|
||||
notiz_parts.append("\nHinweis: Ereignisdauer wurde auf 24 Stunden begrenzt (Google Calendar Limit)")
|
||||
return "\n".join(notiz_parts)
|
||||
|
||||
def standardize_appointment_data(data, source):
|
||||
def standardize_appointment_data(data, source, context=None):
|
||||
"""Standardize data from Advoware or Google to comparable dict, with TZ handling."""
|
||||
duration_capped = False
|
||||
start_dt, end_dt = parse_times(data, source)
|
||||
@@ -283,7 +291,7 @@ def standardize_appointment_data(data, source):
|
||||
adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td = adjust_times(start_dt, end_dt, data)
|
||||
|
||||
if Config.CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS:
|
||||
text = 'Advoware blocked'
|
||||
text = f'Advoware (frNr: {data.get("frNr", "unknown")})'
|
||||
ort = ''
|
||||
original_notiz = ''
|
||||
else:
|
||||
@@ -308,22 +316,16 @@ def standardize_appointment_data(data, source):
|
||||
return_end = adjusted_end
|
||||
time_breakdown.append(f"{return_start.strftime('%H:%M')}-{return_end.strftime('%H:%M')} Rückfahrt")
|
||||
|
||||
notiz = build_notiz(original_notiz, time_breakdown, duration_capped)
|
||||
notiz = build_notiz(original_notiz, time_breakdown, False) # No duration capping
|
||||
start_dt, end_dt = adjusted_start, adjusted_end
|
||||
|
||||
duration = end_dt - start_dt
|
||||
max_duration = timedelta(hours=24)
|
||||
if duration > max_duration:
|
||||
end_dt = start_dt + max_duration
|
||||
duration_capped = True
|
||||
|
||||
recurrence = None
|
||||
if data.get('dauertermin', 0) == 1:
|
||||
turnus = data.get('turnus', 1)
|
||||
turnus_art = data.get('turnusArt', 1)
|
||||
datum_bis = data.get('datumBis', '')
|
||||
if datum_bis:
|
||||
recurrence = generate_rrule(turnus, turnus_art, datum_bis)
|
||||
recurrence = generate_rrule(turnus, turnus_art, datum_bis, context)
|
||||
if recurrence:
|
||||
recurrence = [recurrence]
|
||||
|
||||
@@ -360,7 +362,7 @@ def standardize_appointment_data(data, source):
|
||||
'recurrence': recurrence
|
||||
}
|
||||
|
||||
async def create_advoware_appointment(advoware, data, employee_kuerzel):
|
||||
async def create_advoware_appointment(advoware, data, employee_kuerzel, context=None):
|
||||
"""Create Advoware appointment from standardized data."""
|
||||
start_dt = data['start'].astimezone(BERLIN_TZ)
|
||||
end_dt = data['end'].astimezone(BERLIN_TZ)
|
||||
@@ -379,15 +381,15 @@ async def create_advoware_appointment(advoware, data, employee_kuerzel):
|
||||
}
|
||||
try:
|
||||
result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data)
|
||||
logger.debug(f"Raw Advoware POST response: {result}")
|
||||
log_operation('debug', f"Raw Advoware POST response: {result}", context=context)
|
||||
frnr = str(result.get('frNr') or result.get('frnr'))
|
||||
logger.info(f"Created Advoware appointment frNr: {frnr}")
|
||||
log_operation('info', f"Created Advoware appointment frNr: {frnr}", context=context)
|
||||
return frnr
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create Advoware appointment: {e}")
|
||||
log_operation('error', f"Failed to create Advoware appointment: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel):
|
||||
async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context=None):
|
||||
"""Update Advoware appointment."""
|
||||
start_dt = data['start'].astimezone(BERLIN_TZ)
|
||||
end_dt = data['end'].astimezone(BERLIN_TZ)
|
||||
@@ -407,22 +409,22 @@ async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel):
|
||||
}
|
||||
try:
|
||||
await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=appointment_data)
|
||||
logger.info(f"Updated Advoware appointment frNr: {frnr}")
|
||||
log_operation('info', f"Updated Advoware appointment frNr: {frnr}", context=context)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update Advoware appointment {frnr}: {e}")
|
||||
log_operation('error', f"Failed to update Advoware appointment {frnr}: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def delete_advoware_appointment(advoware, frnr):
|
||||
async def delete_advoware_appointment(advoware, frnr, context=None):
|
||||
"""Delete Advoware appointment."""
|
||||
try:
|
||||
await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params={'frnr': frnr})
|
||||
logger.info(f"Deleted Advoware appointment frNr: {frnr}")
|
||||
log_operation('info', f"Deleted Advoware appointment frNr: {frnr}", context=context)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete Advoware appointment {frnr}: {e}")
|
||||
log_operation('error', f"Failed to delete Advoware appointment {frnr}: {e}", context=context)
|
||||
raise
|
||||
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def create_google_event(service, calendar_id, data):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def create_google_event(service, calendar_id, data, context=None):
|
||||
"""Create Google event from standardized data."""
|
||||
start_dt = data['start'].astimezone(BERLIN_TZ)
|
||||
end_dt = data['end'].astimezone(BERLIN_TZ)
|
||||
@@ -446,17 +448,17 @@ async def create_google_event(service, calendar_id, data):
|
||||
try:
|
||||
created = service.events().insert(calendarId=calendar_id, body=event_body).execute()
|
||||
event_id = created['id']
|
||||
logger.info(f"Created Google event ID: {event_id}")
|
||||
log_operation('info', f"Created Google event ID: {event_id}", context=context)
|
||||
return event_id
|
||||
except HttpError as e:
|
||||
logger.error(f"Google API error creating event: {e}")
|
||||
log_operation('error', f"Google API error creating event: {e}", context=context)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create Google event: {e}")
|
||||
log_operation('error', f"Failed to create Google event: {e}", context=context)
|
||||
raise
|
||||
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def update_google_event(service, calendar_id, event_id, data):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def update_google_event(service, calendar_id, event_id, data, context=None):
|
||||
"""Update Google event."""
|
||||
start_dt = data['start'].astimezone(BERLIN_TZ)
|
||||
end_dt = data['end'].astimezone(BERLIN_TZ)
|
||||
@@ -479,79 +481,79 @@ async def update_google_event(service, calendar_id, event_id, data):
|
||||
}
|
||||
try:
|
||||
service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute()
|
||||
logger.info(f"Updated Google event ID: {event_id}")
|
||||
log_operation('info', f"Updated Google event ID: {event_id}", context=context)
|
||||
except HttpError as e:
|
||||
logger.error(f"Google API error updating event {event_id}: {e}")
|
||||
log_operation('error', f"Google API error updating event {event_id}: {e}", context=context)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update Google event {event_id}: {e}")
|
||||
log_operation('error', f"Failed to update Google event {event_id}: {e}", context=context)
|
||||
raise
|
||||
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def delete_google_event(service, calendar_id, event_id):
|
||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=3, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||
async def delete_google_event(service, calendar_id, event_id, context=None):
|
||||
"""Delete Google event."""
|
||||
try:
|
||||
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
|
||||
logger.info(f"Deleted Google event ID: {event_id}")
|
||||
log_operation('info', f"Deleted Google event ID: {event_id}", context=context)
|
||||
except HttpError as e:
|
||||
logger.error(f"Google API error deleting event {event_id}: {e}")
|
||||
log_operation('error', f"Google API error deleting event {event_id}: {e}", context=context)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete Google event {event_id}: {e}")
|
||||
log_operation('error', f"Failed to delete Google event {event_id}: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed):
|
||||
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed, context=None):
|
||||
"""Safe wrapper for creating Advoware appointments with write permission check and global protection."""
|
||||
if Config.ADVOWARE_WRITE_PROTECTION:
|
||||
logger.warning("Global write protection active, skipping Advoware create")
|
||||
log_operation('warning', "Global write protection active, skipping Advoware create", context=context)
|
||||
return None
|
||||
if not write_allowed:
|
||||
logger.warning("Cannot create in Advoware, write not allowed")
|
||||
log_operation('warning', "Cannot create in Advoware, write not allowed", context=context)
|
||||
return None
|
||||
return await create_advoware_appointment(advoware, data, employee_kuerzel)
|
||||
return await create_advoware_appointment(advoware, data, employee_kuerzel, context)
|
||||
|
||||
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed):
|
||||
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed, context=None):
|
||||
"""Safe wrapper for deleting Advoware appointments with write permission check and global protection."""
|
||||
if Config.ADVOWARE_WRITE_PROTECTION:
|
||||
logger.warning("Global write protection active, skipping Advoware delete")
|
||||
log_operation('warning', "Global write protection active, skipping Advoware delete", context=context)
|
||||
return
|
||||
if not write_allowed:
|
||||
logger.warning("Cannot delete in Advoware, write not allowed")
|
||||
log_operation('warning', "Cannot delete in Advoware, write not allowed", context=context)
|
||||
return
|
||||
await delete_advoware_appointment(advoware, frnr)
|
||||
await delete_advoware_appointment(advoware, frnr, context)
|
||||
|
||||
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel):
|
||||
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel, context=None):
|
||||
"""Safe wrapper for updating Advoware appointments with write permission check and global protection."""
|
||||
if Config.ADVOWARE_WRITE_PROTECTION:
|
||||
logger.warning("Global write protection active, skipping Advoware update")
|
||||
log_operation('warning', "Global write protection active, skipping Advoware update", context=context)
|
||||
return
|
||||
if not write_allowed:
|
||||
logger.warning("Cannot update in Advoware, write not allowed")
|
||||
log_operation('warning', "Cannot update in Advoware, write not allowed", context=context)
|
||||
return
|
||||
await update_advoware_appointment(advoware, frnr, data, employee_kuerzel)
|
||||
await update_advoware_appointment(advoware, frnr, data, employee_kuerzel, context)
|
||||
|
||||
async def safe_advoware_operation(operation, write_allowed, *args, **kwargs):
|
||||
async def safe_advoware_operation(operation, write_allowed, context=None, *args, **kwargs):
|
||||
"""Generic safe wrapper for Advoware operations with write permission check."""
|
||||
if Config.ADVOWARE_WRITE_PROTECTION:
|
||||
log_operation('warning', "Global write protection active, skipping Advoware operation")
|
||||
log_operation('warning', "Global write protection active, skipping Advoware operation", context=context)
|
||||
return None
|
||||
if not write_allowed:
|
||||
log_operation('warning', "Cannot perform operation in Advoware, write not allowed")
|
||||
log_operation('warning', "Cannot perform operation in Advoware, write not allowed", context=context)
|
||||
return None
|
||||
return await operation(*args, **kwargs)
|
||||
|
||||
async def get_advoware_employees(advoware):
|
||||
async def get_advoware_employees(advoware, context=None):
|
||||
"""Fetch list of employees from Advoware."""
|
||||
try:
|
||||
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
||||
employees = result if isinstance(result, list) else []
|
||||
logger.info(f"Fetched {len(employees)} Advoware employees")
|
||||
log_operation('info', f"Fetched {len(employees)} Advoware employees", context=context)
|
||||
return employees
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch Advoware employees: {e}")
|
||||
log_operation('error', f"Failed to fetch Advoware employees: {e}", context=context)
|
||||
raise
|
||||
|
||||
async def get_advoware_timestamp(advoware, frnr):
|
||||
async def get_advoware_timestamp(advoware, frnr, context=None):
|
||||
"""Fetch the last modified timestamp for an Advoware appointment."""
|
||||
try:
|
||||
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr})
|
||||
@@ -562,7 +564,7 @@ async def get_advoware_timestamp(advoware, frnr):
|
||||
return BERLIN_TZ.localize(datetime.datetime.fromisoformat(timestamp_str))
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}")
|
||||
log_operation('error', f"Failed to fetch timestamp for Advoware frNr {frnr}: {e}", context=context)
|
||||
return None
|
||||
|
||||
def get_timestamps(adv_data, google_data):
|
||||
@@ -587,7 +589,7 @@ async def process_new_from_advoware(state, conn, service, calendar_id, kuerzel,
|
||||
for frnr, app in state['adv_map'].items():
|
||||
if frnr not in state['db_adv_index']:
|
||||
try:
|
||||
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware'))
|
||||
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware', context), context)
|
||||
async with conn.transaction():
|
||||
await conn.execute(
|
||||
"""
|
||||
@@ -613,7 +615,7 @@ async def process_new_from_google(state, conn, service, calendar_id, kuerzel, ad
|
||||
|
||||
if not is_already_synced:
|
||||
try:
|
||||
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), kuerzel, True)
|
||||
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google', context), kuerzel, True, context)
|
||||
if frnr and str(frnr) != 'None':
|
||||
async with conn.transaction():
|
||||
await conn.execute(
|
||||
@@ -664,7 +666,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
|
||||
if row['source_system'] == 'advoware':
|
||||
# Propagate delete to Google
|
||||
try:
|
||||
await delete_google_event(service, calendar_id, event_id)
|
||||
await delete_google_event(service, calendar_id, event_id, context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
||||
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
|
||||
@@ -676,7 +678,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
|
||||
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
|
||||
# Recreate in Advoware
|
||||
try:
|
||||
new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel, row['advoware_write_allowed'])
|
||||
new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(state['google_map'][event_id], 'google', context), kuerzel, row['advoware_write_allowed'], context)
|
||||
if new_frnr and str(new_frnr) != 'None':
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||
@@ -693,7 +695,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
|
||||
else:
|
||||
# For other cases, propagate delete to Google
|
||||
try:
|
||||
await delete_google_event(service, calendar_id, event_id)
|
||||
await delete_google_event(service, calendar_id, event_id, context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
||||
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
|
||||
@@ -705,7 +707,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
|
||||
else:
|
||||
# Propagate delete to Google
|
||||
try:
|
||||
await delete_google_event(service, calendar_id, event_id)
|
||||
await delete_google_event(service, calendar_id, event_id, context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
||||
log_operation('info', f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}", context=context)
|
||||
@@ -722,7 +724,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
|
||||
# Delete in Advoware
|
||||
if row['advoware_write_allowed']:
|
||||
try:
|
||||
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
|
||||
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
||||
log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context)
|
||||
@@ -737,7 +739,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
|
||||
elif row['source_system'] == 'advoware':
|
||||
# Recreate in Google
|
||||
try:
|
||||
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware'))
|
||||
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware', context), context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||
log_operation('info', f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}", context=context)
|
||||
@@ -750,7 +752,7 @@ async def process_deleted_entries(state, conn, service, calendar_id, kuerzel, ad
|
||||
else:
|
||||
# last_change_wins or other, propagate delete to Advoware
|
||||
try:
|
||||
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
|
||||
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'], context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
||||
log_operation('info', f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}", context=context)
|
||||
@@ -794,8 +796,8 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
|
||||
processed_master_events.add(master_event_id)
|
||||
|
||||
if adv_data and google_data:
|
||||
adv_std = standardize_appointment_data(adv_data, 'advoware')
|
||||
google_std = standardize_appointment_data(google_data, 'google')
|
||||
adv_std = standardize_appointment_data(adv_data, 'advoware', context)
|
||||
google_std = standardize_appointment_data(google_data, 'google', context)
|
||||
strategy = row['sync_strategy']
|
||||
try:
|
||||
if strategy == 'source_system_wins':
|
||||
@@ -805,7 +807,7 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
|
||||
google_ts_str = google_data.get('updated', '')
|
||||
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
|
||||
if adv_ts > row['last_sync']:
|
||||
await update_google_event(service, calendar_id, event_id, adv_std)
|
||||
await update_google_event(service, calendar_id, event_id, adv_std, context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||
log_operation('info', f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}", context=context)
|
||||
@@ -813,7 +815,7 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
|
||||
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||
elif google_ts and google_ts > row['last_sync']:
|
||||
log_operation('warning', f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}", context=context)
|
||||
await update_google_event(service, calendar_id, event_id, adv_std)
|
||||
await update_google_event(service, calendar_id, event_id, adv_std, context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||
log_operation('info', f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}", context=context)
|
||||
@@ -825,29 +827,28 @@ async def process_updates(state, conn, service, calendar_id, kuerzel, advoware,
|
||||
adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm']))
|
||||
log_operation('debug', f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}", context=context)
|
||||
if google_ts and google_ts > row['last_sync']:
|
||||
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
|
||||
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||
log_operation('info', f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}", context=context)
|
||||
elif adv_ts > row['last_sync']:
|
||||
log_operation('warning', f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}", context=context)
|
||||
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
|
||||
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||
log_operation('info', f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}", context=context)
|
||||
elif strategy == 'last_change_wins':
|
||||
adv_ts = await get_advoware_timestamp(advoware, frnr)
|
||||
adv_ts = await get_advoware_timestamp(advoware, frnr, context)
|
||||
google_ts_str = google_data.get('updated', '')
|
||||
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
|
||||
if adv_ts and google_ts:
|
||||
if adv_ts > google_ts:
|
||||
await update_google_event(service, calendar_id, event_id, adv_std)
|
||||
await update_google_event(service, calendar_id, event_id, adv_std, context)
|
||||
elif row['advoware_write_allowed']:
|
||||
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
|
||||
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'], context)
|
||||
async with conn.transaction():
|
||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
|
||||
log_operation('info', f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}", context=context)
|
||||
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||
except Exception as e:
|
||||
log_operation('warning', f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}", context=context)
|
||||
async with conn.transaction():
|
||||
@@ -878,11 +879,11 @@ async def handler(event_data, context):
|
||||
advoware = AdvowareAPI(context)
|
||||
|
||||
log_operation('debug', "Initializing Google service", context=context)
|
||||
service = await get_google_service()
|
||||
service = await get_google_service(context)
|
||||
log_operation('debug', f"Ensuring Google calendar for {kuerzel}", context=context)
|
||||
calendar_id = await ensure_google_calendar(service, kuerzel)
|
||||
calendar_id = await ensure_google_calendar(service, kuerzel, context)
|
||||
|
||||
conn = await connect_db()
|
||||
conn = await connect_db(context)
|
||||
try:
|
||||
# Initialize state
|
||||
state = {
|
||||
@@ -920,9 +921,9 @@ async def handler(event_data, context):
|
||||
|
||||
async def reload_api_maps():
|
||||
"""Reload API maps after creating new events in phases."""
|
||||
state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel)
|
||||
state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel, context)
|
||||
state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')}
|
||||
state['google_events'] = await fetch_google_events(service, calendar_id)
|
||||
state['google_events'] = await fetch_google_events(service, calendar_id, context)
|
||||
state['google_map'] = {evt['id']: evt for evt in state['google_events']}
|
||||
log_operation('debug', "Reloaded API maps", context=context, adv=len(state['adv_map']), google=len(state['google_map']))
|
||||
|
||||
|
||||
5
bitbylaw/types.d.ts
vendored
5
bitbylaw/types.d.ts
vendored
@@ -25,7 +25,8 @@ declare module 'motia' {
|
||||
'Advoware Proxy GET': ApiRouteHandler<Record<string, unknown>, unknown, never>
|
||||
'Advoware Proxy DELETE': ApiRouteHandler<Record<string, unknown>, unknown, never>
|
||||
'Calendar Sync Event Step': EventHandler<never, never>
|
||||
'Calendar Sync Cron Job': CronHandler<{ topic: 'calendar_sync_employee'; data: never }>
|
||||
'Calendar Sync API Trigger': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'calendar_sync_employee'; data: never }>
|
||||
'Calendar Sync Cron Job': CronHandler<{ topic: 'calendar_sync_all'; data: never }>
|
||||
'Calendar Sync API Trigger': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'calendar_sync_employee'; data: never } | { topic: 'calendar_sync_all'; data: never }>
|
||||
'Calendar Sync All Step': EventHandler<never, { topic: 'calendar_sync_employee'; data: never }>
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user