Files
motia/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py
2025-10-23 23:58:53 +00:00

962 lines
52 KiB
Python

import asyncio
import logging
import os
import datetime
from datetime import timedelta
import pytz
import backoff
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2 import service_account
import asyncpg
import redis
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
# Setup logging: module logger at INFO level with a stream handler attached.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# NOTE: the module-level name `handler` is rebound further down in this file by
# the async `handler(event, context)` entry point; the logger keeps its own
# reference to this StreamHandler, so logging is unaffected.
handler = logging.StreamHandler()
logger.addHandler(handler)
# Timezone for all operations (optimize TZ handling)
BERLIN_TZ = pytz.timezone('Europe/Berlin')
# Constants for ranges (optimize fetch efficiency)
# NOTE: `now` / `current_year` are evaluated once at import time, so the fetch
# window below stays fixed for the lifetime of the process.
now = datetime.datetime.now(BERLIN_TZ)
current_year = now.year
FETCH_FROM = f"{current_year - 1}-01-01T00:00:00" # Start of previous year
FETCH_TO = f"{current_year + 9}-12-31T23:59:59" # End of 9 years ahead
# Generic lock key; the event handler in this file assigns a per-employee key
# to a local variable of the same name.
CALENDAR_SYNC_LOCK_KEY = 'calendar_sync_lock'
def log_operation(level, message, **context):
    """Log *message* at the named level, followed by ``key=value`` context pairs.

    Context entries whose value is ``None`` are omitted. Only the level names
    'info', 'warning', 'error' and 'debug' emit anything; every other level
    name is silently ignored.
    """
    pairs = [f"{key}={value}" for key, value in context.items() if value is not None]
    suffix = ' '.join(pairs)
    text = f"{message} {suffix}".strip()
    for known_level in ('info', 'warning', 'error', 'debug'):
        if level == known_level:
            # The logger method carries the same name as the level string.
            getattr(logger, known_level)(text)
            break
async def connect_db():
    """Open an asyncpg connection using the Postgres settings on ``Config``.

    The host falls back to 'localhost' when ``Config.POSTGRES_HOST`` is falsy
    and the connect timeout is 10 seconds. Any failure is logged and re-raised.
    """
    try:
        settings = {
            'host': Config.POSTGRES_HOST or 'localhost',
            'user': Config.POSTGRES_USER,
            'password': Config.POSTGRES_PASSWORD,
            'database': Config.POSTGRES_DB_NAME,
            'timeout': 10,
        }
        connection = await asyncpg.connect(**settings)
    except Exception as exc:
        logger.error(f"Failed to connect to DB: {exc}")
        raise
    return connection
async def get_google_service():
    """Build a Google Calendar v3 client from the configured service account.

    Raises:
        FileNotFoundError: if the service account file named by
            ``Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH`` does not exist.
        Exception: any other initialization error; all errors are logged and
            re-raised.
    """
    try:
        key_file = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
        key_file_present = os.path.exists(key_file)
        if not key_file_present:
            raise FileNotFoundError(f"Service account file not found: {key_file}")
        credentials = service_account.Credentials.from_service_account_file(
            key_file, scopes=Config.GOOGLE_CALENDAR_SCOPES
        )
        return build('calendar', 'v3', credentials=credentials)
    except Exception as exc:
        logger.error(f"Failed to initialize Google service: {exc}")
        raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def ensure_google_calendar(service, employee_kuerzel):
    """Return the id of the employee's "AW-<kuerzel>" calendar, creating it if needed.

    The whole calendar list is scanned page by page, so an existing calendar
    is found even when it is not on the first result page. Without this, a
    duplicate calendar would be created on every run.

    Args:
        service: Google Calendar API client.
        employee_kuerzel: Employee short code used to derive the calendar name.

    Returns:
        The Google calendar id.

    Raises:
        HttpError: on Google API errors (retried by the decorator for the
            listed status codes).
        Exception: any other failure; all errors are logged and re-raised.
    """
    calendar_name = f"AW-{employee_kuerzel}"
    try:
        page_token = None
        while True:
            calendar_list = service.calendarList().list(pageToken=page_token).execute()
            for calendar in calendar_list.get('items', []):
                if calendar.get('summary') == calendar_name:
                    return calendar['id']
            page_token = calendar_list.get('nextPageToken')
            if not page_token:
                break
        # Not found on any page: create new
        calendar_body = {
            'summary': calendar_name,
            'timeZone': 'Europe/Berlin'
        }
        created = service.calendars().insert(body=calendar_body).execute()
        calendar_id = created['id']
        # Share with main account if needed
        # NOTE(review): if this ACL insert fails and the decorator retries, the
        # retry finds the calendar already created and returns without sharing.
        acl_rule = {
            'scope': {'type': 'user', 'value': 'lehmannundpartner@gmail.com'},
            'role': 'owner'
        }
        service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
        return calendar_id
    except HttpError as e:
        logger.error(f"Google API error for calendar {employee_kuerzel}: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}")
        raise
async def fetch_advoware_appointments(advoware, employee_kuerzel):
    """Load the employee's Advoware appointments between FETCH_FROM and FETCH_TO.

    Returns the API result when it is a list, otherwise an empty list.
    Errors are logged and re-raised.
    """
    query = {'kuerzel': employee_kuerzel, 'from': FETCH_FROM, 'to': FETCH_TO}
    try:
        response = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=query)
        logger.debug(f"Raw Advoware API response: {response}")
        if isinstance(response, list):
            appointments = response
        else:
            appointments = []
        logger.info(f"Fetched {len(appointments)} Advoware appointments for {employee_kuerzel}")
        return appointments
    except Exception as exc:
        logger.error(f"Failed to fetch Advoware appointments: {exc}")
        raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [429, 500, 502, 503, 504])
async def fetch_google_events(service, calendar_id):
    """Collect all non-cancelled events of a calendar across every result page.

    The window runs from the start of ``current_year - 2`` to the end of
    ``current_year + 10``. Recurring events are expanded into single instances
    and the results are ordered by start time. Errors are logged and re-raised.
    """
    try:
        window_start = f"{current_year - 2}-01-01T00:00:00Z"
        window_end = f"{current_year + 10}-12-31T23:59:59Z"
        collected = []
        next_token = None
        has_more = True
        while has_more:
            request = service.events().list(
                calendarId=calendar_id,
                timeMin=window_start,
                timeMax=window_end,
                singleEvents=True,  # Expand recurring
                orderBy='startTime',
                pageToken=next_token,
                maxResults=2500  # Max per page
            )
            page = request.execute()
            collected.extend(page.get('items', []))
            next_token = page.get('nextPageToken')
            has_more = bool(next_token)
        events = [item for item in collected if item.get('status') != 'cancelled']
        logger.info(f"Fetched {len(collected)} total Google events ({len(events)} not cancelled) for calendar {calendar_id}")
        return events
    except HttpError as exc:
        logger.error(f"Google API error fetching events: {exc}")
        raise
    except Exception as exc:
        logger.error(f"Failed to fetch Google events: {exc}")
        raise
def generate_rrule(turnus, turnus_art, datum_bis):
    """Generate an RRULE string from Advoware turnus, turnusArt and datumBis.

    Args:
        turnus: Repeat interval. Values that are missing, non-numeric or
            below 1 fall back to 1, because RFC 5545 requires INTERVAL to be
            a positive integer.
        turnus_art: Frequency code (1=DAILY, 2=WEEKLY, 3=MONTHLY, 4=YEARLY).
        datum_bis: ISO date or date-time string marking the recurrence end.

    Returns:
        ``"RRULE:FREQ=...;INTERVAL=...;UNTIL=YYYYMMDD"``, or ``None`` when the
        frequency code is unknown or ``datum_bis`` cannot be parsed.
    """
    freq_map = {
        1: 'DAILY',
        2: 'WEEKLY',
        3: 'MONTHLY',
        4: 'YEARLY'
    }
    if turnus_art not in freq_map:
        return None
    freq = freq_map[turnus_art]
    # INTERVAL=0 / INTERVAL=None would be an invalid rule; clamp to at least 1.
    try:
        interval = int(turnus)
    except (TypeError, ValueError):
        interval = 1
    if interval < 1:
        interval = 1
    # Parse datum_bis and limit to max 2 years from now to avoid Google Calendar limits
    try:
        if 'T' in datum_bis:
            bis_dt = datetime.datetime.fromisoformat(datum_bis.replace('Z', ''))
        else:
            bis_dt = datetime.datetime.fromisoformat(datum_bis + 'T00:00:00')
        # A value with a UTC offset parses as an aware datetime, which cannot
        # be compared with the naive "now" below; keep its wall-clock value.
        if bis_dt.tzinfo is not None:
            bis_dt = bis_dt.replace(tzinfo=None)
        max_until = datetime.datetime.now() + timedelta(days=730)  # 2 years
        if bis_dt > max_until:
            bis_dt = max_until
            logger.info(f"Limited recurrence until date to {bis_dt.date()} to avoid Google Calendar limits")
        until_date = bis_dt.strftime('%Y%m%d')
    except (TypeError, ValueError):
        # TypeError covers a non-string datum_bis such as None.
        logger.warning(f"Invalid datum_bis: {datum_bis}, skipping recurrence")
        return None
    return f"RRULE:FREQ={freq};INTERVAL={interval};UNTIL={until_date}"
def _parse_google_time(time_obj):
    """Convert a Google ``start``/``end`` object into an aware Berlin datetime."""
    if 'dateTime' in time_obj:
        raw = time_obj['dateTime']
        # A trailing 'Z' means UTC. Merely stripping it would give a naive
        # datetime, which astimezone() interprets in the host's local zone.
        if raw.endswith('Z'):
            raw = raw[:-1] + '+00:00'
        return datetime.datetime.fromisoformat(raw).astimezone(BERLIN_TZ)
    # All-day events carry only a date; treat it as Berlin midnight.
    return BERLIN_TZ.localize(datetime.datetime.fromisoformat(time_obj['date']))


def parse_times(data, source):
    """Parse start and end times from an Advoware appointment or Google event.

    Args:
        data: Raw appointment dict (Advoware) or event dict (Google).
        source: ``'advoware'`` or ``'google'``.

    Returns:
        ``(start_dt, end_dt)`` as timezone-aware datetimes in Europe/Berlin.

    Raises:
        ValueError: if ``source`` is neither ``'advoware'`` nor ``'google'``,
            or a required date string cannot be parsed.
    """
    if source == 'advoware':
        start_str = data.get('datum', '')
        if 'T' in start_str:
            start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(start_str.replace('Z', '')))
        else:
            # Date-only value: take the time from uhrzeitVon, defaulting to 09:00.
            start_time = data.get('uhrzeitVon') or '09:00:00'
            start_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{start_str}T{start_time}"))
        # The end is on the start date, at the time given by uhrzeitBis.
        base_end_date = start_str.split('T')[0] if 'T' in start_str else start_str
        end_time = data.get('uhrzeitBis', '10:00:00')
        try:
            end_dt = BERLIN_TZ.localize(datetime.datetime.fromisoformat(f"{base_end_date}T{end_time}"))
        except ValueError:
            # Unparseable end time: assume a one hour appointment.
            end_dt = start_dt + timedelta(hours=1)
    elif source == 'google':
        start_dt = _parse_google_time(data.get('start', {}))
        end_dt = _parse_google_time(data.get('end', {}))
    else:
        raise ValueError(f"Unknown source: {source}")
    return start_dt, end_dt
def _parse_duration(value):
    """Parse an 'HH:MM:SS' string into a timedelta; malformed input yields zero."""
    try:
        hours, minutes, seconds = map(int, value.split(':'))
        return timedelta(hours=hours, minutes=minutes, seconds=seconds)
    except (AttributeError, TypeError, ValueError, OverflowError):
        # None, non-strings, wrong field count or non-numeric parts.
        return timedelta(0)


def adjust_times(start_dt, end_dt, data):
    """Widen an appointment by its preparation and travel time.

    Args:
        start_dt: Appointment start.
        end_dt: Appointment end.
        data: Advoware appointment dict. Reads ``vorbereitungsDauer`` and
            ``fahrzeit`` ('HH:MM:SS') and ``fahrtAnzeigen`` (1 = outbound
            travel, 2 = return travel, 3 = both; anything else = none).

    Returns:
        ``(adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td,
        rueckfahrt_td)``. Preparation and outbound travel are subtracted from
        the start; return travel is added to the end.
    """
    vorbereitung_td = _parse_duration(data.get('vorbereitungsDauer', '00:00:00'))
    fahrt_td = _parse_duration(data.get('fahrzeit', '00:00:00'))
    fahrt_anzeigen = data.get('fahrtAnzeigen', 0)
    hinfahrt_td = fahrt_td if fahrt_anzeigen in (1, 3) else timedelta(0)
    rueckfahrt_td = fahrt_td if fahrt_anzeigen in (2, 3) else timedelta(0)
    adjusted_start = start_dt - vorbereitung_td - hinfahrt_td
    adjusted_end = end_dt + rueckfahrt_td
    return adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td
def build_notiz(original_notiz, time_breakdown, duration_capped):
    """Build the event description string.

    Args:
        original_notiz: Original note text; ``None`` or blank text is skipped.
        time_breakdown: Lines describing the time split, listed under the
            "Zeitaufteilung:" heading.
        duration_capped: When true, a hint about the 24 hour cap is appended.

    Returns:
        The parts joined with newlines.
    """
    notiz_parts = []
    # The note can be None when the source record carries an explicit null.
    cleaned_notiz = (original_notiz or '').strip()
    if cleaned_notiz:
        notiz_parts.append(cleaned_notiz)
    notiz_parts.append("Zeitaufteilung:")
    notiz_parts.extend(time_breakdown)
    if duration_capped:
        notiz_parts.append("\nHinweis: Ereignisdauer wurde auf 24 Stunden begrenzt (Google Calendar Limit)")
    return "\n".join(notiz_parts)
def standardize_appointment_data(data, source):
    """Standardize data from Advoware or Google to a comparable dict, with TZ handling.

    Args:
        data: Raw Advoware appointment dict or Google event dict.
        source: ``'advoware'`` or ``'google'``.

    Returns:
        A dict with the keys ``start``, ``end``, ``text``, ``notiz``, ``ort``,
        ``dauertermin``, ``turnus``, ``turnusArt`` and ``recurrence``. For
        Advoware data the start/end include preparation and travel time, the
        duration is capped at 24 hours, and ``notiz`` contains the time
        breakdown (plus a hint when the cap was applied).
    """
    start_dt, end_dt = parse_times(data, source)
    if source == 'advoware':
        adjusted_start, adjusted_end, vorbereitung_td, hinfahrt_td, rueckfahrt_td = adjust_times(start_dt, end_dt, data)
        if Config.CALENDAR_SYNC_ANONYMIZE_GOOGLE_EVENTS:
            text = 'Advoware blocked'
            ort = ''
            original_notiz = ''
        else:
            text = data.get('text', '')
            ort = data.get('ort', '')
            # The key may be present with a None value.
            original_notiz = data.get('notiz') or ''
        time_breakdown = []
        if vorbereitung_td.total_seconds() > 0:
            vorb_start = adjusted_start
            vorb_end = adjusted_start + vorbereitung_td
            time_breakdown.append(f"{vorb_start.strftime('%H:%M')}-{vorb_end.strftime('%H:%M')} Vorbereitung")
        if hinfahrt_td.total_seconds() > 0:
            outbound_start = adjusted_start + vorbereitung_td
            outbound_end = adjusted_start + vorbereitung_td + hinfahrt_td
            time_breakdown.append(f"{outbound_start.strftime('%H:%M')}-{outbound_end.strftime('%H:%M')} Hinfahrt")
        appt_start = adjusted_start + vorbereitung_td + hinfahrt_td
        appt_end = adjusted_end - rueckfahrt_td
        time_breakdown.append(f"{appt_start.strftime('%H:%M')}-{appt_end.strftime('%H:%M')} Termin")
        if rueckfahrt_td.total_seconds() > 0:
            return_start = appt_end
            return_end = adjusted_end
            time_breakdown.append(f"{return_start.strftime('%H:%M')}-{return_end.strftime('%H:%M')} Rückfahrt")
        start_dt, end_dt = adjusted_start, adjusted_end
        # Apply the 24h cap BEFORE building the description, so the hint
        # about the cap actually makes it into the note.
        max_duration = timedelta(hours=24)
        duration_capped = (end_dt - start_dt) > max_duration
        if duration_capped:
            end_dt = start_dt + max_duration
        notiz = build_notiz(original_notiz, time_breakdown, duration_capped)
        recurrence = None
        if data.get('dauertermin', 0) == 1:
            turnus = data.get('turnus', 1)
            turnus_art = data.get('turnusArt', 1)
            datum_bis = data.get('datumBis', '')
            if datum_bis:
                recurrence = generate_rrule(turnus, turnus_art, datum_bis)
                if recurrence:
                    recurrence = [recurrence]
        return {
            'start': start_dt,
            'end': end_dt,
            'text': text,
            'notiz': notiz,
            'ort': ort,
            'dauertermin': data.get('dauertermin', 0),
            'turnus': data.get('turnus', 0),
            'turnusArt': data.get('turnusArt', 0),
            'recurrence': recurrence
        }
    elif source == 'google':
        duration_days = (end_dt.date() - start_dt.date()).days
        # All-day events (date-only start) and events spanning more than one
        # day are flagged as dauertermin.
        dauertermin = 1 if data.get('start', {}).get('date') or duration_days > 1 else 0
        recurrence = data.get('recurrence')
        turnus = 1 if recurrence else 0
        turnus_art = 0
        return {
            'start': start_dt,
            'end': end_dt,
            'text': data.get('summary', ''),
            'notiz': data.get('description', ''),
            'ort': data.get('location', ''),
            'dauertermin': dauertermin,
            'turnus': turnus,
            'turnusArt': turnus_art,
            'recurrence': recurrence
        }
async def create_advoware_appointment(advoware, data, employee_kuerzel):
    """POST a new Advoware appointment built from standardized data.

    Returns the new appointment number as a string, read from ``frNr`` or
    ``frnr`` in the response. API errors are logged and re-raised.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S'
    begin = data['start'].astimezone(BERLIN_TZ)
    finish = data['end'].astimezone(BERLIN_TZ)
    payload = {}
    payload['text'] = data['text']
    payload['notiz'] = data['notiz']
    payload['ort'] = data['ort']
    payload['datum'] = begin.strftime(timestamp_format)
    payload['uhrzeitBis'] = finish.strftime('%H:%M:%S')
    payload['datumBis'] = finish.strftime(timestamp_format)
    payload['anwalt'] = employee_kuerzel
    payload['vorbereitungsDauer'] = '00:00:00'
    payload['dauertermin'] = data['dauertermin']
    payload['turnus'] = data['turnus']
    payload['turnusArt'] = data['turnusArt']
    try:
        response = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=payload)
        logger.debug(f"Raw Advoware POST response: {response}")
        # Either spelling of the key is accepted.
        frnr = str(response.get('frNr') or response.get('frnr'))
        logger.info(f"Created Advoware appointment frNr: {frnr}")
        return frnr
    except Exception as exc:
        logger.error(f"Failed to create Advoware appointment: {exc}")
        raise
async def update_advoware_appointment(advoware, frnr, data, employee_kuerzel):
    """PUT the standardized data onto the existing Advoware appointment *frnr*.

    API errors are logged and re-raised.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S'
    begin = data['start'].astimezone(BERLIN_TZ)
    finish = data['end'].astimezone(BERLIN_TZ)
    payload = {'frNr': int(frnr)}
    payload['text'] = data['text']
    payload['notiz'] = data['notiz']
    payload['ort'] = data['ort']
    payload['datum'] = begin.strftime(timestamp_format)
    payload['uhrzeitBis'] = finish.strftime('%H:%M:%S')
    payload['datumBis'] = finish.strftime(timestamp_format)
    payload['anwalt'] = employee_kuerzel
    payload['vorbereitungsDauer'] = '00:00:00'
    payload['dauertermin'] = data['dauertermin']
    payload['turnus'] = data['turnus']
    payload['turnusArt'] = data['turnusArt']
    try:
        await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=payload)
        logger.info(f"Updated Advoware appointment frNr: {frnr}")
    except Exception as exc:
        logger.error(f"Failed to update Advoware appointment {frnr}: {exc}")
        raise
async def delete_advoware_appointment(advoware, frnr):
    """Issue a DELETE for the Advoware appointment *frnr*; log and re-raise errors."""
    query = {'frnr': frnr}
    try:
        await advoware.api_call('api/v1/advonet/Termine', method='DELETE', params=query)
        logger.info(f"Deleted Advoware appointment frNr: {frnr}")
    except Exception as exc:
        logger.error(f"Failed to delete Advoware appointment {frnr}: {exc}")
        raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def create_google_event(service, calendar_id, data):
    """Insert a Google event built from standardized data and return its id.

    An appointment flagged ``dauertermin == 1`` whose start and end both fall
    on midnight is written as an all-day event; everything else is written as
    a timed event in Europe/Berlin. Errors are logged and re-raised.
    """
    begin = data['start'].astimezone(BERLIN_TZ)
    finish = data['end'].astimezone(BERLIN_TZ)
    midnight = datetime.time(0, 0)
    whole_day = data['dauertermin'] == 1 and begin.time() == midnight and finish.time() == midnight
    if not whole_day:
        start_obj = {'dateTime': begin.isoformat(), 'timeZone': 'Europe/Berlin'}
        end_obj = {'dateTime': finish.isoformat(), 'timeZone': 'Europe/Berlin'}
    else:
        start_obj = {'date': begin.strftime('%Y-%m-%d')}
        # For all-day events, end date is exclusive, so add 1 day
        day_after = begin + timedelta(days=1)
        end_obj = {'date': day_after.strftime('%Y-%m-%d')}
    body = {
        'summary': data['text'],
        'description': data['notiz'],
        'location': data['ort'],
        'start': start_obj,
        'end': end_obj,
        'recurrence': data['recurrence'],  # RRULE if present
    }
    try:
        inserted = service.events().insert(calendarId=calendar_id, body=body).execute()
        new_id = inserted['id']
        logger.info(f"Created Google event ID: {new_id}")
        return new_id
    except HttpError as exc:
        logger.error(f"Google API error creating event: {exc}")
        raise
    except Exception as exc:
        logger.error(f"Failed to create Google event: {exc}")
        raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def update_google_event(service, calendar_id, event_id, data):
    """Overwrite the Google event *event_id* with standardized data.

    Uses the same all-day rule as event creation: ``dauertermin == 1`` with
    start and end both on midnight becomes a date-only event. Errors are
    logged and re-raised.
    """
    begin = data['start'].astimezone(BERLIN_TZ)
    finish = data['end'].astimezone(BERLIN_TZ)
    midnight = datetime.time(0, 0)
    whole_day = data['dauertermin'] == 1 and begin.time() == midnight and finish.time() == midnight
    if not whole_day:
        start_obj = {'dateTime': begin.isoformat(), 'timeZone': 'Europe/Berlin'}
        end_obj = {'dateTime': finish.isoformat(), 'timeZone': 'Europe/Berlin'}
    else:
        start_obj = {'date': begin.strftime('%Y-%m-%d')}
        # For all-day events, end date is exclusive, so add 1 day
        day_after = begin + timedelta(days=1)
        end_obj = {'date': day_after.strftime('%Y-%m-%d')}
    body = {
        'summary': data['text'],
        'description': data['notiz'],
        'location': data['ort'],
        'start': start_obj,
        'end': end_obj,
        'recurrence': data['recurrence'],
    }
    try:
        request = service.events().update(calendarId=calendar_id, eventId=event_id, body=body)
        request.execute()
        logger.info(f"Updated Google event ID: {event_id}")
    except HttpError as exc:
        logger.error(f"Google API error updating event {event_id}: {exc}")
        raise
    except Exception as exc:
        logger.error(f"Failed to update Google event {event_id}: {exc}")
        raise
@backoff.on_exception(backoff.expo, HttpError, max_tries=4, base=4, giveup=lambda e: e.resp.status not in [403, 429, 500, 502, 503, 504])
async def delete_google_event(service, calendar_id, event_id):
    """Remove the Google event *event_id* from the calendar; log and re-raise errors."""
    try:
        request = service.events().delete(calendarId=calendar_id, eventId=event_id)
        request.execute()
        logger.info(f"Deleted Google event ID: {event_id}")
    except HttpError as exc:
        logger.error(f"Google API error deleting event {event_id}: {exc}")
        raise
    except Exception as exc:
        logger.error(f"Failed to delete Google event {event_id}: {exc}")
        raise
async def safe_create_advoware_appointment(advoware, data, employee_kuerzel, write_allowed):
    """Create an Advoware appointment unless writing is blocked.

    Writing is blocked by the global ``Config.ADVOWARE_WRITE_PROTECTION`` flag
    or by a falsy *write_allowed*; in both cases a warning is logged and
    ``None`` is returned. Otherwise the new appointment number is returned.
    """
    refusal = None
    if Config.ADVOWARE_WRITE_PROTECTION:
        refusal = "Global write protection active, skipping Advoware create"
    elif not write_allowed:
        refusal = "Cannot create in Advoware, write not allowed"
    if refusal is not None:
        logger.warning(refusal)
        return None
    return await create_advoware_appointment(advoware, data, employee_kuerzel)
async def safe_delete_advoware_appointment(advoware, frnr, write_allowed):
    """Delete an Advoware appointment unless writing is blocked.

    Writing is blocked by the global ``Config.ADVOWARE_WRITE_PROTECTION`` flag
    or by a falsy *write_allowed*; in both cases only a warning is logged.
    """
    refusal = None
    if Config.ADVOWARE_WRITE_PROTECTION:
        refusal = "Global write protection active, skipping Advoware delete"
    elif not write_allowed:
        refusal = "Cannot delete in Advoware, write not allowed"
    if refusal is not None:
        logger.warning(refusal)
        return
    await delete_advoware_appointment(advoware, frnr)
async def safe_update_advoware_appointment(advoware, frnr, data, write_allowed, employee_kuerzel):
    """Update an Advoware appointment unless writing is blocked.

    Writing is blocked by the global ``Config.ADVOWARE_WRITE_PROTECTION`` flag
    or by a falsy *write_allowed*; in both cases only a warning is logged.
    """
    refusal = None
    if Config.ADVOWARE_WRITE_PROTECTION:
        refusal = "Global write protection active, skipping Advoware update"
    elif not write_allowed:
        refusal = "Cannot update in Advoware, write not allowed"
    if refusal is not None:
        logger.warning(refusal)
        return
    await update_advoware_appointment(advoware, frnr, data, employee_kuerzel)
async def safe_advoware_operation(operation, write_allowed, *args, **kwargs):
    """Await ``operation(*args, **kwargs)`` unless Advoware writes are blocked.

    Returns ``None`` (after logging a warning) when the global
    ``Config.ADVOWARE_WRITE_PROTECTION`` flag is set or *write_allowed* is
    falsy; otherwise returns whatever the operation returns.
    """
    refusal = None
    if Config.ADVOWARE_WRITE_PROTECTION:
        refusal = "Global write protection active, skipping Advoware operation"
    elif not write_allowed:
        refusal = "Cannot perform operation in Advoware, write not allowed"
    if refusal is not None:
        log_operation('warning', refusal)
        return None
    outcome = await operation(*args, **kwargs)
    return outcome
async def get_advoware_employees(advoware):
    """Request the Advoware employee list with the ``aktiv=true`` filter.

    Returns the API result when it is a list, otherwise an empty list.
    Errors are logged and re-raised.
    """
    try:
        response = await advoware.api_call(
            'api/v1/advonet/Mitarbeiter', method='GET', params={'aktiv': 'true'}
        )
        if isinstance(response, list):
            employees = response
        else:
            employees = []
        logger.info(f"Fetched {len(employees)} Advoware employees")
        return employees
    except Exception as exc:
        logger.error(f"Failed to fetch Advoware employees: {exc}")
        raise
async def get_advoware_timestamp(advoware, frnr):
    """Return the ``zuletztGeaendertAm`` timestamp of an Advoware appointment.

    The timestamp is localized to Europe/Berlin. ``None`` is returned when
    the API result is not a non-empty list, the field is missing or empty, or
    any error occurs (errors are logged, not raised).
    """
    try:
        response = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr})
        if not (isinstance(response, list) and response):
            return None
        # Assuming it returns a list with one item
        modified_at = response[0].get('zuletztGeaendertAm')
        if not modified_at:
            return None
        return BERLIN_TZ.localize(datetime.datetime.fromisoformat(modified_at))
    except Exception as exc:
        logger.error(f"Failed to fetch timestamp for Advoware frNr {frnr}: {exc}")
        return None
def get_timestamps(adv_data, google_data):
    """Extract and parse the last-modified timestamps of both records.

    Args:
        adv_data: Advoware appointment dict (or falsy). Reads
            ``zuletztGeaendertAm``, which is localized to Europe/Berlin.
        google_data: Google event dict (or falsy). Reads ``updated``; a
            trailing 'Z' is treated as UTC before converting to Berlin time.

    Returns:
        ``(adv_ts, google_ts)``; each is an aware datetime or ``None`` when
        the record or its timestamp field is missing.
    """
    adv_ts = None
    if adv_data:
        ts_str = adv_data.get('zuletztGeaendertAm')
        if ts_str:
            adv_ts = BERLIN_TZ.localize(datetime.datetime.fromisoformat(ts_str))
    google_ts = None
    if google_data:
        ts_str = google_data.get('updated')
        if ts_str:
            # Merely stripping the 'Z' would give a naive datetime, which
            # astimezone() interprets in the host's local zone instead of UTC.
            if ts_str.endswith('Z'):
                ts_str = ts_str[:-1] + '+00:00'
            google_ts = datetime.datetime.fromisoformat(ts_str).astimezone(BERLIN_TZ)
    return adv_ts, google_ts
async def handler(event, context):
"""Main event handler for calendar sync."""
logger.info("Starting calendar sync event handler")
try:
# Get kuerzel from event
body = event.get('body', {})
kuerzel = body.get('kuerzel')
if not kuerzel:
logger.error("Calendar Sync Event: kuerzel is required in event")
return {'status': 400, 'body': {'error': 'kuerzel is required'}}
logger.info(f"Starting calendar sync for {kuerzel}")
# Per-employee lock
CALENDAR_SYNC_LOCK_KEY = f'calendar_sync_lock_{kuerzel}'
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=int(Config.REDIS_PORT),
db=int(Config.REDIS_DB_CALENDAR_SYNC),
socket_timeout=Config.REDIS_TIMEOUT_SECONDS
)
if redis_client.get(CALENDAR_SYNC_LOCK_KEY):
logger.info(f"Calendar Sync Event: Sync for {kuerzel} already active, skipping")
return {'status': 409, 'body': {'error': 'sync_already_running', 'kuerzel': kuerzel}}
# Set lock for 30 minutes
redis_client.set(CALENDAR_SYNC_LOCK_KEY, 'event', ex=1800)
logger.info(f"Calendar Sync Event: Lock set for {kuerzel}")
try:
logger.debug("Initializing Advoware API")
advoware = AdvowareAPI(context)
# Initialize Google service
logger.debug("Initializing Google service")
service = await get_google_service()
logger.debug(f"Ensuring Google calendar for {kuerzel}")
calendar_id = await ensure_google_calendar(service, kuerzel)
conn = await connect_db()
try:
# Initialize state
state = {
'rows': [],
'db_adv_index': {},
'db_google_index': {},
'adv_appointments': [],
'adv_map': {},
'google_events': [],
'google_map': {}
}
async def reload_db_indexes():
"""Reload database indexes after DB changes in phases."""
state['rows'] = await conn.fetch(
"""
SELECT * FROM calendar_sync
WHERE employee_kuerzel = $1 AND deleted = FALSE
""",
kuerzel
)
state['db_adv_index'] = {str(row['advoware_frnr']): row for row in state['rows'] if row['advoware_frnr']}
state['db_google_index'] = {}
for row in state['rows']:
if row['google_event_id']:
state['db_google_index'][row['google_event_id']] = row
log_operation('debug', "Reloaded indexes", rows=len(state['rows']), adv=len(state['db_adv_index']), google=len(state['db_google_index']))
async def reload_api_maps():
"""Reload API maps after creating new events in phases."""
state['adv_appointments'] = await fetch_advoware_appointments(advoware, kuerzel)
state['adv_map'] = {str(app['frNr']): app for app in state['adv_appointments'] if app.get('frNr')}
state['google_events'] = await fetch_google_events(service, calendar_id)
state['google_map'] = {evt['id']: evt for evt in state['google_events']}
log_operation('debug', "Reloaded API maps", adv=len(state['adv_map']), google=len(state['google_map']))
# Initial fetch
log_operation('info', "Fetching fresh data from APIs")
await reload_api_maps()
await reload_db_indexes()
log_operation('info', "Fetched existing sync rows", count=len(state['rows']))
async def phase_1(state, conn, service, calendar_id, advoware, kuerzel):
"""Phase 1: New from Advoware => Google"""
log_operation('info', "Phase 1: Processing new appointments from Advoware")
for frnr, app in state['adv_map'].items():
if frnr not in state['db_adv_index']:
try:
event_id = await create_google_event(service, calendar_id, standardize_appointment_data(app, 'advoware'))
async with conn.transaction():
await conn.execute(
"""
INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed)
VALUES ($1, $2, $3, 'advoware', 'source_system_wins', 'synced', FALSE);
""",
kuerzel, int(frnr), event_id
)
log_operation('info', "Phase 1: Created new from Advoware", frnr=frnr, event_id=event_id)
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', "Phase 1: Failed to process new Advoware", frnr=frnr, error=str(e))
async def phase_2(state, conn, service, calendar_id, advoware, kuerzel):
"""Phase 2: New from Google => Advoware"""
log_operation('info', "Phase 2: Processing new events from Google")
for event_id, evt in state['google_map'].items():
# For recurring events, check if the master event (recurringEventId) is already synced
# For regular events, check the event_id directly
recurring_master_id = evt.get('recurringEventId')
is_already_synced = event_id in state['db_google_index'] or (recurring_master_id and recurring_master_id in state['db_google_index'])
if not is_already_synced:
try:
frnr = await safe_advoware_operation(create_advoware_appointment, True, advoware, standardize_appointment_data(evt, 'google'), kuerzel)
if frnr and str(frnr) != 'None':
async with conn.transaction():
await conn.execute(
"""
INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, sync_status, advoware_write_allowed)
VALUES ($1, $2, $3, 'google', 'source_system_wins', 'synced', TRUE);
""",
kuerzel, int(frnr), event_id
)
log_operation('info', "Phase 2: Created new from Google", event_id=event_id, frnr=frnr)
else:
log_operation('warning', "Phase 2: Skipped DB insert for Google event", event_id=event_id)
except Exception as e:
log_operation('warning', "Phase 2: Failed to process new Google", event_id=event_id, error=str(e))
async def phase_3(state, conn, service, calendar_id, advoware, kuerzel):
"""Phase 3: Identify deleted entries"""
log_operation('info', "Phase 3: Processing deleted entries")
for row in state['rows']:
frnr = row['advoware_frnr']
event_id = row['google_event_id']
adv_exists = str(frnr) in state['adv_map'] if frnr else False
# For Google events, check if the master event or any instance exists
google_exists = False
if event_id:
# Check if the stored event_id exists
if event_id in state['google_map']:
google_exists = True
else:
# Check if any event has this as recurringEventId (master event still exists)
for evt in state['google_map'].values():
if evt.get('recurringEventId') == event_id:
google_exists = True
break
if not adv_exists and not google_exists:
# Both missing - soft delete
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', "Phase 3: Soft deleted", sync_id=row['sync_id'])
elif not adv_exists:
# Missing in Advoware
strategy = row['sync_strategy']
if strategy == 'source_system_wins':
if row['source_system'] == 'advoware':
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id'])
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif row['source_system'] == 'google' and row['advoware_write_allowed']:
# Recreate in Advoware
try:
new_frnr = await safe_advoware_operation(create_advoware_appointment, row['advoware_write_allowed'], advoware, standardize_appointment_data(state['google_map'][event_id], 'google'), kuerzel)
if new_frnr and str(new_frnr) != 'None':
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET advoware_frnr = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", int(new_frnr), row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', "Phase 3: Recreated Advoware appointment", frnr=new_frnr, sync_id=row['sync_id'])
else:
log_operation('warning', "Phase 3: Failed to recreate Advoware", sync_id=row['sync_id'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
except Exception as e:
log_operation('warning', "Phase 3: Failed to recreate Advoware", sync_id=row['sync_id'], error=str(e))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
# For other cases, propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id'])
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
# Propagate delete to Google
try:
await delete_google_event(service, calendar_id, event_id)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', "Phase 3: Propagated delete to Google", sync_id=row['sync_id'])
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', "Phase 3: Failed to delete Google", sync_id=row['sync_id'], error=str(e))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif not google_exists:
# Missing in Google
strategy = row['sync_strategy']
if strategy == 'source_system_wins':
if row['source_system'] == 'google':
# Delete in Advoware
if row['advoware_write_allowed']:
try:
await safe_advoware_operation(delete_advoware_appointment, row['advoware_write_allowed'], advoware, frnr)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', "Phase 3: Propagated delete to Advoware", sync_id=row['sync_id'])
except Exception as e:
log_operation('warning', "Phase 3: Failed to delete Advoware", sync_id=row['sync_id'], error=str(e))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
log_operation('warning', "Phase 3: Cannot delete in Advoware, write not allowed", sync_id=row['sync_id'])
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
elif row['source_system'] == 'advoware':
# Recreate in Google
try:
new_event_id = await create_google_event(service, calendar_id, standardize_appointment_data(state['adv_map'][str(frnr)], 'advoware'))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET google_event_id = $1, sync_status = 'synced', last_sync = $3 WHERE sync_id = $2;", new_event_id, row['sync_id'], datetime.datetime.now(BERLIN_TZ))
log_operation('info', "Phase 3: Recreated Google event", event_id=new_event_id, sync_id=row['sync_id'])
await asyncio.sleep(0.1) # Small delay to avoid rate limits
except Exception as e:
log_operation('warning', "Phase 3: Failed to recreate Google", sync_id=row['sync_id'], error=str(e))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
else:
# last_change_wins or other, propagate delete to Advoware
try:
await safe_advoware_operation(delete_advoware_appointment, row['advoware_write_allowed'], advoware, frnr)
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET deleted = TRUE, sync_status = 'synced' WHERE sync_id = $1;", row['sync_id'])
log_operation('info', "Phase 3: Propagated delete to Advoware", sync_id=row['sync_id'])
except Exception as e:
log_operation('warning', "Phase 3: Failed to delete Advoware", sync_id=row['sync_id'], error=str(e))
async with conn.transaction():
await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", row['sync_id'])
async def phase_4(state, conn, service, calendar_id, advoware, kuerzel):
    """Phase 4: Update existing entries if changed.

    For every row in ``state['rows']`` that has both an Advoware appointment
    (looked up in ``state['adv_map']`` by ``str(advoware_frnr)``) and a Google
    event (looked up in ``state['google_map']`` by ``google_event_id``, falling
    back to the first event whose ``recurringEventId`` equals that id), the
    row's ``sync_strategy`` decides what is pushed where:

    * ``source_system_wins``: the timestamps returned by ``get_timestamps`` are
      compared with ``row['last_sync']``. A newer source-side timestamp pushes
      the source data to the other system; a newer target-side timestamp is
      logged as an unauthorized change and overwritten with the source data.
      With Google as source this only runs when ``advoware_write_allowed`` is
      truthy.
    * ``last_change_wins``: the Advoware timestamp is compared with the Google
      event's ``updated`` field. If Advoware is strictly newer the Google event
      is updated; otherwise Advoware is updated when writes are allowed.

    Each master event id (``recurringEventId`` or the row's event id) is
    handled at most once per call. Any exception raised while applying a
    strategy is logged as a warning and the row's ``sync_status`` is set to
    ``'failed'``; processing then continues with the next row.

    Args:
        state: Mapping providing ``'rows'``, ``'adv_map'`` and ``'google_map'``.
        conn: Database connection exposing ``transaction()`` and ``execute()``
            (presumably an asyncpg connection -- confirm against the caller).
        service: Google Calendar service handle passed to ``update_google_event``.
        calendar_id: Google calendar id passed to ``update_google_event``.
        advoware: Advoware API client passed to the Advoware helpers.
        kuerzel: Employee short code forwarded to ``update_advoware_appointment``.
    """
    log_operation('info', "Phase 4: Processing updates for existing entries")

    async def mark_synced(sync_id, synced_at):
        # Record a successful sync together with the timestamp to compare against next run.
        async with conn.transaction():
            await conn.execute("UPDATE calendar_sync SET sync_status = 'synced', last_sync = $2 WHERE sync_id = $1;", sync_id, synced_at)

    async def mark_failed(sync_id):
        # Flag the row as failed; last_sync is deliberately left untouched.
        async with conn.transaction():
            await conn.execute("UPDATE calendar_sync SET sync_status = 'failed' WHERE sync_id = $1;", sync_id)

    # Index the first Google event seen for every recurring master id once, so
    # the fallback lookup below does not rescan the whole map for each row.
    first_instance_by_master = {}
    for evt in state['google_map'].values():
        master_id = evt.get('recurringEventId')
        if master_id:
            first_instance_by_master.setdefault(master_id, evt)

    # Track which master events we've already processed to avoid duplicate updates
    processed_master_events = set()
    for row in state['rows']:
        frnr = row['advoware_frnr']
        event_id = row['google_event_id']
        adv_data = state['adv_map'].get(str(frnr)) if frnr else None

        # For Google events, find the corresponding event (could be master or instance)
        google_data = None
        if event_id:
            if event_id in state['google_map']:
                # Exact match on the stored event id
                google_data = state['google_map'][event_id]
            else:
                # Otherwise any event that has this id as its recurringEventId
                google_data = first_instance_by_master.get(event_id)

        # Skip if we don't have both sides
        if not adv_data or not google_data:
            continue

        # For recurring events, only process the master event once
        master_event_id = google_data.get('recurringEventId') or event_id
        if master_event_id in processed_master_events:
            continue
        processed_master_events.add(master_event_id)

        adv_std = standardize_appointment_data(adv_data, 'advoware')
        google_std = standardize_appointment_data(google_data, 'google')
        strategy = row['sync_strategy']
        try:
            if strategy == 'source_system_wins':
                if row['source_system'] == 'advoware':
                    # Check for changes in source (Advoware) or unauthorized changes in target (Google)
                    adv_ts, google_ts = get_timestamps(adv_data, google_data)
                    if adv_ts and adv_ts > row['last_sync']:
                        await update_google_event(service, calendar_id, event_id, adv_std)
                        await mark_synced(row['sync_id'], datetime.datetime.now(BERLIN_TZ))
                        log_operation('info', "Phase 4: Updated Google event from Advoware", event_id=event_id, frnr=frnr)
                        await asyncio.sleep(0.1)  # Small delay to avoid rate limits
                    elif google_ts and google_ts > row['last_sync']:
                        log_operation('warning', "Phase 4: Unauthorized change in Google event, resetting to Advoware", event_id=event_id, frnr=frnr)
                        await update_google_event(service, calendar_id, event_id, adv_std)
                        await mark_synced(row['sync_id'], datetime.datetime.now(BERLIN_TZ))
                        log_operation('info', "Phase 4: Reset Google event to Advoware", event_id=event_id, frnr=frnr)
                        await asyncio.sleep(0.1)  # Small delay to avoid rate limits
                elif row['source_system'] == 'google' and row['advoware_write_allowed']:
                    # Check for changes in source (Google) or unauthorized changes in target (Advoware)
                    adv_ts, google_ts = get_timestamps(adv_data, google_data)
                    log_operation('debug', "Phase 4: Checking sync", sync_id=row['sync_id'], adv_ts=adv_ts, google_ts=google_ts, last_sync=row['last_sync'])
                    if google_ts and google_ts > row['last_sync']:
                        await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel)
                        await mark_synced(row['sync_id'], datetime.datetime.now(BERLIN_TZ))
                        log_operation('info', "Phase 4: Updated Advoware from Google event", frnr=frnr, event_id=event_id)
                    elif adv_ts and adv_ts > row['last_sync']:
                        log_operation('warning', "Phase 4: Unauthorized change in Advoware, resetting to Google", frnr=frnr, event_id=event_id)
                        await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel)
                        await mark_synced(row['sync_id'], datetime.datetime.now(BERLIN_TZ))
                        log_operation('info', "Phase 4: Reset Advoware to Google event", frnr=frnr, event_id=event_id)
            elif strategy == 'last_change_wins':
                adv_ts = await get_advoware_timestamp(advoware, frnr)
                google_ts_str = google_data.get('updated', '')
                google_ts = None
                if google_ts_str:
                    # A trailing 'Z' means UTC. Merely stripping it yields a
                    # naive datetime, which astimezone() would interpret in the
                    # host's local timezone; keep the offset explicit instead.
                    if google_ts_str.endswith('Z'):
                        google_ts_str = google_ts_str[:-1] + '+00:00'
                    google_ts = datetime.datetime.fromisoformat(google_ts_str).astimezone(BERLIN_TZ)
                if adv_ts and google_ts:
                    if adv_ts > google_ts:
                        await update_google_event(service, calendar_id, event_id, adv_std)
                    elif row['advoware_write_allowed']:
                        await safe_advoware_operation(update_advoware_appointment, row['advoware_write_allowed'], advoware, frnr, google_std, kuerzel)
                    # NOTE(review): when Google is newer-or-equal but Advoware
                    # writes are not allowed, nothing is pushed yet the row is
                    # still marked synced -- confirm this is intended.
                    await mark_synced(row['sync_id'], max(adv_ts, google_ts))
                    log_operation('info', "Phase 4: Updated based on last_change_wins", sync_id=row['sync_id'])
                    await asyncio.sleep(0.1)  # Small delay to avoid rate limits
        except Exception as e:
            # Best effort per row: record the failure and carry on with the next row.
            log_operation('warning', "Phase 4: Failed to update", sync_id=row['sync_id'], error=str(e))
            await mark_failed(row['sync_id'])
# Execute phases
await phase_1(state, conn, service, calendar_id, advoware, kuerzel)
await reload_db_indexes()
await reload_api_maps()
await phase_2(state, conn, service, calendar_id, advoware, kuerzel)
await reload_db_indexes()
await reload_api_maps()
await phase_3(state, conn, service, calendar_id, advoware, kuerzel)
await reload_db_indexes()
await reload_api_maps()
await phase_4(state, conn, service, calendar_id, advoware, kuerzel)
# Update last_sync timestamps
log_operation('debug', "Updated last_sync timestamps")
finally:
await conn.close()
redis_client.delete(CALENDAR_SYNC_LOCK_KEY)
logger.info(f"Calendar sync completed for {kuerzel}")
return {'status': 200, 'body': {'status': 'completed', 'kuerzel': kuerzel}}
except Exception as e:
logger.error(f"Sync failed for {kuerzel}: {e}")
redis_client.delete(CALENDAR_SYNC_LOCK_KEY)
return {'status': 500, 'body': {'error': str(e), 'kuerzel': kuerzel}}
except Exception as e:
logger.error(f"Sync failed: {e}")
return {'status': 500, 'body': {'error': str(e)}}
# Motia step configuration: module-level metadata describing this step
# (presumably read by the Motia runtime when the step is loaded -- confirm).
config = {
    # Step kind and human-readable identification
    'type': 'event',
    'name': 'Calendar Sync Event Step',
    'description': 'Handles bidirectional calendar sync between Advoware and Google Calendar using Postgres as hub',
    # Topic(s) this step subscribes to
    'subscribes': ['calendar.sync.triggered'],
    # No topics are declared as emitted by this step
    'emits': [],
    # Flow(s) this step is listed under
    'flows': ['advoware'],
}