feat(sync): Enhance Akte sync process with batch processing and retry logic for failed events
This commit is contained in:
@@ -28,14 +28,15 @@ config = {
|
||||
PENDING_ADVO_KEY = "advoware:pending_aktennummern"
|
||||
PROCESSING_ADVO_KEY = "advoware:processing_aktennummern"
|
||||
|
||||
# Queue 2: written by EspoCRM webhook (keyed by entity ID)
|
||||
# Queue 2: retry queue for failed akte.sync events (EspoCRM webhooks now emit directly)
|
||||
PENDING_ID_KEY = "akte:pending_entity_ids"
|
||||
PROCESSING_ID_KEY = "akte:processing_entity_ids"
|
||||
|
||||
DEBOUNCE_SECS = 10
|
||||
BATCH_SIZE = 5 # max items to process per queue per cron tick
|
||||
|
||||
VALID_ADVOWARE_STATUSES = {'import', 'neu', 'new', 'aktiv', 'active'}
|
||||
VALID_AI_STATUSES = {'new', 'neu', 'aktiv', 'active'}
|
||||
VALID_ADVOWARE_STATUSES = frozenset({'import', 'neu', 'new', 'aktiv', 'active'})
|
||||
VALID_AI_STATUSES = frozenset({'new', 'neu', 'aktiv', 'active'})
|
||||
|
||||
|
||||
async def handler(input_data: None, ctx: FlowContext) -> None:
|
||||
@@ -60,23 +61,18 @@ async def handler(input_data: None, ctx: FlowContext) -> None:
|
||||
ctx.logger.info(f" Pending (aktennr) : {advo_pending}")
|
||||
ctx.logger.info(f" Pending (akte_id) : {id_pending}")
|
||||
|
||||
processed = False
|
||||
processed_count = 0
|
||||
|
||||
# ── Queue 1: Advoware Watcher (by Aktennummer) ─────────────────────
|
||||
advo_entries = redis_client.zrangebyscore(PENDING_ADVO_KEY, min=0, max=cutoff, start=0, num=1)
|
||||
if advo_entries:
|
||||
aktennr = advo_entries[0]
|
||||
if isinstance(aktennr, bytes):
|
||||
aktennr = aktennr.decode()
|
||||
|
||||
advo_entries = redis_client.zrangebyscore(PENDING_ADVO_KEY, min=0, max=cutoff, start=0, num=BATCH_SIZE)
|
||||
for raw in advo_entries:
|
||||
aktennr = raw.decode() if isinstance(raw, bytes) else raw
|
||||
score = redis_client.zscore(PENDING_ADVO_KEY, aktennr) or 0
|
||||
age = time.time() - score
|
||||
redis_client.zrem(PENDING_ADVO_KEY, aktennr)
|
||||
redis_client.sadd(PROCESSING_ADVO_KEY, aktennr)
|
||||
|
||||
processed_count += 1
|
||||
ctx.logger.info(f"📋 Aktennummer: {aktennr} (age={age:.1f}s)")
|
||||
processed = True
|
||||
|
||||
try:
|
||||
result = await espocrm.list_entities(
|
||||
'CAkten',
|
||||
@@ -85,51 +81,44 @@ async def handler(input_data: None, ctx: FlowContext) -> None:
|
||||
)
|
||||
if not result or not result.get('list'):
|
||||
ctx.logger.warn(f"⚠️ No CAkten found for aktennummer={aktennr} – removing")
|
||||
redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
|
||||
else:
|
||||
akte = result['list'][0]
|
||||
await _emit_if_eligible(akte, aktennr, ctx)
|
||||
redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Error (aktennr queue) {aktennr}: {e}")
|
||||
redis_client.zadd(PENDING_ADVO_KEY, {aktennr: time.time()})
|
||||
finally:
|
||||
redis_client.srem(PROCESSING_ADVO_KEY, aktennr)
|
||||
raise
|
||||
|
||||
# ── Queue 2: EspoCRM Webhook (by Entity ID) ────────────────────────
|
||||
id_entries = redis_client.zrangebyscore(PENDING_ID_KEY, min=0, max=cutoff, start=0, num=1)
|
||||
if id_entries:
|
||||
akte_id = id_entries[0]
|
||||
if isinstance(akte_id, bytes):
|
||||
akte_id = akte_id.decode()
|
||||
|
||||
# ── Queue 2: Retry queue for failed syncs ──────────────────────────
|
||||
id_entries = redis_client.zrangebyscore(PENDING_ID_KEY, min=0, max=cutoff, start=0, num=BATCH_SIZE)
|
||||
for raw in id_entries:
|
||||
akte_id = raw.decode() if isinstance(raw, bytes) else raw
|
||||
score = redis_client.zscore(PENDING_ID_KEY, akte_id) or 0
|
||||
age = time.time() - score
|
||||
redis_client.zrem(PENDING_ID_KEY, akte_id)
|
||||
redis_client.sadd(PROCESSING_ID_KEY, akte_id)
|
||||
|
||||
ctx.logger.info(f"📋 Entity ID: {akte_id} (age={age:.1f}s)")
|
||||
processed = True
|
||||
|
||||
processed_count += 1
|
||||
ctx.logger.info(f"📋 Entity ID (retry): {akte_id} (age={age:.1f}s)")
|
||||
try:
|
||||
akte = await espocrm.get_entity('CAkten', akte_id)
|
||||
if not akte:
|
||||
ctx.logger.warn(f"⚠️ No CAkten found for id={akte_id} – removing")
|
||||
redis_client.srem(PROCESSING_ID_KEY, akte_id)
|
||||
else:
|
||||
await _emit_if_eligible(akte, None, ctx)
|
||||
redis_client.srem(PROCESSING_ID_KEY, akte_id)
|
||||
except Exception as e:
|
||||
ctx.logger.error(f"❌ Error (entity-id queue) {akte_id}: {e}")
|
||||
ctx.logger.error(f"❌ Error (retry queue) {akte_id}: {e}")
|
||||
redis_client.zadd(PENDING_ID_KEY, {akte_id: time.time()})
|
||||
finally:
|
||||
redis_client.srem(PROCESSING_ID_KEY, akte_id)
|
||||
raise
|
||||
|
||||
if not processed:
|
||||
if not processed_count:
|
||||
if advo_pending > 0 or id_pending > 0:
|
||||
ctx.logger.info(f"⏸️ Entries pending but all too recent (< {DEBOUNCE_SECS}s)")
|
||||
else:
|
||||
ctx.logger.info("✓ Both queues empty")
|
||||
else:
|
||||
ctx.logger.info(f"✓ Processed {processed_count} item(s)")
|
||||
|
||||
ctx.logger.info("=" * 60)
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ config = {
|
||||
"enqueues": ["document.generate_preview"],
|
||||
}
|
||||
|
||||
VALID_ADVOWARE_STATUSES = frozenset({'import', 'neu', 'new', 'aktiv', 'active'})
|
||||
VALID_AI_STATUSES = frozenset({'new', 'neu', 'aktiv', 'active'})
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Entry point
|
||||
@@ -52,7 +54,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
|
||||
return
|
||||
|
||||
lock_key = f"akte_sync:{akte_id}"
|
||||
lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=1800)
|
||||
lock_acquired = redis_client.set(lock_key, datetime.now().isoformat(), nx=True, ex=600)
|
||||
if not lock_acquired:
|
||||
ctx.logger.warn(f"⏸️ Lock busy for Akte {akte_id} – requeueing")
|
||||
raise RuntimeError(f"Lock busy for akte_id={akte_id}")
|
||||
@@ -81,8 +83,8 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
|
||||
ctx.logger.info(f" aiAktivierungsstatus : {ai_aktivierungsstatus}")
|
||||
|
||||
# Advoware sync requires an aktennummer (Akten without Advoware won't have one)
|
||||
advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in ('import', 'neu', 'new', 'aktiv', 'active')
|
||||
xai_enabled = ai_aktivierungsstatus in ('new', 'neu', 'aktiv', 'active')
|
||||
advoware_enabled = bool(aktennummer) and sync_schalter and aktivierungsstatus in VALID_ADVOWARE_STATUSES
|
||||
xai_enabled = ai_aktivierungsstatus in VALID_AI_STATUSES
|
||||
|
||||
ctx.logger.info(f" Advoware sync : {'✅ ON' if advoware_enabled else '⏭️ OFF'}")
|
||||
ctx.logger.info(f" xAI sync : {'✅ ON' if xai_enabled else '⏭️ OFF'}")
|
||||
@@ -91,14 +93,20 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
|
||||
ctx.logger.info("⏭️ Both syncs disabled – nothing to do")
|
||||
return
|
||||
|
||||
# ── ADVOWARE SYNC ──────────────────────────────────────────────────
|
||||
# ── Load CDokumente once (shared by Advoware + xAI sync) ─────────────────
|
||||
espo_docs: list = []
|
||||
if advoware_enabled or xai_enabled:
|
||||
docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes', max_size=1000)
|
||||
espo_docs = docs_result.get('list', [])
|
||||
|
||||
# ── ADVOWARE SYNC ────────────────────────────────────────────
|
||||
advoware_results = None
|
||||
if advoware_enabled:
|
||||
advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx)
|
||||
advoware_results = await _run_advoware_sync(akte, aktennummer, akte_id, espocrm, ctx, espo_docs)
|
||||
|
||||
# ── xAI SYNC ──────────────────────────────────────────────────────
|
||||
# ── xAI SYNC ────────────────────────────────────────────────
|
||||
if xai_enabled:
|
||||
await _run_xai_sync(akte, akte_id, espocrm, ctx)
|
||||
await _run_xai_sync(akte, akte_id, espocrm, ctx, espo_docs)
|
||||
|
||||
# ── Final Status ───────────────────────────────────────────────────
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
@@ -154,7 +162,7 @@ async def handler(event_data: Dict[str, Any], ctx: FlowContext) -> None:
|
||||
finally:
|
||||
if lock_acquired and redis_client:
|
||||
redis_client.delete(lock_key)
|
||||
ctx.logger.info(f"🔓 Lock released for Akte {aktennummer}")
|
||||
ctx.logger.info(f"🔓 Lock released for Akte {akte_id}")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -167,6 +175,7 @@ async def _run_advoware_sync(
|
||||
akte_id: str,
|
||||
espocrm,
|
||||
ctx: FlowContext,
|
||||
espo_docs: list,
|
||||
) -> Dict[str, int]:
|
||||
from services.advoware_watcher_service import AdvowareWatcherService
|
||||
from services.advoware_history_service import AdvowareHistoryService
|
||||
@@ -187,10 +196,7 @@ async def _run_advoware_sync(
|
||||
ctx.logger.info("📂 ADVOWARE SYNC")
|
||||
ctx.logger.info("─" * 60)
|
||||
|
||||
# ── Fetch from all 3 sources ───────────────────────────────────────
|
||||
espo_docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
|
||||
espo_docs = espo_docs_result.get('list', [])
|
||||
|
||||
# ── Fetch Windows files + Advoware History ───────────────────────────
|
||||
try:
|
||||
windows_files = await watcher.get_akte_files(aktennummer)
|
||||
except Exception as e:
|
||||
@@ -225,7 +231,7 @@ async def _run_advoware_sync(
|
||||
|
||||
all_hnrs = set(espo_by_hnr.keys()) | set(history_by_hnr.keys())
|
||||
ctx.logger.info(f" Unique HNRs : {len(all_hnrs)}")
|
||||
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
# ── 3-way merge per HNR ───────────────────────────────────────────
|
||||
for hnr in all_hnrs:
|
||||
espo_doc = espo_by_hnr.get(hnr)
|
||||
@@ -259,7 +265,6 @@ async def _run_advoware_sync(
|
||||
blake3_hash = compute_blake3(content)
|
||||
mime_type, _ = mimetypes.guess_type(filename)
|
||||
mime_type = mime_type or 'application/octet-stream'
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
attachment = await espocrm.upload_attachment_for_file_field(
|
||||
file_content=content,
|
||||
@@ -307,7 +312,6 @@ async def _run_advoware_sync(
|
||||
blake3_hash = compute_blake3(content)
|
||||
mime_type, _ = mimetypes.guess_type(filename)
|
||||
mime_type = mime_type or 'application/octet-stream'
|
||||
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
update_data: Dict[str, Any] = {
|
||||
'name': filename,
|
||||
@@ -323,15 +327,12 @@ async def _run_advoware_sync(
|
||||
update_data['advowareArt'] = (history_entry.get('art', 'Schreiben') or 'Schreiben')[:100]
|
||||
update_data['advowareBemerkung'] = (history_entry.get('text', '') or '')[:255]
|
||||
|
||||
# Mark for re-sync to xAI if content changed
|
||||
if espo_doc.get('aiSyncStatus') == 'synced':
|
||||
update_data['aiSyncStatus'] = 'unclean'
|
||||
await espocrm.update_entity('CDokumente', espo_doc['id'], update_data)
|
||||
results['updated'] += 1
|
||||
|
||||
# Mark for re-sync to xAI (hash changed)
|
||||
if espo_doc.get('aiSyncStatus') == 'synced':
|
||||
await espocrm.update_entity('CDokumente', espo_doc['id'], {
|
||||
'aiSyncStatus': 'unclean',
|
||||
})
|
||||
|
||||
try:
|
||||
await ctx.emit('document.generate_preview', {
|
||||
'entity_id': espo_doc['id'],
|
||||
@@ -388,6 +389,7 @@ async def _run_xai_sync(
|
||||
akte_id: str,
|
||||
espocrm,
|
||||
ctx: FlowContext,
|
||||
docs: list,
|
||||
) -> None:
|
||||
from services.xai_service import XAIService
|
||||
from services.xai_upload_utils import XAIUploadUtils
|
||||
@@ -408,9 +410,6 @@ async def _run_xai_sync(
|
||||
await espocrm.update_entity('CAkten', akte_id, {'aiSyncStatus': 'failed'})
|
||||
return
|
||||
|
||||
# ── Load all linked documents ──────────────────────────────────
|
||||
docs_result = await espocrm.list_related('CAkten', akte_id, 'dokumentes')
|
||||
docs = docs_result.get('list', [])
|
||||
ctx.logger.info(f" Documents to check: {len(docs)}")
|
||||
|
||||
synced = 0
|
||||
@@ -418,9 +417,16 @@ async def _run_xai_sync(
|
||||
failed = 0
|
||||
|
||||
for doc in docs:
|
||||
# Determine skip condition based on pre-sync state (avoids stale-dict stats bug)
|
||||
will_skip = (
|
||||
doc.get('aiSyncStatus') == 'synced'
|
||||
and doc.get('aiSyncHash')
|
||||
and doc.get('blake3hash')
|
||||
and doc.get('aiSyncHash') == doc.get('blake3hash')
|
||||
)
|
||||
ok = await upload_utils.sync_document_to_xai(doc, collection_id, xai, espocrm)
|
||||
if ok:
|
||||
if doc.get('aiSyncStatus') == 'synced' and doc.get('aiSyncHash') == doc.get('blake3hash'):
|
||||
if will_skip:
|
||||
skipped += 1
|
||||
else:
|
||||
synced += 1
|
||||
|
||||
Reference in New Issue
Block a user