From 9b18a63acfed9663b0d5fa0fec0c78cbcdd29e8e Mon Sep 17 00:00:00 2001 From: bsiggel Date: Tue, 10 Mar 2026 12:35:01 +0100 Subject: [PATCH] Update configuration timestamps, add custom services, and implement validation and rebuild script for EspoCRM --- custom/Espo/Custom/Services/CAICollection.php | 12 + .../Espo/Custom/Services/CAdvowareAkten.php | 12 + custom/scripts/validate_and_rebuild.py | 1444 +++++++---------- custom/scripts/validate_and_rebuild.py.backup | 1142 +++++++++++++ data/config.php | 2 +- data/state.php | 4 +- 6 files changed, 1781 insertions(+), 835 deletions(-) create mode 100644 custom/Espo/Custom/Services/CAICollection.php create mode 100644 custom/Espo/Custom/Services/CAdvowareAkten.php mode change 100644 => 100755 custom/scripts/validate_and_rebuild.py create mode 100644 custom/scripts/validate_and_rebuild.py.backup diff --git a/custom/Espo/Custom/Services/CAICollection.php b/custom/Espo/Custom/Services/CAICollection.php new file mode 100644 index 00000000..8737adef --- /dev/null +++ b/custom/Espo/Custom/Services/CAICollection.php @@ -0,0 +1,12 @@ + Optional[Path]: + """Findet das aktuelle Log-File""" + if not self.log_path.exists(): + return None + + today = datetime.now().strftime("%Y-%m-%d") + log_file = self.log_path / f"espo-{today}.log" + + if log_file.exists(): + return log_file + + # Fallback: neuestes Log + log_files = sorted(self.log_path.glob("espo-*.log"), + key=lambda f: f.stat().st_mtime, reverse=True) + return log_files[0] if log_files else None + + def check_for_errors(self, since_timestamp: float = None) -> Tuple[bool, List[str]]: + """ + Prüft Logs auf Fehler seit einem bestimmten Zeitpunkt + Returns: (has_errors, error_messages) + """ + log_file = self.get_log_file() + if not log_file: + return False, ["Log-Datei nicht gefunden"] + + try: + errors = [] + cutoff_time = since_timestamp or time.time() - 5 # 5 Sekunden Puffer + + with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: + # Lese nur die letzten N Zeilen (effizienter) + f.seek(0, 2) # Gehe ans Ende + file_size = f.tell() + f.seek(max(0, file_size - 100000)) # Letzte ~100KB + lines = f.readlines() + + for line in lines: + # Extrahiere Timestamp aus Log-Zeile + match = re.match(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]', line) + if match: + try: + log_time = datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S').timestamp() + if log_time < cutoff_time: + continue + except: + pass + + # Prüfe auf Fehler-Keywords + line_upper = line.upper() + if any(keyword in line_upper for keyword in ['ERROR', 'CRITICAL', 'FATAL', 'EXCEPTION']): + # Filtere bekannte unwichtige Fehler + if 'SQL failed: INSERT INTO `app_log_record`' in line: + continue + if 'BPM:' in line and 'ERROR' not in line_upper: + continue + + errors.append(line.strip()) + + return len(errors) > 0, errors + + except Exception as e: + return True, [f"Fehler beim Lesen der Logs: {e}"] -def print_warning(text: str): - print(f"{Colors.YELLOW}⚠{Colors.END} {text}") - -def print_error(text: str): - print(f"{Colors.RED}✗{Colors.END} {text}") - -def print_info(text: str): - print(f"{Colors.BLUE}ℹ{Colors.END} {text}") +class EntityTester: + """Führt CRUD-Tests für Entities durch""" + def __init__(self, base_url: str, username: str, password: str, + output: OutputManager, log_checker: LogChecker): + self.base_url = base_url.rstrip('/') + self.auth = HTTPBasicAuth(username, password) + self.output = output + self.log_checker = log_checker + self.created_records = [] # Für Cleanup + + def test_entity(self, entity_name: str, test_data: dict = None) -> bool: + """ + Führt vollständigen CRUD-Test für eine Entity durch + Returns: True wenn alle Tests erfolgreich + """ + all_success = True + record_id = None + + # Verwende Default-Testdaten wenn keine angegeben + if test_data is None: + test_data = {'name': f'Test {entity_name} {int(time.time())}'} + + # 1. CREATE Test + self.output.test_start(f"{entity_name}: Create") + test_start = time.time() + + try: + response = requests.post( + f"{self.base_url}/api/v1/{entity_name}", + json=test_data, + auth=self.auth, + timeout=10 + ) + + # Prüfe Log nach Request + has_errors, error_msgs = self.log_checker.check_for_errors(test_start) + + if response.status_code == 200: + data = response.json() + record_id = data.get('id') + if record_id: + self.created_records.append((entity_name, record_id)) + if not has_errors: + self.output.test_end(True, f"ID: {record_id}") + else: + self.output.test_end(False, f"Logs enthalten Fehler!") + self.output.error("Log-Fehler bei CREATE:", "\n".join(error_msgs[:3])) + all_success = False + else: + self.output.test_end(False, "Keine ID in Response") + all_success = False + else: + self.output.test_end(False, f"HTTP {response.status_code}") + self.output.error(f"Response: {response.text[:200]}") + if has_errors: + self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) + all_success = False + return all_success + + except Exception as e: + self.output.test_end(False, str(e)) + all_success = False + return all_success + + time.sleep(0.1) # Kleine Pause zwischen Tests + + # 2. READ Test + if record_id: + self.output.test_start(f"{entity_name}: Read") + test_start = time.time() + + try: + response = requests.get( + f"{self.base_url}/api/v1/{entity_name}/{record_id}", + auth=self.auth, + timeout=10 + ) + + has_errors, error_msgs = self.log_checker.check_for_errors(test_start) + + if response.status_code == 200: + data = response.json() + if data.get('id') == record_id and not has_errors: + self.output.test_end(True, f"Name: {data.get('name', 'N/A')}") + else: + if has_errors: + self.output.test_end(False, "Log-Fehler") + self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) + else: + self.output.test_end(False, "ID stimmt nicht überein") + all_success = False + else: + self.output.test_end(False, f"HTTP {response.status_code}") + if has_errors: + self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) + all_success = False + + except Exception as e: + self.output.test_end(False, str(e)) + all_success = False + + time.sleep(0.1) + + # 3. UPDATE Test + if record_id: + self.output.test_start(f"{entity_name}: Update") + test_start = time.time() + + try: + update_data = {'description': f'Updated at {datetime.now().isoformat()}'} + response = requests.put( + f"{self.base_url}/api/v1/{entity_name}/{record_id}", + json=update_data, + auth=self.auth, + timeout=10 + ) + + has_errors, error_msgs = self.log_checker.check_for_errors(test_start) + + if response.status_code == 200 and not has_errors: + self.output.test_end(True) + else: + if response.status_code != 200: + self.output.test_end(False, f"HTTP {response.status_code}") + else: + self.output.test_end(False, "Log-Fehler") + self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) + all_success = False + + except Exception as e: + self.output.test_end(False, str(e)) + all_success = False + + time.sleep(0.1) + + # 4. LIST Test + self.output.test_start(f"{entity_name}: List") + test_start = time.time() + + try: + response = requests.get( + f"{self.base_url}/api/v1/{entity_name}", + auth=self.auth, + timeout=10 + ) + + has_errors, error_msgs = self.log_checker.check_for_errors(test_start) + + if response.status_code == 200: + data = response.json() + total = data.get('total', 0) + if not has_errors: + self.output.test_end(True, f"{total} Einträge") + else: + self.output.test_end(False, "Log-Fehler") + self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) + all_success = False + else: + self.output.test_end(False, f"HTTP {response.status_code}") + if has_errors: + self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) + all_success = False + + except Exception as e: + self.output.test_end(False, str(e)) + all_success = False + + time.sleep(0.1) + + # 5. DELETE Test + if record_id: + self.output.test_start(f"{entity_name}: Delete") + test_start = time.time() + + try: + response = requests.delete( + f"{self.base_url}/api/v1/{entity_name}/{record_id}", + auth=self.auth, + timeout=10 + ) + + has_errors, error_msgs = self.log_checker.check_for_errors(test_start) + + if response.status_code == 200 and not has_errors: + self.output.test_end(True) + # Entferne aus cleanup-Liste + self.created_records = [(e, i) for e, i in self.created_records + if not (e == entity_name and i == record_id)] + else: + if response.status_code != 200: + self.output.test_end(False, f"HTTP {response.status_code}") + else: + self.output.test_end(False, "Log-Fehler") + self.output.error("Log-Fehler:", "\n".join(error_msgs[:3])) + all_success = False + + except Exception as e: + self.output.test_end(False, str(e)) + all_success = False + + return all_success + + def cleanup(self): + """Lösche alle erstellten Test-Records""" + if not self.created_records: + return + + self.output.info(f"Cleanup: Lösche {len(self.created_records)} Test-Records...") + + for entity_name, record_id in self.created_records: + try: + requests.delete( + f"{self.base_url}/api/v1/{entity_name}/{record_id}", + auth=self.auth, + timeout=5 + ) + except: + pass # Ignoriere Fehler beim Cleanup class EntityValidator: - def __init__(self, base_path: str): + def __init__(self, base_path: str, verbose: bool = False): self.base_path = Path(base_path) self.custom_path = self.base_path / "custom" / "Espo" / "Custom" / "Resources" self.metadata_path = self.custom_path / "metadata" self.i18n_path = self.custom_path / "i18n" self.client_custom_path = self.base_path / "client" / "custom" + self.output = OutputManager(verbose) self.errors = [] self.warnings = [] self.entity_defs = {} - self.relationships = defaultdict(list) self.skip_e2e_tests = False + self.log_checker = LogChecker(self.base_path / "data" / "logs", self.output) def validate_json_syntax(self) -> bool: """Validiere JSON-Syntax aller Dateien im custom-Verzeichnis.""" - print_header("1. JSON-SYNTAX VALIDIERUNG") + self.output.header("1. JSON-SYNTAX VALIDIERUNG") json_files = list(self.custom_path.rglob("*.json")) if not json_files: - print_warning("Keine JSON-Dateien gefunden") + self.output.warning("Keine JSON-Dateien gefunden") return True + self.output.info(f"Prüfe {len(json_files)} JSON-Dateien...") + invalid_files = [] for json_file in json_files: try: with open(json_file, 'r', encoding='utf-8') as f: json.load(f) except json.JSONDecodeError as e: - self.errors.append(f"JSON-Fehler in {json_file.relative_to(self.base_path)}: {e}") - invalid_files.append(str(json_file.relative_to(self.base_path))) + error_msg = f"{json_file.relative_to(self.base_path)}: {e}" + self.errors.append(error_msg) + invalid_files.append(error_msg) if invalid_files: - print_error(f"{len(invalid_files)} Datei(en) mit JSON-Fehlern gefunden:") - for f in invalid_files: - print(f" {Colors.RED}•{Colors.END} {f}") + self.output.error(f"{len(invalid_files)} Datei(en) mit JSON-Fehlern", + "\n".join(invalid_files[:5])) return False else: - print_success(f"Alle {len(json_files)} JSON-Dateien sind syntaktisch korrekt") + self.output.success(f"Alle {len(json_files)} JSON-Dateien valide") return True def load_entity_defs(self): @@ -90,22 +440,19 @@ class EntityValidator: try: with open(json_file, 'r', encoding='utf-8') as f: self.entity_defs[entity_name] = json.load(f) - except Exception as e: - # Fehler wird bereits in JSON-Validierung gemeldet + except Exception: pass def validate_relationships(self) -> bool: """Validiere Relationship-Definitionen zwischen Entities.""" - print_header("2. RELATIONSHIP-KONSISTENZ") + self.output.header("2. RELATIONSHIP-KONSISTENZ") if not self.entity_defs: - print_warning("Keine entityDefs geladen") + self.output.warning("Keine entityDefs geladen") return True relationship_errors = [] checked_pairs = set() - - # Links die nicht geprüft werden (Standard-EspoCRM parent-Relationships) skip_foreign_links = {'parent', 'parents'} for entity_name, entity_def in self.entity_defs.items(): @@ -117,18 +464,16 @@ class EntityValidator: foreign = link_def.get('foreign') relation_name = link_def.get('relationName') - # Überspringe parent-Links (Standard Activities-Relationship) if foreign in skip_foreign_links: continue - # Nur hasMany und hasOne prüfen (nicht belongsTo, da das die Gegenseite ist) if link_type in ['hasMany', 'hasOne'] and target_entity and foreign: - pair_key = tuple(sorted([f"{entity_name}.{link_name}", f"{target_entity}.{foreign}"])) + pair_key = tuple(sorted([f"{entity_name}.{link_name}", + f"{target_entity}.{foreign}"])) if pair_key in checked_pairs: continue checked_pairs.add(pair_key) - # Prüfe ob Ziel-Entity existiert if target_entity not in self.entity_defs: relationship_errors.append( f"{entity_name}.{link_name}: Ziel-Entity '{target_entity}' existiert nicht" @@ -137,11 +482,10 @@ class EntityValidator: target_links = self.entity_defs[target_entity].get('links', {}) - # Prüfe ob foreign Link existiert if foreign not in target_links: relationship_errors.append( f"{entity_name}.{link_name} → {target_entity}: " - f"Foreign link '{foreign}' fehlt in {target_entity}" + f"Foreign link '{foreign}' fehlt" ) continue @@ -149,261 +493,75 @@ class EntityValidator: foreign_foreign = foreign_def.get('foreign') foreign_relation_name = foreign_def.get('relationName') - # Prüfe ob foreign.foreign zurück zeigt if foreign_foreign != link_name: relationship_errors.append( f"{entity_name}.{link_name} ↔ {target_entity}.{foreign}: " - f"Foreign zeigt auf '{foreign_foreign}' statt auf '{link_name}'" + f"Foreign zeigt auf '{foreign_foreign}' statt '{link_name}'" ) - # Prüfe ob relationName übereinstimmt (falls beide definiert) if relation_name and foreign_relation_name and relation_name != foreign_relation_name: relationship_errors.append( f"{entity_name}.{link_name} ↔ {target_entity}.{foreign}: " - f"relationName unterschiedlich ('{relation_name}' vs '{foreign_relation_name}')" + f"relationName unterschiedlich" ) if relationship_errors: - print_error(f"{len(relationship_errors)} Relationship-Fehler gefunden:") - for err in relationship_errors: - print(f" {Colors.RED}•{Colors.END} {err}") + self.output.error(f"{len(relationship_errors)} Relationship-Fehler", + "\n".join(relationship_errors[:5])) self.errors.extend(relationship_errors) return False else: - print_success(f"{len(checked_pairs)} Relationships geprüft - alle konsistent") + self.output.success(f"{len(checked_pairs)} Relationships geprüft") return True - def validate_formula_placement(self) -> bool: - """Prüfe ob Formula-Scripts korrekt in /formula/ statt /entityDefs/ platziert sind.""" - print_header("3. FORMULA-SCRIPT PLATZIERUNG") - - misplaced_formulas = [] - - # Prüfe entityDefs auf formula-Definitionen (sollte nicht da sein) - for entity_name, entity_def in self.entity_defs.items(): - if 'formula' in entity_def: - misplaced_formulas.append( - f"entityDefs/{entity_name}.json enthält 'formula' - " - f"sollte in formula/{entity_name}.json sein" - ) - - # Prüfe ob formula-Dateien existieren und valide sind - formula_path = self.metadata_path / "formula" - formula_count = 0 - if formula_path.exists(): - for formula_file in formula_path.glob("*.json"): - formula_count += 1 - try: - with open(formula_file, 'r', encoding='utf-8') as f: - formula_def = json.load(f) - # Prüfe auf leere oder null Scripts - for key, value in formula_def.items(): - if value == "" or value is None: - self.warnings.append( - f"formula/{formula_file.name}: '{key}' ist leer oder null" - ) - except Exception: - pass # JSON-Fehler bereits gemeldet - - if misplaced_formulas: - print_error(f"{len(misplaced_formulas)} Formula-Platzierungsfehler:") - for err in misplaced_formulas: - print(f" {Colors.RED}•{Colors.END} {err}") - self.errors.extend(misplaced_formulas) - return False - else: - print_success(f"{formula_count} Formula-Definitionen korrekt platziert") - return True - - def validate_i18n_completeness(self) -> bool: - """Prüfe i18n-Definitionen auf Vollständigkeit.""" - print_header("4. i18n-VOLLSTÄNDIGKEIT") + def validate_required_files(self) -> bool: + """Prüfe ob für jede Entity alle erforderlichen Dateien existieren.""" + self.output.header("3. ERFORDERLICHE DATEIEN") if not self.entity_defs: - print_warning("Keine entityDefs zum Prüfen") + self.output.warning("Keine entityDefs geladen") return True - missing_i18n = [] - incomplete_i18n = [] - - languages = ['de_DE', 'en_US'] + missing_files = [] custom_entities = [name for name in self.entity_defs.keys() - if name.startswith('C') or name.startswith('CVmh')] + if name.startswith('C')] + + self.output.info(f"Prüfe {len(custom_entities)} Custom-Entities...") for entity_name in custom_entities: - entity_def = self.entity_defs[entity_name] - links = entity_def.get('links', {}) + # 1. Scope-Datei + scope_file = self.metadata_path / "scopes" / f"{entity_name}.json" + if not scope_file.exists(): + missing_files.append(f"{entity_name}: scopes/{entity_name}.json fehlt") - # Finde alle hasMany/hasOne Links die übersetzt werden sollten - custom_links = [] - for link_name, link_def in links.items(): - link_type = link_def.get('type') - if link_type in ['hasMany', 'hasOne']: - # Überspringe System-Links - if link_name not in ['createdBy', 'modifiedBy', 'assignedUser', 'teams']: - custom_links.append(link_name) - - if not custom_links: - continue - - for lang in languages: + # 2. i18n-Dateien + for lang in ['de_DE', 'en_US']: i18n_file = self.i18n_path / lang / f"{entity_name}.json" - if not i18n_file.exists(): - missing_i18n.append(f"{entity_name}: {lang} fehlt komplett") - continue - - try: - with open(i18n_file, 'r', encoding='utf-8') as f: - i18n_def = json.load(f) - links_i18n = i18n_def.get('links', {}) - - # Prüfe ob alle custom Links übersetzt sind - for link_name in custom_links: - if link_name not in links_i18n: - incomplete_i18n.append( - f"{entity_name} ({lang}): Link '{link_name}' fehlt in i18n" - ) - except Exception: - pass # JSON-Fehler bereits gemeldet + missing_files.append(f"{entity_name}: i18n/{lang}/{entity_name}.json fehlt") - total_issues = len(missing_i18n) + len(incomplete_i18n) - - if missing_i18n: - print_error(f"{len(missing_i18n)} komplett fehlende i18n-Dateien:") - for err in missing_i18n[:10]: # Max 10 anzeigen - print(f" {Colors.RED}•{Colors.END} {err}") - if len(missing_i18n) > 10: - print(f" {Colors.RED}...{Colors.END} und {len(missing_i18n) - 10} weitere") - - if incomplete_i18n: - print_warning(f"{len(incomplete_i18n)} unvollständige i18n-Definitionen:") - for err in incomplete_i18n[:10]: # Max 10 anzeigen - print(f" {Colors.YELLOW}•{Colors.END} {err}") - if len(incomplete_i18n) > 10: - print(f" {Colors.YELLOW}...{Colors.END} und {len(incomplete_i18n) - 10} weitere") - - if not missing_i18n and not incomplete_i18n: - print_success(f"i18n für {len(custom_entities)} Custom-Entities vollständig") - - # i18n-Fehler sind nur Warnungen, kein Abbruch - self.warnings.extend(missing_i18n + incomplete_i18n) - return True - - def validate_layout_structure(self) -> bool: - """Prüfe Layout-Dateien auf häufige Fehler.""" - print_header("5. LAYOUT-STRUKTUR VALIDIERUNG") - - layouts_base = self.custom_path / "layouts" - layout_errors = [] - layout_warnings = [] - checked_layouts = 0 - - # 1. Prüfe bottomPanelsDetail.json Dateien (KRITISCH: Muss Objekt sein, kein Array!) - if layouts_base.exists(): - for bottom_panel_file in layouts_base.rglob("bottomPanelsDetail.json"): - try: - with open(bottom_panel_file, 'r', encoding='utf-8') as f: - content = json.load(f) - - checked_layouts += 1 - entity_name = bottom_panel_file.parent.name - - # KRITISCHER CHECK: bottomPanelsDetail.json MUSS Objekt sein (nicht Array)! - if isinstance(content, list): - layout_errors.append( - f"{entity_name}/bottomPanelsDetail.json: FEHLER - Ist Array statt Objekt! " - f"EspoCRM 7.x erfordert Objekt-Format mit Keys wie 'contacts', '_tabBreak_0', etc." - ) - elif isinstance(content, dict): - # Prüfe auf false-Werte in falschen Kontexten - for key, value in content.items(): - if isinstance(value, dict): - for subkey, subvalue in value.items(): - if subvalue is False and subkey not in ['disabled', 'sticked']: - layout_warnings.append( - f"{entity_name}/bottomPanelsDetail.json: {key}.{subkey} " - f"sollte nicht 'false' sein" - ) - except Exception: - pass # JSON-Fehler bereits in validate_json_syntax gemeldet - - # 2. Prüfe detail.json Dateien auf deprecated false-Platzhalter - if layouts_base.exists(): - for detail_file in layouts_base.rglob("detail.json"): - try: - with open(detail_file, 'r', encoding='utf-8') as f: - content = json.load(f) - - entity_name = detail_file.parent.name - - # Prüfe ob content ein Array von Panels ist - if isinstance(content, list): - for panel_idx, panel in enumerate(content): - if not isinstance(panel, dict): - continue - - rows = panel.get('rows', []) - for row_idx, row in enumerate(rows): - if not isinstance(row, list): - continue - - for cell_idx, cell in enumerate(row): - # KRITISCH: false als Platzhalter ist deprecated in EspoCRM 7.x! - if cell is False: - layout_errors.append( - f"{entity_name}/detail.json: Panel {panel_idx}, Row {row_idx}, " - f"Cell {cell_idx} verwendet 'false' als Platzhalter. " - f"In EspoCRM 7.x muss '{{}}' (leeres Objekt) verwendet werden!" - ) - except Exception: - pass # JSON-Fehler bereits gemeldet - - # Ergebnisse ausgeben - if layout_errors: - print_error(f"{len(layout_errors)} KRITISCHE Layout-Fehler gefunden:") - for err in layout_errors: - print(f" {Colors.RED}✗{Colors.END} {err}") - self.errors.extend(layout_errors) - return False - - if layout_warnings: - print_warning(f"{len(layout_warnings)} Layout-Warnungen:") - for warn in layout_warnings[:5]: - print(f" {Colors.YELLOW}⚠{Colors.END} {warn}") - if len(layout_warnings) > 5: - print(f" {Colors.YELLOW}...{Colors.END} und {len(layout_warnings) - 5} weitere") - self.warnings.extend(layout_warnings) - - if not layout_errors and not layout_warnings: - print_success(f"{checked_layouts} Layout-Dateien geprüft, keine Fehler") - elif not layout_errors: - print_success(f"{checked_layouts} Layout-Dateien geprüft, keine kritischen Fehler") - - return True + if missing_files: + self.output.error(f"{len(missing_files)} fehlende Dateien", + "\n".join(missing_files[:10])) + self.warnings.extend(missing_files) + return True # Nur Warnung, kein Hard-Fail + else: + self.output.success(f"Alle erforderlichen Dateien vorhanden") + return True def check_file_permissions(self) -> bool: - """Prüfe Dateirechte im custom-Verzeichnis und kritischen System-Dateien.""" - print_header("6. DATEIRECHTE-PRÜFUNG") + """Prüfe Dateirechte im custom-Verzeichnis.""" + self.output.header("4. DATEIRECHTE-PRÜFUNG") - # Verzeichnisse/Dateien die geprüft werden sollen paths_to_check = [ - self.custom_path, # custom/Espo/Custom/Resources - self.client_custom_path, # client/custom - self.base_path / "data", # Gesamtes data/ Verzeichnis - ] - - # Kritische einzelne Dateien die UNBEDINGT www-data gehören müssen - critical_files = [ - self.base_path / "data" / "config.php", - self.base_path / "data" / "config-internal.php", + self.custom_path, + self.client_custom_path, + self.base_path / "data", ] all_wrong_files = [] - critical_wrong_files = [] try: - # Prüfe jedes Verzeichnis for path in paths_to_check: if not path.exists(): continue @@ -411,270 +569,48 @@ class EntityValidator: result = subprocess.run( ['find', str(path), '!', '-user', 'www-data', '-o', '!', '-group', 'www-data'], capture_output=True, - text=True + text=True, + timeout=10 ) wrong_files = [line for line in result.stdout.strip().split('\n') if line] all_wrong_files.extend(wrong_files) - # Prüfe kritische Dateien einzeln - for critical_file in critical_files: - if not critical_file.exists(): - continue - - stat_result = critical_file.stat() - import pwd - import grp - - try: - owner = pwd.getpwuid(stat_result.st_uid).pw_name - group = grp.getgrgid(stat_result.st_gid).gr_name - - if owner != 'www-data' or group != 'www-data': - critical_wrong_files.append(str(critical_file)) - all_wrong_files.append(str(critical_file)) - print_error(f"KRITISCH: {critical_file.relative_to(self.base_path)} gehört {owner}:{group} statt www-data:www-data") - except (KeyError, OSError): - pass - if all_wrong_files: - print_warning(f"{len(all_wrong_files)} Dateien/Verzeichnisse mit falschen Rechten gefunden") + self.output.warning(f"{len(all_wrong_files)} Dateien mit falschen Rechten") + self.output.info("Versuche automatische Korrektur...") - if critical_wrong_files: - print_error(f"{len(critical_wrong_files)} davon sind KRITISCHE System-Dateien!") - - print_info("Versuche automatische Korrektur aller Verzeichnisse...") - - # Korrigiere alle Pfade success_count = 0 for path in paths_to_check: if not path.exists(): continue try: - # Setze Owner - subprocess.run( - ['sudo', 'chown', '-R', 'www-data:www-data', str(path)], - check=True, - capture_output=True - ) - # Setze Permissions für Dateien - subprocess.run( - ['sudo', 'find', str(path), '-type', 'f', '-exec', 'chmod', '664', '{}', ';'], - check=True, - capture_output=True - ) - # Setze Permissions für Verzeichnisse - subprocess.run( - ['sudo', 'find', str(path), '-type', 'd', '-exec', 'chmod', '775', '{}', ';'], - check=True, - capture_output=True - ) + subprocess.run(['sudo', 'chown', '-R', 'www-data:www-data', str(path)], + check=True, capture_output=True, timeout=30) + subprocess.run(['sudo', 'find', str(path), '-type', 'f', '-exec', + 'chmod', '664', '{}', ';'], + check=True, capture_output=True, timeout=30) + subprocess.run(['sudo', 'find', str(path), '-type', 'd', '-exec', + 'chmod', '775', '{}', ';'], + check=True, capture_output=True, timeout=30) success_count += 1 - except subprocess.CalledProcessError as e: - print_warning(f"Konnte {path.relative_to(self.base_path)} nicht korrigieren: {e}") + except: + pass if success_count > 0: - print_success(f"Dateirechte für {success_count} Verzeichnis(se) korrigiert") - else: - print_warning("Konnte Dateirechte nicht automatisch korrigieren (sudo erforderlich)") + self.output.success(f"Dateirechte für {success_count} Verzeichnisse korrigiert") else: - print_success("Alle Dateirechte korrekt (www-data:www-data)") + self.output.success("Alle Dateirechte korrekt (www-data:www-data)") return True except Exception as e: - print_warning(f"Konnte Dateirechte nicht prüfen: {e}") - return True - - def validate_css_files(self) -> bool: - """Validiere CSS-Dateien mit csslint.""" - print_header("7. CSS-VALIDIERUNG") - - css_files = [] - if self.client_custom_path.exists(): - css_files = list((self.client_custom_path / "css").rglob("*.css")) if (self.client_custom_path / "css").exists() else [] - - if not css_files: - print_info("Keine CSS-Dateien gefunden") - return True - - # Prüfe ob csslint verfügbar ist - try: - subprocess.run(['csslint', '--version'], capture_output=True, timeout=5) - use_csslint = True - except (FileNotFoundError, subprocess.TimeoutExpired): - use_csslint = False - print_warning("csslint nicht gefunden, verwende Basis-Validierung") - - invalid_files = [] - for css_file in css_files: - if use_csslint: - # Verwende csslint für professionelle Validierung - try: - result = subprocess.run( - ['csslint', '--format=compact', '--quiet', str(css_file)], - capture_output=True, - text=True, - timeout=10 - ) - - if result.returncode != 0 and result.stdout.strip(): - # Parse csslint Ausgabe - errors = [] - for line in result.stdout.strip().split('\n'): - if 'Error' in line or 'Warning' in line: - # Extrahiere nur die Fehlermeldung ohne Pfad - parts = line.split(': ', 2) - if len(parts) >= 3: - errors.append(parts[2]) - else: - errors.append(line) - - if errors: - # Nur echte Fehler, keine Warnungen als kritisch behandeln - if any('Error' in err for err in errors): - self.errors.append(f"CSS-Fehler in {css_file.relative_to(self.base_path)}") - invalid_files.append((str(css_file.relative_to(self.base_path)), errors)) - else: - # Nur Warnungen - for err in errors: - self.warnings.append(f"{css_file.relative_to(self.base_path)}: {err}") - - except subprocess.TimeoutExpired: - self.warnings.append(f"CSS-Validierung timeout für {css_file.relative_to(self.base_path)}") - except Exception as e: - self.warnings.append(f"Konnte {css_file.relative_to(self.base_path)} nicht validieren: {e}") - else: - # Fallback: Basis-Validierung - try: - with open(css_file, 'r', encoding='utf-8') as f: - content = f.read() - - errors = [] - - # Geschlossene Klammern - open_braces = content.count('{') - close_braces = content.count('}') - if open_braces != close_braces: - errors.append(f"Ungleiche Klammern: {open_braces} {{ vs {close_braces} }}") - - if errors: - self.errors.append(f"CSS-Fehler in {css_file.relative_to(self.base_path)}") - invalid_files.append((str(css_file.relative_to(self.base_path)), errors)) - - except Exception as e: - self.errors.append(f"Konnte {css_file.relative_to(self.base_path)} nicht lesen: {e}") - invalid_files.append((str(css_file.relative_to(self.base_path)), [str(e)])) - - if invalid_files: - print_error(f"{len(invalid_files)} CSS-Datei(en) mit Fehlern:") - for filepath, errors in invalid_files: - print(f" {Colors.RED}•{Colors.END} {filepath}") - for err in errors[:5]: # Maximal 5 Fehler pro Datei anzeigen - print(f" {Colors.RED}→{Colors.END} {err}") - if len(errors) > 5: - print(f" {Colors.RED}...{Colors.END} und {len(errors) - 5} weitere Fehler") - return False - else: - print_success(f"Alle {len(css_files)} CSS-Dateien sind syntaktisch korrekt") - return True - - def validate_js_files(self) -> bool: - """Validiere JavaScript-Dateien mit jshint.""" - print_header("8. JAVASCRIPT-VALIDIERUNG") - - js_files = [] - if self.client_custom_path.exists(): - src_path = self.client_custom_path / "src" - js_files = list(src_path.rglob("*.js")) if src_path.exists() else [] - - if not js_files: - print_info("Keine JavaScript-Dateien gefunden") - return True - - # Prüfe ob jshint verfügbar ist - try: - subprocess.run(['jshint', '--version'], capture_output=True, timeout=5) - use_jshint = True - except (FileNotFoundError, subprocess.TimeoutExpired): - use_jshint = False - print_warning("jshint nicht gefunden, verwende Basis-Validierung") - - invalid_files = [] - for js_file in js_files: - if use_jshint: - # Verwende jshint für professionelle Validierung - try: - result = subprocess.run( - ['jshint', '--config=/dev/null', str(js_file)], - capture_output=True, - text=True, - timeout=10 - ) - - if result.returncode != 0 and result.stdout.strip(): - errors = [] - for line in result.stdout.strip().split('\n'): - if line and not line.startswith('Lint'): - # Parse jshint Ausgabe - errors.append(line) - - if errors: - self.errors.append(f"JavaScript-Fehler in {js_file.relative_to(self.base_path)}") - invalid_files.append((str(js_file.relative_to(self.base_path)), errors)) - - except subprocess.TimeoutExpired: - self.warnings.append(f"JavaScript-Validierung timeout für {js_file.relative_to(self.base_path)}") - except Exception as e: - self.warnings.append(f"Konnte {js_file.relative_to(self.base_path)} nicht validieren: {e}") - else: - # Fallback: Basis-Validierung - try: - with open(js_file, 'r', encoding='utf-8') as f: - content = f.read() - - errors = [] - - # Geschlossene Klammern - open_paren = content.count('(') - close_paren = content.count(')') - if open_paren != close_paren: - errors.append(f"Ungleiche runde Klammern: {open_paren} ( vs {close_paren} )") - - open_braces = content.count('{') - close_braces = content.count('}') - if open_braces != close_braces: - errors.append(f"Ungleiche geschweifte Klammern: {open_braces} {{ vs {close_braces} }}") - - open_brackets = content.count('[') - close_brackets = content.count(']') - if open_brackets != close_brackets: - errors.append(f"Ungleiche eckige Klammern: {open_brackets} [ vs {close_brackets} ]") - - if errors: - self.errors.append(f"JavaScript-Fehler in {js_file.relative_to(self.base_path)}") - invalid_files.append((str(js_file.relative_to(self.base_path)), errors)) - - except Exception as e: - self.errors.append(f"Konnte {js_file.relative_to(self.base_path)} nicht lesen: {e}") - invalid_files.append((str(js_file.relative_to(self.base_path)), [str(e)])) - - if invalid_files: - print_error(f"{len(invalid_files)} JavaScript-Datei(en) mit Fehlern:") - for filepath, errors in invalid_files: - print(f" {Colors.RED}•{Colors.END} {filepath}") - for err in errors[:5]: # Maximal 5 Fehler pro Datei - print(f" {Colors.RED}→{Colors.END} {err}") - if len(errors) > 5: - print(f" {Colors.RED}...{Colors.END} und {len(errors) - 5} weitere Fehler") - return False - else: - print_success(f"Alle {len(js_files)} JavaScript-Dateien sind syntaktisch korrekt") + self.output.warning(f"Konnte Dateirechte nicht prüfen: {e}") return True def validate_php_files(self) -> bool: - """Validiere PHP-Dateien mit php -l (Lint).""" - print_header("9. PHP-VALIDIERUNG") + """Validiere PHP-Dateien mit php -l.""" + self.output.header("5. PHP-SYNTAX VALIDIERUNG") php_files = [] custom_espo_path = self.base_path / "custom" / "Espo" @@ -682,342 +618,211 @@ class EntityValidator: php_files = list(custom_espo_path.rglob("*.php")) if not php_files: - print_info("Keine PHP-Dateien gefunden") + self.output.info("Keine PHP-Dateien gefunden") return True - # Prüfe ob php verfügbar ist + self.output.info(f"Prüfe {len(php_files)} PHP-Dateien...") + try: subprocess.run(['php', '--version'], capture_output=True, timeout=5) - except (FileNotFoundError, subprocess.TimeoutExpired): - print_warning("PHP-CLI nicht gefunden, überspringe PHP-Validierung") + except: + self.output.warning("PHP-CLI nicht verfügbar, überspringe PHP-Validierung") return True invalid_files = [] for php_file in php_files: try: - # Verwende php -l für Syntax-Check - result = subprocess.run( - ['php', '-l', str(php_file)], - capture_output=True, - text=True, - timeout=5 - ) + result = subprocess.run(['php', '-l', str(php_file)], + capture_output=True, text=True, timeout=5) if result.returncode != 0: - error_lines = [] - output = result.stderr.strip() or result.stdout.strip() - - for line in output.split('\n'): - # Filtere die relevanten Fehlerzeilen - if line and not line.startswith('No syntax errors'): - # Entferne Datei-Pfad aus Fehlermeldung für bessere Lesbarkeit - clean_line = re.sub(r'^.*?in\s+.*?on\s+', '', line) - if clean_line != line: # Wenn Ersetzung stattfand - error_lines.append(clean_line) - else: - error_lines.append(line) - - if error_lines: - self.errors.append(f"PHP-Syntax-Fehler in {php_file.relative_to(self.base_path)}") - invalid_files.append((str(php_file.relative_to(self.base_path)), error_lines)) - - except subprocess.TimeoutExpired: - self.warnings.append(f"PHP-Validierung timeout für {php_file.relative_to(self.base_path)}") - except Exception as e: - self.warnings.append(f"Konnte {php_file.relative_to(self.base_path)} nicht validieren: {e}") + error_msg = f"{php_file.relative_to(self.base_path)}" + invalid_files.append(error_msg) + self.errors.append(error_msg) + except: + pass if invalid_files: - print_error(f"{len(invalid_files)} PHP-Datei(en) mit Syntax-Fehlern:") - for filepath, errors in invalid_files: - print(f" {Colors.RED}•{Colors.END} {filepath}") - for err in errors[:3]: # Maximal 3 Fehler pro Datei - print(f" {Colors.RED}→{Colors.END} {err}") - if len(errors) > 3: - print(f" {Colors.RED}...{Colors.END} und {len(errors) - 3} weitere Fehler") + self.output.error(f"{len(invalid_files)} PHP-Dateien mit Syntax-Fehlern", + "\n".join(invalid_files[:5])) return False else: - print_success(f"Alle {len(php_files)} PHP-Dateien sind syntaktisch korrekt") + self.output.success(f"Alle {len(php_files)} PHP-Dateien valide") return True - def show_error_logs(self): - """Zeige die letzten Fehlerlog-Einträge aus data/logs/.""" - from datetime import datetime - - print_header("FEHLERLOG ANALYSE") - - logs_path = self.base_path / "data" / "logs" - if not logs_path.exists(): - print_warning("Logs-Verzeichnis nicht gefunden") - return - - # Finde das neueste Log-File - today = datetime.now().strftime("%Y-%m-%d") - log_file = logs_path / f"espo-{today}.log" - - if not log_file.exists(): - # Fallback: Finde das neueste Log-File - log_files = sorted(logs_path.glob("espo-*.log"), key=lambda f: f.stat().st_mtime, reverse=True) - if log_files: - log_file = log_files[0] - print_info(f"Kein Log für heute gefunden, verwende: {log_file.name}") - else: - print_warning("Keine Log-Dateien gefunden") - return - - print_info(f"Analysiere: {log_file.name}") - - try: - with open(log_file, 'r', encoding='utf-8') as f: - lines = f.readlines() - - if not lines: - print_info("Log-Datei ist leer") - return - - # Zeige die letzten 50 Zeilen - last_lines = lines[-50:] - - # Filter für Fehler und Warnungen - errors = [] - warnings = [] - - for line in last_lines: - line_upper = line.upper() - if 'ERROR' in line_upper or 'FATAL' in line_upper or 'EXCEPTION' in line_upper: - errors.append(line.strip()) - elif 'WARNING' in line_upper or 'WARN' in line_upper: - warnings.append(line.strip()) - - if errors: - print_error(f"\n{len(errors)} Fehler in den letzten 50 Log-Zeilen gefunden:\n") - for i, error in enumerate(errors[-10:], 1): # Zeige max. 10 Fehler - print(f"{Colors.RED}{i}.{Colors.END} {error}") - if len(errors) > 10: - print(f"\n{Colors.YELLOW}... und {len(errors) - 10} weitere Fehler{Colors.END}") - - if warnings: - print_warning(f"\n{len(warnings)} Warnungen in den letzten 50 Log-Zeilen gefunden:\n") - for i, warning in enumerate(warnings[-5:], 1): # Zeige max. 5 Warnungen - print(f"{Colors.YELLOW}{i}.{Colors.END} {warning}") - if len(warnings) > 5: - print(f"\n{Colors.YELLOW}... und {len(warnings) - 5} weitere Warnungen{Colors.END}") - - if not errors and not warnings: - print_info("Keine Fehler oder Warnungen in den letzten 50 Log-Zeilen gefunden") - print_info("\nLetzte 10 Log-Zeilen:") - for line in last_lines[-10:]: - print(f" {line.strip()}") - - print(f"\n{Colors.BLUE}ℹ{Colors.END} Vollständige Log-Datei: {log_file}") - print(f"{Colors.BLUE}ℹ{Colors.END} Zum Anzeigen: tail -50 {log_file}") - - except Exception as e: - print_error(f"Fehler beim Lesen der Log-Datei: {e}") - def run_rebuild(self) -> bool: """Führe den EspoCRM Rebuild aus.""" - print_header("10. ESPOCRM REBUILD") + self.output.header("6. ESPOCRM REBUILD") - # Prüfe ob wir in einem Docker-Volume sind is_docker_volume = '/docker/volumes/' in str(self.base_path) if is_docker_volume: - # Versuche Docker-Container zu finden try: - result = subprocess.run( - ['docker', 'ps', '--format', '{{.Names}}'], - capture_output=True, - text=True, - timeout=5 - ) + result = subprocess.run(['docker', 'ps', '--format', '{{.Names}}'], + capture_output=True, text=True, timeout=5) containers = result.stdout.strip().split('\n') espo_container = None - # Suche nach EspoCRM Container (meist "espocrm" ohne Suffix) for container in containers: - if container.lower() in ['espocrm', 'espocrm-app']: + if container.lower() == 'espocrm': espo_container = container break if not espo_container: - # Fallback: erster Container mit "espo" im Namen for container in containers: - if 'espo' in container.lower() and 'websocket' not in container.lower() and 'daemon' not in container.lower() and 'db' not in container.lower(): + if 'espo' in container.lower() and 'db' not in container.lower(): espo_container = container break if espo_container: - print_info(f"Docker-Container erkannt: {espo_container}") + self.output.info(f"Container: {espo_container}") - # Schritt 1: Cache löschen - print_info("Lösche Cache...") - cache_result = subprocess.run( - ['docker', 'exec', espo_container, 'php', 'command.php', 'clear-cache'], - capture_output=True, - text=True, - timeout=30 - ) + # Cache löschen + self.output.info("Lösche Cache...") + subprocess.run(['docker', 'exec', espo_container, 'php', + 'command.php', 'clear-cache'], + capture_output=True, timeout=30) - if cache_result.returncode == 0: - print_success("Cache erfolgreich gelöscht") - else: - print_warning("Cache-Löschung fehlgeschlagen, fahre trotzdem fort...") + # Rebuild + self.output.info("Starte Rebuild...") + rebuild_start = time.time() - # Schritt 2: Rebuild - print_info("Starte Rebuild (kann 10-30 Sekunden dauern)...") result = subprocess.run( ['docker', 'exec', espo_container, 'php', 'command.php', 'rebuild'], - capture_output=True, - text=True, - timeout=60 + capture_output=True, text=True, timeout=60 ) - if result.returncode == 0: - print_success("Rebuild erfolgreich abgeschlossen") - if result.stdout: - print(f" {result.stdout.strip()}") - - # E2E-Tests nach erfolgreichem Rebuild - self.run_e2e_tests() - + # Prüfe Logs nach Rebuild + has_errors, error_msgs = self.log_checker.check_for_errors(rebuild_start) + + if result.returncode == 0 and not has_errors: + self.output.success("Rebuild erfolgreich abgeschlossen") return True else: - print_error("Rebuild fehlgeschlagen:") - if result.stderr: - print(f"\n{result.stderr}") - - # Zeige automatisch die letzten Fehlerlog-Einträge an - self.show_error_logs() + if result.returncode != 0: + self.output.error("Rebuild fehlgeschlagen", result.stderr) + else: + self.output.error("Rebuild abgeschlossen, aber Fehler in Logs") + for err in error_msgs[:5]: + self.output.error("", err) return False else: - print_warning("Kein EspoCRM Docker-Container gefunden") - print_info("Versuche lokalen Rebuild...") + self.output.error("Kein EspoCRM Docker-Container gefunden") + return False except Exception as e: - print_warning(f"Docker-Erkennung fehlgeschlagen: {e}") - print_info("Versuche lokalen Rebuild...") + self.output.error(f"Docker-Rebuild fehlgeschlagen: {e}") + return False # Lokaler Rebuild (Fallback) - rebuild_script = self.base_path / "rebuild.php" - if not rebuild_script.exists(): - print_error(f"rebuild.php nicht gefunden in {self.base_path}") - return False + self.output.error("Lokaler Rebuild nicht implementiert, verwende Docker") + return False + + def run_entity_tests(self, base_url: str, username: str, password: str) -> bool: + """Führe umfassende Entity-Tests durch.""" + self.output.header("7. ENTITY CRUD-TESTS") - try: - # Schritt 1: Cache löschen - print_info("Lösche Cache...") - cache_result = subprocess.run( - ['php', 'command.php', 'clear-cache'], - cwd=str(self.base_path), - capture_output=True, - text=True, - timeout=30 - ) - - if cache_result.returncode == 0: - print_success("Cache erfolgreich gelöscht") - else: - print_warning("Cache-Löschung fehlgeschlagen, fahre trotzdem fort...") - - # Schritt 2: Rebuild - print_info("Starte lokalen Rebuild (kann 10-30 Sekunden dauern)...") - result = subprocess.run( - ['php', 'command.php', 'rebuild'], - cwd=str(self.base_path), - capture_output=True, - text=True, - timeout=60 - ) - - if result.returncode == 0: - print_success("Rebuild erfolgreich abgeschlossen") - - # E2E-Tests nach erfolgreichem Rebuild - self.run_e2e_tests() - - return True - else: - print_error("Rebuild fehlgeschlagen:") - if result.stderr: - print(f"\n{result.stderr}") - - # Zeige automatisch die letzten Fehlerlog-Einträge an - self.show_error_logs() - return False - except subprocess.TimeoutExpired: - print_error("Rebuild-Timeout (>60 Sekunden)") - return False - except Exception as e: - print_error(f"Rebuild-Fehler: {e}") - return False - def run_e2e_tests(self) -> bool: - """Führe End-to-End Tests nach erfolgreichem Rebuild aus.""" + # Finde alle Custom-Entities + custom_entities = sorted([name for name in self.entity_defs.keys() + if name.startswith('C') and + not name.endswith('CDokumente')]) # Überspringe Junction-Tables - # Überspringe wenn Flag gesetzt - if self.skip_e2e_tests: - print_info("\nE2E-Tests wurden übersprungen (--skip-e2e)") + if not custom_entities: + self.output.warning("Keine Custom-Entities zum Testen gefunden") return True - print_header("11. END-TO-END TESTS") + self.output.info(f"Teste {len(custom_entities)} Custom-Entities...") - # Prüfe ob E2E-Test Skript existiert - e2e_script = self.base_path / "custom" / "scripts" / "e2e_tests.py" - if not e2e_script.exists(): - print_warning("E2E-Test Skript nicht gefunden, überspringe Tests") - return True + tester = EntityTester(base_url, username, password, self.output, self.log_checker) - print_info("Starte automatisierte End-to-End Tests...") - print_info("Dies validiert CRUD-Operationen für Custom Entities\n") + all_success = True + success_count = 0 + fail_count = 0 - try: - result = subprocess.run( - ['python3', 'e2e_tests.py'], - cwd=str(e2e_script.parent), - capture_output=True, - text=True, - timeout=120 - ) - - # Ausgabe anzeigen - if result.stdout: - print(result.stdout) - - if result.returncode == 0: - print_success("E2E-Tests erfolgreich abgeschlossen") - return True + for entity_name in custom_entities: + if tester.test_entity(entity_name): + success_count += 1 else: - print_warning("E2E-Tests haben Fehler gemeldet") - if result.stderr: - print(f"\n{Colors.YELLOW}{result.stderr}{Colors.END}") - print_info("Dies ist keine kritische Fehler - der Rebuild war erfolgreich") - return True # Nicht als Fehler werten - - except subprocess.TimeoutExpired: - print_warning("E2E-Tests Timeout (>120 Sekunden)") - return True # Nicht als Fehler werten - except Exception as e: - print_warning(f"E2E-Tests konnten nicht ausgeführt werden: {e}") - return True # Nicht als Fehler werten + fail_count += 1 + all_success = False + + # Cleanup + tester.cleanup() + + # Zusammenfassung + if not self.output.verbose: + print(f"\n{Colors.CYAN}Ergebnis:{Colors.END} {success_count} erfolgreich, {fail_count} fehlgeschlagen") + + if all_success: + self.output.success(f"Alle {success_count} Entities getestet") + else: + self.output.error(f"{fail_count} Entity-Tests fehlgeschlagen") + + return all_success + + def generate_ki_feedback(self) -> Dict: + """Generiere strukturiertes Feedback für KI.""" + feedback = { + "status": "success" if not self.errors else "failed", + "summary": { + "errors": len(self.errors), + "warnings": len(self.warnings), + "entities_checked": len(self.entity_defs), + }, + "errors": self.errors[:10], # Top 10 Fehler + "warnings": self.warnings[:10], # Top 10 Warnungen + "recommendations": [] + } + + # Generiere Empfehlungen basierend auf Fehlern + if any('Service' in e or 'Class' in e for e in self.errors): + feedback["recommendations"].append( + "Fehlende Service-Klassen - erstelle für jede Entity eine Service-Klasse" + ) + + if any('i18n' in w for w in self.warnings): + feedback["recommendations"].append( + "Unvollständige Übersetzungen - füge fehlende i18n-Dateien hinzu" + ) + + if any('relationship' in e.lower() for e in self.errors): + feedback["recommendations"].append( + "Relationship-Fehler - prüfe bidirektionale Link-Definitionen" + ) + + if any('InjectableFactory' in e for e in self.errors): + feedback["recommendations"].append( + "KRITISCH: Service-Klassen fehlen! Erstelle Services in custom/Espo/Custom/Services/" + ) + + return feedback + def print_summary(self): - """Drucke Zusammenfassung aller Ergebnisse.""" - print_header("ZUSAMMENFASSUNG") + """Drucke Zusammenfassung.""" + self.output.header("ZUSAMMENFASSUNG") if self.errors: print(f"\n{Colors.RED}{Colors.BOLD}FEHLER: {len(self.errors)}{Colors.END}") - for err in self.errors: + for err in self.errors[:10]: print(f" {Colors.RED}✗{Colors.END} {err}") + if len(self.errors) > 10: + print(f" {Colors.RED}...{Colors.END} und {len(self.errors) - 10} weitere") if self.warnings: print(f"\n{Colors.YELLOW}{Colors.BOLD}WARNUNGEN: {len(self.warnings)}{Colors.END}") - for warn in self.warnings[:10]: + for warn in self.warnings[:5]: print(f" {Colors.YELLOW}⚠{Colors.END} {warn}") - if len(self.warnings) > 10: - print(f" {Colors.YELLOW}...{Colors.END} und {len(self.warnings) - 10} weitere Warnungen") + if len(self.warnings) > 5: + print(f" {Colors.YELLOW}...{Colors.END} und {len(self.warnings) - 5} weitere") if not self.errors and not self.warnings: - print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ALLE PRÜFUNGEN BESTANDEN{Colors.END}") + print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ALLE PRÜFUNGEN BESTANDEN{Colors.END}\n") - print() + # KI-Feedback ausgeben + if self.errors or self.warnings: + feedback = self.generate_ki_feedback() + print(f"\n{Colors.CYAN}{Colors.BOLD}KI-FEEDBACK:{Colors.END}") + print(json.dumps(feedback, indent=2, ensure_ascii=False)) def validate_all(self) -> bool: """Führe alle Validierungen durch.""" @@ -1026,38 +831,23 @@ class EntityValidator: # 1. JSON-Syntax (kritisch) if not self.validate_json_syntax(): all_valid = False - print_error("\nAbbruch: JSON-Syntax-Fehler müssen behoben werden!\n") + self.output.error("Abbruch: JSON-Syntax-Fehler!") return False - # Lade entityDefs für weitere Checks + # Lade entityDefs self.load_entity_defs() # 2. Relationships (kritisch) if not self.validate_relationships(): all_valid = False - # 3. Formula-Platzierung (kritisch) - if not self.validate_formula_placement(): - all_valid = False + # 3. Erforderliche Dateien (Warnung) + self.validate_required_files() - # 4. i18n-Vollständigkeit (nur Warnung) - self.validate_i18n_completeness() - - # 5. Layout-Struktur (nur Warnung) - self.validate_layout_structure() - - # 6. Dateirechte (nicht kritisch für Rebuild) + # 4. Dateirechte (nicht kritisch) self.check_file_permissions() - # 7. CSS-Validierung (kritisch) - if not self.validate_css_files(): - all_valid = False - - # 8. JavaScript-Validierung (kritisch) - if not self.validate_js_files(): - all_valid = False - - # 9. PHP-Validierung (kritisch) + # 5. PHP-Syntax (kritisch) if not self.validate_php_files(): all_valid = False @@ -1067,76 +857,66 @@ def main(): import argparse parser = argparse.ArgumentParser( - description='EspoCRM Custom Entity Validator & Rebuild Tool' - ) - parser.add_argument( - '--dry-run', - action='store_true', - help='Nur Validierungen durchführen, kein Rebuild' - ) - parser.add_argument( - '--no-rebuild', - action='store_true', - help='Synonym für --dry-run' - ) - parser.add_argument( - '--skip-e2e', - action='store_true', - help='Überspringe E2E-Tests nach Rebuild' + description='EspoCRM Custom Entity Validator & Rebuild Tool - Erweiterte Version' ) + parser.add_argument('-v', '--verbose', action='store_true', + help='Ausführliche Ausgabe') + parser.add_argument('--dry-run', action='store_true', + help='Nur Validierungen, kein Rebuild') + parser.add_argument('--skip-tests', action='store_true', + help='Überspringe Entity-Tests') + parser.add_argument('--base-url', default='http://localhost', + help='EspoCRM Base-URL (default: http://localhost)') + parser.add_argument('--username', default='admin', + help='API-Username (default: admin)') + parser.add_argument('--password', default='admin', + help='API-Password (default: admin)') args = parser.parse_args() - dry_run = args.dry_run or args.no_rebuild - skip_e2e = args.skip_e2e - - # Finde EspoCRM Root-Verzeichnis + # Finde EspoCRM Root script_dir = Path(__file__).parent.parent.parent if not (script_dir / "rebuild.php").exists(): - print_error("Fehler: Nicht im EspoCRM-Root-Verzeichnis!") - print_info(f"Aktueller Pfad: {script_dir}") + print(f"{Colors.RED}✗{Colors.END} Fehler: Nicht im EspoCRM-Root-Verzeichnis!") sys.exit(1) - print(f"{Colors.BOLD}EspoCRM Custom Entity Validator & Rebuild Tool{Colors.END}") - print(f"Arbeitsverzeichnis: {script_dir}") - if dry_run: - print(f"{Colors.YELLOW}Modus: DRY-RUN (kein Rebuild){Colors.END}") - if skip_e2e: - print(f"{Colors.YELLOW}E2E-Tests werden übersprungen{Colors.END}") + print(f"{Colors.BOLD}EspoCRM Validator & Rebuild Tool v2.0{Colors.END}") + if args.verbose: + print(f"{Colors.DIM}Verbose-Modus aktiviert{Colors.END}") print() - validator = EntityValidator(str(script_dir)) - validator.skip_e2e_tests = skip_e2e + validator = EntityValidator(str(script_dir), verbose=args.verbose) - # Validierungen durchführen + # Validierungen all_valid = validator.validate_all() - # Zusammenfassung drucken - validator.print_summary() - - # Entscheidung über Rebuild if not all_valid: - print_error("REBUILD ABGEBROCHEN: Kritische Fehler müssen behoben werden!") + validator.print_summary() + print(f"\n{Colors.RED}{Colors.BOLD}✗ REBUILD ABGEBROCHEN{Colors.END}\n") sys.exit(1) - if dry_run: - print_info("Dry-Run Modus: Rebuild übersprungen") + if args.dry_run: + validator.print_summary() print(f"\n{Colors.GREEN}{Colors.BOLD}✓ VALIDIERUNGEN ABGESCHLOSSEN{Colors.END}\n") sys.exit(0) - if validator.warnings: - print_warning( - f"Es gibt {len(validator.warnings)} Warnungen, aber keine kritischen Fehler." - ) - print_info("Rebuild wird trotzdem durchgeführt...\n") - - # Rebuild ausführen - if validator.run_rebuild(): - print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ERFOLGREICH ABGESCHLOSSEN{Colors.END}\n") - sys.exit(0) - else: + # Rebuild + if not validator.run_rebuild(): + validator.print_summary() print(f"\n{Colors.RED}{Colors.BOLD}✗ REBUILD FEHLGESCHLAGEN{Colors.END}\n") sys.exit(1) + + # Entity-Tests + if not args.skip_tests: + time.sleep(2) # Kurze Pause nach Rebuild + if not validator.run_entity_tests(args.base_url, args.username, args.password): + validator.print_summary() + print(f"\n{Colors.YELLOW}{Colors.BOLD}⚠ TESTS FEHLGESCHLAGEN{Colors.END}\n") + sys.exit(1) + + validator.print_summary() + print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ERFOLGREICH ABGESCHLOSSEN{Colors.END}\n") + sys.exit(0) if __name__ == "__main__": main() diff --git a/custom/scripts/validate_and_rebuild.py.backup b/custom/scripts/validate_and_rebuild.py.backup new file mode 100644 index 00000000..6698677e --- /dev/null +++ b/custom/scripts/validate_and_rebuild.py.backup @@ -0,0 +1,1142 @@ +#!/usr/bin/env python3 +""" +EspoCRM Custom Entity Validator & Rebuild Tool +Führt umfassende Validierungen durch bevor der Rebuild ausgeführt wird. +""" + +import json +import os +import sys +import subprocess +import re +from pathlib import Path +from typing import Dict, List, Tuple, Set +from collections import defaultdict + +# ANSI Color Codes +class Colors: + GREEN = '\033[92m' + YELLOW = '\033[93m' + RED = '\033[91m' + BLUE = '\033[94m' + BOLD = '\033[1m' + END = '\033[0m' + +def print_header(text: str): + print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*70}{Colors.END}") + print(f"{Colors.BOLD}{Colors.BLUE}{text.center(70)}{Colors.END}") + print(f"{Colors.BOLD}{Colors.BLUE}{'='*70}{Colors.END}\n") + +def print_success(text: str): + print(f"{Colors.GREEN}✓{Colors.END} {text}") + +def print_warning(text: str): + print(f"{Colors.YELLOW}⚠{Colors.END} {text}") + +def print_error(text: str): + print(f"{Colors.RED}✗{Colors.END} {text}") + +def print_info(text: str): + print(f"{Colors.BLUE}ℹ{Colors.END} {text}") + +class EntityValidator: + def __init__(self, base_path: str): + self.base_path = Path(base_path) + self.custom_path = self.base_path / "custom" / "Espo" / "Custom" / "Resources" + self.metadata_path = self.custom_path / "metadata" + self.i18n_path = self.custom_path / "i18n" + self.client_custom_path = self.base_path / "client" / "custom" + self.errors = [] + self.warnings = [] + self.entity_defs = {} + self.relationships = defaultdict(list) + self.skip_e2e_tests = False + + def validate_json_syntax(self) -> bool: + """Validiere JSON-Syntax aller Dateien im custom-Verzeichnis.""" + print_header("1. JSON-SYNTAX VALIDIERUNG") + + json_files = list(self.custom_path.rglob("*.json")) + if not json_files: + print_warning("Keine JSON-Dateien gefunden") + return True + + invalid_files = [] + for json_file in json_files: + try: + with open(json_file, 'r', encoding='utf-8') as f: + json.load(f) + except json.JSONDecodeError as e: + self.errors.append(f"JSON-Fehler in {json_file.relative_to(self.base_path)}: {e}") + invalid_files.append(str(json_file.relative_to(self.base_path))) + + if invalid_files: + print_error(f"{len(invalid_files)} Datei(en) mit JSON-Fehlern gefunden:") + for f in invalid_files: + print(f" {Colors.RED}•{Colors.END} {f}") + return False + else: + print_success(f"Alle {len(json_files)} JSON-Dateien sind syntaktisch korrekt") + return True + + def load_entity_defs(self): + """Lade alle entityDefs für weitere Analysen.""" + entity_defs_path = self.metadata_path / "entityDefs" + if not entity_defs_path.exists(): + return + + for json_file in entity_defs_path.glob("*.json"): + entity_name = json_file.stem + try: + with open(json_file, 'r', encoding='utf-8') as f: + self.entity_defs[entity_name] = json.load(f) + except Exception as e: + # Fehler wird bereits in JSON-Validierung gemeldet + pass + + def validate_relationships(self) -> bool: + """Validiere Relationship-Definitionen zwischen Entities.""" + print_header("2. RELATIONSHIP-KONSISTENZ") + + if not self.entity_defs: + print_warning("Keine entityDefs geladen") + return True + + relationship_errors = [] + checked_pairs = set() + + # Links die nicht geprüft werden (Standard-EspoCRM parent-Relationships) + skip_foreign_links = {'parent', 'parents'} + + for entity_name, entity_def in self.entity_defs.items(): + links = entity_def.get('links', {}) + + for link_name, link_def in links.items(): + link_type = link_def.get('type') + target_entity = link_def.get('entity') + foreign = link_def.get('foreign') + relation_name = link_def.get('relationName') + + # Überspringe parent-Links (Standard Activities-Relationship) + if foreign in skip_foreign_links: + continue + + # Nur hasMany und hasOne prüfen (nicht belongsTo, da das die Gegenseite ist) + if link_type in ['hasMany', 'hasOne'] and target_entity and foreign: + pair_key = tuple(sorted([f"{entity_name}.{link_name}", f"{target_entity}.{foreign}"])) + if pair_key in checked_pairs: + continue + checked_pairs.add(pair_key) + + # Prüfe ob Ziel-Entity existiert + if target_entity not in self.entity_defs: + relationship_errors.append( + f"{entity_name}.{link_name}: Ziel-Entity '{target_entity}' existiert nicht" + ) + continue + + target_links = self.entity_defs[target_entity].get('links', {}) + + # Prüfe ob foreign Link existiert + if foreign not in target_links: + relationship_errors.append( + f"{entity_name}.{link_name} → {target_entity}: " + f"Foreign link '{foreign}' fehlt in {target_entity}" + ) + continue + + foreign_def = target_links[foreign] + foreign_foreign = foreign_def.get('foreign') + foreign_relation_name = foreign_def.get('relationName') + + # Prüfe ob foreign.foreign zurück zeigt + if foreign_foreign != link_name: + relationship_errors.append( + f"{entity_name}.{link_name} ↔ {target_entity}.{foreign}: " + f"Foreign zeigt auf '{foreign_foreign}' statt auf '{link_name}'" + ) + + # Prüfe ob relationName übereinstimmt (falls beide definiert) + if relation_name and foreign_relation_name and relation_name != foreign_relation_name: + relationship_errors.append( + f"{entity_name}.{link_name} ↔ {target_entity}.{foreign}: " + f"relationName unterschiedlich ('{relation_name}' vs '{foreign_relation_name}')" + ) + + if relationship_errors: + print_error(f"{len(relationship_errors)} Relationship-Fehler gefunden:") + for err in relationship_errors: + print(f" {Colors.RED}•{Colors.END} {err}") + self.errors.extend(relationship_errors) + return False + else: + print_success(f"{len(checked_pairs)} Relationships geprüft - alle konsistent") + return True + + def validate_formula_placement(self) -> bool: + """Prüfe ob Formula-Scripts korrekt in /formula/ statt /entityDefs/ platziert sind.""" + print_header("3. FORMULA-SCRIPT PLATZIERUNG") + + misplaced_formulas = [] + + # Prüfe entityDefs auf formula-Definitionen (sollte nicht da sein) + for entity_name, entity_def in self.entity_defs.items(): + if 'formula' in entity_def: + misplaced_formulas.append( + f"entityDefs/{entity_name}.json enthält 'formula' - " + f"sollte in formula/{entity_name}.json sein" + ) + + # Prüfe ob formula-Dateien existieren und valide sind + formula_path = self.metadata_path / "formula" + formula_count = 0 + if formula_path.exists(): + for formula_file in formula_path.glob("*.json"): + formula_count += 1 + try: + with open(formula_file, 'r', encoding='utf-8') as f: + formula_def = json.load(f) + # Prüfe auf leere oder null Scripts + for key, value in formula_def.items(): + if value == "" or value is None: + self.warnings.append( + f"formula/{formula_file.name}: '{key}' ist leer oder null" + ) + except Exception: + pass # JSON-Fehler bereits gemeldet + + if misplaced_formulas: + print_error(f"{len(misplaced_formulas)} Formula-Platzierungsfehler:") + for err in misplaced_formulas: + print(f" {Colors.RED}•{Colors.END} {err}") + self.errors.extend(misplaced_formulas) + return False + else: + print_success(f"{formula_count} Formula-Definitionen korrekt platziert") + return True + + def validate_i18n_completeness(self) -> bool: + """Prüfe i18n-Definitionen auf Vollständigkeit.""" + print_header("4. i18n-VOLLSTÄNDIGKEIT") + + if not self.entity_defs: + print_warning("Keine entityDefs zum Prüfen") + return True + + missing_i18n = [] + incomplete_i18n = [] + + languages = ['de_DE', 'en_US'] + custom_entities = [name for name in self.entity_defs.keys() + if name.startswith('C') or name.startswith('CVmh')] + + for entity_name in custom_entities: + entity_def = self.entity_defs[entity_name] + links = entity_def.get('links', {}) + + # Finde alle hasMany/hasOne Links die übersetzt werden sollten + custom_links = [] + for link_name, link_def in links.items(): + link_type = link_def.get('type') + if link_type in ['hasMany', 'hasOne']: + # Überspringe System-Links + if link_name not in ['createdBy', 'modifiedBy', 'assignedUser', 'teams']: + custom_links.append(link_name) + + if not custom_links: + continue + + for lang in languages: + i18n_file = self.i18n_path / lang / f"{entity_name}.json" + + if not i18n_file.exists(): + missing_i18n.append(f"{entity_name}: {lang} fehlt komplett") + continue + + try: + with open(i18n_file, 'r', encoding='utf-8') as f: + i18n_def = json.load(f) + links_i18n = i18n_def.get('links', {}) + + # Prüfe ob alle custom Links übersetzt sind + for link_name in custom_links: + if link_name not in links_i18n: + incomplete_i18n.append( + f"{entity_name} ({lang}): Link '{link_name}' fehlt in i18n" + ) + except Exception: + pass # JSON-Fehler bereits gemeldet + + total_issues = len(missing_i18n) + len(incomplete_i18n) + + if missing_i18n: + print_error(f"{len(missing_i18n)} komplett fehlende i18n-Dateien:") + for err in missing_i18n[:10]: # Max 10 anzeigen + print(f" {Colors.RED}•{Colors.END} {err}") + if len(missing_i18n) > 10: + print(f" {Colors.RED}...{Colors.END} und {len(missing_i18n) - 10} weitere") + + if incomplete_i18n: + print_warning(f"{len(incomplete_i18n)} unvollständige i18n-Definitionen:") + for err in incomplete_i18n[:10]: # Max 10 anzeigen + print(f" {Colors.YELLOW}•{Colors.END} {err}") + if len(incomplete_i18n) > 10: + print(f" {Colors.YELLOW}...{Colors.END} und {len(incomplete_i18n) - 10} weitere") + + if not missing_i18n and not incomplete_i18n: + print_success(f"i18n für {len(custom_entities)} Custom-Entities vollständig") + + # i18n-Fehler sind nur Warnungen, kein Abbruch + self.warnings.extend(missing_i18n + incomplete_i18n) + return True + + def validate_layout_structure(self) -> bool: + """Prüfe Layout-Dateien auf häufige Fehler.""" + print_header("5. LAYOUT-STRUKTUR VALIDIERUNG") + + layouts_base = self.custom_path / "layouts" + layout_errors = [] + layout_warnings = [] + checked_layouts = 0 + + # 1. Prüfe bottomPanelsDetail.json Dateien (KRITISCH: Muss Objekt sein, kein Array!) + if layouts_base.exists(): + for bottom_panel_file in layouts_base.rglob("bottomPanelsDetail.json"): + try: + with open(bottom_panel_file, 'r', encoding='utf-8') as f: + content = json.load(f) + + checked_layouts += 1 + entity_name = bottom_panel_file.parent.name + + # KRITISCHER CHECK: bottomPanelsDetail.json MUSS Objekt sein (nicht Array)! + if isinstance(content, list): + layout_errors.append( + f"{entity_name}/bottomPanelsDetail.json: FEHLER - Ist Array statt Objekt! " + f"EspoCRM 7.x erfordert Objekt-Format mit Keys wie 'contacts', '_tabBreak_0', etc." + ) + elif isinstance(content, dict): + # Prüfe auf false-Werte in falschen Kontexten + for key, value in content.items(): + if isinstance(value, dict): + for subkey, subvalue in value.items(): + if subvalue is False and subkey not in ['disabled', 'sticked']: + layout_warnings.append( + f"{entity_name}/bottomPanelsDetail.json: {key}.{subkey} " + f"sollte nicht 'false' sein" + ) + except Exception: + pass # JSON-Fehler bereits in validate_json_syntax gemeldet + + # 2. Prüfe detail.json Dateien auf deprecated false-Platzhalter + if layouts_base.exists(): + for detail_file in layouts_base.rglob("detail.json"): + try: + with open(detail_file, 'r', encoding='utf-8') as f: + content = json.load(f) + + entity_name = detail_file.parent.name + + # Prüfe ob content ein Array von Panels ist + if isinstance(content, list): + for panel_idx, panel in enumerate(content): + if not isinstance(panel, dict): + continue + + rows = panel.get('rows', []) + for row_idx, row in enumerate(rows): + if not isinstance(row, list): + continue + + for cell_idx, cell in enumerate(row): + # KRITISCH: false als Platzhalter ist deprecated in EspoCRM 7.x! + if cell is False: + layout_errors.append( + f"{entity_name}/detail.json: Panel {panel_idx}, Row {row_idx}, " + f"Cell {cell_idx} verwendet 'false' als Platzhalter. " + f"In EspoCRM 7.x muss '{{}}' (leeres Objekt) verwendet werden!" + ) + except Exception: + pass # JSON-Fehler bereits gemeldet + + # Ergebnisse ausgeben + if layout_errors: + print_error(f"{len(layout_errors)} KRITISCHE Layout-Fehler gefunden:") + for err in layout_errors: + print(f" {Colors.RED}✗{Colors.END} {err}") + self.errors.extend(layout_errors) + return False + + if layout_warnings: + print_warning(f"{len(layout_warnings)} Layout-Warnungen:") + for warn in layout_warnings[:5]: + print(f" {Colors.YELLOW}⚠{Colors.END} {warn}") + if len(layout_warnings) > 5: + print(f" {Colors.YELLOW}...{Colors.END} und {len(layout_warnings) - 5} weitere") + self.warnings.extend(layout_warnings) + + if not layout_errors and not layout_warnings: + print_success(f"{checked_layouts} Layout-Dateien geprüft, keine Fehler") + elif not layout_errors: + print_success(f"{checked_layouts} Layout-Dateien geprüft, keine kritischen Fehler") + + return True + + def check_file_permissions(self) -> bool: + """Prüfe Dateirechte im custom-Verzeichnis und kritischen System-Dateien.""" + print_header("6. DATEIRECHTE-PRÜFUNG") + + # Verzeichnisse/Dateien die geprüft werden sollen + paths_to_check = [ + self.custom_path, # custom/Espo/Custom/Resources + self.client_custom_path, # client/custom + self.base_path / "data", # Gesamtes data/ Verzeichnis + ] + + # Kritische einzelne Dateien die UNBEDINGT www-data gehören müssen + critical_files = [ + self.base_path / "data" / "config.php", + self.base_path / "data" / "config-internal.php", + ] + + all_wrong_files = [] + critical_wrong_files = [] + + try: + # Prüfe jedes Verzeichnis + for path in paths_to_check: + if not path.exists(): + continue + + result = subprocess.run( + ['find', str(path), '!', '-user', 'www-data', '-o', '!', '-group', 'www-data'], + capture_output=True, + text=True + ) + + wrong_files = [line for line in result.stdout.strip().split('\n') if line] + all_wrong_files.extend(wrong_files) + + # Prüfe kritische Dateien einzeln + for critical_file in critical_files: + if not critical_file.exists(): + continue + + stat_result = critical_file.stat() + import pwd + import grp + + try: + owner = pwd.getpwuid(stat_result.st_uid).pw_name + group = grp.getgrgid(stat_result.st_gid).gr_name + + if owner != 'www-data' or group != 'www-data': + critical_wrong_files.append(str(critical_file)) + all_wrong_files.append(str(critical_file)) + print_error(f"KRITISCH: {critical_file.relative_to(self.base_path)} gehört {owner}:{group} statt www-data:www-data") + except (KeyError, OSError): + pass + + if all_wrong_files: + print_warning(f"{len(all_wrong_files)} Dateien/Verzeichnisse mit falschen Rechten gefunden") + + if critical_wrong_files: + print_error(f"{len(critical_wrong_files)} davon sind KRITISCHE System-Dateien!") + + print_info("Versuche automatische Korrektur aller Verzeichnisse...") + + # Korrigiere alle Pfade + success_count = 0 + for path in paths_to_check: + if not path.exists(): + continue + + try: + # Setze Owner + subprocess.run( + ['sudo', 'chown', '-R', 'www-data:www-data', str(path)], + check=True, + capture_output=True + ) + # Setze Permissions für Dateien + subprocess.run( + ['sudo', 'find', str(path), '-type', 'f', '-exec', 'chmod', '664', '{}', ';'], + check=True, + capture_output=True + ) + # Setze Permissions für Verzeichnisse + subprocess.run( + ['sudo', 'find', str(path), '-type', 'd', '-exec', 'chmod', '775', '{}', ';'], + check=True, + capture_output=True + ) + success_count += 1 + except subprocess.CalledProcessError as e: + print_warning(f"Konnte {path.relative_to(self.base_path)} nicht korrigieren: {e}") + + if success_count > 0: + print_success(f"Dateirechte für {success_count} Verzeichnis(se) korrigiert") + else: + print_warning("Konnte Dateirechte nicht automatisch korrigieren (sudo erforderlich)") + else: + print_success("Alle Dateirechte korrekt (www-data:www-data)") + + return True + except Exception as e: + print_warning(f"Konnte Dateirechte nicht prüfen: {e}") + return True + + def validate_css_files(self) -> bool: + """Validiere CSS-Dateien mit csslint.""" + print_header("7. CSS-VALIDIERUNG") + + css_files = [] + if self.client_custom_path.exists(): + css_files = list((self.client_custom_path / "css").rglob("*.css")) if (self.client_custom_path / "css").exists() else [] + + if not css_files: + print_info("Keine CSS-Dateien gefunden") + return True + + # Prüfe ob csslint verfügbar ist + try: + subprocess.run(['csslint', '--version'], capture_output=True, timeout=5) + use_csslint = True + except (FileNotFoundError, subprocess.TimeoutExpired): + use_csslint = False + print_warning("csslint nicht gefunden, verwende Basis-Validierung") + + invalid_files = [] + for css_file in css_files: + if use_csslint: + # Verwende csslint für professionelle Validierung + try: + result = subprocess.run( + ['csslint', '--format=compact', '--quiet', str(css_file)], + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode != 0 and result.stdout.strip(): + # Parse csslint Ausgabe + errors = [] + for line in result.stdout.strip().split('\n'): + if 'Error' in line or 'Warning' in line: + # Extrahiere nur die Fehlermeldung ohne Pfad + parts = line.split(': ', 2) + if len(parts) >= 3: + errors.append(parts[2]) + else: + errors.append(line) + + if errors: + # Nur echte Fehler, keine Warnungen als kritisch behandeln + if any('Error' in err for err in errors): + self.errors.append(f"CSS-Fehler in {css_file.relative_to(self.base_path)}") + invalid_files.append((str(css_file.relative_to(self.base_path)), errors)) + else: + # Nur Warnungen + for err in errors: + self.warnings.append(f"{css_file.relative_to(self.base_path)}: {err}") + + except subprocess.TimeoutExpired: + self.warnings.append(f"CSS-Validierung timeout für {css_file.relative_to(self.base_path)}") + except Exception as e: + self.warnings.append(f"Konnte {css_file.relative_to(self.base_path)} nicht validieren: {e}") + else: + # Fallback: Basis-Validierung + try: + with open(css_file, 'r', encoding='utf-8') as f: + content = f.read() + + errors = [] + + # Geschlossene Klammern + open_braces = content.count('{') + close_braces = content.count('}') + if open_braces != close_braces: + errors.append(f"Ungleiche Klammern: {open_braces} {{ vs {close_braces} }}") + + if errors: + self.errors.append(f"CSS-Fehler in {css_file.relative_to(self.base_path)}") + invalid_files.append((str(css_file.relative_to(self.base_path)), errors)) + + except Exception as e: + self.errors.append(f"Konnte {css_file.relative_to(self.base_path)} nicht lesen: {e}") + invalid_files.append((str(css_file.relative_to(self.base_path)), [str(e)])) + + if invalid_files: + print_error(f"{len(invalid_files)} CSS-Datei(en) mit Fehlern:") + for filepath, errors in invalid_files: + print(f" {Colors.RED}•{Colors.END} {filepath}") + for err in errors[:5]: # Maximal 5 Fehler pro Datei anzeigen + print(f" {Colors.RED}→{Colors.END} {err}") + if len(errors) > 5: + print(f" {Colors.RED}...{Colors.END} und {len(errors) - 5} weitere Fehler") + return False + else: + print_success(f"Alle {len(css_files)} CSS-Dateien sind syntaktisch korrekt") + return True + + def validate_js_files(self) -> bool: + """Validiere JavaScript-Dateien mit jshint.""" + print_header("8. JAVASCRIPT-VALIDIERUNG") + + js_files = [] + if self.client_custom_path.exists(): + src_path = self.client_custom_path / "src" + js_files = list(src_path.rglob("*.js")) if src_path.exists() else [] + + if not js_files: + print_info("Keine JavaScript-Dateien gefunden") + return True + + # Prüfe ob jshint verfügbar ist + try: + subprocess.run(['jshint', '--version'], capture_output=True, timeout=5) + use_jshint = True + except (FileNotFoundError, subprocess.TimeoutExpired): + use_jshint = False + print_warning("jshint nicht gefunden, verwende Basis-Validierung") + + invalid_files = [] + for js_file in js_files: + if use_jshint: + # Verwende jshint für professionelle Validierung + try: + result = subprocess.run( + ['jshint', '--config=/dev/null', str(js_file)], + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode != 0 and result.stdout.strip(): + errors = [] + for line in result.stdout.strip().split('\n'): + if line and not line.startswith('Lint'): + # Parse jshint Ausgabe + errors.append(line) + + if errors: + self.errors.append(f"JavaScript-Fehler in {js_file.relative_to(self.base_path)}") + invalid_files.append((str(js_file.relative_to(self.base_path)), errors)) + + except subprocess.TimeoutExpired: + self.warnings.append(f"JavaScript-Validierung timeout für {js_file.relative_to(self.base_path)}") + except Exception as e: + self.warnings.append(f"Konnte {js_file.relative_to(self.base_path)} nicht validieren: {e}") + else: + # Fallback: Basis-Validierung + try: + with open(js_file, 'r', encoding='utf-8') as f: + content = f.read() + + errors = [] + + # Geschlossene Klammern + open_paren = content.count('(') + close_paren = content.count(')') + if open_paren != close_paren: + errors.append(f"Ungleiche runde Klammern: {open_paren} ( vs {close_paren} )") + + open_braces = content.count('{') + close_braces = content.count('}') + if open_braces != close_braces: + errors.append(f"Ungleiche geschweifte Klammern: {open_braces} {{ vs {close_braces} }}") + + open_brackets = content.count('[') + close_brackets = content.count(']') + if open_brackets != close_brackets: + errors.append(f"Ungleiche eckige Klammern: {open_brackets} [ vs {close_brackets} ]") + + if errors: + self.errors.append(f"JavaScript-Fehler in {js_file.relative_to(self.base_path)}") + invalid_files.append((str(js_file.relative_to(self.base_path)), errors)) + + except Exception as e: + self.errors.append(f"Konnte {js_file.relative_to(self.base_path)} nicht lesen: {e}") + invalid_files.append((str(js_file.relative_to(self.base_path)), [str(e)])) + + if invalid_files: + print_error(f"{len(invalid_files)} JavaScript-Datei(en) mit Fehlern:") + for filepath, errors in invalid_files: + print(f" {Colors.RED}•{Colors.END} {filepath}") + for err in errors[:5]: # Maximal 5 Fehler pro Datei + print(f" {Colors.RED}→{Colors.END} {err}") + if len(errors) > 5: + print(f" {Colors.RED}...{Colors.END} und {len(errors) - 5} weitere Fehler") + return False + else: + print_success(f"Alle {len(js_files)} JavaScript-Dateien sind syntaktisch korrekt") + return True + + def validate_php_files(self) -> bool: + """Validiere PHP-Dateien mit php -l (Lint).""" + print_header("9. PHP-VALIDIERUNG") + + php_files = [] + custom_espo_path = self.base_path / "custom" / "Espo" + if custom_espo_path.exists(): + php_files = list(custom_espo_path.rglob("*.php")) + + if not php_files: + print_info("Keine PHP-Dateien gefunden") + return True + + # Prüfe ob php verfügbar ist + try: + subprocess.run(['php', '--version'], capture_output=True, timeout=5) + except (FileNotFoundError, subprocess.TimeoutExpired): + print_warning("PHP-CLI nicht gefunden, überspringe PHP-Validierung") + return True + + invalid_files = [] + for php_file in php_files: + try: + # Verwende php -l für Syntax-Check + result = subprocess.run( + ['php', '-l', str(php_file)], + capture_output=True, + text=True, + timeout=5 + ) + + if result.returncode != 0: + error_lines = [] + output = result.stderr.strip() or result.stdout.strip() + + for line in output.split('\n'): + # Filtere die relevanten Fehlerzeilen + if line and not line.startswith('No syntax errors'): + # Entferne Datei-Pfad aus Fehlermeldung für bessere Lesbarkeit + clean_line = re.sub(r'^.*?in\s+.*?on\s+', '', line) + if clean_line != line: # Wenn Ersetzung stattfand + error_lines.append(clean_line) + else: + error_lines.append(line) + + if error_lines: + self.errors.append(f"PHP-Syntax-Fehler in {php_file.relative_to(self.base_path)}") + invalid_files.append((str(php_file.relative_to(self.base_path)), error_lines)) + + except subprocess.TimeoutExpired: + self.warnings.append(f"PHP-Validierung timeout für {php_file.relative_to(self.base_path)}") + except Exception as e: + self.warnings.append(f"Konnte {php_file.relative_to(self.base_path)} nicht validieren: {e}") + + if invalid_files: + print_error(f"{len(invalid_files)} PHP-Datei(en) mit Syntax-Fehlern:") + for filepath, errors in invalid_files: + print(f" {Colors.RED}•{Colors.END} {filepath}") + for err in errors[:3]: # Maximal 3 Fehler pro Datei + print(f" {Colors.RED}→{Colors.END} {err}") + if len(errors) > 3: + print(f" {Colors.RED}...{Colors.END} und {len(errors) - 3} weitere Fehler") + return False + else: + print_success(f"Alle {len(php_files)} PHP-Dateien sind syntaktisch korrekt") + return True + + def show_error_logs(self): + """Zeige die letzten Fehlerlog-Einträge aus data/logs/.""" + from datetime import datetime + + print_header("FEHLERLOG ANALYSE") + + logs_path = self.base_path / "data" / "logs" + if not logs_path.exists(): + print_warning("Logs-Verzeichnis nicht gefunden") + return + + # Finde das neueste Log-File + today = datetime.now().strftime("%Y-%m-%d") + log_file = logs_path / f"espo-{today}.log" + + if not log_file.exists(): + # Fallback: Finde das neueste Log-File + log_files = sorted(logs_path.glob("espo-*.log"), key=lambda f: f.stat().st_mtime, reverse=True) + if log_files: + log_file = log_files[0] + print_info(f"Kein Log für heute gefunden, verwende: {log_file.name}") + else: + print_warning("Keine Log-Dateien gefunden") + return + + print_info(f"Analysiere: {log_file.name}") + + try: + with open(log_file, 'r', encoding='utf-8') as f: + lines = f.readlines() + + if not lines: + print_info("Log-Datei ist leer") + return + + # Zeige die letzten 50 Zeilen + last_lines = lines[-50:] + + # Filter für Fehler und Warnungen + errors = [] + warnings = [] + + for line in last_lines: + line_upper = line.upper() + if 'ERROR' in line_upper or 'FATAL' in line_upper or 'EXCEPTION' in line_upper: + errors.append(line.strip()) + elif 'WARNING' in line_upper or 'WARN' in line_upper: + warnings.append(line.strip()) + + if errors: + print_error(f"\n{len(errors)} Fehler in den letzten 50 Log-Zeilen gefunden:\n") + for i, error in enumerate(errors[-10:], 1): # Zeige max. 10 Fehler + print(f"{Colors.RED}{i}.{Colors.END} {error}") + if len(errors) > 10: + print(f"\n{Colors.YELLOW}... und {len(errors) - 10} weitere Fehler{Colors.END}") + + if warnings: + print_warning(f"\n{len(warnings)} Warnungen in den letzten 50 Log-Zeilen gefunden:\n") + for i, warning in enumerate(warnings[-5:], 1): # Zeige max. 5 Warnungen + print(f"{Colors.YELLOW}{i}.{Colors.END} {warning}") + if len(warnings) > 5: + print(f"\n{Colors.YELLOW}... und {len(warnings) - 5} weitere Warnungen{Colors.END}") + + if not errors and not warnings: + print_info("Keine Fehler oder Warnungen in den letzten 50 Log-Zeilen gefunden") + print_info("\nLetzte 10 Log-Zeilen:") + for line in last_lines[-10:]: + print(f" {line.strip()}") + + print(f"\n{Colors.BLUE}ℹ{Colors.END} Vollständige Log-Datei: {log_file}") + print(f"{Colors.BLUE}ℹ{Colors.END} Zum Anzeigen: tail -50 {log_file}") + + except Exception as e: + print_error(f"Fehler beim Lesen der Log-Datei: {e}") + + def run_rebuild(self) -> bool: + """Führe den EspoCRM Rebuild aus.""" + print_header("10. ESPOCRM REBUILD") + + # Prüfe ob wir in einem Docker-Volume sind + is_docker_volume = '/docker/volumes/' in str(self.base_path) + + if is_docker_volume: + # Versuche Docker-Container zu finden + try: + result = subprocess.run( + ['docker', 'ps', '--format', '{{.Names}}'], + capture_output=True, + text=True, + timeout=5 + ) + + containers = result.stdout.strip().split('\n') + espo_container = None + + # Suche nach EspoCRM Container (meist "espocrm" ohne Suffix) + for container in containers: + if container.lower() in ['espocrm', 'espocrm-app']: + espo_container = container + break + + if not espo_container: + # Fallback: erster Container mit "espo" im Namen + for container in containers: + if 'espo' in container.lower() and 'websocket' not in container.lower() and 'daemon' not in container.lower() and 'db' not in container.lower(): + espo_container = container + break + + if espo_container: + print_info(f"Docker-Container erkannt: {espo_container}") + + # Schritt 1: Cache löschen + print_info("Lösche Cache...") + cache_result = subprocess.run( + ['docker', 'exec', espo_container, 'php', 'command.php', 'clear-cache'], + capture_output=True, + text=True, + timeout=30 + ) + + if cache_result.returncode == 0: + print_success("Cache erfolgreich gelöscht") + else: + print_warning("Cache-Löschung fehlgeschlagen, fahre trotzdem fort...") + + # Schritt 2: Rebuild + print_info("Starte Rebuild (kann 10-30 Sekunden dauern)...") + result = subprocess.run( + ['docker', 'exec', espo_container, 'php', 'command.php', 'rebuild'], + capture_output=True, + text=True, + timeout=60 + ) + + if result.returncode == 0: + print_success("Rebuild erfolgreich abgeschlossen") + if result.stdout: + print(f" {result.stdout.strip()}") + + # E2E-Tests nach erfolgreichem Rebuild + self.run_e2e_tests() + + return True + else: + print_error("Rebuild fehlgeschlagen:") + if result.stderr: + print(f"\n{result.stderr}") + + # Zeige automatisch die letzten Fehlerlog-Einträge an + self.show_error_logs() + return False + else: + print_warning("Kein EspoCRM Docker-Container gefunden") + print_info("Versuche lokalen Rebuild...") + except Exception as e: + print_warning(f"Docker-Erkennung fehlgeschlagen: {e}") + print_info("Versuche lokalen Rebuild...") + + # Lokaler Rebuild (Fallback) + rebuild_script = self.base_path / "rebuild.php" + if not rebuild_script.exists(): + print_error(f"rebuild.php nicht gefunden in {self.base_path}") + return False + + try: + # Schritt 1: Cache löschen + print_info("Lösche Cache...") + cache_result = subprocess.run( + ['php', 'command.php', 'clear-cache'], + cwd=str(self.base_path), + capture_output=True, + text=True, + timeout=30 + ) + + if cache_result.returncode == 0: + print_success("Cache erfolgreich gelöscht") + else: + print_warning("Cache-Löschung fehlgeschlagen, fahre trotzdem fort...") + + # Schritt 2: Rebuild + print_info("Starte lokalen Rebuild (kann 10-30 Sekunden dauern)...") + result = subprocess.run( + ['php', 'command.php', 'rebuild'], + cwd=str(self.base_path), + capture_output=True, + text=True, + timeout=60 + ) + + if result.returncode == 0: + print_success("Rebuild erfolgreich abgeschlossen") + + # E2E-Tests nach erfolgreichem Rebuild + self.run_e2e_tests() + + return True + else: + print_error("Rebuild fehlgeschlagen:") + if result.stderr: + print(f"\n{result.stderr}") + + # Zeige automatisch die letzten Fehlerlog-Einträge an + self.show_error_logs() + return False + except subprocess.TimeoutExpired: + print_error("Rebuild-Timeout (>60 Sekunden)") + return False + except Exception as e: + print_error(f"Rebuild-Fehler: {e}") + return False + def run_e2e_tests(self) -> bool: + """Führe End-to-End Tests nach erfolgreichem Rebuild aus.""" + + # Überspringe wenn Flag gesetzt + if self.skip_e2e_tests: + print_info("\nE2E-Tests wurden übersprungen (--skip-e2e)") + return True + + print_header("11. END-TO-END TESTS") + + # Prüfe ob E2E-Test Skript existiert + e2e_script = self.base_path / "custom" / "scripts" / "e2e_tests.py" + if not e2e_script.exists(): + print_warning("E2E-Test Skript nicht gefunden, überspringe Tests") + return True + + print_info("Starte automatisierte End-to-End Tests...") + print_info("Dies validiert CRUD-Operationen für Custom Entities\n") + + try: + result = subprocess.run( + ['python3', 'e2e_tests.py'], + cwd=str(e2e_script.parent), + capture_output=True, + text=True, + timeout=120 + ) + + # Ausgabe anzeigen + if result.stdout: + print(result.stdout) + + if result.returncode == 0: + print_success("E2E-Tests erfolgreich abgeschlossen") + return True + else: + print_warning("E2E-Tests haben Fehler gemeldet") + if result.stderr: + print(f"\n{Colors.YELLOW}{result.stderr}{Colors.END}") + print_info("Dies ist keine kritische Fehler - der Rebuild war erfolgreich") + return True # Nicht als Fehler werten + + except subprocess.TimeoutExpired: + print_warning("E2E-Tests Timeout (>120 Sekunden)") + return True # Nicht als Fehler werten + except Exception as e: + print_warning(f"E2E-Tests konnten nicht ausgeführt werden: {e}") + return True # Nicht als Fehler werten + def print_summary(self): + """Drucke Zusammenfassung aller Ergebnisse.""" + print_header("ZUSAMMENFASSUNG") + + if self.errors: + print(f"\n{Colors.RED}{Colors.BOLD}FEHLER: {len(self.errors)}{Colors.END}") + for err in self.errors: + print(f" {Colors.RED}✗{Colors.END} {err}") + + if self.warnings: + print(f"\n{Colors.YELLOW}{Colors.BOLD}WARNUNGEN: {len(self.warnings)}{Colors.END}") + for warn in self.warnings[:10]: + print(f" {Colors.YELLOW}⚠{Colors.END} {warn}") + if len(self.warnings) > 10: + print(f" {Colors.YELLOW}...{Colors.END} und {len(self.warnings) - 10} weitere Warnungen") + + if not self.errors and not self.warnings: + print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ALLE PRÜFUNGEN BESTANDEN{Colors.END}") + + print() + + def validate_all(self) -> bool: + """Führe alle Validierungen durch.""" + all_valid = True + + # 1. JSON-Syntax (kritisch) + if not self.validate_json_syntax(): + all_valid = False + print_error("\nAbbruch: JSON-Syntax-Fehler müssen behoben werden!\n") + return False + + # Lade entityDefs für weitere Checks + self.load_entity_defs() + + # 2. Relationships (kritisch) + if not self.validate_relationships(): + all_valid = False + + # 3. Formula-Platzierung (kritisch) + if not self.validate_formula_placement(): + all_valid = False + + # 4. i18n-Vollständigkeit (nur Warnung) + self.validate_i18n_completeness() + + # 5. Layout-Struktur (nur Warnung) + self.validate_layout_structure() + + # 6. Dateirechte (nicht kritisch für Rebuild) + self.check_file_permissions() + + # 7. CSS-Validierung (kritisch) + if not self.validate_css_files(): + all_valid = False + + # 8. JavaScript-Validierung (kritisch) + if not self.validate_js_files(): + all_valid = False + + # 9. PHP-Validierung (kritisch) + if not self.validate_php_files(): + all_valid = False + + return all_valid + +def main(): + import argparse + + parser = argparse.ArgumentParser( + description='EspoCRM Custom Entity Validator & Rebuild Tool' + ) + parser.add_argument( + '--dry-run', + action='store_true', + help='Nur Validierungen durchführen, kein Rebuild' + ) + parser.add_argument( + '--no-rebuild', + action='store_true', + help='Synonym für --dry-run' + ) + parser.add_argument( + '--skip-e2e', + action='store_true', + help='Überspringe E2E-Tests nach Rebuild' + ) + args = parser.parse_args() + + dry_run = args.dry_run or args.no_rebuild + skip_e2e = args.skip_e2e + + # Finde EspoCRM Root-Verzeichnis + script_dir = Path(__file__).parent.parent.parent + + if not (script_dir / "rebuild.php").exists(): + print_error("Fehler: Nicht im EspoCRM-Root-Verzeichnis!") + print_info(f"Aktueller Pfad: {script_dir}") + sys.exit(1) + + print(f"{Colors.BOLD}EspoCRM Custom Entity Validator & Rebuild Tool{Colors.END}") + print(f"Arbeitsverzeichnis: {script_dir}") + if dry_run: + print(f"{Colors.YELLOW}Modus: DRY-RUN (kein Rebuild){Colors.END}") + if skip_e2e: + print(f"{Colors.YELLOW}E2E-Tests werden übersprungen{Colors.END}") + print() + + validator = EntityValidator(str(script_dir)) + validator.skip_e2e_tests = skip_e2e + + # Validierungen durchführen + all_valid = validator.validate_all() + + # Zusammenfassung drucken + validator.print_summary() + + # Entscheidung über Rebuild + if not all_valid: + print_error("REBUILD ABGEBROCHEN: Kritische Fehler müssen behoben werden!") + sys.exit(1) + + if dry_run: + print_info("Dry-Run Modus: Rebuild übersprungen") + print(f"\n{Colors.GREEN}{Colors.BOLD}✓ VALIDIERUNGEN ABGESCHLOSSEN{Colors.END}\n") + sys.exit(0) + + if validator.warnings: + print_warning( + f"Es gibt {len(validator.warnings)} Warnungen, aber keine kritischen Fehler." + ) + print_info("Rebuild wird trotzdem durchgeführt...\n") + + # Rebuild ausführen + if validator.run_rebuild(): + print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ERFOLGREICH ABGESCHLOSSEN{Colors.END}\n") + sys.exit(0) + else: + print(f"\n{Colors.RED}{Colors.BOLD}✗ REBUILD FEHLGESCHLAGEN{Colors.END}\n") + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/data/config.php b/data/config.php index 06faafa5..d2fe9a78 100644 --- a/data/config.php +++ b/data/config.php @@ -360,7 +360,7 @@ return [ 0 => 'youtube.com', 1 => 'google.com' ], - 'microtime' => 1773141794.293915, + 'microtime' => 1773142436.617376, 'siteUrl' => 'https://crm.bitbylaw.com', 'fullTextSearchMinLength' => 4, 'webSocketUrl' => 'ws://api.bitbylaw.com:5000/espocrm/ws', diff --git a/data/state.php b/data/state.php index 313a2c77..328a60ef 100644 --- a/data/state.php +++ b/data/state.php @@ -1,7 +1,7 @@ 1773141794, - 'microtimeState' => 1773141794.444418, + 'cacheTimestamp' => 1773142436, + 'microtimeState' => 1773142436.753972, 'currencyRates' => [ 'EUR' => 1.0 ],