@@ -1,435 +0,0 @@
"""
Akte Sync - Event Handler
Unified sync for one CAkten entity across all configured backends:
- Advoware (3-way merge: Windows ↔ EspoCRM ↔ History)
- xAI (Blake3 hash-based upload to Collection)
Both run in the same event to keep CDokumente perfectly in sync.
Trigger: akte.sync { akte_id, aktennummer }
Lock: Redis per-Akte (30 min TTL, prevents double-sync of same Akte)
Parallel: Different Akten sync simultaneously.
Enqueues:
- document.generate_preview (after CREATE / UPDATE_ESPO)
"""
from typing import Dict , Any
from datetime import datetime
from motia import FlowContext , queue
config = {
" name " : " Akte Sync - Event Handler " ,
" description " : " Unified sync for one Akte: Advoware 3-way merge + xAI upload " ,
" flows " : [ " akte-sync " ] ,
" triggers " : [ queue ( " akte.sync " ) ] ,
" enqueues " : [ " document.generate_preview " ] ,
}
# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────
async def handler ( event_data : Dict [ str , Any ] , ctx : FlowContext ) - > None :
akte_id = event_data . get ( ' akte_id ' )
aktennummer = event_data . get ( ' aktennummer ' )
ctx . logger . info ( " = " * 80 )
ctx . logger . info ( " 🔄 AKTE SYNC STARTED " )
ctx . logger . info ( f " Aktennummer : { aktennummer } " )
ctx . logger . info ( f " EspoCRM ID : { akte_id } " )
ctx . logger . info ( " = " * 80 )
from services . redis_client import get_redis_client
from services . espocrm import EspoCRMAPI
redis_client = get_redis_client ( strict = False )
if not redis_client :
ctx . logger . error ( " ❌ Redis unavailable " )
return
lock_key = f " akte_sync: { akte_id } "
lock_acquired = redis_client . set ( lock_key , datetime . now ( ) . isoformat ( ) , nx = True , ex = 1800 )
if not lock_acquired :
ctx . logger . warn ( f " ⏸️ Lock busy for Akte { akte_id } – requeueing " )
raise RuntimeError ( f " Lock busy for akte_id= { akte_id } " )
espocrm = EspoCRMAPI ( ctx )
try :
# ── Load Akte ──────────────────────────────────────────────────────
akte = await espocrm . get_entity ( ' CAkten ' , akte_id )
if not akte :
ctx . logger . error ( f " ❌ Akte { akte_id } not found in EspoCRM " )
return
# aktennummer can come from the event payload OR from the entity
# (Akten without Advoware have no aktennummer)
if not aktennummer :
aktennummer = akte . get ( ' aktennummer ' )
sync_schalter = akte . get ( ' syncSchalter ' , False )
aktivierungsstatus = str ( akte . get ( ' aktivierungsstatus ' ) or ' ' ) . lower ( )
ai_aktivierungsstatus = str ( akte . get ( ' aiAktivierungsstatus ' ) or ' ' ) . lower ( )
ctx . logger . info ( f " 📋 Akte ' { akte . get ( ' name ' ) } ' " )
ctx . logger . info ( f " syncSchalter : { sync_schalter } " )
ctx . logger . info ( f " aktivierungsstatus : { aktivierungsstatus } " )
ctx . logger . info ( f " aiAktivierungsstatus : { ai_aktivierungsstatus } " )
# Advoware sync requires an aktennummer (Akten without Advoware won't have one)
advoware_enabled = bool ( aktennummer ) and sync_schalter and aktivierungsstatus in ( ' import ' , ' neu ' , ' new ' , ' aktiv ' , ' active ' )
xai_enabled = ai_aktivierungsstatus in ( ' new ' , ' neu ' , ' aktiv ' , ' active ' )
ctx . logger . info ( f " Advoware sync : { ' ✅ ON ' if advoware_enabled else ' ⏭️ OFF ' } " )
ctx . logger . info ( f " xAI sync : { ' ✅ ON ' if xai_enabled else ' ⏭️ OFF ' } " )
if not advoware_enabled and not xai_enabled :
ctx . logger . info ( " ⏭️ Both syncs disabled – nothing to do " )
return
# ── ADVOWARE SYNC ──────────────────────────────────────────────────
advoware_results = None
if advoware_enabled :
advoware_results = await _run_advoware_sync ( akte , aktennummer , akte_id , espocrm , ctx )
# ── xAI SYNC ──────────────────────────────────────────────────────
if xai_enabled :
await _run_xai_sync ( akte , akte_id , espocrm , ctx )
# ── Final Status ───────────────────────────────────────────────────
now = datetime . now ( ) . strftime ( ' % Y- % m- %d % H: % M: % S ' )
final_update : Dict [ str , Any ] = { ' globalLastSync ' : now , ' globalSyncStatus ' : ' synced ' }
if advoware_enabled :
final_update [ ' syncStatus ' ] = ' synced '
final_update [ ' lastSync ' ] = now
# 'import' = erster Sync → danach auf 'aktiv' setzen
if aktivierungsstatus == ' import ' :
final_update [ ' aktivierungsstatus ' ] = ' aktiv '
ctx . logger . info ( " 🔄 aktivierungsstatus: import → aktiv " )
if xai_enabled :
final_update [ ' aiSyncStatus ' ] = ' synced '
final_update [ ' aiLastSync ' ] = now
# 'new' = Collection wurde gerade erstmalig angelegt → auf 'aktiv' setzen
if ai_aktivierungsstatus == ' new ' :
final_update [ ' aiAktivierungsstatus ' ] = ' aktiv '
ctx . logger . info ( " 🔄 aiAktivierungsstatus: new → aktiv " )
await espocrm . update_entity ( ' CAkten ' , akte_id , final_update )
# Clean up processing sets (both queues may have triggered this sync)
if aktennummer :
redis_client . srem ( " advoware:processing_aktennummern " , aktennummer )
redis_client . srem ( " akte:processing_entity_ids " , akte_id )
ctx . logger . info ( " = " * 80 )
ctx . logger . info ( " ✅ AKTE SYNC COMPLETE " )
if advoware_results :
ctx . logger . info ( f " Advoware: created= { advoware_results [ ' created ' ] } updated= { advoware_results [ ' updated ' ] } deleted= { advoware_results [ ' deleted ' ] } errors= { advoware_results [ ' errors ' ] } " )
ctx . logger . info ( " = " * 80 )
except Exception as e :
ctx . logger . error ( f " ❌ Sync failed: { e } " )
import traceback
ctx . logger . error ( traceback . format_exc ( ) )
# Requeue for retry (into the appropriate queue(s))
import time
now_ts = time . time ( )
if aktennummer :
redis_client . zadd ( " advoware:pending_aktennummern " , { aktennummer : now_ts } )
redis_client . zadd ( " akte:pending_entity_ids " , { akte_id : now_ts } )
try :
await espocrm . update_entity ( ' CAkten ' , akte_id , {
' syncStatus ' : ' failed ' ,
' globalSyncStatus ' : ' failed ' ,
} )
except Exception :
pass
raise
finally :
if lock_acquired and redis_client :
redis_client . delete ( lock_key )
ctx . logger . info ( f " 🔓 Lock released for Akte { aktennummer } " )
# ─────────────────────────────────────────────────────────────────────────────
# Advoware 3-way merge
# ─────────────────────────────────────────────────────────────────────────────
async def _run_advoware_sync (
akte : Dict [ str , Any ] ,
aktennummer : str ,
akte_id : str ,
espocrm ,
ctx : FlowContext ,
) - > Dict [ str , int ] :
from services . advoware_watcher_service import AdvowareWatcherService
from services . advoware_history_service import AdvowareHistoryService
from services . advoware_service import AdvowareService
from services . advoware_document_sync_utils import AdvowareDocumentSyncUtils
from services . blake3_utils import compute_blake3
import mimetypes
watcher = AdvowareWatcherService ( ctx )
history_service = AdvowareHistoryService ( ctx )
advoware_service = AdvowareService ( ctx )
sync_utils = AdvowareDocumentSyncUtils ( ctx )
results = { ' created ' : 0 , ' updated ' : 0 , ' deleted ' : 0 , ' skipped ' : 0 , ' errors ' : 0 }
ctx . logger . info ( " " )
ctx . logger . info ( " ─ " * 60 )
ctx . logger . info ( " 📂 ADVOWARE SYNC " )
ctx . logger . info ( " ─ " * 60 )
# ── Fetch from all 3 sources ───────────────────────────────────────
espo_docs_result = await espocrm . list_related ( ' CAkten ' , akte_id , ' dokumentes ' )
espo_docs = espo_docs_result . get ( ' list ' , [ ] )
try :
windows_files = await watcher . get_akte_files ( aktennummer )
except Exception as e :
ctx . logger . error ( f " ❌ Windows watcher failed: { e } " )
windows_files = [ ]
try :
advo_history = await history_service . get_akte_history ( aktennummer )
except Exception as e :
ctx . logger . error ( f " ❌ Advoware history failed: { e } " )
advo_history = [ ]
ctx . logger . info ( f " EspoCRM docs : { len ( espo_docs ) } " )
ctx . logger . info ( f " Windows files : { len ( windows_files ) } " )
ctx . logger . info ( f " History entries: { len ( advo_history ) } " )
# ── Cleanup Windows list (only files in History) ───────────────────
windows_files = sync_utils . cleanup_file_list ( windows_files , advo_history )
# ── Build indexes by HNR (stable identifier from Advoware) ────────
espo_by_hnr = { }
for doc in espo_docs :
if doc . get ( ' hnr ' ) :
espo_by_hnr [ doc [ ' hnr ' ] ] = doc
history_by_hnr = { }
for entry in advo_history :
if entry . get ( ' hNr ' ) :
history_by_hnr [ entry [ ' hNr ' ] ] = entry
windows_by_path = { f . get ( ' path ' , ' ' ) . lower ( ) : f for f in windows_files }
all_hnrs = set ( espo_by_hnr . keys ( ) ) | set ( history_by_hnr . keys ( ) )
ctx . logger . info ( f " Unique HNRs : { len ( all_hnrs ) } " )
# ── 3-way merge per HNR ───────────────────────────────────────────
for hnr in all_hnrs :
espo_doc = espo_by_hnr . get ( hnr )
history_entry = history_by_hnr . get ( hnr )
windows_file = None
if history_entry and history_entry . get ( ' datei ' ) :
windows_file = windows_by_path . get ( history_entry [ ' datei ' ] . lower ( ) )
if history_entry and history_entry . get ( ' datei ' ) :
filename = history_entry [ ' datei ' ] . split ( ' \\ ' ) [ - 1 ]
elif espo_doc :
filename = espo_doc . get ( ' name ' , f ' hnr_ { hnr } ' )
else :
filename = f ' hnr_ { hnr } '
try :
action = sync_utils . merge_three_way ( espo_doc , windows_file , history_entry )
ctx . logger . info ( f " [ { action . action : 12s } ] { filename } (hnr= { hnr } ) – { action . reason } " )
if action . action == ' SKIP ' :
results [ ' skipped ' ] + = 1
elif action . action == ' CREATE ' :
if not windows_file :
ctx . logger . error ( f " ❌ CREATE: no Windows file for hnr { hnr } " )
results [ ' errors ' ] + = 1
continue
content = await watcher . download_file ( aktennummer , windows_file . get ( ' relative_path ' , filename ) )
blake3_hash = compute_blake3 ( content )
mime_type , _ = mimetypes . guess_type ( filename )
mime_type = mime_type or ' application/octet-stream '
now = datetime . now ( ) . strftime ( ' % Y- % m- %d % H: % M: % S ' )
attachment = await espocrm . upload_attachment_for_file_field (
file_content = content ,
filename = filename ,
related_type = ' CDokumente ' ,
field = ' dokument ' ,
mime_type = mime_type ,
)
new_doc = await espocrm . create_entity ( ' CDokumente ' , {
' name ' : filename ,
' dokumentId ' : attachment . get ( ' id ' ) ,
' hnr ' : history_entry . get ( ' hNr ' ) if history_entry else None ,
' advowareArt ' : ( history_entry . get ( ' art ' , ' Schreiben ' ) or ' Schreiben ' ) [ : 100 ] if history_entry else ' Schreiben ' ,
' advowareBemerkung ' : ( history_entry . get ( ' text ' , ' ' ) or ' ' ) [ : 255 ] if history_entry else ' ' ,
' dateipfad ' : windows_file . get ( ' path ' , ' ' ) ,
' blake3hash ' : blake3_hash ,
' syncedHash ' : blake3_hash ,
' usn ' : windows_file . get ( ' usn ' , 0 ) ,
' syncStatus ' : ' synced ' ,
' lastSyncTimestamp ' : now ,
' cAktenId ' : akte_id , # Direct FK to CAkten
} )
doc_id = new_doc . get ( ' id ' )
# Link to Akte
await espocrm . link_entities ( ' CAkten ' , akte_id , ' dokumentes ' , doc_id )
results [ ' created ' ] + = 1
# Trigger preview
try :
await ctx . emit ( ' document.generate_preview ' , {
' entity_id ' : doc_id ,
' entity_type ' : ' CDokumente ' ,
} )
except Exception as e :
ctx . logger . warn ( f " ⚠️ Preview trigger failed: { e } " )
elif action . action == ' UPDATE_ESPO ' :
if not windows_file :
ctx . logger . error ( f " ❌ UPDATE_ESPO: no Windows file for hnr { hnr } " )
results [ ' errors ' ] + = 1
continue
content = await watcher . download_file ( aktennummer , windows_file . get ( ' relative_path ' , filename ) )
blake3_hash = compute_blake3 ( content )
mime_type , _ = mimetypes . guess_type ( filename )
mime_type = mime_type or ' application/octet-stream '
now = datetime . now ( ) . strftime ( ' % Y- % m- %d % H: % M: % S ' )
update_data : Dict [ str , Any ] = {
' name ' : filename ,
' blake3hash ' : blake3_hash ,
' syncedHash ' : blake3_hash ,
' usn ' : windows_file . get ( ' usn ' , 0 ) ,
' dateipfad ' : windows_file . get ( ' path ' , ' ' ) ,
' syncStatus ' : ' synced ' ,
' lastSyncTimestamp ' : now ,
}
if history_entry :
update_data [ ' hnr ' ] = history_entry . get ( ' hNr ' )
update_data [ ' advowareArt ' ] = ( history_entry . get ( ' art ' , ' Schreiben ' ) or ' Schreiben ' ) [ : 100 ]
update_data [ ' advowareBemerkung ' ] = ( history_entry . get ( ' text ' , ' ' ) or ' ' ) [ : 255 ]
await espocrm . update_entity ( ' CDokumente ' , espo_doc [ ' id ' ] , update_data )
results [ ' updated ' ] + = 1
# Mark for re-sync to xAI (hash changed)
if espo_doc . get ( ' aiSyncStatus ' ) == ' synced ' :
await espocrm . update_entity ( ' CDokumente ' , espo_doc [ ' id ' ] , {
' aiSyncStatus ' : ' unclean ' ,
} )
try :
await ctx . emit ( ' document.generate_preview ' , {
' entity_id ' : espo_doc [ ' id ' ] ,
' entity_type ' : ' CDokumente ' ,
} )
except Exception as e :
ctx . logger . warn ( f " ⚠️ Preview trigger failed: { e } " )
elif action . action == ' DELETE ' :
if espo_doc :
# Only delete if the HNR is genuinely absent from Advoware History
# (not just absent from Windows – avoids deleting docs whose file
# is temporarily unavailable on the Windows share)
if hnr in history_by_hnr :
ctx . logger . warn ( f " ⚠️ SKIP DELETE hnr= { hnr } : still in Advoware History, only missing from Windows " )
results [ ' skipped ' ] + = 1
else :
await espocrm . delete_entity ( ' CDokumente ' , espo_doc [ ' id ' ] )
results [ ' deleted ' ] + = 1
except Exception as e :
ctx . logger . error ( f " ❌ Error for hnr { hnr } ( { filename } ): { e } " )
results [ ' errors ' ] + = 1
# ── Ablage check + Rubrum sync ─────────────────────────────────────
try :
akte_details = await advoware_service . get_akte ( aktennummer )
if akte_details :
espo_update : Dict [ str , Any ] = { }
if akte_details . get ( ' ablage ' ) == 1 :
ctx . logger . info ( " 📁 Akte marked as ablage → deactivating " )
espo_update [ ' aktivierungsstatus ' ] = ' deaktiviert '
rubrum = akte_details . get ( ' rubrum ' )
if rubrum and rubrum != akte . get ( ' rubrum ' ) :
espo_update [ ' rubrum ' ] = rubrum
ctx . logger . info ( f " 📝 Rubrum synced: { rubrum [ : 80 ] } " )
if espo_update :
await espocrm . update_entity ( ' CAkten ' , akte_id , espo_update )
except Exception as e :
ctx . logger . warn ( f " ⚠️ Ablage/Rubrum check failed: { e } " )
return results
# ─────────────────────────────────────────────────────────────────────────────
# xAI sync
# ─────────────────────────────────────────────────────────────────────────────
async def _run_xai_sync (
akte : Dict [ str , Any ] ,
akte_id : str ,
espocrm ,
ctx : FlowContext ,
) - > None :
from services . xai_service import XAIService
from services . xai_upload_utils import XAIUploadUtils
xai = XAIService ( ctx )
upload_utils = XAIUploadUtils ( ctx )
ctx . logger . info ( " " )
ctx . logger . info ( " ─ " * 60 )
ctx . logger . info ( " 🤖 xAI SYNC " )
ctx . logger . info ( " ─ " * 60 )
try :
# ── Ensure collection exists ───────────────────────────────────
collection_id = await upload_utils . ensure_collection ( akte , xai , espocrm )
if not collection_id :
ctx . logger . error ( " ❌ Could not obtain xAI collection – aborting xAI sync " )
await espocrm . update_entity ( ' CAkten ' , akte_id , { ' aiSyncStatus ' : ' failed ' } )
return
# ── Load all linked documents ──────────────────────────────────
docs_result = await espocrm . list_related ( ' CAkten ' , akte_id , ' dokumentes ' )
docs = docs_result . get ( ' list ' , [ ] )
ctx . logger . info ( f " Documents to check: { len ( docs ) } " )
synced = 0
skipped = 0
failed = 0
for doc in docs :
ok = await upload_utils . sync_document_to_xai ( doc , collection_id , xai , espocrm )
if ok :
if doc . get ( ' aiSyncStatus ' ) == ' synced ' and doc . get ( ' aiSyncHash ' ) == doc . get ( ' blake3hash ' ) :
skipped + = 1
else :
synced + = 1
else :
failed + = 1
ctx . logger . info ( f " ✅ Synced : { synced } " )
ctx . logger . info ( f " ⏭️ Skipped : { skipped } " )
ctx . logger . info ( f " ❌ Failed : { failed } " )
finally :
await xai . close ( )