#!/usr/bin/env python3
|
||
"""
|
||
EspoCRM Custom Entity Validator & Rebuild Tool
|
||
Erweiterte Version mit umfassenden Tests und Log-Prüfung
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import sys
|
||
import subprocess
|
||
import re
|
||
import time
|
||
import requests
|
||
from pathlib import Path
|
||
from typing import Dict, List, Tuple, Set, Optional
|
||
from collections import defaultdict
|
||
from datetime import datetime
|
||
from requests.auth import HTTPBasicAuth
|
||
|
||
# ANSI Color Codes
|
||
class Colors:
    """ANSI escape codes used to colorize terminal output."""
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    BOLD = '\033[1m'
    DIM = '\033[2m'
    # Reset code: terminates any preceding color/style sequence.
    END = '\033[0m'
|
||
|
||
class OutputManager:
    """Manages minimized vs. verbose console output and records test results.

    In non-verbose mode only the essential status lines are printed; in
    verbose mode headers are boxed and per-test timings are shown.
    """

    def __init__(self, verbose: bool = False):
        self.verbose = verbose
        self.test_results = []       # dicts: {'name', 'success', 'duration', 'message'}
        self.current_test = None     # name of the test currently running
        self.test_start_time = None  # time.time() when the current test started

    def header(self, text: str):
        """Print a section header (boxed with '=' rules in verbose mode)."""
        if self.verbose:
            print(f"\n{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}")
            print(f"{Colors.BOLD}{Colors.BLUE}{text.center(80)}{Colors.END}")
            print(f"{Colors.BOLD}{Colors.BLUE}{'='*80}{Colors.END}\n")
        else:
            print(f"\n{Colors.BOLD}{text}{Colors.END}")

    def success(self, text: str, details: Optional[str] = None):
        """Print a success message; *details* are shown only in verbose mode."""
        # The original duplicated the identical check-mark print in both
        # branches of an if/else — folded into a single statement here.
        print(f"{Colors.GREEN}✓{Colors.END} {text}")
        if self.verbose and details:
            print(f" {Colors.DIM}{details}{Colors.END}")

    def warning(self, text: str, details: Optional[str] = None):
        """Print a warning message; *details* are shown only in verbose mode."""
        print(f"{Colors.YELLOW}⚠{Colors.END} {text}")
        if self.verbose and details:
            print(f" {Colors.DIM}{details}{Colors.END}")

    def error(self, text: str, details: Optional[str] = None):
        """Print an error message; *details* (if any) are always shown."""
        print(f"{Colors.RED}✗{Colors.END} {text}")
        if details:
            print(f" {Colors.RED}{details}{Colors.END}")

    def info(self, text: str):
        """Print an informational message (verbose mode only)."""
        if self.verbose:
            print(f"{Colors.BLUE}ℹ{Colors.END} {text}")

    def test_start(self, test_name: str):
        """Mark a test as started and remember its start time."""
        self.current_test = test_name
        self.test_start_time = time.time()
        if self.verbose:
            print(f"{Colors.CYAN}▶{Colors.END} {test_name}...", end='', flush=True)

    def test_end(self, success: bool, message: Optional[str] = None):
        """Mark the current test as finished and record its result."""
        # Guard against test_end() being called without a prior test_start().
        duration = time.time() - self.test_start_time if self.test_start_time else 0

        if self.verbose:
            if success:
                print(f" {Colors.GREEN}✓{Colors.END} {duration:.3f}s")
            else:
                print(f" {Colors.RED}✗{Colors.END} {duration:.3f}s")
                if message:
                    print(f" {message}")

        self.test_results.append({
            'name': self.current_test,
            'success': success,
            'duration': duration,
            'message': message
        })
|
||
|
||
class LogChecker:
    """Checks the EspoCRM log directory for error entries."""

    def __init__(self, log_path: Path, output: "OutputManager"):
        self.log_path = log_path          # directory containing espo-*.log files
        self.output = output
        self.last_check_time = None

    def get_log_file(self) -> Optional[Path]:
        """Return today's log file, falling back to the newest espo-*.log.

        Returns None when the log directory or no log file exists.
        """
        if not self.log_path.exists():
            return None

        today = datetime.now().strftime("%Y-%m-%d")
        log_file = self.log_path / f"espo-{today}.log"

        if log_file.exists():
            return log_file

        # Fallback: most recently modified log file.
        log_files = sorted(self.log_path.glob("espo-*.log"),
                           key=lambda f: f.stat().st_mtime, reverse=True)
        return log_files[0] if log_files else None

    def check_for_errors(self, since_timestamp: Optional[float] = None) -> Tuple[bool, List[str]]:
        """Check the logs for errors since a given UNIX timestamp.

        Returns: (has_errors, error_messages)
        """
        log_file = self.get_log_file()
        if not log_file:
            return False, ["Log-Datei nicht gefunden"]

        try:
            errors = []
            # BUGFIX: the original used `since_timestamp or ...`, which
            # silently ignored an explicit cutoff of 0 (falsy). Compare
            # against None so any numeric timestamp is honored.
            if since_timestamp is not None:
                cutoff_time = since_timestamp
            else:
                cutoff_time = time.time() - 5  # 5-second buffer

            with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                # Only read the tail of the file (more efficient for big logs).
                f.seek(0, 2)  # go to end
                file_size = f.tell()
                f.seek(max(0, file_size - 100000))  # last ~100 KB
                lines = f.readlines()

            for line in lines:
                # Extract the timestamp from the log line, e.g. "[2024-01-31 12:00:00]".
                match = re.match(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]', line)
                if match:
                    try:
                        log_time = datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S').timestamp()
                        if log_time < cutoff_time:
                            continue
                    except ValueError:
                        # Unparseable timestamp — keep the line in scope
                        # rather than dropping it. (Was a bare `except:`.)
                        pass

                # Check for error keywords (case-insensitive).
                line_upper = line.upper()
                if any(keyword in line_upper for keyword in ['ERROR', 'CRITICAL', 'FATAL', 'EXCEPTION']):
                    # Filter known, harmless noise.
                    if 'SQL failed: INSERT INTO `app_log_record`' in line:
                        continue
                    if 'BPM:' in line and 'ERROR' not in line_upper:
                        continue

                    errors.append(line.strip())

            return len(errors) > 0, errors

        except Exception as e:
            # Treat an unreadable log as an error condition itself.
            return True, [f"Fehler beim Lesen der Logs: {e}"]
|
||
|
||
class EntityTester:
    """Runs CRUD tests (create/read/update/list/delete) against the EspoCRM
    REST API for one entity at a time, cross-checking the server log for
    errors after every request."""

    def __init__(self, base_url: str, username: str, password: str,
                 output: OutputManager, log_checker: LogChecker):
        self.base_url = base_url.rstrip('/')
        self.auth = HTTPBasicAuth(username, password)
        self.output = output
        self.log_checker = log_checker
        self.created_records = []  # (entity_name, record_id) pairs kept for cleanup()

    def test_entity(self, entity_name: str, test_data: dict = None) -> bool:
        """Run the full CRUD test cycle for one entity.

        Returns True only when every step returned HTTP 200 and produced
        no new entries in the EspoCRM error log.
        """
        all_success = True
        record_id = None

        # Fall back to minimal default test data when none was given.
        if test_data is None:
            test_data = {'name': f'Test {entity_name} {int(time.time())}'}

        # 1. CREATE test
        self.output.test_start(f"{entity_name}: Create")
        test_start = time.time()

        try:
            response = requests.post(
                f"{self.base_url}/api/v1/{entity_name}",
                json=test_data,
                auth=self.auth,
                timeout=10
            )

            # Inspect the server log for errors produced by this request.
            has_errors, error_msgs = self.log_checker.check_for_errors(test_start)

            if response.status_code == 200:
                data = response.json()
                record_id = data.get('id')
                if record_id:
                    # Remember the record so cleanup() can delete it later.
                    self.created_records.append((entity_name, record_id))
                    if not has_errors:
                        self.output.test_end(True, f"ID: {record_id}")
                    else:
                        self.output.test_end(False, f"Logs enthalten Fehler!")
                        self.output.error("Log-Fehler bei CREATE:", "\n".join(error_msgs[:3]))
                        all_success = False
                else:
                    self.output.test_end(False, "Keine ID in Response")
                    all_success = False
            else:
                self.output.test_end(False, f"HTTP {response.status_code}")
                self.output.error(f"Response: {response.text[:200]}")
                if has_errors:
                    self.output.error("Log-Fehler:", "\n".join(error_msgs[:3]))
                all_success = False
                # Without a created record the remaining steps are pointless.
                return all_success

        except Exception as e:
            self.output.test_end(False, str(e))
            all_success = False
            return all_success

        time.sleep(0.1)  # small pause between requests

        # 2. READ test
        if record_id:
            self.output.test_start(f"{entity_name}: Read")
            test_start = time.time()

            try:
                response = requests.get(
                    f"{self.base_url}/api/v1/{entity_name}/{record_id}",
                    auth=self.auth,
                    timeout=10
                )

                has_errors, error_msgs = self.log_checker.check_for_errors(test_start)

                if response.status_code == 200:
                    data = response.json()
                    # The returned record must be the one we just created.
                    if data.get('id') == record_id and not has_errors:
                        self.output.test_end(True, f"Name: {data.get('name', 'N/A')}")
                    else:
                        if has_errors:
                            self.output.test_end(False, "Log-Fehler")
                            self.output.error("Log-Fehler:", "\n".join(error_msgs[:3]))
                        else:
                            self.output.test_end(False, "ID stimmt nicht überein")
                        all_success = False
                else:
                    self.output.test_end(False, f"HTTP {response.status_code}")
                    if has_errors:
                        self.output.error("Log-Fehler:", "\n".join(error_msgs[:3]))
                    all_success = False

            except Exception as e:
                self.output.test_end(False, str(e))
                all_success = False

        time.sleep(0.1)

        # 3. UPDATE test
        if record_id:
            self.output.test_start(f"{entity_name}: Update")
            test_start = time.time()

            try:
                update_data = {'description': f'Updated at {datetime.now().isoformat()}'}
                response = requests.put(
                    f"{self.base_url}/api/v1/{entity_name}/{record_id}",
                    json=update_data,
                    auth=self.auth,
                    timeout=10
                )

                has_errors, error_msgs = self.log_checker.check_for_errors(test_start)

                if response.status_code == 200 and not has_errors:
                    self.output.test_end(True)
                else:
                    if response.status_code != 200:
                        self.output.test_end(False, f"HTTP {response.status_code}")
                    else:
                        self.output.test_end(False, "Log-Fehler")
                        self.output.error("Log-Fehler:", "\n".join(error_msgs[:3]))
                    all_success = False

            except Exception as e:
                self.output.test_end(False, str(e))
                all_success = False

        time.sleep(0.1)

        # 4. LIST test (runs even if the CREATE step yielded no record)
        self.output.test_start(f"{entity_name}: List")
        test_start = time.time()

        try:
            response = requests.get(
                f"{self.base_url}/api/v1/{entity_name}",
                auth=self.auth,
                timeout=10
            )

            has_errors, error_msgs = self.log_checker.check_for_errors(test_start)

            if response.status_code == 200:
                data = response.json()
                total = data.get('total', 0)
                if not has_errors:
                    self.output.test_end(True, f"{total} Einträge")
                else:
                    self.output.test_end(False, "Log-Fehler")
                    self.output.error("Log-Fehler:", "\n".join(error_msgs[:3]))
                    all_success = False
            else:
                self.output.test_end(False, f"HTTP {response.status_code}")
                if has_errors:
                    self.output.error("Log-Fehler:", "\n".join(error_msgs[:3]))
                all_success = False

        except Exception as e:
            self.output.test_end(False, str(e))
            all_success = False

        time.sleep(0.1)

        # 5. DELETE test
        if record_id:
            self.output.test_start(f"{entity_name}: Delete")
            test_start = time.time()

            try:
                response = requests.delete(
                    f"{self.base_url}/api/v1/{entity_name}/{record_id}",
                    auth=self.auth,
                    timeout=10
                )

                has_errors, error_msgs = self.log_checker.check_for_errors(test_start)

                if response.status_code == 200 and not has_errors:
                    self.output.test_end(True)
                    # The record is gone — remove it from the cleanup list.
                    self.created_records = [(e, i) for e, i in self.created_records
                                            if not (e == entity_name and i == record_id)]
                else:
                    if response.status_code != 200:
                        self.output.test_end(False, f"HTTP {response.status_code}")
                    else:
                        self.output.test_end(False, "Log-Fehler")
                        self.output.error("Log-Fehler:", "\n".join(error_msgs[:3]))
                    all_success = False

            except Exception as e:
                self.output.test_end(False, str(e))
                all_success = False

        return all_success

    def cleanup(self):
        """Delete all test records created by this tester (best effort)."""
        if not self.created_records:
            return

        self.output.info(f"Cleanup: Lösche {len(self.created_records)} Test-Records...")

        for entity_name, record_id in self.created_records:
            try:
                requests.delete(
                    f"{self.base_url}/api/v1/{entity_name}/{record_id}",
                    auth=self.auth,
                    timeout=5
                )
            except:
                pass  # ignore errors during cleanup — deliberately best-effort
|
||
|
||
class EntityValidator:
    """Validates EspoCRM custom-entity definitions (JSON syntax, relationship
    consistency, required files, file permissions, PHP syntax), triggers the
    EspoCRM rebuild, and drives the API-level CRUD tests."""

    def __init__(self, base_path: str, verbose: bool = False):
        self.base_path = Path(base_path)
        # Standard EspoCRM layout for custom resources.
        self.custom_path = self.base_path / "custom" / "Espo" / "Custom" / "Resources"
        self.metadata_path = self.custom_path / "metadata"
        self.i18n_path = self.custom_path / "i18n"
        self.client_custom_path = self.base_path / "client" / "custom"
        self.output = OutputManager(verbose)
        self.errors = []        # hard validation errors
        self.warnings = []      # non-fatal findings
        self.entity_defs = {}   # entity name -> parsed entityDefs JSON
        self.skip_e2e_tests = False
        self.log_checker = LogChecker(self.base_path / "data" / "logs", self.output)

    def validate_json_syntax(self) -> bool:
        """Validate the JSON syntax of every file in the custom directory."""
        self.output.header("1. JSON-SYNTAX VALIDIERUNG")

        json_files = list(self.custom_path.rglob("*.json"))
        if not json_files:
            self.output.warning("Keine JSON-Dateien gefunden")
            return True

        self.output.info(f"Prüfe {len(json_files)} JSON-Dateien...")

        invalid_files = []
        for json_file in json_files:
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    json.load(f)
            except json.JSONDecodeError as e:
                error_msg = f"{json_file.relative_to(self.base_path)}: {e}"
                self.errors.append(error_msg)
                invalid_files.append(error_msg)

        if invalid_files:
            self.output.error(f"{len(invalid_files)} Datei(en) mit JSON-Fehlern",
                              "\n".join(invalid_files[:5]))
            return False
        else:
            self.output.success(f"Alle {len(json_files)} JSON-Dateien valide")
            return True

    def load_entity_defs(self):
        """Load all entityDefs JSON files for the later analyses.

        Unreadable/invalid files are skipped silently; JSON syntax has
        already been reported by validate_json_syntax().
        """
        entity_defs_path = self.metadata_path / "entityDefs"
        if not entity_defs_path.exists():
            return

        for json_file in entity_defs_path.glob("*.json"):
            entity_name = json_file.stem
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    self.entity_defs[entity_name] = json.load(f)
            except Exception:
                pass

    def validate_relationships(self) -> bool:
        """Validate that link definitions between entities are bidirectionally
        consistent (foreign link exists, points back, relationName matches)."""
        self.output.header("2. RELATIONSHIP-KONSISTENZ")

        if not self.entity_defs:
            self.output.warning("Keine entityDefs geladen")
            return True

        relationship_errors = []
        checked_pairs = set()  # avoid reporting each pair twice (A->B and B->A)
        # Polymorphic parent links have no single foreign entity — skip them.
        skip_foreign_links = {'parent', 'parents'}

        for entity_name, entity_def in self.entity_defs.items():
            links = entity_def.get('links', {})

            for link_name, link_def in links.items():
                link_type = link_def.get('type')
                target_entity = link_def.get('entity')
                foreign = link_def.get('foreign')
                relation_name = link_def.get('relationName')

                if foreign in skip_foreign_links:
                    continue

                if link_type in ['hasMany', 'hasOne'] and target_entity and foreign:
                    # Normalize the pair so A<->B is only checked once.
                    pair_key = tuple(sorted([f"{entity_name}.{link_name}",
                                             f"{target_entity}.{foreign}"]))
                    if pair_key in checked_pairs:
                        continue
                    checked_pairs.add(pair_key)

                    if target_entity not in self.entity_defs:
                        relationship_errors.append(
                            f"{entity_name}.{link_name}: Ziel-Entity '{target_entity}' existiert nicht"
                        )
                        continue

                    target_links = self.entity_defs[target_entity].get('links', {})

                    if foreign not in target_links:
                        relationship_errors.append(
                            f"{entity_name}.{link_name} → {target_entity}: "
                            f"Foreign link '{foreign}' fehlt"
                        )
                        continue

                    foreign_def = target_links[foreign]
                    foreign_foreign = foreign_def.get('foreign')
                    foreign_relation_name = foreign_def.get('relationName')

                    # The counterpart link must point back at this link.
                    if foreign_foreign != link_name:
                        relationship_errors.append(
                            f"{entity_name}.{link_name} ↔ {target_entity}.{foreign}: "
                            f"Foreign zeigt auf '{foreign_foreign}' statt '{link_name}'"
                        )

                    # Both sides must agree on the relation (join table) name.
                    if relation_name and foreign_relation_name and relation_name != foreign_relation_name:
                        relationship_errors.append(
                            f"{entity_name}.{link_name} ↔ {target_entity}.{foreign}: "
                            f"relationName unterschiedlich"
                        )

        if relationship_errors:
            self.output.error(f"{len(relationship_errors)} Relationship-Fehler",
                              "\n".join(relationship_errors[:5]))
            self.errors.extend(relationship_errors)
            return False
        else:
            self.output.success(f"{len(checked_pairs)} Relationships geprüft")
            return True

    def validate_required_files(self) -> bool:
        """Check that each custom entity has its scope and i18n files.

        Missing files are recorded as warnings only — the method always
        returns True (no hard failure).
        """
        self.output.header("3. ERFORDERLICHE DATEIEN")

        if not self.entity_defs:
            self.output.warning("Keine entityDefs geladen")
            return True

        missing_files = []
        # Custom entities are identified by the 'C' name prefix here —
        # assumed project naming convention, TODO confirm.
        custom_entities = [name for name in self.entity_defs.keys()
                           if name.startswith('C')]

        self.output.info(f"Prüfe {len(custom_entities)} Custom-Entities...")

        for entity_name in custom_entities:
            # 1. Scope file
            scope_file = self.metadata_path / "scopes" / f"{entity_name}.json"
            if not scope_file.exists():
                missing_files.append(f"{entity_name}: scopes/{entity_name}.json fehlt")

            # 2. i18n files for each supported language
            for lang in ['de_DE', 'en_US']:
                i18n_file = self.i18n_path / lang / f"{entity_name}.json"
                if not i18n_file.exists():
                    missing_files.append(f"{entity_name}: i18n/{lang}/{entity_name}.json fehlt")

        if missing_files:
            self.output.error(f"{len(missing_files)} fehlende Dateien",
                              "\n".join(missing_files[:10]))
            self.warnings.extend(missing_files)
            return True  # warning only, not a hard failure
        else:
            self.output.success(f"Alle erforderlichen Dateien vorhanden")
            return True

    def check_file_permissions(self) -> bool:
        """Check (and try to fix via sudo chown/chmod) ownership and modes in
        the custom directories. Always returns True — informational only."""
        self.output.header("4. DATEIRECHTE-PRÜFUNG")

        paths_to_check = [
            self.custom_path,
            self.client_custom_path,
            self.base_path / "data",
        ]

        all_wrong_files = []

        try:
            for path in paths_to_check:
                if not path.exists():
                    continue

                # List files not owned by www-data or not in group www-data.
                result = subprocess.run(
                    ['find', str(path), '!', '-user', 'www-data', '-o', '!', '-group', 'www-data'],
                    capture_output=True,
                    text=True,
                    timeout=10
                )

                wrong_files = [line for line in result.stdout.strip().split('\n') if line]
                all_wrong_files.extend(wrong_files)

            if all_wrong_files:
                self.output.warning(f"{len(all_wrong_files)} Dateien mit falschen Rechten")
                self.output.info("Versuche automatische Korrektur...")

                success_count = 0
                for path in paths_to_check:
                    if not path.exists():
                        continue

                    try:
                        # Reset owner, then files to 664 and directories to 775.
                        subprocess.run(['sudo', 'chown', '-R', 'www-data:www-data', str(path)],
                                       check=True, capture_output=True, timeout=30)
                        subprocess.run(['sudo', 'find', str(path), '-type', 'f', '-exec',
                                        'chmod', '664', '{}', ';'],
                                       check=True, capture_output=True, timeout=30)
                        subprocess.run(['sudo', 'find', str(path), '-type', 'd', '-exec',
                                        'chmod', '775', '{}', ';'],
                                       check=True, capture_output=True, timeout=30)
                        success_count += 1
                    except:
                        pass

                if success_count > 0:
                    self.output.success(f"Dateirechte für {success_count} Verzeichnisse korrigiert")
            else:
                self.output.success("Alle Dateirechte korrekt (www-data:www-data)")

            return True
        except Exception as e:
            # Not being able to check permissions is not fatal.
            self.output.warning(f"Konnte Dateirechte nicht prüfen: {e}")
            return True

    def validate_php_files(self) -> bool:
        """Lint all custom PHP files with `php -l`; skipped when no PHP CLI."""
        self.output.header("5. PHP-SYNTAX VALIDIERUNG")

        php_files = []
        custom_espo_path = self.base_path / "custom" / "Espo"
        if custom_espo_path.exists():
            php_files = list(custom_espo_path.rglob("*.php"))

        if not php_files:
            self.output.info("Keine PHP-Dateien gefunden")
            return True

        self.output.info(f"Prüfe {len(php_files)} PHP-Dateien...")

        # Probe for a working PHP CLI before linting.
        try:
            subprocess.run(['php', '--version'], capture_output=True, timeout=5)
        except:
            self.output.warning("PHP-CLI nicht verfügbar, überspringe PHP-Validierung")
            return True

        invalid_files = []
        for php_file in php_files:
            try:
                result = subprocess.run(['php', '-l', str(php_file)],
                                        capture_output=True, text=True, timeout=5)

                # Non-zero exit code from `php -l` means a syntax error.
                if result.returncode != 0:
                    error_msg = f"{php_file.relative_to(self.base_path)}"
                    invalid_files.append(error_msg)
                    self.errors.append(error_msg)
            except:
                pass

        if invalid_files:
            self.output.error(f"{len(invalid_files)} PHP-Dateien mit Syntax-Fehlern",
                              "\n".join(invalid_files[:5]))
            return False
        else:
            self.output.success(f"Alle {len(php_files)} PHP-Dateien valide")
            return True

    def run_rebuild(self) -> bool:
        """Run the EspoCRM rebuild (clear-cache + rebuild) inside the Docker
        container; checks the log afterwards. Returns True on success."""
        self.output.header("6. ESPOCRM REBUILD")

        # Heuristic: a path under /docker/volumes/ means a dockerized install.
        is_docker_volume = '/docker/volumes/' in str(self.base_path)

        if is_docker_volume:
            try:
                result = subprocess.run(['docker', 'ps', '--format', '{{.Names}}'],
                                        capture_output=True, text=True, timeout=5)

                containers = result.stdout.strip().split('\n')
                espo_container = None

                # Prefer a container literally named "espocrm" ...
                for container in containers:
                    if container.lower() == 'espocrm':
                        espo_container = container
                        break

                # ... otherwise any espo* container that is not the database.
                if not espo_container:
                    for container in containers:
                        if 'espo' in container.lower() and 'db' not in container.lower():
                            espo_container = container
                            break

                if espo_container:
                    self.output.info(f"Container: {espo_container}")

                    # Clear the cache first.
                    self.output.info("Lösche Cache...")
                    subprocess.run(['docker', 'exec', espo_container, 'php',
                                    'command.php', 'clear-cache'],
                                   capture_output=True, timeout=30)

                    # Rebuild
                    self.output.info("Starte Rebuild...")
                    rebuild_start = time.time()

                    result = subprocess.run(
                        ['docker', 'exec', espo_container, 'php', 'command.php', 'rebuild'],
                        capture_output=True, text=True, timeout=60
                    )

                    # Also check the EspoCRM log for errors produced by the rebuild.
                    has_errors, error_msgs = self.log_checker.check_for_errors(rebuild_start)

                    if result.returncode == 0 and not has_errors:
                        self.output.success("Rebuild erfolgreich abgeschlossen")
                        return True
                    else:
                        if result.returncode != 0:
                            self.output.error("Rebuild fehlgeschlagen", result.stderr)
                        else:
                            self.output.error("Rebuild abgeschlossen, aber Fehler in Logs")
                            for err in error_msgs[:5]:
                                self.output.error("", err)
                        return False
                else:
                    self.output.error("Kein EspoCRM Docker-Container gefunden")
                    return False
            except Exception as e:
                self.output.error(f"Docker-Rebuild fehlgeschlagen: {e}")
                return False

        # Local rebuild (fallback) is intentionally unsupported.
        self.output.error("Lokaler Rebuild nicht implementiert, verwende Docker")
        return False

    def run_entity_tests(self, base_url: str, username: str, password: str) -> bool:
        """Run the full CRUD test suite against all custom entities."""
        self.output.header("7. ENTITY CRUD-TESTS")

        # Collect custom entities ('C' prefix); junction tables ending in
        # 'CDokumente' are skipped because they are not directly testable.
        custom_entities = sorted([name for name in self.entity_defs.keys()
                                  if name.startswith('C') and
                                  not name.endswith('CDokumente')])

        if not custom_entities:
            self.output.warning("Keine Custom-Entities zum Testen gefunden")
            return True

        self.output.info(f"Teste {len(custom_entities)} Custom-Entities...")

        tester = EntityTester(base_url, username, password, self.output, self.log_checker)

        all_success = True
        success_count = 0
        fail_count = 0

        for entity_name in custom_entities:
            if tester.test_entity(entity_name):
                success_count += 1
            else:
                fail_count += 1
                all_success = False

        # Remove any test records that were left behind.
        tester.cleanup()

        # Summary line (only needed in minimized mode).
        if not self.output.verbose:
            print(f"\n{Colors.CYAN}Ergebnis:{Colors.END} {success_count} erfolgreich, {fail_count} fehlgeschlagen")

        if all_success:
            self.output.success(f"Alle {success_count} Entities getestet")
        else:
            self.output.error(f"{fail_count} Entity-Tests fehlgeschlagen")

        return all_success

    def generate_ki_feedback(self) -> Dict:
        """Generate structured, machine-readable feedback (for an AI consumer)
        summarizing errors, warnings, and derived recommendations."""
        feedback = {
            "status": "success" if not self.errors else "failed",
            "summary": {
                "errors": len(self.errors),
                "warnings": len(self.warnings),
                "entities_checked": len(self.entity_defs),
            },
            "errors": self.errors[:10],      # top 10 errors
            "warnings": self.warnings[:10],  # top 10 warnings
            "recommendations": []
        }

        # Derive recommendations from error/warning keywords.
        if any('Service' in e or 'Class' in e for e in self.errors):
            feedback["recommendations"].append(
                "Fehlende Service-Klassen - erstelle für jede Entity eine Service-Klasse"
            )

        if any('i18n' in w for w in self.warnings):
            feedback["recommendations"].append(
                "Unvollständige Übersetzungen - füge fehlende i18n-Dateien hinzu"
            )

        if any('relationship' in e.lower() for e in self.errors):
            feedback["recommendations"].append(
                "Relationship-Fehler - prüfe bidirektionale Link-Definitionen"
            )

        if any('InjectableFactory' in e for e in self.errors):
            feedback["recommendations"].append(
                "KRITISCH: Service-Klassen fehlen! Erstelle Services in custom/Espo/Custom/Services/"
            )

        return feedback

    def print_summary(self):
        """Print the final summary of errors and warnings, plus the
        structured AI feedback when anything was found."""
        self.output.header("ZUSAMMENFASSUNG")

        if self.errors:
            print(f"\n{Colors.RED}{Colors.BOLD}FEHLER: {len(self.errors)}{Colors.END}")
            for err in self.errors[:10]:
                print(f" {Colors.RED}✗{Colors.END} {err}")
            if len(self.errors) > 10:
                print(f" {Colors.RED}...{Colors.END} und {len(self.errors) - 10} weitere")

        if self.warnings:
            print(f"\n{Colors.YELLOW}{Colors.BOLD}WARNUNGEN: {len(self.warnings)}{Colors.END}")
            for warn in self.warnings[:5]:
                print(f" {Colors.YELLOW}⚠{Colors.END} {warn}")
            if len(self.warnings) > 5:
                print(f" {Colors.YELLOW}...{Colors.END} und {len(self.warnings) - 5} weitere")

        if not self.errors and not self.warnings:
            print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ALLE PRÜFUNGEN BESTANDEN{Colors.END}\n")

        # Emit the structured AI feedback as JSON.
        if self.errors or self.warnings:
            feedback = self.generate_ki_feedback()
            print(f"\n{Colors.CYAN}{Colors.BOLD}KI-FEEDBACK:{Colors.END}")
            print(json.dumps(feedback, indent=2, ensure_ascii=False))

    def validate_all(self) -> bool:
        """Run all validations in order; returns False if any critical
        check (JSON syntax, relationships, PHP syntax) failed."""
        all_valid = True

        # 1. JSON syntax (critical — abort immediately on failure)
        if not self.validate_json_syntax():
            all_valid = False
            self.output.error("Abbruch: JSON-Syntax-Fehler!")
            return False

        # Load entityDefs for the subsequent checks.
        self.load_entity_defs()

        # 2. Relationships (critical)
        if not self.validate_relationships():
            all_valid = False

        # 3. Required files (warning only)
        self.validate_required_files()

        # 4. File permissions (non-critical)
        self.check_file_permissions()

        # 5. PHP syntax (critical)
        if not self.validate_php_files():
            all_valid = False

        return all_valid
|
||
|
||
def main():
    """CLI entry point: parse arguments, validate, rebuild, and test.

    Exit codes: 0 on success (or after a clean --dry-run), 1 when a
    validation, rebuild, or entity test fails.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='EspoCRM Custom Entity Validator & Rebuild Tool - Erweiterte Version'
    )
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Ausführliche Ausgabe')
    parser.add_argument('--dry-run', action='store_true',
                        help='Nur Validierungen, kein Rebuild')
    parser.add_argument('--skip-tests', action='store_true',
                        help='Überspringe Entity-Tests')
    parser.add_argument('--base-url', default='http://localhost',
                        help='EspoCRM Base-URL (default: http://localhost)')
    parser.add_argument('--username', default='admin',
                        help='API-Username (default: admin)')
    parser.add_argument('--password', default='admin',
                        help='API-Password (default: admin)')
    args = parser.parse_args()

    # Locate the EspoCRM root: the script is assumed to live three levels
    # below it — TODO confirm the expected installation layout.
    script_dir = Path(__file__).parent.parent.parent

    # rebuild.php in the root is used as the sanity marker for an install.
    if not (script_dir / "rebuild.php").exists():
        print(f"{Colors.RED}✗{Colors.END} Fehler: Nicht im EspoCRM-Root-Verzeichnis!")
        sys.exit(1)

    print(f"{Colors.BOLD}EspoCRM Validator & Rebuild Tool v2.0{Colors.END}")
    if args.verbose:
        print(f"{Colors.DIM}Verbose-Modus aktiviert{Colors.END}")
    print()

    validator = EntityValidator(str(script_dir), verbose=args.verbose)

    # Validations (must all pass before a rebuild is attempted).
    all_valid = validator.validate_all()

    if not all_valid:
        validator.print_summary()
        print(f"\n{Colors.RED}{Colors.BOLD}✗ REBUILD ABGEBROCHEN{Colors.END}\n")
        sys.exit(1)

    if args.dry_run:
        validator.print_summary()
        print(f"\n{Colors.GREEN}{Colors.BOLD}✓ VALIDIERUNGEN ABGESCHLOSSEN{Colors.END}\n")
        sys.exit(0)

    # Rebuild
    if not validator.run_rebuild():
        validator.print_summary()
        print(f"\n{Colors.RED}{Colors.BOLD}✗ REBUILD FEHLGESCHLAGEN{Colors.END}\n")
        sys.exit(1)

    # Entity tests (optional)
    if not args.skip_tests:
        time.sleep(2)  # give the server a moment to settle after the rebuild
        if not validator.run_entity_tests(args.base_url, args.username, args.password):
            validator.print_summary()
            print(f"\n{Colors.YELLOW}{Colors.BOLD}⚠ TESTS FEHLGESCHLAGEN{Colors.END}\n")
            sys.exit(1)

    validator.print_summary()
    print(f"\n{Colors.GREEN}{Colors.BOLD}✓ ERFOLGREICH ABGESCHLOSSEN{Colors.END}\n")
    sys.exit(0)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|