# CascadingDev/automation/config.py
"""
Configuration helpers for CascadingDev automation.
Responsibilities:
• Resolve cascading `.ai-rules.yml` files (nearest directory wins).
• Render templated paths (tokens: {rel}, {basename}, {feature_id}, etc.).
• Enforce repo-relative path safety (no escaping repo root).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from functools import lru_cache
from pathlib import Path
from typing import Any, Iterator
import yaml
@dataclass
class RulesConfig:
    """Cascading `.ai-rules.yml` resolver rooted at a repository directory.

    Directory-level rule files override the repo-root (global) file, and the
    directory nearest to a given source file wins (see module docstring).
    """

    # Absolute, resolved repository root.
    root: Path
    # Parsed contents of the root-level `.ai-rules.yml`.
    global_rules: dict[str, Any]
    # Per-directory cache of parsed `.ai-rules.yml` files (absolute dir -> data).
    _dir_cache: dict[Path, dict[str, Any]] = field(default_factory=dict, init=False, repr=False)

    @classmethod
    def load(cls, root: Path) -> "RulesConfig":
        """Load the repo-root rules file and return a configured instance.

        Raises:
            FileNotFoundError: if `<root>/.ai-rules.yml` does not exist.
        """
        root = root.resolve()
        global_path = root / ".ai-rules.yml"
        if not global_path.exists():
            raise FileNotFoundError(f"{global_path} not found")
        with global_path.open("r", encoding="utf-8") as fh:
            # An empty YAML file parses to None; normalize to {}.
            global_rules = yaml.safe_load(fh) or {}
        return cls(root=root, global_rules=global_rules)

    def get_rule_name(self, rel_path: Path) -> str | None:
        """Return the rule name associated with *rel_path* via cascading lookup.

        Directory-level `file_associations` are consulted nearest-first and the
        first match wins; the global mapping is the fallback. Returns None when
        no association matches.
        """
        filename = rel_path.name
        for rules in self._iter_directory_rules(rel_path.parent):
            associations = rules.get("file_associations") or {}
            if filename in associations:
                return associations[filename]
        associations = self.global_rules.get("file_associations") or {}
        if filename in associations:
            return associations[filename]
        return None

    def cascade_for(self, rel_path: Path, rule_name: str) -> dict[str, Any]:
        """Merge configuration for *rule_name* for a file at *rel_path*.

        The global rule definition is the base; directory overrides are applied
        outermost-to-nearest so the directory closest to the file wins, matching
        the "nearest directory wins" contract and `get_rule_name`.
        """
        merged: dict[str, Any] = {}
        global_rules = self.global_rules.get("rules") or {}
        if rule_name in global_rules:
            merged = _deep_copy(global_rules[rule_name])
        # _iter_directory_rules yields nearest-first; reverse so the nearest
        # directory's overrides are applied last and therefore take precedence.
        for rules in reversed(list(self._iter_directory_rules(rel_path.parent))):
            dir_rules = rules.get("rules") or {}
            if rule_name in dir_rules:
                merged = _merge_dicts(merged, dir_rules[rule_name])
        return merged

    def resolve_template(self, template: str, rel_source: Path) -> str:
        """Render `{token}` variables in *template* from *rel_source*.

        Tokens whose value is empty are left unrendered so missing data stays
        visible in the result.
        """
        rel_posix = rel_source.as_posix()
        basename = rel_source.name
        tokens = {
            "rel": rel_posix,
            "basename": basename,
            "name": basename.rsplit(".", 1)[0],  # strip the final extension only
            "ext": rel_source.suffix.lstrip("."),
            "feature_id": _extract_feature_id(rel_posix),
            "stage": _extract_stage_from_basename(basename),
            "dir": rel_source.parent.as_posix(),
            "path": rel_posix,
            "repo": ".",
        }
        result = template
        for key, value in tokens.items():
            if value:
                result = result.replace(f"{{{key}}}", value)
        return result

    def normalize_repo_rel(self, raw_path: str) -> Path:
        """Resolve *raw_path* against the repo root and return it repo-relative.

        Raises:
            ValueError: if the resolved path escapes the repository root
                (e.g. via `..` components or symlinks).
        """
        abs_path = (self.root / raw_path).resolve()
        # Path.relative_to is the robust containment check: a raw string
        # startswith() comparison wrongly accepts sibling dirs such as
        # `<root>2/...` that share the root's string prefix.
        try:
            return abs_path.relative_to(self.root)
        except ValueError as err:
            raise ValueError(
                f"Output path escapes repo: {raw_path} -> {abs_path}"
            ) from err

    def _load_rules_file(self, directory: Path) -> dict[str, Any]:
        """Parse (and cache) the `.ai-rules.yml` in *directory*; {} if absent."""
        if directory in self._dir_cache:
            return self._dir_cache[directory]
        rules_path = directory / ".ai-rules.yml"
        data: dict[str, Any] = {}
        if rules_path.exists():
            with rules_path.open("r", encoding="utf-8") as fh:
                data = yaml.safe_load(fh) or {}
        self._dir_cache[directory] = data
        return data

    def _iter_directory_rules(self, start_dir: Path) -> Iterator[dict[str, Any]]:
        """Yield parsed rules from *start_dir* up to (excluding) the repo root,
        nearest directory first.

        Directories without a rules file are skipped; the repo root itself and
        paths outside the repo yield nothing.
        """
        if not start_dir or start_dir.as_posix() in (".", ""):
            return
        current = (self.root / start_dir).resolve()
        # Ignore the root itself and anything that resolved outside the repo.
        if current == self.root or self.root not in current.parents:
            return
        while current != self.root:
            if (current / ".ai-rules.yml").exists():
                yield self._load_rules_file(current)
            current = current.parent
def _extract_feature_id(rel_path: str) -> str:
"""
Extract FR_* identifier from a Docs/features path, if present.
"""
parts = rel_path.split("/")
for i, part in enumerate(parts):
if part.startswith("FR_"):
return part
return ""
def _extract_stage_from_basename(basename: str) -> str:
if basename.endswith(".discussion.md"):
return basename.replace(".discussion.md", "")
return ""
def _merge_dicts(base: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict: a deep copy of *base* with *overrides* applied.

    Nested dicts are merged recursively key-by-key; any other value type
    replaces the base value outright. Neither input is mutated.
    """
    result: dict[str, Any] = _deep_copy(base)
    if not overrides:
        return result
    for key, incoming in overrides.items():
        existing = result.get(key)
        if isinstance(incoming, dict) and isinstance(existing, dict):
            result[key] = _merge_dicts(existing, incoming)
        else:
            result[key] = _deep_copy(incoming)
    return result
def _deep_copy(value: Any) -> Any:
if isinstance(value, dict):
return {k: _deep_copy(v) for k, v in value.items()}
if isinstance(value, list):
return [_deep_copy(v) for v in value]
return value