Files
motia/bitbylaw/docs/SYNC_TEMPLATE.md

22 KiB

Advoware Sync Template

Template für neue bidirektionale Syncs zwischen EspoCRM und Advoware.

Quick Start

Für neuen Sync von Entity XYZ:

1. EspoCRM Custom Fields

-- In EspoCRM Admin → Entity Manager → XYZ
advowareId         (int, unique)           -- Foreign Key zu Advoware
advowareRowId      (varchar 50)            -- Für Change Detection (WICHTIG!)
syncStatus         (enum: clean|dirty|...) -- Status tracking
advowareLastSync   (datetime)              -- Timestamp letzter erfolgreicher Sync
syncErrorMessage   (text, 2000)            -- Fehler-Details
syncRetryCount     (int)                   -- Anzahl Retry-Versuche

WICHTIG: Change Detection via rowId

  • Advoware's rowId Feld ändert sich bei jedem Update
  • EINZIGE Methode für Advoware Change Detection (Advoware liefert keine Timestamps!)
  • Base64-kodierte Binary-ID (~40 Zeichen), sehr zuverlässig

2. Mapper erstellen

# services/xyz_mapper.py
class XYZMapper:
    """Field mapping between EspoCRM and Advoware representations of XYZ."""

    @staticmethod
    def map_espo_to_advoware(espo_entity: Dict) -> Dict:
        """Translate an EspoCRM record into the Advoware field layout.

        Missing source fields are mapped to ``None``.
        """
        # Map only the relevant fields!
        field_pairs = (
            ('field1', 'espoField1'),
            ('field2', 'espoField2'),
        )
        return {
            advo_key: espo_entity.get(espo_key)
            for advo_key, espo_key in field_pairs
        }

    @staticmethod
    def map_advoware_to_espo(advo_entity: Dict) -> Dict:
        """Translate an Advoware record into the EspoCRM field layout.

        Missing source fields are mapped to ``None``.
        """
        field_pairs = (
            ('espoField1', 'field1'),
            ('espoField2', 'field2'),
        )
        mapped = {
            espo_key: advo_entity.get(advo_key)
            for espo_key, advo_key in field_pairs
        }
        # IMPORTANT: the rowId is what change detection compares later on.
        mapped['advowareRowId'] = advo_entity.get('rowId')
        return mapped

3. Sync Utils erstellen

# services/xyz_sync_utils.py
import redis
from typing import Dict, Any, Optional
from datetime import datetime
import pytz

MAX_SYNC_RETRIES = 5
LOCK_TTL_SECONDS = 300

class XYZSync:
    """Locking, status bookkeeping and change detection for XYZ syncs.

    Args:
        espocrm_api: Client exposing async ``get_entity`` / ``update_entity``.
        redis_client: Redis client used for the distributed lock. A falsy
            value disables Redis locking; only the EspoCRM status is used.
        context: Optional step context; when set, ``context.logger`` is used
            for logging.
    """

    def __init__(self, espocrm_api, redis_client: "redis.Redis", context=None):
        self.espocrm = espocrm_api
        self.redis = redis_client
        self.context = context

    async def acquire_sync_lock(self, entity_id: str) -> bool:
        """Take the per-entity sync lock and mark the entity as ``syncing``.

        Returns:
            ``True`` when the lock was obtained, ``False`` when the Redis key
            already exists (another sync holds it).

        Raises:
            Whatever ``update_entity`` raises; the Redis key is removed again
            first so a failed status update does not block the entity until
            the TTL expires.
        """
        lock_key = f"sync_lock:xyz:{entity_id}"
        if self.redis:
            # SET with nx + ex creates the key and its TTL in one command.
            acquired = self.redis.set(lock_key, "locked", nx=True, ex=LOCK_TTL_SECONDS)
            if not acquired:
                return False

        try:
            await self.espocrm.update_entity('XYZ', entity_id, {'syncStatus': 'syncing'})
        except Exception:
            if self.redis:
                self.redis.delete(lock_key)
            raise
        return True

    async def release_sync_lock(
        self,
        entity_id: str,
        new_status: str = 'clean',
        error_message: Optional[str] = None,
        increment_retry: bool = False,
        extra_fields: Optional[Dict[str, Any]] = None
    ):
        """Write the final sync status to EspoCRM and drop the Redis lock.

        IMPORTANT: pass ``extra_fields={'advowareRowId': ...}`` after every
        sync so the next run can detect Advoware changes.

        Args:
            entity_id: EspoCRM entity ID.
            new_status: Value for ``syncStatus``; replaced by
                ``'permanently_failed'`` once the retry limit is reached.
            error_message: Stored in ``syncErrorMessage`` (cut to 2000
                characters); the field is cleared when empty.
            increment_retry: When true, ``syncRetryCount`` is read from
                EspoCRM and incremented; otherwise it is reset to 0.
            extra_fields: Additional fields merged into the same update call.
        """
        # Function-scope import keeps this block self-contained.
        from datetime import timezone

        update_data: Dict[str, Any] = {
            'syncStatus': new_status,
            'syncErrorMessage': error_message[:2000] if error_message else None,
        }

        # advowareLastSync is the timestamp of the last *successful* sync and
        # is what compare_entities() measures modifiedAt against. Moving it on
        # a failed run would make a not-yet-synced EspoCRM edit look older
        # than the last sync, so it is only written for status 'clean'.
        if new_status == 'clean':
            # EspoCRM datetime format: 'YYYY-MM-DD HH:MM:SS' (no timezone).
            update_data['advowareLastSync'] = (
                datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
            )

        try:
            if increment_retry:
                entity = await self.espocrm.get_entity('XYZ', entity_id)
                retry_count = (entity.get('syncRetryCount') or 0) + 1
                update_data['syncRetryCount'] = retry_count

                if retry_count >= MAX_SYNC_RETRIES:
                    update_data['syncStatus'] = 'permanently_failed'
                    await self.send_notification(
                        entity_id,
                        f"Sync failed after {MAX_SYNC_RETRIES} attempts"
                    )
            else:
                update_data['syncRetryCount'] = 0

            if extra_fields:
                update_data.update(extra_fields)

            await self.espocrm.update_entity('XYZ', entity_id, update_data)
        finally:
            # Always drop the lock, even if the status update failed.
            if self.redis:
                self.redis.delete(f"sync_lock:xyz:{entity_id}")

    def compare_entities(self, espo_entity: Dict, advo_entity: Dict) -> str:
        """Decide the sync direction using rowId-based change detection.

        PRIMARY: compare the stored ``advowareRowId`` with Advoware's current
        ``rowId``, and compare EspoCRM ``modifiedAt`` with
        ``advowareLastSync``.
        FALLBACK: ``compare_timestamps`` when a rowId or the last-sync
        timestamp is missing.

        Returns:
            ``"espocrm_newer"``: only EspoCRM changed.
            ``"advoware_newer"``: only the rowId changed.
            ``"conflict"``: both sides changed.
            ``"no_change"``: neither side changed.
        """
        espo_rowid = espo_entity.get('advowareRowId')
        advo_rowid = advo_entity.get('rowId')
        last_sync = espo_entity.get('advowareLastSync')
        espo_modified = espo_entity.get('modifiedAt')

        # PRIMARY: rowId-based detection.
        if espo_rowid and advo_rowid and last_sync:
            advo_changed = (espo_rowid != advo_rowid)

            # Was EspoCRM modified after the last sync?
            espo_changed = False
            if espo_modified:
                try:
                    espo_ts = self._parse_ts(espo_modified)
                    sync_ts = self._parse_ts(last_sync)
                    if espo_ts and sync_ts:
                        espo_changed = (espo_ts > sync_ts)
                except Exception as e:
                    # An unparseable timestamp counts as "EspoCRM unchanged".
                    self._log(f"Timestamp-Parse-Fehler: {e}", level='debug')

            if advo_changed and espo_changed:
                self._log("🚨 KONFLIKT: Beide Seiten geändert seit letztem Sync")
                return 'conflict'
            elif advo_changed:
                self._log(f"Advoware rowId geändert: {espo_rowid[:20]}... → {advo_rowid[:20]}...")
                return 'advoware_newer'
            elif espo_changed:
                self._log("EspoCRM neuer (modifiedAt > lastSync)")
                return 'espocrm_newer'
            else:
                return 'no_change'

        # FALLBACK: timestamp comparison when rowId data is missing.
        # 'warning' matches the logger method the event handler calls.
        self._log("⚠️ rowId nicht verfügbar, fallback auf Timestamp-Vergleich", level='warning')
        # NOTE(review): this guide states elsewhere that Advoware delivers no
        # timestamps; if 'geaendertAm' is never present, the fallback can only
        # ever report EspoCRM-side changes — confirm.
        return self.compare_timestamps(
            espo_entity.get('modifiedAt'),
            advo_entity.get('geaendertAm'),
            espo_entity.get('advowareLastSync')
        )

    def compare_timestamps(self, espo_ts, advo_ts, last_sync_ts):
        """Fallback: compare timestamps and determine the sync direction.

        CAUTION: less reliable than rowId because timestamps can be missing.
        Use only when no rowId is available.

        Returns:
            ``"espocrm_newer"``, ``"advoware_newer"``, ``"conflict"`` or
            ``"no_change"``.
        """
        espo = self._parse_ts(espo_ts)
        advo = self._parse_ts(advo_ts)
        sync = self._parse_ts(last_sync_ts)

        # Without a last-sync marker only a direct comparison is possible.
        if not sync:
            if not espo or not advo:
                return "no_change"
            return "espocrm_newer" if espo > advo else "advoware_newer"

        espo_changed = espo and espo > sync
        advo_changed = advo and advo > sync

        if espo_changed and advo_changed:
            return "conflict"
        elif espo_changed:
            return "espocrm_newer"
        elif advo_changed:
            return "advoware_newer"
        else:
            return "no_change"

    def merge_for_advoware_put(self, advo_entity, espo_entity, mapper):
        """Merge EspoCRM updates into the Advoware entity (read-modify-write).

        Mapped EspoCRM values override the Advoware values; all other
        Advoware fields are carried over unchanged.
        """
        advo_updates = mapper.map_espo_to_advoware(espo_entity)
        merged = {**advo_entity, **advo_updates}

        self._log(f"📝 Merge: {len(advo_updates)} updates → {len(merged)} total")
        return merged

    async def send_notification(self, entity_id, message):
        """Send an in-app notification to EspoCRM (template stub, no-op)."""
        # Implementation...
        pass

    def _parse_ts(self, ts):
        """Parse a timestamp string to ``datetime``.

        Template stub: returns ``None`` until implemented, so every timestamp
        comparison above evaluates to "unchanged".
        """
        # Implementation...
        pass

    def _log(self, msg, level='info'):
        """Log through ``context.logger`` when a context was provided."""
        if self.context:
            getattr(self.context.logger, level)(msg)

4. Event Handler erstellen

# steps/vmh/xyz_sync_event_step.py
from services.advoware import AdvowareAPI
from services.espocrm import EspoCRMAPI
from services.xyz_mapper import XYZMapper
from services.xyz_sync_utils import XYZSync
import redis
from config import Config

# Step declaration for the XYZ sync event handler.
# NOTE(review): presumably read by the workflow runtime when the step is
# registered — confirm against the framework documentation.
config = {
    'type': 'event',
    'name': 'VMH XYZ Sync Handler',
    'description': 'Bidirectional sync for XYZ entities',
    # Topics that trigger handler(); 'vmh.xyz.sync_check' is the topic the
    # XYZ sync cron step emits.
    'subscribes': [
        'vmh.xyz.create',
        'vmh.xyz.update',
        'vmh.xyz.delete',
        'vmh.xyz.sync_check'
    ],
    'flows': ['vmh']
}

async def handler(event_data, context):
    """Entry point for XYZ sync events.

    Takes the per-entity sync lock, loads the EspoCRM entity and routes to
    ``handle_update`` (entity already linked to Advoware) or
    ``handle_create`` (not linked yet).

    Args:
        event_data: Event payload; ``entity_id`` is required, ``action``
            defaults to ``'sync_check'``.
        context: Step context providing ``logger``.
    """
    entity_id = event_data.get('entity_id')
    action = event_data.get('action', 'sync_check')

    if not entity_id:
        context.logger.error("No entity_id in event")
        return

    # Initialize
    redis_client = redis.Redis(
        host=Config.REDIS_HOST,
        port=int(Config.REDIS_PORT),
        db=int(Config.REDIS_DB_ADVOWARE_CACHE),
        decode_responses=True
    )

    espocrm = EspoCRMAPI()
    advoware = AdvowareAPI(context)
    sync_utils = XYZSync(espocrm, redis_client, context)
    mapper = XYZMapper()

    try:
        # Acquire lock
        if not await sync_utils.acquire_sync_lock(entity_id):
            context.logger.warning(f"Already syncing: {entity_id}")
            return

        # Load entity
        espo_entity = await espocrm.get_entity('XYZ', entity_id)
        advoware_id = espo_entity.get('advowareId')

        # Route to handler
        if advoware_id:
            await handle_update(entity_id, advoware_id, espo_entity, espocrm, advoware, sync_utils, mapper, context)
        elif action in ('create', 'sync_check'):
            await handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context)
        else:
            # No advowareId and an action other than create/sync_check.
            # Previously no branch ran, so the lock and the 'syncing' status
            # were never released. 'pending_sync' is one of the statuses the
            # sync cron searches for.
            # NOTE(review): confirm 'pending_sync' is the desired status here.
            context.logger.warning(
                f"No sync route for action '{action}' without advowareId: {entity_id}"
            )
            await sync_utils.release_sync_lock(entity_id, 'pending_sync')

    except Exception as e:
        context.logger.error(f"Sync failed: {e}")
        await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)


async def handle_create(entity_id, espo_entity, espocrm, advoware, sync_utils, mapper, context):
    """Create the entity in Advoware and store its ID and rowId in EspoCRM.

    Any failure before the new Advoware ID is known releases the lock with
    status ``'failed'`` and increments the retry counter.
    """
    try:
        advo_data = mapper.map_espo_to_advoware(espo_entity)

        result = await advoware.api_call(
            'api/v1/advonet/XYZ',
            method='POST',
            data=advo_data
        )

        # NOTE(review): the original lines for this step were lost. The
        # response shape (dict or one-element list) and the 'id' key are
        # assumptions — adjust to the real Advoware POST response.
        created = result[0] if isinstance(result, list) and result else result
        new_id = created.get('id') if isinstance(created, dict) else None
        if not new_id:
            raise ValueError("Advoware POST returned no ID")

        # IMPORTANT: reload the entity after POST to obtain its rowId.
        # A failure here must not discard new_id, otherwise a retry would
        # create a duplicate record in Advoware.
        new_rowid = None
        try:
            created_entity = await advoware.api_call(
                f'api/v1/advonet/XYZ/{new_id}',
                method='GET'
            )
            new_rowid = created_entity.get('rowId') if isinstance(created_entity, dict) else created_entity[0].get('rowId')
        except Exception as e:
            context.logger.warning(f"Could not load rowId for {new_id}: {e}")

        # Combined API call: release lock + save foreign key + rowId
        await sync_utils.release_sync_lock(
            entity_id,
            'clean',
            extra_fields={
                'advowareId': new_id,
                'advowareRowId': new_rowid  # IMPORTANT for change detection!
            }
        )
    except Exception as e:
        context.logger.error(f"❌ Create failed: {e}")
        await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)
        return

    # Logged outside the try block so a logging problem cannot turn a
    # successful create into a 'failed' status.
    context.logger.info(f"✅ Created in Advoware: {new_id} (rowId: {str(new_rowid or '')[:20]}...)")
async def handle_update(entity_id, advoware_id, espo_entity, espocrm, advoware, sync_utils, mapper, context):
    """Sync an entity that already exists on both sides.

    Exactly one branch runs and every branch releases the sync lock:

    - initial sync (no ``advowareLastSync``) → merge EspoCRM into Advoware
    - ``espocrm_newer`` → update Advoware
    - ``advoware_newer`` → update EspoCRM
    - ``conflict`` → EspoCRM wins, a notification is sent
    - anything else (``no_change``) → just release the lock
    """
    try:
        # Fetch from Advoware
        advo_result = await advoware.api_call(f'api/v1/advonet/XYZ/{advoware_id}', method='GET')
        if isinstance(advo_result, list):
            # An empty list is treated as "not found" instead of IndexError.
            advo_entity = advo_result[0] if advo_result else None
        else:
            advo_entity = advo_result

        if not advo_entity:
            context.logger.error(f"Entity not found in Advoware: {advoware_id}")
            await sync_utils.release_sync_lock(entity_id, 'failed', "Not found in Advoware")
            return

        # Compare entities (rowId-based, NOT only timestamps!)
        comparison = sync_utils.compare_entities(espo_entity, advo_entity)

        # Initial sync (no last_sync) → merge EspoCRM into Advoware
        if not espo_entity.get('advowareLastSync'):
            new_rowid = await _push_espo_to_advoware(advoware_id, advo_entity, espo_entity, advoware, sync_utils, mapper)
            await sync_utils.release_sync_lock(entity_id, 'clean', extra_fields={'advowareRowId': new_rowid})

        # EspoCRM newer → update Advoware
        elif comparison == 'espocrm_newer':
            new_rowid = await _push_espo_to_advoware(advoware_id, advo_entity, espo_entity, advoware, sync_utils, mapper)
            await sync_utils.release_sync_lock(entity_id, 'clean', extra_fields={'advowareRowId': new_rowid})

        # Advoware newer → update EspoCRM
        elif comparison == 'advoware_newer':
            espo_data = mapper.map_advoware_to_espo(advo_entity)  # already contains the rowId
            await espocrm.update_entity('XYZ', entity_id, espo_data)
            await sync_utils.release_sync_lock(entity_id, 'clean')

        # Conflict → EspoCRM wins
        elif comparison == 'conflict':
            # IMPORTANT: the rowId must be refreshed after a conflict PUT too.
            new_rowid = await _push_espo_to_advoware(advoware_id, advo_entity, espo_entity, advoware, sync_utils, mapper)
            await sync_utils.send_notification(entity_id, "Conflict resolved: EspoCRM won")
            await sync_utils.release_sync_lock(entity_id, 'clean', extra_fields={'advowareRowId': new_rowid})

        # No change → nothing to transfer, but the lock must still be released
        else:
            await sync_utils.release_sync_lock(entity_id, 'clean')

    except Exception as e:
        context.logger.error(f"❌ Update failed: {e}")
        await sync_utils.release_sync_lock(entity_id, 'failed', str(e), increment_retry=True)


async def _push_espo_to_advoware(advoware_id, advo_entity, espo_entity, advoware, sync_utils, mapper):
    """PUT the merged entity to Advoware, reload it and return its new rowId."""
    url = f'api/v1/advonet/XYZ/{advoware_id}'
    merged_data = sync_utils.merge_for_advoware_put(advo_entity, espo_entity, mapper)
    await advoware.api_call(url, method='PUT', data=merged_data)

    # IMPORTANT: reload after PUT because the rowId changes on every update.
    updated_entity = await advoware.api_call(url, method='GET')
    return updated_entity.get('rowId') if isinstance(updated_entity, dict) else updated_entity[0].get('rowId')

5. Cron erstellen

# steps/vmh/xyz_sync_cron_step.py
import asyncio
from services.espocrm import EspoCRMAPI
import datetime

# Step declaration for the XYZ sync cron job.
# NOTE(review): presumably read by the workflow runtime when the step is
# registered — confirm against the framework documentation.
config = {
    'type': 'cron',
    'name': 'VMH XYZ Sync Cron',
    'description': 'Check for XYZ entities needing sync',
    'schedule': '*/15 * * * *',  # Every 15 minutes
    'flows': ['vmh'],
    # Topic that handler() emits once per entity found.
    'emits': ['vmh.xyz.sync_check']
}

async def handler(context):
    """Find XYZ entities that need a sync and emit one event per entity.

    Searches EspoCRM for up to 100 entities whose ``syncStatus`` is
    ``pending_sync``, ``dirty`` or ``failed`` and emits a
    ``vmh.xyz.sync_check`` event for each of them concurrently.

    Args:
        context: Step context providing ``logger`` and ``emit``.
    """
    context.logger.info("🕐 XYZ Sync Cron started")

    espocrm = EspoCRMAPI()

    # Find entities needing sync
    unclean_filter = {
        'where': [{
            'type': 'or',
            'value': [
                {'type': 'equals', 'attribute': 'syncStatus', 'value': 'pending_sync'},
                {'type': 'equals', 'attribute': 'syncStatus', 'value': 'dirty'},
                {'type': 'equals', 'attribute': 'syncStatus', 'value': 'failed'},
            ]
        }]
    }

    result = await espocrm.search_entities('XYZ', unclean_filter, max_size=100)
    entities = result.get('list', [])
    entity_ids = [e['id'] for e in entities]

    context.logger.info(f"Found {len(entity_ids)} entities to sync")

    if not entity_ids:
        return

    # Batch emit (parallel)
    tasks = [
        context.emit({
            'topic': 'vmh.xyz.sync_check',
            'data': {
                'entity_id': eid,
                'action': 'sync_check',
                'source': 'cron',
                'timestamp': datetime.datetime.now().isoformat()
            }
        })
        for eid in entity_ids
    ]

    # return_exceptions=True keeps one failing emit from cancelling the rest.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Report failed emits individually instead of only counting them.
    failures = [
        (eid, outcome)
        for eid, outcome in zip(entity_ids, results)
        if isinstance(outcome, Exception)
    ]
    for eid, error in failures:
        context.logger.error(f"Emit failed for {eid}: {error}")

    success_count = len(entity_ids) - len(failures)
    context.logger.info(f"✅ Emitted {success_count}/{len(entity_ids)} events")

Best Practices

DO

  • Use Redis distributed lock (atomicity)
  • Combine API calls with extra_fields
  • Use merge_for_advoware_put() utility
  • Implement max retries (5x)
  • Batch emit in cron with asyncio.gather()
  • Map only relevant fields (avoid overhead)
  • Add proper error logging

DON'T

  • Don't use GET-then-PUT for locks (race condition)
  • Don't make unnecessary API calls
  • Don't duplicate merge logic
  • Don't retry infinitely
  • Don't emit events sequentially in cron
  • Don't map every field (performance)
  • Don't swallow exceptions silently
  • Don't rely on Advoware timestamps (nicht vorhanden!)

Architecture Principles

  1. Atomicity: Redis lock + TTL
  2. Efficiency: Combined operations
  3. Reusability: Utility functions
  4. Robustness: Max retries + notifications
  5. Scalability: Batch processing
  6. Maintainability: Clear separation of concerns
  7. Reliability: rowId-basierte Change Detection (EINZIGE Methode)

Change Detection Details

rowId-basierte Erkennung (EINZIGE METHODE)

Warum nur rowId?

  • Advoware liefert KEINE Timestamps (geaendertAm, modifiedAt etc.)
  • Advoware's rowId Feld ändert sich bei jedem Update der Entity
  • Base64-kodierte Binary-ID (~40 Zeichen)
  • Sehr zuverlässig, keine Timezone-Probleme, keine NULL-Werte

Implementierung:

# 1. EspoCRM Feld: advowareRowId (varchar 50)
# 2. Im Mapper IMMER rowId mitmappen:
'advowareRowId': advo_entity.get('rowId')

# 3. Nach JEDEM Sync rowId in EspoCRM speichern:
await sync_utils.release_sync_lock(
    entity_id, 
    'clean', 
    extra_fields={'advowareRowId': new_rowid}
)

# 4. Bei Änderungserkennung:
if espo_rowid != advo_rowid:
    # Advoware wurde geändert!
    if espo_modified > last_sync:
        # Konflikt: Beide Seiten geändert
        return 'conflict'
    else:
        # Nur Advoware geändert
        return 'advoware_newer'

Wichtige Sync-Punkte für rowId:

  • Nach POST (Create) - GET aufrufen um rowId zu laden
  • Nach PUT (EspoCRM → Advoware) - GET aufrufen um neue rowId zu laden
  • Nach PUT (Konfliktlösung) - GET aufrufen um neue rowId zu laden
  • Bei Advoware → EspoCRM (via Mapper) - rowId ist bereits in Advoware Response

WICHTIG: rowId ist PFLICHT für Change Detection! Ohne rowId können Änderungen nicht erkannt werden.

Person vs. Firma Mapping

Unterschiedliche Felder je nach Typ:

# EspoCRM Struktur:
# - Natürliche Person: firstName, lastName (firmenname=None)
# - Firma: firmenname (firstName=None, lastName=None)

def map_advoware_to_espo(advo_entity):
    """Map an Advoware record to EspoCRM name fields for a person or company.

    A record with a non-blank ``vorname`` is treated as a natural person;
    everything else is treated as a company.

    Args:
        advo_entity: Advoware record; ``vorname`` and ``name`` are read.

    Returns:
        Dict with ``firstName``, ``lastName``, ``name`` and ``firmenname``.
        The fields that do not apply to the detected type are ``None``.
    """
    vorname = advo_entity.get('vorname')
    nachname = advo_entity.get('name')
    is_person = bool(vorname and vorname.strip())

    if is_person:
        # Natural person. The display name is built from the parts that are
        # present, so a missing surname no longer produces "Max None".
        display_name = " ".join(part for part in (vorname, nachname) if part).strip()
        return {
            'firstName': vorname,
            'lastName': nachname,
            'name': display_name,
            'firmenname': None
        }

    # Company: the person fields stay empty.
    return {
        'firmenname': nachname,
        'name': nachname,
        'firstName': None,
        'lastName': None
    }

Wichtig: EspoCRM blendet firstName/lastName im Frontend aus, wenn firmenname gefüllt ist. Daher Person- und Firmenfelder sauber trennen!

  • Don't map every field (performance)
  • Don't swallow exceptions silently

Architecture Principles

  1. Atomicity: Redis lock + TTL
  2. Efficiency: Combined operations
  3. Reusability: Utility functions
  4. Robustness: Max retries + notifications
  5. Scalability: Batch processing
  6. Maintainability: Clear separation of concerns

Performance Targets

| Metric | Target |
|---|---|
| Single sync latency | < 500ms |
| API calls per operation | ≤ 3 |
| Cron execution (100 entities) | < 2s |
| Lock timeout | 5 min |
| Max retries | 5 |

Testing

# Test script template
async def main():
    """Reset a test entity, run one sync_check through the handler, verify it."""
    entity_id = 'test-id'
    espo = EspoCRMAPI()

    # Reset entity so the run starts as an initial sync.
    reset_fields = {
        'advowareLastSync': None,
        'syncStatus': 'clean',
        'syncRetryCount': 0
    }
    await espo.update_entity('XYZ', entity_id, reset_fields)

    # Trigger sync
    await xyz_sync_event_step.handler(
        {
            'entity_id': entity_id,
            'action': 'sync_check',
            'source': 'test'
        },
        MockContext()
    )

    # Verify
    entity_after = await espo.get_entity('XYZ', entity_id)
    assert entity_after['syncStatus'] == 'clean'

Siehe auch