#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DDD Modular Monolith Scaffold Generator (Production-friendly)

Features:
- Create multiple contexts at once
- Highly customizable via JSON config file or CLI options
- Toggle layers per context (api/domain/application/infrastructure/interfaces/boot)
- Custom directory templates (per layer)
- Optional placeholder files (package-info.java, README.md)
- Dry-run mode
- Overwrite policy: skip | overwrite | fail
- Emit manifest.json with file hashes for auditability
- Safe rollback with file hash verification (only deletes unmodified files)

Typical usage:
  python tools/scaffold_ddd.py --init-config scaffold.json
  python tools/scaffold_ddd.py --config scaffold.json --contexts journey,recordbook,capsule
  python tools/scaffold_ddd.py --root modules --package-base top.ysit.travel --contexts journey --dry-run
  python tools/scaffold_ddd.py --rollback modules/{context}/manifest.json
  python tools/scaffold_ddd.py --rollback modules/{context}/manifest.json --dry-run
"""

from __future__ import annotations

import argparse
import hashlib
import json
import logging
import os
import shutil
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple

# ---------------------------
# Defaults (you can override by config)
# ---------------------------

DEFAULT_LAYERS: List[str] = ["api", "domain", "application", "infrastructure", "interfaces"]

DEFAULT_JAVA_SOURCE_DIR: str = "src/main/java"
DEFAULT_RESOURCES_DIR: str = "src/main/resources"

DEFAULT_LAYER_DIRS: Dict[str, Any] = {
    # layer -> tree structure with dirs and files
    # Format: directories are nested dicts, files are in "_files" key
    "api": {
        "contract": {},
        "event": {},
        "client": {},
    },
    "domain": {
        "model": {
            "aggregate": {},
            "entity": {},
            "vo": {},
            "enum": {},
        },
        "repository": {},
        "event": {},
        "service": {},
        "exception": {},
    },
    "application": {
        "command": {},
        "usecase": {},
        "query": {
            "dto": {},
        },
        "assembler": {},
        "port": {
            "in": {},
            "out": {},
        },
        "security": {},
        "handler": {},
    },
    "infrastructure": {
        "persistence": {
            "mapper": {},
            "po": {},
            "convert": {},
            "repository": {},
        },
        "event": {},
        "config": {},
    },
    "interfaces": {
        "web": {
            "request": {},
            "response": {},
        },
        "facade": {},
        "doc": {},
    },
    "boot": {
        "config": {},
    },
}

DEFAULT_PLACEHOLDERS = {
    "create_keep_files": True,       # create .keep in leaf dirs
    "create_package_info": False,    # create package-info.java in base package root
    "create_layer_readme": True,     # create README.md in each module root
    "create_context_readme": True,   # create README.md in context root
}

DEFAULT_NAMING = {
    "module_name_pattern": "{context}-{layer}",  # e.g. journey-domain
    "context_dir_pattern": "{context}",          # e.g. modules/journey
}

DEFAULT_MANIFEST_NAME = "manifest.json"


# ---------------------------
# Data Models
# ---------------------------

@dataclass
class PlaceholderOptions:
    """Switches controlling which placeholder files the scaffold writes."""

    create_keep_files: bool = True
    create_package_info: bool = False
    create_layer_readme: bool = True
    create_context_readme: bool = True

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "PlaceholderOptions":
        """Build options from a config mapping; missing keys keep their defaults."""
        return PlaceholderOptions(
            create_keep_files=bool(d.get("create_keep_files", True)),
            create_package_info=bool(d.get("create_package_info", False)),
            create_layer_readme=bool(d.get("create_layer_readme", True)),
            create_context_readme=bool(d.get("create_context_readme", True)),
        )


@dataclass
class NamingOptions:
    """``str.format`` patterns used to name module and context directories."""

    module_name_pattern: str = "{context}-{layer}"
    context_dir_pattern: str = "{context}"

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "NamingOptions":
        """Build options from a config mapping; missing keys keep their defaults."""
        return NamingOptions(
            module_name_pattern=str(d.get("module_name_pattern", "{context}-{layer}")),
            context_dir_pattern=str(d.get("context_dir_pattern", "{context}")),
        )


@dataclass
class ScaffoldConfig:
    """Effective scaffold configuration (config file merged with CLI overrides)."""

    root: str = "modules"                       # where contexts live
    package_base: str = "top.ysit.travel"       # base java package
    contexts: List[str] = field(default_factory=list)
    layers: List[str] = field(default_factory=lambda: DEFAULT_LAYERS.copy())
    java_source_dir: str = DEFAULT_JAVA_SOURCE_DIR
    resources_dir: str = DEFAULT_RESOURCES_DIR
    # layer -> nested tree (new format) or list of relative dirs (legacy format)
    layer_dirs: Dict[str, Any] = field(default_factory=lambda: dict(DEFAULT_LAYER_DIRS))
    placeholders: PlaceholderOptions = field(default_factory=PlaceholderOptions)
    naming: NamingOptions = field(default_factory=NamingOptions)
    overwrite: str = "skip"                     # skip | overwrite | fail
    dry_run: bool = False
    emit_manifest: bool = True
    manifest_name: str = DEFAULT_MANIFEST_NAME
    use_standard_structure: bool = True         # use the standard Maven layout (src/main/java)
    template_vars: Dict[str, str] = field(default_factory=dict)  # template variable definitions

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "ScaffoldConfig":
        """Build a config from a parsed JSON mapping; missing keys keep their defaults."""
        cfg = ScaffoldConfig()
        cfg.root = str(d.get("root", cfg.root))
        cfg.package_base = str(d.get("package_base", cfg.package_base))
        cfg.contexts = list(d.get("contexts", cfg.contexts))
        cfg.layers = list(d.get("layers", cfg.layers))
        cfg.java_source_dir = str(d.get("java_source_dir", cfg.java_source_dir))
        cfg.resources_dir = str(d.get("resources_dir", cfg.resources_dir))
        cfg.layer_dirs = dict(d.get("layer_dirs", cfg.layer_dirs))
        cfg.placeholders = PlaceholderOptions.from_dict(d.get("placeholders", {}))
        cfg.naming = NamingOptions.from_dict(d.get("naming", {}))
        cfg.overwrite = str(d.get("overwrite", cfg.overwrite)).lower()
        cfg.dry_run = bool(d.get("dry_run", cfg.dry_run))
        cfg.emit_manifest = bool(d.get("emit_manifest", cfg.emit_manifest))
        cfg.manifest_name = str(d.get("manifest_name", cfg.manifest_name))
        cfg.use_standard_structure = bool(d.get("use_standard_structure", cfg.use_standard_structure))
        cfg.template_vars = dict(d.get("template_vars", cfg.template_vars))
        return cfg


# ---------------------------
# Utilities
# ---------------------------

def now_iso() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset."""
    return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


def normalize_list_csv(value: Optional[str]) -> List[str]:
    """Split a comma-separated string into stripped, non-empty items."""
    if not value:
        return []
    parts = [p.strip() for p in value.split(",")]
    return [p for p in parts if p]


def safe_context_name(name: str) -> str:
    """
    Normalize a context name for use as a directory / package segment.

    Alphanumerics, ``-`` and ``_`` are kept (lower-cased); every other
    character becomes ``-``; leading/trailing ``-`` are stripped.

    Raises:
        ValueError: if nothing usable remains.
    """
    cleaned = []
    for ch in name.strip():
        if ch.isalnum() or ch in ("-", "_"):
            cleaned.append(ch.lower())
        else:
            cleaned.append("-")
    out = "".join(cleaned).strip("-")
    if not out:
        raise ValueError(f"Invalid context name: {name!r}")
    return out


def pkg_to_path(pkg: str) -> Path:
    """Convert a dotted package name to a relative path (``a.b`` -> ``a/b``)."""
    return Path(*pkg.split("."))


def ensure_dir(path: Path, dry_run: bool) -> None:
    """Create ``path`` (with parents) unless running in dry-run mode."""
    if dry_run:
        return
    path.mkdir(parents=True, exist_ok=True)


def calculate_file_hash(content: str) -> str:
    """Return the SHA-256 hex digest of ``content`` encoded as UTF-8."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def write_text_file(path: Path, content: str, overwrite: str, dry_run: bool) -> Tuple[Optional[str], bool]:
    """
    Write a text file according to the overwrite policy.

    Returns:
        ``(hash_value, was_written)``. ``hash_value`` is ``None`` when an
        existing file was skipped; in dry-run mode the hash is returned but
        nothing is written.

    Raises:
        FileExistsError: the file exists and the policy is ``fail``.
        ValueError: the file exists and the policy is unknown.
    """
    if path.exists():
        if overwrite == "skip":
            return None, False
        if overwrite == "fail":
            raise FileExistsError(f"File exists: {path}")
        if overwrite != "overwrite":
            raise ValueError(f"Unknown overwrite policy: {overwrite}")

    file_hash = calculate_file_hash(content)

    if dry_run:
        return file_hash, False

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
    return file_hash, True


def layer_readme(context: str, layer: str, module_name: str) -> str:
    """Return the README.md content for a layer module."""
    return f"""# {module_name}

**Context:** `{context}`
**Layer:** `{layer}`

This module is generated by the DDD scaffold tool.

## Responsibilities
- Keep this layer focused.
- Respect dependency direction.
- Avoid leaking implementation details across contexts.
"""


def context_readme(context: str, layers: List[str]) -> str:
    """Return the README.md content for a context root directory."""
    layer_list = ", ".join(layers)
    return f"""# Context: {context}

Layers: {layer_list}

This directory groups all modules for the `{context}` bounded context.

Recommended dependency direction (typical):
`interfaces -> application -> domain`
`infrastructure -> (domain/application)`
`api` should be stable contracts only.
"""


def package_info_java(package_name: str) -> str:
    """Return the content of a ``package-info.java`` for ``package_name``."""
    return f"""@javax.annotation.ParametersAreNonnullByDefault
package {package_name};
"""


def render_template(template: str, variables: Dict[str, str]) -> str:
    """Replace every ``{name}`` placeholder in ``template`` with its value."""
    result = template
    for key, value in variables.items():
        result = result.replace(f"{{{key}}}", value)
    return result


def build_template_vars(
    cfg: ScaffoldConfig,
    context: str,
    layer: str,
    module_name: str
) -> Dict[str, str]:
    """
    Build the template-variable dictionary for one module.

    Precedence:
    1. Built-in base variables (context, layer, module, package_base).
    2. Variables defined in ``cfg.template_vars`` (may reference base
       variables and each other).
    3. If the config defines none, a default set is used
       (context_package, layer_package, package).
    """
    base_vars = {
        "context": context,
        "layer": layer,
        "module": module_name,
        "package_base": cfg.package_base,
    }

    if cfg.template_vars:
        template_vars = dict(base_vars)

        # Resolve in several passes so variables may reference each other,
        # e.g. package = "{package_base}.{context}" and then
        # layer_package = "{package}.{layer}". The pass count is bounded so
        # circular definitions cannot loop forever.
        max_iterations = 10
        for _ in range(max_iterations):
            changed = False
            for key, value_template in cfg.template_vars.items():
                if isinstance(value_template, str):
                    new_value = render_template(value_template, template_vars)
                    if template_vars.get(key) != new_value:
                        template_vars[key] = new_value
                        changed = True
                else:
                    # Non-string values are stored as their string form.
                    template_vars[key] = str(value_template)
            if not changed:
                break

        return template_vars

    # Default variable set (backward compatible).
    context_pkg = cfg.package_base + f".{context}"
    layer_pkg = context_pkg + f".{layer}"
    return {
        **base_vars,
        "context_package": context_pkg,
        "layer_package": layer_pkg,
        "package": context_pkg,  # defaults to the context package
    }


def process_layer_tree(
    tree: Dict[str, Any],
    base_path: Path,
    root_path: Path,
    manifest: Manifest,
    cfg: ScaffoldConfig,
    context: str,
    layer: str,
    template_vars: Dict[str, str],
    current_path: Optional[Path] = None
) -> None:
    """
    Recursively materialize a layer tree: create directories and files.

    Args:
        tree: tree configuration; plain keys are directories, the ``_files``
            key maps file names to content (a string, or a dict with
            ``content`` / ``content_from``); other ``_``-prefixed keys are
            ignored.
        base_path: base path the tree is rooted at.
        root_path: root path used to compute manifest-relative paths.
        manifest: manifest collecting created directories and files.
        cfg: scaffold configuration.
        context: context name.
        layer: layer name.
        template_vars: variables substituted into file contents.
        current_path: current directory (used by the recursion).
    """
    if current_path is None:
        current_path = base_path

    for key, value in tree.items():
        if key == "_files":
            if isinstance(value, dict):
                for filename, file_config in value.items():
                    file_path = current_path / filename

                    if isinstance(file_config, dict):
                        content = file_config.get("content", "")
                        # Content may be loaded from another file; a missing
                        # source silently falls back to "content".
                        if "content_from" in file_config:
                            content_path = Path(file_config["content_from"])
                            if content_path.exists():
                                content = content_path.read_text(encoding="utf-8")
                    elif isinstance(file_config, str):
                        content = file_config
                    else:
                        content = ""

                    content = render_template(content, template_vars)

                    file_hash, _ = write_text_file(file_path, content, cfg.overwrite, cfg.dry_run)
                    # A None hash means an existing file was skipped: it is not
                    # ours, so it must not enter the manifest.
                    if file_hash:
                        manifest.add_file(
                            file_path, root_path, file_hash,
                            {"type": "template_file", "layer": layer, "filename": filename}
                        )
        elif key.startswith("_"):
            # Other metadata keys are ignored.
            continue
        else:
            dir_path = current_path / key
            ensure_dir(dir_path, cfg.dry_run)
            manifest.add_dir(dir_path, root_path, {"type": "layer_dir", "layer": layer})

            # A leaf directory has no sub-directories (it is empty or holds
            # only metadata keys such as "_files").
            is_leaf = True
            if isinstance(value, dict):
                is_leaf = all(sub_key.startswith("_") for sub_key in value)

            # Recurse into every non-empty node. Leaves are included so that
            # "_files" declared directly inside a leaf directory are created
            # (previously they were silently dropped).
            if isinstance(value, dict) and value:
                process_layer_tree(
                    value, base_path, root_path, manifest, cfg,
                    context, layer, template_vars, dir_path
                )

            if is_leaf and cfg.placeholders.create_keep_files:
                keep_file = dir_path / ".keep"
                file_hash, _ = write_text_file(keep_file, "", cfg.overwrite, cfg.dry_run)
                if file_hash:
                    manifest.add_file(
                        keep_file, root_path, file_hash,
                        {"type": "keep_file", "layer": layer}
                    )


# ---------------------------
# Core Scaffold Logic
# ---------------------------

@dataclass
class Manifest:
    """
    Record of everything a scaffold run generated, stored as a nested tree.

    Each path component is a dict key; nodes carry ``_type`` ("dir"/"file"),
    ``_hash`` (files only) and optional ``_meta``.
    """

    generated_at: str
    config: Dict[str, Any]
    tree: Dict[str, Any] = field(default_factory=dict)
    root_path: Optional[Path] = None

    def _ensure_path(self, path: Path, root: Path) -> Dict[str, Any]:
        """Ensure ``path`` exists in the tree and return its node."""
        try:
            rel_path = path.relative_to(root)
        except ValueError:
            # Path is not under root: fall back to the path as given.
            rel_path = path

        current = self.tree
        for part in rel_path.parts:
            if part not in current:
                current[part] = {}
            current = current[part]
        return current

    def add_dir(self, path: Path, root: Path, meta: Optional[Dict[str, Any]] = None) -> None:
        """Record a directory in the tree."""
        node = self._ensure_path(path, root)
        node["_type"] = "dir"
        if meta:
            node["_meta"] = meta

    def add_file(self, path: Path, root: Path, file_hash: str, meta: Optional[Dict[str, Any]] = None) -> None:
        """Record a file and its content hash in the tree."""
        node = self._ensure_path(path, root)
        node["_type"] = "file"
        node["_hash"] = file_hash
        if meta:
            node["_meta"] = meta

    def to_json(self) -> str:
        """Serialize the manifest (timestamp, config, tree) as indented JSON."""
        return json.dumps(
            {"generated_at": self.generated_at, "config": self.config, "tree": self.tree},
            ensure_ascii=False,
            indent=2,
        )


def validate_config(cfg: ScaffoldConfig) -> None:
    """
    Validate the effective configuration.

    Raises:
        ValueError: on an unknown overwrite policy, a package base without a
            dot, or a selected layer missing from ``layer_dirs``.
    """
    if cfg.overwrite not in ("skip", "overwrite", "fail"):
        raise ValueError("overwrite must be one of: skip | overwrite | fail")
    if not cfg.package_base or "." not in cfg.package_base:
        # technically not required, but strongly recommended
        raise ValueError("package_base should look like 'top.ysit.travel'")
    for layer in cfg.layers:
        if layer not in cfg.layer_dirs:
            raise ValueError(f"Layer '{layer}' not defined in layer_dirs")


def resolve_module_name(naming: NamingOptions, context: str, layer: str) -> str:
    """Format the module directory name for ``context`` / ``layer``."""
    return naming.module_name_pattern.format(context=context, layer=layer)


def resolve_context_dir(naming: NamingOptions, context: str) -> str:
    """Format the context directory name for ``context``."""
    return naming.context_dir_pattern.format(context=context)


def _record_file(
    manifest: Manifest,
    path: Path,
    root: Path,
    file_hash: Optional[str],
    meta: Dict[str, Any],
) -> None:
    """Add a file to the manifest unless its write was skipped (hash is None)."""
    if file_hash is None:
        # The file already existed and was left untouched: it does not belong
        # to this scaffold run, so rollback must never see (and delete) it.
        return
    manifest.add_file(path, root, file_hash, meta)


def generate_for_context(cfg: ScaffoldConfig, context: str, manifest: Manifest) -> None:
    """
    Generate all configured layer modules for one context.

    Creates the context directory, one module per layer (with source roots,
    base package, layer tree and optional placeholder files) and records
    every created directory and written file in ``manifest``.

    Raises:
        ValueError: if the context name is invalid.
        FileExistsError: if a file exists and the overwrite policy is ``fail``.
    """
    context = safe_context_name(context)

    root = Path(cfg.root)
    context_dir = root / resolve_context_dir(cfg.naming, context)

    # Root path used when building the manifest tree.
    manifest.root_path = root

    # context root
    ensure_dir(context_dir, cfg.dry_run)
    manifest.add_dir(context_dir, root)

    if cfg.placeholders.create_context_readme:
        readme_path = context_dir / "README.md"
        readme_content = context_readme(context, cfg.layers)
        file_hash, _ = write_text_file(readme_path, readme_content, cfg.overwrite, cfg.dry_run)
        _record_file(manifest, readme_path, root, file_hash, {"type": "context_readme"})

    for layer in cfg.layers:
        module_name = resolve_module_name(cfg.naming, context, layer)
        module_root = context_dir / module_name

        # module root
        ensure_dir(module_root, cfg.dry_run)
        manifest.add_dir(module_root, root, {"layer": layer, "module": module_name})

        if cfg.use_standard_structure:
            # standard Maven source roots
            java_root = module_root / cfg.java_source_dir
            res_root = module_root / cfg.resources_dir
            ensure_dir(java_root, cfg.dry_run)
            manifest.add_dir(java_root, root, {"type": "java_root"})
            ensure_dir(res_root, cfg.dry_run)
            manifest.add_dir(res_root, root, {"type": "resources_root"})

            # base package dir for module: <package_base>.<context>.<layer>
            base_pkg = f"{cfg.package_base}.{context}.{layer}"
            base_pkg_path = java_root / pkg_to_path(base_pkg)
            ensure_dir(base_pkg_path, cfg.dry_run)
            manifest.add_dir(base_pkg_path, root, {"type": "base_package", "package": base_pkg})
        else:
            # Simplified layout: packages live directly under the module root.
            java_root = module_root
            res_root = module_root / "resources" if cfg.resources_dir else None
            if res_root:
                ensure_dir(res_root, cfg.dry_run)
                manifest.add_dir(res_root, root, {"type": "resources_root"})
            base_pkg = ""
            base_pkg_path = java_root

        template_vars = build_template_vars(cfg, context, layer, module_name)

        # layer-specific directories and files
        layer_tree = cfg.layer_dirs.get(layer, {})

        if isinstance(layer_tree, list):
            # Legacy format: list of relative directory strings.
            for rel in layer_tree:
                if cfg.use_standard_structure:
                    dir_path = java_root / pkg_to_path(cfg.package_base) / pkg_to_path(context) / Path(rel)
                else:
                    rel_path = Path(rel)
                    if rel_path.parts and rel_path.parts[0] == layer:
                        rel_path = Path(*rel_path.parts[1:])
                    dir_path = java_root / rel_path

                ensure_dir(dir_path, cfg.dry_run)
                manifest.add_dir(dir_path, root, {"type": "layer_dir", "layer": layer})
        elif isinstance(layer_tree, dict):
            # New format: nested tree.
            # NOTE(review): the tree is rooted at the *context* package, while
            # base_pkg above is <context>.<layer>; tree directories therefore
            # end up beside the base package dir, not inside it. Looks
            # unintended but is kept as-is because it defines the generated
            # layout -- confirm before changing.
            if cfg.use_standard_structure:
                tree_base = java_root / pkg_to_path(cfg.package_base) / pkg_to_path(context)
            else:
                tree_base = java_root

            process_layer_tree(
                layer_tree, tree_base, root, manifest, cfg,
                context, layer, template_vars
            )

        # package-info.java (optional)
        if cfg.placeholders.create_package_info:
            if cfg.use_standard_structure:
                pi = base_pkg_path / "package-info.java"
                pi_content = package_info_java(base_pkg)
                file_hash, _ = write_text_file(pi, pi_content, cfg.overwrite, cfg.dry_run)
                _record_file(manifest, pi, root, file_hash, {"type": "package-info", "package": base_pkg})
            else:
                # Simplified layout: package name comes from the template variables.
                pkg = template_vars.get("layer_package", template_vars.get("package", ""))
                pi = java_root / "package-info.java"
                pi_content = package_info_java(pkg)
                file_hash, _ = write_text_file(pi, pi_content, cfg.overwrite, cfg.dry_run)
                _record_file(manifest, pi, root, file_hash, {"type": "package-info", "package": pkg})

        # layer README (optional)
        if cfg.placeholders.create_layer_readme:
            lr = module_root / "README.md"
            lr_content = layer_readme(context, layer, module_name)
            file_hash, _ = write_text_file(lr, lr_content, cfg.overwrite, cfg.dry_run)
            _record_file(manifest, lr, root, file_hash, {"type": "layer_readme", "layer": layer})


def build_effective_config(args: argparse.Namespace) -> ScaffoldConfig:
    """Load the config file (if given) and apply CLI overrides on top of it."""
    cfg: ScaffoldConfig
    if args.config:
        path = Path(args.config)
        data = json.loads(path.read_text(encoding="utf-8"))
        cfg = ScaffoldConfig.from_dict(data)
    else:
        cfg = ScaffoldConfig()

    # CLI overrides
    if args.root:
        cfg.root = args.root
    if args.package_base:
        cfg.package_base = args.package_base
    if args.contexts:
        cfg.contexts = normalize_list_csv(args.contexts) or cfg.contexts
    if args.layers:
        cfg.layers = normalize_list_csv(args.layers) or cfg.layers
    if args.overwrite:
        cfg.overwrite = args.overwrite.lower()
    if args.dry_run:
        cfg.dry_run = True
    if args.no_manifest:
        cfg.emit_manifest = False

    return cfg


def init_config_file(path: Path) -> None:
    """Write an example JSON configuration to ``path`` (parents are created)."""
    example = {
        "root": "modules",
        "package_base": "top.ysit.travel",
        "contexts": ["journey", "recordbook", "capsule", "collaboration", "growth", "media"],
        "layers": ["api", "domain", "application", "infrastructure", "interfaces"],
        "java_source_dir": "src/main/java",
        "resources_dir": "src/main/resources",
        "overwrite": "skip",
        "dry_run": False,
        "emit_manifest": True,
        "manifest_name": "manifest.json",
        "use_standard_structure": True,
        "naming": {
            "module_name_pattern": "{context}-{layer}",
            "context_dir_pattern": "{context}"
        },
        "placeholders": {
            "create_keep_files": True,
            "create_package_info": False,
            "create_layer_readme": True,
            "create_context_readme": True
        },
        "template_vars": {
            "_comment": "模板变量定义。基础变量:{context}, {layer}, {module}, {package_base}。支持变量间引用。",
            "package": "{package_base}.{context}",
            "context_package": "{package_base}.{context}",
            "layer_package": "{package}.{layer}"
        },
        "layer_dirs": {
            "api": {
                "contract": {},
                "event": {},
                "client": {},
                "_files": {
                    "package-info.java": "/**\n * API contracts for {context} context.\n * @author DDD Scaffold Tool\n */\npackage {layer_package};\n"
                }
            },
            "domain": {
                "model": {
                    "aggregate": {
                        "_files": {
                            ".gitkeep": ""
                        }
                    },
                    "entity": {},
                    "vo": {},
                    "enum": {}
                },
                "repository": {},
                "event": {},
                "service": {},
                "exception": {}
            },
            "application": {
                "command": {},
                "usecase": {},
                "query": {"dto": {}},
                "assembler": {},
                "port": {"in": {}, "out": {}},
                "security": {},
                "handler": {}
            },
            "infrastructure": {
                "persistence": {
                    "mapper": {},
                    "po": {},
                    "convert": {},
                    "repository": {}
                },
                "event": {},
                "config": {}
            },
            "interfaces": {
                "web": {"request": {}, "response": {}},
                "facade": {},
                "doc": {}
            }
        },
        "_comment": {
            "use_standard_structure": "是否使用标准 Maven 目录结构。false 时:上下文->层名->包结构;true 时:上下文->模块名->src/main/java->包结构",
            "module_name_pattern": "模块名称模式。可用变量:{context}(上下文名)、{layer}(层名)。例如:'{layer}' 生成 'api'、'domain' 等;'{context}-{layer}' 生成 'auth-api'、'auth-domain' 等",
            "layer_dirs": "层目录结构配置。支持树形嵌套结构,目录用空对象 {} 表示,文件用 _files 键定义。",
            "template_variables": "文件内容支持模板变量:{context}(上下文名)、{layer}(层名)、{module}(模块名)、{package}(包名)、{package_base}(基础包名)"
        }
    }
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(example, ensure_ascii=False, indent=2), encoding="utf-8")


def configure_logging(verbose: bool) -> None:
    """Configure root logging: DEBUG when ``verbose``, INFO otherwise."""
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(level=level, format="%(levelname)s %(message)s")


# ---------------------------
# Rollback Logic
# ---------------------------

def load_manifest(manifest_path: Path) -> Dict[str, Any]:
    """
    Load and parse a manifest file.

    Raises:
        FileNotFoundError: if ``manifest_path`` does not exist.
    """
    if not manifest_path.exists():
        raise FileNotFoundError(f"Manifest not found: {manifest_path}")
    data = json.loads(manifest_path.read_text(encoding="utf-8"))
    return data


def verify_file_hash(file_path: Path, expected_hash: str) -> bool:
    """Return True when ``file_path`` is a readable file whose hash matches."""
    if not file_path.exists():
        return False
    if not file_path.is_file():
        return False
    try:
        content = file_path.read_text(encoding="utf-8")
        actual_hash = calculate_file_hash(content)
        return actual_hash == expected_hash
    except (OSError, UnicodeDecodeError):
        return False


def walk_tree(
    tree: Dict[str, Any],
    root_path: Path,
    current_path: Optional[Path] = None
) -> List[Tuple[Path, Dict[str, Any]]]:
    """
    Recursively walk a manifest tree.

    Returns:
        A list of ``(path, node)`` pairs for every file and directory.
        Intermediate nodes without ``_type`` are reported as directories.
    """
    if current_path is None:
        current_path = root_path

    results = []

    for key, value in tree.items():
        if key.startswith("_"):
            # Metadata keys (_type, _hash, _meta) are not paths.
            continue

        child_path = current_path / key

        if isinstance(value, dict):
            node_type = value.get("_type")

            if node_type in ("file", "dir"):
                results.append((child_path, value))
            else:
                # Intermediate directory that was never added explicitly
                # (e.g. "src", "main"); still part of the generated layout.
                results.append((child_path, {"_type": "dir"}))

            results.extend(walk_tree(value, root_path, child_path))

    return results


def verify_all_files(manifest_data: Dict[str, Any], root_path: Path) -> List[Path]:
    """
    Verify the hash of every file recorded in the manifest tree.

    Returns:
        Paths of files whose verification failed (missing, unreadable or
        modified).
    """
    tree = manifest_data.get("tree", {})
    if not tree:
        logging.warning("No tree structure found in manifest.")
        return []

    files_to_verify = []
    for file_path, node_info in walk_tree(tree, root_path):
        if node_info.get("_type") == "file":
            file_hash = node_info.get("_hash")
            if file_hash:
                files_to_verify.append((file_path, file_hash))

    if not files_to_verify:
        logging.warning("No files with hash found in manifest.")
        return []

    failed_files = []
    for file_path, expected_hash in files_to_verify:
        if not verify_file_hash(file_path, expected_hash):
            failed_files.append(file_path)

    if failed_files:
        logging.warning(
            "⚠ Hash verification failed for %d file(s). These files will be skipped.",
            len(failed_files)
        )
        for f in failed_files[:10]:  # show the first 10 only
            logging.warning("  - %s", f)
        if len(failed_files) > 10:
            logging.warning("  ... and %d more", len(failed_files) - 10)

    verified_count = len(files_to_verify) - len(failed_files)
    logging.info("✓ Verified %d file(s) by hash", verified_count)

    return failed_files


def collect_module_roots(manifest_data: Dict[str, Any], root_path: Path) -> List[Path]:
    """Return the sorted paths of nodes whose ``_meta`` carries a ``module``."""
    tree = manifest_data.get("tree", {})
    module_roots: set[Path] = set()

    for path, node_info in walk_tree(tree, root_path):
        meta = node_info.get("_meta", {})
        if meta.get("module"):
            module_roots.add(path)

    return sorted(module_roots)


def collect_paths_to_delete(
    manifest_data: Dict[str, Any],
    root_path: Path
) -> Tuple[List[Dict[str, Any]], List[Path], set[Path]]:
    """
    Collect the paths a rollback should remove.

    Returns:
        ``(files, dirs, all_paths)`` where ``files`` is a list of
        ``{"path", "hash"}`` dicts and ``dirs`` a list of directories, both
        ordered deepest first, and ``all_paths`` is every manifest path.
    """
    tree = manifest_data.get("tree", {})
    files: List[Dict[str, Any]] = []
    dirs: List[Path] = []
    all_paths: set[Path] = set()

    for path, node_info in walk_tree(tree, root_path):
        all_paths.add(path)
        node_type = node_info.get("_type")

        if node_type == "file":
            files.append({"path": path, "hash": node_info.get("_hash")})
        elif node_type == "dir":
            dirs.append(path)

    # Deepest first, so children are handled before their parents.
    files = sorted(files, key=lambda f: len(f["path"].parts), reverse=True)
    dirs = sorted(set(dirs), key=lambda p: len(p.parts), reverse=True)

    return files, dirs, all_paths


def safe_delete_file(file_info: Dict[str, Any], dry_run: bool) -> bool:
    """
    Delete a file only if its content still matches the recorded hash.

    Args:
        file_info: dict with ``path`` (Path) and ``hash`` (recorded SHA-256).
        dry_run: only log what would be deleted.

    Returns:
        True if the file is gone (deleted, already absent, or would be deleted
        in dry-run); False if it was kept.
    """
    path = file_info["path"]
    expected_hash = file_info.get("hash")

    if not path.exists():
        logging.debug("File already deleted: %s", path)
        return True

    if not path.is_file():
        logging.warning("Path is not a file, skipping: %s", path)
        return False

    # No recorded hash means the scaffold never wrote this file (older
    # manifests recorded skipped, pre-existing files this way). Without a hash
    # we cannot prove the file is ours, so keep it.
    if not expected_hash:
        logging.warning("No hash recorded for file, skipping deletion: %s", path)
        return False

    if not verify_file_hash(path, expected_hash):
        logging.error(
            "File hash mismatch, skipping deletion (file may have been modified): %s",
            path
        )
        return False

    if dry_run:
        logging.info("[DRY-RUN] Would delete file: %s", path)
        return True

    try:
        path.unlink()
        logging.debug("Deleted file: %s", path)
        return True
    except OSError as e:
        logging.warning("Failed to delete file: %s (%s)", path, e)
        return False


def safe_delete_dir(
    path: Path,
    dry_run: bool,
    manifest_paths: set[Path],
    protected_paths: set[Path],
    exclude_files: Optional[set[str]] = None
) -> bool:
    """
    Delete a directory only when it holds nothing outside the manifest.

    Args:
        path: directory to delete.
        dry_run: only log what would be deleted.
        manifest_paths: every path recorded in the manifest.
        protected_paths: skipped files and their parent directories.
        exclude_files: file names ignored by the content check
            (e.g. manifest.json).

    Returns:
        True if the directory is gone (deleted, already absent, or would be
        deleted in dry-run); False if it was kept.
    """
    if exclude_files is None:
        exclude_files = set()

    if not path.exists():
        logging.debug("Directory already deleted: %s", path)
        return True

    if not path.is_dir():
        logging.warning("Path is not a directory, skipping: %s", path)
        return False

    if path in protected_paths:
        logging.debug("Directory is protected (contains skipped files): %s", path)
        return False

    try:
        items = list(path.iterdir())
        # Ignore hidden entries and excluded file names. Only the entry's own
        # name is tested: testing every path component would treat all
        # children as hidden whenever an ancestor starts with "." (such as
        # ".." or "~/.projects").
        visible_items = {
            item for item in items
            if not item.name.startswith('.') and item.name not in exclude_files
        }
        unknown_items = {item for item in visible_items if item not in manifest_paths}
        if unknown_items:
            logging.debug(
                "Directory contains items not in manifest, will keep directory: %s (unknown: %s)",
                path, [str(item.name) for item in list(unknown_items)[:3]]
            )
            return False
    except OSError as e:
        logging.warning("Cannot check directory contents: %s (%s)", path, e)
        return False

    if dry_run:
        logging.info("[DRY-RUN] Would delete directory: %s", path)
        return True

    try:
        # rmdir only succeeds on an empty directory.
        path.rmdir()
        logging.debug("Deleted directory: %s", path)
        return True
    except OSError:
        logging.debug("Directory not empty, will keep: %s", path)
        return False


def rollback(manifest_path: Path, dry_run: bool = False) -> None:
    """
    Roll back a scaffold run described by a manifest.

    Files are deleted only when their hash still matches; directories only
    when they contain nothing outside the manifest. Skipped files protect
    their parent directories.

    Raises:
        FileNotFoundError: if the manifest does not exist.
        ValueError: if the manifest has no tree.
    """
    logging.info("Rolling back using manifest: %s", manifest_path)

    # 1. Load the manifest.
    manifest_data = load_manifest(manifest_path)
    logging.info("Loaded manifest generated at: %s", manifest_data.get("generated_at"))

    tree = manifest_data.get("tree", {})
    if not tree:
        raise ValueError("No tree structure found in manifest.")

    # The first-level key of the tree is the context directory name (e.g.
    # "auth"), not the root directory. The manifest normally lives at
    # {root}/{context}/manifest.json, so root is two levels up.
    first_key = next(iter(tree.keys()))
    current = manifest_path.parent.parent
    if current.name == first_key:
        # current is itself the context directory; root is its parent.
        root_path = current.parent
    elif (current / first_key).exists():
        root_path = current
    else:
        # Fall back to the root recorded in the manifest config.
        config = manifest_data.get("config", {})
        root_str = config.get("root", first_key)
        root_path = Path(root_str)

    # 2. Verify all hashes up front (informational; does not abort).
    logging.info("Verifying files by hash...")
    verify_all_files(manifest_data, root_path)

    # 3. Collect paths to delete.
    files, dirs, all_paths = collect_paths_to_delete(manifest_data, root_path)
    logging.info("Found %d files and %d directories to delete", len(files), len(dirs))

    # 4. Delete files (deepest first, hash-verified).
    logging.info("Deleting files...")
    deleted_files = 0
    skipped_files = []
    for file_info in files:
        if safe_delete_file(file_info, dry_run):
            deleted_files += 1
        else:
            skipped_files.append(file_info["path"])

    if skipped_files:
        logging.warning("Skipped %d file(s) (hash mismatch or deletion failed)", len(skipped_files))
    logging.info("Deleted %d file(s)", deleted_files)

    # 5. Protect skipped files and all their parent directories up to root.
    protected_paths: set[Path] = set()
    for file_path in skipped_files:
        protected_paths.add(file_path)
        for parent in file_path.parents:
            if parent == root_path or parent.parent == root_path.parent:
                # Do not protect root or anything above it.
                break
            protected_paths.add(parent)

    if protected_paths:
        logging.info("Protected %d path(s) due to skipped files", len(protected_paths))

    # 6. Delete directories (deepest first). Several rounds let a parent be
    #    removed once its children are gone. The manifest file itself is
    #    excluded from the "unknown content" check.
    exclude_files = {manifest_path.name}
    logging.info("Deleting directories...")
    max_rounds = 10
    for round_num in range(max_rounds):
        deleted_count = 0
        for d in dirs:
            if not d.exists():
                continue
            if safe_delete_dir(d, dry_run, all_paths, protected_paths, exclude_files):
                deleted_count += 1

        if deleted_count == 0:
            break
        logging.debug("Round %d: deleted %d directories", round_num + 1, deleted_count)
        if dry_run:
            # Nothing is actually removed in dry-run, so further rounds would
            # only repeat the same "would delete" messages.
            break

    # 7. Delete the manifest itself so the context directory can be cleaned.
    if manifest_path.exists():
        if dry_run:
            logging.info("[DRY-RUN] Would delete manifest: %s", manifest_path)
        else:
            try:
                manifest_path.unlink()
                logging.info("Deleted manifest: %s", manifest_path)
            except OSError as e:
                logging.warning("Could not delete manifest: %s (%s)", manifest_path, e)

    # 8. Remove the context directory (where the manifest lived) if empty.
    logging.info("Cleaning up empty context directories...")
    context_root = manifest_path.parent

    if context_root in protected_paths:
        logging.info("Context directory is protected (contains skipped files): %s", context_root)
    else:
        try:
            if context_root.exists() and context_root.is_dir():
                items = list(context_root.iterdir())
                visible_items = [item for item in items if not item.name.startswith('.')]

                if not visible_items:
                    if dry_run:
                        logging.info("[DRY-RUN] Would delete empty context directory: %s", context_root)
                    else:
                        try:
                            context_root.rmdir()
                            logging.info("Deleted empty context directory: %s", context_root)
                        except OSError as e:
                            logging.debug("Could not delete context directory: %s (%s)", context_root, e)
                else:
                    logging.info("Context directory not empty, will keep: %s (contains: %s)",
                                 context_root, [item.name for item in visible_items[:3]])
        except OSError as e:
            logging.warning("Cannot check context directory contents: %s (%s)", context_root, e)

    # 9. Summary.
    if skipped_files:
        logging.warning("⚠ Rollback completed with %d file(s) skipped", len(skipped_files))
        logging.warning("The following directories were preserved due to skipped files:")
        preserved_all = protected_paths & set(dirs)
        for p in sorted(preserved_all)[:5]:
            logging.warning("  - %s", p)
        if len(preserved_all) > 5:
            logging.warning("  ... and %d more", len(preserved_all) - 5)
    else:
        logging.info("Rollback completed successfully")


def main() -> int:
    """
    CLI entry point.

    Returns:
        Process exit code: 0 on success, 1 when rollback fails, 2 when no
        contexts were provided.
    """
    parser = argparse.ArgumentParser(description="DDD multi-module scaffold generator (production-friendly).")
    parser.add_argument("--config", help="Path to JSON config file.")
    parser.add_argument("--init-config", help="Write an example JSON config to the given path, then exit.")
    parser.add_argument("--root", help="Override root directory for contexts (e.g. modules).")
    parser.add_argument("--package-base", help="Override Java base package (e.g. top.ysit.travel).")
    parser.add_argument("--contexts", help="Comma-separated context names (e.g. journey,recordbook).")
    parser.add_argument("--layers", help="Comma-separated layers to generate (subset of api,domain,application,infrastructure,interfaces,boot).")
    parser.add_argument("--overwrite", choices=["skip", "overwrite", "fail"], help="Overwrite policy for files.")
    parser.add_argument("--dry-run", action="store_true", help="Dry run: do not touch filesystem.")
    parser.add_argument("--no-manifest", action="store_true", help="Disable manifest.json output.")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose logging.")
    parser.add_argument("--manifest-stdout", action="store_true", help="Print manifest JSON to stdout (useful for dry-run).")
    parser.add_argument("--manifest-write-in-dry-run", action="store_true", help="Allow writing manifest.json even in dry-run.")
    parser.add_argument("--rollback", metavar="MANIFEST_PATH", help="Rollback scaffold using the specified manifest.json file.")

    args = parser.parse_args()
    configure_logging(args.verbose)

    if args.init_config:
        path = Path(args.init_config)
        init_config_file(path)
        logging.info("Wrote example config to: %s", path)
        return 0

    if args.rollback:
        manifest_path = Path(args.rollback)
        try:
            # rollback() logs its own summary (success, or how many files were
            # skipped); no unconditional "success" message is added here.
            rollback(manifest_path, dry_run=args.dry_run)
            return 0
        except Exception as e:
            logging.error("Rollback failed: %s", e)
            if args.verbose:
                import traceback
                traceback.print_exc()
            return 1

    cfg = build_effective_config(args)
    validate_config(cfg)

    if not cfg.contexts:
        logging.error("No contexts provided. Use --contexts or config file 'contexts'.")
        return 2

    logging.info("Scaffolding %d contexts into '%s' (dry_run=%s)", len(cfg.contexts), cfg.root, cfg.dry_run)

    # Each context gets its own manifest.
    for c in cfg.contexts:
        logging.info("→ context: %s", c)

        context_manifest = Manifest(generated_at=now_iso(), config={
            "root": cfg.root,
            "package_base": cfg.package_base,
            "contexts": [c],  # only the current context
            "layers": cfg.layers,
            "java_source_dir": cfg.java_source_dir,
            "resources_dir": cfg.resources_dir,
            "overwrite": cfg.overwrite,
            "dry_run": cfg.dry_run,
            "emit_manifest": cfg.emit_manifest,
            "manifest_name": cfg.manifest_name,
            "use_standard_structure": cfg.use_standard_structure,
            "template_vars": cfg.template_vars,
            "placeholders": {
                "create_keep_files": cfg.placeholders.create_keep_files,
                "create_package_info": cfg.placeholders.create_package_info,
                "create_layer_readme": cfg.placeholders.create_layer_readme,
                "create_context_readme": cfg.placeholders.create_context_readme,
            },
            "naming": {
                "module_name_pattern": cfg.naming.module_name_pattern,
                "context_dir_pattern": cfg.naming.context_dir_pattern,
            }
        })

        generate_for_context(cfg, c, context_manifest)

        if cfg.emit_manifest:
            context_manifest_json = context_manifest.to_json()

            # 1) Print to stdout in dry-run (the manifest is the preview), or
            #    whenever --manifest-stdout is given.
            if args.manifest_stdout or cfg.dry_run:
                print(f"=== Manifest for context '{c}' ===")
                print(context_manifest_json)

            # 2) Write the file on real runs, or in dry-run when explicitly requested.
            should_write_manifest_file = not cfg.dry_run or args.manifest_write_in_dry_run
            if should_write_manifest_file:
                context_dir = Path(cfg.root) / resolve_context_dir(cfg.naming, safe_context_name(c))
                out = context_dir / cfg.manifest_name
                # dry_run=False so that dry-run + --manifest-write-in-dry-run
                # still writes the manifest to disk.
                write_text_file(out, context_manifest_json, cfg.overwrite, dry_run=False)
                logging.info("Manifest for context '%s': %s", c, out)

    logging.info("Done.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())