Skip to content

Stream Processors

stream

Stream processors for AI agent output formats.

ClaudeCodeStreamProcessor(color=True, wrap=0, claude_pid=0)

Processes Claude Code stream-json and prints human-readable output.

Source code in src/agentic_ci/stream.py
def __init__(self, color=True, wrap=0, claude_pid=0):
    """Set up colour codes and the per-stream parsing state.

    Args:
        color: When truthy, the colour attributes hold ANSI escape
            sequences; otherwise every one of them is the empty string.
        wrap: Stored as ``self.wrap`` (presumably the wrap width used by
            the output helpers -- confirm against ``_emit``).
        claude_pid: Stored as ``self.claude_pid``; ``process_line`` sends
            SIGTERM to this pid when it sees ``FULL RUN COMPLETE``.
            A falsy value disables that.
    """
    self.wrap = wrap
    self.claude_pid = claude_pid

    # Attribute name -> ANSI sequence. CLAUDE is deliberately unstyled.
    ansi_codes = (
        ("THINK", "\033[3;31m"),
        ("TOOL", "\033[1;90m"),
        ("CLAUDE", ""),
        ("RED", "\033[31m"),
        ("YELLOW", "\033[33m"),
        ("RESET", "\033[0m"),
    )
    for attr_name, sequence in ansi_codes:
        setattr(self, attr_name, sequence if color else "")

    # Which kind of content block is currently being streamed.
    self._in_text = self._in_thinking = False
    self._last_block_type = self._tool_name = None
    # Accumulators for partial tool-input JSON and buffered output.
    self._tool_json = self._line_buf = ""
    # Token counters plus the total/time of the last TOKENS line printed.
    self._total_input = self._total_output = 0
    self._total_cache_read = self._total_cache_write = 0
    self._last_emitted_total = 0
    self._last_emitted_time = 0.0

process_line(line)

Process a single line of stream-json. Returns True if run is complete.

Source code in src/agentic_ci/stream.py
def process_line(self, line):
    """Process a single line of stream-json. Returns True if run is complete.

    Args:
        line: One line of stream-json text. Blank lines, lines that are
            not valid JSON, and JSON values that are not objects are
            ignored.

    Returns:
        True when a ``result`` message arrives, or when a ``user`` message
        carries a string tool result containing ``FULL RUN COMPLETE``;
        False for everything else.
    """
    line = line.strip()
    if not line:
        return False

    try:
        msg = json.loads(line)
    except (json.JSONDecodeError, ValueError):
        return False

    # Scalars, arrays and null are valid JSON but are not stream messages;
    # without this guard the .get() calls below raise AttributeError.
    if not isinstance(msg, dict):
        return False

    msg_type = msg.get("type")

    if msg_type == "system":
        subtype = msg.get("subtype", "")
        if subtype == "init":
            self._format_init(msg)
        elif subtype == "api_retry":
            attempt = msg.get("attempt", "?")
            max_retries = msg.get("max_retries", "?")
            delay = msg.get("retry_delay_ms", "?")
            error = msg.get("error", "unknown")
            self._last_block_type = "system"
            print(
                f"  {self.YELLOW}\U0001f504 Retry {attempt}/{max_retries}{self.RESET} "
                f"{error} — retrying in {delay}ms",
                flush=True,
            )
        return False

    if msg_type == "user":
        message = msg.get("message", {})
        blocks = message.get("content", []) if isinstance(message, dict) else None
        # Content may also be a plain string (or missing); only a list of
        # blocks can contain a tool_result.
        if not isinstance(blocks, list):
            return False
        for block in blocks:
            if not isinstance(block, dict):
                continue
            if block.get("type") == "tool_result":
                content = block.get("content", "")
                if isinstance(content, str) and "FULL RUN COMPLETE" in content:
                    self._end_block()
                    if self.claude_pid:
                        log.section("FULL RUN COMPLETE detected, terminating Claude")
                        try:
                            os.kill(self.claude_pid, signal.SIGTERM)
                        except ProcessLookupError:
                            # The process already exited; nothing to stop.
                            pass
                    return True
        return False

    if msg_type == "result":
        self._end_block()
        self._format_result(msg)
        return True

    if msg_type != "stream_event":
        return False

    event = msg.get("event", {})
    event_type = event.get("type")

    if event_type == "content_block_start":
        block = event.get("content_block", {})
        block_type = block.get("type")
        if block_type == "text":
            self._last_block_type = "text"
            print(f"  {self.CLAUDE}\U0001f4ac Claude ", end="", flush=True)
            self._in_text = True
        elif block_type == "thinking":
            self._last_block_type = "thinking"
            print(f"  {self.THINK}\U0001f9e0 Thinking ", end="", flush=True)
            self._in_thinking = True
        elif block_type in ("tool_use", "server_tool_use"):
            # Tool input arrives as JSON fragments; start a fresh buffer.
            self._tool_name = block.get("name", "unknown")
            self._tool_json = ""

    elif event_type == "content_block_delta":
        delta = event.get("delta", {})
        delta_type = delta.get("type")
        if delta_type == "text_delta":
            self._emit(delta.get("text", ""))
        elif delta_type == "thinking_delta":
            self._emit(delta.get("thinking", ""))
        elif delta_type == "input_json_delta":
            self._tool_json += delta.get("partial_json", "")

    elif event_type == "content_block_stop":
        self._end_block()

    elif event_type == "message_start":
        usage = event.get("message", {}).get("usage", {})
        self._total_input = usage.get("input_tokens", 0)
        self._total_cache_read = usage.get("cache_read_input_tokens", 0)
        self._total_cache_write = usage.get("cache_creation_input_tokens", 0)

    elif event_type == "message_delta":
        usage = event.get("usage", {})
        out = usage.get("output_tokens", 0)
        if out > 0:
            self._total_output = out
            total = (
                self._total_input
                + self._total_output
                + self._total_cache_read
                + self._total_cache_write
            )
            # Print on the first report, then only after another 5,000
            # tokens, so the log is not flooded.
            if total - self._last_emitted_total >= 5_000 or self._last_emitted_total == 0:
                now = time.monotonic()
                rate = 0.0
                try:
                    with open(
                        os.environ.get("OTEL_RATE_FILE", "/tmp/claude-otel-rate.json")
                    ) as rf:
                        rd = json.load(rf)
                    # Coerce inside the try so a null or non-numeric rate
                    # counts as "no rate" rather than raising later at the
                    # numeric comparison.
                    rate = float(rd.get("rate", 0) or 0)
                except (OSError, ValueError, TypeError, AttributeError, OverflowError):
                    # Best effort: the rate file is optional and may be
                    # missing, unreadable or malformed.
                    rate = 0.0
                if rate <= 0 and self._last_emitted_time > 0:
                    # Fall back to the rate observed since the last line.
                    dt = now - self._last_emitted_time
                    dv = total - self._last_emitted_total
                    if dt > 0:
                        rate = dv / dt
                rate_str = f" rate={rate:.0f}/s" if rate > 0 else ""
                self._last_emitted_total = total
                self._last_emitted_time = now
                print(
                    f"{self.TOOL}  \U0001f4ca TOKENS in={self._total_input} "
                    f"out={self._total_output} "
                    f"cache_r={self._total_cache_read} "
                    f"cache_w={self._total_cache_write} "
                    f"total={total}{rate_str}{self.RESET}",
                    flush=True,
                )

    elif event_type == "error":
        error = event.get("error", {})
        error_type = error.get("type", "unknown")
        error_msg = error.get("message", "")
        self._last_block_type = "error"
        print(
            f"  {self.RED}❌ Error: {error_type}: {error_msg}{self.RESET}",
            flush=True,
        )

    return False

process(input_stream)

Process a stream of lines. Returns True if run completed normally.

Source code in src/agentic_ci/stream.py
def process(self, input_stream):
    """Feed each line of ``input_stream`` to ``process_line``.

    ``bytes`` items are decoded as UTF-8 with undecodable bytes replaced;
    other items are passed through unchanged.

    Returns:
        True as soon as ``process_line`` reports completion, False if the
        stream is exhausted first.
    """
    decoded_lines = (
        item.decode("utf-8", errors="replace") if isinstance(item, bytes) else item
        for item in input_stream
    )
    # any() short-circuits, so nothing after the completing line is read.
    return any(self.process_line(text) for text in decoded_lines)

OpenCodeStreamProcessor(color=True, wrap=0, agent_pid=0)

Processes OpenCode JSON output and prints human-readable CI logs.

Source code in src/agentic_ci/stream.py
def __init__(self, color=True, wrap=0, agent_pid=0):
    """Set up colour codes and the per-stream parsing state.

    Args:
        color: When truthy, the colour attributes hold ANSI escape
            sequences; otherwise every one of them is the empty string.
        wrap: Stored as ``self.wrap`` (presumably the wrap width used by
            the output helpers -- confirm against ``_emit``).
        agent_pid: Stored as ``self.agent_pid``.
    """
    self.wrap = wrap
    self.agent_pid = agent_pid

    # Each colour attribute is "" when colour is off. AGENT is
    # deliberately unstyled.
    self.THINK = "\033[3;31m" if color else ""
    self.TOOL = "\033[1;90m" if color else ""
    self.AGENT = ""
    self.RED = "\033[31m" if color else ""
    self.RESET = "\033[0m" if color else ""

    # Which kind of block is open, and whether it has printed a line yet.
    self._in_text = self._in_thinking = self._emitted_first_line = False
    self._line_buf = ""
    # Error messages gathered by process_line until flush_errors prints them.
    self._errors: list[str] = []

flush_errors()

Print collected errors, deduplicating generic messages.

Source code in src/agentic_ci/stream.py
def flush_errors(self):
    """Print the errors collected so far.

    Messages equal to ``self._GENERIC_ERROR`` are dropped. The remaining
    messages are printed once each, in first-seen order. If only generic
    messages were collected, one explanatory fallback line is printed
    instead. Nothing is printed when no errors were collected.
    """
    if not self._errors:
        return

    # dict.fromkeys drops repeats while keeping first-seen order.
    unique_specific = dict.fromkeys(
        message for message in self._errors if message != self._GENERIC_ERROR
    )

    if not unique_specific:
        print(
            f"{self._INDENT}{self.RED}❌ Error: OpenCode returned a server error. "
            f"Common causes: invalid model name, missing or expired credentials, "
            f"or insufficient API permissions.{self.RESET}",
            flush=True,
        )
        return

    for error_msg in unique_specific:
        print(
            f"{self._INDENT}{self.RED}❌ Error: {error_msg}{self.RESET}",
            flush=True,
        )

process_line(line)

Process a single JSONL line from OpenCode. Returns True when run is complete.

Source code in src/agentic_ci/stream.py
def process_line(self, line):
    """Process a single JSONL line from OpenCode. Returns True when run is complete.

    Args:
        line: One line of JSONL text. Blank lines, lines that are not
            valid JSON, and JSON values that are not objects are ignored.

    Returns:
        True when a ``step_finish`` message has reason ``"stop"``; False
        otherwise. ``error`` messages are collected in ``self._errors``
        rather than printed here.
    """
    line = line.strip()
    if not line:
        return False

    try:
        msg = json.loads(line)
    except (json.JSONDecodeError, ValueError):
        return False

    # Scalars, arrays and null are valid JSON but are not messages;
    # without this guard the .get() calls below raise AttributeError.
    if not isinstance(msg, dict):
        return False

    msg_type = msg.get("type")
    part = msg.get("part", {})
    if not isinstance(part, dict):
        # A null or scalar "part" is treated the same as a missing one.
        part = {}

    if msg_type == "error":
        self._end_text()
        error = msg.get("error", {})
        # The payload may be a bare string, or an object without a "data"
        # object; fall back to its string form rather than crashing.
        data = error.get("data", {}) if isinstance(error, dict) else None
        if isinstance(data, dict):
            error_msg = data.get("message", str(error))
        else:
            error_msg = str(error)
        self._errors.append(error_msg)
        return False

    if msg_type == "text":
        text = part.get("text", "")
        if not self._in_text:
            # Close any other open block before printing the header.
            self._end_text()
            print(f"{self._INDENT}{self.AGENT}\U0001f4ac Agent ", end="", flush=True)
            self._in_text = True
            self._emitted_first_line = False
        self._emit(text + "\n")

    elif msg_type == "thinking":
        text = part.get("text", "")
        if not self._in_thinking:
            self._end_text()
            print(f"{self._INDENT}{self.THINK}\U0001f9e0 Thinking ", end="", flush=True)
            self._in_thinking = True
            self._emitted_first_line = False
        self._emit(text + "\n")

    elif msg_type == "tool_use":
        self._end_text()
        tool_name = part.get("tool", "unknown")
        # Upper-case only the first character, leaving the rest as is.
        display_name = tool_name[0].upper() + tool_name[1:] if tool_name else tool_name
        state = part.get("state", {})
        inp = state.get("input", {})
        title = part.get("title", "")
        summary = _format_tool(tool_name, inp) if inp else title
        icon = "\U0001f916" if tool_name in ("task", "agent") else "\U0001f527"
        print(
            f"{self._INDENT}{self.TOOL}{icon} {display_name} {summary}{self.RESET}",
            flush=True,
        )
        if tool_name in ("task", "agent"):
            self._print_task_detail(inp, state.get("output", ""))

    elif msg_type == "step_start":
        self._end_text()

    elif msg_type == "step_finish":
        self._end_text()
        reason = part.get("reason", "")
        tokens = part.get("tokens", {})
        cost = part.get("cost", 0)
        total = tokens.get("total", 0)
        inp = tokens.get("input", 0)
        out = tokens.get("output", 0)
        cache = tokens.get("cache", {})
        cache_r = cache.get("read", 0)
        cache_w = cache.get("write", 0)
        print(
            f"{self._INDENT}{self.TOOL}\U0001f4ca TOKENS in={inp} "
            f"out={out} "
            f"cache_r={cache_r} "
            f"cache_w={cache_w} "
            f"total={total} cost=${cost:.4f}{self.RESET}",
            flush=True,
        )
        if reason == "stop":
            return True

    return False

process(input_stream)

Process a stream of lines. Returns True if run completed normally.

Source code in src/agentic_ci/stream.py
def process(self, input_stream):
    """Run ``process_line`` over every item of ``input_stream``.

    ``bytes`` items are decoded as UTF-8 with undecodable bytes replaced;
    other items are passed through unchanged.

    Returns:
        True as soon as ``process_line`` reports completion, False if the
        stream is exhausted first.
    """
    for raw in input_stream:
        text = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else raw
        finished = self.process_line(text)
        if finished:
            # Stop here; the rest of the stream is left unread.
            return True
    return False