Files
motia/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py

673 lines
37 KiB
Python

import asyncio
import logging
import os
import datetime
from datetime import timedelta
import pytz
import backoff
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2 import service_account
import asyncpg
import redis
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
# Setup logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
logger.addHandler(handler)
# Timezone for all operations (optimize TZ handling)
BERLIN_TZ = pytz.timezone('Europe/Berlin')
# Constants for ranges (optimize fetch efficiency)
FETCH_FROM = (datetime.datetime.now(BERLIN_TZ) - timedelta(days=365)).strftime('%Y-01-01T00:00:00')
FETCH_TO = (datetime.datetime.now(BERLIN_TZ) + timedelta(days=730)).strftime('%Y-12-31T23:59:59')
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
# Relevant fields for data comparison (simple diff)
COMPARISON_FIELDS = ['text', 'notiz', 'ort', 'datum', 'uhrzeitBis', 'datumBis', 'dauertermin', 'turnus', 'turnusArt']
async def connect_db():
"""Connect to Postgres DB from Config."""
try:
conn = await asyncpg.connect(
host=Config.POSTGRES_HOST or 'localhost',
user=Config.POSTGRES_USER,
password=Config.POSTGRES_PASSWORD,
database=Config.POSTGRES_DB_NAME,
timeout=10
)
return conn
except Exception as e:
logger.error(f"Failed to connect to DB: {e}")
raise
async def get_google_service():
"""Initialize Google Calendar service."""
try:
service_account_path = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
if not os.path.exists(service_account_path):
raise FileNotFoundError(f"Service account file not found: {service_account_path}")
creds = service_account.Credentials.from_service_account_file(
service_account_path, scopes=Config.GOOGLE_CALENDAR_SCOPES
)
service = build('calendar', 'v3', credentials=creds)
return service
except Exception as e:
logger.error(f"Failed to initialize Google service: {e}")
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel):
"""Ensure Google Calendar exists for employee."""
calendar_name = f"AW-{employee_kuerzel}"
try:
calendar_list = service.calendarList().list().execute()
for calendar in calendar_list.get('items', []):
if calendar['summary'] == calendar_name:
return calendar['id']
# Create new
calendar_body = {
'summary': calendar_name,
'timeZone': 'Europe/Berlin'
}
created = service.calendars().insert(body=calendar_body).execute()
calendar_id = created['id']
# Share with main account if needed
acl_rule = {
'scope': {'type': 'user', 'value': 'lehmannundpartner@gmail.com'},
'role': 'owner'
}
service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
return calendar_id
except HttpError as e:
logger.error(f"Google API error for calendar {employee_kuerzel}: {e}")
raise
except Exception as e:
logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}")
raise
async def fetch_advoware_appointments(advoware, employee_kuerzel):
"""Fetch Advoware appointments in range."""
try:
params = {
'kuerzel': employee_kuerzel,
'from': FETCH_FROM,
'to': FETCH_TO
}
result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=params)
logger.debug(f"Raw Advoware API response: {result}")
appointments = result if isinstance(result, list) else []
logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}")
return appointments
except Exception as e:
logger.error(f"Failed to fetch Advoware appointments: {e}")
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def fetch_google_events(service, calendar_id):
"""Fetch Google events in range."""
try:
now = datetime.datetime.now(pytz.utc)
from_date = now - timedelta(days=365)
to_date = now + timedelta(days=730)
time_min = from_date.strftime('%Y-%m-%dT%H:%M:%SZ')
time_max = to_date.strftime('%Y-%m-%dT%H:%M:%SZ')
events_result = service.events().list(
calendarId=calendar_id,
timeMin=time_min,
timeMax=time_max,
singleEvents=True, # Expand recurring
orderBy='startTime'
).execute()
logger.debug(f"Raw Google API response: {events_result}")
events = [evt for evt in events_result.get('items', []) if evt.get('status') != 'cancelled']
logger.info(f"Fetched {len(events)} Google events for calendar {calendar_id}")
return events
except HttpError as e:
logger.error(f"Google API error fetching events: {e}")
raise
except Exception as e:
logger.error(f"Failed to fetch Google events: {e}")
raise
def standardize_appointment_data(data, source):
"""Standardize data from Advoware or Google to comparable dict, with TZ handling."""
if source == 'advoware':
start_str = data.get('datum', '')
# Improved parsing: if datum contains 'T', it's datetime; else combine with uhrzeitVon
if 'T' in start_str:
try:
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_str.replace('Z', '')))
except ValueError:
logger.warning(f"Invalid start datetime in Advoware: {start_str}")
start_dt = BERLIN_TZ.localize(datetime.datetime.now())
else:
start_time = data.get('uhrzeitVon') or '09:00:00'
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{start_str}T{start_time}"))
# For end: Use date from datumBis (or datum), time from uhrzeitBis
end_date_str = data.get('datumBis', data.get('datum', ''))
if 'T' in end_date_str:
base_end_date = end_date_str.split('T')[0]
else:
base_end_date = end_date_str
end_time = data.get('uhrzeitBis', '10:00:00')
try:
end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{base_end_date}T{end_time}"))
except ValueError:
logger.warning(f"Invalid end datetime in Advoware: {base_end_date}T{end_time}")
end_dt = start_dt + timedelta(hours=1)
# Anonymization for Google events
if Config.CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS:
text = 'Advoware blocked'
notiz = ''
ort = ''
else:
text = data.get('text', '')
notiz = data.get('notiz', '')
ort = data.get('ort', '')
return {
'start': start_dt,
'end': end_dt,
'text': text,
'notiz': notiz,
'ort': ort,
'dauertermin': data.get('dauertermin', 0),
'turnus': data.get('turnus', 0),
'turnusArt': data.get('turnusArt', 0),
'recurrence': None # No RRULE in Advoware
}
elif source == 'google':
start_obj = data.get('start', {})
end_obj = data.get('end', {})
if 'dateTime' in start_obj:
start_dt = datetime.datetime.fromisoformat(start_obj['dateTime'].rstrip('Z')).astimezone(BERLIN_TZ)
all_day = False
else:
start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_obj['date']))
all_day = True
if 'dateTime' in end_obj:
end_dt = datetime.datetime.fromisoformat(end_obj['dateTime'].rstrip('Z')).astimezone(BERLIN_TZ)
else:
end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(end_obj['date']))
# Improved dauertermin: set to 1 if all-day or duration >1 day
duration_days = (end_dt.date() - start_dt.date()).days
dauertermin = 1 if all_day or duration_days > 1 else 0
recurrence = data.get('recurrence')
if recurrence:
# Simple mapping: if recurrence exists, set turnus=1, turnusArt based on RRULE (simplified)
turnus = 1
turnus_art = 0 # Default, could parse RRULE for better mapping
else:
turnus = 0
turnus_art = 0
return {
'start': start_dt,
'end': end_dt,
'text': data.get('summary', ''),
'notiz': data.get('description', ''),
'ort': data.get('location', ''),
'dauertermin': dauertermin,
'turnus': turnus,
'turnusArt': turnus_art,
'recurrence': recurrence
}
def data_diff(data1, data2):
"""Simple diff on standardized data."""
if not data1 or not data2:
return True
for field in COMPARISON_FIELDS:
if data1.get(field) != data2.get(field):
return True
return False
async def create_advoware_appointment(advoware, data, employee_kuerzel):
"""Create Advoware appointment from standardized data."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
appointment_data = {
'text': data['text'],
'notiz': data['notiz'],
'ort': data['ort'],
'datum': start_dt.strftime('%Y-%m-%dT%H:%M:%S'),
'uhrzeitBis': end_dt.strftime('%H:%M:%S'),
'datumBis': end_dt.strftime('%Y-%m-%dT%H:%M:%S'),
'anwalt': employee_kuerzel,
'vorbereitungsDauer': '00:00:00',
'dauertermin': data['dauertermin'],
'turnus': data['turnus'],
'turnusArt': data['turnusArt']
}
try:
result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data)
logger.debug(f"Raw Advoware POST response: {result}")
frnr = str(result.get('frNr') or result.get('frnr'))
logger.info(f"Created Advoware appointment frNr: {frnr}")
return frnr
except Exception as e:
logger.error(f"Failed to create Advoware appointment: {e}")
raise
async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel):
"""Update Advoware appointment."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
appointment_data = {
'frNr': int(frnr),
'text': data['text'],
'notiz': data['notiz'],
'ort': data['ort'],
'datum': start_dt.strftime('%Y-%m-%dT%H:%M:%S'),
'uhrzeitBis': end_dt.strftime('%H:%M:%S'),
'datumBis': end_dt.strftime('%Y-%m-%dT%H:%M:%S'),
'anwalt': employee_kuerzel,
'vorbereitungsDauer': '00:00:00',
'dauertermin': data['dauertermin'],
'turnus': data['turnus'],
'turnusArt': data['turnusArt']
}
try:
await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=appointment_data)
logger.info(f"Updated Advoware appointment frNr: {frnr}")
except Exception as e:
logger.error(f"Failed to update Advoware appointment {frnr}: {e}")
raise
async def delete_advoware_appointment(advoware, frnr):
"""Delete Advoware appointment."""
try:
await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params={'frnr': frnr})
logger.info(f"Deleted Advoware appointment frNr: {frnr}")
except Exception as e:
logger.error(f"Failed to delete Advoware appointment {frnr}: {e}")
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def create_google_event(service, calendar_id, data):
"""Create Google event from standardized data."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
all_day = data['dauertermin'] == 1 and start_dt.time() == datetime.time(0,0) and end_dt.time() == datetime.time(0,0)
if all_day:
start_obj = {'date': start_dt.strftime('%Y-%m-%d')}
# For all-day events, end date is exclusive, so add 1 day
end_date = (start_dt + timedelta(days=1)).strftime('%Y-%m-%d')
end_obj = {'date': end_date}
else:
start_obj = {'dateTime': start_dt.isoformat(), 'timeZone': 'Europe/Berlin'}
end_obj = {'dateTime': end_dt.isoformat(), 'timeZone': 'Europe/Berlin'}
event_body = {
'summary': data['text'],
'description': data['notiz'],
'location': data['ort'],
'start': start_obj,
'end': end_obj,
'recurrence': data['recurrence'] # RRULE if present
}
try:
created = service.events().insert(calendarId=calendar_id, body=event_body).execute()
event_id = created['id']
logger.info(f"Created Google event ID: {event_id}")
return event_id
except HttpError as e:
logger.error(f"Google API error creating event: {e}")
raise
except Exception as e:
logger.error(f"Failed to create Google event: {e}")
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def update_google_event(service, calendar_id, event_id, data):
"""Update Google event."""
start_dt = data['start'].astimezone(BERLIN_TZ)
end_dt = data['end'].astimezone(BERLIN_TZ)
all_day = data['dauertermin'] == 1 and start_dt.time() == datetime.time(0,0) and end_dt.time() == datetime.time(0,0)
if all_day:
start_obj = {'date': start_dt.strftime('%Y-%m-%d')}
# For all-day events, end date is exclusive, so add 1 day
end_date = (start_dt + timedelta(days=1)).strftime('%Y-%m-%d')
end_obj = {'date': end_date}
else:
start_obj = {'dateTime': start_dt.isoformat(), 'timeZone': 'Europe/Berlin'}
end_obj = {'dateTime': end_dt.isoformat(), 'timeZone': 'Europe/Berlin'}
event_body = {
'summary': data['text'],
'description': data['notiz'],
'location': data['ort'],
'start': start_obj,
'end': end_obj,
'recurrence': data['recurrence']
}
try:
service.events().update(calendarId=calendar_id, eventId=event_id, body=event_body).execute()
logger.info(f"Updated Google event ID: {event_id}")
except HttpError as e:
logger.error(f"Google API error updating event {event_id}: {e}")
raise
except Exception as e:
logger.error(f"Failed to update Google event {event_id}: {e}")
raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=5, base=5, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def delete_google_event(service, calendar_id, event_id):
"""Delete Google event."""
try:
service.events().delete(calendarId=calendar_id, eventId=event_id).execute()
logger.info(f"Deleted Google event ID: {event_id}")
except HttpError as e:
logger.error(f"Google API error deleting event {event_id}: {e}")
raise
except Exception as e:
logger.error(f"Failed to delete Google event {event_id}: {e}")
raise
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed):
"""Safe wrapper for creating Advoware appointments with write permission check."""
if not write_allowed:
logger.warning("Cannot create in Advoware, write not allowed")
return None
return await create_advoware_appointment(advoware, data, employee_kuerzel)
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel):
"""Safe wrapper for updating Advoware appointments with write permission check."""
if not write_allowed:
logger.warning("Cannot update in Advoware, write not allowed")
return
await update_advoware_appointment(advoware, frnr, data, employee_kuerzel)
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed):
"""Safe wrapper for deleting Advoware appointments with write permission check."""
if not write_allowed:
logger.warning("Cannot delete in Advoware, write not allowed")
return
await delete_advoware_appointment(advoware, frnr)
async def handler(event, context):
"""Main event handler for calendar sync."""
employee_kuerzel = event.get('data', {}).get('body', {}).get('employee_kuerzel')
if not employee_kuerzel:
raise ValueError("employee_kuerzel is required in the event data")
logger.info(f"Starting calendar sync for {employee_kuerzel}")
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_CALENDAR_SYNC),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
)
try:
logger.debug("Initializing Google service")
service = await get_google_service()
logger.debug(f"Ensuring Google calendar for {employee_kuerzel}")
calendar_id = await ensure_google_calendar(service, employee_kuerzel)
logger.debug("Initializing Advoware API")
advoware = AdvowareAPI(context)
conn = await connect_db()
try:
# Fetch fresh data
logger.info("Fetching fresh data from APIs")
adv_appointments = await fetch_advoware_appointments(advoware, employee_kuerzel)
adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')}
google_events = await fetch_google_events(service, calendar_id)
google_map = {evt['id']: evt for evt in google_events}
# Fetch existing DB rows
rows = await conn.fetch(
"""
SELECT * FROM calendar_sync
WHERE employee_kuerzel = $1 AND deleted = FALSE
""",
employee_kuerzel
)
logger.info(f"Fetched {len(rows)} existing sync rows")
# Build indexes
db_adv_index = {str(row['advoware_frnr']): row for row in rows if row['advoware_frnr']}
db_google_index = {row['google_event_id']: row for row in rows if row['google_event_id']}
# Phase 1: New from Advoware => Google
logger.info("Phase 1: Processing new appointments from Advoware")
for frnr, app in adv_map.items():
if frnr not in db_adv_index:
try:
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware'))
async with conn.transaction():
await conn.execute(
"""
INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed)
VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE);
""",
employee_kuerzel, int(frnr), event_id
)
logger.info(f"Phase 1: Created new from Advoware: frNr {frnr}, event_id {event_id}")
except Exception as e:
logger.warning(f"Phase 1: Failed to process new Advoware {frnr}: {e}")
# Phase 2: New from Google => Advoware
logger.info("Phase 2: Processing new events from Google")
for event_id, evt in google_map.items():
if event_id not in db_google_index:
try:
frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(evt, 'google'), employee_kuerzel, True)
if frnr and str(frnr) != 'None':
async with conn.transaction():
await conn.execute(
"""
INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed)
VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE);
""",
employee_kuerzel, int(frnr), event_id
)
logger.info(f"Phase 2: Created new from Google: event_id {event_id}, frNr {frnr}")
else:
logger.warning(f"Phase 2: Skipped DB insert for Google event {event_id}, frNr is None")
except Exception as e:
logger.warning(f"Phase 2: Failed to process new Google {event_id}: {e}")
# Phase 3: Identify deleted entries
logger.info("Phase 3: Processing deleted entries")
for row in rows:
frnr = row['advoware_frnr']
event_id = row['google_event_id']
adv_exists = str(frnr) in adv_map if frnr else False
google_exists = event_id in google_map if event_id else False
if not adv_exists and not google_exists:
# Both missing - soft delete
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Soft deleted sync_id {row['sync_id']} (both missing)")
elif not adv_exists:
# Missing in Advoware
strategy = row['sync_strategy']
if strategy == 'source_system_wins':
if row['source_system'] == 'advoware':
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
# Recreate in Advoware
try:
new_frnr = await safe_create_advoware_appointment(advoware, standardize_appointment_data(google_map[event_id], 'google'), employee_kuerzel, row['advoware_write_allowed'])
if new_frnr and str(new_frnr) != 'None':
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ))
logger.info(f"Phase 3: Recreated Advoware appointment {new_frnr} for sync_id {row['sync_id']}")
else:
logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}, frNr is None")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
except Exception as e:
logger.warning(f"Phase 3: Failed to recreate Advoware for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
# For other cases, propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Propagated delete to Google for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to delete Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif not google_exists:
# Missing in Google
strategy = row['sync_strategy']
if strategy == 'source_system_wins':
if row['source_system'] == 'google':
# Delete in Advoware
if row['advoware_write_allowed']:
try:
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
logger.warning(f"Phase 3: Cannot delete in Advoware for sync_id {row['sync_id']}, write not allowed")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif row['source_system'] == 'advoware':
# Recreate in Google
try:
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(adv_map[str(frnr)], 'advoware'))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
logger.info(f"Phase 3: Recreated Google event {new_event_id} for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to recreate Google for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
# last_change_wins or other, propagate delete to Advoware
try:
await safe_delete_advoware_appointment(advoware, frnr, row['advoware_write_allowed'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
logger.info(f"Phase 3: Propagated delete to Advoware for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 3: Failed to delete Advoware for sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
# Phase 4: Update existing entries if changed
logger.info("Phase 4: Processing updates for existing entries")
for row in rows:
frnr = row['advoware_frnr']
event_id = row['google_event_id']
adv_data = adv_map.get(str(frnr)) if frnr else None
google_data = google_map.get(event_id) if event_id else None
if adv_data and google_data:
adv_std = standardize_appointment_data(adv_data, 'advoware')
google_std = standardize_appointment_data(google_data, 'google')
strategy = row['sync_strategy']
try:
if strategy == 'source_system_wins':
if row['source_system'] == 'advoware':
# Check for changes in source (Advoware) or unauthorized changes in target (Google)
adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm']))
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
if adv_ts > row['last_sync']:
await update_google_event(service, calendar_id, event_id, adv_std)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
logger.info(f"Phase 4: Updated Google event {event_id} from Advoware frNr {frnr}")
elif google_ts and google_ts > row['last_sync']:
logger.warning(f"Phase 4: Unauthorized change in Google event {event_id}, resetting to Advoware frNr {frnr}")
await update_google_event(service, calendar_id, event_id, adv_std)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
logger.info(f"Phase 4: Reset Google event {event_id} to Advoware frNr {frnr}")
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
# Check for changes in source (Google) or unauthorized changes in target (Advoware)
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(adv_data['zuletztGeaendertAm']))
logger.debug(f"Phase 4: Checking sync_id {row['sync_id']}: adv_ts={adv_ts}, google_ts={google_ts}, last_sync={row['last_sync']}")
if google_ts and google_ts > row['last_sync']:
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
logger.info(f"Phase 4: Updated Advoware frNr {frnr} from Google event {event_id}")
elif adv_ts > row['last_sync']:
logger.warning(f"Phase 4: Unauthorized change in Advoware frNr {frnr}, resetting to Google event {event_id}")
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], datetime.datetime.now(BERLIN_TZ))
logger.info(f"Phase 4: Reset Advoware frNr {frnr} to Google event {event_id}")
elif strategy == 'last_change_wins':
adv_ts = await get_advoware_timestamp(advoware, frnr)
google_ts_str = google_data.get('updated', '')
google_ts = datetime.datetime.fromisoformat(google_ts_str.rstrip('Z')).astimezone(BERLIN_TZ) if google_ts_str else None
if adv_ts and google_ts:
if adv_ts > google_ts:
await update_google_event(service, calendar_id, event_id, adv_std)
elif row['advoware_write_allowed']:
await safe_update_advoware_appointment(advoware, frnr, google_std, row['advoware_write_allowed'], row['employee_kuerzel'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", row['sync_id'], max(adv_ts, google_ts))
logger.info(f"Phase 4: Updated based on last_change_wins for sync_id {row['sync_id']}")
except Exception as e:
logger.warning(f"Phase 4: Failed to update sync_id {row['sync_id']}: {e}")
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
# Update last_sync timestamps
logger.debug("Updated last_sync timestamps")
finally:
await conn.close()
redis_client.delete(CALENDAR_SYNC_LOCK_KEY)
logger.info(f"Calendar sync completed for {employee_kuerzel}")
return {'status': 200, 'body': {'status': 'completed'}}
except Exception as e:
logger.error(f"Sync failed for {employee_kuerzel}: {e}")
redis_client.delete(CALENDAR_SYNC_LOCK_KEY)
return {'status': 500, 'body': {'error': str(e)}}
# Motia Step Configuration
config = {
"type": "event",
"name": "Calendar Sync Event Step",
"description": "Handles bidirectional calendar sync between Advoware and Google Calendar using Postgres as hub",
"subscribes": ["calendar.sync.triggered"],
"emits": [],
"flows": ["advoware"]
}