Skip to content

Gates

gates

Reusable pre- and post-agent gates.

Gates validate data before and after an AI agent runs. Pre-gates filter input so the agent never sees invalid data. Post-gates validate output to catch dangerous changes before they reach the forge.

All functions are stateless and tracker-agnostic -- they operate on dicts and file lists, not Jira-specific types.

The GATE_REGISTRY maps CLI-friendly names to GateSpec instances. Use resolve_gates() to look up gates by name and validate_gate_env() to check required environment variables before running any gate.

GateSpec(name, fn, phase, required_env=list()) dataclass

Metadata for a registered gate function.

resolve_gates(names)

Look up gate specs by CLI name. Raises SystemExit on unknown names.

Source code in src/agentic_ci/gates.py
def resolve_gates(names: list[str]) -> list[GateSpec]:
    """Look up gate specs by CLI name. Raises SystemExit on unknown names."""
    specs = []
    unknown = []
    for name in names:
        spec = GATE_REGISTRY.get(name)
        if spec is None:
            unknown.append(name)
        else:
            specs.append(spec)
    if unknown:
        available = ", ".join(sorted(GATE_REGISTRY))
        sys.exit(f"Error: unknown gate(s): {', '.join(unknown)}\nAvailable: {available}")
    return specs

validate_gate_env(gates)

Check that all required env vars for the given gates are set.

Fails with a clear error listing every missing variable and which gate needs it.

Source code in src/agentic_ci/gates.py
def validate_gate_env(gates: list[GateSpec]) -> None:
    """Check that all required env vars for the given gates are set.

    Fails with a clear error listing every missing variable and
    which gate needs it.
    """
    missing: dict[str, list[str]] = {}
    for gate in gates:
        for var in gate.required_env:
            if not os.environ.get(var):
                missing.setdefault(var, []).append(gate.name)
    if missing:
        lines = [f"  {var} (needed by: {', '.join(names)})" for var, names in missing.items()]
        sys.exit("Error: missing environment variables for gates:\n" + "\n".join(lines))

check_sensitive_files(changed_files, blocklist=None)

Check if any changed files match a sensitive-file blocklist.

Returns a list of blocked file paths (empty means all clear).

Source code in src/agentic_ci/gates.py
def check_sensitive_files(
    changed_files: list[str],
    blocklist: list[str] | None = None,
) -> list[str]:
    """Check if any changed files match a sensitive-file blocklist.

    Returns a list of blocked file paths (empty means all clear).
    """
    if blocklist is None:
        blocklist = DEFAULT_SENSITIVE_BLOCKLIST
    blocked = []
    for filepath in changed_files:
        name = Path(filepath).name
        for pattern in blocklist:
            if fnmatch.fnmatch(name, pattern) or fnmatch.fnmatch(filepath, pattern):
                blocked.append(filepath)
                break
    return blocked

check_commit_identity(commit_info, expected_email)

Verify the commit committer matches the expected bot email.

Source code in src/agentic_ci/gates.py
def check_commit_identity(commit_info: dict, expected_email: str) -> bool:
    """Verify the commit committer matches the expected bot email."""
    actual = commit_info.get("email", "")
    return actual.lower() == expected_email.lower()

log_changed_files(changed_files, ticket_key)

Log changed files for observability.

Source code in src/agentic_ci/gates.py
def log_changed_files(changed_files: list[str], ticket_key: str) -> None:
    """Log changed files for observability."""
    if changed_files:
        log.info(
            "[%s] Agent changed %d files: %s",
            ticket_key,
            len(changed_files),
            ", ".join(changed_files),
        )
    else:
        log.info("[%s] No files changed by agent", ticket_key)

gitleaks_scan(repo_dir, compare_ref='origin/HEAD')

Scan new commits for secrets using gitleaks.

Returns a list of error strings (empty means clean). Requires gitleaks to be installed on PATH. Fails closed: returns an error if gitleaks is missing or times out.

Source code in src/agentic_ci/gates.py
def gitleaks_scan(repo_dir: Path, compare_ref: str = "origin/HEAD") -> list[str]:
    """Scan new commits for secrets using gitleaks.

    Returns a list of error strings (empty means clean).
    Requires ``gitleaks`` to be installed on PATH.  Fails closed:
    returns an error if gitleaks is missing or times out.
    """
    if not shutil.which("gitleaks"):
        log.error("gitleaks not found on PATH — failing closed")
        return ["gitleaks is not installed; secret scan cannot run"]

    try:
        count_output = subprocess.run(
            ["git", "-C", str(repo_dir), "rev-list", "--count", f"{compare_ref}..HEAD"],
            capture_output=True,
            text=True,
            check=True,
            timeout=30,
        )
    except subprocess.CalledProcessError as exc:
        log.error("git rev-list failed: %s", exc.stderr.strip())
        return [f"gitleaks pre-check failed: git rev-list error: {exc.stderr.strip()}"]

    try:
        commit_count = int(count_output.stdout.strip())
    except ValueError:
        log.error("git rev-list returned non-integer: %r", count_output.stdout.strip())
        return ["gitleaks pre-check failed: could not parse commit count"]

    if commit_count == 0:
        log.info("gitleaks scan skipped: no commits in range %s..HEAD", compare_ref)
        return []

    try:
        result = subprocess.run(
            [
                "gitleaks",
                "detect",
                "--source",
                str(repo_dir),
                f"--log-opts={compare_ref}..HEAD",
                "--verbose",
            ],
            capture_output=True,
            text=True,
            timeout=GITLEAKS_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        log.error("gitleaks timed out after %ds", GITLEAKS_TIMEOUT)
        return [f"gitleaks timed out after {GITLEAKS_TIMEOUT}s; secret scan inconclusive"]

    if result.returncode != 0:
        log.error("gitleaks detected secrets in committed changes")
        return [
            "gitleaks detected potential secrets in committed code. "
            "Review the gitleaks output in the CI job log for details."
        ]

    log.info("gitleaks scan passed: no secrets found")
    return []

filter_comments_by_domain(comments, allowed_domain_re)

Keep only comments from authors whose email matches allowed_domain_re.

Source code in src/agentic_ci/gates.py
def filter_comments_by_domain(
    comments: list[dict],
    allowed_domain_re: re.Pattern[str],
) -> list[dict]:
    """Keep only comments from authors whose email matches ``allowed_domain_re``."""
    return [c for c in comments if allowed_domain_re.search(c.get("author_email", ""))]

filter_bot_comments(comments, sentinel_phrases)

Remove comments containing any of the given bot sentinel phrases.

Source code in src/agentic_ci/gates.py
def filter_bot_comments(
    comments: list[dict],
    sentinel_phrases: list[str],
) -> list[dict]:
    """Remove comments containing any of the given bot sentinel phrases."""
    return [c for c in comments if not any(s in c.get("body", "") for s in sentinel_phrases)]

check_description_editors(editors, reporter_email, internal_domain_re)

Check if any description editors are outside the trusted domain.

Validates the reporter (original author) and all changelog editors. Returns a list of untrusted email addresses (empty means all clear). Treats empty or missing emails as untrusted.

Source code in src/agentic_ci/gates.py
def check_description_editors(
    editors: list[str],
    reporter_email: str,
    internal_domain_re: re.Pattern[str],
) -> list[str]:
    """Check if any description editors are outside the trusted domain.

    Validates the reporter (original author) and all changelog editors.
    Returns a list of untrusted email addresses (empty means all clear).
    Treats empty or missing emails as untrusted.
    """
    untrusted: list[str] = []
    if not reporter_email:
        untrusted.append("missing-email:reporter")
    elif not internal_domain_re.search(reporter_email):
        untrusted.append(reporter_email)

    for email in editors:
        normalized = email or "missing-email:unknown"
        if normalized.startswith("missing-email:") or not internal_domain_re.search(normalized):
            if normalized not in untrusted:
                untrusted.append(normalized)

    return untrusted

check_external_reporter(ticket, internal_domain_re, *, external_label)

Check if a ticket was filed by an external reporter.

Returns external_label if the reporter is external and the label is not already present, otherwise None.

Source code in src/agentic_ci/gates.py
def check_external_reporter(
    ticket: dict,
    internal_domain_re: re.Pattern[str],
    *,
    external_label: str,
) -> str | None:
    """Check if a ticket was filed by an external reporter.

    Returns ``external_label`` if the reporter is external and the label
    is not already present, otherwise ``None``.
    """
    reporter_email = ticket.get("reporter_email", "")
    labels = ticket.get("labels", [])
    if external_label in labels:
        return None
    if not internal_domain_re.search(reporter_email):
        return external_label
    return None

check_label_author_email(author_info, domain_pattern)

Verify that the label's author email matches domain_pattern.

author_info is the dict returned by JiraClient.get_label_author(), expected to contain "found" (bool) and "email" (str) keys.

Returns True if the author was found and the email matches.

Source code in src/agentic_ci/gates.py
def check_label_author_email(
    author_info: dict,
    domain_pattern: re.Pattern[str],
) -> bool:
    """Verify that the label's author email matches ``domain_pattern``.

    ``author_info`` is the dict returned by ``JiraClient.get_label_author()``,
    expected to contain ``"found"`` (bool) and ``"email"`` (str) keys.

    Returns True if the author was found and the email matches.
    """
    if not author_info.get("found"):
        return False
    email = author_info.get("email", "")
    return bool(domain_pattern.search(email))