
Streaming

Agent streaming delivers LLM tokens to your UI the moment they’re generated — no waiting for the full response. The runStream() API emits a discriminated union of events that you consume with a standard for await...of loop, and two density modes let you choose between minimal overhead (tokens only) and full lifecycle visibility (phases, tools, thoughts). Concurrent streams are fiber-isolated via Effect-TS FiberRef, so multiple callers never see each other’s tokens.

import { ReactiveAgents } from "@reactive-agents/runtime";
const agent = await ReactiveAgents.create()
  .withName("streamer")
  .withProvider("anthropic")
  .withReasoning()
  .withStreaming({ density: "tokens" })
  .build();
for await (const event of agent.runStream("Write a haiku about Effect-TS")) {
  if (event._tag === "TextDelta") process.stdout.write(event.text);
  if (event._tag === "StreamCompleted") console.log("\nDone!");
}
await agent.dispose();

.withStreaming() sets the default density. runStream() returns an AsyncGenerator<AgentStreamEvent> — each iteration yields the next event.

Every event carries a _tag discriminant. Narrow with switch or if — TypeScript infers the payload automatically.

type AgentStreamEvent =
  | { _tag: "TextDelta"; text: string }
  | { _tag: "StreamCompleted"; output: string; metadata: AgentResultMetadata; taskId?: string; agentId?: string }
  | { _tag: "StreamError"; cause: string }
  | { _tag: "PhaseStarted"; phase: string; timestamp: number }
  | { _tag: "PhaseCompleted"; phase: string; durationMs: number }
  | { _tag: "ThoughtEmitted"; content: string; iteration: number }
  | { _tag: "ToolCallStarted"; toolName: string; callId: string }
  | { _tag: "ToolCallCompleted"; toolName: string; callId: string; durationMs: number; success: boolean };
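Because the union is closed, a `switch` with a `never` check in the default branch lets the compiler flag any event type you forget to handle. The sketch below is self-contained: it redeclares the union locally, with `metadata` loosened to `unknown` for brevity. `describeEvent` is a hypothetical helper, not part of the library.

```typescript
// Local copy of the documented union so the sketch compiles on its own.
// Assumption: AgentResultMetadata is replaced by `unknown` here.
type AgentStreamEvent =
  | { _tag: "TextDelta"; text: string }
  | { _tag: "StreamCompleted"; output: string; metadata: unknown; taskId?: string; agentId?: string }
  | { _tag: "StreamError"; cause: string }
  | { _tag: "PhaseStarted"; phase: string; timestamp: number }
  | { _tag: "PhaseCompleted"; phase: string; durationMs: number }
  | { _tag: "ThoughtEmitted"; content: string; iteration: number }
  | { _tag: "ToolCallStarted"; toolName: string; callId: string }
  | { _tag: "ToolCallCompleted"; toolName: string; callId: string; durationMs: number; success: boolean };

// Hypothetical helper: turns any event into a one-line log string.
function describeEvent(event: AgentStreamEvent): string {
  switch (event._tag) {
    case "TextDelta":
      return `text: ${event.text}`;
    case "StreamCompleted":
      return `completed: ${event.output.length} chars`;
    case "StreamError":
      return `error: ${event.cause}`;
    case "PhaseStarted":
      return `phase ${event.phase} started`;
    case "PhaseCompleted":
      return `phase ${event.phase} took ${event.durationMs}ms`;
    case "ThoughtEmitted":
      return `thought #${event.iteration}`;
    case "ToolCallStarted":
      return `tool ${event.toolName} started (${event.callId})`;
    case "ToolCallCompleted":
      return `tool ${event.toolName} ${event.success ? "ok" : "failed"}`;
    default: {
      // If a new variant is added to the union, this assignment stops compiling.
      const unhandled: never = event;
      throw new Error(`Unhandled event: ${JSON.stringify(unhandled)}`);
    }
  }
}
```

If a new event type is later added to the union, the `never` assignment fails to compile until a matching `case` is written.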

These three events are emitted regardless of density mode:

| Event | Shape | Description |
| --- | --- | --- |
| TextDelta | { text: string } | A text token from the LLM. High-frequency during inference. |
| StreamCompleted | { output, metadata, taskId?, agentId? } | Execution succeeded. Always the last event on a successful stream. |
| StreamError | { cause: string } | Execution failed. Always the last event on a failed stream. |

These five events are only emitted when density is "full":

| Event | Shape | Description |
| --- | --- | --- |
| PhaseStarted | { phase, timestamp } | A lifecycle phase (bootstrap, think, act, etc.) started. |
| PhaseCompleted | { phase, durationMs } | A lifecycle phase completed with its duration. |
| ThoughtEmitted | { content, iteration } | The LLM produced a reasoning thought during a think phase. |
| ToolCallStarted | { toolName, callId } | A tool call began execution. |
| ToolCallCompleted | { toolName, callId, durationMs, success } | A tool call finished with its duration and success status. |

| Mode | Events Emitted | Use Case |
| --- | --- | --- |
| "tokens" | TextDelta + StreamCompleted + StreamError | Chat UIs: minimal overhead, just the text |
| "full" | All 8 event types | Dev tools, dashboards: full lifecycle visibility |

Precedence: per-call options.density > builder .withStreaming({ density }) > config default > "tokens".

// Override density per call
for await (const event of agent.runStream("Analyze this data", { density: "full" })) {
  switch (event._tag) {
    case "TextDelta":
      process.stdout.write(event.text);
      break;
    case "PhaseStarted":
      console.log(`\n[${event.phase}] started`);
      break;
    case "PhaseCompleted":
      console.log(`[${event.phase}] ${event.durationMs}ms`);
      break;
    case "ThoughtEmitted":
      console.log(` thought #${event.iteration}: ${event.content.slice(0, 80)}...`);
      break;
    case "ToolCallStarted":
      console.log(` tool: ${event.toolName} (${event.callId})`);
      break;
    case "ToolCallCompleted":
      console.log(` tool: ${event.toolName} ${event.success ? "ok" : "FAIL"} ${event.durationMs}ms`);
      break;
    case "StreamCompleted":
      console.log(`\nDone: ${event.output.length} chars`);
      break;
    case "StreamError":
      console.error(`\nError: ${event.cause}`);
      break;
  }
}
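The precedence rule stated before this example can be read as a chain of fallbacks. The following is a minimal sketch of that chain, not the library's actual resolution code; `resolveDensity` and its parameter names are hypothetical.

```typescript
type Density = "tokens" | "full";

// Hypothetical helper mirroring the documented order:
// per-call option > builder setting > config default > "tokens".
function resolveDensity(
  perCall?: Density,
  builder?: Density,
  configDefault?: Density,
): Density {
  return perCall ?? builder ?? configDefault ?? "tokens";
}

// A per-call override wins over the builder setting.
const overridden = resolveDensity("full", "tokens");
// With nothing configured anywhere, the result falls back to "tokens".
const fallback = resolveDensity();
```

Under this reading, an agent built with `.withStreaming({ density: "tokens" })` still emits all eight event types for a call that passes `{ density: "full" }`.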

The raw runStream() returns an AsyncGenerator. For HTTP servers and other environments, AgentStream provides four adapters that convert the underlying Effect stream.

AgentStream.toSSE(stream) returns a standard Response with Content-Type: text/event-stream. Each event is JSON-encoded on a data: line. The forked fiber is interrupted when the HTTP client disconnects.

import { ReactiveAgents, AgentStream } from "@reactive-agents/runtime";
const agent = await ReactiveAgents.create()
  .withProvider("anthropic")
  .withReasoning()
  .withStreaming()
  .build();
Bun.serve({
  port: 3000,
  async fetch(req) {
    if (new URL(req.url).pathname === "/stream") {
      // `task` is the task to execute; building it from the request is not shown here.
      const stream = await agent.runtime.runPromise(
        agent.engine.executeStream(task, { density: "tokens" }),
      );
      // Convert the Effect stream into a text/event-stream Response.
      return AgentStream.toSSE(stream);
    }
    return new Response("Not found", { status: 404 });
  },
});

Client-side:

const source = new EventSource("/stream");
source.onmessage = (e) => {
  const event = JSON.parse(e.data);
  if (event._tag === "TextDelta") appendToUI(event.text);
  if (event._tag === "StreamCompleted") source.close();
};
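If you consume the endpoint without `EventSource` (for example from a test or a non-browser client), it helps to know the frame shape. The sketch below assumes each event is written as a single `data:` line holding the JSON payload, followed by a blank line, which is what the description of `toSSE` suggests; the exact bytes the adapter produces are not confirmed here. `encodeFrame` and `decodeFrames` are hypothetical names.

```typescript
// Loose event shape: only the discriminant is required for this sketch.
type WireEvent = { _tag: string; [key: string]: unknown };

// Assumed frame layout: "data: <json>" followed by a blank line.
// JSON.stringify escapes newlines, so the payload always fits on one line.
function encodeFrame(event: WireEvent): string {
  return `data: ${JSON.stringify(event)}\n\n`;
}

// Parses a chunk that contains only complete frames.
// A real client would also buffer partial frames across network chunks.
function decodeFrames(chunk: string): WireEvent[] {
  return chunk
    .split("\n\n")
    .map((frame) => frame.trim())
    .filter((frame) => frame.startsWith("data:"))
    .map((frame) => JSON.parse(frame.slice("data:".length).trim()) as WireEvent);
}

const wire =
  encodeFrame({ _tag: "TextDelta", text: "line one\n\nline two" }) +
  encodeFrame({ _tag: "StreamCompleted", output: "done" });
const decoded = decodeFrames(wire);
```

Text that itself contains blank lines survives the round trip because the newlines are escaped inside the JSON string rather than written raw.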

AgentStream.toReadableStream(stream) returns a ReadableStream<AgentStreamEvent> compatible with the Web Streams API.

// effectStream: an Effect stream such as the one obtained from executeStream() above
const readable = AgentStream.toReadableStream(effectStream);
const reader = readable.getReader();
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  if (value._tag === "TextDelta") process.stdout.write(value.text);
}

AgentStream.toAsyncIterable(stream) converts the Effect stream into a standard AsyncIterable<AgentStreamEvent> for for await...of consumption. Works in Node 18+, Bun, and browsers.

for await (const event of AgentStream.toAsyncIterable(effectStream)) {
  if (event._tag === "TextDelta") process.stdout.write(event.text);
}

AgentStream.collect(stream) accumulates the entire stream into a single AgentResult — equivalent to calling agent.run(). Useful when you need to pass a stream to both a UI and a final-result handler.

const result = await AgentStream.collect(effectStream);
console.log(result.output); // Full response text
console.log(result.success); // true
console.log(result.metadata); // { stepsCount, tokensUsed, ... }
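To make the idea of collecting concrete, here is a minimal sketch that folds an async iterable of events into a single result object. It is not the implementation of `AgentStream.collect`: the `Collected` shape is a simplified stand-in for `AgentResult`, and returning a failed result on `StreamError` (rather than rejecting) is an assumption made for this example.

```typescript
// Only the three always-emitted events are modelled here.
type CoreEvent =
  | { _tag: "TextDelta"; text: string }
  | { _tag: "StreamCompleted"; output: string }
  | { _tag: "StreamError"; cause: string };

// Simplified stand-in for AgentResult (hypothetical shape).
type Collected = { success: boolean; output: string; error?: string };

async function collectEvents(events: AsyncIterable<CoreEvent>): Promise<Collected> {
  let partial = "";
  for await (const event of events) {
    switch (event._tag) {
      case "TextDelta":
        partial += event.text; // keep partial text in case the stream fails
        break;
      case "StreamCompleted":
        return { success: true, output: event.output };
      case "StreamError":
        return { success: false, output: partial, error: event.cause };
    }
  }
  // The iterable ended without StreamCompleted or StreamError.
  return { success: false, output: partial, error: "stream ended without a terminal event" };
}
```

Keeping the partial text on failure is a design choice for this sketch; it lets a UI show what was generated before the error.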
agent.runStream("prompt")
┌────────────▼────────────────┐
│ ExecutionEngine │
│ │
│ Queue.unbounded() │
│ ▲ │ │
│ │ ▼ │
│ TextDelta Stream.unfold │──▶ AsyncGenerator
│ ▲ │ │
│ │ ▼ │
│ FiberRef StreamCompleted │
│ callback / StreamError │
│ ▲ │
│ │ │
│ Effect.locally( │
│ execute(task), │
│ StreamingTextCallback, │
│ (text) => Queue.offer() │
│ ).pipe(Effect.forkDaemon) │
└──────────────────────────────┘
  1. Queue: An unbounded Queue<AgentStreamEvent> acts as the bridge between the execution fiber and the consumer.
  2. FiberRef: StreamingTextCallback is a FiberRef that the react-kernel reads during LLM streaming. When the LLM emits a text token, the callback pushes a TextDelta event onto the queue.
  3. Effect.locally: Sets the StreamingTextCallback FiberRef for the execution scope only. This is what makes concurrent streams fiber-isolated: each runStream() call gets its own callback bound to its own queue.
  4. forkDaemon: Execution runs in a forked daemon fiber so the stream can yield events as they arrive rather than waiting for execution to complete.
  5. Stream.unfoldEffect: Reads events from the queue one at a time, yielding each to the consumer. Stops after receiving a terminal event (StreamCompleted or StreamError).
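The steps above can be approximated in plain TypeScript to show the shape of the bridge. This is an analogue under stated assumptions, not the engine's code: promises stand in for Effect fibers, a closure stands in for the FiberRef-scoped callback, and `AsyncQueue` and `bridge` are hypothetical names.

```typescript
type CoreEvent =
  | { _tag: "TextDelta"; text: string }
  | { _tag: "StreamCompleted"; output: string }
  | { _tag: "StreamError"; cause: string };

// Minimal unbounded queue: take() waits until an item is offered.
class AsyncQueue<T> {
  private items: T[] = [];
  private waiters: Array<(value: T) => void> = [];

  offer(item: T): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(item);
    else this.items.push(item);
  }

  take(): Promise<T> {
    if (this.items.length > 0) return Promise.resolve(this.items.shift() as T);
    return new Promise<T>((resolve) => this.waiters.push(resolve));
  }
}

// Each call creates its own queue and callback, so concurrent calls
// never see each other's tokens (the isolation property described above).
async function* bridge(
  execute: (onText: (text: string) => void) => Promise<string>,
): AsyncGenerator<CoreEvent> {
  const queue = new AsyncQueue<CoreEvent>();

  // Start execution without awaiting it (the analogue of forking a fiber).
  void execute((text) => queue.offer({ _tag: "TextDelta", text }))
    .then((output) => queue.offer({ _tag: "StreamCompleted", output }))
    .catch((err) => queue.offer({ _tag: "StreamError", cause: String(err) }));

  // Drain the queue one event at a time and stop after a terminal event.
  while (true) {
    const event = await queue.take();
    yield event;
    if (event._tag !== "TextDelta") return;
  }
}
```

The consumer only ever talks to the queue, which is why tokens can be yielded while execution is still running.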

| Value | Events | Overhead |
| --- | --- | --- |
| "tokens" | TextDelta, StreamCompleted, StreamError | Minimal: just text tokens |
| "full" | All 8 event types | Higher: includes phase timing, tool tracking, thoughts |

| Method | Description |
| --- | --- |
| .withStreaming() | Enable streaming with default "tokens" density |
| .withStreaming({ density: "full" }) | Enable streaming with full event density |
| agent.runStream(input) | Stream with builder-configured density |
| agent.runStream(input, { density: "full" }) | Stream with per-call density override |

When streaming is active, two events are published to the EventBus:

| Event | When |
| --- | --- |
| AgentStreamStarted | runStream() begins execution (includes density, taskId, agentId) |
| AgentStreamCompleted | Stream terminates (includes success, durationMs) |

  • Handle StreamError: Always check for StreamError events. If you only listen for TextDelta, errors will be silently swallowed.
  • TextDelta requires reasoning: TextDelta events come from the LLM’s streaming output, which flows through the react-kernel. Without .withReasoning(), you’ll get StreamCompleted but no intermediate tokens.
  • Call dispose(): After you’re done streaming, call agent.dispose() to release the ManagedRuntime and any MCP subprocesses. Or use await using for automatic cleanup.
  • Streams are single-use: Each runStream() call creates a new stream. You cannot replay or fork a stream; call runStream() again for a new execution.
  • SSE adapter runs in Effect context: AgentStream.toSSE() calls Effect.runFork internally. If you need the stream within an existing Effect program, use executeStream() directly on the engine instead of the agent.runStream() facade.
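
The first and third points above can be combined into one consumption pattern: surface `StreamError` as a thrown error and release resources in a `finally` block. The sketch below is one way to do that under stated assumptions; `streamText` is a hypothetical wrapper, and the `dispose` parameter stands in for a call such as `agent.dispose()`.

```typescript
type CoreEvent =
  | { _tag: "TextDelta"; text: string }
  | { _tag: "StreamCompleted"; output: string }
  | { _tag: "StreamError"; cause: string };

// Hypothetical wrapper: forwards tokens, throws on StreamError,
// and always runs the cleanup callback exactly once.
async function streamText(
  events: AsyncIterable<CoreEvent>,
  onText: (text: string) => void,
  dispose: () => Promise<void>,
): Promise<string> {
  try {
    for await (const event of events) {
      if (event._tag === "TextDelta") onText(event.text);
      else if (event._tag === "StreamError") throw new Error(event.cause);
      else return event.output; // StreamCompleted
    }
    throw new Error("stream ended without a terminal event");
  } finally {
    await dispose(); // runs on success, on StreamError, and on early exit
  }
}
```

With the library, a call might look like `streamText(agent.runStream("prompt"), appendToUI, () => agent.dispose())`, assuming `dispose()` returns a promise as the quick-start example implies.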