"""Agent event dataclasses — emitted during run_stream() and forwarded to WebSocket clients."""
from dataclasses import dataclass, field
@dataclass()
class ToolStarted:
    """Announces that a tool call has begun.

    Sent immediately when the call starts, before execution completes.
    ``to_wire()`` produces the ``"tool_started"`` client message.
    """

    tool_name: str
    arguments: dict
    # Set when the event originates inside run_ephemeral (a subagent run).
    is_subagent: bool = False

    def to_wire(self) -> dict:
        """Serialise this event into the dict sent to WebSocket clients."""
        return dict(
            type="tool_started",
            tool=self.tool_name,
            args=self.arguments,
            is_subagent=self.is_subagent,
        )
@dataclass()
class ToolEvent:
    """Reports a finished tool call together with its result.

    ``to_wire()`` produces the ``"tool_call"`` client message.
    """

    tool_name: str
    arguments: dict
    result: str
    success: bool
    # Set when the event originates inside run_ephemeral (a subagent run).
    is_subagent: bool = False
    # Extra data for client rendering; each instance gets its own fresh dict.
    metadata: dict = field(default_factory=dict)

    def to_wire(self) -> dict:
        """Serialise this event into the dict sent to WebSocket clients."""
        payload = {"type": "tool_call", "tool": self.tool_name, "args": self.arguments}
        payload["result"] = self.result
        payload["success"] = self.success
        payload["is_subagent"] = self.is_subagent
        payload["metadata"] = self.metadata
        return payload
@dataclass()
class TextDelta:
    """One incremental chunk of the streamed LLM response text."""

    delta: str

    def to_wire(self) -> dict:
        """Serialise as a ``"stream_delta"`` client message."""
        return dict(type="stream_delta", delta=self.delta)
@dataclass()
class ThinkingDelta:
    """One incremental chunk of thinking/reasoning text from the streamed LLM response."""

    delta: str

    def to_wire(self) -> dict:
        """Serialise as a ``"thinking_delta"`` client message."""
        return dict(type="thinking_delta", delta=self.delta)
@dataclass()
class ThinkingEnd:
    """Signals that the thinking/reasoning phase is over."""

    def to_wire(self) -> dict:
        """Serialise as a ``"thinking_end"`` client message."""
        message = {}
        message["type"] = "thinking_end"
        return message
@dataclass
class StreamEnd:
"""Marks the end of the streaming response."""
full_content: str
context_tokens: int | None = None # total tokens used in this turn
max_context_tokens: int = 0 # ollama_num_ctx from config
elapsed_seconds: float | None = None
tool_call_count: int = 0
token_count: int | None = None # same as context_tokens; kept separate for clarity
message_index: int | None = None # raw index of the first assistant msg in this turn group
def to_wire(self) -> dict:
return {
"type": "stream_end",
"content": self.full_content,
"context_tokens": self.context_tokens,
"max_context_tokens": self.max_context_tokens,
"elapsed_seconds": self.elapsed_seconds,
"tool_call_count": self.tool_call_count,
"token_count": self.token_count,
"message_index": self.message_index,
}
@dataclass()
class StreamStopped:
    """Signals that the user halted generation mid-stream (cooperative stop)."""

    def to_wire(self) -> dict:
        """Serialise as a ``"stream_stopped"`` client message."""
        return dict(type="stream_stopped")
@dataclass
class CompressionStarted:
"""Emitted immediately before context compression begins.
Allows the UI to show a spinner / status label while the summarizer
LLM is working (this can take several seconds on large contexts).
"""
context_tokens: int | None = None
max_context_tokens: int = 0
def to_wire(self) -> dict:
return {
"type": "compression_started",
"context_tokens": self.context_tokens,
"max_context_tokens": self.max_context_tokens,
}
@dataclass
class ContextCompressed:
"""Emitted after context compression runs successfully."""
messages_before: int
messages_after: int
summary: str = "" # the actual summary text produced by the LLM
context_tokens: int | None = None
max_context_tokens: int = 0
def to_wire(self) -> dict:
return {
"type": "context_compressed",
"messages_before": self.messages_before,
"messages_after": self.messages_after,
"summary": self.summary,
"context_tokens": self.context_tokens,
"max_context_tokens": self.max_context_tokens,
}
@dataclass()
class ProfileSwitched:
    """Sent by the switch_profile tool after it successfully changes the session profile."""

    profile_id: str
    profile_name: str

    def to_wire(self) -> dict:
        """Serialise as a ``"profile_switched"`` client message."""
        return dict(
            type="profile_switched",
            profile_id=self.profile_id,
            profile_name=self.profile_name,
        )
@dataclass()
class PlanningStatus:
    """Progress marker sent at the start of each planning phase for the UI.

    Attributes:
        phase: 1 = Analysis, 2 = Execution plan, 3 = Plan review (AIHelper critic).
        label: Short human-readable text shown next to the spinner.
        is_subagent: True when sent from inside run_ephemeral (subagent planning).
    """

    phase: int
    label: str
    is_subagent: bool = False

    def to_wire(self) -> dict:
        """Serialise as a ``"planning_status"`` client message."""
        pairs = [
            ("type", "planning_status"),
            ("phase", self.phase),
            ("label", self.label),
            ("is_subagent", self.is_subagent),
        ]
        return dict(pairs)
@dataclass()
class PlanReady:
    """Carries the generated plan, sent before the main agent loop when
    ``profile.planning_enabled`` is True.

    By the time this is emitted the plan text is already in ``session.context``
    as an assistant message, so the LLM sees it and follows it during execution.

    Attributes:
        plan: The plan text.
        is_subagent: True when sent from inside run_ephemeral (subagent planning).
    """

    plan: str
    is_subagent: bool = False

    def to_wire(self) -> dict:
        """Serialise as a ``"plan_ready"`` client message."""
        return dict(type="plan_ready", plan=self.plan, is_subagent=self.is_subagent)
@dataclass()
class TurnThinking:
    """Complete thinking/reasoning block from a tool-calling turn (a complete() response).

    ThinkingDelta streams chunks, whereas this carries the whole text at once,
    because complete() does not stream. It is emitted ahead of that turn's
    tool calls. ``is_subagent`` is True when emitted from run_ephemeral().
    """

    thinking: str
    is_subagent: bool = False

    def to_wire(self) -> dict:
        """Serialise as a ``"turn_thinking"`` client message."""
        return dict(
            type="turn_thinking",
            thinking=self.thinking,
            is_subagent=self.is_subagent,
        )
@dataclass
class SubagentComplete:
"""Internal: emitted by run_ephemeral into the parent sink to report metrics.
Never forwarded to WebSocket clients."""
token_count: int = 0
tool_call_count: int = 0
def to_wire(self) -> dict | None:
return None # internal only
@dataclass
class PlanningDebugData:
"""Internal: raw outputs from all planning phases, for debug storage in session.
Never forwarded to WebSocket clients. Only emitted for the main agent (not subagents)."""
log: dict # {timestamp, result, phases: {1: {output, prompt_tokens, completion_tokens}, ...}}
def to_wire(self) -> dict | None:
return None # internal only
@dataclass
class AIHelperTokensUsed:
"""Internal: emitted by AIHelper after each LLM call to report token usage.
Never forwarded to WebSocket clients."""
prompt_tokens: int = 0
completion_tokens: int = 0
@property
def total(self) -> int:
return self.prompt_tokens + self.completion_tokens
def to_wire(self) -> dict | None:
return None # internal only
@dataclass
class RecallUpdate:
"""Emitted when a recall status changes — sent to WebSocket clients for real-time UI updates."""
session_id: str
recall_id: str | None = None
call_type: str | None = None
trigger_at: str | None = None
status: str | None = None
action: str | None = None # scheduled | cancelled | skipped | fired | rescheduled
def to_wire(self) -> dict:
return {
"type": "recall_update",
"session_id": self.session_id,
"recall_id": self.recall_id,
"call_type": self.call_type,
"trigger_at": self.trigger_at,
"status": self.status,
"action": self.action,
}
# Union of every agent event type defined in this module. Client-facing events
# serialise through to_wire(); SubagentComplete, AIHelperTokensUsed and
# PlanningDebugData are internal and their to_wire() returns None.
AgentEvent = (
    ToolStarted
    | ToolEvent
    | TextDelta
    | ThinkingDelta
    | ThinkingEnd
    | StreamEnd
    | StreamStopped
    | CompressionStarted
    | ContextCompressed
    | TurnThinking
    | ProfileSwitched
    | PlanningStatus
    | PlanReady
    | SubagentComplete
    | AIHelperTokensUsed
    | PlanningDebugData
    | RecallUpdate
)