This page covers the internal streaming pipeline — it’s aimed at developers working on the bot’s source code. For how messages appear in Discord from a user perspective, see Conversations.
agent_streaming.py consumes
the SDK response (parsing StreamEvent messages, handling
auto-compaction retries, and detecting fork interrupts), while
streamer.py bridges the resulting text deltas to Discord — handling
rate limits, the 2000-character message cap, and typing indicators
during tool execution pauses.
Overview
The streaming pipeline has three stages:

- Agent.stream_chat() yields text deltas and StreamStatus signals. It delegates SDK response consumption to stream_response() in agent_streaming.py, which handles the message loop, auto-compaction retry, and fork interrupt detection. Raw SSE events are parsed by StreamParser in streamer.py.
- stream_to_channel() buffers text and routes status signals to an ephemeral status message, progressively editing a Discord message.
- A background editor task flushes the buffer on a fixed interval, keeping edits throttled and showing typing indicators during pauses.
Text deltas and status signals
Agent.stream_chat() is an async generator that yields two types of
output: text strings (for display) and StreamStatus signals (for
phase transitions like thinking, tool use, and compaction).
StreamParser.feed() processes individual SSE event dicts from the SDK:
| Event type | Handling |
|---|---|
| content_block_delta with text | Yielded as a text string |
| content_block_delta with input_json_delta | Accumulated for tool label formatting (not yielded as text) |
| content_block_start with tool_use | Captures tool name for label emission |
| content_block_stop for tool use | Emits a StreamStatus(kind="tool_start") signal with a formatted label |
| content_block_start with thinking | Emits a StreamStatus(kind="thinking_start") signal |
Tool labels are deferred rather than emitted immediately (they render with the -# markdown small-text prefix). Each label flushes when the next tool starts or when text arrives — rather than batching all labels until text appears. This eliminates visual jumps when multiple tools complete before the agent writes text. StreamParser._drain() converts deferred labels into -# *{label}* text, -# *~~{label}~~ — denied (use /permissions ask to approve)* if the tool was denied by the permission system, or -# *~~{label}~~ — error* if the tool returned an error or failed during execution (interrupted failures are skipped).
MCP tool names are displayed without the mcp__<server>__ prefix — the
formatter strips the prefix generically for all MCP servers, so
mcp__discord__discord_embed renders as discord_embed in the
stream label.
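The stripping is generic over server names because the prefix has a fixed double-underscore shape. A hedged sketch (the helper name is hypothetical):

```python
def strip_mcp_prefix(tool_name: str) -> str:
    """Strip the mcp__<server>__ prefix from MCP tool names.

    Splitting on "__" at most twice keeps tool names that themselves
    contain double underscores intact.
    """
    if tool_name.startswith("mcp__"):
        parts = tool_name.split("__", 2)
        if len(parts) == 3:
            return parts[2]
    return tool_name
```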
Nested tool activity
When the agent delegates work to a subagent (the Agent tool in Claude Code CLI 2.1+), the status message surfaces which tool the subagent is currently using — instead of showing a static timer with no visibility into subagent work.
into subagent work. stream_response() listens for
TaskProgressMessage events from the SDK, extracts last_tool_name,
and yields a StreamStatus(kind="task_progress") signal with a
formatted label.
The label format is agent_name(description) · tool_name. The subagent's tool name is stripped of the mcp__<server>__ prefix, matching the convention used for top-level tool labels.
On the Discord side, stream_to_channel() updates the existing status
message label without resetting the timer — so the elapsed time reflects
how long the subagent has been running overall, not how long the current
tool has been active.
Fork interrupts

When an enter_fork tool fires during streaming, stream_response() interrupts the SDK client and suppresses remaining stream events. The loop continues to drain messages so the ResultMessage still saves the session ID, but no further text deltas are yielded.

Throttled editing
stream_to_channel() consumes text deltas and StreamStatus signals,
maintaining a text buffer. A background editor coroutine flushes the
buffer to Discord on a fixed schedule, while StreamStatus signals
control an ephemeral status message (for thinking indicators, tool
labels, and compaction progress).
Timing constants
| Constant | Value | Purpose |
|---|---|---|
| FIRST_FLUSH_DELAY | 0.2s | Initial delay so the first message accumulates a meaningful chunk |
| EDIT_INTERVAL | 0.5s | Responsive feel while staying within Discord's rate limits |
| MAX_MSG_LEN | 2000 | Discord's maximum message length |
| STATUS_TICK | 1.0s | Interval between timer ticks on the status message (e.g., "Thinking… (1s)") |
Flush cycle
The editor runs this loop:

1. Wait FIRST_FLUSH_DELAY before the first flush
2. Flush the buffer — send a new message or edit the existing one
3. Wait EDIT_INTERVAL
4. If a status message is active (thinking or tool use), update its timer display
5. Else if new content arrived (stale flag is set), flush again
6. Else if the response isn't done, send a typing indicator
7. Repeat from step 3 until the delta stream ends
The stale flag is set whenever new text arrives from the generator and is cleared after each successful flush. This avoids unnecessary edits when no new text has accumulated.
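The cycle above can be sketched as a small coroutine. The state object and its method names are assumptions standing in for the streamer's real internals:

```python
import asyncio

FIRST_FLUSH_DELAY = 0.2  # values from the timing constants table
EDIT_INTERVAL = 0.5

async def editor_loop(state):
    """Sketch of the background editor task.

    `state` is a hypothetical object holding the buffer, the stale flag,
    the status-message flag, and a done flag set when the stream ends.
    """
    await asyncio.sleep(FIRST_FLUSH_DELAY)
    await state.flush()                      # first flush (send or edit)
    while not state.done:
        await asyncio.sleep(EDIT_INTERVAL)
        if state.status_active:              # thinking / tool use
            await state.update_status_timer()
        elif state.stale:                    # new text since last flush
            await state.flush()
        else:                                # idle: keep typing visible
            await state.send_typing()
```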
Overflow handling
When the buffer exceeds 2000 characters, the streamer splits across multiple messages at natural boundaries — preferring the last newline, then the last space, then a hard cut at 2000 characters. A natural break is only accepted if it falls within the last 200 characters of the window, so messages use most of the available capacity rather than splitting too early.

- The current message is finalized at the chosen split point (via msg.edit() or the initial channel.send())
- A new message is sent with the overflow text
- If the overflow itself exceeds 2000 characters, the process repeats in a loop until all accumulated text is dispatched
- The msg_start index tracks where the current message begins in the full buffer, so the streamer always knows which slice to send
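The split-point selection can be sketched as a pure function (names and defaults are assumptions; the constants mirror MAX_MSG_LEN and the 200-character acceptance window):

```python
def choose_split(buffer: str, max_len: int = 2000, window: int = 200) -> int:
    """Pick a split index for text that exceeds max_len.

    Prefers the last newline, then the last space, but only if the break
    falls within the final `window` characters of the allowed slice;
    otherwise hard-cuts at max_len.
    """
    head = buffer[:max_len]
    for sep in ("\n", " "):
        idx = head.rfind(sep)
        if idx >= max_len - window:   # break uses most of the capacity
            return idx
    return max_len                    # no acceptable natural break
```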
Each message produced by a split is registered via track_message() for fork session tracking. This ensures that if the response came from a background fork, the user can reply to any of the overflow messages to resume that fork.
Typing indicators
The streamer shows Discord typing indicators (channel.typing()) when
the agent is working but not producing text — typically between tool
executions. This happens in the editor loop: when the interval fires,
no status message is active, and the stale flag is false (no new
text), the editor sends a typing indicator instead of editing the
message. During active tool use or thinking, the ephemeral status
message handles visibility instead.
Before each stream_to_channel() call, the bot sends an initial
channel.typing(). Then, immediately after starting the editor task,
the streamer shows a “Thinking…” status message — before the API
sends its first SSE event. This eliminates the dead zone between
client.query() and the first event where only the typing indicator
was visible. When the real thinking_start event arrives, _set_status
sees the same label and keeps the timer running without resetting it.
If text arrives first (no thinking phase), the text handler clears the
status automatically.
Interrupt on new message
When the user sends a new message while a response is streaming, the bot interrupts the current response:

- on_message checks if the agent lock is held (meaning a response is in progress)
- If locked and not compacting, it calls agent.interrupt(), which cancels pending permission requests and interrupts the SDK client
- The interrupted stream_chat() generator stops yielding deltas
- stream_to_channel() finishes its final flush with whatever text accumulated
- The new message is processed with a fresh stream_to_channel() call
Interrupts are skipped while compaction is in progress (checked via agent.is_compacting). An interrupt during compaction would kill the post-compaction response, and the new message would trigger a redundant compaction cycle. The new message still queues behind the lock and runs after compaction finishes.
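The guard condition can be sketched as follows (attribute names like agent.lock are assumptions about the bot's internals):

```python
async def maybe_interrupt(agent) -> bool:
    """Interrupt an in-flight response unless compaction is running.

    A sketch of the on_message guard. Returns True if the current
    response was interrupted, False if nothing was streaming or
    compaction made interruption unsafe.
    """
    if agent.lock.locked() and not agent.is_compacting:
        await agent.interrupt()   # cancels permissions, interrupts SDK client
        return True
    return False
```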
The /interrupt slash command provides the same behavior on demand.
Empty responses
If the delta stream produces no text at all, the streamer sends a fallback message. The fallback is skipped when a fork was entered (enter_fork) or when auto-compaction occurred — both are legitimate reasons for an empty buffer.
Message tracking
Every message sent by the streamer — both initial messages and overflow continuations — is registered via track_message(message_id). This
feeds into the fork session tracking system: when a background fork
streams a response, the message IDs are collected so that a user reply
to any of those messages can resume the fork’s session.
See reply-to-fork context
for the full tracking lifecycle.
Auto-compaction annotation
When the SDK auto-compacts context mid-response, the streamer renders a visible annotation in the DM so you know what happened. The flow:

- The SDK emits a SystemMessage with subtype="compact_boundary" and ends the stream. stream_response() detects this, extracts the pre-compaction token count from compact_metadata, and yields a StreamStatus event (kind="compact_start") including an optional compact_tokens count.
- The streamer flushes any pre-compaction text to its own message
- An ephemeral status message appears with a timer: -# *Auto-compacting 5k tokens... (3s)*
- stream_response() re-sends the original query against the freshly compacted context (via client.query()), then streams the new response
- When the post-compaction response arrives, the streamer edits the status to a permanent annotation: -# *auto-compacted · 5k tokens · 8s*
- Post-compaction content continues in a new message
The annotation stays visible in chat history — the streamer does not
delete it after compaction finishes. This gives you a clear record of
when and why context was compacted.
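Rendering the permanent annotation is plain string formatting; a sketch (the function name, k-unit rounding, and seconds rounding are assumptions):

```python
def compaction_annotation(tokens: int, seconds: float) -> str:
    """Format the permanent auto-compaction annotation as Discord
    small-text markdown, e.g. '-# *auto-compacted · 5k tokens · 8s*'."""
    return f"-# *auto-compacted · {tokens // 1000}k tokens · {round(seconds)}s*"
```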
Context usage warning
Auto-compaction handles context overflow automatically, but a heads-up before it triggers lets you compact on your own terms. After each response, stream_response() checks how much of the 200k context window the current input_tokens occupy. If context exceeds 60% and no auto-compaction just happened, stream_to_channel() sends a small annotation below the response.
Two tiers of warning:
| Context usage | Annotation |
|---|---|
| 60–79% | -# *context: 67% (134k) — consider /compact* |
| 80%+ | -# *context: 85% (170k) — compaction soon, /compact recommended* |
- stream_response() captures input_tokens from the final ResultMessage.usage
- After all text is yielded, if context exceeds 60% of 200k and the response was not auto-compacted, it yields a StreamStatus(kind="context_warning") with input_tokens and context_pct
- stream_to_channel() stores the warning signal and — after the final flush — sends the annotation as a small italic message
- The warning message is registered via track_message() for fork session tracking, so replying to it resumes the correct session
The 80% escalation threshold is defined as _ESCALATE_PCT in streamer.py. The base 60% threshold is _WARN_PCT in agent_streaming.py.

Next steps
Context flow
How context moves between sessions, forks, and pending updates.
Session management
Session IDs, lifecycle events, and compaction.
Forks
Interactive forks, exit strategies, and idle timeout.
Conversations
DM interface, message flow, and interrupt behavior.
Development guide
How to modify the streaming pipeline and other core modules.
