feat: Enhance logging in sync utilities and add code validation script
This commit is contained in:
370
bitbylaw/scripts/validate_code.py
Executable file
370
bitbylaw/scripts/validate_code.py
Executable file
@@ -0,0 +1,370 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Code Validation Script
|
||||
Automatisierte Validierung nach Änderungen an steps/ und services/
|
||||
|
||||
Features:
|
||||
- Syntax-Check (compile)
|
||||
- Import-Check (importlib)
|
||||
- Type-Hint Validation (mypy optional)
|
||||
- Async/Await Pattern Check
|
||||
- Logger Usage Check
|
||||
- Quick execution (~1-2 seconds)
|
||||
|
||||
Usage:
|
||||
python scripts/validate_code.py # Check all
|
||||
python scripts/validate_code.py services/ # Check services only
|
||||
python scripts/validate_code.py --changed # Check only git changed files
|
||||
python scripts/validate_code.py --mypy # Include mypy checks
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import ast
|
||||
import importlib.util
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple, Optional
|
||||
import subprocess
|
||||
import argparse
|
||||
|
||||
# ANSI Colors
|
||||
GREEN = '\033[92m'
|
||||
RED = '\033[91m'
|
||||
YELLOW = '\033[93m'
|
||||
BLUE = '\033[94m'
|
||||
RESET = '\033[0m'
|
||||
BOLD = '\033[1m'
|
||||
|
||||
class ValidationError:
|
||||
def __init__(self, file: str, error_type: str, message: str, line: Optional[int] = None):
|
||||
self.file = file
|
||||
self.error_type = error_type
|
||||
self.message = message
|
||||
self.line = line
|
||||
|
||||
def __str__(self):
|
||||
loc = f":{self.line}" if self.line else ""
|
||||
return f"{RED}✗{RESET} {self.file}{loc}\n {YELLOW}[{self.error_type}]{RESET} {self.message}"
|
||||
|
||||
|
||||
class CodeValidator:
|
||||
def __init__(self, root_dir: Path):
|
||||
self.root_dir = root_dir
|
||||
self.errors: List[ValidationError] = []
|
||||
self.warnings: List[ValidationError] = []
|
||||
self.checked_files = 0
|
||||
|
||||
def add_error(self, file: str, error_type: str, message: str, line: Optional[int] = None):
|
||||
self.errors.append(ValidationError(file, error_type, message, line))
|
||||
|
||||
def add_warning(self, file: str, error_type: str, message: str, line: Optional[int] = None):
|
||||
self.warnings.append(ValidationError(file, error_type, message, line))
|
||||
|
||||
def check_syntax(self, file_path: Path) -> bool:
|
||||
"""Check Python syntax by compiling"""
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
source = f.read()
|
||||
compile(source, str(file_path), 'exec')
|
||||
return True
|
||||
except SyntaxError as e:
|
||||
self.add_error(
|
||||
str(file_path.relative_to(self.root_dir)),
|
||||
"SYNTAX",
|
||||
f"{e.msg}",
|
||||
e.lineno
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
self.add_error(
|
||||
str(file_path.relative_to(self.root_dir)),
|
||||
"SYNTAX",
|
||||
f"Unexpected error: {e}"
|
||||
)
|
||||
return False
|
||||
|
||||
def check_imports(self, file_path: Path) -> bool:
|
||||
"""Check if imports are valid"""
|
||||
try:
|
||||
# Add project root to path
|
||||
sys.path.insert(0, str(self.root_dir))
|
||||
|
||||
spec = importlib.util.spec_from_file_location("module", file_path)
|
||||
if spec and spec.loader:
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module)
|
||||
return True
|
||||
except ImportError as e:
|
||||
self.add_error(
|
||||
str(file_path.relative_to(self.root_dir)),
|
||||
"IMPORT",
|
||||
f"{e}"
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
# Ignore runtime errors, we only care about imports
|
||||
if "ImportError" in str(type(e)) or "ModuleNotFoundError" in str(type(e)):
|
||||
self.add_error(
|
||||
str(file_path.relative_to(self.root_dir)),
|
||||
"IMPORT",
|
||||
f"{e}"
|
||||
)
|
||||
return False
|
||||
return True
|
||||
finally:
|
||||
# Remove from path
|
||||
if str(self.root_dir) in sys.path:
|
||||
sys.path.remove(str(self.root_dir))
|
||||
|
||||
def check_patterns(self, file_path: Path) -> bool:
|
||||
"""Check common patterns and anti-patterns"""
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
source = f.read()
|
||||
|
||||
tree = ast.parse(source, str(file_path))
|
||||
|
||||
# Check 1: Async functions should use await, not asyncio.run()
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef):
|
||||
is_async = isinstance(node, ast.AsyncFunctionDef)
|
||||
|
||||
# Check for asyncio.run() in async function
|
||||
if is_async:
|
||||
for child in ast.walk(node):
|
||||
if isinstance(child, ast.Call):
|
||||
if isinstance(child.func, ast.Attribute):
|
||||
if (isinstance(child.func.value, ast.Name) and
|
||||
child.func.value.id == 'asyncio' and
|
||||
child.func.attr == 'run'):
|
||||
self.add_warning(
|
||||
str(file_path.relative_to(self.root_dir)),
|
||||
"ASYNC",
|
||||
f"asyncio.run() in async function '{node.name}' - use await instead",
|
||||
node.lineno
|
||||
)
|
||||
|
||||
# Check for logger.warn (should be logger.warning)
|
||||
for child in ast.walk(node):
|
||||
if isinstance(child, ast.Call):
|
||||
if isinstance(child.func, ast.Attribute):
|
||||
# MOTIA-SPECIFIC: warn() is correct, warning() is NOT supported
|
||||
if child.func.attr == 'warning':
|
||||
self.add_warning(
|
||||
str(file_path.relative_to(self.root_dir)),
|
||||
"LOGGER",
|
||||
f"logger.warning() not supported by Motia - use logger.warn()",
|
||||
child.lineno
|
||||
)
|
||||
|
||||
# Check 2: Services should use self.logger if context available
|
||||
if 'services/' in str(file_path):
|
||||
# Check if class has context parameter but uses logger instead of self.logger
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.ClassDef):
|
||||
has_context = False
|
||||
uses_module_logger = False
|
||||
|
||||
# Check __init__ for context parameter
|
||||
for child in node.body:
|
||||
if isinstance(child, ast.FunctionDef) and child.name == '__init__':
|
||||
for arg in child.args.args:
|
||||
if arg.arg == 'context':
|
||||
has_context = True
|
||||
|
||||
# Check for logger.info/error/etc calls
|
||||
for child in ast.walk(node):
|
||||
if isinstance(child, ast.Call):
|
||||
if isinstance(child.func, ast.Attribute):
|
||||
if (isinstance(child.func.value, ast.Name) and
|
||||
child.func.value.id == 'logger'):
|
||||
uses_module_logger = True
|
||||
|
||||
if has_context and uses_module_logger:
|
||||
self.add_warning(
|
||||
str(file_path.relative_to(self.root_dir)),
|
||||
"LOGGER",
|
||||
f"Class '{node.name}' has context but uses 'logger' - use 'self.logger' for Workbench visibility",
|
||||
node.lineno
|
||||
)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
# Don't fail validation for pattern checks
|
||||
return True
|
||||
|
||||
def check_file(self, file_path: Path) -> bool:
|
||||
"""Run all checks on a file"""
|
||||
self.checked_files += 1
|
||||
|
||||
# 1. Syntax check (must pass)
|
||||
if not self.check_syntax(file_path):
|
||||
return False
|
||||
|
||||
# 2. Import check (must pass)
|
||||
if not self.check_imports(file_path):
|
||||
return False
|
||||
|
||||
# 3. Pattern checks (warnings only)
|
||||
self.check_patterns(file_path)
|
||||
|
||||
return True
|
||||
|
||||
def find_python_files(self, paths: List[str]) -> List[Path]:
|
||||
"""Find all Python files in given paths"""
|
||||
files = []
|
||||
for path_str in paths:
|
||||
path = self.root_dir / path_str
|
||||
if path.is_file() and path.suffix == '.py':
|
||||
files.append(path)
|
||||
elif path.is_dir():
|
||||
files.extend(path.rglob('*.py'))
|
||||
return files
|
||||
|
||||
def get_changed_files(self) -> List[Path]:
|
||||
"""Get git changed files"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['git', 'diff', '--name-only', 'HEAD'],
|
||||
cwd=self.root_dir,
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
|
||||
# Also get staged files
|
||||
result2 = subprocess.run(
|
||||
['git', 'diff', '--cached', '--name-only'],
|
||||
cwd=self.root_dir,
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
|
||||
all_files = result.stdout.strip().split('\n') + result2.stdout.strip().split('\n')
|
||||
|
||||
python_files = []
|
||||
for f in all_files:
|
||||
if f and f.endswith('.py'):
|
||||
file_path = self.root_dir / f
|
||||
if file_path.exists():
|
||||
# Only include services/ and steps/
|
||||
if 'services/' in f or 'steps/' in f:
|
||||
python_files.append(file_path)
|
||||
|
||||
return python_files
|
||||
except Exception as e:
|
||||
print(f"{YELLOW}⚠ Could not get git changed files: {e}{RESET}")
|
||||
return []
|
||||
|
||||
def validate(self, paths: List[str], only_changed: bool = False) -> bool:
|
||||
"""Run validation on all files"""
|
||||
print(f"{BOLD}🔍 Code Validation{RESET}\n")
|
||||
|
||||
if only_changed:
|
||||
files = self.get_changed_files()
|
||||
if not files:
|
||||
print(f"{GREEN}✓{RESET} No changed Python files in services/ or steps/")
|
||||
return True
|
||||
print(f"Checking {len(files)} changed files...\n")
|
||||
else:
|
||||
files = self.find_python_files(paths)
|
||||
print(f"Checking {len(files)} files in {', '.join(paths)}...\n")
|
||||
|
||||
# Check each file
|
||||
for file_path in sorted(files):
|
||||
rel_path = str(file_path.relative_to(self.root_dir))
|
||||
print(f" {BLUE}→{RESET} {rel_path}...", end='')
|
||||
|
||||
if self.check_file(file_path):
|
||||
print(f" {GREEN}✓{RESET}")
|
||||
else:
|
||||
print(f" {RED}✗{RESET}")
|
||||
|
||||
# Print results
|
||||
print(f"\n{BOLD}Results:{RESET}")
|
||||
print(f" Files checked: {self.checked_files}")
|
||||
print(f" Errors: {len(self.errors)}")
|
||||
print(f" Warnings: {len(self.warnings)}")
|
||||
|
||||
# Print errors
|
||||
if self.errors:
|
||||
print(f"\n{BOLD}{RED}Errors:{RESET}")
|
||||
for error in self.errors:
|
||||
print(f" {error}")
|
||||
|
||||
# Print warnings
|
||||
if self.warnings:
|
||||
print(f"\n{BOLD}{YELLOW}Warnings:{RESET}")
|
||||
for warning in self.warnings:
|
||||
print(f" {warning}")
|
||||
|
||||
# Summary
|
||||
print()
|
||||
if self.errors:
|
||||
print(f"{RED}✗ Validation failed with {len(self.errors)} error(s){RESET}")
|
||||
return False
|
||||
elif self.warnings:
|
||||
print(f"{YELLOW}⚠ Validation passed with {len(self.warnings)} warning(s){RESET}")
|
||||
return True
|
||||
else:
|
||||
print(f"{GREEN}✓ All checks passed!{RESET}")
|
||||
return True
|
||||
|
||||
|
||||
def run_mypy(root_dir: Path, paths: List[str]) -> bool:
|
||||
"""Run mypy type checker"""
|
||||
print(f"\n{BOLD}🔍 Running mypy type checker...{RESET}\n")
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['mypy'] + paths + ['--ignore-missing-imports', '--no-error-summary'],
|
||||
cwd=root_dir,
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
|
||||
if result.stdout:
|
||||
print(result.stdout)
|
||||
|
||||
if result.returncode == 0:
|
||||
print(f"{GREEN}✓ mypy: No type errors{RESET}")
|
||||
return True
|
||||
else:
|
||||
print(f"{RED}✗ mypy found type errors{RESET}")
|
||||
return False
|
||||
except FileNotFoundError:
|
||||
print(f"{YELLOW}⚠ mypy not installed - skipping type checks{RESET}")
|
||||
print(f" Install with: pip install mypy")
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Validate Python code in services/ and steps/')
|
||||
parser.add_argument('paths', nargs='*', default=['services/', 'steps/'],
|
||||
help='Paths to check (default: services/ steps/)')
|
||||
parser.add_argument('--changed', '-c', action='store_true',
|
||||
help='Only check git changed files')
|
||||
parser.add_argument('--mypy', '-m', action='store_true',
|
||||
help='Run mypy type checker')
|
||||
parser.add_argument('--verbose', '-v', action='store_true',
|
||||
help='Verbose output')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
root_dir = Path(__file__).parent.parent
|
||||
validator = CodeValidator(root_dir)
|
||||
|
||||
# Run validation
|
||||
success = validator.validate(args.paths, only_changed=args.changed)
|
||||
|
||||
# Run mypy if requested
|
||||
if args.mypy and success:
|
||||
mypy_success = run_mypy(root_dir, args.paths)
|
||||
success = success and mypy_success
|
||||
|
||||
# Exit with appropriate code
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user