feat: Implement address synchronization between EspoCRM and Advoware

- Add AdressenMapper for transforming addresses between EspoCRM and Advoware formats.
- Create AdressenSync class to handle address creation, update, and deletion synchronization.
- Introduce NotificationManager for managing manual intervention notifications in case of sync issues.
- Implement detailed logging for address sync operations and error handling.
- Ensure READ-ONLY field changes are detected and notified for manual resolution.
This commit is contained in:
2026-02-08 14:29:29 +00:00
parent 68c8b398aa
commit c770f2c8ee
15 changed files with 6427 additions and 0 deletions

View File

@@ -0,0 +1,266 @@
"""
Adressen Mapper: EspoCRM CAdressen ↔ Advoware Adressen
Transformiert Adressen zwischen den beiden Systemen.
Basierend auf ADRESSEN_SYNC_ANALYSE.md Abschnitt 12.
"""
from typing import Dict, Any, Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class AdressenMapper:
"""Mapper für CAdressen (EspoCRM) ↔ Adressen (Advoware)"""
@staticmethod
def map_cadressen_to_advoware_create(espo_addr: Dict[str, Any]) -> Dict[str, Any]:
"""
Transformiert EspoCRM CAdressen → Advoware Adressen Format (CREATE/POST)
Für CREATE werden ALLE 11 Felder gemappt (inkl. READ-ONLY bei PUT).
Args:
espo_addr: CAdressen Entity von EspoCRM
Returns:
Dict für Advoware POST /api/v1/advonet/Beteiligte/{betnr}/Adressen
"""
logger.debug(f"Mapping EspoCRM → Advoware (CREATE): {espo_addr.get('id')}")
# Formatiere Anschrift (mehrzeilig)
anschrift = AdressenMapper._format_anschrift(espo_addr)
advo_data = {
# R/W Felder (via PUT änderbar)
'strasse': espo_addr.get('adresseStreet') or '',
'plz': espo_addr.get('adressePostalCode') or '',
'ort': espo_addr.get('adresseCity') or '',
'anschrift': anschrift,
# READ-ONLY Felder (nur bei CREATE!)
'land': espo_addr.get('adresseCountry') or 'DE',
'postfach': espo_addr.get('postfach'),
'postfachPLZ': espo_addr.get('postfachPLZ'),
'standardAnschrift': bool(espo_addr.get('isPrimary', False)),
'bemerkung': f"EspoCRM-ID: {espo_addr['id']}", # WICHTIG für Matching!
'gueltigVon': AdressenMapper._format_datetime(espo_addr.get('validFrom')),
'gueltigBis': AdressenMapper._format_datetime(espo_addr.get('validUntil'))
}
return advo_data
@staticmethod
def map_cadressen_to_advoware_update(espo_addr: Dict[str, Any]) -> Dict[str, Any]:
"""
Transformiert EspoCRM CAdressen → Advoware Adressen Format (UPDATE/PUT)
Für UPDATE werden NUR die 4 R/W Felder gemappt!
Alle anderen Änderungen müssen über Notifications gehandelt werden.
Args:
espo_addr: CAdressen Entity von EspoCRM
Returns:
Dict für Advoware PUT /api/v1/advonet/Beteiligte/{betnr}/Adressen/{index}
"""
logger.debug(f"Mapping EspoCRM → Advoware (UPDATE): {espo_addr.get('id')}")
# NUR R/W Felder!
advo_data = {
'strasse': espo_addr.get('adresseStreet') or '',
'plz': espo_addr.get('adressePostalCode') or '',
'ort': espo_addr.get('adresseCity') or '',
'anschrift': AdressenMapper._format_anschrift(espo_addr)
}
return advo_data
@staticmethod
def map_advoware_to_cadressen(advo_addr: Dict[str, Any],
beteiligte_id: str,
existing_espo_addr: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Transformiert Advoware Adressen → EspoCRM CAdressen Format
Args:
advo_addr: Adresse von Advoware GET
beteiligte_id: EspoCRM CBeteiligte ID (für Relation)
existing_espo_addr: Existierende EspoCRM Entity (für Update)
Returns:
Dict für EspoCRM API
"""
logger.debug(f"Mapping Advoware → EspoCRM: Index {advo_addr.get('reihenfolgeIndex')}")
espo_data = {
# Core Adressfelder
'adresseStreet': advo_addr.get('strasse'),
'adressePostalCode': advo_addr.get('plz'),
'adresseCity': advo_addr.get('ort'),
'adresseCountry': advo_addr.get('land') or 'DE',
# Zusatzfelder
'postfach': advo_addr.get('postfach'),
'postfachPLZ': advo_addr.get('postfachPLZ'),
'description': advo_addr.get('bemerkung'),
# Status-Felder
'isPrimary': bool(advo_addr.get('standardAnschrift', False)),
'validFrom': advo_addr.get('gueltigVon'),
'validUntil': advo_addr.get('gueltigBis'),
# Sync-Felder
'advowareRowId': advo_addr.get('rowId'),
'advowareLastSync': datetime.now().isoformat(),
'syncStatus': 'synced',
# Relation
'beteiligteId': beteiligte_id
}
# Preserve existing fields when updating
if existing_espo_addr:
espo_data['id'] = existing_espo_addr['id']
# Keep existing isActive if not changed
if 'isActive' in existing_espo_addr:
espo_data['isActive'] = existing_espo_addr['isActive']
else:
# New address
espo_data['isActive'] = True
return espo_data
@staticmethod
def detect_readonly_changes(espo_addr: Dict[str, Any],
advo_addr: Dict[str, Any]) -> list[Dict[str, Any]]:
"""
Erkenne Änderungen an READ-ONLY Feldern (nicht via PUT änderbar)
Args:
espo_addr: EspoCRM CAdressen Entity
advo_addr: Advoware Adresse
Returns:
Liste von Änderungen mit Feldnamen und Werten
"""
changes = []
# Mapping: EspoCRM-Feld → (Advoware-Feld, Label)
readonly_mappings = {
'adresseCountry': ('land', 'Land'),
'postfach': ('postfach', 'Postfach'),
'postfachPLZ': ('postfachPLZ', 'Postfach PLZ'),
'isPrimary': ('standardAnschrift', 'Hauptadresse'),
'validFrom': ('gueltigVon', 'Gültig von'),
'validUntil': ('gueltigBis', 'Gültig bis')
}
for espo_field, (advo_field, label) in readonly_mappings.items():
espo_value = espo_addr.get(espo_field)
advo_value = advo_addr.get(advo_field)
# Normalisiere Werte für Vergleich
if espo_field == 'isPrimary':
espo_value = bool(espo_value)
advo_value = bool(advo_value)
elif espo_field in ['validFrom', 'validUntil']:
# Datetime-Vergleich (nur Datum)
espo_value = AdressenMapper._normalize_date(espo_value)
advo_value = AdressenMapper._normalize_date(advo_value)
# Vergleiche
if espo_value != advo_value:
changes.append({
'field': label,
'espoField': espo_field,
'advoField': advo_field,
'espoCRM_value': espo_value,
'advoware_value': advo_value
})
return changes
@staticmethod
def _format_anschrift(espo_addr: Dict[str, Any]) -> str:
"""
Formatiert mehrzeilige Anschrift für Advoware
Format:
{Firmenname oder Name}
{Strasse}
{PLZ} {Ort}
"""
parts = []
# Zeile 1: Name
if espo_addr.get('firmenname'):
parts.append(espo_addr['firmenname'])
elif espo_addr.get('firstName') or espo_addr.get('lastName'):
name = f"{espo_addr.get('firstName', '')} {espo_addr.get('lastName', '')}".strip()
if name:
parts.append(name)
# Zeile 2: Straße
if espo_addr.get('adresseStreet'):
parts.append(espo_addr['adresseStreet'])
# Zeile 3: PLZ + Ort
plz = espo_addr.get('adressePostalCode', '').strip()
ort = espo_addr.get('adresseCity', '').strip()
if plz or ort:
parts.append(f"{plz} {ort}".strip())
return '\n'.join(parts)
@staticmethod
def _format_datetime(dt: Any) -> Optional[str]:
"""
Formatiert Datetime für Advoware API (ISO 8601)
Args:
dt: datetime object, ISO string, oder None
Returns:
ISO 8601 string oder None
"""
if not dt:
return None
if isinstance(dt, str):
# Bereits String - prüfe ob gültig
try:
datetime.fromisoformat(dt.replace('Z', '+00:00'))
return dt
except:
return None
if isinstance(dt, datetime):
return dt.isoformat()
return None
@staticmethod
def _normalize_date(dt: Any) -> Optional[str]:
"""
Normalisiert Datum für Vergleich (nur Datum, keine Zeit)
Returns:
YYYY-MM-DD string oder None
"""
if not dt:
return None
if isinstance(dt, str):
try:
dt_obj = datetime.fromisoformat(dt.replace('Z', '+00:00'))
return dt_obj.strftime('%Y-%m-%d')
except:
return None
if isinstance(dt, datetime):
return dt.strftime('%Y-%m-%d')
return None

View File

@@ -0,0 +1,514 @@
"""
Adressen Synchronization: EspoCRM ↔ Advoware
Synchronisiert CAdressen zwischen EspoCRM und Advoware.
Basierend auf ADRESSEN_SYNC_ANALYSE.md Abschnitt 12.
SYNC-STRATEGIE:
- CREATE: Vollautomatisch (alle 11 Felder)
- UPDATE: Nur R/W Felder (strasse, plz, ort, anschrift)
- DELETE: Nur via Notification (kein API-DELETE verfügbar)
- READ-ONLY Änderungen: Nur via Notification
"""
from typing import Dict, Any, Optional, List
from datetime import datetime
import logging
from services.advoware import AdvowareAPI
from services.espocrm import EspoCRMAPI
from services.adressen_mapper import AdressenMapper
from services.notification_utils import NotificationManager
logger = logging.getLogger(__name__)
class AdressenSync:
"""Sync-Klasse für Adressen zwischen EspoCRM und Advoware"""
def __init__(self, context=None):
"""
Initialize AdressenSync
Args:
context: Application context mit logger
"""
self.context = context
self.advo = AdvowareAPI(context=context)
self.espo = EspoCRMAPI(context=context)
self.mapper = AdressenMapper()
self.notification_manager = NotificationManager(espocrm_api=self.espo, context=context)
# ========================================================================
# CREATE: EspoCRM → Advoware
# ========================================================================
async def create_address(self, espo_addr: Dict[str, Any], betnr: int) -> Optional[Dict[str, Any]]:
"""
Erstelle neue Adresse in Advoware
Alle 11 Felder werden synchronisiert (inkl. READ-ONLY).
Args:
espo_addr: CAdressen Entity von EspoCRM
betnr: Advoware Beteiligte-Nummer
Returns:
Erstellte Adresse oder None bei Fehler
"""
try:
espo_id = espo_addr['id']
logger.info(f"Creating address in Advoware for EspoCRM ID {espo_id}, BetNr {betnr}")
# Map zu Advoware Format (alle Felder)
advo_data = self.mapper.map_cadressen_to_advoware_create(espo_addr)
# POST zu Advoware
result = await self.advo.api_call(
f'/api/v1/advonet/Beteiligte/{betnr}/Adressen',
method='POST',
json_data=advo_data
)
# POST gibt Array zurück, nimm erste Adresse
if isinstance(result, list) and result:
created_addr = result[0]
else:
created_addr = result
logger.info(
f"✓ Created address in Advoware: "
f"Index {created_addr.get('reihenfolgeIndex')}, "
f"EspoCRM ID {espo_id}"
)
# Update EspoCRM mit Sync-Info
await self._update_espo_sync_info(espo_id, created_addr, 'synced')
return created_addr
except Exception as e:
logger.error(f"Failed to create address: {e}", exc_info=True)
# Update syncStatus
await self._update_espo_sync_status(espo_addr['id'], 'error')
return None
# ========================================================================
# UPDATE: EspoCRM → Advoware (nur R/W Felder)
# ========================================================================
async def update_address(self, espo_addr: Dict[str, Any], betnr: int) -> Optional[Dict[str, Any]]:
"""
Update Adresse in Advoware (nur R/W Felder)
Nur strasse, plz, ort, anschrift werden geändert.
Alle anderen Änderungen → Notification.
Args:
espo_addr: CAdressen Entity von EspoCRM
betnr: Advoware Beteiligte-Nummer
Returns:
Aktualisierte Adresse oder None bei Fehler
"""
try:
espo_id = espo_addr['id']
logger.info(f"Updating address in Advoware for EspoCRM ID {espo_id}, BetNr {betnr}")
# 1. Finde Adresse in Advoware via bemerkung (EINZIGE stabile Methode)
target = await self._find_address_by_espo_id(betnr, espo_id)
if not target:
logger.warning(f"Address not found in Advoware: {espo_id} - creating new")
return await self.create_address(espo_addr, betnr)
# 2. Map nur R/W Felder
rw_data = self.mapper.map_cadressen_to_advoware_update(espo_addr)
# 3. PUT mit aktuellem reihenfolgeIndex (dynamisch!)
current_index = target['reihenfolgeIndex']
result = await self.advo.api_call(
f'/api/v1/advonet/Beteiligte/{betnr}/Adressen/{current_index}',
method='PUT',
json_data=rw_data
)
logger.info(
f"✓ Updated address in Advoware (R/W fields): "
f"Index {current_index}, EspoCRM ID {espo_id}"
)
# 4. Prüfe READ-ONLY Feld-Änderungen
readonly_changes = self.mapper.detect_readonly_changes(espo_addr, target)
if readonly_changes:
logger.warning(
f"⚠ READ-ONLY fields changed for {espo_id}: "
f"{len(readonly_changes)} fields"
)
await self._notify_readonly_changes(espo_addr, betnr, readonly_changes)
# 5. Update EspoCRM mit Sync-Info
await self._update_espo_sync_info(espo_id, result, 'synced')
return result
except Exception as e:
logger.error(f"Failed to update address: {e}", exc_info=True)
# Update syncStatus
await self._update_espo_sync_status(espo_addr['id'], 'error')
return None
# ========================================================================
# DELETE: EspoCRM → Advoware (nur Notification)
# ========================================================================
async def handle_address_deletion(self, espo_addr: Dict[str, Any], betnr: int) -> bool:
"""
Handle Adress-Löschung (nur Notification)
Kein API-DELETE verfügbar → Manuelle Löschung erforderlich.
Args:
espo_addr: Gelöschte CAdressen Entity von EspoCRM
betnr: Advoware Beteiligte-Nummer
Returns:
True wenn Notification erfolgreich
"""
try:
espo_id = espo_addr['id']
logger.info(f"Handling address deletion for EspoCRM ID {espo_id}, BetNr {betnr}")
# 1. Finde Adresse in Advoware
target = await self._find_address_by_espo_id(betnr, espo_id)
if not target:
logger.info(f"Address already deleted or not found: {espo_id}")
return True
# 2. Erstelle Notification für manuelle Löschung
await self.notification_manager.notify_manual_action_required(
entity_type='CAdressen',
entity_id=espo_id,
action_type='address_delete_required',
details={
'message': 'Adresse in Advoware löschen',
'description': (
f'Adresse wurde in EspoCRM gelöscht:\n'
f'{target.get("strasse")}\n'
f'{target.get("plz")} {target.get("ort")}\n\n'
f'Bitte manuell in Advoware löschen:\n'
f'1. Öffne Beteiligten {betnr} in Advoware\n'
f'2. Gehe zu Adressen-Tab\n'
f'3. Lösche Adresse (Index {target.get("reihenfolgeIndex")})\n'
f'4. Speichern'
),
'advowareIndex': target.get('reihenfolgeIndex'),
'betnr': betnr,
'address': f"{target.get('strasse')}, {target.get('ort')}",
'priority': 'Medium'
}
)
logger.info(f"✓ Created delete notification for address {espo_id}")
return True
except Exception as e:
logger.error(f"Failed to handle address deletion: {e}", exc_info=True)
return False
# ========================================================================
# SYNC: Advoware → EspoCRM (vollständig)
# ========================================================================
async def sync_from_advoware(self, betnr: int, espo_beteiligte_id: str) -> Dict[str, int]:
"""
Synct alle Adressen von Advoware zu EspoCRM
Alle Felder werden übernommen (Advoware = Master).
Args:
betnr: Advoware Beteiligte-Nummer
espo_beteiligte_id: EspoCRM CBeteiligte ID
Returns:
Dict mit Statistiken: created, updated, unchanged
"""
stats = {'created': 0, 'updated': 0, 'unchanged': 0, 'errors': 0}
try:
logger.info(f"Syncing addresses from Advoware BetNr {betnr} → EspoCRM {espo_beteiligte_id}")
# 1. Hole alle Adressen von Advoware
advo_addresses = await self.advo.api_call(
f'/api/v1/advonet/Beteiligte/{betnr}/Adressen',
method='GET'
)
logger.info(f"Found {len(advo_addresses)} addresses in Advoware")
# 2. Hole existierende EspoCRM Adressen
import json
espo_addresses = await self.espo.list_entities(
'CAdressen',
where=json.dumps([{
'type': 'equals',
'attribute': 'beteiligteId',
'value': espo_beteiligte_id
}])
)
espo_addrs_by_id = {addr['id']: addr for addr in espo_addresses.get('list', [])}
# 3. Sync jede Adresse
for advo_addr in advo_addresses:
try:
# Match via bemerkung
bemerkung = advo_addr.get('bemerkung', '')
if 'EspoCRM-ID:' in bemerkung:
# Existierende Adresse
espo_id = bemerkung.split('EspoCRM-ID:')[1].strip().split()[0]
if espo_id in espo_addrs_by_id:
# Update
result = await self._update_espo_address(
espo_id,
advo_addr,
espo_beteiligte_id,
espo_addrs_by_id[espo_id]
)
if result:
stats['updated'] += 1
else:
stats['errors'] += 1
else:
logger.warning(f"EspoCRM address not found: {espo_id}")
stats['errors'] += 1
else:
# Neue Adresse aus Advoware (kein EspoCRM-ID)
result = await self._create_espo_address(advo_addr, espo_beteiligte_id)
if result:
stats['created'] += 1
else:
stats['errors'] += 1
except Exception as e:
logger.error(f"Failed to sync address: {e}", exc_info=True)
stats['errors'] += 1
logger.info(
f"✓ Sync complete: "
f"created={stats['created']}, "
f"updated={stats['updated']}, "
f"errors={stats['errors']}"
)
return stats
except Exception as e:
logger.error(f"Failed to sync from Advoware: {e}", exc_info=True)
return stats
# ========================================================================
# HELPER METHODS
# ========================================================================
async def _find_address_by_espo_id(self, betnr: int, espo_id: str) -> Optional[Dict[str, Any]]:
"""
Finde Adresse in Advoware via bemerkung-Matching
Args:
betnr: Advoware Beteiligte-Nummer
espo_id: EspoCRM CAdressen ID
Returns:
Advoware Adresse oder None
"""
try:
all_addresses = await self.advo.api_call(
f'/api/v1/advonet/Beteiligte/{betnr}/Adressen',
method='GET'
)
bemerkung_match = f"EspoCRM-ID: {espo_id}"
target = next(
(a for a in all_addresses
if bemerkung_match in (a.get('bemerkung') or '')),
None
)
return target
except Exception as e:
logger.error(f"Failed to find address: {e}", exc_info=True)
return None
async def _update_espo_sync_info(self, espo_id: str, advo_addr: Dict[str, Any],
status: str = 'synced') -> bool:
"""
Update Sync-Info in EspoCRM CAdressen
Args:
espo_id: EspoCRM CAdressen ID
advo_addr: Advoware Adresse (für rowId)
status: syncStatus (nicht verwendet, da EspoCRM-Feld möglicherweise nicht existiert)
Returns:
True wenn erfolgreich
"""
try:
update_data = {
'advowareRowId': advo_addr.get('rowId'),
'advowareLastSync': datetime.now().isoformat()
# syncStatus removed - Feld existiert möglicherweise nicht
}
result = await self.espo.update_entity('CAdressen', espo_id, update_data)
return bool(result)
except Exception as e:
logger.error(f"Failed to update sync info: {e}", exc_info=True)
return False
async def _update_espo_sync_status(self, espo_id: str, status: str) -> bool:
"""
Update nur syncStatus in EspoCRM (optional - Feld möglicherweise nicht vorhanden)
Args:
espo_id: EspoCRM CAdressen ID
status: syncStatus ('error', 'pending', etc.)
Returns:
True wenn erfolgreich
"""
try:
# Feld möglicherweise nicht vorhanden - ignoriere Fehler
result = await self.espo.update_entity(
'CAdressen',
espo_id,
{'description': f'Sync-Status: {status}'} # Als Workaround in description
)
return bool(result)
except Exception as e:
logger.error(f"Failed to update sync status: {e}", exc_info=True)
return False
async def _notify_readonly_changes(self, espo_addr: Dict[str, Any], betnr: int,
changes: List[Dict[str, Any]]) -> bool:
"""
Erstelle Notification für READ-ONLY Feld-Änderungen
Args:
espo_addr: EspoCRM CAdressen Entity
betnr: Advoware Beteiligte-Nummer
changes: Liste von Änderungen
Returns:
True wenn Notification erfolgreich
"""
try:
change_details = '\n'.join([
f"- {c['field']}: EspoCRM='{c['espoCRM_value']}'"
f"Advoware='{c['advoware_value']}'"
for c in changes
])
await self.notification_manager.notify_manual_action_required(
entity_type='CAdressen',
entity_id=espo_addr['id'],
action_type='readonly_field_conflict',
details={
'message': f'{len(changes)} READ-ONLY Feld(er) geändert',
'description': (
f'Folgende Felder wurden in EspoCRM geändert, sind aber '
f'READ-ONLY in Advoware und können nicht automatisch '
f'synchronisiert werden:\n\n{change_details}\n\n'
f'Bitte manuell in Advoware anpassen:\n'
f'1. Öffne Beteiligten {betnr} in Advoware\n'
f'2. Gehe zu Adressen-Tab\n'
f'3. Passe die Felder manuell an\n'
f'4. Speichern'
),
'changes': changes,
'address': f"{espo_addr.get('adresseStreet')}, "
f"{espo_addr.get('adresseCity')}",
'betnr': betnr,
'priority': 'High'
}
)
return True
except Exception as e:
logger.error(f"Failed to create notification: {e}", exc_info=True)
return False
async def _create_espo_address(self, advo_addr: Dict[str, Any],
beteiligte_id: str) -> Optional[str]:
"""
Erstelle neue Adresse in EspoCRM
Args:
advo_addr: Advoware Adresse
beteiligte_id: EspoCRM CBeteiligte ID
Returns:
EspoCRM ID oder None
"""
try:
espo_data = self.mapper.map_advoware_to_cadressen(advo_addr, beteiligte_id)
result = await self.espo.create_entity('CAdressen', espo_data)
if result and 'id' in result:
logger.info(f"✓ Created address in EspoCRM: {result['id']}")
return result['id']
return None
except Exception as e:
logger.error(f"Failed to create EspoCRM address: {e}", exc_info=True)
return None
async def _update_espo_address(self, espo_id: str, advo_addr: Dict[str, Any],
beteiligte_id: str,
existing: Dict[str, Any]) -> bool:
"""
Update existierende Adresse in EspoCRM
Args:
espo_id: EspoCRM CAdressen ID
advo_addr: Advoware Adresse
beteiligte_id: EspoCRM CBeteiligte ID
existing: Existierende EspoCRM Entity
Returns:
True wenn erfolgreich
"""
try:
espo_data = self.mapper.map_advoware_to_cadressen(
advo_addr,
beteiligte_id,
existing
)
result = await self.espo.update_entity('CAdressen', espo_id, espo_data)
if result:
logger.info(f"✓ Updated address in EspoCRM: {espo_id}")
return True
return False
except Exception as e:
logger.error(f"Failed to update EspoCRM address: {e}", exc_info=True)
return False

View File

@@ -0,0 +1,412 @@
"""
Zentrale Notification-Utilities für manuelle Eingriffe
=======================================================
Wenn Advoware-API-Limitierungen existieren (z.B. READ-ONLY Felder),
werden Notifications in EspoCRM erstellt, damit User manuelle Eingriffe
vornehmen können.
Features:
- Notifications an assigned Users
- Task-Erstellung für manuelle Eingriffe
- Zentrale Verwaltung aller Notification-Types
"""
from typing import Dict, Any, Optional, Literal, List
from datetime import datetime, timedelta
import logging
class NotificationManager:
"""
Zentrale Klasse für Notifications bei Sync-Problemen
"""
def __init__(self, espocrm_api, context=None):
"""
Args:
espocrm_api: EspoCRMAPI instance
context: Optional context für Logging
"""
self.espocrm = espocrm_api
self.context = context
self.logger = context.logger if context else logging.getLogger(__name__)
async def notify_manual_action_required(
self,
entity_type: str,
entity_id: str,
action_type: Literal[
"address_delete_required",
"address_reactivate_required",
"address_field_update_required",
"readonly_field_conflict",
"missing_in_advoware",
"general_manual_action"
],
details: Dict[str, Any],
assigned_user_id: Optional[str] = None,
create_task: bool = True
) -> Dict[str, str]:
"""
Erstellt Notification und optional Task für manuelle Eingriffe
Args:
entity_type: EspoCRM Entity Type (z.B. 'CAdressen', 'CBeteiligte')
entity_id: Entity ID in EspoCRM
action_type: Art der manuellen Aktion
details: Detaillierte Informationen
assigned_user_id: User der benachrichtigt werden soll (optional)
create_task: Ob zusätzlich ein Task erstellt werden soll
Returns:
Dict mit notification_id und optional task_id
"""
try:
# Hole Entity-Daten
entity = await self.espocrm.get_entity(entity_type, entity_id)
entity_name = entity.get('name', f"{entity_type} {entity_id}")
# Falls kein assigned_user, versuche aus Entity zu holen
if not assigned_user_id:
assigned_user_id = entity.get('assignedUserId')
# Erstelle Notification
notification_data = self._build_notification_message(
action_type, entity_type, entity_name, details
)
notification_id = await self._create_notification(
user_id=assigned_user_id,
message=notification_data['message'],
entity_type=entity_type,
entity_id=entity_id
)
result = {'notification_id': notification_id}
# Optional: Task erstellen
if create_task:
task_id = await self._create_task(
name=notification_data['task_name'],
description=notification_data['task_description'],
parent_type=entity_type,
parent_id=entity_id,
assigned_user_id=assigned_user_id,
priority=notification_data['priority']
)
result['task_id'] = task_id
self.logger.info(
f"Manual action notification created: {action_type} for "
f"{entity_type}/{entity_id}"
)
return result
except Exception as e:
self.logger.error(f"Failed to create notification: {e}")
raise
def _build_notification_message(
self,
action_type: str,
entity_type: str,
entity_name: str,
details: Dict[str, Any]
) -> Dict[str, str]:
"""
Erstellt Notification-Message basierend auf Action-Type
Returns:
Dict mit 'message', 'task_name', 'task_description', 'priority'
"""
if action_type == "address_delete_required":
return {
'message': (
f"🗑️ Adresse in Advoware löschen erforderlich\n"
f"Adresse: {entity_name}\n"
f"Grund: Advoware API unterstützt kein DELETE und gueltigBis ist READ-ONLY\n"
f"Bitte manuell in Advoware löschen oder deaktivieren."
),
'task_name': f"Adresse in Advoware löschen: {entity_name}",
'task_description': (
f"MANUELLE AKTION ERFORDERLICH\n\n"
f"Adresse: {entity_name}\n"
f"BetNr: {details.get('betnr', 'N/A')}\n"
f"Adresse: {details.get('strasse', '')}, {details.get('plz', '')} {details.get('ort', '')}\n\n"
f"GRUND:\n"
f"- DELETE API nicht verfügbar (403 Forbidden)\n"
f"- gueltigBis ist READ-ONLY (kann nicht nachträglich gesetzt werden)\n\n"
f"AKTION:\n"
f"1. In Advoware Web-Interface einloggen\n"
f"2. Beteiligten mit BetNr {details.get('betnr', 'N/A')} öffnen\n"
f"3. Adresse suchen: {details.get('strasse', '')}\n"
f"4. Adresse löschen oder deaktivieren\n\n"
f"Nach Erledigung: Task als 'Completed' markieren."
),
'priority': 'Normal'
}
elif action_type == "address_reactivate_required":
return {
'message': (
f"♻️ Adresse-Reaktivierung in Advoware erforderlich\n"
f"Adresse: {entity_name}\n"
f"Grund: gueltigBis kann nicht nachträglich geändert werden\n"
f"Bitte neue Adresse in Advoware erstellen."
),
'task_name': f"Neue Adresse in Advoware erstellen: {entity_name}",
'task_description': (
f"MANUELLE AKTION ERFORDERLICH\n\n"
f"Adresse: {entity_name}\n"
f"BetNr: {details.get('betnr', 'N/A')}\n\n"
f"GRUND:\n"
f"Diese Adresse wurde reaktiviert, aber die alte Adresse in Advoware "
f"ist abgelaufen (gueltigBis in Vergangenheit). Da gueltigBis READ-ONLY ist, "
f"muss eine neue Adresse erstellt werden.\n\n"
f"AKTION:\n"
f"1. In Advoware Web-Interface einloggen\n"
f"2. Beteiligten mit BetNr {details.get('betnr', 'N/A')} öffnen\n"
f"3. Neue Adresse erstellen:\n"
f" - Straße: {details.get('strasse', '')}\n"
f" - PLZ: {details.get('plz', '')}\n"
f" - Ort: {details.get('ort', '')}\n"
f" - Land: {details.get('land', '')}\n"
f" - Bemerkung: EspoCRM-ID: {details.get('espocrm_id', '')}\n"
f"4. Sync erneut durchführen, damit Mapping aktualisiert wird\n\n"
f"Nach Erledigung: Task als 'Completed' markieren."
),
'priority': 'Normal'
}
elif action_type == "address_field_update_required":
readonly_fields = details.get('readonly_fields', [])
return {
'message': (
f"⚠️ Adressfelder in Advoware können nicht aktualisiert werden\n"
f"Adresse: {entity_name}\n"
f"READ-ONLY Felder: {', '.join(readonly_fields)}\n"
f"Bitte manuell in Advoware ändern."
),
'task_name': f"Adressfelder in Advoware aktualisieren: {entity_name}",
'task_description': (
f"MANUELLE AKTION ERFORDERLICH\n\n"
f"Adresse: {entity_name}\n"
f"BetNr: {details.get('betnr', 'N/A')}\n\n"
f"GRUND:\n"
f"Folgende Felder sind in Advoware API READ-ONLY und können nicht "
f"via PUT geändert werden:\n"
f"- {', '.join(readonly_fields)}\n\n"
f"GEWÜNSCHTE ÄNDERUNGEN:\n" +
'\n'.join([f" - {k}: {v}" for k, v in details.get('changes', {}).items()]) +
f"\n\nAKTION:\n"
f"1. In Advoware Web-Interface einloggen\n"
f"2. Beteiligten mit BetNr {details.get('betnr', 'N/A')} öffnen\n"
f"3. Adresse suchen und obige Felder manuell ändern\n"
f"4. Sync erneut durchführen zur Bestätigung\n\n"
f"Nach Erledigung: Task als 'Completed' markieren."
),
'priority': 'Low'
}
elif action_type == "readonly_field_conflict":
return {
'message': (
f"⚠️ Sync-Konflikt bei READ-ONLY Feldern\n"
f"{entity_type}: {entity_name}\n"
f"Änderungen konnten nicht synchronisiert werden."
),
'task_name': f"Sync-Konflikt prüfen: {entity_name}",
'task_description': (
f"SYNC-KONFLIKT\n\n"
f"{entity_type}: {entity_name}\n\n"
f"PROBLEM:\n"
f"Felder wurden in EspoCRM geändert, sind aber in Advoware READ-ONLY.\n\n"
f"BETROFFENE FELDER:\n" +
'\n'.join([f" - {k}: {v}" for k, v in details.get('conflicts', {}).items()]) +
f"\n\nOPTIONEN:\n"
f"1. Änderungen in EspoCRM rückgängig machen (Advoware = Master)\n"
f"2. Änderungen manuell in Advoware vornehmen\n"
f"3. Feld als 'nicht synchronisiert' akzeptieren\n\n"
f"Nach Entscheidung: Task als 'Completed' markieren."
),
'priority': 'Normal'
}
elif action_type == "missing_in_advoware":
return {
'message': (
f"❓ Element fehlt in Advoware\n"
f"{entity_type}: {entity_name}\n"
f"Bitte manuell in Advoware erstellen."
),
'task_name': f"In Advoware erstellen: {entity_name}",
'task_description': (
f"MANUELLE AKTION ERFORDERLICH\n\n"
f"{entity_type}: {entity_name}\n\n"
f"GRUND:\n"
f"Dieses Element existiert in EspoCRM, aber nicht in Advoware.\n"
f"Möglicherweise wurde es direkt in EspoCRM erstellt.\n\n"
f"DATEN:\n" +
'\n'.join([f" - {k}: {v}" for k, v in details.items() if k != 'espocrm_id']) +
f"\n\nAKTION:\n"
f"1. In Advoware Web-Interface einloggen\n"
f"2. Element mit obigen Daten manuell erstellen\n"
f"3. Sync erneut durchführen für Mapping\n\n"
f"Nach Erledigung: Task als 'Completed' markieren."
),
'priority': 'Normal'
}
else: # general_manual_action
return {
'message': (
f"🔧 Manuelle Aktion erforderlich\n"
f"{entity_type}: {entity_name}\n"
f"{details.get('message', 'Bitte prüfen.')}"
),
'task_name': f"Manuelle Aktion: {entity_name}",
'task_description': (
f"MANUELLE AKTION ERFORDERLICH\n\n"
f"{entity_type}: {entity_name}\n\n"
f"{details.get('description', 'Keine Details verfügbar.')}"
),
'priority': details.get('priority', 'Normal')
}
async def _create_notification(
self,
user_id: Optional[str],
message: str,
entity_type: str,
entity_id: str
) -> str:
"""
Erstellt EspoCRM Notification (In-App)
Returns:
notification_id
"""
if not user_id:
self.logger.warning("No user assigned - notification not created")
return None
notification_data = {
'type': 'Message',
'message': message,
'userId': user_id,
'relatedType': entity_type,
'relatedId': entity_id,
'read': False
}
try:
result = await self.espocrm.create_entity('Notification', notification_data)
return result.get('id')
except Exception as e:
self.logger.error(f"Failed to create notification: {e}")
return None
async def _create_task(
self,
name: str,
description: str,
parent_type: str,
parent_id: str,
assigned_user_id: Optional[str],
priority: str = 'Normal'
) -> str:
"""
Erstellt EspoCRM Task
Returns:
task_id
"""
# Due Date: 7 Tage in Zukunft
due_date = (datetime.now() + timedelta(days=7)).strftime('%Y-%m-%d')
task_data = {
'name': name,
'description': description,
'status': 'Not Started',
'priority': priority,
'dateEnd': due_date,
'parentType': parent_type,
'parentId': parent_id,
'assignedUserId': assigned_user_id
}
try:
result = await self.espocrm.create_entity('Task', task_data)
return result.get('id')
except Exception as e:
self.logger.error(f"Failed to create task: {e}")
return None
async def resolve_task(self, task_id: str) -> bool:
"""
Markiert Task als erledigt
Args:
task_id: Task ID
Returns:
True wenn erfolgreich
"""
try:
await self.espocrm.update_entity('Task', task_id, {
'status': 'Completed'
})
return True
except Exception as e:
self.logger.error(f"Failed to complete task {task_id}: {e}")
return False
# Helper-Funktionen für häufige Use-Cases
async def notify_address_delete_required(
notification_manager: NotificationManager,
address_entity_id: str,
betnr: str,
address_data: Dict[str, Any]
) -> Dict[str, str]:
"""
Shortcut: Notification für Adresse löschen
"""
return await notification_manager.notify_manual_action_required(
entity_type='CAdressen',
entity_id=address_entity_id,
action_type='address_delete_required',
details={
'betnr': betnr,
'strasse': address_data.get('adresseStreet'),
'plz': address_data.get('adressePostalCode'),
'ort': address_data.get('adresseCity'),
'espocrm_id': address_entity_id
}
)
async def notify_address_readonly_fields(
notification_manager: NotificationManager,
address_entity_id: str,
betnr: str,
readonly_fields: List[str],
changes: Dict[str, Any]
) -> Dict[str, str]:
"""
Shortcut: Notification für READ-ONLY Felder
"""
return await notification_manager.notify_manual_action_required(
entity_type='CAdressen',
entity_id=address_entity_id,
action_type='address_field_update_required',
details={
'betnr': betnr,
'readonly_fields': readonly_fields,
'changes': changes
}
)