Streaming Responses

agent.runStream() returns an AsyncGenerator of typed events. Use it to show tokens as they arrive, display step progress, or build live UIs.

Print tokens as the model generates them:

import { ReactiveAgents } from "reactive-agents";

const agent = await ReactiveAgents.create()
  .withName("streamer")
  .withProvider("anthropic")
  .build();

for await (const event of agent.runStream("Explain quantum entanglement")) {
  if (event._tag === "TextDelta") {
    process.stdout.write(event.text);
  }
  if (event._tag === "StreamCompleted") {
    console.log("\n\nDone!");
  }
}

Every event carries a _tag discriminant. Switch on it to handle each event type:

for await (const event of agent.runStream(prompt)) {
  switch (event._tag) {
    case "TextDelta":
      // A chunk of generated text (token or word depending on density)
      process.stdout.write(event.text);
      break;
    case "IterationProgress":
      // Emitted at the start of each reasoning iteration
      console.log(`\nStep ${event.iteration}/${event.maxIterations}`);
      if (event.toolsCalledThisStep.length > 0) {
        console.log(`  Tools: ${event.toolsCalledThisStep.join(", ")}`);
      }
      break;
    case "StreamCompleted":
      // Final event: includes full output and metrics
      console.log(`\nCompleted in ${event.metadata.duration}ms`);
      console.log(`Steps: ${event.metadata.stepsCount}`);
      if (event.toolSummary?.length) {
        for (const t of event.toolSummary) {
          console.log(`  ${t.name}: ${t.calls} call(s), avg ${t.avgMs}ms`);
        }
      }
      break;
    case "StreamError":
      console.error("Stream failed:", event.cause);
      break;
    case "StreamCancelled":
      console.log("Stream was cancelled.");
      break;
  }
}
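
To make the switch above type-safe, the events can be modelled as a discriminated union. The sketch below is an assumption: the `AgentStreamEvent` type and `describeEvent` helper are hypothetical names, and the field shapes are inferred only from the properties used in the example, so the library's real definitions may differ.

```typescript
// Hypothetical event shapes inferred from the fields used in the example above;
// the library's actual type definitions may differ.
type AgentStreamEvent =
  | { _tag: "TextDelta"; text: string }
  | {
      _tag: "IterationProgress";
      iteration: number;
      maxIterations: number;
      toolsCalledThisStep: string[];
    }
  | { _tag: "StreamCompleted"; metadata: { duration: number; stepsCount: number } }
  | { _tag: "StreamError"; cause: unknown }
  | { _tag: "StreamCancelled" };

// Returns a one-line description of any event. The `never` assignment in the
// default branch makes the compiler reject this function if a variant is unhandled.
function describeEvent(event: AgentStreamEvent): string {
  switch (event._tag) {
    case "TextDelta":
      return `text: ${event.text}`;
    case "IterationProgress":
      return `step ${event.iteration}/${event.maxIterations}`;
    case "StreamCompleted":
      return `done in ${event.metadata.duration}ms`;
    case "StreamError":
      return `error: ${String(event.cause)}`;
    case "StreamCancelled":
      return "cancelled";
    default: {
      const unreachable: never = event;
      return unreachable;
    }
  }
}
```

With this pattern, adding a new variant to the union produces a compile error at the default branch until the switch handles it.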

Use the Web-standard AbortController to cancel a running stream:

const controller = new AbortController();
// Cancel after 10 seconds
const timeout = setTimeout(() => controller.abort(), 10_000);
try {
  for await (const event of agent.runStream(prompt, { signal: controller.signal })) {
    if (event._tag === "TextDelta") process.stdout.write(event.text);
    if (event._tag === "StreamCancelled") console.log("\nCancelled.");
  }
} catch {
  // AbortError when signal fires mid-stream
} finally {
  // Clear the timer whether the stream completed, failed, or was cancelled
  clearTimeout(timeout);
}
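
The cancellation flow can be tried without a provider by using a stand-in stream. This is a minimal sketch, not the library's implementation: `fakeRunStream` and `runWithCancel` are hypothetical helpers, and the assumption that the stream checks the signal between chunks and ends with a StreamCancelled event is inferred from the example above.

```typescript
type CancelDemoEvent =
  | { _tag: "TextDelta"; text: string }
  | { _tag: "StreamCompleted" }
  | { _tag: "StreamCancelled" };

// Hypothetical stand-in for agent.runStream(): emits one chunk at a time and
// checks the signal before each chunk.
async function* fakeRunStream(
  chunks: string[],
  options: { signal?: AbortSignal } = {},
): AsyncGenerator<CancelDemoEvent> {
  for (const text of chunks) {
    if (options.signal?.aborted) {
      yield { _tag: "StreamCancelled" };
      return;
    }
    yield { _tag: "TextDelta", text };
    // Give the event loop a turn between chunks, as a real network stream would.
    await new Promise<void>((resolve) => setTimeout(resolve, 0));
  }
  yield { _tag: "StreamCompleted" };
}

// Consumes the stream, aborts after `limit` text chunks, and returns the tags seen.
async function runWithCancel(chunks: string[], limit: number): Promise<string[]> {
  const controller = new AbortController();
  const seen: string[] = [];
  let received = 0;
  for await (const event of fakeRunStream(chunks, { signal: controller.signal })) {
    seen.push(event._tag);
    if (event._tag === "TextDelta" && ++received === limit) controller.abort();
  }
  return seen;
}
```

A stand-in like this is useful in unit tests for UI code that must react to both StreamCompleted and StreamCancelled.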

AgentStream.collect() buffers all events and returns the final output string:

import { AgentStream } from "reactive-agents";
const output = await AgentStream.collect(agent.runStream(prompt));
console.log(output); // full text after completion
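
Conceptually, collecting a stream amounts to concatenating the text of every TextDelta event. The sketch below illustrates that idea under assumptions: `collectText` and `demoStream` are hypothetical names, and the real AgentStream.collect() may read the output from the final event or handle errors differently.

```typescript
type CollectDemoEvent =
  | { _tag: "TextDelta"; text: string }
  | { _tag: "StreamCompleted" }
  | { _tag: "StreamError"; cause: unknown };

// Hypothetical equivalent of a collect helper: joins all text chunks and
// turns a StreamError event into a thrown Error.
async function collectText(stream: AsyncIterable<CollectDemoEvent>): Promise<string> {
  let output = "";
  for await (const event of stream) {
    if (event._tag === "TextDelta") output += event.text;
    if (event._tag === "StreamError") {
      throw new Error(`Stream failed: ${String(event.cause)}`);
    }
  }
  return output;
}

// Stand-in stream used only for this illustration.
async function* demoStream(): AsyncGenerator<CollectDemoEvent> {
  yield { _tag: "TextDelta", text: "Hello, " };
  yield { _tag: "TextDelta", text: "world" };
  yield { _tag: "StreamCompleted" };
}
```

Calling `collectText(demoStream())` resolves to the two chunks joined in order.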

Send a stream over HTTP with AgentStream.toSSE():

import { AgentStream } from "reactive-agents";
import { Hono } from "hono";

const app = new Hono();

app.get("/stream", async (c) => {
  const { readable, headers } = AgentStream.toSSE(agent.runStream(c.req.query("q") ?? ""));
  return c.body(readable, { headers });
});

Clients receive standard SSE events. TextDelta events include data: {"text":"..."}.
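
On the client, the response body can be decoded by splitting it into SSE frames and parsing each data payload as JSON. The following is a rough sketch with assumptions: `parseSseFrames` and `extractText` are hypothetical helpers, the presence of an `event:` field is a guess (only the `data: {"text":"..."}` payload is documented above), and the parser expects complete frames rather than arbitrary network chunks.

```typescript
interface SseFrame {
  event: string | undefined;
  data: string;
}

// Splits raw SSE text into frames. Frames are separated by a blank line;
// multiple data: lines within a frame are joined with newlines.
function parseSseFrames(raw: string): SseFrame[] {
  const frames: SseFrame[] = [];
  for (const block of raw.split(/\r?\n\r?\n/)) {
    let event: string | undefined;
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith("event:")) {
        event = line.slice("event:".length).trim();
      } else if (line.startsWith("data:")) {
        // Drop the single optional space after the colon.
        dataLines.push(line.slice("data:".length).replace(/^ /, ""));
      }
    }
    if (dataLines.length > 0) frames.push({ event, data: dataLines.join("\n") });
  }
  return frames;
}

// Concatenates the "text" field of every frame whose data is a JSON object
// containing a string "text" property; other frames are ignored.
function extractText(raw: string): string {
  let text = "";
  for (const frame of parseSseFrames(raw)) {
    try {
      const payload: unknown = JSON.parse(frame.data);
      if (typeof payload === "object" && payload !== null && "text" in payload) {
        const value = (payload as { text: unknown }).text;
        if (typeof value === "string") text += value;
      }
    } catch {
      // Not JSON: skip this frame.
    }
  }
  return text;
}
```

In a browser, the built-in EventSource API handles framing and reconnection, so a hand-written parser like this is mainly useful with fetch-based clients.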

Convert to ReadableStream for use with Response in edge runtimes:

import { AgentStream } from "reactive-agents";

// Assumes `agent` was built elsewhere, as in the earlier examples
export async function GET(req: Request) {
  const stream = AgentStream.toReadableStream(
    agent.runStream(new URL(req.url).searchParams.get("q") ?? "")
  );
  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  });
}
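
For intuition, an async iterable of events can be bridged to a Web ReadableStream with a pull-based source. This is only a sketch of the general technique: `eventsToReadableStream` is a hypothetical function, and the frame layout (an `event:` line followed by a JSON `data:` line) is an assumption rather than the library's documented wire format.

```typescript
type WireEvent = { _tag: string; [key: string]: unknown };

// Wraps an async iterable of events in a byte stream of SSE-style frames.
// pull() runs only when the consumer wants more data, so backpressure is respected.
function eventsToReadableStream(events: AsyncIterable<WireEvent>): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const iterator = events[Symbol.asyncIterator]();
  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      const result = await iterator.next();
      if (result.done) {
        controller.close();
        return;
      }
      const { _tag, ...payload } = result.value;
      const frame = `event: ${_tag}\ndata: ${JSON.stringify(payload)}\n\n`;
      controller.enqueue(encoder.encode(frame));
    },
    async cancel() {
      // Let the generator run its cleanup if the client disconnects.
      await iterator.return?.();
    },
  });
}
```

The cancel() hook matters in edge runtimes: when the client disconnects, it gives the underlying generator a chance to stop work instead of running to completion.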

streamDensity controls how much text is batched into each TextDelta event. Set it through the density option of withStreaming():

const agent = await ReactiveAgents.create()
  .withProvider("anthropic")
  .withStreaming({ density: "tokens" }) // "tokens" | "words" | "sentences" | "paragraphs"
  .build();

Use "tokens" for the most responsive UI, or "sentences" to emit fewer, larger events with lower overhead.
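
To illustrate what a coarser density means, the sketch below batches a token stream into sentence-sized chunks. It is an assumption about the general idea, not the library's algorithm: `batchBySentence` is a hypothetical helper, and treating ".", "!" or "?" followed by whitespace as a sentence boundary is a simplification.

```typescript
// Buffers incoming tokens and yields one chunk per complete sentence.
// A sentence ends at ".", "!" or "?" followed by whitespace; any leftover
// text is flushed when the token stream ends.
async function* batchBySentence(tokens: AsyncIterable<string>): AsyncGenerator<string> {
  const sentenceEnd = /^[\s\S]*?[.!?]\s+/;
  let buffer = "";
  for await (const token of tokens) {
    buffer += token;
    let match = sentenceEnd.exec(buffer);
    while (match !== null) {
      yield match[0];
      buffer = buffer.slice(match[0].length);
      match = sentenceEnd.exec(buffer);
    }
  }
  if (buffer.length > 0) yield buffer;
}
```

The trade-off is visible in the buffering: a sentence is only emitted once the token after its terminator arrives, so coarser densities reduce event count at the cost of added latency.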