Files
espocrm/custom/scripts/validate_and_rebuild.py

923 lines
36 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
EspoCRM Custom Entity Validator & Rebuild Tool
Erweiterte Version mit umfassenden Tests und Log-Prüfung
"""
import json
import os
import sys
import subprocess
import re
import time
import requests
from pathlib import Path
from typing import Dict, List, Tuple, Set, Optional
from collections import defaultdict
from datetime import datetime
from requests.auth import HTTPBasicAuth
# ANSI Color Codes
class Colors:
    """ANSI escape sequences used to colourise terminal output."""

    # Bright foreground colours.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'

    # Text attributes.
    BOLD = '\033[1m'
    DIM = '\033[2m'

    # Resets every colour and attribute that was switched on before it.
    END = '\033[0m'
class OutputManager:
    """Console reporter with a compact default mode and a verbose mode.

    Besides printing, every finished test is recorded in ``test_results``
    as a dict with the keys ``name``, ``success``, ``duration`` (seconds)
    and ``message``.
    """

    def __init__(self, verbose: bool = False):
        """
        Args:
            verbose: When true, print banners, detail lines, info lines and
                per-test progress; otherwise print a condensed report.
        """
        self.verbose = verbose
        self.test_results = []
        self.current_test = None
        self.test_start_time = None

    def header(self, text: str):
        """Print a section header (a framed banner in verbose mode)."""
        if self.verbose:
            rule = f"{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}"
            print(f"\n{rule}")
            print(f"{Colors.BOLD}{Colors.BLUE}{text.center(80)}{Colors.END}")
            print(f"{rule}\n")
        else:
            print(f"\n{Colors.BOLD}{text}{Colors.END}")

    def success(self, text: str, details: Optional[str] = None):
        """Print a success line; ``details`` is shown in verbose mode only."""
        print(f"{Colors.GREEN}✓{Colors.END} {text}")
        if self.verbose and details:
            print(f" {Colors.DIM}{details}{Colors.END}")

    def warning(self, text: str, details: Optional[str] = None):
        """Print a warning line; ``details`` is shown in verbose mode only."""
        print(f"{Colors.YELLOW}⚠{Colors.END} {text}")
        if self.verbose and details:
            print(f" {Colors.DIM}{details}{Colors.END}")

    def error(self, text: str, details: Optional[str] = None):
        """Print an error line; ``details`` is always shown when given."""
        print(f"{Colors.RED}✗{Colors.END} {text}")
        if details:
            print(f" {Colors.RED}{details}{Colors.END}")

    def info(self, text: str):
        """Print an informational line (verbose mode only)."""
        if self.verbose:
            print(f"{Colors.BLUE}ℹ{Colors.END} {text}")

    def test_start(self, test_name: str):
        """Remember the name and start time of the test that begins now."""
        self.current_test = test_name
        self.test_start_time = time.time()
        if self.verbose:
            # No newline: test_end() completes this line with the verdict.
            print(f"{Colors.CYAN}▶{Colors.END} {test_name}...", end='', flush=True)

    def test_end(self, success: bool, message: Optional[str] = None):
        """Record the outcome of the test started by ``test_start``.

        The duration is 0 when ``test_start`` was never called. In verbose
        mode the verdict, the duration and the optional message are printed.
        """
        if self.test_start_time is not None:
            duration = time.time() - self.test_start_time
        else:
            duration = 0

        if self.verbose:
            if success:
                print(f" {Colors.GREEN}✓{Colors.END} {duration:.3f}s")
            else:
                print(f" {Colors.RED}✗{Colors.END} {duration:.3f}s")
            if message:
                print(f" {message}")

        self.test_results.append({
            'name': self.current_test,
            'success': success,
            'duration': duration,
            'message': message,
        })
class LogChecker:
    """Scans the EspoCRM log directory for recent error entries."""

    # Only the tail of the log file is scanned, to keep each check cheap.
    TAIL_BYTES = 100000
    # Upper-case substrings that mark a log line as an error.
    ERROR_KEYWORDS = ('ERROR', 'CRITICAL', 'FATAL', 'EXCEPTION')
    # Leading "[YYYY-MM-DD HH:MM:SS]" stamp of a log entry.
    TIMESTAMP_RE = re.compile(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]')

    def __init__(self, log_path: Path, output: "OutputManager"):
        """
        Args:
            log_path: Directory that contains the ``espo-*.log`` files.
            output: Reporter kept for callers; not used by the checks here.
        """
        self.log_path = log_path
        self.output = output
        self.last_check_time = None

    def get_log_file(self) -> Optional[Path]:
        """Return today's log file, else the most recently modified one.

        Returns None when the directory does not exist or holds no
        ``espo-*.log`` file.
        """
        if not self.log_path.exists():
            return None

        today = datetime.now().strftime("%Y-%m-%d")
        log_file = self.log_path / f"espo-{today}.log"
        if log_file.exists():
            return log_file

        # Fallback: newest log by modification time.
        log_files = sorted(self.log_path.glob("espo-*.log"),
                           key=lambda f: f.stat().st_mtime, reverse=True)
        return log_files[0] if log_files else None

    def check_for_errors(self, since_timestamp: Optional[float] = None) -> Tuple[bool, List[str]]:
        """Collect error lines logged at or after ``since_timestamp``.

        Args:
            since_timestamp: Unix timestamp; defaults to five seconds ago.

        Returns:
            ``(has_errors, error_messages)``. A missing log file yields
            ``(False, [...])`` with an explanatory message; a log file that
            cannot be read yields ``(True, [...])``.
        """
        log_file = self.get_log_file()
        if not log_file:
            return False, ["Log-Datei nicht gefunden"]

        if since_timestamp is None:
            since_timestamp = time.time() - 5
        # Log stamps only have one-second resolution. Truncating the cutoff
        # keeps entries written in the same second as the triggering request.
        cutoff_time = int(since_timestamp)

        try:
            # Binary mode: seeking to an arbitrary offset is only well
            # defined for binary files.
            with open(log_file, 'rb') as f:
                f.seek(0, os.SEEK_END)
                start = max(0, f.tell() - self.TAIL_BYTES)
                f.seek(start)
                raw = f.read()
        except OSError as e:
            return True, [f"Fehler beim Lesen der Logs: {e}"]

        lines = raw.decode('utf-8', errors='ignore').split('\n')
        if start > 0:
            # The first line was most likely cut in half by the seek.
            lines = lines[1:]

        errors = []
        # Lines without a stamp (e.g. stack traces) belong to the entry
        # above them and inherit its verdict. Lines before the first stamp
        # are checked, as are logs that carry no stamps at all.
        in_window = True
        for line in lines:
            match = self.TIMESTAMP_RE.match(line)
            if match:
                try:
                    # NOTE(review): the stamp is interpreted in the local
                    # timezone of this process - confirm it matches the
                    # timezone EspoCRM logs in.
                    log_time = datetime.strptime(
                        match.group(1), '%Y-%m-%d %H:%M:%S').timestamp()
                except (ValueError, OverflowError, OSError):
                    # Unusable stamp: rather check the line than miss it.
                    in_window = True
                else:
                    in_window = log_time >= cutoff_time
            if not in_window:
                continue

            line_upper = line.upper()
            if not any(keyword in line_upper for keyword in self.ERROR_KEYWORDS):
                continue
            # Known noise that must not fail a check.
            if 'SQL failed: INSERT INTO `app_log_record`' in line:
                continue
            if 'BPM:' in line and 'ERROR' not in line_upper:
                continue
            errors.append(line.strip())

        return bool(errors), errors
class EntityTester:
    """Runs create/read/update/list/delete smoke tests against the REST API.

    After every request the log checker is asked for errors logged since
    the request started; a step only passes when the HTTP status is 200
    and no such error was found.
    """

    def __init__(self, base_url: str, username: str, password: str,
                 output: "OutputManager", log_checker: "LogChecker"):
        """
        Args:
            base_url: Base URL of the installation; a trailing slash is dropped.
            username: User name for HTTP basic authentication.
            password: Password for HTTP basic authentication.
            output: Reporter that receives the step results.
            log_checker: Used to look for log errors after each request.
        """
        self.base_url = base_url.rstrip('/')
        self.auth = HTTPBasicAuth(username, password)
        self.output = output
        self.log_checker = log_checker
        # (entity_name, record_id) pairs that still have to be deleted.
        self.created_records = []

    def test_entity(self, entity_name: str, test_data: Optional[dict] = None) -> bool:
        """Run the full CRUD cycle for one entity type.

        Args:
            entity_name: Entity type as used in the API path.
            test_data: Payload for the create request; defaults to a record
                that only carries a generated ``name``.

        Returns:
            True when every executed step passed. A failed create request
            (non-200 status or exception) aborts the cycle; a create without
            an id skips the steps that need a record.
        """
        if test_data is None:
            test_data = {'name': f'Test {entity_name} {int(time.time())}'}

        created_ok, record_id, proceed = self._test_create(entity_name, test_data)
        if not proceed:
            return False
        results = [created_ok]

        time.sleep(0.1)  # short pause between the steps
        if record_id:
            results.append(self._test_read(entity_name, record_id))

        time.sleep(0.1)
        if record_id:
            results.append(self._test_update(entity_name, record_id))

        time.sleep(0.1)
        results.append(self._test_list(entity_name))

        time.sleep(0.1)
        if record_id:
            results.append(self._test_delete(entity_name, record_id))

        return all(results)

    def cleanup(self):
        """Delete every test record that is still registered (best effort)."""
        if not self.created_records:
            return
        self.output.info(f"Cleanup: Lösche {len(self.created_records)} Test-Records...")
        for entity_name, record_id in self.created_records:
            try:
                requests.delete(
                    self._url(entity_name, record_id),
                    auth=self.auth,
                    timeout=5
                )
            except requests.RequestException as e:
                # A failed cleanup must never fail the run; just mention it.
                self.output.info(f"Cleanup für {entity_name}/{record_id} fehlgeschlagen: {e}")
        # Forget the records so a second cleanup() does not delete again.
        self.created_records = []

    def _url(self, entity_name: str, record_id: Optional[str] = None) -> str:
        """Build the API URL of an entity collection or of a single record."""
        url = f"{self.base_url}/api/v1/{entity_name}"
        return f"{url}/{record_id}" if record_id else url

    def _report_log_errors(self, error_msgs: List[str], title: str = "Log-Fehler:"):
        """Print the first three log errors below ``title``."""
        self.output.error(title, "\n".join(error_msgs[:3]))

    def _finish_plain_step(self, response, has_errors: bool, error_msgs: List[str]) -> bool:
        """Report a step whose only criteria are HTTP 200 and clean logs."""
        if response.status_code != 200:
            self.output.test_end(False, f"HTTP {response.status_code}")
            if has_errors:
                self._report_log_errors(error_msgs)
            return False
        if has_errors:
            self.output.test_end(False, "Log-Fehler")
            self._report_log_errors(error_msgs)
            return False
        self.output.test_end(True)
        return True

    def _test_create(self, entity_name: str, test_data: dict) -> Tuple[bool, Optional[str], bool]:
        """Create a record; return ``(passed, record_id, continue_cycle)``."""
        self.output.test_start(f"{entity_name}: Create")
        started = time.time()
        # Broad catch on purpose: whatever goes wrong is a failed test step.
        try:
            response = requests.post(
                self._url(entity_name),
                json=test_data,
                auth=self.auth,
                timeout=10
            )
            has_errors, error_msgs = self.log_checker.check_for_errors(started)

            if response.status_code != 200:
                self.output.test_end(False, f"HTTP {response.status_code}")
                self.output.error(f"Response: {response.text[:200]}")
                if has_errors:
                    self._report_log_errors(error_msgs)
                return False, None, False

            record_id = response.json().get('id')
            if not record_id:
                self.output.test_end(False, "Keine ID in Response")
                return False, None, True

            self.created_records.append((entity_name, record_id))
            if has_errors:
                self.output.test_end(False, "Logs enthalten Fehler!")
                self._report_log_errors(error_msgs, "Log-Fehler bei CREATE:")
                return False, record_id, True

            self.output.test_end(True, f"ID: {record_id}")
            return True, record_id, True
        except Exception as e:
            self.output.test_end(False, str(e))
            return False, None, False

    def _test_read(self, entity_name: str, record_id: str) -> bool:
        """Fetch the record and check that the returned id matches."""
        self.output.test_start(f"{entity_name}: Read")
        started = time.time()
        try:
            response = requests.get(
                self._url(entity_name, record_id),
                auth=self.auth,
                timeout=10
            )
            has_errors, error_msgs = self.log_checker.check_for_errors(started)

            if response.status_code != 200:
                self.output.test_end(False, f"HTTP {response.status_code}")
                if has_errors:
                    self._report_log_errors(error_msgs)
                return False

            data = response.json()
            if has_errors:
                self.output.test_end(False, "Log-Fehler")
                self._report_log_errors(error_msgs)
                return False
            if data.get('id') != record_id:
                self.output.test_end(False, "ID stimmt nicht überein")
                return False

            self.output.test_end(True, f"Name: {data.get('name', 'N/A')}")
            return True
        except Exception as e:
            self.output.test_end(False, str(e))
            return False

    def _test_update(self, entity_name: str, record_id: str) -> bool:
        """Write a new ``description`` to the record."""
        self.output.test_start(f"{entity_name}: Update")
        started = time.time()
        try:
            update_data = {'description': f'Updated at {datetime.now().isoformat()}'}
            response = requests.put(
                self._url(entity_name, record_id),
                json=update_data,
                auth=self.auth,
                timeout=10
            )
            has_errors, error_msgs = self.log_checker.check_for_errors(started)
            return self._finish_plain_step(response, has_errors, error_msgs)
        except Exception as e:
            self.output.test_end(False, str(e))
            return False

    def _test_list(self, entity_name: str) -> bool:
        """Request the collection and report its ``total``."""
        self.output.test_start(f"{entity_name}: List")
        started = time.time()
        try:
            response = requests.get(
                self._url(entity_name),
                auth=self.auth,
                timeout=10
            )
            has_errors, error_msgs = self.log_checker.check_for_errors(started)

            if response.status_code != 200:
                self.output.test_end(False, f"HTTP {response.status_code}")
                if has_errors:
                    self._report_log_errors(error_msgs)
                return False

            total = response.json().get('total', 0)
            if has_errors:
                self.output.test_end(False, "Log-Fehler")
                self._report_log_errors(error_msgs)
                return False

            self.output.test_end(True, f"{total} Einträge")
            return True
        except Exception as e:
            self.output.test_end(False, str(e))
            return False

    def _test_delete(self, entity_name: str, record_id: str) -> bool:
        """Delete the record; on success drop it from the cleanup list."""
        self.output.test_start(f"{entity_name}: Delete")
        started = time.time()
        try:
            response = requests.delete(
                self._url(entity_name, record_id),
                auth=self.auth,
                timeout=10
            )
            has_errors, error_msgs = self.log_checker.check_for_errors(started)
            passed = self._finish_plain_step(response, has_errors, error_msgs)
            if passed:
                self.created_records = [
                    (e, i) for e, i in self.created_records
                    if not (e == entity_name and i == record_id)
                ]
            return passed
        except Exception as e:
            self.output.test_end(False, str(e))
            return False
class EntityValidator:
    """Validates EspoCRM customisations, runs the rebuild and API smoke tests.

    Problems are collected in ``errors`` (hard failures) and ``warnings``
    and are printed by ``print_summary``.
    """

    # Lower-case fragments found in the messages of validate_relationships().
    _RELATIONSHIP_MARKERS = ('relationship', 'ziel-entity', 'foreign', 'relationname')

    def __init__(self, base_path: str, verbose: bool = False):
        """
        Args:
            base_path: Root directory of the EspoCRM installation.
            verbose: Passed on to the output manager.
        """
        self.base_path = Path(base_path)
        self.custom_path = self.base_path / "custom" / "Espo" / "Custom" / "Resources"
        self.metadata_path = self.custom_path / "metadata"
        self.i18n_path = self.custom_path / "i18n"
        self.client_custom_path = self.base_path / "client" / "custom"
        self.output = OutputManager(verbose)
        self.errors = []
        self.warnings = []
        self.entity_defs = {}
        self.skip_e2e_tests = False
        self.log_checker = LogChecker(self.base_path / "data" / "logs", self.output)

    @staticmethod
    def _is_custom_entity(name: str) -> bool:
        """Tell custom entity types ('C' + capitalised name) from core ones.

        A plain ``startswith('C')`` would also match customised core
        entities such as ``Contact`` or ``Call``.
        """
        return len(name) > 1 and name[0] == 'C' and name[1].isupper()

    def validate_json_syntax(self) -> bool:
        """Check that every JSON file below the custom resources parses.

        Returns:
            False when at least one file is invalid or unreadable.
        """
        self.output.header("1. JSON-SYNTAX VALIDIERUNG")
        json_files = list(self.custom_path.rglob("*.json"))
        if not json_files:
            self.output.warning("Keine JSON-Dateien gefunden")
            return True

        self.output.info(f"Prüfe {len(json_files)} JSON-Dateien...")
        invalid_files = []
        for json_file in json_files:
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    json.load(f)
            except (ValueError, OSError) as e:
                # ValueError covers JSON syntax errors and broken encodings.
                error_msg = f"{json_file.relative_to(self.base_path)}: {e}"
                self.errors.append(error_msg)
                invalid_files.append(error_msg)

        if invalid_files:
            self.output.error(f"{len(invalid_files)} Datei(en) mit JSON-Fehlern",
                              "\n".join(invalid_files[:5]))
            return False
        self.output.success(f"Alle {len(json_files)} JSON-Dateien valide")
        return True

    def load_entity_defs(self):
        """Load all entityDefs files into ``entity_defs`` (keyed by file stem).

        Files that cannot be read or parsed, or whose top level is not a
        JSON object, are skipped.
        """
        entity_defs_path = self.metadata_path / "entityDefs"
        if not entity_defs_path.exists():
            return
        for json_file in entity_defs_path.glob("*.json"):
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    entity_def = json.load(f)
            except (ValueError, OSError):
                continue
            if isinstance(entity_def, dict):
                self.entity_defs[json_file.stem] = entity_def

    def validate_relationships(self) -> bool:
        """Check that hasMany/hasOne links have a matching foreign link.

        Returns:
            False when at least one inconsistency was found.
        """
        self.output.header("2. RELATIONSHIP-KONSISTENZ")
        if not self.entity_defs:
            self.output.warning("Keine entityDefs geladen")
            return True

        relationship_errors = []
        checked_pairs = set()
        skip_foreign_links = {'parent', 'parents'}

        for entity_name, entity_def in self.entity_defs.items():
            links = entity_def.get('links') or {}
            for link_name, link_def in links.items():
                if not isinstance(link_def, dict):
                    continue
                link_type = link_def.get('type')
                target_entity = link_def.get('entity')
                foreign = link_def.get('foreign')
                relation_name = link_def.get('relationName')

                if foreign in skip_foreign_links:
                    continue
                if link_type not in ('hasMany', 'hasOne') or not target_entity or not foreign:
                    continue

                # Each pair is checked once, whichever side is seen first.
                pair_key = tuple(sorted([f"{entity_name}.{link_name}",
                                         f"{target_entity}.{foreign}"]))
                if pair_key in checked_pairs:
                    continue
                checked_pairs.add(pair_key)

                if target_entity not in self.entity_defs:
                    relationship_errors.append(
                        f"{entity_name}.{link_name}: Ziel-Entity '{target_entity}' existiert nicht"
                    )
                    continue

                target_links = self.entity_defs[target_entity].get('links') or {}
                if foreign not in target_links:
                    relationship_errors.append(
                        f"{entity_name}.{link_name} -> {target_entity}: "
                        f"Foreign link '{foreign}' fehlt"
                    )
                    continue

                foreign_def = target_links[foreign]
                foreign_foreign = foreign_def.get('foreign')
                foreign_relation_name = foreign_def.get('relationName')
                if foreign_foreign != link_name:
                    relationship_errors.append(
                        f"{entity_name}.{link_name} -> {target_entity}.{foreign}: "
                        f"Foreign zeigt auf '{foreign_foreign}' statt '{link_name}'"
                    )
                if relation_name and foreign_relation_name and relation_name != foreign_relation_name:
                    relationship_errors.append(
                        f"{entity_name}.{link_name} -> {target_entity}.{foreign}: "
                        f"relationName unterschiedlich"
                    )

        if relationship_errors:
            self.output.error(f"{len(relationship_errors)} Relationship-Fehler",
                              "\n".join(relationship_errors[:5]))
            self.errors.extend(relationship_errors)
            return False
        self.output.success(f"{len(checked_pairs)} Relationships geprüft")
        return True

    def validate_required_files(self) -> bool:
        """Check that every custom entity has a scope file and i18n files.

        Missing files are recorded as warnings; the method always returns
        True.
        """
        self.output.header("3. ERFORDERLICHE DATEIEN")
        if not self.entity_defs:
            self.output.warning("Keine entityDefs geladen")
            return True

        missing_files = []
        custom_entities = [name for name in self.entity_defs
                           if self._is_custom_entity(name)]
        self.output.info(f"Prüfe {len(custom_entities)} Custom-Entities...")

        for entity_name in custom_entities:
            scope_file = self.metadata_path / "scopes" / f"{entity_name}.json"
            if not scope_file.exists():
                missing_files.append(f"{entity_name}: scopes/{entity_name}.json fehlt")
            for lang in ['de_DE', 'en_US']:
                i18n_file = self.i18n_path / lang / f"{entity_name}.json"
                if not i18n_file.exists():
                    missing_files.append(f"{entity_name}: i18n/{lang}/{entity_name}.json fehlt")

        if missing_files:
            self.output.error(f"{len(missing_files)} fehlende Dateien",
                              "\n".join(missing_files[:10]))
            self.warnings.extend(missing_files)
            return True  # warning only, no hard failure
        self.output.success("Alle erforderlichen Dateien vorhanden")
        return True

    def check_file_permissions(self) -> bool:
        """Check ownership below custom/, client/custom/ and data/.

        When files are not owned by www-data:www-data, ownership and modes
        are corrected through ``sudo``. The check is advisory and always
        returns True.
        """
        self.output.header("4. DATEIRECHTE-PRÜFUNG")
        candidates = [
            self.custom_path,
            self.client_custom_path,
            self.base_path / "data",
        ]
        paths_to_check = [path for path in candidates if path.exists()]

        all_wrong_files = []
        try:
            for path in paths_to_check:
                result = subprocess.run(
                    ['find', str(path), '!', '-user', 'www-data', '-o', '!', '-group', 'www-data'],
                    capture_output=True,
                    text=True,
                    timeout=10
                )
                all_wrong_files.extend(
                    line for line in result.stdout.strip().split('\n') if line)
        except (OSError, subprocess.SubprocessError) as e:
            self.output.warning(f"Konnte Dateirechte nicht prüfen: {e}")
            return True

        if not all_wrong_files:
            self.output.success("Alle Dateirechte korrekt (www-data:www-data)")
            return True

        self.output.warning(f"{len(all_wrong_files)} Dateien mit falschen Rechten")
        self.output.info("Versuche automatische Korrektur...")
        success_count = 0
        failed = []
        for path in paths_to_check:
            try:
                subprocess.run(['sudo', 'chown', '-R', 'www-data:www-data', str(path)],
                               check=True, capture_output=True, timeout=30)
                subprocess.run(['sudo', 'find', str(path), '-type', 'f', '-exec',
                                'chmod', '664', '{}', ';'],
                               check=True, capture_output=True, timeout=30)
                subprocess.run(['sudo', 'find', str(path), '-type', 'd', '-exec',
                                'chmod', '775', '{}', ';'],
                               check=True, capture_output=True, timeout=30)
                success_count += 1
            except (OSError, subprocess.SubprocessError) as e:
                failed.append(f"{path}: {e}")

        if success_count > 0:
            self.output.success(f"Dateirechte für {success_count} Verzeichnisse korrigiert")
        if failed:
            # Previously a failed correction went completely unnoticed.
            self.output.warning(
                f"Dateirechte für {len(failed)} Verzeichnisse nicht korrigiert",
                "\n".join(failed))
        return True

    def validate_php_files(self) -> bool:
        """Run ``php -l`` on every PHP file below custom/Espo.

        Returns:
            False when at least one file has a syntax error. The check is
            skipped (True) when no PHP command line binary can be run.
        """
        self.output.header("5. PHP-SYNTAX VALIDIERUNG")
        php_files = []
        custom_espo_path = self.base_path / "custom" / "Espo"
        if custom_espo_path.exists():
            php_files = list(custom_espo_path.rglob("*.php"))
        if not php_files:
            self.output.info("Keine PHP-Dateien gefunden")
            return True

        self.output.info(f"Prüfe {len(php_files)} PHP-Dateien...")
        try:
            subprocess.run(['php', '--version'], capture_output=True, timeout=5)
        except (OSError, subprocess.SubprocessError):
            self.output.warning("PHP-CLI nicht verfügbar, überspringe PHP-Validierung")
            return True

        invalid_files = []
        for php_file in php_files:
            relative_name = f"{php_file.relative_to(self.base_path)}"
            try:
                result = subprocess.run(['php', '-l', str(php_file)],
                                        capture_output=True, text=True, timeout=5)
            except (OSError, subprocess.SubprocessError) as e:
                # A file that could not be checked must not count as valid.
                self.warnings.append(f"{relative_name}: PHP-Prüfung nicht möglich ({e})")
                continue
            if result.returncode != 0:
                invalid_files.append(relative_name)
                self.errors.append(relative_name)

        if invalid_files:
            self.output.error(f"{len(invalid_files)} PHP-Dateien mit Syntax-Fehlern",
                              "\n".join(invalid_files[:5]))
            return False
        self.output.success(f"Alle {len(php_files)} PHP-Dateien valide")
        return True

    def _find_espo_container(self) -> Optional[str]:
        """Return the running container named 'espocrm', else the first
        one whose name contains 'espo' but not 'db'; None if there is none."""
        result = subprocess.run(['docker', 'ps', '--format', '{{.Names}}'],
                                capture_output=True, text=True, timeout=5)
        containers = result.stdout.strip().split('\n')
        for container in containers:
            if container.lower() == 'espocrm':
                return container
        for container in containers:
            if 'espo' in container.lower() and 'db' not in container.lower():
                return container
        return None

    def run_rebuild(self) -> bool:
        """Clear the cache and run the rebuild inside the docker container.

        Only installations below a docker volume path are supported.

        Returns:
            True when the rebuild command succeeded and the log shows no
            new errors. Failures are recorded in ``errors``.
        """
        self.output.header("6. ESPOCRM REBUILD")
        if '/docker/volumes/' not in str(self.base_path):
            self.output.error("Lokaler Rebuild nicht implementiert, verwende Docker")
            return False

        try:
            espo_container = self._find_espo_container()
            if not espo_container:
                self.output.error("Kein EspoCRM Docker-Container gefunden")
                return False
            self.output.info(f"Container: {espo_container}")

            self.output.info("Lösche Cache...")
            subprocess.run(['docker', 'exec', espo_container, 'php',
                            'command.php', 'clear-cache'],
                           capture_output=True, timeout=30)

            self.output.info("Starte Rebuild...")
            rebuild_start = time.time()
            result = subprocess.run(
                ['docker', 'exec', espo_container, 'php', 'command.php', 'rebuild'],
                capture_output=True, text=True, timeout=60
            )
        except (OSError, subprocess.SubprocessError) as e:
            self.output.error(f"Docker-Rebuild fehlgeschlagen: {e}")
            self.errors.append(f"Docker-Rebuild fehlgeschlagen: {e}")
            return False

        has_errors, error_msgs = self.log_checker.check_for_errors(rebuild_start)
        if result.returncode == 0 and not has_errors:
            self.output.success("Rebuild erfolgreich abgeschlossen")
            return True

        if result.returncode != 0:
            self.output.error("Rebuild fehlgeschlagen", result.stderr)
            self.errors.append(f"Rebuild fehlgeschlagen: {result.stderr.strip()}")
        else:
            self.output.error("Rebuild abgeschlossen, aber Fehler in Logs")
        if has_errors:
            for err in error_msgs[:5]:
                self.output.error("", err)
            # Keep the log lines: summary and feedback are built from errors.
            self.errors.extend(error_msgs)
        return False

    def run_entity_tests(self, base_url: str, username: str, password: str) -> bool:
        """Run the CRUD smoke tests for every custom entity.

        Entities whose name ends with 'CDokumente' are skipped.

        Returns:
            True when all tested entities passed.
        """
        self.output.header("7. ENTITY CRUD-TESTS")
        custom_entities = sorted(
            name for name in self.entity_defs
            if self._is_custom_entity(name) and not name.endswith('CDokumente')
        )
        if not custom_entities:
            self.output.warning("Keine Custom-Entities zum Testen gefunden")
            return True

        self.output.info(f"Teste {len(custom_entities)} Custom-Entities...")
        tester = EntityTester(base_url, username, password, self.output, self.log_checker)
        success_count = 0
        fail_count = 0
        try:
            for entity_name in custom_entities:
                if tester.test_entity(entity_name):
                    success_count += 1
                else:
                    fail_count += 1
        finally:
            # Remove test records even when a test aborts unexpectedly.
            tester.cleanup()

        if not self.output.verbose:
            print(f"\n{Colors.CYAN}Ergebnis:{Colors.END} {success_count} erfolgreich, {fail_count} fehlgeschlagen")

        if fail_count == 0:
            self.output.success(f"Alle {success_count} Entities getestet")
            return True
        self.output.error(f"{fail_count} Entity-Tests fehlgeschlagen")
        self.errors.append(f"{fail_count} Entity-Tests fehlgeschlagen")
        return False

    def generate_ki_feedback(self) -> Dict:
        """Build a JSON-serialisable report of errors, warnings and hints.

        Returns:
            Dict with ``status``, ``summary``, the first ten ``errors`` and
            ``warnings`` and a list of ``recommendations``.
        """
        feedback = {
            "status": "success" if not self.errors else "failed",
            "summary": {
                "errors": len(self.errors),
                "warnings": len(self.warnings),
                "entities_checked": len(self.entity_defs),
            },
            "errors": self.errors[:10],
            "warnings": self.warnings[:10],
            "recommendations": []
        }
        recommendations = feedback["recommendations"]

        if any('Service' in e or 'Class' in e for e in self.errors):
            recommendations.append(
                "Fehlende Service-Klassen - erstelle für jede Entity eine Service-Klasse"
            )
        if any('i18n' in w for w in self.warnings):
            recommendations.append(
                "Unvollständige Übersetzungen - füge fehlende i18n-Dateien hinzu"
            )
        # The relationship messages never contain the word "relationship",
        # so they are recognised by the fragments they do contain.
        if any(marker in e.lower()
               for e in self.errors for marker in self._RELATIONSHIP_MARKERS):
            recommendations.append(
                "Relationship-Fehler - prüfe bidirektionale Link-Definitionen"
            )
        if any('InjectableFactory' in e for e in self.errors):
            recommendations.append(
                "KRITISCH: Service-Klassen fehlen! Erstelle Services in custom/Espo/Custom/Services/"
            )
        return feedback

    def print_summary(self):
        """Print collected errors and warnings followed by the feedback JSON."""
        self.output.header("ZUSAMMENFASSUNG")

        if self.errors:
            print(f"\n{Colors.RED}{Colors.BOLD}FEHLER: {len(self.errors)}{Colors.END}")
            for err in self.errors[:10]:
                print(f" {Colors.RED}✗{Colors.END} {err}")
            if len(self.errors) > 10:
                print(f" {Colors.RED}...{Colors.END} und {len(self.errors) - 10} weitere")

        if self.warnings:
            print(f"\n{Colors.YELLOW}{Colors.BOLD}WARNUNGEN: {len(self.warnings)}{Colors.END}")
            for warn in self.warnings[:5]:
                print(f" {Colors.YELLOW}⚠{Colors.END} {warn}")
            if len(self.warnings) > 5:
                print(f" {Colors.YELLOW}...{Colors.END} und {len(self.warnings) - 5} weitere")

        if not self.errors and not self.warnings:
            print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ALLE PRÜFUNGEN BESTANDEN{Colors.END}\n")
            return

        feedback = self.generate_ki_feedback()
        print(f"\n{Colors.CYAN}{Colors.BOLD}KI-FEEDBACK:{Colors.END}")
        print(json.dumps(feedback, indent=2, ensure_ascii=False))

    def validate_all(self) -> bool:
        """Run all static validations in order.

        JSON syntax errors abort immediately. Relationship and PHP errors
        make the result False; missing files and file permissions never do.
        """
        if not self.validate_json_syntax():
            self.output.error("Abbruch: JSON-Syntax-Fehler!")
            return False

        self.load_entity_defs()

        all_valid = True
        if not self.validate_relationships():
            all_valid = False
        self.validate_required_files()
        self.check_file_permissions()
        if not self.validate_php_files():
            all_valid = False
        return all_valid
def main():
    """Command line entry point: validate, rebuild, then run the API tests.

    Exits with status 0 on success and 1 on any failure. With ``--dry-run``
    only the validations run; ``--skip-tests`` omits the API tests.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='EspoCRM Custom Entity Validator & Rebuild Tool - Erweiterte Version'
    )
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Ausführliche Ausgabe')
    parser.add_argument('--dry-run', action='store_true',
                        help='Nur Validierungen, kein Rebuild')
    parser.add_argument('--skip-tests', action='store_true',
                        help='Überspringe Entity-Tests')
    parser.add_argument('--base-url', default='http://localhost',
                        help='EspoCRM Base-URL (default: http://localhost)')
    parser.add_argument('--username', default='admin',
                        help='API-Username (default: admin)')
    # The environment variable keeps the password out of the process list.
    parser.add_argument('--password',
                        default=os.environ.get('ESPOCRM_PASSWORD', 'admin'),
                        help='API-Password (default: $ESPOCRM_PASSWORD oder admin)')
    args = parser.parse_args()

    # The script lives in <root>/custom/scripts/. Resolve first: for a bare
    # relative __file__ the parent chain would otherwise collapse to ".".
    script_dir = Path(__file__).resolve().parent.parent.parent
    if not (script_dir / "rebuild.php").exists():
        print(f"{Colors.RED}✗{Colors.END} Fehler: Nicht im EspoCRM-Root-Verzeichnis!")
        sys.exit(1)

    print(f"{Colors.BOLD}EspoCRM Validator & Rebuild Tool v2.0{Colors.END}")
    if args.verbose:
        print(f"{Colors.DIM}Verbose-Modus aktiviert{Colors.END}")
    print()

    validator = EntityValidator(str(script_dir), verbose=args.verbose)

    def finish(color: str, banner: str, exit_code: int):
        """Print the summary and the final banner, then exit."""
        validator.print_summary()
        print(f"\n{color}{Colors.BOLD}{banner}{Colors.END}\n")
        sys.exit(exit_code)

    if not validator.validate_all():
        finish(Colors.RED, "✗ REBUILD ABGEBROCHEN", 1)

    if args.dry_run:
        finish(Colors.GREEN, "✓ VALIDIERUNGEN ABGESCHLOSSEN", 0)

    if not validator.run_rebuild():
        finish(Colors.RED, "✗ REBUILD FEHLGESCHLAGEN", 1)

    if not args.skip_tests:
        time.sleep(2)  # give the application a moment after the rebuild
        if not validator.run_entity_tests(args.base_url, args.username, args.password):
            finish(Colors.YELLOW, "⚠ TESTS FEHLGESCHLAGEN", 1)

    finish(Colors.GREEN, "✓ ERFOLGREICH ABGESCHLOSSEN", 0)


if __name__ == "__main__":
    main()