Add tests for Kommunikation Sync implementation and verification scripts

- Implemented comprehensive tests for the Kommunikation Sync functionality, covering base64 encoding, marker parsing, creation, type detection, and integration scenarios.
- Added a verification script to check for unique IDs in Advoware communications, ensuring stability and integrity of the IDs.
- Created utility scripts for code validation, notification testing, and PUT response detail analysis to enhance development and testing processes.
- Updated README with details on new tools and their usage.
This commit is contained in:
2026-02-08 23:05:56 +00:00
parent a157d3fa1d
commit 7856dd1d68
37 changed files with 438 additions and 271 deletions

View File

@@ -0,0 +1,48 @@
# Tools & Utilities
Allgemeine Utilities für Entwicklung und Testing.
## Scripts
### validate_code.py
Code-Validierung Tool.
**Features:**
- Syntax-Check für Python Files
- Import-Check
- Error-Detection
**Verwendung:**
```bash
cd /opt/motia-app/bitbylaw
python scripts/tools/validate_code.py services/kommunikation_sync_utils.py
python scripts/tools/validate_code.py steps/vmh/beteiligte_sync_event_step.py
```
**Output:**
```
✅ File validated successfully: 0 Errors
```
### test_notification.py
Test für EspoCRM Notification System.
**Testet:**
- Notification Creation
- User Assignment
- Notification Types
### test_put_response_detail.py
Analysiert PUT Response Details von Advoware.
**Testet:**
- Response Structure
- rowId Changes
- Returned Fields
## Verwendung
```bash
cd /opt/motia-app/bitbylaw
python scripts/tools/validate_code.py <file_path>
```

View File

@@ -0,0 +1,252 @@
#!/usr/bin/env python3
"""
Test: Notification System
==========================
Sendet testweise Notifications an EspoCRM:
1. Task-Erstellung
2. In-App Notification
3. READ-ONLY Field Conflict
"""
import asyncio
import sys
import os
from datetime import datetime
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from services.notification_utils import NotificationManager
from services.espocrm import EspoCRMAPI
BOLD = '\033[1m'
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'
def print_success(text):
print(f"{GREEN}{text}{RESET}")
def print_error(text):
print(f"{RED}{text}{RESET}")
def print_info(text):
print(f"{BLUE} {text}{RESET}")
def print_section(title):
print(f"\n{BOLD}{'='*70}{RESET}")
print(f"{BOLD}{title}{RESET}")
print(f"{BOLD}{'='*70}{RESET}\n")
class SimpleLogger:
def debug(self, msg): pass
def info(self, msg): print(f"[INFO] {msg}")
def warning(self, msg): print(f"{YELLOW}[WARN] {msg}{RESET}")
def error(self, msg): print(f"{RED}[ERROR] {msg}{RESET}")
class SimpleContext:
def __init__(self):
self.logger = SimpleLogger()
async def main():
print_section("TEST: Notification System")
context = SimpleContext()
espo = EspoCRMAPI(context=context)
notification_mgr = NotificationManager(espocrm_api=espo, context=context)
# Finde echte Test-Adresse
print_info("Suche Test-Adresse in EspoCRM...")
import json
addresses = await espo.list_entities(
'CAdressen',
where=json.dumps([{
'type': 'contains',
'attribute': 'name',
'value': 'SYNC-TEST'
}]),
max_size=1
)
if not addresses.get('list'):
print_error("Keine SYNC-TEST Adresse gefunden - erstelle eine...")
# Hole Beteiligten
beteiligte = await espo.list_entities(
'CBeteiligte',
where=json.dumps([{
'type': 'equals',
'attribute': 'betNr',
'value': '104860'
}]),
max_size=1
)
if not beteiligte.get('list'):
print_error("Beteiligter nicht gefunden!")
return
# Erstelle Test-Adresse
import datetime as dt
test_addr = await espo.create_entity('CAdressen', {
'name': f'NOTIFICATION-TEST {dt.datetime.now().strftime("%H:%M:%S")}',
'adresseStreet': 'Notification Test Str. 999',
'adresseCity': 'Teststadt',
'adressePostalCode': '12345',
'beteiligteId': beteiligte['list'][0]['id']
})
TEST_ENTITY_ID = test_addr['id']
print_success(f"Test-Adresse erstellt: {TEST_ENTITY_ID}")
else:
TEST_ENTITY_ID = addresses['list'][0]['id']
print_success(f"Test-Adresse gefunden: {TEST_ENTITY_ID}")
# 1. Test: Address Delete Required
print_section("1. Test: Address Delete Notification")
print_info("Sende DELETE-Notification...")
result = await notification_mgr.notify_manual_action_required(
entity_type='CAdressen',
entity_id=TEST_ENTITY_ID,
action_type='address_delete_required',
details={
'message': 'TEST: Adresse in Advoware löschen',
'description': (
'TEST-Notification:\n'
'Diese Adresse wurde in EspoCRM gelöscht:\n'
'Teststraße 123\n'
'10115 Berlin\n\n'
'Bitte manuell in Advoware löschen:\n'
'1. Öffne Beteiligten 104860 in Advoware\n'
'2. Gehe zu Adressen-Tab\n'
'3. Lösche Adresse (Index 1)\n'
'4. Speichern'
),
'advowareIndex': 1,
'betnr': 104860,
'address': 'Teststraße 123, Berlin',
'priority': 'Medium'
}
)
if result:
print_success("✓ DELETE-Notification gesendet!")
if result.get('task_id'):
print(f" Task ID: {result['task_id']}")
if result.get('notification_id'):
print(f" Notification ID: {result['notification_id']}")
else:
print_error("✗ DELETE-Notification fehlgeschlagen!")
# 2. Test: READ-ONLY Field Conflict
print_section("2. Test: READ-ONLY Field Conflict Notification")
print_info("Sende READ-ONLY Conflict Notification...")
changes = [
{
'field': 'Hauptadresse',
'espoField': 'isPrimary',
'advoField': 'standardAnschrift',
'espoCRM_value': True,
'advoware_value': False
},
{
'field': 'Land',
'espoField': 'adresseCountry',
'advoField': 'land',
'espoCRM_value': 'AT',
'advoware_value': 'DE'
}
]
change_details = '\n'.join([
f"- {c['field']}: EspoCRM='{c['espoCRM_value']}'"
f"Advoware='{c['advoware_value']}'"
for c in changes
])
result2 = await notification_mgr.notify_manual_action_required(
entity_type='CAdressen',
entity_id=TEST_ENTITY_ID,
action_type='readonly_field_conflict',
details={
'message': f'TEST: {len(changes)} READ-ONLY Feld(er) geändert',
'description': (
f'TEST-Notification:\n'
f'Folgende Felder wurden in EspoCRM geändert, sind aber '
f'READ-ONLY in Advoware und können nicht automatisch '
f'synchronisiert werden:\n\n{change_details}\n\n'
f'Bitte manuell in Advoware anpassen:\n'
f'1. Öffne Beteiligten 104860 in Advoware\n'
f'2. Gehe zu Adressen-Tab\n'
f'3. Passe die Felder manuell an\n'
f'4. Speichern'
),
'changes': changes,
'address': 'Teststraße 123, Berlin',
'betnr': 104860,
'priority': 'High'
}
)
if result2:
print_success("✓ READ-ONLY Conflict Notification gesendet!")
if result2.get('task_id'):
print(f" Task ID: {result2['task_id']}")
if result2.get('notification_id'):
print(f" Notification ID: {result2['notification_id']}")
else:
print_error("✗ READ-ONLY Conflict Notification fehlgeschlagen!")
# 3. Test: General Manual Action
print_section("3. Test: General Manual Action Notification")
print_info("Sende allgemeine Notification...")
result3 = await notification_mgr.notify_manual_action_required(
entity_type='CBeteiligte',
entity_id='6987b30a9bbbfefd0',
action_type='general_manual_action',
details={
'message': 'TEST: Allgemeine manuelle Aktion erforderlich',
'description': (
'TEST-Notification:\n\n'
'Dies ist eine Test-Notification für das Notification-System.\n'
'Sie dient nur zu Testzwecken und kann ignoriert werden.\n\n'
f'Erstellt am: {datetime.now().strftime("%d.%m.%Y %H:%M:%S")}'
),
'priority': 'Low'
},
create_task=False # Kein Task für diesen Test
)
if result3:
print_success("✓ General Notification gesendet!")
if result3.get('task_id'):
print(f" Task ID: {result3['task_id']}")
if result3.get('notification_id'):
print(f" Notification ID: {result3['notification_id']}")
else:
print_error("✗ General Notification fehlgeschlagen!")
print_section("ZUSAMMENFASSUNG")
print_info("Prüfe EspoCRM:")
print(" 1. Öffne Tasks-Modul")
print(" 2. Suche nach 'TEST:'")
print(" 3. Prüfe Notifications (Glocken-Icon)")
print()
print_success("✓ 3 Test-Notifications versendet!")
print_info("⚠ Bitte manuell in EspoCRM löschen nach dem Test")
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""
Test: Welche Felder sind bei PUT wirklich änderbar?
====================================================
"""
import asyncio
import sys
import os
import json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from services.advoware import AdvowareAPI
TEST_BETNR = 104860
BOLD = '\033[1m'
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'
def print_success(text):
print(f"{GREEN}{text}{RESET}")
def print_error(text):
print(f"{RED}{text}{RESET}")
def print_info(text):
print(f"{BLUE} {text}{RESET}")
class SimpleLogger:
def info(self, msg): pass
def error(self, msg): pass
def debug(self, msg): pass
def warning(self, msg): pass
class SimpleContext:
def __init__(self):
self.logger = SimpleLogger()
async def main():
print(f"\n{BOLD}=== PUT Response Analyse ==={RESET}\n")
context = SimpleContext()
advo = AdvowareAPI(context=context)
# Finde Test-Adresse
all_addresses = await advo.api_call(
f'/api/v1/advonet/Beteiligte/{TEST_BETNR}/Adressen',
method='GET'
)
test_addr = None
for addr in all_addresses:
bemerkung = addr.get('bemerkung') or ''
if 'TEST-SOFTDELETE' in bemerkung:
test_addr = addr
break
if not test_addr:
print_error("Test-Adresse nicht gefunden")
return
index = test_addr.get('reihenfolgeIndex')
print_info(f"Test-Adresse Index: {index}")
print_info("\nVORHER:")
print(json.dumps(test_addr, indent=2, ensure_ascii=False))
# PUT mit ALLEN Feldern inklusive gueltigBis
print_info("\n=== Sende PUT mit ALLEN Feldern ===")
update_data = {
"strasse": "GEÄNDERT Straße",
"plz": "11111",
"ort": "GEÄNDERT Ort",
"land": "AT",
"postfach": "PF 123",
"postfachPLZ": "11112",
"anschrift": "GEÄNDERT Anschrift",
"standardAnschrift": True,
"bemerkung": "VERSUCH: bemerkung ändern",
"gueltigVon": "2025-01-01T00:00:00", # ← GEÄNDERT
"gueltigBis": "2027-12-31T23:59:59" # ← NEU GESETZT
}
print(json.dumps(update_data, indent=2, ensure_ascii=False))
result = await advo.api_call(
f'/api/v1/advonet/Beteiligte/{TEST_BETNR}/Adressen/{index}',
method='PUT',
json_data=update_data
)
print_info("\n=== PUT Response: ===")
print(json.dumps(result, indent=2, ensure_ascii=False))
# GET und vergleichen
print_info("\n=== GET nach PUT: ===")
all_addresses = await advo.api_call(
f'/api/v1/advonet/Beteiligte/{TEST_BETNR}/Adressen',
method='GET'
)
updated_addr = next((a for a in all_addresses if a.get('reihenfolgeIndex') == index), None)
if updated_addr:
print(json.dumps(updated_addr, indent=2, ensure_ascii=False))
print(f"\n{BOLD}=== VERGLEICH: Was wurde wirklich geändert? ==={RESET}\n")
fields = ['strasse', 'plz', 'ort', 'land', 'postfach', 'postfachPLZ',
'anschrift', 'standardAnschrift', 'bemerkung', 'gueltigVon', 'gueltigBis']
for field in fields:
sent = update_data.get(field)
received = updated_addr.get(field)
if sent == received:
print_success(f"{field:20s}: ✓ GEÄNDERT → {received}")
else:
print_error(f"{field:20s}: ✗ NICHT geändert (sent: {sent}, got: {received})")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,370 @@
#!/usr/bin/env python3
"""
Code Validation Script
Automatisierte Validierung nach Änderungen an steps/ und services/
Features:
- Syntax-Check (compile)
- Import-Check (importlib)
- Type-Hint Validation (mypy optional)
- Async/Await Pattern Check
- Logger Usage Check
- Quick execution (~1-2 seconds)
Usage:
python scripts/validate_code.py # Check all
python scripts/validate_code.py services/ # Check services only
python scripts/validate_code.py --changed # Check only git changed files
python scripts/validate_code.py --mypy # Include mypy checks
"""
import sys
import os
import ast
import importlib.util
import traceback
from pathlib import Path
from typing import List, Tuple, Optional
import subprocess
import argparse
# ANSI Colors
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'
BOLD = '\033[1m'
class ValidationError:
def __init__(self, file: str, error_type: str, message: str, line: Optional[int] = None):
self.file = file
self.error_type = error_type
self.message = message
self.line = line
def __str__(self):
loc = f":{self.line}" if self.line else ""
return f"{RED}{RESET} {self.file}{loc}\n {YELLOW}[{self.error_type}]{RESET} {self.message}"
class CodeValidator:
def __init__(self, root_dir: Path):
self.root_dir = root_dir
self.errors: List[ValidationError] = []
self.warnings: List[ValidationError] = []
self.checked_files = 0
def add_error(self, file: str, error_type: str, message: str, line: Optional[int] = None):
self.errors.append(ValidationError(file, error_type, message, line))
def add_warning(self, file: str, error_type: str, message: str, line: Optional[int] = None):
self.warnings.append(ValidationError(file, error_type, message, line))
def check_syntax(self, file_path: Path) -> bool:
"""Check Python syntax by compiling"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
source = f.read()
compile(source, str(file_path), 'exec')
return True
except SyntaxError as e:
self.add_error(
str(file_path.relative_to(self.root_dir)),
"SYNTAX",
f"{e.msg}",
e.lineno
)
return False
except Exception as e:
self.add_error(
str(file_path.relative_to(self.root_dir)),
"SYNTAX",
f"Unexpected error: {e}"
)
return False
def check_imports(self, file_path: Path) -> bool:
"""Check if imports are valid"""
try:
# Add project root to path
sys.path.insert(0, str(self.root_dir))
spec = importlib.util.spec_from_file_location("module", file_path)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return True
except ImportError as e:
self.add_error(
str(file_path.relative_to(self.root_dir)),
"IMPORT",
f"{e}"
)
return False
except Exception as e:
# Ignore runtime errors, we only care about imports
if "ImportError" in str(type(e)) or "ModuleNotFoundError" in str(type(e)):
self.add_error(
str(file_path.relative_to(self.root_dir)),
"IMPORT",
f"{e}"
)
return False
return True
finally:
# Remove from path
if str(self.root_dir) in sys.path:
sys.path.remove(str(self.root_dir))
def check_patterns(self, file_path: Path) -> bool:
"""Check common patterns and anti-patterns"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
source = f.read()
tree = ast.parse(source, str(file_path))
# Check 1: Async functions should use await, not asyncio.run()
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
is_async = isinstance(node, ast.AsyncFunctionDef)
# Check for asyncio.run() in async function
if is_async:
for child in ast.walk(node):
if isinstance(child, ast.Call):
if isinstance(child.func, ast.Attribute):
if (isinstance(child.func.value, ast.Name) and
child.func.value.id == 'asyncio' and
child.func.attr == 'run'):
self.add_warning(
str(file_path.relative_to(self.root_dir)),
"ASYNC",
f"asyncio.run() in async function '{node.name}' - use await instead",
node.lineno
)
# Check for logger.warn (should be logger.warning)
for child in ast.walk(node):
if isinstance(child, ast.Call):
if isinstance(child.func, ast.Attribute):
# MOTIA-SPECIFIC: warn() is correct, warning() is NOT supported
if child.func.attr == 'warning':
self.add_warning(
str(file_path.relative_to(self.root_dir)),
"LOGGER",
f"logger.warning() not supported by Motia - use logger.warn()",
child.lineno
)
# Check 2: Services should use self.logger if context available
if 'services/' in str(file_path):
# Check if class has context parameter but uses logger instead of self.logger
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
has_context = False
uses_module_logger = False
# Check __init__ for context parameter
for child in node.body:
if isinstance(child, ast.FunctionDef) and child.name == '__init__':
for arg in child.args.args:
if arg.arg == 'context':
has_context = True
# Check for logger.info/error/etc calls
for child in ast.walk(node):
if isinstance(child, ast.Call):
if isinstance(child.func, ast.Attribute):
if (isinstance(child.func.value, ast.Name) and
child.func.value.id == 'logger'):
uses_module_logger = True
if has_context and uses_module_logger:
self.add_warning(
str(file_path.relative_to(self.root_dir)),
"LOGGER",
f"Class '{node.name}' has context but uses 'logger' - use 'self.logger' for Workbench visibility",
node.lineno
)
return True
except Exception as e:
# Don't fail validation for pattern checks
return True
def check_file(self, file_path: Path) -> bool:
"""Run all checks on a file"""
self.checked_files += 1
# 1. Syntax check (must pass)
if not self.check_syntax(file_path):
return False
# 2. Import check (must pass)
if not self.check_imports(file_path):
return False
# 3. Pattern checks (warnings only)
self.check_patterns(file_path)
return True
def find_python_files(self, paths: List[str]) -> List[Path]:
"""Find all Python files in given paths"""
files = []
for path_str in paths:
path = self.root_dir / path_str
if path.is_file() and path.suffix == '.py':
files.append(path)
elif path.is_dir():
files.extend(path.rglob('*.py'))
return files
def get_changed_files(self) -> List[Path]:
"""Get git changed files"""
try:
result = subprocess.run(
['git', 'diff', '--name-only', 'HEAD'],
cwd=self.root_dir,
capture_output=True,
text=True
)
# Also get staged files
result2 = subprocess.run(
['git', 'diff', '--cached', '--name-only'],
cwd=self.root_dir,
capture_output=True,
text=True
)
all_files = result.stdout.strip().split('\n') + result2.stdout.strip().split('\n')
python_files = []
for f in all_files:
if f and f.endswith('.py'):
file_path = self.root_dir / f
if file_path.exists():
# Only include services/ and steps/
if 'services/' in f or 'steps/' in f:
python_files.append(file_path)
return python_files
except Exception as e:
print(f"{YELLOW}⚠ Could not get git changed files: {e}{RESET}")
return []
def validate(self, paths: List[str], only_changed: bool = False) -> bool:
"""Run validation on all files"""
print(f"{BOLD}🔍 Code Validation{RESET}\n")
if only_changed:
files = self.get_changed_files()
if not files:
print(f"{GREEN}{RESET} No changed Python files in services/ or steps/")
return True
print(f"Checking {len(files)} changed files...\n")
else:
files = self.find_python_files(paths)
print(f"Checking {len(files)} files in {', '.join(paths)}...\n")
# Check each file
for file_path in sorted(files):
rel_path = str(file_path.relative_to(self.root_dir))
print(f" {BLUE}{RESET} {rel_path}...", end='')
if self.check_file(file_path):
print(f" {GREEN}{RESET}")
else:
print(f" {RED}{RESET}")
# Print results
print(f"\n{BOLD}Results:{RESET}")
print(f" Files checked: {self.checked_files}")
print(f" Errors: {len(self.errors)}")
print(f" Warnings: {len(self.warnings)}")
# Print errors
if self.errors:
print(f"\n{BOLD}{RED}Errors:{RESET}")
for error in self.errors:
print(f" {error}")
# Print warnings
if self.warnings:
print(f"\n{BOLD}{YELLOW}Warnings:{RESET}")
for warning in self.warnings:
print(f" {warning}")
# Summary
print()
if self.errors:
print(f"{RED}✗ Validation failed with {len(self.errors)} error(s){RESET}")
return False
elif self.warnings:
print(f"{YELLOW}⚠ Validation passed with {len(self.warnings)} warning(s){RESET}")
return True
else:
print(f"{GREEN}✓ All checks passed!{RESET}")
return True
def run_mypy(root_dir: Path, paths: List[str]) -> bool:
"""Run mypy type checker"""
print(f"\n{BOLD}🔍 Running mypy type checker...{RESET}\n")
try:
result = subprocess.run(
['mypy'] + paths + ['--ignore-missing-imports', '--no-error-summary'],
cwd=root_dir,
capture_output=True,
text=True
)
if result.stdout:
print(result.stdout)
if result.returncode == 0:
print(f"{GREEN}✓ mypy: No type errors{RESET}")
return True
else:
print(f"{RED}✗ mypy found type errors{RESET}")
return False
except FileNotFoundError:
print(f"{YELLOW}⚠ mypy not installed - skipping type checks{RESET}")
print(f" Install with: pip install mypy")
return True
def main():
parser = argparse.ArgumentParser(description='Validate Python code in services/ and steps/')
parser.add_argument('paths', nargs='*', default=['services/', 'steps/'],
help='Paths to check (default: services/ steps/)')
parser.add_argument('--changed', '-c', action='store_true',
help='Only check git changed files')
parser.add_argument('--mypy', '-m', action='store_true',
help='Run mypy type checker')
parser.add_argument('--verbose', '-v', action='store_true',
help='Verbose output')
args = parser.parse_args()
root_dir = Path(__file__).parent.parent
validator = CodeValidator(root_dir)
# Run validation
success = validator.validate(args.paths, only_changed=args.changed)
# Run mypy if requested
if args.mypy and success:
mypy_success = run_mypy(root_dir, args.paths)
success = success and mypy_success
# Exit with appropriate code
sys.exit(0 if success else 1)
if __name__ == '__main__':
main()