Files
motia/bitbylaw/steps/advoware_cal_sync/calendar_sync_event_step.py

477 lines
22 KiB
Python

import asyncio
import logging
import os
import datetime
from datetime import timedelta
import pytz
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.oauth2 import service_account
import asyncpg
from config import Config # Assuming Config has POSTGRES_HOST='localhost', USER, PASSWORD, DB_NAME, GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH, GOOGLE_CALENDAR_SCOPES, etc.
from services.advoware import AdvowareAPI # Assuming this is the existing wrapper for Advoware API calls
# Logging setup.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# logging.getLogger() hands back the same logger object for a given name, so
# an unconditional addHandler() would stack one more StreamHandler every time
# this module is imported again and each record would be printed repeatedly.
# The stream handler is also not bound to a module-level `handler` variable,
# because this file uses that name for the step's entry-point coroutine.
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())
# Timezone used for all date/time handling in this module.
BERLIN_TZ = pytz.timezone('Europe/Berlin')
# Fetch window, evaluated once at import time (a long-lived process keeps the
# values computed when the module was loaded):
#   FETCH_FROM = 1 January 00:00:00 of the year that lies 365 days in the past
#   FETCH_TO   = 31 December 23:59:59 of the year that lies 730 days ahead
# Only the year comes from the computed date; month, day and time are literal
# text in the strftime patterns.
FETCH_FROM = (datetime.datetime.now(BERLIN_TZ) - timedelta(days=365)).strftime('%Y-01-01T00:00:00')
FETCH_TO = (datetime.datetime.now(BERLIN_TZ) + timedelta(days=730)).strftime('%Y-12-31T23:59:59')
# Field names consulted by data_diff().
# NOTE: 'datum', 'uhrzeitBis' and 'datumBis' are raw Advoware field names; the
# dicts returned by standardize_appointment_data() expose the times under
# 'start' and 'end', so looking these three keys up on standardized data
# yields None on both sides.
COMPARISON_FIELDS = ['text', 'notiz', 'ort', 'datum', 'uhrzeitBis', 'datumBis', 'dauertermin', 'turnus', 'turnusArt']
async def connect_db():
    """Open a new asyncpg connection using the Postgres settings on Config.

    The host falls back to 'localhost' when Config.POSTGRES_HOST is falsy,
    and ``timeout=10`` is passed through to ``asyncpg.connect``.

    Returns:
        The connection object produced by ``asyncpg.connect``.

    Raises:
        Exception: any error raised while reading Config or connecting; it is
            logged and then re-raised unchanged.
    """
    try:
        connect_kwargs = {
            'host': Config.POSTGRES_HOST or 'localhost',
            'user': Config.POSTGRES_USER,
            'password': Config.POSTGRES_PASSWORD,
            'database': Config.POSTGRES_DB_NAME,
            'timeout': 10,
        }
        return await asyncpg.connect(**connect_kwargs)
    except Exception as e:
        logger.error(f"Failed to connect to DB: {e}")
        raise
async def get_google_service():
    """Build a Google Calendar v3 client from the configured service account.

    Reads the key-file path from Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
    and the scopes from Config.GOOGLE_CALENDAR_SCOPES.

    Returns:
        The client object returned by ``build('calendar', 'v3', ...)``.

    Raises:
        FileNotFoundError: the key file does not exist.
        Exception: any other failure while loading credentials or building
            the client. Every error is logged and then re-raised.
    """
    try:
        key_file = Config.GOOGLE_CALENDAR_SERVICE_ACCOUNT_PATH
        if os.path.exists(key_file):
            credentials = service_account.Credentials.from_service_account_file(
                key_file, scopes=Config.GOOGLE_CALENDAR_SCOPES
            )
            return build('calendar', 'v3', credentials=credentials)
        # Fail with an explicit message instead of whatever the credential
        # loader would raise for a missing path.
        raise FileNotFoundError(f"Service account file not found: {key_file}")
    except Exception as e:
        logger.error(f"Failed to initialize Google service: {e}")
        raise
async def ensure_google_calendar(service, employee_kuerzel):
    """Return the id of the employee's Google calendar, creating it if needed.

    The calendar is identified by its summary ``AW-<employee_kuerzel>``. When
    no calendar with that summary is found, a new one is created with time
    zone 'Europe/Berlin' and an ACL rule granting the 'owner' role to
    lehmannundpartner@gmail.com is added.

    Args:
        service: Google Calendar API client (as returned by get_google_service).
        employee_kuerzel: employee short code used to derive the calendar name.

    Returns:
        The calendar id (string) of the existing or newly created calendar.

    Raises:
        HttpError: a Google API call failed.
        Exception: any other failure. Every error is logged and re-raised.
    """
    calendar_name = f"AW-{employee_kuerzel}"
    try:
        # Walk every page of the calendar list: looking only at the first
        # page would miss an existing calendar on a later page and create a
        # duplicate with the same summary.
        page_token = None
        while True:
            calendar_list = service.calendarList().list(pageToken=page_token).execute()
            for calendar in calendar_list.get('items', []):
                if calendar.get('summary') == calendar_name:
                    return calendar['id']
            page_token = calendar_list.get('nextPageToken')
            if not page_token:
                break
        # Not found on any page - create it.
        calendar_body = {
            'summary': calendar_name,
            'timeZone': 'Europe/Berlin'
        }
        created = service.calendars().insert(body=calendar_body).execute()
        calendar_id = created['id']
        # Share the new calendar with the main account.
        acl_rule = {
            'scope': {'type': 'user', 'value': 'lehmannundpartner@gmail.com'},
            'role': 'owner'
        }
        service.acl().insert(calendarId=calendar_id, body=acl_rule).execute()
        return calendar_id
    except HttpError as e:
        logger.error(f"Google API error for calendar {employee_kuerzel}: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to ensure Google calendar for {employee_kuerzel}: {e}")
        raise
async def fetch_advoware_appointments(advoware, employee_kuerzel):
    """Load the employee's Advoware appointments for the module fetch window.

    Issues a GET on 'api/v1/advonet/Termine' with the employee code and the
    FETCH_FROM / FETCH_TO bounds as query parameters.

    Args:
        advoware: Advoware API wrapper exposing an awaitable ``api_call``.
        employee_kuerzel: employee short code sent as ``kuerzel``.

    Returns:
        The response when it is a list; otherwise an empty list.

    Raises:
        Exception: any error from the API call; logged and re-raised.
    """
    try:
        query = {
            'kuerzel': employee_kuerzel,
            'from': FETCH_FROM,
            'to': FETCH_TO,
        }
        response = await advoware.api_call('api/v1/advonet/Termine', method='GET', params=query)
        # Anything that is not a list is treated as "no appointments".
        if not isinstance(response, list):
            response = []
        logger.info(f"Fetched {len(response)} Advoware appointments for {employee_kuerzel}")
        return response
    except Exception as e:
        logger.error(f"Failed to fetch Advoware appointments: {e}")
        raise
async def fetch_google_events(service, calendar_id):
    """Fetch all non-cancelled Google events inside the module fetch window.

    Recurring events are expanded into single instances (``singleEvents=True``)
    and results are ordered by start time.

    Args:
        service: Google Calendar API client.
        calendar_id: id of the calendar to read.

    Returns:
        List of event resources (dicts) whose status is not 'cancelled'.

    Raises:
        HttpError: a Google API call failed.
        Exception: any other failure. Every error is logged and re-raised.
    """
    try:
        # Use the same window as the Advoware fetch (FETCH_FROM / FETCH_TO,
        # Berlin wall-clock time). A rolling "now - 365d .. now + 730d" window
        # is narrower than that year-aligned range, so appointments near the
        # edges would be present on the Advoware side only and be mistaken
        # for events deleted in Google.
        # NOTE(review): assumes Advoware applies 'from'/'to' as an inclusive
        # local-time range - confirm against the Advoware API.
        window_start = BERLIN_TZ.localize(datetime.datetime.fromisoformat(FETCH_FROM))
        window_end = BERLIN_TZ.localize(datetime.datetime.fromisoformat(FETCH_TO))
        events = []
        page_token = None
        while True:
            events_result = service.events().list(
                calendarId=calendar_id,
                # isoformat() of an aware datetime already carries the UTC
                # offset; appending 'Z' on top of it would produce an invalid
                # RFC 3339 timestamp ("...+00:00Z").
                timeMin=window_start.isoformat(),
                timeMax=window_end.isoformat(),
                singleEvents=True,  # Expand recurring
                orderBy='startTime',
                maxResults=2500,
                pageToken=page_token,
            ).execute()
            events.extend(
                evt for evt in events_result.get('items', [])
                if evt.get('status') != 'cancelled'
            )
            # Keep paging: an event that is silently dropped here would look
            # like a deletion to the sync logic.
            page_token = events_result.get('nextPageToken')
            if not page_token:
                break
        logger.info(f"Fetched {len(events)} Google events for calendar {calendar_id}")
        return events
    except HttpError as e:
        logger.error(f"Google API error fetching events: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to fetch Google events: {e}")
        raise
def _parse_advoware_datetime(date_str, time_str=None):
    """Parse an Advoware date/datetime string as Berlin wall-clock time.

    A date-only value (``YYYY-MM-DD``) is combined with ``time_str`` when one
    is supplied; without it the value stays at midnight.
    """
    if not date_str:
        raise ValueError("Advoware appointment has no date value")
    # Any 'Z' is dropped and the value is read as Berlin local time.
    # NOTE(review): presumably Advoware delivers local wall-clock values -
    # confirm that a trailing 'Z' never denotes real UTC.
    cleaned = date_str.replace('Z', '')
    if len(cleaned) == 10 and time_str:
        # time_str may itself be a full datetime; keep only its time part.
        cleaned = f"{cleaned}T{time_str.replace('Z', '').split('T')[-1]}"
    return BERLIN_TZ.localize(datetime.datetime.fromisoformat(cleaned))


def _parse_google_time(time_obj):
    """Convert a Google start/end object to (Berlin datetime, is_all_day)."""
    if 'dateTime' in time_obj:
        raw = time_obj['dateTime']
        # fromisoformat() on older Python versions rejects a trailing 'Z', and
        # merely stripping it would make the value naive, so astimezone()
        # would read it in the host's local zone. Spell UTC out instead.
        if raw.endswith('Z'):
            raw = raw[:-1] + '+00:00'
        parsed = datetime.datetime.fromisoformat(raw)
        if parsed.tzinfo is None:
            # No offset at all: read it as Berlin time, the zone this module
            # assigns to the calendars it creates.
            return BERLIN_TZ.localize(parsed), False
        return parsed.astimezone(BERLIN_TZ), False
    # All-day entries carry a plain date; place it at Berlin midnight.
    return BERLIN_TZ.localize(datetime.datetime.fromisoformat(time_obj['date'])), True


def standardize_appointment_data(data, source):
    """Convert an Advoware appointment or Google event to a comparable dict.

    Args:
        data: raw appointment (Advoware) or event resource (Google).
        source: 'advoware' or 'google'.

    Returns:
        Dict with the keys 'start' and 'end' (timezone-aware datetimes in
        Europe/Berlin), 'text', 'notiz', 'ort', 'dauertermin', 'turnus',
        'turnusArt' and 'recurrence'.

    Raises:
        ValueError: unknown ``source``, or a missing/unparseable date value.
        KeyError: a Google start/end object has neither 'dateTime' nor 'date'.
    """
    if source == 'advoware':
        # 'datum' / 'datumBis' are used as-is when they already carry a time;
        # 'uhrzeitVon' / 'uhrzeitBis' are applied only to date-only values.
        start_dt = _parse_advoware_datetime(data.get('datum'), data.get('uhrzeitVon'))
        # Fall back to the start date when 'datumBis' is absent or empty.
        end_dt = _parse_advoware_datetime(
            data.get('datumBis') or data.get('datum'), data.get('uhrzeitBis')
        )
        return {
            'start': start_dt,
            'end': end_dt,
            'text': data.get('text', ''),
            'notiz': data.get('notiz', ''),
            'ort': data.get('ort', ''),
            'dauertermin': data.get('dauertermin', 0),
            'turnus': data.get('turnus', 0),
            'turnusArt': data.get('turnusArt', 0),
            'recurrence': None  # No RRULE in Advoware
        }
    if source == 'google':
        # Only the start object decides whether the event counts as all-day.
        start_dt, all_day = _parse_google_time(data.get('start', {}))
        end_dt, _ = _parse_google_time(data.get('end', {}))
        dauertermin = 1 if all_day or (end_dt - start_dt).days > 1 else 0
        return {
            'start': start_dt,
            'end': end_dt,
            'text': data.get('summary', ''),
            'notiz': data.get('description', ''),
            'ort': data.get('location', ''),
            'dauertermin': dauertermin,
            'turnus': 1 if data.get('recurrence') else 0,  # Simplified
            'turnusArt': 0,  # Map RRULE to type if needed
            'recurrence': data.get('recurrence')
        }
    # Falling through used to return None implicitly, which only surfaced
    # later as an unrelated error in the caller.
    raise ValueError(f"Unknown appointment source: {source!r}")
def data_diff(data1, data2, fields=None):
    """Report whether two standardized appointment dicts differ.

    Args:
        data1: first standardized dict (see standardize_appointment_data).
        data2: second standardized dict.
        fields: optional iterable of additional keys to compare; defaults to
            the module-level COMPARISON_FIELDS.

    Returns:
        True when either argument is empty/None, or when 'start', 'end' or any
        of the listed fields differ; False otherwise.
    """
    if not data1 or not data2:
        return True
    if fields is None:
        fields = COMPARISON_FIELDS
    # Standardized dicts carry their times under 'start' and 'end'. The raw
    # Advoware names ('datum', 'uhrzeitBis', 'datumBis') are absent from them
    # and compare as None == None, so without these two keys a moved or
    # resized appointment would never be detected as changed.
    keys = dict.fromkeys(('start', 'end', *fields))
    return any(data1.get(key) != data2.get(key) for key in keys)
async def create_advoware_appointment(advoware, data, employee_kuerzel):
    """Create an Advoware appointment from standardized data.

    Args:
        advoware: Advoware API wrapper exposing an awaitable ``api_call``.
        data: standardized appointment dict (see standardize_appointment_data).
        employee_kuerzel: employee short code, sent as both 'sb' and 'anwalt'.

    Returns:
        The new appointment's frNr as a string.

    Raises:
        ValueError: the API response does not contain a frNr.
        Exception: any error from the API call. Errors are logged, re-raised.
    """
    start_dt = data['start'].astimezone(BERLIN_TZ)
    end_dt = data['end'].astimezone(BERLIN_TZ)
    appointment_data = {
        'text': data['text'],
        'notiz': data['notiz'],
        'ort': data['ort'],
        'datum': start_dt.strftime('%Y-%m-%dT%H:%M:%S'),
        'uhrzeitBis': end_dt.strftime('%H:%M:%S'),
        'datumBis': end_dt.strftime('%Y-%m-%dT%H:%M:%S'),
        'sb': employee_kuerzel,
        'anwalt': employee_kuerzel,
        'vorbereitungsDauer': '00:00:00',
        'dauertermin': data['dauertermin'],
        'turnus': data['turnus'],
        'turnusArt': data['turnusArt']
    }
    try:
        result = await advoware.api_call('api/v1/advonet/Termine', method='POST', json_data=appointment_data)
        frnr = result.get('frNr') if isinstance(result, dict) else None
        if frnr is None:
            # str(None) would hand the caller the text 'None', which only
            # fails later in int() with a message that hides the real cause.
            raise ValueError(f"Advoware create response contains no frNr: {result!r}")
        frnr = str(frnr)
        logger.info(f"Created Advoware appointment frNr: {frnr}")
        return frnr
    except Exception as e:
        logger.error(f"Failed to create Advoware appointment: {e}")
        raise
async def update_advoware_appointment(advoware, frnr, data):
    """Overwrite an existing Advoware appointment with standardized data.

    Args:
        advoware: Advoware API wrapper exposing an awaitable ``api_call``.
        frnr: appointment number; converted with ``int()`` for the payload.
        data: standardized appointment dict (see standardize_appointment_data).

    Raises:
        Exception: any error from the PUT call; logged and re-raised.
    """
    begin = data['start'].astimezone(BERLIN_TZ)
    finish = data['end'].astimezone(BERLIN_TZ)
    stamp_format = '%Y-%m-%dT%H:%M:%S'
    # Assemble the payload step by step, in the order the API fields are sent.
    payload = {'frNr': int(frnr)}
    for key in ('text', 'notiz', 'ort'):
        payload[key] = data[key]
    payload['datum'] = begin.strftime(stamp_format)
    payload['uhrzeitBis'] = finish.strftime('%H:%M:%S')
    payload['datumBis'] = finish.strftime(stamp_format)
    for key in ('dauertermin', 'turnus', 'turnusArt'):
        payload[key] = data[key]
    try:
        await advoware.api_call('api/v1/advonet/Termine', method='PUT', json_data=payload)
        logger.info(f"Updated Advoware appointment frNr: {frnr}")
    except Exception as e:
        logger.error(f"Failed to update Advoware appointment {frnr}: {e}")
        raise
async def delete_advoware_appointment(advoware, frnr):
    """Remove the Advoware appointment identified by ``frnr``.

    Sends a DELETE to 'api/v1/advonet/Termine' with ``frnr`` as query
    parameter. Any error is logged and re-raised.
    """
    endpoint = 'api/v1/advonet/Termine'
    query = {'frnr': frnr}
    try:
        await advoware.api_call(endpoint, method='DELETE', params=query)
    except Exception as e:
        logger.error(f"Failed to delete Advoware appointment {frnr}: {e}")
        raise
    else:
        logger.info(f"Deleted Advoware appointment frNr: {frnr}")
async def create_google_event(service, calendar_id, data):
    """Insert a Google event built from standardized data.

    An all-day event (plain 'date' values) is written only when
    ``data['dauertermin'] == 1`` and both start and end fall on midnight in
    Berlin time; otherwise 'dateTime' values with time zone 'Europe/Berlin'
    are used.

    Args:
        service: Google Calendar API client.
        calendar_id: target calendar id.
        data: standardized appointment dict (see standardize_appointment_data).

    Returns:
        The id of the created event.

    Raises:
        HttpError: the Google API call failed.
        Exception: any other failure. Every error is logged and re-raised.
    """
    begin = data['start'].astimezone(BERLIN_TZ)
    finish = data['end'].astimezone(BERLIN_TZ)
    midnight = datetime.time(0, 0)
    if data['dauertermin'] == 1 and begin.time() == midnight and finish.time() == midnight:
        # NOTE(review): Google treats an all-day end date as exclusive -
        # confirm that the dates coming from Advoware match that convention.
        day_format = '%Y-%m-%d'
        start_obj = {'date': begin.strftime(day_format)}
        end_obj = {'date': finish.strftime(day_format)}
    else:
        zone = 'Europe/Berlin'
        start_obj = {'dateTime': begin.isoformat(), 'timeZone': zone}
        end_obj = {'dateTime': finish.isoformat(), 'timeZone': zone}
    body = {
        'summary': data['text'],
        'description': data['notiz'],
        'location': data['ort'],
        'start': start_obj,
        'end': end_obj,
        'recurrence': data['recurrence'],  # RRULE if present
    }
    try:
        response = service.events().insert(calendarId=calendar_id, body=body).execute()
        new_id = response['id']
        logger.info(f"Created Google event ID: {new_id}")
        return new_id
    except HttpError as e:
        logger.error(f"Google API error creating event: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to create Google event: {e}")
        raise
async def update_google_event(service, calendar_id, event_id, data):
    """Replace an existing Google event with standardized data.

    Uses plain 'date' values when ``data['dauertermin'] == 1`` and both start
    and end are at midnight Berlin time, otherwise 'dateTime' values with
    time zone 'Europe/Berlin'.

    Args:
        service: Google Calendar API client.
        calendar_id: calendar that holds the event.
        event_id: id of the event to overwrite.
        data: standardized appointment dict (see standardize_appointment_data).

    Raises:
        HttpError: the Google API call failed.
        Exception: any other failure. Every error is logged and re-raised.
    """
    local_start = data['start'].astimezone(BERLIN_TZ)
    local_end = data['end'].astimezone(BERLIN_TZ)
    zero_hour = datetime.time(0, 0)
    is_all_day = (
        data['dauertermin'] == 1
        and local_start.time() == zero_hour
        and local_end.time() == zero_hour
    )
    if not is_all_day:
        start_obj = {'dateTime': local_start.isoformat(), 'timeZone': 'Europe/Berlin'}
        end_obj = {'dateTime': local_end.isoformat(), 'timeZone': 'Europe/Berlin'}
    else:
        start_obj = {'date': local_start.strftime('%Y-%m-%d')}
        end_obj = {'date': local_end.strftime('%Y-%m-%d')}
    replacement = {
        'summary': data['text'],
        'description': data['notiz'],
        'location': data['ort'],
        'start': start_obj,
        'end': end_obj,
        'recurrence': data['recurrence'],
    }
    try:
        request = service.events().update(calendarId=calendar_id, eventId=event_id, body=replacement)
        request.execute()
        logger.info(f"Updated Google event ID: {event_id}")
    except HttpError as e:
        logger.error(f"Google API error updating event {event_id}: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to update Google event {event_id}: {e}")
        raise
async def delete_google_event(service, calendar_id, event_id):
    """Remove one event from a Google calendar.

    Args:
        service: Google Calendar API client.
        calendar_id: calendar that holds the event.
        event_id: id of the event to delete.

    Raises:
        HttpError: the Google API call failed.
        Exception: any other failure. Every error is logged and re-raised.
    """
    try:
        request = service.events().delete(calendarId=calendar_id, eventId=event_id)
        request.execute()
        logger.info(f"Deleted Google event ID: {event_id}")
    except HttpError as e:
        logger.error(f"Google API error deleting event {event_id}: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to delete Google event {event_id}: {e}")
        raise
async def get_advoware_timestamp(advoware, frnr):
    """Fetch an appointment's last-modified time (``zuletztGeaendertAm``).

    Args:
        advoware: Advoware API wrapper exposing an awaitable ``api_call``.
        frnr: appointment number sent as the ``frnr`` query parameter.

    Returns:
        Timezone-aware datetime in Europe/Berlin, or None when the API returns
        no usable record, the field is missing, or any error occurs (errors
        are logged, never raised).
    """
    try:
        result = await advoware.api_call('api/v1/advonet/Termine', method='GET', params={'frnr': frnr})
        if not isinstance(result, list) or not result:
            return None
        raw = result[0].get('zuletztGeaendertAm')
        if not raw:
            # A missing timestamp is an expected "unknown", not an error.
            return None
        parsed = datetime.datetime.fromisoformat(raw.replace('Z', ''))
        if parsed.tzinfo is None:
            # Read naive values as Berlin wall-clock time, the way this module
            # reads Advoware's 'datum' values. astimezone() on a naive
            # datetime would silently assume the host's local zone instead.
            # NOTE(review): presumably Advoware reports local time - confirm.
            return BERLIN_TZ.localize(parsed)
        return parsed.astimezone(BERLIN_TZ)
    except Exception as e:
        logger.error(f"Failed to get Advoware timestamp for {frnr}: {e}")
        return None
def _parse_google_updated(value):
    """Parse a Google event's RFC 3339 ``updated`` stamp into Berlin time.

    Returns None when the value is missing or cannot be parsed.
    """
    if not value:
        return None
    # A trailing 'Z' means UTC. Stripping it would leave a naive datetime that
    # astimezone() reads in the host's local zone, so spell the offset out.
    if value.endswith('Z'):
        value = value[:-1] + '+00:00'
    try:
        parsed = datetime.datetime.fromisoformat(value)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed.astimezone(BERLIN_TZ)


async def handler(event, context):
    """Main event handler: two-way sync of one employee's calendar.

    Reads ``event['data']['body']['employee_kuerzel']`` (default 'AI'), loads
    the employee's non-deleted rows from ``calendar_sync`` under
    ``FOR UPDATE``, fetches the Advoware appointments and the Google events,
    and then, inside a single database transaction:

    * reconciles every known row according to its ``sync_strategy``
      ('source_system_wins' or 'last_change_wins'), ``source_system`` and
      ``advoware_write_allowed``;
    * mirrors appointments/events that have no row yet to the other system
      and inserts a row for them;
    * stamps ``last_sync`` on the employee's rows.

    Args:
        event: trigger payload (dict).
        context: step context handed to ``AdvowareAPI``.

    Returns:
        ``{'status': 200, 'body': {'status': 'completed'}}`` on success, or
        ``{'status': 500, 'body': {'error': <message>}}`` when anything fails.
        Exceptions are logged and never propagate to the caller.
    """
    employee_kuerzel = event.get('data', {}).get('body', {}).get('employee_kuerzel', 'AI')  # Default to 'AI' for test
    logger.info(f"Starting calendar sync for {employee_kuerzel}")
    try:
        service = await get_google_service()
        calendar_id = await ensure_google_calendar(service, employee_kuerzel)
        advoware = AdvowareAPI(context)
        # Close the connection explicitly. asyncpg's documented pattern for a
        # single connection is connect()/close(); `async with` is documented
        # for pools and transactions, so it is not relied on here.
        conn = await connect_db()
        try:
            async with conn.transaction():
                # Lock rows
                rows = await conn.fetch(
                    """
                    SELECT * FROM calendar_sync
                    WHERE employee_kuerzel = $1 AND deleted = FALSE
                    FOR UPDATE
                    """,
                    employee_kuerzel
                )
                # Build maps
                adv_appointments = await fetch_advoware_appointments(advoware, employee_kuerzel)
                adv_map = {str(app['frNr']): app for app in adv_appointments if app.get('frNr')}
                google_events = await fetch_google_events(service, calendar_id)
                google_map = {evt['id']: evt for evt in google_events}
                adv_map_std = {frnr: standardize_appointment_data(app, 'advoware') for frnr, app in adv_map.items()}
                google_map_std = {eid: standardize_appointment_data(evt, 'google') for eid, evt in google_map.items()}
                # Process existing
                for row in rows:
                    sync_id = row['sync_id']
                    event_id = row['google_event_id']
                    # This function binds advoware_frnr as int when it writes
                    # rows, while adv_map is keyed by str(frNr). Normalise to
                    # str, otherwise no row ever matches its appointment and
                    # everything is treated as deleted and re-created.
                    frnr = str(row['advoware_frnr']) if row['advoware_frnr'] else None
                    adv_data = adv_map.pop(frnr, None) if frnr else None
                    google_data = google_map.pop(event_id, None) if event_id else None
                    adv_std = adv_map_std.pop(frnr, None) if frnr else None
                    google_std = google_map_std.pop(event_id, None) if event_id else None
                    if not adv_data and not google_data:
                        # Both missing - soft delete
                        await conn.execute("UPDATE calendar_sync SET deleted = TRUE WHERE sync_id = $1;", sync_id)
                        logger.info(f"Marked as deleted: sync_id {sync_id}")
                        continue
                    strategy = row['sync_strategy']
                    if adv_data and google_data:
                        # Both exist - check diff
                        if not data_diff(adv_std, google_std):
                            continue
                        if strategy == 'source_system_wins':
                            if row['source_system'] == 'advoware':
                                await update_google_event(service, calendar_id, event_id, adv_std)
                            elif row['source_system'] == 'google' and row['advoware_write_allowed']:
                                await update_advoware_appointment(advoware, frnr, google_std)
                            else:
                                logger.warning(f"Write to Advoware blocked for sync_id {sync_id}")
                        elif strategy == 'last_change_wins':
                            adv_ts = await get_advoware_timestamp(advoware, frnr)
                            google_ts = _parse_google_updated(google_data.get('updated'))
                            if adv_ts and google_ts:
                                if adv_ts > google_ts:
                                    await update_google_event(service, calendar_id, event_id, adv_std)
                                elif row['advoware_write_allowed']:
                                    await update_advoware_appointment(advoware, frnr, google_std)
                                else:
                                    logger.warning(f"Write to Advoware blocked for sync_id {sync_id}")
                            else:
                                logger.warning(f"Missing timestamps for last_change_wins: sync_id {sync_id}")
                    elif adv_data:
                        # Missing in Google - recreate or delete?
                        recreate = (
                            (strategy == 'source_system_wins' and row['source_system'] == 'advoware')
                            # last_change_wins: assume the Advoware change is newer
                            or strategy == 'last_change_wins'
                        )
                        if recreate:
                            new_event_id = await create_google_event(service, calendar_id, adv_std)
                            await conn.execute(
                                "UPDATE calendar_sync SET google_event_id = $1 WHERE sync_id = $2;",
                                new_event_id, sync_id
                            )
                        else:
                            # Propagate delete to Advoware if allowed
                            if row['advoware_write_allowed']:
                                await delete_advoware_appointment(advoware, frnr)
                            await conn.execute("UPDATE calendar_sync SET deleted = TRUE WHERE sync_id = $1;", sync_id)
                    else:
                        # Missing in Advoware - recreate or delete?
                        recreate = row['advoware_write_allowed'] and (
                            (strategy == 'source_system_wins' and row['source_system'] == 'google')
                            # last_change_wins: assume the Google change is newer
                            or strategy == 'last_change_wins'
                        )
                        if recreate:
                            new_frnr = await create_advoware_appointment(advoware, google_std, employee_kuerzel)
                            await conn.execute(
                                "UPDATE calendar_sync SET advoware_frnr = $1 WHERE sync_id = $2;",
                                int(new_frnr), sync_id
                            )
                        else:
                            # Propagate delete to Google
                            await delete_google_event(service, calendar_id, event_id)
                            await conn.execute("UPDATE calendar_sync SET deleted = TRUE WHERE sync_id = $1;", sync_id)
                # New from Advoware: whatever is left in adv_map has no row yet.
                # adv_map_std was popped in lockstep, so it holds the same keys.
                for frnr in adv_map:
                    event_id = await create_google_event(service, calendar_id, adv_map_std[frnr])
                    await conn.execute(
                        """
                        INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, advoware_write_allowed)
                        VALUES ($1, $2, $3, 'advoware', 'source_system_wins', FALSE);
                        """,
                        employee_kuerzel, int(frnr), event_id
                    )
                    logger.info(f"Created new from Advoware: frNr {frnr}, event_id {event_id}")
                # New from Google: whatever is left in google_map has no row yet.
                for event_id in google_map:
                    frnr = await create_advoware_appointment(advoware, google_map_std[event_id], employee_kuerzel)
                    await conn.execute(
                        """
                        INSERT INTO calendar_sync (employee_kuerzel, advoware_frnr, google_event_id, source_system, sync_strategy, advoware_write_allowed)
                        VALUES ($1, $2, $3, 'google', 'source_system_wins', TRUE);
                        """,
                        employee_kuerzel, int(frnr), event_id
                    )
                    logger.info(f"Created new from Google: event_id {event_id}, frNr {frnr}")
                # Update last_sync
                await conn.execute(
                    "UPDATE calendar_sync SET last_sync = NOW() WHERE employee_kuerzel = $1;",
                    employee_kuerzel
                )
        finally:
            await conn.close()
        logger.info(f"Calendar sync completed for {employee_kuerzel}")
        return {'status': 200, 'body': {'status': 'completed'}}
    except Exception as e:
        # Remote writes made before the failure are not undone by the
        # database rollback; the traceback is logged to help trace them.
        logger.exception(f"Sync failed for {employee_kuerzel}: {e}")
        return {'status': 500, 'body': {'error': str(e)}}