Skip to content

Jira

Client

client

Jira client with acli-first delegation and REST API fallback.

Provides a JiraClient class that delegates to the Atlassian CLI (acli) for operations it supports and falls back to the REST API for gaps (changelog queries, visibility-restricted comments, custom fields, attachment upload, ADF conversion).

When acli is not on PATH, all operations use the REST API.

Usage::

from agentic_ci.jira import JiraClient

client = JiraClient.from_env(url="https://myorg.atlassian.net")
ticket = client.get_issue("PROJ-123")
client.add_comment("PROJ-123", "Fixed in PR #42")

JiraError(message, status_code=None, response_text='')

Bases: Exception

Raised when a Jira API call fails.

Source code in src/agentic_ci/jira/client.py
def __init__(self, message: str, status_code: int | None = None, response_text: str = ""):
    super().__init__(message)
    self.status_code = status_code
    self.response_text = response_text

JiraClient(url, email, token, *, timeout=30)

Jira client that delegates to acli where possible.

On init, checks for acli on PATH. If available, write operations (create, edit, transition, assign, comment, link) and search/view use acli subprocess calls. Read operations that need ADF conversion, changelog queries, custom fields, or visibility-restricted comments fall back to the REST API.

Source code in src/agentic_ci/jira/client.py
def __init__(self, url: str, email: str, token: str, *, timeout: int = 30):
    self.url = url.rstrip("/")
    self.auth = (email, token)
    self.timeout = timeout
    self._field_cache: dict[str, str] | None = None
    self._field_schema_cache: dict[str, str] | None = None
    self._acli_available = acli_mod.is_available()
    if self._acli_available:
        try:
            acli_mod.setup_auth()
            log.info("acli authenticated, will use acli for supported operations")
        except acli_mod.AcliError as exc:
            log.info("acli on PATH but auth failed, using REST only: %s", exc)
            self._acli_available = False

from_env(url=None) classmethod

Create a client from environment variables.

Reads JIRA_URL, JIRA_EMAIL (or JIRA_USER), and JIRA_API_TOKEN.

Raises RuntimeError if required variables are missing.

Source code in src/agentic_ci/jira/client.py
@classmethod
def from_env(cls, url: str | None = None) -> JiraClient:
    """Create a client from environment variables.

    Reads ``JIRA_URL``, ``JIRA_EMAIL`` (or ``JIRA_USER``), and
    ``JIRA_API_TOKEN``.

    Raises ``RuntimeError`` if required variables are missing.
    """
    jira_url = url or os.environ.get("JIRA_URL", "")
    if not jira_url:
        raise RuntimeError("JIRA_URL environment variable not set and no url provided")
    email = os.environ.get("JIRA_EMAIL") or os.environ.get("JIRA_USER", "")
    token = os.environ.get("JIRA_API_TOKEN", "")
    missing = []
    if not email:
        missing.append("JIRA_EMAIL (or JIRA_USER)")
    if not token:
        missing.append("JIRA_API_TOKEN")
    if missing:
        raise RuntimeError(f"Missing environment variable(s): {', '.join(missing)}")
    timeout = int(os.environ.get("JIRA_API_TIMEOUT", "30"))
    return cls(jira_url, email, token, timeout=timeout)

get_issue(key)

Fetch a single issue with comments. Returns a normalised dict.

The returned dict has keys: key, summary, description (plain text), issue_type, labels, status, project, components, reporter_name, reporter_email, comments.

Source code in src/agentic_ci/jira/client.py
def get_issue(self, key: str) -> dict:
    """Fetch a single issue with comments. Returns a normalised dict.

    The returned dict has keys: ``key``, ``summary``, ``description``
    (plain text), ``issue_type``, ``labels``, ``status``, ``project``,
    ``components``, ``reporter_name``, ``reporter_email``, ``comments``.
    """
    req_fields = "summary,description,issuetype,labels,status,reporter,components,project"
    resp = self._request(
        "get",
        self._api_url(f"issue/{key}") + f"?fields={req_fields}",
        headers=self._headers(),
    )
    self._check(resp)
    issue = resp.json()
    fields = issue.get("fields", {})

    desc_field = fields.get("description")
    if isinstance(desc_field, dict):
        description = adf_to_text(desc_field)
    else:
        description = desc_field or ""

    comments = self._fetch_comments(key)
    reporter = fields.get("reporter") or {}
    components = [{"name": c.get("name", "")} for c in fields.get("components", [])]

    return {
        "key": issue.get("key", ""),
        "summary": fields.get("summary", ""),
        "description": description,
        "issue_type": fields.get("issuetype", {}).get("name", ""),
        "labels": fields.get("labels", []),
        "status": fields.get("status", {}).get("name", ""),
        "project": {"key": fields.get("project", {}).get("key", "")},
        "components": components,
        "reporter_name": reporter.get("displayName", ""),
        "reporter_email": reporter.get("emailAddress", ""),
        "comments": comments,
    }

search(jql, *, max_results=500)

Search issues by JQL. Returns a list of normalised dicts.

Source code in src/agentic_ci/jira/client.py
def search(self, jql: str, *, max_results: int = 500) -> list[dict]:
    """Search issues by JQL. Returns a list of normalised dicts."""
    results: list[dict] = []
    next_page_token: str | None = None
    search_url = self._api_url("search/jql")

    while True:
        payload: dict = {
            "jql": jql,
            "fields": ["summary", "description", "issuetype", "labels", "comment", "status"],
            "maxResults": min(50, max_results - len(results)),
        }
        if next_page_token:
            payload["nextPageToken"] = next_page_token

        resp = self._request(
            "post",
            search_url,
            headers=self._headers(),
            json=payload,
        )
        self._check(resp)

        data = resp.json()
        for issue in data.get("issues", []):
            fields = issue.get("fields", {})
            desc_field = fields.get("description")
            if isinstance(desc_field, dict):
                description = adf_to_text(desc_field)
            else:
                description = desc_field or ""

            comments_data = fields.get("comment", {})
            comments = []
            for c in comments_data.get("comments", []):
                body_field = c.get("body", "")
                body = adf_to_text(body_field) if isinstance(body_field, dict) else body_field
                comments.append(
                    {
                        "id": c.get("id", ""),
                        "author": c.get("author", {}).get("displayName", "Unknown"),
                        "author_email": c.get("author", {}).get("emailAddress", ""),
                        "body": body,
                        "created": c.get("created", ""),
                        "visibility": c.get("visibility"),
                    }
                )

            results.append(
                {
                    "key": issue.get("key", ""),
                    "summary": fields.get("summary", ""),
                    "description": description,
                    "issue_type": fields.get("issuetype", {}).get("name", ""),
                    "labels": fields.get("labels", []),
                    "status": fields.get("status", {}).get("name", ""),
                    "comments": comments,
                }
            )

        if len(results) >= max_results:
            break
        if data.get("isLast", True) or not data.get("issues"):
            break
        next_page_token = data.get("nextPageToken")
        if not next_page_token:
            break

    return results

get_label_author(key, label)

Find who most recently added a label via the changelog.

Returns {"found": True, "email": ..., "displayName": ...} or {"found": False}.

Falls back to the ticket reporter if the label was set at creation time (no changelog entry).

Source code in src/agentic_ci/jira/client.py
def get_label_author(self, key: str, label: str) -> dict:
    """Find who most recently added a label via the changelog.

    Returns ``{"found": True, "email": ..., "displayName": ...}``
    or ``{"found": False}``.

    Falls back to the ticket reporter if the label was set at
    creation time (no changelog entry).
    """
    author_email: str | None = None
    author_name: str | None = None
    start_at = 0

    while True:
        resp = self._request(
            "get",
            self._api_url(f"issue/{key}/changelog"),
            headers=self._headers(),
            params={"startAt": start_at, "maxResults": 100},
        )
        self._check(resp)

        data = resp.json()
        for entry in data.get("values", []):
            for item in entry.get("items", []):
                if item.get("field") != "labels":
                    continue
                from_str = item.get("fromString") or ""
                to_str = item.get("toString") or ""
                from_labels = set(from_str.split()) if from_str else set()
                to_labels = set(to_str.split()) if to_str else set()
                if label in to_labels and label not in from_labels:
                    author = entry.get("author", {})
                    author_email = author.get("emailAddress", "")
                    author_name = author.get("displayName", "Unknown")

        total = data.get("total", 0)
        values = data.get("values", [])
        start_at += len(values)
        if start_at >= total or not values:
            break

    if author_email is None:
        resp = self._request(
            "get",
            self._api_url(f"issue/{key}") + "?fields=labels,reporter",
            headers=self._headers(),
        )
        if resp.status_code == 200:
            fields = resp.json().get("fields", {})
            if label in fields.get("labels", []):
                reporter = fields.get("reporter") or {}
                author_email = reporter.get("emailAddress", "")
                author_name = reporter.get("displayName", "Unknown")

    if author_email is None:
        return {"found": False}
    return {"found": True, "email": author_email, "displayName": author_name}

get_description_editors(key)

Return email addresses of all users who edited the issue description.

Walks the full changelog looking for description field changes. Returns a list of unique email addresses (may be empty if the description was never edited after creation).

Source code in src/agentic_ci/jira/client.py
def get_description_editors(self, key: str) -> list[str]:
    """Return email addresses of all users who edited the issue description.

    Walks the full changelog looking for description field changes.
    Returns a list of unique email addresses (may be empty if the
    description was never edited after creation).
    """
    editors: list[str] = []
    seen: set[str] = set()
    start_at = 0

    while True:
        resp = self._request(
            "get",
            self._api_url(f"issue/{key}/changelog"),
            headers=self._headers(),
            params={"startAt": start_at, "maxResults": 100},
        )
        self._check(resp)

        data = resp.json()
        for entry in data.get("values", []):
            if not any(item.get("field") == "description" for item in entry.get("items", [])):
                continue
            author = entry.get("author", {})
            email = author.get("emailAddress", "")
            if not email:
                account_id = author.get("accountId", "unknown")
                email = f"missing-email:{account_id}"
            if email not in seen:
                editors.append(email)
                seen.add(email)

        total = data.get("total", 0)
        values = data.get("values", [])
        start_at += len(values)
        if start_at >= total or not values:
            break

    return editors

get_custom_field(key, *field_names)

Read custom fields by name. Returns {name: value}.

Source code in src/agentic_ci/jira/client.py
def get_custom_field(self, key: str, *field_names: str) -> dict[str, object]:
    """Read custom fields by name. Returns ``{name: value}``."""
    field_ids = {name: self._resolve_field_id(name) for name in field_names}
    ids_csv = ",".join(field_ids.values())

    resp = self._request(
        "get",
        self._api_url(f"issue/{key}") + f"?fields={ids_csv}",
        headers=self._headers(),
    )
    self._check(resp)

    fields = resp.json().get("fields", {})
    result: dict[str, object] = {}
    for name, fid in field_ids.items():
        value = fields.get(fid)
        if isinstance(value, dict) and "value" in value:
            value = value["value"]
        result[name] = value
    return result

search_parent_epics(jql)

Find parent Epic keys for issues matching a child JQL query.

Source code in src/agentic_ci/jira/client.py
def search_parent_epics(self, jql: str) -> list[str]:
    """Find parent Epic keys for issues matching a child JQL query."""
    parent_keys: set[str] = set()
    next_page_token: str | None = None
    search_url = self._api_url("search/jql")

    while True:
        payload: dict = {"jql": jql, "fields": ["parent"], "maxResults": 50}
        if next_page_token:
            payload["nextPageToken"] = next_page_token

        resp = self._request(
            "post",
            search_url,
            headers=self._headers(),
            json=payload,
        )
        self._check(resp)

        data = resp.json()
        for issue in data.get("issues", []):
            parent = issue.get("fields", {}).get("parent")
            if parent and parent.get("key"):
                parent_keys.add(parent["key"])

        if data.get("isLast", True) or not data.get("issues"):
            break
        next_page_token = data.get("nextPageToken")
        if not next_page_token:
            break

    return sorted(parent_keys)

add_comment(key, body, *, visibility_group=None)

Post a comment, optionally restricted to a visibility group.

The body is plain text (with optional markdown markup, converted to ADF for the REST API). Uses acli when no visibility restriction is needed. Returns True on success, False on failure.

Source code in src/agentic_ci/jira/client.py
def add_comment(
    self,
    key: str,
    body: str,
    *,
    visibility_group: str | None = None,
) -> bool:
    """Post a comment, optionally restricted to a visibility group.

    The body is plain text (with optional markdown markup, converted
    to ADF for the REST API).  Uses acli when no visibility
    restriction is needed.  Returns True on success, False on failure.
    """
    if self._acli_available and not visibility_group:
        try:
            acli_mod.run_acli(
                "jira",
                "workitem",
                "comment",
                "create",
                "--key",
                key,
                "--body",
                body,
            )
            log.info("Commented on %s (via acli)", key)
            return True
        except acli_mod.AcliError as exc:
            log.warning("acli comment failed, falling back to REST: %s", exc)

    payload: dict = {"body": text_to_adf(body)}
    if visibility_group:
        payload["visibility"] = {"type": "group", "value": visibility_group}

    resp = self._request(
        "post",
        self._api_url(f"issue/{key}/comment"),
        headers=self._headers(),
        json=payload,
    )
    if resp.status_code == 201:
        log.info("Commented on %s", key)
        return True
    log.warning("Failed to comment on %s: HTTP %d", key, resp.status_code)
    return False

update_comment(key, comment_id, body, *, visibility_group=None)

Update an existing comment by ID.

Same ADF conversion and visibility semantics as add_comment. Returns True on success, False on failure.

Source code in src/agentic_ci/jira/client.py
def update_comment(
    self,
    key: str,
    comment_id: str,
    body: str,
    *,
    visibility_group: str | None = None,
) -> bool:
    """Update an existing comment by ID.

    Same ADF conversion and visibility semantics as ``add_comment``.
    Returns True on success, False on failure.
    """
    if self._acli_available and not visibility_group:
        adf_json = json.dumps(text_to_adf(body))
        try:
            with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp:
                tmp.write(adf_json)
                tmp_path = tmp.name
            try:
                acli_mod.run_acli(
                    "jira",
                    "workitem",
                    "comment",
                    "update",
                    "--key",
                    key,
                    "--id",
                    comment_id,
                    "--body-adf",
                    tmp_path,
                )
            finally:
                os.unlink(tmp_path)
            log.info("Updated comment %s on %s (via acli)", comment_id, key)
            return True
        except acli_mod.AcliError as exc:
            log.warning("acli update_comment failed, falling back to REST: %s", exc)

    payload: dict = {"body": text_to_adf(body)}
    if visibility_group:
        payload["visibility"] = {"type": "group", "value": visibility_group}

    resp = self._request(
        "put",
        self._api_url(f"issue/{key}/comment/{comment_id}"),
        headers=self._headers(),
        json=payload,
    )
    if resp.status_code == 200:
        log.info("Updated comment %s on %s", comment_id, key)
        return True
    log.warning("Failed to update comment %s on %s: HTTP %d", comment_id, key, resp.status_code)
    return False

edit_labels(key, *, add=None, remove=None)

Add and/or remove labels on an issue (atomic update).

Delegates to acli for add-only operations. Falls back to REST API for remove or mixed add+remove (acli edit --labels replaces; the REST API supports atomic add/remove).

Source code in src/agentic_ci/jira/client.py
def edit_labels(
    self,
    key: str,
    *,
    add: list[str] | None = None,
    remove: list[str] | None = None,
) -> None:
    """Add and/or remove labels on an issue (atomic update).

    Delegates to acli for add-only operations. Falls back to
    REST API for remove or mixed add+remove (acli ``edit --labels``
    replaces; the REST API supports atomic add/remove).
    """
    if not add and not remove:
        return

    if self._acli_available and add and not remove:
        try:
            acli_mod.run_acli(
                "jira",
                "workitem",
                "edit",
                "--key",
                key,
                "--labels",
                ",".join(add),
            )
            log.info("Labels updated on %s (via acli)", key)
            return
        except acli_mod.AcliError as exc:
            log.warning("acli edit_labels failed, falling back to REST: %s", exc)

    update: dict = {}
    if add:
        update.setdefault("labels", []).extend({"add": lbl} for lbl in add)
    if remove:
        update.setdefault("labels", []).extend({"remove": lbl} for lbl in remove)

    resp = self._request(
        "put",
        self._api_url(f"issue/{key}"),
        headers=self._headers(),
        json={"update": update},
    )
    self._check(resp, expected=(200, 204))
    log.info("Labels updated on %s", key)

transition(key, status)

Transition an issue to a new status by name.

Source code in src/agentic_ci/jira/client.py
def transition(self, key: str, status: str) -> None:
    """Transition an issue to a new status by name."""
    if self._acli_available:
        try:
            acli_mod.run_acli(
                "jira",
                "workitem",
                "transition",
                "--key",
                key,
                "--status",
                status,
            )
            log.info("Transitioned %s to %s (via acli)", key, status)
            return
        except acli_mod.AcliError as exc:
            log.warning("acli transition failed, falling back to REST: %s", exc)

    resp = self._request(
        "get",
        self._api_url(f"issue/{key}/transitions"),
        headers=self._headers(),
    )
    self._check(resp)

    transition_id = None
    for t in resp.json().get("transitions", []):
        if t.get("name", "").lower() == status.lower():
            transition_id = t["id"]
            break
        if t.get("to", {}).get("name", "").lower() == status.lower():
            transition_id = t["id"]
            break

    if transition_id is None:
        available = [t.get("name", "") for t in resp.json().get("transitions", [])]
        raise JiraError(f"No transition to '{status}' found for {key}. Available: {available}")

    resp = self._request(
        "post",
        self._api_url(f"issue/{key}/transitions"),
        headers=self._headers(),
        json={"transition": {"id": transition_id}},
    )
    self._check(resp, expected=(200, 204))
    log.info("Transitioned %s to %s", key, status)

assign(key, assignee)

Assign an issue to a user by account ID or email.

Source code in src/agentic_ci/jira/client.py
def assign(self, key: str, assignee: str) -> None:
    """Assign an issue to a user by account ID or email."""
    if self._acli_available:
        try:
            acli_mod.run_acli(
                "jira",
                "workitem",
                "assign",
                "--key",
                key,
                "--assignee",
                assignee,
            )
            log.info("Assigned %s to %s (via acli)", key, assignee)
            return
        except acli_mod.AcliError as exc:
            log.warning("acli assign failed, falling back to REST: %s", exc)

    account_id = self._resolve_account_id(assignee)
    resp = self._request(
        "put",
        self._api_url(f"issue/{key}/assignee"),
        headers=self._headers(),
        json={"accountId": account_id},
    )
    self._check(resp, expected=(200, 204))
    log.info("Assigned %s to %s", key, assignee)

create_issue(project, issue_type, summary, *, description='', parent_epic='', **extra_fields)

Create a new issue. Returns the issue key.

Uses acli for simple creates (no extra_fields). Falls back to REST API when Epic Link or extra fields are needed.

Source code in src/agentic_ci/jira/client.py
def create_issue(
    self,
    project: str,
    issue_type: str,
    summary: str,
    *,
    description: str = "",
    parent_epic: str = "",
    **extra_fields: object,
) -> str:
    """Create a new issue. Returns the issue key.

    Uses acli for simple creates (no extra_fields). Falls back to
    REST API when Epic Link or extra fields are needed.
    """
    if self._acli_available and not extra_fields and not parent_epic:
        try:
            args = [
                "jira",
                "workitem",
                "create",
                "--project",
                project,
                "--type",
                issue_type,
                "--summary",
                summary,
            ]
            if description:
                args.extend(["--description", description])
            result = acli_mod.run_acli(*args, json_output=True)
            data = json.loads(result.stdout)
            key = data.get("key", "")
            log.info("Created issue %s (via acli)", key)
            return key
        except (acli_mod.AcliError, json.JSONDecodeError, KeyError) as exc:
            log.warning("acli create failed, falling back to REST: %s", exc)

    fields: dict = {
        "project": {"key": project},
        "summary": summary,
        "issuetype": {"name": issue_type},
    }
    if description:
        fields["description"] = text_to_adf(description)
    if parent_epic:
        epic_field = self._resolve_field_id("Epic Link")
        fields[epic_field] = parent_epic
    fields.update(extra_fields)

    resp = self._request(
        "post",
        self._api_url("issue"),
        headers=self._headers(),
        json={"fields": fields},
    )
    self._check(resp, expected=201)
    key = resp.json().get("key", "")
    log.info("Created issue %s", key)
    return key

Link two issues with a named link type.

Source code in src/agentic_ci/jira/client.py
def link_issues(self, source: str, target: str, link_type: str) -> None:
    """Link two issues with a named link type."""
    if self._acli_available:
        try:
            acli_mod.run_acli(
                "jira",
                "workitem",
                "link",
                "create",
                "--out",
                source,
                "--in",
                target,
                "--type",
                link_type,
            )
            log.info("Linked %s '%s' %s (via acli)", source, link_type, target)
            return
        except acli_mod.AcliError as exc:
            log.warning("acli link failed, falling back to REST: %s", exc)

    resp = self._request(
        "get",
        self._api_url("issueLinkType"),
        headers=self._headers(),
    )
    self._check(resp)

    resolved_name = None
    direction = "outward"
    link_lower = link_type.lower()
    for lt in resp.json().get("issueLinkTypes", []):
        if lt.get("name", "").lower() == link_lower:
            resolved_name = lt["name"]
            direction = "outward"
            break
        if lt.get("inward", "").lower() == link_lower:
            resolved_name = lt["name"]
            direction = "inward"
            break
        if lt.get("outward", "").lower() == link_lower:
            resolved_name = lt["name"]
            direction = "outward"
            break

    if not resolved_name:
        available = [lt.get("name", "") for lt in resp.json().get("issueLinkTypes", [])]
        raise JiraError(f"Link type '{link_type}' not found. Available: {', '.join(available)}")

    if direction == "inward":
        payload = {
            "type": {"name": resolved_name},
            "inwardIssue": {"key": source},
            "outwardIssue": {"key": target},
        }
    else:
        payload = {
            "type": {"name": resolved_name},
            "inwardIssue": {"key": target},
            "outwardIssue": {"key": source},
        }

    resp = self._request(
        "post",
        self._api_url("issueLink"),
        headers=self._headers(),
        json=payload,
    )
    self._check(resp, expected=201)
    log.info("Linked %s '%s' %s", source, link_type, target)

attach_file(key, filepath)

Attach a file to an issue.

Source code in src/agentic_ci/jira/client.py
def attach_file(self, key: str, filepath: str | Path) -> None:
    """Attach a file to an issue."""
    path = Path(filepath)
    if not path.is_file():
        raise JiraError(f"File '{filepath}' not found or not readable")

    with open(path, "rb") as f:
        resp = self._request(
            "post",
            self._api_url(f"issue/{key}/attachments"),
            headers={"X-Atlassian-Token": "no-check"},
            files={"file": (path.name, f)},
        )
    self._check(resp)
    log.info("Attached '%s' to %s", path.name, key)

set_security_level(key, level_name)

Set the security level on an issue by level name.

Fetches available security levels for the issue, matches level_name (case-insensitive), and sets it via the REST API.

Raises JiraError if the level name is not found.

Source code in src/agentic_ci/jira/client.py
def set_security_level(self, key: str, level_name: str) -> None:
    """Set the security level on an issue by level name.

    Fetches available security levels for the issue, matches
    *level_name* (case-insensitive), and sets it via the REST API.

    Raises ``JiraError`` if the level name is not found.
    """
    resp = self._request(
        "get",
        self._api_url(f"issue/{key}"),
        headers=self._headers(),
        params={"fields": "project"},
    )
    self._check(resp)
    project_id = resp.json().get("fields", {}).get("project", {}).get("id")
    if not project_id:
        raise JiraError(f"Could not determine project ID for issue {key}")

    resp = self._request(
        "get",
        self._api_url(f"project/{project_id}/securitylevel"),
        headers=self._headers(),
    )
    self._check(resp)

    level_id = None
    available = []
    for level in resp.json().get("levels", []):
        name = level.get("name", "")
        available.append(name)
        if name.lower() == level_name.lower():
            level_id = level.get("id")
            break

    if level_id is None:
        raise JiraError(
            f"Security level '{level_name}' not found for {key}. "
            f"Available: {', '.join(available)}"
        )

    resp = self._request(
        "put",
        self._api_url(f"issue/{key}"),
        headers=self._headers(),
        json={"fields": {"security": {"id": level_id}}},
    )
    self._check(resp, expected=(200, 204))
    log.info("Set security level '%s' on %s", level_name, key)

set_custom_field(key, field_name, value)

Set a custom field by name.

Consults the field schema to determine value format: text/string fields get the raw string, option/select fields get {"value": v}. If the value is valid JSON, it is sent as-is.

Source code in src/agentic_ci/jira/client.py
def set_custom_field(self, key: str, field_name: str, value: str) -> None:
    """Set a custom field by name.

    Consults the field schema to determine value format: text/string
    fields get the raw string, option/select fields get ``{"value": v}``.
    If the value is valid JSON, it is sent as-is.
    """
    field_id = self._resolve_field_id(field_name)

    parsed_value: str | dict | list | int | float | bool | None
    try:
        parsed_value = json.loads(value)
    except json.JSONDecodeError:
        schema_type = self._get_field_schema_type(field_name)
        if schema_type in ("string", "any", ""):
            parsed_value = value
        else:
            parsed_value = {"value": value}

    resp = self._request(
        "put",
        self._api_url(f"issue/{key}"),
        headers=self._headers(),
        json={"fields": {field_id: parsed_value}},
    )
    self._check(resp, expected=(200, 204))
    log.info("Set '%s' on %s", field_name, key)

ADF Conversion

adf

Atlassian Document Format (ADF) conversion utilities.

Converts between plain text (with markdown markup) and ADF, the JSON document format used by Jira Cloud REST API v3 for rich-text fields.

text_to_adf(text)

Convert plain text with markdown markup to Atlassian Document Format.

Handles: - lang ... fenced code blocks -> codeBlock nodes - {expand:Title}...{expand} -> expand nodes (collapsible sections) - ---- or --- on a line by itself -> rule nodes (horizontal dividers) - # through ###### headings -> heading nodes - - bullets -> bulletList nodes - bold and italic inline markup - URLs -> inlineCard nodes - Double newlines split paragraphs, single newlines become hardBreak

Source code in src/agentic_ci/jira/adf.py
def text_to_adf(text: str) -> dict:
    """Convert plain text with markdown markup to Atlassian Document Format.

    Handles:
    - ```lang ... ``` fenced code blocks -> codeBlock nodes
    - {expand:Title}...{expand} -> expand nodes (collapsible sections)
    - ---- or --- on a line by itself -> rule nodes (horizontal dividers)
    - # through ###### headings -> heading nodes
    - - bullets -> bulletList nodes
    - **bold** and *italic* inline markup
    - URLs -> inlineCard nodes
    - Double newlines split paragraphs, single newlines become hardBreak
    """
    if not text:
        return {"type": "doc", "version": 1, "content": []}

    content: list[dict] = []
    block_pattern = re.compile(
        r"```(\w*)\n(.*?)```"
        r"|"
        r"\{expand:([^}]*)\}\n?(.*?)\n?\{expand\}",
        re.DOTALL,
    )

    last_end = 0
    for match in block_pattern.finditer(text):
        before = text[last_end : match.start()]
        if before.strip():
            content.extend(_markdown_text_to_adf_blocks(before.strip()))

        if match.group(2) is not None:
            code_text = match.group(2)
            if code_text.endswith("\n"):
                code_text = code_text[:-1]
            lang = match.group(1) or ""
            node: dict = {"type": "text", "text": code_text}
            block: dict = {"type": "codeBlock", "content": [node]}
            if lang:
                block["attrs"] = {"language": lang}
            content.append(block)
        else:
            title = match.group(3) or ""
            inner_text = match.group(4) or ""
            inner_blocks = text_to_adf(inner_text).get("content", []) if inner_text.strip() else []
            expand_node: dict = {"type": "expand", "attrs": {"title": title}}
            if inner_blocks:
                expand_node["content"] = inner_blocks
            content.append(expand_node)

        last_end = match.end()

    remaining = text[last_end:]
    if remaining.strip():
        content.extend(_markdown_text_to_adf_blocks(remaining.strip()))

    if not content:
        content.append({"type": "paragraph", "content": [{"type": "text", "text": ""}]})

    return {"type": "doc", "version": 1, "content": content}

adf_to_text(adf)

Extract plain text from an ADF document.

Source code in src/agentic_ci/jira/adf.py
def adf_to_text(adf: dict) -> str:
    """Extract plain text from an ADF document."""
    if not adf or not isinstance(adf, dict):
        return ""

    def extract_node(node: dict) -> str:
        node_type = node.get("type", "")
        if node_type == "text":
            text = node.get("text", "")
            marks = node.get("marks", [])
            for mark in marks:
                mark_type = mark.get("type")
                if mark_type == "link":
                    href = mark.get("attrs", {}).get("href", "")
                    if href and href != text:
                        text = f"{text} {href}"
                elif mark_type == "strong":
                    text = f"**{text}**"
                elif mark_type == "em":
                    text = f"*{text}*"
                elif mark_type == "code":
                    text = f"`{text}`"
            return text
        elif node_type == "hardBreak":
            return "\n"
        elif node_type == "paragraph":
            return extract_children(node) + "\n\n"
        elif node_type == "heading":
            level = node.get("attrs", {}).get("level", 1)
            prefix = "#" * level + " "
            return prefix + extract_children(node) + "\n\n"
        elif node_type == "codeBlock":
            lang = node.get("attrs", {}).get("language", "")
            code = extract_children(node)
            return f"```{lang}\n{code}\n```\n\n"
        elif node_type in ("bulletList", "orderedList"):
            return extract_children(node) + "\n"
        elif node_type == "listItem":
            return "- " + extract_children(node)
        elif node_type in ("inlineCard", "blockCard"):
            return node.get("attrs", {}).get("url", "")
        elif node_type == "blockquote":
            lines = extract_children(node).rstrip("\n").split("\n")
            return "\n".join(f"> {line}" for line in lines) + "\n\n"
        elif node_type == "expand":
            title = node.get("attrs", {}).get("title", "")
            inner = extract_children(node).rstrip("\n")
            return f"{{expand:{title}}}\n{inner}\n{{expand}}\n\n"
        elif node_type == "rule":
            return "----\n\n"
        elif node_type == "doc":
            return extract_children(node)
        else:
            return extract_children(node)

    def extract_children(node: dict) -> str:
        return "".join(extract_node(child) for child in node.get("content", []))

    result = extract_node(adf)
    while result.endswith("\n\n\n"):
        result = result[:-1]
    return result.rstrip("\n")

acli Wrapper

acli

Atlassian CLI (acli) wrapper for agentic-ci.

Downloads the acli binary if not already on PATH, handles authentication, and provides a subprocess runner for acli commands.

AcliError(message, returncode=1, stderr='')

Bases: Exception

Raised when an acli command fails.

Source code in src/agentic_ci/jira/acli.py
def __init__(self, message: str, returncode: int = 1, stderr: str = ""):
    super().__init__(message)
    self.returncode = returncode
    self.stderr = stderr

is_available()

Check if acli is on PATH.

Source code in src/agentic_ci/jira/acli.py
def is_available() -> bool:
    """Check if acli is on PATH."""
    return _resolve_acli() is not None

ensure_acli(dest='/usr/local/bin/acli')

Download acli if not already on PATH. Returns absolute path to binary.

Source code in src/agentic_ci/jira/acli.py
def ensure_acli(dest: str = "/usr/local/bin/acli") -> str:
    """Download acli if not already on PATH. Returns absolute path to binary."""
    existing = _resolve_acli()
    if existing:
        return existing

    curl_path = shutil.which("curl")
    if not curl_path:
        raise AcliError("curl not found on PATH; cannot download acli")

    log.info("Downloading acli to %s", dest)
    try:
        subprocess.run(
            [curl_path, "-fsSL", ACLI_DOWNLOAD_URL, "-o", dest],
            check=True,
            capture_output=True,
            timeout=SUBPROCESS_TIMEOUT,
        )
        os.chmod(dest, os.stat(dest).st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
    except subprocess.TimeoutExpired as exc:
        raise AcliError(f"acli download timed out after {SUBPROCESS_TIMEOUT}s") from exc
    except subprocess.CalledProcessError as exc:
        raise AcliError(
            f"Failed to download acli: {exc.stderr.decode()}", returncode=exc.returncode
        ) from exc

    return dest

setup_auth(site=DEFAULT_SITE)

Authenticate acli using JIRA_EMAIL + JIRA_API_TOKEN env vars.

Source code in src/agentic_ci/jira/acli.py
def setup_auth(site: str = DEFAULT_SITE) -> None:
    """Authenticate acli using JIRA_EMAIL + JIRA_API_TOKEN env vars."""
    email = os.environ.get("JIRA_EMAIL") or os.environ.get("JIRA_USER", "")
    token = os.environ.get("JIRA_API_TOKEN", "")
    if not email or not token:
        missing = []
        if not email:
            missing.append("JIRA_EMAIL (or JIRA_USER)")
        if not token:
            missing.append("JIRA_API_TOKEN")
        raise AcliError(f"Missing env vars for acli auth: {', '.join(missing)}")

    acli_path = _resolve_acli()
    if not acli_path:
        raise AcliError("acli not found on PATH; call ensure_acli() first")

    try:
        result = subprocess.run(
            [acli_path, "jira", "auth", "login", "--email", email, "--site", site, "--token"],
            input=token,
            capture_output=True,
            text=True,
            timeout=SUBPROCESS_TIMEOUT,
        )
    except subprocess.TimeoutExpired as exc:
        raise AcliError(f"acli auth timed out after {SUBPROCESS_TIMEOUT}s") from exc

    if result.returncode != 0:
        raise AcliError(
            f"acli auth failed: {result.stderr}",
            returncode=result.returncode,
            stderr=result.stderr,
        )
    log.info("acli authenticated against %s", site)

run_acli(*args, json_output=False, check=True)

Run an acli command.

If json_output is True, appends --json to the command. If check is True, raises AcliError on non-zero exit.

Source code in src/agentic_ci/jira/acli.py
def run_acli(
    *args: str,
    json_output: bool = False,
    check: bool = True,
) -> subprocess.CompletedProcess:
    """Run an acli command.

    If ``json_output`` is True, appends ``--json`` to the command.
    If ``check`` is True, raises ``AcliError`` on non-zero exit.
    """
    acli_path = _resolve_acli()
    if not acli_path:
        raise AcliError("acli not found on PATH")

    cmd = [acli_path, *args]
    if json_output:
        cmd.append("--json")

    log.info("Running: %s", " ".join(_redact_cmd(cmd)))
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT)
    except subprocess.TimeoutExpired as exc:
        raise AcliError(
            f"acli command timed out after {SUBPROCESS_TIMEOUT}s: {' '.join(_redact_cmd(cmd))}"
        ) from exc

    if check and result.returncode != 0:
        raise AcliError(
            f"acli command failed: {' '.join(_redact_cmd(list(args)))}\n{result.stderr}",
            returncode=result.returncode,
            stderr=result.stderr,
        )

    return result