Reusable pre- and post-agent gates.
Gates validate data before and after an AI agent runs. Pre-gates filter
input so the agent never sees invalid data. Post-gates validate output
to catch dangerous changes before they reach the forge.
All functions are stateless and tracker-agnostic -- they operate on
dicts and file lists, not Jira-specific types.
The GATE_REGISTRY maps CLI-friendly names to GateSpec instances.
Use resolve_gates() to look up gates by name and
validate_gate_env() to check required environment variables before
running any gate.
GateSpec(name, fn, phase, required_env=list())
dataclass
Metadata for a registered gate function.
resolve_gates(names)
Look up gate specs by CLI name. Raises SystemExit on unknown names.
Source code in src/agentic_ci/gates.py
def resolve_gates(names: list[str]) -> list[GateSpec]:
    """Translate CLI gate names into their registered ``GateSpec`` entries.

    Every name is looked up in ``GATE_REGISTRY`` before any failure is
    reported, so a single error message lists all unknown names together
    with the sorted set of available gate names. Exits via ``sys.exit``
    (``SystemExit``) when at least one name is unknown; otherwise returns
    the specs in the same order as ``names``.
    """
    lookups = [(gate_name, GATE_REGISTRY.get(gate_name)) for gate_name in names]

    unrecognised = [gate_name for gate_name, found in lookups if found is None]
    if unrecognised:
        known = ", ".join(sorted(GATE_REGISTRY))
        sys.exit(f"Error: unknown gate(s): {', '.join(unrecognised)}\nAvailable: {known}")

    # No lookup returned None at this point, so every entry is a real spec.
    return [found for _, found in lookups]
|
validate_gate_env(gates)
Check that all required env vars for the given gates are set.
Fails with a clear error listing every missing variable and
which gate needs it.
Source code in src/agentic_ci/gates.py
def validate_gate_env(gates: list[GateSpec]) -> None:
    """Ensure every environment variable required by ``gates`` is set.

    A variable counts as missing when it is absent from ``os.environ`` or
    set to an empty string. All missing variables are collected first, then
    reported in one ``sys.exit`` message that names, for each variable, the
    gates that need it. Returns ``None`` when nothing is missing.
    """
    unset_vars: dict[str, list[str]] = {}
    for spec in gates:
        for env_name in spec.required_env:
            if os.environ.get(env_name):
                continue
            unset_vars.setdefault(env_name, []).append(spec.name)

    if not unset_vars:
        return

    report = "\n".join(
        f" {env_name} (needed by: {', '.join(users)})" for env_name, users in unset_vars.items()
    )
    sys.exit("Error: missing environment variables for gates:\n" + report)
|
check_sensitive_files(changed_files, blocklist=None)
Check if any changed files match a sensitive-file blocklist.
Returns a list of blocked file paths (empty means all clear).
Source code in src/agentic_ci/gates.py
| def check_sensitive_files(
changed_files: list[str],
blocklist: list[str] | None = None,
) -> list[str]:
"""Check if any changed files match a sensitive-file blocklist.
Returns a list of blocked file paths (empty means all clear).
"""
if blocklist is None:
blocklist = DEFAULT_SENSITIVE_BLOCKLIST
blocked = []
for filepath in changed_files:
name = Path(filepath).name
for pattern in blocklist:
if fnmatch.fnmatch(name, pattern) or fnmatch.fnmatch(filepath, pattern):
blocked.append(filepath)
break
return blocked
|
check_commit_identity(commit_info, expected_email)
Verify the commit committer matches the expected bot email.
Source code in src/agentic_ci/gates.py
def check_commit_identity(commit_info: dict, expected_email: str) -> bool:
    """Verify the commit committer matches the expected bot email.

    The comparison is case-insensitive and reads the ``"email"`` key of
    ``commit_info``.

    Fails closed: returns ``False`` when the commit email is missing, empty
    or ``None``, or when ``expected_email`` is empty. Without this guard an
    unconfigured expected email would compare equal to a missing commit
    email and the gate would pass.
    """
    # ``or ""`` also covers a key that is present with a ``None`` value,
    # which ``dict.get(key, "")`` would return unchanged.
    actual = commit_info.get("email") or ""
    if not actual or not expected_email:
        return False
    return actual.lower() == expected_email.lower()
|
log_changed_files(changed_files, ticket_key)
Log changed files for observability.
Source code in src/agentic_ci/gates.py
def log_changed_files(changed_files: list[str], ticket_key: str) -> None:
    """Emit one INFO log line describing what the agent changed.

    The line is prefixed with ``ticket_key`` and carries either the number
    of changed files plus a comma-separated list of them, or a note that
    nothing changed when ``changed_files`` is empty.
    """
    if not changed_files:
        log.info("[%s] No files changed by agent", ticket_key)
        return

    file_count = len(changed_files)
    joined_paths = ", ".join(changed_files)
    log.info("[%s] Agent changed %d files: %s", ticket_key, file_count, joined_paths)
|
gitleaks_scan(repo_dir, compare_ref='origin/HEAD')
Scan new commits for secrets using gitleaks.
Returns a list of error strings (empty means clean).
Requires gitleaks to be installed on PATH. Fails closed:
returns an error if gitleaks is missing or times out.
Source code in src/agentic_ci/gates.py
def gitleaks_scan(repo_dir: Path, compare_ref: str = "origin/HEAD") -> list[str]:
    """Scan new commits for secrets using gitleaks.

    Only the commits in ``compare_ref..HEAD`` of the repository at
    ``repo_dir`` are scanned. The range is first counted with
    ``git rev-list --count``; an empty range skips the scan.

    Returns a list of error strings (empty means clean).

    Requires ``gitleaks`` to be installed on PATH. Fails closed: every
    failure to run the scan is reported as an error string instead of
    raising. That covers gitleaks missing, ``git rev-list`` failing or
    timing out, either tool failing to launch, an unparsable commit count,
    and gitleaks timing out.
    """
    if not shutil.which("gitleaks"):
        log.error("gitleaks not found on PATH — failing closed")
        return ["gitleaks is not installed; secret scan cannot run"]

    rev_list_timeout = 30
    try:
        count_output = subprocess.run(
            ["git", "-C", str(repo_dir), "rev-list", "--count", f"{compare_ref}..HEAD"],
            capture_output=True,
            text=True,
            check=True,
            timeout=rev_list_timeout,
        )
    except subprocess.CalledProcessError as exc:
        stderr = (exc.stderr or "").strip()
        log.error("git rev-list failed: %s", stderr)
        return [f"gitleaks pre-check failed: git rev-list error: {stderr}"]
    except subprocess.TimeoutExpired:
        # The call sets a timeout, so its expiry must be handled here too;
        # otherwise the exception escapes and the gate no longer fails closed.
        log.error("git rev-list timed out after %ds", rev_list_timeout)
        return [f"gitleaks pre-check failed: git rev-list timed out after {rev_list_timeout}s"]
    except OSError as exc:
        # git missing from PATH or not executable.
        log.error("could not run git: %s", exc)
        return [f"gitleaks pre-check failed: could not run git: {exc}"]

    try:
        commit_count = int(count_output.stdout.strip())
    except ValueError:
        log.error("git rev-list returned non-integer: %r", count_output.stdout.strip())
        return ["gitleaks pre-check failed: could not parse commit count"]

    if commit_count == 0:
        log.info("gitleaks scan skipped: no commits in range %s..HEAD", compare_ref)
        return []

    try:
        result = subprocess.run(
            [
                "gitleaks",
                "detect",
                "--source",
                str(repo_dir),
                f"--log-opts={compare_ref}..HEAD",
                "--verbose",
            ],
            capture_output=True,
            text=True,
            timeout=GITLEAKS_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        log.error("gitleaks timed out after %ds", GITLEAKS_TIMEOUT)
        return [f"gitleaks timed out after {GITLEAKS_TIMEOUT}s; secret scan inconclusive"]
    except OSError as exc:
        # The binary can vanish or be non-executable after the PATH check.
        log.error("could not run gitleaks: %s", exc)
        return [f"gitleaks could not be executed: {exc}; secret scan cannot run"]

    # Any non-zero exit is reported as a finding, which keeps the gate closed
    # when gitleaks itself fails as well.
    if result.returncode != 0:
        log.error("gitleaks detected secrets in committed changes")
        return [
            "gitleaks detected potential secrets in committed code. "
            "Review the gitleaks output in the CI job log for details."
        ]

    log.info("gitleaks scan passed: no secrets found")
    return []
|
filter_comments_by_domain(comments, allowed_domain_re)
Keep only comments from authors whose email matches allowed_domain_re.
Source code in src/agentic_ci/gates.py
def filter_comments_by_domain(
    comments: list[dict],
    allowed_domain_re: re.Pattern[str],
) -> list[dict]:
    """Keep only comments from authors whose email matches ``allowed_domain_re``.

    The email is read from each comment's ``"author_email"`` key and tested
    with ``allowed_domain_re.search``. A comment whose email is missing,
    empty or ``None`` is tested as the empty string, so it is dropped unless
    the pattern itself matches an empty string. Input order is preserved.
    """
    # ``or ""`` handles a key that is present with a ``None`` value; passing
    # ``None`` to ``search`` would raise TypeError and abort the whole filter.
    return [
        comment
        for comment in comments
        if allowed_domain_re.search(comment.get("author_email") or "")
    ]
|
filter_bot_comments(comments, sentinel_phrases)
Remove comments containing any of the given bot sentinel phrases.
Source code in src/agentic_ci/gates.py
def filter_bot_comments(
    comments: list[dict],
    sentinel_phrases: list[str],
) -> list[dict]:
    """Remove comments containing any of the given bot sentinel phrases.

    A comment is dropped when any phrase occurs as a substring of its
    ``"body"`` value. A comment whose body is missing, empty or ``None``
    cannot contain a phrase and is kept. Input order is preserved.
    """

    def _has_sentinel(comment: dict) -> bool:
        # ``or ""`` handles a key that is present with a ``None`` value;
        # ``phrase in None`` would raise TypeError.
        body = comment.get("body") or ""
        return any(phrase in body for phrase in sentinel_phrases)

    return [comment for comment in comments if not _has_sentinel(comment)]
|
check_description_editors(editors, reporter_email, internal_domain_re)
Check if any description editors are outside the trusted domain.
Validates the reporter (original author) and all changelog editors.
Returns a list of untrusted email addresses (empty means all clear).
Treats empty or missing emails as untrusted.
Source code in src/agentic_ci/gates.py
def check_description_editors(
    editors: list[str],
    reporter_email: str,
    internal_domain_re: re.Pattern[str],
) -> list[str]:
    """Report description authors that fall outside the trusted domain.

    The reporter is evaluated first, followed by each entry of ``editors``
    in order. An address is trusted when ``internal_domain_re.search``
    finds a match in it. Empty or missing addresses are never trusted and
    are reported through placeholders: ``"missing-email:reporter"`` for the
    reporter and ``"missing-email:unknown"`` for an editor.

    Returns the untrusted entries without duplicates; an empty list means
    everyone is trusted.
    """
    flagged: list[str] = []

    if reporter_email:
        if internal_domain_re.search(reporter_email) is None:
            flagged.append(reporter_email)
    else:
        flagged.append("missing-email:reporter")

    for raw_email in editors:
        candidate = raw_email if raw_email else "missing-email:unknown"
        trusted = (
            not candidate.startswith("missing-email:")
            and internal_domain_re.search(candidate) is not None
        )
        if trusted or candidate in flagged:
            continue
        flagged.append(candidate)

    return flagged
|
check_external_reporter(ticket, internal_domain_re, *, external_label)
Check if a ticket was filed by an external reporter.
Returns external_label if the reporter is external and the label
is not already present, otherwise None.
Source code in src/agentic_ci/gates.py
| def check_external_reporter(
ticket: dict,
internal_domain_re: re.Pattern[str],
*,
external_label: str,
) -> str | None:
"""Check if a ticket was filed by an external reporter.
Returns ``external_label`` if the reporter is external and the label
is not already present, otherwise ``None``.
"""
reporter_email = ticket.get("reporter_email", "")
labels = ticket.get("labels", [])
if external_label in labels:
return None
if not internal_domain_re.search(reporter_email):
return external_label
return None
|
check_label_author_email(author_info, domain_pattern)
Verify that the label's author email matches domain_pattern.
author_info is the dict returned by JiraClient.get_label_author(),
expected to contain "found" (bool) and "email" (str) keys.
Returns True if the author was found and the email matches.
Source code in src/agentic_ci/gates.py
def check_label_author_email(
    author_info: dict,
    domain_pattern: re.Pattern[str],
) -> bool:
    """Verify that the label's author email matches ``domain_pattern``.

    ``author_info`` is the dict returned by ``JiraClient.get_label_author()``,
    expected to contain ``"found"`` (bool) and ``"email"`` (str) keys.

    Returns True if the author was found and ``domain_pattern.search``
    matches the email. Returns False when ``"found"`` is falsy or absent.
    A missing, empty or ``None`` email is tested as the empty string, so it
    yields False unless the pattern matches an empty string.
    """
    if not author_info.get("found"):
        return False
    # ``or ""`` handles an ``"email"`` key that is present with a ``None``
    # value; ``search(None)`` would raise TypeError instead of failing closed.
    email = author_info.get("email") or ""
    return bool(domain_pattern.search(email))
|