Fix timestamp-based update logic and remove global last_sync update to prevent redundant syncs and race conditions. Update README with anonymization and latest changes.

This commit is contained in:
root
2025-10-23 10:40:00 +00:00
parent 1de5bcd369
commit 9ab90fef5a
9 changed files with 910 additions and 323 deletions

1
bitbylaw/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Bitbylaw project

View File

@@ -29,5 +29,11 @@ class Config:
GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH = os.getenv('GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH', 'service-account.json')
GOOGLE_CALENDAR_SCOPES = ['https://www.googleapis.com/auth/calendar']
# PostgreSQL settings for Calendar Sync Hub
POSTGRES_HOST = os.getenv('POSTGRES_HOST', 'localhost')
POSTGRES_USER = os.getenv('POSTGRES_USER', 'calendar_sync_user')
POSTGRES_PASSWORD = os.getenv('POSTGRES_PASSWORD', 'default_password')
POSTGRES_DB_NAME = os.getenv('POSTGRES_DB_NAME', 'calendar_sync_db')
# Calendar Sync settings
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = os.getenv('CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS', 'true').lower() == 'true'

View File

@@ -67,10 +67,6 @@
"steps/advoware_cal_sync/calendar_sync_cron_step.py": {
"x": 200,
"y": 0
},
"steps/advoware_cal_sync/calendar_sync_event_step.py": {
"x": 100,
"y": 100
}
}
},

View File

@@ -4,7 +4,7 @@
"scripts": {
"postinstall": "motia install",
"dev": "motia dev",
"start": ". python_modules/bin/activate && motia start --host 0.0.0.0",
"start": "PYTHONPATH=/opt/motia-app/bitbylaw . python_modules/bin/activate && motia start --host 0.0.0.0",
"generate-types": "motia generate-types",
"build": "motia build",
"clean": "rm -rf dist node_modules python_modules .motia .mermaid"

View File

@@ -0,0 +1 @@
# Steps package

View File

@@ -1,65 +1,155 @@
# Advoware Calendar Sync
# Advoware Calendar Sync - Hub-Based Design
Dieser Abschnitt implementiert die bidirektionale Synchronisation zwischen Advoware-Terminen und Google Calendar. Für jeden Mitarbeiter in Advoware wird automatisch ein entsprechender Google Calendar erstellt und gepflegt.
# Advoware Calendar Sync - Hub-Based Design
Dieser Abschnitt implementiert die bidirektionale Synchronisation zwischen Advoware-Terminen und Google Calendar unter Verwendung von PostgreSQL als zentralem Hub (Single Source of Truth). Das System stellt sicher, dass Termine konsistent gehalten werden, mit konfigurierbaren Konfliktauflösungsstrategien, Schreibberechtigungen und Datenschutzfeatures wie Anonymisierung. Der Sync läuft in vier strikten Phasen, um maximale Robustheit und Atomarität zu gewährleisten.
## Übersicht
Das System synchronisiert Termine zwischen:
- **Advoware**: Zentrale Terminverwaltung mit detaillierten Informationen
- **Google Calendar**: Benutzerfreundliche Kalenderansicht für jeden Mitarbeiter
- **Advoware**: Zentrale Terminverwaltung mit detaillierten Informationen (aber vielen API-Bugs).
- **Google Calendar**: Benutzerfreundliche Kalenderansicht für jeden Mitarbeiter.
- **PostgreSQL Hub**: Zentraler Datenspeicher für State, Policies und Audit-Logs.
## Architektur
### Hub-Design
- **Single Source of Truth**: Alle Sync-Informationen werden in PostgreSQL gespeichert.
- **Policies**: Enums für Sync-Strategien (`source_system_wins`, `last_change_wins`) und Flags für Schreibberechtigung (`advoware_write_allowed`).
- **Status-Tracking**: `sync_status` ('pending', 'synced', 'failed') für Monitoring und Retries.
- **Transaktionen**: Jede DB-Operation läuft in separaten Transaktionen; Fehler beeinflussen nur den aktuellen Eintrag.
- **Soft Deletes**: Gelöschte Termine werden markiert, nicht entfernt.
- **Phasen-basierte Verarbeitung**: Sync in 4 Phasen, um Neue, Deletes und Updates zu trennen.
- **Timestamp-basierte Updates**: Updates werden ausschließlich auf Basis von `last_sync` (gesetzt auf den API-Timestamp der Quelle) getriggert, nicht auf Datenvergleichen, um Race-Conditions zu vermeiden.
- **Anonymisierung**: Optionale Anonymisierung sensibler Daten (Text, Notiz, Ort) bei Advoware → Google Sync, um Datenschutz zu wahren.
### Sync-Phasen
1. **Phase 1: Neue Einträge Advoware → Google** - Erstelle Google-Events für neue Advoware-Termine, dann DB-Insert.
2. **Phase 2: Neue Einträge Google → Advoware** - Erstelle Advoware-Termine für neue Google-Events, dann DB-Insert.
3. **Phase 3: Gelöschte Einträge identifizieren** - Handle Deletes/Recreates basierend auf Strategie.
4. **Phase 4: Bestehende Einträge updaten** - Update bei Änderungen, basierend auf Timestamps (API-Timestamp > `last_sync`).
### Datenbank-Schema
```sql
-- Haupt-Tabelle
CREATE TABLE calendar_sync (
sync_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
employee_kuerzel VARCHAR(10) NOT NULL,
advoware_frnr INTEGER,
google_event_id VARCHAR(255),
source_system source_system_enum NOT NULL,
sync_strategy sync_strategy_enum NOT NULL DEFAULT 'source_system_wins',
sync_status sync_status_enum NOT NULL DEFAULT 'synced',
advoware_write_allowed BOOLEAN NOT NULL DEFAULT FALSE,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
last_sync TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Enums
CREATE TYPE source_system_enum AS ENUM ('advoware', 'google');
CREATE TYPE sync_strategy_enum AS ENUM ('source_system_wins', 'last_change_wins');
CREATE TYPE sync_status_enum AS ENUM ('pending', 'synced', 'failed');
-- Audit-Tabelle
CREATE TABLE calendar_sync_audit (
id SERIAL PRIMARY KEY,
sync_id UUID NOT NULL,
action VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Indizes (angepasst für Soft Deletes)
CREATE UNIQUE INDEX idx_calendar_sync_advoware ON calendar_sync (employee_kuerzel, advoware_frnr) WHERE advoware_frnr IS NOT NULL AND deleted = FALSE;
CREATE UNIQUE INDEX idx_calendar_sync_google ON calendar_sync (employee_kuerzel, google_event_id) WHERE google_event_id IS NOT NULL;
```
## Funktionalität
### Automatische Kalender-Erstellung
- Für jeden Advoware-Mitarbeiter wird ein Google Calendar mit dem Namen `AW-{Kuerzel}` erstellt
- Beispiel: Mitarbeiter mit Kürzel "SB" → Calendar "AW-SB"
- Kalender wird mit dem Haupt-Google-Account (`lehmannundpartner@gmail.com`) als Owner geteilt
- Für jeden Advoware-Mitarbeiter wird ein Google Calendar mit dem Namen `AW-{Kuerzel}` erstellt.
- Beispiel: Mitarbeiter mit Kürzel "SB" → Calendar "AW-SB".
- Kalender wird mit dem Haupt-Google-Account (`lehmannundpartner@gmail.com`) als Owner geteilt.
### Bidirektionale Synchronisation mit Token-Validierung
### Phasen-Details
#### Advoware → Google Calendar
- Alle Termine eines Mitarbeiters werden aus Advoware abgerufen (Zeitraum: aktuelles Jahr + 2 Jahre)
- Neue Termine werden in den entsprechenden Google Calendar eingetragen
- Die Advoware-Termin-ID (`frNr`) wird als Metadaten gespeichert
- Bestehende Termine werden aktualisiert, wenn Änderungen erkannt werden
#### Phase 1: Neue Einträge Advoware → Google
- Fetch Advoware-Termine.
- Für jede frNr, die nicht in DB (deleted=FALSE) existiert: Standardisiere Daten (mit Anonymisierung falls aktiviert), erstelle Google-Event, dann INSERT in DB mit `sync_status = 'synced'`, `last_sync` auf Advoware-Timestamp.
- Bei Fehlern: Warnung loggen, weitermachen (nicht abbrechen).
#### Google Calendar → Advoware
- Termine aus Google Calendar ohne `frNr` werden als neue Termine in Advoware erstellt
- Die generierte `frNr` wird zurück in den Google Calendar geschrieben
- Token-basierte Validierung verhindert unbefugte Änderungen
- Sync-Info wird in der Description/Notiz gespeichert
#### Phase 2: Neue Einträge Google → Advoware
- Fetch Google-Events.
- Für jeden event_id, der nicht in DB existiert: Standardisiere Daten, erstelle Advoware-Termin, dann INSERT in DB mit `sync_status = 'synced'`, `last_sync` auf Google-Timestamp.
- Bei frNr None (API-Bug): Skippen mit Warnung.
- Bei Fehlern: Warnung loggen, weitermachen.
### Token-Sicherheit
- MD5-Hash mit Salt (aus Umgebungsvariable `CALENDAR_SYNC_SALT`) für Änderungsvalidierung
- Sync-Info Format: `## no change below this line ##\nfrNr: {frNr}\nsync-token: {token}`
- Token wird bei jeder Änderung neu berechnet und validiert
#### Phase 3: Gelöschte Einträge identifizieren
- Für jeden DB-Eintrag: Prüfe, ob Termin in API fehlt.
- Bei beiden fehlend: Soft Delete.
- Bei einem fehlend: Recreate oder propagate Delete basierend auf Strategie.
- Bei Fehlern: `sync_status = 'failed'`, Warnung.
### Löschungen
- Google-initiale Termine, die in Google gelöscht werden, werden auch in Advoware gelöscht
- Tracking von gelöschten `frNr` um Re-Sync zu verhindern
#### Phase 4: Bestehende Einträge updaten
- Für bestehende Einträge: Prüfe API-Timestamp > `last_sync`.
- Bei `source_system_wins`: Update basierend auf `source_system`, setze `last_sync` auf den API-Timestamp der Quelle.
- Bei `last_change_wins`: Vergleiche Timestamps, update das System mit dem neueren, setze `last_sync` auf den neueren Timestamp.
- Anonymisierung: Bei Advoware → Google wird Text/Notiz/Ort anonymisiert, wenn `CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = True`.
- Bei Fehlern: `sync_status = 'failed'`, Warnung.
## API-Endpunkte
### Datenmapping und Standardisierung
Beide Systeme werden auf gemeinsames Format normalisiert (Berlin TZ):
```python
{
'start': datetime, # Berlin TZ
'end': datetime,
'text': str,
'notiz': str,
'ort': str,
'dauertermin': int, # 0/1
'turnus': int, # 0/1
'turnusArt': int,
'recurrence': str # RRULE oder None
}
```
### Advoware API
- `GET /api/v1/advonet/Mitarbeiter?aktiv=true` - Liste aktiver Mitarbeiter
- `GET /api/v1/advonet/Termine?frnr={frnr}` - Einzelner Termin
- `GET /api/v1/advonet/Termine?kuerzel={kuerzel}&from={date}&to={date}` - Termine eines Mitarbeiters
- `POST /api/v1/advonet/Termine` - Neuen Termin erstellen
- `PUT /api/v1/advonet/Termine` - Termin aktualisieren
- `DELETE /api/v1/advonet/Termine?frnr={frnr}` - Termin löschen
#### Advoware → Standard
- Start: `datum` + `uhrzeitVon` (Fallback 09:00), oder `datum` als datetime.
- End: `datumBis` + `uhrzeitBis` (Fallback 10:00), oder `datum` + 1h.
- All-Day: `dauertermin=1` oder Dauer >1 Tag.
- Recurring: `turnus`/`turnusArt` (vereinfacht, keine RRULE).
- Anonymisierung: Wenn `CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS`, setze `text='Advoware blocked'`, `notiz=''`, `ort=''`.
**API-Schwächen und Seltsamkeiten:**
- Parameter müssen als Strings übergeben werden (z.B. `aktiv='true'`, nicht `True`)
- Response kann manchmal HTML statt JSON zurückgeben, auch bei 200 Status
- Content-Type Header ist nicht immer korrekt gesetzt
- Token-Authentifizierung mit HMAC-Signature erforderlich
- Keine Paginierung für große Resultate
- Zeitformate: `datum` als `YYYY-MM-DDTHH:MM:SS`, aber manchmal ohne T
#### Google → Standard
- Start/End: `dateTime` oder `date` (All-Day).
- All-Day: `dauertermin=1` wenn All-Day oder Dauer >1 Tag.
- Recurring: RRULE aus `recurrence`.
### Google Calendar API
- Kalender-Management (erstellen, auflisten, ACL setzen)
- Event-Management (erstellen, aktualisieren, löschen)
- Service Account Authentifizierung mit Backoff bei Rate Limits
#### Standard → Advoware
- POST/PUT: `datum`/`uhrzeitBis`/`datumBis` aus start/end.
- Defaults: `vorbereitungsDauer='00:00:00'`, `sb`/`anwalt`=employee_kuerzel.
#### Standard → Google
- All-Day: `date` statt `dateTime`, end +1 Tag.
- Recurring: RRULE aus `recurrence`.
## API-Schwächen und Fuckups
### Advoware API (Buggy und Inkonsistent)
- **Case Sensitivity in Responses**: Feldnamen variieren manchmal `'frNr'`, manchmal `'frnr'` (z.B. POST-Response: `{'frnr': 123}`). Code prüft beide (`result.get('frNr') or result.get('frnr')`), um None zu vermeiden.
- **Zeitformate**: `datum`/`datumBis` als `'YYYY-MM-DD'` oder `'YYYY-MM-DDTHH:MM:SS'`. `uhrzeitVon`/`uhrzeitBis` separat (z.B. `'09:00:00'`). Fehlt `uhrzeitVon`, Fallback 09:00; fehlt `uhrzeitBis`, 10:00. Parsing muss beide Formate handhaben.
- **Defaults und Fehlende Felder**: Viele Felder optional; Code setzt Fallbacks (z.B. `uhrzeitVon='09:00:00'`).
- **Recurring-Unterstützung**: Keine RRULE; nur `turnus` (0/1) und `turnusArt` (0-?). Mapping zu Google RRULE ist vereinfacht und unvollständig.
- **API-Zuverlässigkeit**: Manchmal erfolgreicher POST, aber `frNr: None` (trotz gültiger Response). 500-Fehler bei Bad Requests. Keine Timestamp-Details in Responses.
- **Zeitzonen**: Alles implizit Berlin; Code konvertiert explizit.
- **Andere Bugs**: `zuletztGeaendertAm` für Timestamps, aber Format unzuverlässig.
### Google Calendar API (Zuverlässig)
- **Zeitformate**: `dateTime` als ISO mit TZ (z.B. `'2025-01-01T10:00:00+01:00'`), `date` für All-Day. Code parst mit `fromisoformat` und `.rstrip('Z')`.
- **Zeitzonen**: Explizit (z.B. `'Europe/Berlin'`); Code konvertiert zu Berlin TZ.
- **Recurring**: RRULE in `recurrence`; vollständig unterstützt.
- **Updates**: `updated` Timestamp für last-change.
- **Keine bekannten Bugs**: Zuverlässig, aber Rate-Limits möglich.
## Step-Konfiguration
@@ -71,41 +161,38 @@ Das System synchronisiert Termine zwischen:
**Event Data:**
```json
{
"full_content": true // oder false für nur "blocked"
"data": {
"body": {
"employee_kuerzel": "SB" // Optional, default "AI"
}
}
}
```
### calendar_sync_api_step.py
- **Type:** api
- **Path:** `/advoware/calendar/sync`
- **Method:** POST
- **Flows:** advoware
**Request Body:**
```json
{
"full_content": true // oder false für nur "blocked"
}
```
### calendar_sync_cron_step.py
- **Type:** cron
- **Schedule:** Täglich um 2:00 Uhr
- **Flows:** advoware
## Setup
### PostgreSQL
1. PostgreSQL 17 installieren und starten (localhost-only).
2. Datenbank erstellen: `sudo -u postgres psql -f /tmp/create_db.sql`
3. User und Berechtigungen setzen.
### Google API Credentials
1. Google Cloud Console Projekt erstellen
2. Google Calendar API aktivieren
3. Service Account erstellen (kein OAuth)
4. `service-account.json` Datei im Projektverzeichnis bereitstellen
1. Google Cloud Console Projekt erstellen.
2. Google Calendar API aktivieren.
3. Service Account erstellen.
4. `service-account.json` im Projekt bereitstellen.
### Advoware API Credentials
OAuth-ähnliche Authentifizierung mit HMAC-Signature.
OAuth-ähnliche Authentifizierung.
### Umgebungsvariablen
```env
# PostgreSQL
POSTGRES_HOST=localhost
POSTGRES_USER=calendar_sync_user
POSTGRES_PASSWORD=your_password
POSTGRES_DB_NAME=calendar_sync_db
# Google Calendar
GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH=service-account.json
@@ -122,14 +209,14 @@ ADVOWARE_PASSWORD=your_password
ADVOWARE_TOKEN_LIFETIME_MINUTES=55
ADVOWARE_API_TIMEOUT_SECONDS=30
# Redis für Token Caching
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
REDIS_DB_CALENDAR_SYNC=1
REDIS_TIMEOUT_SECONDS=5
# Calendar Sync
CALENDAR_SYNC_SALT=your_secret_salt
# Anonymisierung
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS=true # Optional, default false
```
## Verwendung
@@ -141,121 +228,490 @@ curl -X POST "http://localhost:3000/advoware/calendar/sync" \
-d '{"full_content": true}'
```
### Event-basierter Sync
```bash
curl -X POST "http://localhost:3000/events" \
-H "Content-Type: application/json" \
-d '{"type": "calendar.sync.triggered", "data": {"full_content": true}}'
### Automatischer Sync
Cron-Step für regelmäßige Ausführung.
## Fehlerbehandlung und Logging
- **Transaktionen**: Pro Operation separat; Rollback nur für diese.
- **Logging**: Detailliert (Info/Debug für API, Warnung für Fehler).
- **API-Fehler**: Retry mit Backoff für Google; robust gegen Advoware-Bugs.
- **Datenfehler**: Fallbacks bei Parsing-Fehlern.
## Sicherheit und Datenschutz
- DB-User mit minimalen Berechtigungen.
- Schreibberechtigung-Flags verhindern unbefugte Änderungen.
- Anonymisierung: Verhindert Leakage sensibler Daten in Google Calendar.
- Audit-Logs für Compliance.
## Bekannte Probleme
- Recurring-Events: Begrenzte Unterstützung; Advoware hat keine RRULE.
- Timestamps: Fehlende in Google können zu Fallback führen.
- Performance: Bei vielen Terminen könnte Paginierung helfen.
## Erweiterungen
- Inkrementelle Syncs basierend auf `last_sync`.
- Mehr Strategien (z.B. 'manual').
- Webhooks für Echtzeit.
- Tests und Monitoring.
## Übersicht
Das System synchronisiert Termine zwischen:
- **Advoware**: Zentrale Terminverwaltung mit detaillierten Informationen (aber vielen API-Bugs).
- **Google Calendar**: Benutzerfreundliche Kalenderansicht für jeden Mitarbeiter.
- **PostgreSQL Hub**: Zentraler Datenspeicher für State, Policies und Audit-Logs.
## Architektur
### Hub-Design
- **Single Source of Truth**: Alle Sync-Informationen werden in PostgreSQL gespeichert.
- **Policies**: Enums für Sync-Strategien (`source_system_wins`, `last_change_wins`) und Flags für Schreibberechtigung (`advoware_write_allowed`).
- **Status-Tracking**: `sync_status` ('pending', 'synced', 'failed') für Monitoring und Retries.
- **Transaktionen**: Jede DB-Operation läuft in separaten Transaktionen; Fehler beeinflussen nur den aktuellen Eintrag.
- **Soft Deletes**: Gelöschte Termine werden markiert, nicht entfernt.
- **Phasen-basierte Verarbeitung**: Sync in 4 Phasen, um Neue, Deletes und Updates zu trennen.
### Sync-Phasen
1. **Phase 1: Neue Einträge Advoware → Google** - Erstelle Google-Events für neue Advoware-Termine, dann DB-Insert.
2. **Phase 2: Neue Einträge Google → Advoware** - Erstelle Advoware-Termine für neue Google-Events, dann DB-Insert.
3. **Phase 3: Gelöschte Einträge identifizieren** - Handle Deletes/Recreates basierend auf Strategie.
4. **Phase 4: Bestehende Einträge updaten** - Update bei Änderungen, basierend auf Timestamps.
### Datenbank-Schema
```sql
-- Haupt-Tabelle
CREATE TABLE calendar_sync (
sync_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
employee_kuerzel VARCHAR(10) NOT NULL,
advoware_frnr INTEGER,
google_event_id VARCHAR(255),
source_system source_system_enum NOT NULL,
sync_strategy sync_strategy_enum NOT NULL DEFAULT 'source_system_wins',
sync_status sync_status_enum NOT NULL DEFAULT 'synced',
advoware_write_allowed BOOLEAN NOT NULL DEFAULT FALSE,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
last_sync TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Enums
CREATE TYPE source_system_enum AS ENUM ('advoware', 'google');
CREATE TYPE sync_strategy_enum AS ENUM ('source_system_wins', 'last_change_wins');
CREATE TYPE sync_status_enum AS ENUM ('pending', 'synced', 'failed');
-- Audit-Tabelle
CREATE TABLE calendar_sync_audit (
id SERIAL PRIMARY KEY,
action VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Indizes (angepasst für Soft Deletes)
CREATE UNIQUE INDEX idx_calendar_sync_advoware ON calendar_sync (employee_kuerzel, advoware_frnr) WHERE advoware_frnr IS NOT NULL AND deleted = FALSE;
CREATE UNIQUE INDEX idx_calendar_sync_google ON calendar_sync (employee_kuerzel, google_event_id) WHERE google_event_id IS NOT NULL;
```
### Automatischer Sync
Der Cron-Step führt täglich um 2:00 Uhr einen vollständigen Sync aus.
## Funktionalität
## Datenmapping
### Automatische Kalender-Erstellung
- Für jeden Advoware-Mitarbeiter wird ein Google Calendar mit dem Namen `AW-{Kuerzel}` erstellt.
- Beispiel: Mitarbeiter mit Kürzel "SB" → Calendar "AW-SB".
- Kalender wird mit dem Haupt-Google-Account (`lehmannundpartner@gmail.com`) als Owner geteilt.
### Advoware → Google Calendar
```javascript
### Phasen-Details
#### Phase 1: Neue Einträge Advoware → Google
- Fetch Advoware-Termine.
- Für jede frNr, die nicht in DB (deleted=FALSE) existiert: Standardisiere Daten, erstelle Google-Event, dann INSERT in DB mit `sync_status = 'synced'`.
- Bei Fehlern: Warnung loggen, weitermachen (nicht abbrechen).
#### Phase 2: Neue Einträge Google → Advoware
- Fetch Google-Events.
- Für jeden event_id, der nicht in DB existiert: Standardisiere Daten, erstelle Advoware-Termin, dann INSERT in DB mit `sync_status = 'synced'`.
- Bei frNr None (API-Bug): Skippen mit Warnung.
- Bei Fehlern: Warnung loggen, weitermachen.
#### Phase 3: Gelöschte Einträge identifizieren
- Für jeden DB-Eintrag: Prüfe, ob Termin in API fehlt.
- Bei beiden fehlend: Soft Delete.
- Bei einem fehlend: Recreate oder propagate Delete basierend auf Strategie.
- Bei Fehlern: `sync_status = 'failed'`, Warnung.
#### Phase 4: Bestehende Einträge updaten
- Für bestehende Einträge: Vergleiche standardisierte Daten.
- Bei Differenzen: Update basierend auf Strategie und Timestamps.
- Bei Fehlern: `sync_status = 'failed'`, Warnung.
### Datenmapping und Standardisierung
Beide Systeme werden auf gemeinsames Format normalisiert (Berlin TZ):
```python
{
"summary": "Advoware blocked", // Immer "blocked" für Privacy
"description": `Advoware Termin ID: ${appointment.frNr}`,
"location": "",
"start": {
"dateTime": start_datetime, // Berechnet aus datum/uhrzeitVon
"timeZone": "Europe/Berlin"
},
"end": {
"dateTime": end_datetime, // Berechnet aus datumBis/uhrzeitBis
"timeZone": "Europe/Berlin"
},
"extendedProperties": {
"private": {
"advoware_frnr": appointment.frNr.toString()
'start': datetime, # Berlin TZ
'end': datetime,
'text': str,
'notiz': str,
'ort': str,
'dauertermin': int, # 0/1
'turnus': int, # 0/1
'turnusArt': int,
'recurrence': str # RRULE oder None
}
```
#### Advoware → Standard
- Start: `datum` + `uhrzeitVon` (Fallback 09:00), oder `datum` als datetime.
- End: `datumBis` + `uhrzeitBis` (Fallback 10:00), oder `datum` + 1h.
- All-Day: `dauertermin=1` oder Dauer >1 Tag.
- Recurring: `turnus`/`turnusArt` (vereinfacht, keine RRULE).
#### Google → Standard
- Start/End: `dateTime` oder `date` (All-Day).
- All-Day: `dauertermin=1` wenn All-Day oder Dauer >1 Tag.
- Recurring: RRULE aus `recurrence`.
#### Standard → Advoware
- POST/PUT: `datum`/`uhrzeitBis`/`datumBis` aus start/end.
- Defaults: `vorbereitungsDauer='00:00:00'`, `sb`/`anwalt`=employee_kuerzel.
#### Standard → Google
- All-Day: `date` statt `dateTime`, end +1 Tag.
- Recurring: RRULE aus `recurrence`.
## API-Schwächen und Fuckups
### Advoware API (Buggy und Inkonsistent)
- **Case Sensitivity in Responses**: Feldnamen variieren manchmal `'frNr'`, manchmal `'frnr'` (z.B. POST-Response: `{'frnr': 123}`). Code prüft beide (`result.get('frNr') or result.get('frnr')`), um None zu vermeiden.
- **Zeitformate**: `datum`/`datumBis` als `'YYYY-MM-DD'` oder `'YYYY-MM-DDTHH:MM:SS'`. `uhrzeitVon`/`uhrzeitBis` separat (z.B. `'09:00:00'`). Fehlt `uhrzeitVon`, Fallback 09:00; fehlt `uhrzeitBis`, 10:00. Parsing muss beide Formate handhaben.
- **Defaults und Fehlende Felder**: Viele Felder optional; Code setzt Fallbacks (z.B. `uhrzeitVon='09:00:00'`).
- **Recurring-Unterstützung**: Keine RRULE; nur `turnus` (0/1) und `turnusArt` (0-?). Mapping zu Google RRULE ist vereinfacht und unvollständig.
- **API-Zuverlässigkeit**: Manchmal erfolgreicher POST, aber `frNr: None` (trotz gültiger Response). 500-Fehler bei Bad Requests. Keine Timestamp-Details in Responses.
- **Zeitzonen**: Alles implizit Berlin; Code konvertiert explizit.
- **Andere Bugs**: `zuletztGeaendertAm` für Timestamps, aber Format unzuverlässig.
### Google Calendar API (Zuverlässig)
- **Zeitformate**: `dateTime` als ISO mit TZ (z.B. `'2025-01-01T10:00:00+01:00'`), `date` für All-Day. Code parst mit `fromisoformat` und `.rstrip('Z')`.
- **Zeitzonen**: Explizit (z.B. `'Europe/Berlin'`); Code konvertiert zu Berlin TZ.
- **Recurring**: RRULE in `recurrence`; vollständig unterstützt.
- **Updates**: `updated` Timestamp für last-change.
- **Keine bekannten Bugs**: Zuverlässig, aber Rate-Limits möglich.
## Step-Konfiguration
### calendar_sync_event_step.py
- **Type:** event
- **Subscribes:** calendar.sync.triggered
- **Flows:** advoware
**Event Data:**
```json
{
"data": {
"body": {
"employee_kuerzel": "SB" // Optional, default "AI"
}
}
}
```
### Google Calendar → Advoware
```javascript
{
"frNr": int(frnr),
"text": event.summary,
"notiz": updated_description, // Mit sync-info
"ort": event.location,
"datum": start.split('+')[0] if '+' in start else start,
"uhrzeitBis": end.split('T')[1].split('+')[0] if 'T' in end else '09:00:00',
"datumBis": end.split('+')[0] if '+' in end else end,
"sb": employee_kuerzel,
"anwalt": employee_kuerzel,
"vorbereitungsDauer": "00:00:00"
}
## Setup
### PostgreSQL
1. PostgreSQL 17 installieren und starten (localhost-only).
2. Datenbank erstellen: `sudo -u postgres psql -f /tmp/create_db.sql`
3. User und Berechtigungen setzen.
### Google API Credentials
1. Google Cloud Console Projekt erstellen.
2. Google Calendar API aktivieren.
3. Service Account erstellen.
4. `service-account.json` im Projekt bereitstellen.
### Advoware API Credentials
OAuth-ähnliche Authentifizierung.
### Umgebungsvariablen
```env
# PostgreSQL
POSTGRES_HOST=localhost
POSTGRES_USER=calendar_sync_user
POSTGRES_PASSWORD=your_password
POSTGRES_DB_NAME=calendar_sync_db
# Google Calendar
GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH=service-account.json
# Advoware API
ADVOWARE_API_BASE_URL=https://www2.advo-net.net:90/
ADVOWARE_PRODUCT_ID=64
ADVOWARE_APP_ID=your_app_id
ADVOWARE_API_KEY=your_api_key
ADVOWARE_KANZLEI=your_kanzlei
ADVOWARE_DATABASE=your_database
ADVOWARE_USER=your_user
ADVOWARE_ROLE=2
ADVOWARE_PASSWORD=your_password
ADVOWARE_TOKEN_LIFETIME_MINUTES=55
ADVOWARE_API_TIMEOUT_SECONDS=30
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
REDIS_TIMEOUT_SECONDS=5
```
## Fehlerbehandlung
## Verwendung
- **Google API Fehler:** Exponentieller Backoff bei 403/429, automatische Wiederholung
- **Advoware API Fehler:** Token-Refresh bei 401, detaillierte Logging
- **JSON Parse Fehler:** Response-Text logging für Debugging
- **Netzwerkfehler:** Timeout-Handling, konfigurierbare Timeouts
- **Dateninkonsistenzen:** Token-Validierung, Änderungsvergleich vor Update
### Manueller Sync
```bash
curl -X POST "http://localhost:3000/events" \
-H "Content-Type: application/json" \
-d '{"type": "calendar.sync.triggered", "data": {"body": {"employee_kuerzel": "SB"}}}'
```
## Monitoring
### Automatischer Sync
Cron-Step für regelmäßige Ausführung.
### Logs
- Erfolgreiche Synchronisationen mit Anzahl
- Fehlerhafte API-Calls mit Details
- Kalender-Erstellungen und ACL-Setups
- Performance-Metriken (Sync-Dauer)
- Debug-Logs für Änderungsvergleiche
## Fehlerbehandlung und Logging
### Metriken
- Anzahl synchronisierter Termine
- Verarbeitete Mitarbeiter
- Fehlerquoten pro API
- Gelöschte Termine
- Token-Validierungsfehler
- **Transaktionen**: Pro Operation separat; Rollback nur für diese.
- **Logging**: Detailliert (Info/Debug für API, Warnung für Fehler).
- **API-Fehler**: Retry mit Backoff für Google; robust gegen Advoware-Bugs.
- **Datenfehler**: Fallbacks bei Parsing-Fehlern.
## Sicherheit
- Service Account für Google API (kein User-Consent)
- HMAC-SHA512 für Advoware Token-Generierung
- Token-Caching in Redis mit TTL
- MD5-Token für Sync-Validierung mit Salt
- Scoped Permissions (nur Calendar-Zugriff)
- Audit-Logs für alle Änderungen
- DB-User mit minimalen Berechtigungen.
- Schreibberechtigung-Flags verhindern unbefugte Änderungen.
- Audit-Logs für Compliance.
## Bekannte Probleme und Workarounds
## Bekannte Probleme
### Advoware API
- **Parameter-Typen:** Alle Query-Parameter müssen Strings sein (z.B. `'true'` statt `True`)
- **Response-Formate:** Manchmal HTML statt JSON, auch bei 200 Status
- **Content-Type:** Nicht immer korrekt gesetzt, führt zu Parse-Fehlern
- **Zeitformate:** Inkonsistente Formate, manuelle Parsing erforderlich
- **Rate Limits:** Keine dokumentierten Limits, aber praktische Limits vorhanden
### Google Calendar API
- **Rate Limits:** 403/429 mit Backoff-Handling
- **ACL-Sharing:** Owner-Rolle für Hauptaccount erforderlich
- **Event-Updates:** Vollständige Event-Daten bei Updates senden
### Sync-Logik
- **Token-Mismatch:** Verhindert unbefugte Änderungen
- **Deleted Tracking:** Verhindert Re-Sync gelöschter Termine
- **Änderungsvergleich:** Nur tatsächliche Änderungen syncen
- Recurring-Events: Begrenzte Unterstützung; Advoware hat keine RRULE.
- Timestamps: Fehlende in Google können zu Fallback führen.
- Performance: Bei vielen Terminen könnte Paginierung helfen.
## Erweiterungen
### Geplante Features
- Inkrementelle Syncs (nur geänderte Termine seit letztem Sync)
- Konfliktlösungsstrategien (Advoware gewinnt, Google gewinnt, Manuell)
- Batch-Verarbeitung für bessere Performance
- Webhook-Integration für Echtzeit-Syncs
- Mehrere Google-Accounts unterstützen
- Outlook/Apple Calendar Integration
- Inkrementelle Syncs basierend auf `last_sync`.
- Mehr Strategien (z.B. 'manual').
- Webhooks für Echtzeit.
- Tests und Monitoring.
### Code-Verbesserungen
- Unit-Tests für API-Calls
- Mock-Server für Testing
- Config-Validation beim Startup
- Health-Checks für beide APIs
- Metrics-Export (Prometheus)
### Datenbank-Schema
```sql
-- Haupt-Tabelle
CREATE TABLE calendar_sync (
sync_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
employee_kuerzel VARCHAR(10) NOT NULL,
advoware_frnr INTEGER,
google_event_id VARCHAR(255),
source_system source_system_enum NOT NULL,
sync_strategy sync_strategy_enum NOT NULL DEFAULT 'source_system_wins',
sync_status sync_status_enum NOT NULL DEFAULT 'synced',
advoware_write_allowed BOOLEAN NOT NULL DEFAULT FALSE,
deleted BOOLEAN NOT NULL DEFAULT FALSE,
last_sync TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Enums
CREATE TYPE source_system_enum AS ENUM ('advoware', 'google');
CREATE TYPE sync_strategy_enum AS ENUM ('source_system_wins', 'last_change_wins');
CREATE TYPE sync_status_enum AS ENUM ('pending', 'synced', 'failed');
-- Audit-Tabelle
CREATE TABLE calendar_sync_audit (
id SERIAL PRIMARY KEY,
sync_id UUID NOT NULL,
action VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Indizes
CREATE UNIQUE INDEX idx_calendar_sync_advoware ON calendar_sync (employee_kuerzel, advoware_frnr) WHERE advoware_frnr IS NOT NULL;
CREATE UNIQUE INDEX idx_calendar_sync_google ON calendar_sync (employee_kuerzel, google_event_id) WHERE google_event_id IS NOT NULL;
```
## Funktionalität
### Automatische Kalender-Erstellung
- Für jeden Advoware-Mitarbeiter wird ein Google Calendar mit dem Namen `AW-{Kuerzel}` erstellt.
- Beispiel: Mitarbeiter mit Kürzel "SB" → Calendar "AW-SB".
- Kalender wird mit dem Haupt-Google-Account (`lehmannundpartner@gmail.com`) als Owner geteilt.
### Bidirektionale Synchronisation
#### Fetch und Standardisierung
- Frische Daten werden von beiden APIs gefetcht (Zeitraum: 365 Tage zurück, 730 vor).
- Daten werden standardisiert: Start/End, Text, Notiz, Ort, Dauertermin, Turnus.
- Unterschiede werden basierend auf relevanten Feldern erkannt.
#### Verarbeitung bestehender Termine
- Für jeden DB-Eintrag: Prüfe, ob Termin in beiden Systemen existiert.
- Bei Unterschieden: Löse Konflikt basierend auf `sync_strategy`.
- `source_system_wins`: Aktualisiere basierend auf `source_system`.
- `last_change_wins`: Vergleiche Timestamps (`zuletztGeaendertAm` in Advoware, `updated` in Google); bei Fehlen: Fallback auf `source_system_wins`.
- Schreibberechtigung: Nur aktualisiere Advoware, wenn `advoware_write_allowed = TRUE`.
- Bei Fehlen in einem System: Rekreiere oder propagiere Delete basierend auf Strategie.
#### Neue Termine
- Neue aus Advoware: Erstelle in Google, füge DB-Eintrag hinzu (`source_system = 'advoware'`, `advoware_write_allowed = FALSE`).
- Neue aus Google: Erstelle in Advoware (falls erlaubt), füge DB-Eintrag hinzu (`source_system = 'google'`, `advoware_write_allowed = TRUE`).
#### Löschungen
- Wenn Termin in beiden fehlt: Soft Delete in DB.
- Wenn in einem fehlt: Propagiere Delete basierend auf Strategie und Berechtigung.
## API-Endpunkte
### Advoware API
- `GET /api/v1/advonet/Termine?kuerzel={kuerzel}&from={date}&to={date}` - Termine eines Mitarbeiters
- `GET /api/v1/advonet/Termine?frnr={frnr}` - Einzelner Termin (für Timestamp)
- `POST /api/v1/advonet/Termine` - Neuen Termin erstellen
- `PUT /api/v1/advonet/Termine` - Termin aktualisieren
- `DELETE /api/v1/advonet/Termine?frnr={frnr}` - Termin löschen
**API-Schwächen:**
- Zeitformate: `datum`/`datumBis` als `YYYY-MM-DD` oder `YYYY-MM-DDTHH:MM:SS`, `uhrzeitVon`/`uhrzeitBis` separat.
- Defaults: `uhrzeitVon` kann fehlen, dann 09:00; `uhrzeitBis` 10:00.
- Keine Recurring-Unterstützung (RRULE); nur `turnus`/`turnusArt` als Flags.
### Google Calendar API
- Kalender-Management (erstellen, ACL setzen)
- Events: Listen, erstellen, aktualisieren, löschen
- Recurring: RRULE-Unterstützung
- Timestamps: `updated` für Änderungszeit
## Step-Konfiguration
### calendar_sync_event_step.py
- **Type:** event
- **Subscribes:** calendar.sync.triggered
- **Flows:** advoware
**Event Data:**
```json
{
"data": {
"body": {
"employee_kuerzel": "SB" // Optional, default "AI" für Test
}
}
}
```
## Setup
### PostgreSQL
1. PostgreSQL 17 installieren und starten (localhost-only).
2. Datenbank erstellen: `sudo -u postgres psql -f /tmp/create_db.sql`
3. User und Berechtigungen setzen.
### Google API Credentials
1. Google Cloud Console Projekt erstellen.
2. Google Calendar API aktivieren.
3. Service Account erstellen.
4. `service-account.json` im Projekt bereitstellen.
### Advoware API Credentials
OAuth-ähnliche Authentifizierung.
### Umgebungsvariablen
```env
# PostgreSQL
POSTGRES_HOST=localhost
POSTGRES_USER=calendar_sync_user
POSTGRES_PASSWORD=your_password
POSTGRES_DB_NAME=calendar_sync_db
# Google Calendar
GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH=service-account.json
# Advoware API (wie zuvor)
ADVOWARE_API_BASE_URL=https://www2.advo-net.net:90/
ADVOWARE_PRODUCT_ID=64
ADVOWARE_APP_ID=your_app_id
ADVOWARE_API_KEY=your_api_key
ADVOWARE_KANZLEI=your_kanzlei
ADVOWARE_DATABASE=your_database
ADVOWARE_USER=your_user
ADVOWARE_ROLE=2
ADVOWARE_PASSWORD=your_password
ADVOWARE_TOKEN_LIFETIME_MINUTES=55
ADVOWARE_API_TIMEOUT_SECONDS=30
# Redis (falls verwendet)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB_ADVOWARE_CACHE=1
REDIS_TIMEOUT_SECONDS=5
```
## Verwendung
### Manueller Sync
```bash
curl -X POST "http://localhost:3000/events" \
-H "Content-Type: application/json" \
-d '{"type": "calendar.sync.triggered", "data": {"body": {"employee_kuerzel": "SB"}}}'
```
### Automatischer Sync
Cron-Step für regelmäßige Ausführung.
## Datenmapping
### Standardisierung
Beide Systeme werden auf gemeinsames Format normalisiert:
```python
{
'start': datetime, # Berlin TZ
'end': datetime,
'text': str,
'notiz': str,
'ort': str,
'dauertermin': int, # 0/1
'turnus': int, # 0/1
'turnusArt': int,
'recurrence': str # RRULE oder None
}
```
### Advoware → Google
- All-Day: Wenn `dauertermin=1` oder Dauer >1 Tag.
- Recurring: RRULE aus `turnus`/`turnusArt` (vereinfacht).
### Google → Advoware
- Zeit: `datum`/`uhrzeitBis` aus Start/End.
- Recurring: `turnus=1` wenn RRULE vorhanden.
## Fehlerbehandlung und Logging
- **Transaktionen**: Rollback bei Fehlern.
- **Logging**: Detailliertes Info/Debug-Logging für jeden Schritt.
- **API-Fehler**: Retry mit Backoff für Google; detailliert für Advoware.
- **Datenfehler**: Fallbacks bei Parsing-Fehlern.
## Sicherheit
- DB-User mit minimalen Berechtigungen.
- Schreibberechtigung-Flags verhindern unbefugte Änderungen.
- Audit-Logs für Compliance.
## Bekannte Probleme
- Recurring-Events: Begrenzte Unterstützung; Advoware hat keine RRULE.
- Timestamps: Fehlende in Google können zu Fallback führen.
- Performance: Bei vielen Terminen könnte Paginierung helfen.
## Erweiterungen
- Inkrementelle Syncs basierend auf `last_sync`.
- Mehr Strategien (z.B. 'manual').
- Webhooks für Echtzeit.
- Tests und Monitoring.

View File

@@ -0,0 +1 @@
# Advoware Calendar Sync package

View File

@@ -8,12 +8,13 @@ from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2 import service_account
import asyncpg
import redis
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
# Setup logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
logger.addHandler(handler)
@@ -24,6 +25,8 @@ BERLIN_TZ = pytz.timezone('Europe/Berlin')
FETCH_FROM = (datetime.datetime.now(BERLIN_TZ) - timedelta(days=365)).strftime('%Y-01-01T00:00:00')
FETCH_TO = (datetime.datetime.now(BERLIN_TZ) + timedelta(days=730)).strftime('%Y-12-31T23:59:59')
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
# Relevant fields for data comparison (simple diff)
COMPARISON_FIELDS = ['text', 'notiz', 'ort', 'datum', 'uhrzeitBis', 'datumBis', 'dauertermin', 'turnus', 'turnusArt']
@@ -95,6 +98,7 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel):
'to': FETCH_TO
}
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params)
logger.debug(f"Raw Advoware API response: {result}")
appointments = result if isinstance(result, list) else []
logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}")
return appointments
@@ -108,13 +112,16 @@ async def fetch_google_events(service, calendar_id):
now = datetime.datetime.now(pytz.utc)
from_date = now - timedelta(days=365)
to_date = now + timedelta(days=730)
time_min = from_date.strftime('%Y-%m-%dT%H:%M:%SZ')
time_max = to_date.strftime('%Y-%m-%dT%H:%M:%SZ')
events_result = service.events().list(
calendarId=calendar_id,
timeMin=from_date.isoformat() + 'Z',
timeMax=to_date.isoformat() + 'Z',
timeMin=time_min,
timeMax=time_max,
singleEvents=True, # Expand recurring
orderBy='startTime'
).execute()
logger.debug(f"Raw Google API response: {events_result}")
events = [evt for evt in events_result.get('items', []) if evt.get('status') != 'cancelled']
logger.info(f"Fetched {len(events)} Google events for calendar {calendar_id}")
return events
@@ -129,17 +136,46 @@ def standardize_appointment_data(data, source):
"""Standardize data from Advoware or Google to comparable dict, with TZ handling."""
if source == 'advoware':
start_str = data.get('datum', '')
end_str = data.get('datumBis', data.get('datum', ''))
start_time = data.get('uhrzeitVon') or '09:00:00' if 'T' not in start_str else start_str.split('T')[1]
# Improved parsing: if datum contains 'T', it's datetime; else combine with uhrzeitVon
if 'T' in start_str:
try:
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_str.replace('Z', '')))
except ValueError:
logger.warning(f"Invalid start datetime in Advoware: {start_str}")
start_dt = BERLIN_TZ.localize(datetime.datetime.now())
else:
start_time = data.get('uhrzeitVon') or '09:00:00'
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{start_str}T{start_time}"))
# For end: Use date from datumBis (or datum), time from uhrzeitBis
end_date_str = data.get('datumBis', data.get('datum', ''))
if 'T' in end_date_str:
base_end_date = end_date_str.split('T')[0]
else:
base_end_date = end_date_str
end_time = data.get('uhrzeitBis', '10:00:00')
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_str.replace('Z', '')))
end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(end_str.replace('Z', '')))
try:
end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{base_end_date}T{end_time}"))
except ValueError:
logger.warning(f"Invalid end datetime in Advoware: {base_end_date}T{end_time}")
end_dt = start_dt + timedelta(hours=1)
# Anonymization for Google events
if Config.CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS:
text = 'Advoware blocked'
notiz = ''
ort = ''
else:
text = data.get('text', '')
notiz = data.get('notiz', '')
ort = data.get('ort', '')
return {
'start': start_dt,
'end': end_dt,
'text': data.get('text', ''),
'notiz': data.get('notiz', ''),
'ort': data.get('ort', ''),
'text': text,
'notiz': notiz,
'ort': ort,
'dauertermin': data.get('dauertermin', 0),
'turnus': data.get('turnus', 0),
'turnusArt': data.get('turnusArt', 0),
@@ -158,7 +194,17 @@ def standardize_appointment_data(data, source):
end_dt = datetime.datetime.fromisoformat(end_obj['dateTime'].rstrip('Z')).astimezone(BERLIN_TZ)
else:
end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(end_obj['date']))
dauertermin = 1 if all_day or (end_dt - start_dt).days > 1 else 0
# Improved dauertermin: set to 1 if all-day or duration >1 day
duration_days = (end_dt.date() - start_dt.date()).days
dauertermin = 1 if all_day or duration_days > 1 else 0
recurrence = data.get('recurrence')
if recurrence:
# Simple mapping: if recurrence exists, set turnus=1, turnusArt based on RRULE (simplified)
turnus = 1
turnus_art = 0 # Default, could parse RRULE for better mapping
else:
turnus = 0
turnus_art = 0
return {
'start': start_dt,
'end': end_dt,
@@ -166,9 +212,9 @@ def standardize_appointment_data(data, source):
'notiz': data.get('description', ''),
'ort': data.get('location', ''),
'dauertermin': dauertermin,
'turnus': 1 if data.get('recurrence') else 0, # Simplified
'turnusArt': 0, # Map RRULE to type if needed
'recurrence': data.get('recurrence')
'turnus': turnus,
'turnusArt': turnus_art,
'recurrence': recurrence
}
def data_diff(data1, data2):
@@ -200,7 +246,8 @@ async def create_advoware_appointment(advoware, data, employee_kuerzel):
}
try:
result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data)
frnr = str(result.get('frNr'))
logger.debug(f"Raw Advoware POST response: {result}")
frnr = str(result.get('frNr') or result.get('frnr'))
logger.info(f"Created Advoware appointment frNr: {frnr}")
return frnr
except Exception as e:
@@ -246,7 +293,9 @@ async def create_google_event(service, calendar_id, data):
all_day = data['dauertermin'] == 1 and start_dt.time() == datetime.time(0,0) and end_dt.time() == datetime.time(0,0)
if all_day:
start_obj = {'date': start_dt.strftime('%Y-%m-%d')}
end_obj = {'date': end_dt.strftime('%Y-%m-%d')}
# For all-day events, end date is exclusive, so add 1 day
end_date = (start_dt + timedelta(days=1)).strftime('%Y-%m-%d')
end_obj = {'date': end_date}
else:
start_obj = {'dateTime': start_dt.isoformat(), 'timeZone': 'Europe/Berlin'}
end_obj = {'dateTime': end_dt.isoformat(), 'timeZone': 'Europe/Berlin'}
@@ -277,7 +326,9 @@ async def update_google_event(service, calendar_id, event_id, data):
all_day = data['dauertermin'] == 1 and start_dt.time() == datetime.time(0,0) and end_dt.time() == datetime.time(0,0)
if all_day:
start_obj = {'date': start_dt.strftime('%Y-%m-%d')}
end_obj = {'date': end_dt.strftime('%Y-%m-%d')}
# For all-day events, end date is exclusive, so add 1 day
end_date = (start_dt + timedelta(days=1)).strftime('%Y-%m-%d')
end_obj = {'date': end_date}
else:
start_obj = {'dateTime': start_dt.isoformat(), 'timeZone': 'Europe/Berlin'}
end_obj = {'dateTime': end_dt.isoformat(), 'timeZone': 'Europe/Berlin'}
@@ -311,167 +362,242 @@ async def delete_google_event(service, calendar_id, event_id):
logger.error(f"Failed to delete Google event {event_id}: {e}")
raise
async def get_advoware_timestamp(advoware, frnr):
"""Fetch last modified timestamp from Advoware (zuletztGeaendertAm)."""
try:
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr})
if result and isinstance(result, list) and result:
return datetime.datetime.fromisoformat(result[0].get('zuletztGeaendertAm', '').rstrip('Z')).astimezone(BERLIN_TZ)
return None
except Exception as e:
logger.error(f"Failed to get Advoware timestamp for {frnr}: {e}")
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed):
"""Safe wrapper for creating Advoware appointments with write permission check."""
if not write_allowed:
logger.warning("Cannot create in Advoware, write not allowed")
return None
return await create_advoware_appointment(advoware, data, employee_kuerzel)
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed):
"""Safe wrapper for updating Advoware appointments with write permission check."""
if not write_allowed:
logger.warning("Cannot update in Advoware, write not allowed")
return
await update_advoware_appointment(advoware, frnr, data)
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed):
"""Safe wrapper for deleting Advoware appointments with write permission check."""
if not write_allowed:
logger.warning("Cannot delete in Advoware, write not allowed")
return
await delete_advoware_appointment(advoware, frnr)
async def handler(event, context):
"""Main event handler for calendar sync."""
employee_kuerzel = event.get('data', {}).get('body', {}).get('employee_kuerzel', 'AI') # Default to 'AI' for test
logger.info(f"Starting calendar sync for {employee_kuerzel}")
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_CALENDAR_SYNC),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
)
try:
logger.debug("Initializing Google service")
service = await get_google_service()
logger.debug(f"Ensuring Google calendar for {employee_kuerzel}")
calendar_id = await ensure_google_calendar(service, employee_kuerzel)
logger.debug("Initializing Advoware API")
advoware = AdvowareAPI(context)
async with await connect_db() as conn:
async with conn.transaction():
# Lock rows
rows = await conn.fetch(
"""
SELECT * FROM calendar_sync
WHERE employee_kuerzel = $1 AND deleted = FALSE
FOR UPDATE
""",
employee_kuerzel
)
conn = await connect_db()
try:
# Fetch fresh data
logger.info("Fetching fresh data from APIs")
adv_appointments = await fetch_advoware_appointments(advoware, employee_kuerzel)
adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')}
google_events = await fetch_google_events(service, calendar_id)
google_map = {evt['id']: evt for evt in google_events}
# Build maps
adv_appointments = await fetch_advoware_appointments(advoware, employee_kuerzel)
adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')}
google_events = await fetch_google_events(service, calendar_id)
google_map = {evt['id']: evt for evt in google_events}
# Fetch existing DB rows
rows = await conn.fetch(
"""
SELECT * FROM calendar_sync
WHERE employee_kuerzel = $1 AND deleted = FALSE
""",
employee_kuerzel
)
logger.info(f"Fetched {len(rows)} existing sync rows")
adv_map_std = {frnr: standardize_appointment_data(app, 'advoware') for frnr, app in adv_map.items()}
google_map_std = {eid: standardize_appointment_data(evt, 'google') for eid, evt in google_map.items()}
# Build indexes
db_adv_index = {str(row['advoware_frnr']): row for row in rows if row['advoware_frnr']}
db_google_index = {row['google_event_id']: row for row in rows if row['google_event_id']}
# Build index from DB rows
db_adv_index = {row['advoware_frnr']: row for row in rows if row['advoware_frnr']}
db_google_index = {row['google_event_id']: row for row in rows if row['google_event_id']}
# Process existing
for row in rows:
frnr = row['advoware_frnr']
event_id = row['google_event_id']
adv_data = adv_map.pop(frnr, None) if frnr else None
google_data = google_map.pop(event_id, None) if event_id else None
adv_std = adv_map_std.pop(frnr, None) if frnr else None
google_std = google_map_std.pop(event_id, None) if event_id else None
if not adv_data and not google_data:
# Both missing - soft delete
await conn.execute("UPDATE calendar_sync SET deleted = TRUE WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Marked as deleted: sync_id {row['sync_id']}")
continue
if adv_data and google_data:
# Both exist - check diff
if data_diff(adv_std, google_std):
strategy = row['sync_strategy']
if strategy == 'source_system_wins':
if row['source_system'] == 'advoware':
await update_google_event(service, calendar_id, event_id, adv_std)
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
await update_advoware_appointment(advoware, frnr, google_std)
else:
logger.warning(f"Write to Advoware blocked for sync_id {row['sync_id']}")
elif strategy == 'last_change_wins':
adv_ts = await get_advoware_timestamp(advoware, frnr)
google_ts = datetime.datetime.fromisoformat(google_data.get('updated', '').rstrip('Z')).astimezone(BERLIN_TZ)
if adv_ts and google_ts:
if adv_ts > google_ts:
await update_google_event(service, calendar_id, event_id, adv_std)
elif row['advoware_write_allowed']:
await update_advoware_appointment(advoware, frnr, google_std)
else:
logger.warning(f"Missing timestamps for last_change_wins: sync_id {row['sync_id']}")
elif adv_data:
# Missing in Google - recreate or delete?
strategy = row['sync_strategy']
if strategy == 'source_system_wins' and row['source_system'] == 'advoware':
new_event_id = await create_google_event(service, calendar_id, adv_std)
# Phase 1: New from Advoware => Google
logger.info("Phase 1: Processing new appointments from Advoware")
for frnr, app in adv_map.items():
if frnr not in db_adv_index:
try:
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware'))
async with conn.transaction():
await conn.execute(
"UPDATE calendar_sync SET google_event_id = $1 WHERE sync_id = $2;",
new_event_id, row['sync_id']
)
elif strategy == 'last_change_wins':
# Assume Adv change newer - recreate
new_event_id = await create_google_event(service, calendar_id, adv_std)
await conn.execute(
"UPDATE calendar_sync SET google_event_id = $1 WHERE sync_id = $2;",
new_event_id, row['sync_id']
"""
INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed)
VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE);
""",
employee_kuerzel, int(frnr), event_id
)
logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}")
except Exception as e:
logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}")
# Phase 2: New from Google => Advoware
logger.info("Phase 2: Processing new events from Google")
for event_id, evt in google_map.items():
if event_id not in db_google_index:
try:
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), employee_kuerzel, True)
if frnr and str(frnr) != 'None':
async with conn.transaction():
await conn.execute(
"""
INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed)
VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE);
""",
employee_kuerzel, int(frnr), event_id
)
logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}")
else:
# Propagate delete to Advoware if allowed
if row['advoware_write_allowed']:
await delete_advoware_appointment(advoware, frnr)
await conn.execute("UPDATE calendar_sync SET deleted = TRUE WHERE sync_id = $1;", row['sync_id'])
logger.warning(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None")
except Exception as e:
logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}")
elif google_data:
# Missing in Advoware - recreate or delete?
strategy = row['sync_strategy']
if strategy == 'source_system_wins' and row['source_system'] == 'google' and row['advoware_write_allowed']:
new_frnr = await create_advoware_appointment(advoware, google_std, employee_kuerzel)
await conn.execute(
"UPDATE calendar_sync SET advoware_frnr = $1 WHERE sync_id = $2;",
int(new_frnr), row['sync_id']
)
elif strategy == 'last_change_wins' and row['advoware_write_allowed']:
# Assume Google change newer - recreate
new_frnr = await create_advoware_appointment(advoware, google_std, employee_kuerzel)
await conn.execute(
"UPDATE calendar_sync SET advoware_frnr = $1 WHERE sync_id = $2;",
int(new_frnr), row['sync_id']
)
else:
# Propagate delete to Google
# Phase 3: Identify deleted entries
logger.info("Phase 3: Processing deleted entries")
for row in rows:
frnr = row['advoware_frnr']
event_id = row['google_event_id']
adv_exists = str(frnr) in adv_map if frnr else False
google_exists = event_id in google_map if event_id else False
if not adv_exists and not google_exists:
# Both missing - soft delete
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)")
elif not adv_exists:
# Missing in Advoware
strategy = row['sync_strategy']
if strategy == 'source_system_wins' and row['source_system'] == 'advoware':
# Recreate in Google
try:
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(google_map[event_id], 'google'))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced' WHERE sync_id = $2;", new_event_id, row['sync_id'])
logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
await conn.execute("UPDATE calendar_sync SET deleted = TRUE WHERE sync_id = $1;", row['sync_id'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif not google_exists:
# Missing in Google
strategy = row['sync_strategy']
if strategy == 'source_system_wins' and row['source_system'] == 'google' and row['advoware_write_allowed']:
# Recreate in Advoware
try:
new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(adv_map[frnr], 'advoware'), employee_kuerzel, row['advoware_write_allowed'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced' WHERE sync_id = $2;", int(new_frnr), row['sync_id'])
logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
# Propagate delete to Advoware
try:
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
# New from Advoware
for frnr, app in adv_map.items():
adv_std = standardize_appointment_data(app, 'advoware')
event_id = await create_google_event(service, calendar_id, adv_std)
await conn.execute(
"""
INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, advoware_write_allowed)
VALUES ($1, $2, $3, 'advoware', 'source_system_wins', FALSE);
""",
employee_kuerzel, int(frnr), event_id
)
logger.info(f"Created new from Advoware: frNr {frnr}, event_id {event_id}")
# Phase 4: Update existing entries if changed
logger.info("Phase 4: Processing updates for existing entries")
for row in rows:
frnr = row['advoware_frnr']
event_id = row['google_event_id']
adv_data = adv_map.get(str(frnr)) if frnr else None
google_data = google_map.get(event_id) if event_id else None
# New from Google
for event_id, evt in google_map.items():
google_std = standardize_appointment_data(evt, 'google')
frnr = await create_advoware_appointment(advoware, google_std, employee_kuerzel)
await conn.execute(
"""
INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, advoware_write_allowed)
VALUES ($1, $2, $3, 'google', 'source_system_wins', TRUE);
""",
employee_kuerzel, int(frnr), event_id
)
logger.info(f"Created new from Google: event_id {event_id}, frNr {frnr}")
if adv_data and google_data:
adv_std = standardize_appointment_data(adv_data, 'advoware')
google_std = standardize_appointment_data(google_data, 'google')
strategy = row['sync_strategy']
try:
if strategy == 'source_system_wins':
if row['source_system'] == 'advoware':
# Check if Advoware was modified since last sync
adv_ts = datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm']).astimezone(BERLIN_TZ)
if adv_ts > row['last_sync']:
await update_google_event(service, calendar_id, event_id, adv_std)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], adv_ts)
logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}")
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
# Check if Google was modified since last sync
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
if google_ts and google_ts > row['last_sync']:
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], google_ts)
logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}")
elif strategy == 'last_change_wins':
adv_ts = await get_advoware_timestamp(advoware, frnr)
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
if adv_ts and google_ts:
if adv_ts > google_ts:
await update_google_event(service, calendar_id, event_id, adv_std)
elif row['advoware_write_allowed']:
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
# Update last_sync
await conn.execute(
"UPDATE calendar_sync SET last_sync = NOW() WHERE employee_kuerzel = $1;",
employee_kuerzel
)
# Update last_sync timestamps
logger.debug("Updated last_sync timestamps")
finally:
await conn.close()
redis_client.delete(CALENDAR_SYNC_LOCK_KEY)
logger.info(f"Calendar sync completed for {employee_kuerzel}")
return {'status': 200, 'body': {'status': 'completed'}}
except Exception as e:
logger.error(f"Sync failed for {employee_kuerzel}: {e}")
return {'status': 500, 'body': {'error': str(e)}}
redis_client.delete(CALENDAR_SYNC_LOCK_KEY)
return {'status': 500, 'body': {'error': str(e)}}
# Motia Step Configuration
config = {
"type": "event",
"name": "Calendar Sync Event Step",
"description": "Handles bidirectional calendar sync between Advoware and Google Calendar using Postgres as hub",
"subscribes": ["calendar.sync.triggered"],
"emits": [],
"flows": ["advoware"]
}

2
bitbylaw/types.d.ts vendored
View File

@@ -24,7 +24,7 @@ declare module 'motia' {
'Advoware Proxy POST': ApiRouteHandler<Record<string, unknown>, unknown, never>
'Advoware Proxy GET': ApiRouteHandler<Record<string, unknown>, unknown, never>
'Advoware Proxy DELETE': ApiRouteHandler<Record<string, unknown>, unknown, never>
'Calendar Sync Event Handler': EventHandler<never, never>
'Calendar Sync Event Step': EventHandler<never, never>
'Calendar Sync Cron Job': CronHandler<{ topic: 'calendar.sync.triggered'; data: never }>
'Calendar Sync API Trigger': ApiRouteHandler<Record<string, unknown>, unknown, { topic: 'calendar.sync.triggered'; data: never }>
}