91 lines
3.0 KiB
Python
91 lines
3.0 KiB
Python
"""AI Knowledge Daily Sync - Cron Job"""
|
|
from typing import Any
|
|
from motia import FlowContext, cron
|
|
|
|
|
|
# Step configuration read by Motia: registers this module as a cron-triggered
# step of the "aiknowledge-full-sync" flow.
config = {
    "name": "AI Knowledge Daily Sync",
    "description": "Daily sync of all CAIKnowledge entities (catches missed webhooks, Blake3 verification included)",
    "flows": ["aiknowledge-full-sync"],
    # Six-field cron expression; fires once per day at 02:00:00.
    "triggers": [cron("0 0 2 * * *")],
    # Topic the handler publishes to; must match the 'topic' used in ctx.enqueue().
    "enqueues": ["aiknowledge.sync"],
}
|
|
|
|
|
|
async def handler(input_data: None, ctx: FlowContext[Any]) -> None:
    """
    Daily sync handler - ensures all active knowledge bases are synchronized.

    Loads all CAIKnowledge entities whose activation status is ACTIVE and whose
    sync status is UNCLEAN or FAILED, then enqueues one 'aiknowledge.sync'
    event per entity (payload: ``knowledge_id`` and ``source='daily_cron'``).
    Blake3 hash verification is always performed (hash available from JunctionData API).
    Runs every day at 02:00:00.

    Args:
        input_data: Unused; annotated as None and never read.
        ctx: Flow context providing ``logger`` and ``enqueue``.

    Raises:
        Exception: Any error raised while listing entities or enqueueing is
            logged and re-raised unchanged.
    """
    # Imported lazily inside the handler, as in the original implementation.
    from services.espocrm import EspoCRMAPI
    from services.models import AIKnowledgeActivationStatus, AIKnowledgeSyncStatus

    # Upper bound on entities fetched per run. Adjust if you have more.
    max_entities = 1000

    ctx.logger.info("=" * 80)
    ctx.logger.info("🌙 DAILY AI KNOWLEDGE SYNC STARTED")
    ctx.logger.info("=" * 80)

    espocrm = EspoCRMAPI(ctx)

    try:
        # Load all CAIKnowledge entities with status 'active' that need sync
        result = await espocrm.list_entities(
            'CAIKnowledge',
            where=[
                {
                    'type': 'equals',
                    'attribute': 'aktivierungsstatus',
                    'value': AIKnowledgeActivationStatus.ACTIVE.value,
                },
                {
                    'type': 'in',
                    'attribute': 'syncStatus',
                    'value': [
                        AIKnowledgeSyncStatus.UNCLEAN.value,
                        AIKnowledgeSyncStatus.FAILED.value,
                    ],
                },
            ],
            select='id,name,syncStatus',
            max_size=max_entities,
        )

        entities = result.get('list', [])
        total = len(entities)

        ctx.logger.info(f"📊 Found {total} knowledge bases needing sync")

        if not entities:
            ctx.logger.info("✅ All knowledge bases are synced")
            ctx.logger.info("=" * 80)
            return

        # A full page means the query may have been truncated. Surface that
        # instead of silently skipping the remainder. Logged via info() because
        # info/error are the only logger methods this module is known to have.
        if total >= max_entities:
            ctx.logger.info(
                f"⚠️ Result reached the fetch limit ({max_entities}); "
                "additional knowledge bases may not have been loaded in this run"
            )

        # Enqueue sync events for all (Blake3 verification always enabled)
        for i, entity in enumerate(entities, 1):
            await ctx.enqueue({
                'topic': 'aiknowledge.sync',
                'data': {
                    'knowledge_id': entity['id'],
                    'source': 'daily_cron',
                },
            })
            # .get() so that a record without a name cannot abort the
            # remaining enqueues just because of a log line.
            ctx.logger.info(
                f"📤 [{i}/{total}] Enqueued: {entity.get('name')} "
                f"(syncStatus={entity.get('syncStatus')})"
            )

        ctx.logger.info("=" * 80)
        ctx.logger.info(f"✅ Daily sync complete: {total} events enqueued")
        ctx.logger.info("=" * 80)

    except Exception as e:
        # Top-level boundary for the cron run: log with traceback, then
        # re-raise so the failure propagates to the caller.
        ctx.logger.error("=" * 80)
        ctx.logger.error("❌ FULL SYNC FAILED")
        ctx.logger.error("=" * 80)
        ctx.logger.error(f"Error: {e}", exc_info=True)
        raise
|