#!/usr/bin/env python3 """ EspoCRM Custom Entity Validator & Rebuild Tool Erweiterte Version mit umfassenden Tests und Log-Prüfung """ import json import os import sys import subprocess import re import time import requests from pathlib import Path from typing import Dict, List, Tuple, Set, Optional from collections import defaultdict from datetime import datetime from requests.auth import HTTPBasicAuth # ANSI Color Codes class Colors: GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BLUE = '\033[94m' CYAN = '\033[96m' BOLD = '\033[1m' DIM = '\033[2m' END = '\033[0m' class OutputManager: """Verwaltet minimierte vs. verbose Ausgabe""" def __init__(self, verbose: bool = False): self.verbose = verbose self.test_results = [] self.current_test = None self.test_start_time = None def header(self, text: str): """Ausgabe bei Header""" if self.verbose: print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}") print(f"{Colors.BOLD}{Colors.BLUE}{text.center(80)}{Colors.END}") print(f"{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}\n") else: print(f"\n{Colors.BOLD}{text}{Colors.END}") def success(self, text: str, details: str = None): """Erfolgs-Meldung""" if self.verbose: print(f"{Colors.GREEN}✓{Colors.END} {text}") if details: print(f" {Colors.DIM}{details}{Colors.END}") else: print(f"{Colors.GREEN}✓{Colors.END} {text}") def warning(self, text: str, details: str = None): """Warnungs-Meldung""" print(f"{Colors.YELLOW}⚠{Colors.END} {text}") if self.verbose and details: print(f" {Colors.DIM}{details}{Colors.END}") def error(self, text: str, details: str = None): """Fehler-Meldung""" print(f"{Colors.RED}✗{Colors.END} {text}") if details: print(f" {Colors.RED}{details}{Colors.END}") def info(self, text: str): """Info-Meldung (nur in verbose)""" if self.verbose: print(f"{Colors.BLUE}ℹ{Colors.END} {text}") def test_start(self, test_name: str): """Test wurde gestartet""" self.current_test = test_name self.test_start_time = time.time() if self.verbose: print(f"{Colors.CYAN}▶{Colors.END} {test_name}...", end='', flush=True) def test_end(self, success: bool, message: str = None): """Test wurde beendet""" duration = time.time() - self.test_start_time if self.test_start_time else 0 if self.verbose: if success: print(f" {Colors.GREEN}✓{Colors.END} {duration:.3f}s") else: print(f" {Colors.RED}✗{Colors.END} {duration:.3f}s") if message: print(f" {message}") self.test_results.append({ 'name': self.current_test, 'success': success, 'duration': duration, 'message': message }) class LogChecker: """Prüft EspoCRM Logs auf Fehler""" def __init__(self, log_path: Path, output: OutputManager): self.log_path = log_path self.output = output self.last_check_time = None def get_log_file(self) -> Optional[Path]: """Findet das aktuelle Log-File""" if not self.log_path.exists(): return None today = datetime.now().strftime("%Y-%m-%d") log_file = self.log_path / f"espo-{today}.log" if log_file.exists(): return log_file # Fallback: neuestes Log log_files = sorted(self.log_path.glob("espo-*.log"), key=lambda f: f.stat().st_mtime, reverse=True) return log_files[0] if log_files else None def check_for_errors(self, since_timestamp: float = None) -> Tuple[bool, List[str]]: """ Prüft Logs auf Fehler seit einem bestimmten Zeitpunkt Returns: (has_errors, error_messages) """ log_file = self.get_log_file() if not log_file: return False, ["Log-Datei nicht gefunden"] try: errors = [] cutoff_time = since_timestamp or time.time() - 5 # 5 Sekunden Puffer with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: # Lese nur die letzten N Zeilen (effizienter) f.seek(0, 2) # Gehe ans Ende file_size = f.tell() f.seek(max(0, file_size - 100000)) # Letzte ~100KB lines = f.readlines() for line in lines: # Extrahiere Timestamp aus Log-Zeile match = re.match(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]', line) if match: try: log_time = datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S').timestamp() if log_time < cutoff_time: continue except: pass # Prüfe auf Fehler-Keywords line_upper = line.upper() if any(keyword in line_upper for keyword in ['ERROR', 'CRITICAL', 'FATAL', 'EXCEPTION']): # Filtere bekannte unwichtige Fehler if 'SQL failed: INSERT INTO `app_log_record`' in line: continue if 'BPM:' in line and 'ERROR' not in line_upper: continue errors.append(line.strip()) return len(errors) > 0, errors except Exception as e: return True, [f"Fehler beim Lesen der Logs: {e}"] class EntityTester: """Führt CRUD-Tests für Entities durch""" def __init__(self, base_url: str, username: str, password: str, output: OutputManager, log_checker: LogChecker): self.base_url = base_url.rstrip('/') self.auth = HTTPBasicAuth(username, password) self.output = output self.log_checker = log_checker self.created_records = [] # Für Cleanup def test_entity(self, entity_name: str, test_data: dict = None) -> bool: """ Führt vollständigen CRUD-Test für eine Entity durch Returns: True wenn alle Tests erfolgreich """ all_success = True record_id = None # Verwende Default-Testdaten wenn keine angegeben if test_data is None: test_data = {'name': f'Test {entity_name} {int(time.time())}'} # 1. CREATE Test self.output.test_start(f"{entity_name}: Create") test_start = time.time() try: response = requests.post( f"{self.base_url}/api/v1/{entity_name}", json=test_data, auth=self.auth, timeout=10 ) # Prüfe Log nach Request has_errors, error_msgs = self.log_checker.check_for_errors(test_start) if response.status_code == 200: data = response.json() record_id = data.get('id') if record_id: self.created_records.append((entity_name, record_id)) if not has_errors: self.output.test_end(True, f"ID: {record_id}") else: self.output.test_end(False, f"Logs enthalten Fehler!") self.output.error("Log-Fehler bei CREATE:", "\n".join(error_msgs[:3])) all_success = False else: self.output.test_end(False, "Keine ID in Response") all_success = False else: self.output.test_end(False, f"HTTP {response.status_code}") self.output.error(f"Response: {response.text[:200]}") if has_errors: self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) all_success = False return all_success except Exception as e: self.output.test_end(False, str(e)) all_success = False return all_success time.sleep(0.1) # Kleine Pause zwischen Tests # 2. READ Test if record_id: self.output.test_start(f"{entity_name}: Read") test_start = time.time() try: response = requests.get( f"{self.base_url}/api/v1/{entity_name}/{record_id}", auth=self.auth, timeout=10 ) has_errors, error_msgs = self.log_checker.check_for_errors(test_start) if response.status_code == 200: data = response.json() if data.get('id') == record_id and not has_errors: self.output.test_end(True, f"Name: {data.get('name', 'N/A')}") else: if has_errors: self.output.test_end(False, "Log-Fehler") self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) else: self.output.test_end(False, "ID stimmt nicht überein") all_success = False else: self.output.test_end(False, f"HTTP {response.status_code}") if has_errors: self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) all_success = False except Exception as e: self.output.test_end(False, str(e)) all_success = False time.sleep(0.1) # 3. UPDATE Test if record_id: self.output.test_start(f"{entity_name}: Update") test_start = time.time() try: update_data = {'description': f'Updated at {datetime.now().isoformat()}'} response = requests.put( f"{self.base_url}/api/v1/{entity_name}/{record_id}", json=update_data, auth=self.auth, timeout=10 ) has_errors, error_msgs = self.log_checker.check_for_errors(test_start) if response.status_code == 200 and not has_errors: self.output.test_end(True) else: if response.status_code != 200: self.output.test_end(False, f"HTTP {response.status_code}") else: self.output.test_end(False, "Log-Fehler") self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) all_success = False except Exception as e: self.output.test_end(False, str(e)) all_success = False time.sleep(0.1) # 4. LIST Test self.output.test_start(f"{entity_name}: List") test_start = time.time() try: response = requests.get( f"{self.base_url}/api/v1/{entity_name}", auth=self.auth, timeout=10 ) has_errors, error_msgs = self.log_checker.check_for_errors(test_start) if response.status_code == 200: data = response.json() total = data.get('total', 0) if not has_errors: self.output.test_end(True, f"{total} Einträge") else: self.output.test_end(False, "Log-Fehler") self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) all_success = False else: self.output.test_end(False, f"HTTP {response.status_code}") if has_errors: self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) all_success = False except Exception as e: self.output.test_end(False, str(e)) all_success = False time.sleep(0.1) # 5. DELETE Test if record_id: self.output.test_start(f"{entity_name}: Delete") test_start = time.time() try: response = requests.delete( f"{self.base_url}/api/v1/{entity_name}/{record_id}", auth=self.auth, timeout=10 ) has_errors, error_msgs = self.log_checker.check_for_errors(test_start) if response.status_code == 200 and not has_errors: self.output.test_end(True) # Entferne aus cleanup-Liste self.created_records = [(e, i) for e, i in self.created_records if not (e == entity_name and i == record_id)] else: if response.status_code != 200: self.output.test_end(False, f"HTTP {response.status_code}") else: self.output.test_end(False, "Log-Fehler") self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) all_success = False except Exception as e: self.output.test_end(False, str(e)) all_success = False return all_success def cleanup(self): """Lösche alle erstellten Test-Records""" if not self.created_records: return self.output.info(f"Cleanup: Lösche {len(self.created_records)} Test-Records...") for entity_name, record_id in self.created_records: try: requests.delete( f"{self.base_url}/api/v1/{entity_name}/{record_id}", auth=self.auth, timeout=5 ) except: pass # Ignoriere Fehler beim Cleanup class EntityValidator: def __init__(self, base_path: str, verbose: bool = False): self.base_path = Path(base_path) self.custom_path = self.base_path / "custom" / "Espo" / "Custom" / "Resources" self.metadata_path = self.custom_path / "metadata" self.i18n_path = self.custom_path / "i18n" self.client_custom_path = self.base_path / "client" / "custom" self.output = OutputManager(verbose) self.errors = [] self.warnings = [] self.entity_defs = {} self.skip_e2e_tests = False self.log_checker = LogChecker(self.base_path / "data" / "logs", self.output) def validate_json_syntax(self) -> bool: """Validiere JSON-Syntax aller Dateien im custom-Verzeichnis.""" self.output.header("1. JSON-SYNTAX VALIDIERUNG") json_files = list(self.custom_path.rglob("*.json")) if not json_files: self.output.warning("Keine JSON-Dateien gefunden") return True self.output.info(f"Prüfe {len(json_files)} JSON-Dateien...") invalid_files = [] for json_file in json_files: try: with open(json_file, 'r', encoding='utf-8') as f: json.load(f) except json.JSONDecodeError as e: error_msg = f"{json_file.relative_to(self.base_path)}: {e}" self.errors.append(error_msg) invalid_files.append(error_msg) if invalid_files: self.output.error(f"{len(invalid_files)} Datei(en) mit JSON-Fehlern", "\n".join(invalid_files[:5])) return False else: self.output.success(f"Alle {len(json_files)} JSON-Dateien valide") return True def load_entity_defs(self): """Lade alle entityDefs für weitere Analysen.""" entity_defs_path = self.metadata_path / "entityDefs" if not entity_defs_path.exists(): return for json_file in entity_defs_path.glob("*.json"): entity_name = json_file.stem try: with open(json_file, 'r', encoding='utf-8') as f: self.entity_defs[entity_name] = json.load(f) except Exception: pass def validate_relationships(self) -> bool: """Validiere Relationship-Definitionen zwischen Entities.""" self.output.header("2. RELATIONSHIP-KONSISTENZ") if not self.entity_defs: self.output.warning("Keine entityDefs geladen") return True relationship_errors = [] checked_pairs = set() skip_foreign_links = {'parent', 'parents'} for entity_name, entity_def in self.entity_defs.items(): links = entity_def.get('links', {}) for link_name, link_def in links.items(): link_type = link_def.get('type') target_entity = link_def.get('entity') foreign = link_def.get('foreign') relation_name = link_def.get('relationName') if foreign in skip_foreign_links: continue if link_type in ['hasMany', 'hasOne'] and target_entity and foreign: pair_key = tuple(sorted([f"{entity_name}.{link_name}", f"{target_entity}.{foreign}"])) if pair_key in checked_pairs: continue checked_pairs.add(pair_key) if target_entity not in self.entity_defs: relationship_errors.append( f"{entity_name}.{link_name}: Ziel-Entity '{target_entity}' existiert nicht" ) continue target_links = self.entity_defs[target_entity].get('links', {}) if foreign not in target_links: relationship_errors.append( f"{entity_name}.{link_name} → {target_entity}: " f"Foreign link '{foreign}' fehlt" ) continue foreign_def = target_links[foreign] foreign_foreign = foreign_def.get('foreign') foreign_relation_name = foreign_def.get('relationName') if foreign_foreign != link_name: relationship_errors.append( f"{entity_name}.{link_name} ↔ {target_entity}.{foreign}: " f"Foreign zeigt auf '{foreign_foreign}' statt '{link_name}'" ) if relation_name and foreign_relation_name and relation_name != foreign_relation_name: relationship_errors.append( f"{entity_name}.{link_name} ↔ {target_entity}.{foreign}: " f"relationName unterschiedlich" ) if relationship_errors: self.output.error(f"{len(relationship_errors)} Relationship-Fehler", "\n".join(relationship_errors[:5])) self.errors.extend(relationship_errors) return False else: self.output.success(f"{len(checked_pairs)} Relationships geprüft") return True def validate_required_files(self) -> bool: """Prüfe ob für jede Entity alle erforderlichen Dateien existieren.""" self.output.header("3. ERFORDERLICHE DATEIEN") if not self.entity_defs: self.output.warning("Keine entityDefs geladen") return True missing_files = [] custom_entities = [name for name in self.entity_defs.keys() if name.startswith('C')] self.output.info(f"Prüfe {len(custom_entities)} Custom-Entities...") for entity_name in custom_entities: # 1. Scope-Datei scope_file = self.metadata_path / "scopes" / f"{entity_name}.json" if not scope_file.exists(): missing_files.append(f"{entity_name}: scopes/{entity_name}.json fehlt") # 2. i18n-Dateien for lang in ['de_DE', 'en_US']: i18n_file = self.i18n_path / lang / f"{entity_name}.json" if not i18n_file.exists(): missing_files.append(f"{entity_name}: i18n/{lang}/{entity_name}.json fehlt") if missing_files: self.output.error(f"{len(missing_files)} fehlende Dateien", "\n".join(missing_files[:10])) self.warnings.extend(missing_files) return True # Nur Warnung, kein Hard-Fail else: self.output.success(f"Alle erforderlichen Dateien vorhanden") return True def check_file_permissions(self) -> bool: """Prüfe Dateirechte im custom-Verzeichnis.""" self.output.header("4. DATEIRECHTE-PRÜFUNG") paths_to_check = [ self.custom_path, self.client_custom_path, self.base_path / "data", ] all_wrong_files = [] try: for path in paths_to_check: if not path.exists(): continue result = subprocess.run( ['find', str(path), '!', '-user', 'www-data', '-o', '!', '-group', 'www-data'], capture_output=True, text=True, timeout=10 ) wrong_files = [line for line in result.stdout.strip().split('\n') if line] all_wrong_files.extend(wrong_files) if all_wrong_files: self.output.warning(f"{len(all_wrong_files)} Dateien mit falschen Rechten") self.output.info("Versuche automatische Korrektur...") success_count = 0 for path in paths_to_check: if not path.exists(): continue try: subprocess.run(['sudo', 'chown', '-R', 'www-data:www-data', str(path)], check=True, capture_output=True, timeout=30) subprocess.run(['sudo', 'find', str(path), '-type', 'f', '-exec', 'chmod', '664', '{}', ';'], check=True, capture_output=True, timeout=30) subprocess.run(['sudo', 'find', str(path), '-type', 'd', '-exec', 'chmod', '775', '{}', ';'], check=True, capture_output=True, timeout=30) success_count += 1 except: pass if success_count > 0: self.output.success(f"Dateirechte für {success_count} Verzeichnisse korrigiert") else: self.output.success("Alle Dateirechte korrekt (www-data:www-data)") return True except Exception as e: self.output.warning(f"Konnte Dateirechte nicht prüfen: {e}") return True def validate_php_files(self) -> bool: """Validiere PHP-Dateien mit php -l.""" self.output.header("5. PHP-SYNTAX VALIDIERUNG") php_files = [] custom_espo_path = self.base_path / "custom" / "Espo" if custom_espo_path.exists(): php_files = list(custom_espo_path.rglob("*.php")) if not php_files: self.output.info("Keine PHP-Dateien gefunden") return True self.output.info(f"Prüfe {len(php_files)} PHP-Dateien...") try: subprocess.run(['php', '--version'], capture_output=True, timeout=5) except: self.output.warning("PHP-CLI nicht verfügbar, überspringe PHP-Validierung") return True invalid_files = [] for php_file in php_files: try: result = subprocess.run(['php', '-l', str(php_file)], capture_output=True, text=True, timeout=5) if result.returncode != 0: error_msg = f"{php_file.relative_to(self.base_path)}" invalid_files.append(error_msg) self.errors.append(error_msg) except: pass if invalid_files: self.output.error(f"{len(invalid_files)} PHP-Dateien mit Syntax-Fehlern", "\n".join(invalid_files[:5])) return False else: self.output.success(f"Alle {len(php_files)} PHP-Dateien valide") return True def run_rebuild(self) -> bool: """Führe den EspoCRM Rebuild aus.""" self.output.header("6. ESPOCRM REBUILD") is_docker_volume = '/docker/volumes/' in str(self.base_path) if is_docker_volume: try: result = subprocess.run(['docker', 'ps', '--format', '{{.Names}}'], capture_output=True, text=True, timeout=5) containers = result.stdout.strip().split('\n') espo_container = None for container in containers: if container.lower() == 'espocrm': espo_container = container break if not espo_container: for container in containers: if 'espo' in container.lower() and 'db' not in container.lower(): espo_container = container break if espo_container: self.output.info(f"Container: {espo_container}") # Cache löschen self.output.info("Lösche Cache...") subprocess.run(['docker', 'exec', espo_container, 'php', 'command.php', 'clear-cache'], capture_output=True, timeout=30) # Rebuild self.output.info("Starte Rebuild...") rebuild_start = time.time() result = subprocess.run( ['docker', 'exec', espo_container, 'php', 'command.php', 'rebuild'], capture_output=True, text=True, timeout=60 ) # Prüfe Logs nach Rebuild has_errors, error_msgs = self.log_checker.check_for_errors(rebuild_start) if result.returncode == 0 and not has_errors: self.output.success("Rebuild erfolgreich abgeschlossen") return True else: if result.returncode != 0: self.output.error("Rebuild fehlgeschlagen", result.stderr) else: self.output.error("Rebuild abgeschlossen, aber Fehler in Logs") for err in error_msgs[:5]: self.output.error("", err) return False else: self.output.error("Kein EspoCRM Docker-Container gefunden") return False except Exception as e: self.output.error(f"Docker-Rebuild fehlgeschlagen: {e}") return False # Lokaler Rebuild (Fallback) self.output.error("Lokaler Rebuild nicht implementiert, verwende Docker") return False def run_entity_tests(self, base_url: str, username: str, password: str) -> bool: """Führe umfassende Entity-Tests durch.""" self.output.header("7. ENTITY CRUD-TESTS") # Finde alle Custom-Entities custom_entities = sorted([name for name in self.entity_defs.keys() if name.startswith('C') and not name.endswith('CDokumente')]) # Überspringe Junction-Tables if not custom_entities: self.output.warning("Keine Custom-Entities zum Testen gefunden") return True self.output.info(f"Teste {len(custom_entities)} Custom-Entities...") tester = EntityTester(base_url, username, password, self.output, self.log_checker) all_success = True success_count = 0 fail_count = 0 for entity_name in custom_entities: if tester.test_entity(entity_name): success_count += 1 else: fail_count += 1 all_success = False # Cleanup tester.cleanup() # Zusammenfassung if not self.output.verbose: print(f"\n{Colors.CYAN}Ergebnis:{Colors.END} {success_count} erfolgreich, {fail_count} fehlgeschlagen") if all_success: self.output.success(f"Alle {success_count} Entities getestet") else: self.output.error(f"{fail_count} Entity-Tests fehlgeschlagen") return all_success def generate_ki_feedback(self) -> Dict: """Generiere strukturiertes Feedback für KI.""" feedback = { "status": "success" if not self.errors else "failed", "summary": { "errors": len(self.errors), "warnings": len(self.warnings), "entities_checked": len(self.entity_defs), }, "errors": self.errors[:10], # Top 10 Fehler "warnings": self.warnings[:10], # Top 10 Warnungen "recommendations": [] } # Generiere Empfehlungen basierend auf Fehlern if any('Service' in e or 'Class' in e for e in self.errors): feedback["recommendations"].append( "Fehlende Service-Klassen - erstelle für jede Entity eine Service-Klasse" ) if any('i18n' in w for w in self.warnings): feedback["recommendations"].append( "Unvollständige Übersetzungen - füge fehlende i18n-Dateien hinzu" ) if any('relationship' in e.lower() for e in self.errors): feedback["recommendations"].append( "Relationship-Fehler - prüfe bidirektionale Link-Definitionen" ) if any('InjectableFactory' in e for e in self.errors): feedback["recommendations"].append( "KRITISCH: Service-Klassen fehlen! Erstelle Services in custom/Espo/Custom/Services/" ) return feedback def print_summary(self): """Drucke Zusammenfassung.""" self.output.header("ZUSAMMENFASSUNG") if self.errors: print(f"\n{Colors.RED}{Colors.BOLD}FEHLER: {len(self.errors)}{Colors.END}") for err in self.errors[:10]: print(f" {Colors.RED}✗{Colors.END} {err}") if len(self.errors) > 10: print(f" {Colors.RED}...{Colors.END} und {len(self.errors) - 10} weitere") if self.warnings: print(f"\n{Colors.YELLOW}{Colors.BOLD}WARNUNGEN: {len(self.warnings)}{Colors.END}") for warn in self.warnings[:5]: print(f" {Colors.YELLOW}⚠{Colors.END} {warn}") if len(self.warnings) > 5: print(f" {Colors.YELLOW}...{Colors.END} und {len(self.warnings) - 5} weitere") if not self.errors and not self.warnings: print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ALLE PRÜFUNGEN BESTANDEN{Colors.END}\n") # KI-Feedback ausgeben if self.errors or self.warnings: feedback = self.generate_ki_feedback() print(f"\n{Colors.CYAN}{Colors.BOLD}KI-FEEDBACK:{Colors.END}") print(json.dumps(feedback, indent=2, ensure_ascii=False)) def validate_all(self) -> bool: """Führe alle Validierungen durch.""" all_valid = True # 1. JSON-Syntax (kritisch) if not self.validate_json_syntax(): all_valid = False self.output.error("Abbruch: JSON-Syntax-Fehler!") return False # Lade entityDefs self.load_entity_defs() # 2. Relationships (kritisch) if not self.validate_relationships(): all_valid = False # 3. Erforderliche Dateien (Warnung) self.validate_required_files() # 4. Dateirechte (nicht kritisch) self.check_file_permissions() # 5. PHP-Syntax (kritisch) if not self.validate_php_files(): all_valid = False return all_valid def main(): import argparse parser = argparse.ArgumentParser( description='EspoCRM Custom Entity Validator & Rebuild Tool - Erweiterte Version' ) parser.add_argument('-v', '--verbose', action='store_true', help='Ausführliche Ausgabe') parser.add_argument('--dry-run', action='store_true', help='Nur Validierungen, kein Rebuild') parser.add_argument('--skip-tests', action='store_true', help='Überspringe Entity-Tests') parser.add_argument('--base-url', default='http://localhost', help='EspoCRM Base-URL (default: http://localhost)') parser.add_argument('--username', default='admin', help='API-Username (default: admin)') parser.add_argument('--password', default='admin', help='API-Password (default: admin)') args = parser.parse_args() # Finde EspoCRM Root script_dir = Path(__file__).parent.parent.parent if not (script_dir / "rebuild.php").exists(): print(f"{Colors.RED}✗{Colors.END} Fehler: Nicht im EspoCRM-Root-Verzeichnis!") sys.exit(1) print(f"{Colors.BOLD}EspoCRM Validator & Rebuild Tool v2.0{Colors.END}") if args.verbose: print(f"{Colors.DIM}Verbose-Modus aktiviert{Colors.END}") print() validator = EntityValidator(str(script_dir), verbose=args.verbose) # Validierungen all_valid = validator.validate_all() if not all_valid: validator.print_summary() print(f"\n{Colors.RED}{Colors.BOLD}✗ REBUILD ABGEBROCHEN{Colors.END}\n") sys.exit(1) if args.dry_run: validator.print_summary() print(f"\n{Colors.GREEN}{Colors.BOLD}✓ VALIDIERUNGEN ABGESCHLOSSEN{Colors.END}\n") sys.exit(0) # Rebuild if not validator.run_rebuild(): validator.print_summary() print(f"\n{Colors.RED}{Colors.BOLD}✗ REBUILD FEHLGESCHLAGEN{Colors.END}\n") sys.exit(1) # Entity-Tests if not args.skip_tests: time.sleep(2) # Kurze Pause nach Rebuild if not validator.run_entity_tests(args.base_url, args.username, args.password): validator.print_summary() print(f"\n{Colors.YELLOW}{Colors.BOLD}⚠ TESTS FEHLGESCHLAGEN{Colors.END}\n") sys.exit(1) validator.print_summary() print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ERFOLGREICH ABGESCHLOSSEN{Colors.END}\n") sys.exit(0) if __name__ == "__main__": main()