Fix calendar sync issues: correct True/False, limit recurrence to 2 years, add share_google_calendar function
- Fix Python syntax error: change 'true' to 'True' in config.py - Limit recurring events to max 2 years to avoid Google Calendar API limits - Add share_google_calendar function for manual calendar sharing - Update README with calendar sync documentation - Debug mode: limit sync to user AI only
This commit is contained in:
@@ -205,20 +205,66 @@ CMD ["motia", "start"]
|
|||||||
motia logs
|
motia logs
|
||||||
```
|
```
|
||||||
|
|
||||||
## Erweiterungen
|
## Calendar Sync
|
||||||
|
|
||||||
### Geplante Features
|
Das System enthält auch eine bidirektionale Kalender-Synchronisation zwischen Advoware und Google Calendar.
|
||||||
|
|
||||||
- Vollständige EspoCRM-API-Integration im Sync-Handler
|
### Architektur
|
||||||
- Retry-Logic für fehlgeschlagene Syncs
|
|
||||||
- Metriken und Alerting
|
|
||||||
- Batch-Verarbeitung für große Datenmengen
|
|
||||||
|
|
||||||
### API Erweiterungen
|
- **PostgreSQL Hub**: Speichert Sync-Zustand und verhindert Datenverlust
|
||||||
|
- **Event-Driven Sync**: 4-Phasen-Sync (Neu, Gelöscht, Aktualisiert)
|
||||||
|
- **Safe Wrappers**: Globale Write-Protection für Advoware-Schreiboperationen
|
||||||
|
- **Rate Limiting**: Backoff-Handling für Google Calendar API-Limits
|
||||||
|
|
||||||
- Zusätzliche Advoware-Endpunkte
|
### Dauertermine (Recurring Appointments)
|
||||||
- Mehr EspoCRM-Entitäten
|
|
||||||
- Custom Mapping-Regeln
|
Advoware verwendet `dauertermin=1` für wiederkehrende Termine mit folgenden Feldern:
|
||||||
|
|
||||||
|
- `turnus`: Intervall (z.B. 1 = jeden, 3 = jeden 3.)
|
||||||
|
- `turnusArt`: Frequenz-Einheit
|
||||||
|
- `1` = Täglich (DAILY)
|
||||||
|
- `2` = Wöchentlich (WEEKLY)
|
||||||
|
- `3` = Monatlich (MONTHLY)
|
||||||
|
- `4` = Jährlich (YEARLY)
|
||||||
|
- `datumBis`: Enddatum der Wiederholung
|
||||||
|
|
||||||
|
**RRULE-Generierung:**
|
||||||
|
```
|
||||||
|
RRULE:FREQ={FREQ};INTERVAL={turnus};UNTIL={datumBis}
|
||||||
|
```
|
||||||
|
|
||||||
|
Beispiel: `turnus=3, turnusArt=1` → `RRULE:FREQ=DAILY;INTERVAL=3;UNTIL=20251224`
|
||||||
|
|
||||||
|
### Setup
|
||||||
|
|
||||||
|
1. **Google Service Account**: `service-account.json` im Projektroot
|
||||||
|
2. **Umgebungsvariablen**:
|
||||||
|
```env
|
||||||
|
ADVOWARE_WRITE_PROTECTION=false # Global write protection
|
||||||
|
POSTGRES_HOST=localhost
|
||||||
|
GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH=service-account.json
|
||||||
|
```
|
||||||
|
3. **Trigger Sync**:
|
||||||
|
```bash
|
||||||
|
curl -X POST "http://localhost:3000/advoware/calendar/sync" -H "Content-Type: application/json" -d '{"full_content": true}'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Rate Limiting & Backoff
|
||||||
|
|
||||||
|
- **Google Calendar API**: 403-Fehler bei Rate-Limits werden mit exponentiellem Backoff (max. 60s) wiederholt
|
||||||
|
- **Delays**: 100ms zwischen API-Calls zur Vermeidung von Limits
|
||||||
|
- **Retry-Logic**: Max. 4 Versuche mit base=4
|
||||||
|
|
||||||
|
### Sicherheit
|
||||||
|
|
||||||
|
- **Write Protection**: `ADVOWARE_WRITE_PROTECTION=true` deaktiviert alle Advoware-Schreiboperationen
|
||||||
|
- **Per-User Calendars**: Automatische Erstellung und Freigabe von Google-Calendars pro Mitarbeiter
|
||||||
|
|
||||||
|
### Troubleshooting
|
||||||
|
|
||||||
|
- **Rate Limit Errors**: Logs zeigen Backoff-Retries; warten oder Limits erhöhen
|
||||||
|
- **Sync Failures**: `ADVOWARE_WRITE_PROTECTION=false` setzen für Debugging
|
||||||
|
- **Calendar Access**: Service Account muss Owner-Rechte haben
|
||||||
|
|
||||||
## Lizenz
|
## Lizenz
|
||||||
|
|
||||||
|
|||||||
@@ -37,3 +37,4 @@ class Config:
|
|||||||
|
|
||||||
# Calendar Sync settings
|
# Calendar Sync settings
|
||||||
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = os.getenv('CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS', 'true').lower() == 'true'
|
CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS = os.getenv('CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS', 'true').lower() == 'true'
|
||||||
|
ADVOWARE_WRITE_PROTECTION = os.getenv('ADVOWARE_WRITE_PROTECTION', 'false').lower() == 'true'
|
||||||
@@ -61,7 +61,7 @@ async def get_google_service():
|
|||||||
logger.error(f"Failed to initialize Google service: {e}")
|
logger.error(f"Failed to initialize Google service: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||||
async def ensure_google_calendar(service, employee_kuerzel):
|
async def ensure_google_calendar(service, employee_kuerzel):
|
||||||
"""Ensure Google Calendar exists for employee."""
|
"""Ensure Google Calendar exists for employee."""
|
||||||
calendar_name = f"AW-{employee_kuerzel}"
|
calendar_name = f"AW-{employee_kuerzel}"
|
||||||
@@ -108,7 +108,7 @@ async def fetch_advoware_appointments(advoware, employee_kuerzel):
|
|||||||
logger.error(f"Failed to fetch Advoware appointments: {e}")
|
logger.error(f"Failed to fetch Advoware appointments: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
||||||
async def fetch_google_events(service, calendar_id):
|
async def fetch_google_events(service, calendar_id):
|
||||||
"""Fetch Google events in range."""
|
"""Fetch Google events in range."""
|
||||||
try:
|
try:
|
||||||
@@ -290,7 +290,7 @@ async def delete_advoware_appointment(advoware, frnr):
|
|||||||
logger.error(f"Failed to delete Advoware appointment {frnr}: {e}")
|
logger.error(f"Failed to delete Advoware appointment {frnr}: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||||
async def create_google_event(service, calendar_id, data):
|
async def create_google_event(service, calendar_id, data):
|
||||||
"""Create Google event from standardized data."""
|
"""Create Google event from standardized data."""
|
||||||
start_dt = data['start'].astimezone(BERLIN_TZ)
|
start_dt = data['start'].astimezone(BERLIN_TZ)
|
||||||
@@ -324,7 +324,7 @@ async def create_google_event(service, calendar_id, data):
|
|||||||
logger.error(f"Failed to create Google event: {e}")
|
logger.error(f"Failed to create Google event: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||||
async def update_google_event(service, calendar_id, event_id, data):
|
async def update_google_event(service, calendar_id, event_id, data):
|
||||||
"""Update Google event."""
|
"""Update Google event."""
|
||||||
start_dt = data['start'].astimezone(BERLIN_TZ)
|
start_dt = data['start'].astimezone(BERLIN_TZ)
|
||||||
@@ -356,7 +356,7 @@ async def update_google_event(service, calendar_id, event_id, data):
|
|||||||
logger.error(f"Failed to update Google event {event_id}: {e}")
|
logger.error(f"Failed to update Google event {event_id}: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
|
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
|
||||||
async def delete_google_event(service, calendar_id, event_id):
|
async def delete_google_event(service, calendar_id, event_id):
|
||||||
"""Delete Google event."""
|
"""Delete Google event."""
|
||||||
try:
|
try:
|
||||||
@@ -370,14 +370,30 @@ async def delete_google_event(service, calendar_id, event_id):
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed):
|
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed):
|
||||||
"""Safe wrapper for creating Advoware appointments with write permission check."""
|
"""Safe wrapper for creating Advoware appointments with write permission check and global protection."""
|
||||||
|
if Config.ADVOWARE_WRITE_PROTECTION:
|
||||||
|
logger.warning("Global write protection active, skipping Advoware create")
|
||||||
|
return None
|
||||||
if not write_allowed:
|
if not write_allowed:
|
||||||
logger.warning("Cannot create in Advoware, write not allowed")
|
logger.warning("Cannot create in Advoware, write not allowed")
|
||||||
return None
|
return None
|
||||||
return await create_advoware_appointment(advoware, data, employee_kuerzel)
|
return await create_advoware_appointment(advoware, data, employee_kuerzel)
|
||||||
|
|
||||||
|
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed):
|
||||||
|
"""Safe wrapper for deleting Advoware appointments with write permission check and global protection."""
|
||||||
|
if Config.ADVOWARE_WRITE_PROTECTION:
|
||||||
|
logger.warning("Global write protection active, skipping Advoware delete")
|
||||||
|
return
|
||||||
|
if not write_allowed:
|
||||||
|
logger.warning("Cannot delete in Advoware, write not allowed")
|
||||||
|
return
|
||||||
|
await delete_advoware_appointment(advoware, frnr)
|
||||||
|
|
||||||
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel):
|
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel):
|
||||||
"""Safe wrapper for updating Advoware appointments with write permission check."""
|
"""Safe wrapper for updating Advoware appointments with write permission check and global protection."""
|
||||||
|
if Config.ADVOWARE_WRITE_PROTECTION:
|
||||||
|
logger.warning("Global write protection active, skipping Advoware update")
|
||||||
|
return
|
||||||
if not write_allowed:
|
if not write_allowed:
|
||||||
logger.warning("Cannot update in Advoware, write not allowed")
|
logger.warning("Cannot update in Advoware, write not allowed")
|
||||||
return
|
return
|
||||||
@@ -386,7 +402,7 @@ async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed,
|
|||||||
async def get_advoware_employees(advoware):
|
async def get_advoware_employees(advoware):
|
||||||
"""Fetch list of employees from Advoware."""
|
"""Fetch list of employees from Advoware."""
|
||||||
try:
|
try:
|
||||||
result = await advoware.api_call('api/v1/advonet/Anwaelte', method='GET')
|
result = await advoware.api_call('api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'})
|
||||||
employees = result if isinstance(result, list) else []
|
employees = result if isinstance(result, list) else []
|
||||||
logger.info(f"Fetched {len(employees)} Advoware employees")
|
logger.info(f"Fetched {len(employees)} Advoware employees")
|
||||||
return employees
|
return employees
|
||||||
@@ -411,7 +427,7 @@ async def handler(event, context):
|
|||||||
|
|
||||||
total_synced = 0
|
total_synced = 0
|
||||||
for employee in employees:
|
for employee in employees:
|
||||||
kuerzel = employee.get('kuerzel') or employee.get('anwalt')
|
kuerzel = employee.get('kuerzel')
|
||||||
if not kuerzel:
|
if not kuerzel:
|
||||||
logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
|
logger.warning(f"Mitarbeiter ohne Kürzel übersprungen: {employee}")
|
||||||
continue
|
continue
|
||||||
@@ -425,8 +441,6 @@ async def handler(event, context):
|
|||||||
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
|
||||||
)
|
)
|
||||||
|
|
||||||
CALENDAR_SYNC_LOCK_KEY = f'calendar_sync_lock_{kuerzel}'
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
logger.debug("Initializing Google service")
|
logger.debug("Initializing Google service")
|
||||||
service = await get_google_service()
|
service = await get_google_service()
|
||||||
@@ -471,6 +485,7 @@ async def handler(event, context):
|
|||||||
kuerzel, int(frnr), event_id
|
kuerzel, int(frnr), event_id
|
||||||
)
|
)
|
||||||
logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}")
|
logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}")
|
||||||
|
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}")
|
logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}")
|
||||||
|
|
||||||
@@ -519,6 +534,7 @@ async def handler(event, context):
|
|||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
||||||
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
|
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
|
||||||
|
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
|
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
|
||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
@@ -546,6 +562,7 @@ async def handler(event, context):
|
|||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
||||||
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
|
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
|
||||||
|
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
|
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
|
||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
@@ -557,6 +574,7 @@ async def handler(event, context):
|
|||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
|
||||||
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
|
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
|
||||||
|
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
|
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
|
||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
@@ -588,6 +606,7 @@ async def handler(event, context):
|
|||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||||
logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}")
|
logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}")
|
||||||
|
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}")
|
logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}")
|
||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
@@ -628,12 +647,14 @@ async def handler(event, context):
|
|||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||||
logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}")
|
logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}")
|
||||||
|
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||||
elif google_ts and google_ts > row['last_sync']:
|
elif google_ts and google_ts > row['last_sync']:
|
||||||
logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}")
|
logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}")
|
||||||
await update_google_event(service, calendar_id, event_id, adv_std)
|
await update_google_event(service, calendar_id, event_id, adv_std)
|
||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
|
||||||
logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}")
|
logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}")
|
||||||
|
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||||
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
|
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
|
||||||
# Check for changes in source (Google) or unauthorized changes in target (Advoware)
|
# Check for changes in source (Google) or unauthorized changes in target (Advoware)
|
||||||
google_ts_str = google_data.get('updated', '')
|
google_ts_str = google_data.get('updated', '')
|
||||||
@@ -663,6 +684,7 @@ async def handler(event, context):
|
|||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
|
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
|
||||||
logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}")
|
logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}")
|
||||||
|
await asyncio.sleep(0.1) # Small delay to avoid rate limits
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}")
|
logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}")
|
||||||
async with conn.transaction():
|
async with conn.transaction():
|
||||||
|
|||||||
Reference in New Issue
Block a user