Agent Gateway

The Agent Gateway turns reactive agents into persistent, autonomous services. Instead of waiting for user prompts, gateway-enabled agents respond to heartbeat ticks, cron schedules, webhooks, and other event sources — all governed by a deterministic policy engine that decides what deserves an LLM call and what doesn’t.

Most agent frameworks route every input through an LLM. The gateway inverts this:

                           ┌──────── THE HARNESS ────────┐
                           │      (zero LLM calls)       │
Heartbeats ──┐             │                             │
Crons ───────┤             │  InputRouter                │
Webhooks ────┼────────────▶│    → PolicyEngine           │
Channels ────┤             │    → EventBus               │
A2A ─────────┘             │    → AuditLog               │
                           └──────────────┬──────────────┘
                                          │
                            Does this need intelligence?
                           ┌──────────────┴──────────────┐
                           │ NO                          │ YES
                           ▼                             ▼
                 Skip / Queue / Merge            ┌─ THE HORSE ──┐
                    (deterministic)              │ LLM Call     │
                                                 │ Exec Engine  │
                                                 └──────────────┘

The Harness handles event routing, policy evaluation, rate limiting, budget enforcement, and event merging — all without touching the LLM. The Horse (the LLM) is only invoked when the policy engine decides intelligence is genuinely needed.

This means autonomous agents are cheaper, faster, and more predictable than architectures that blindly invoke an LLM on every tick.

import { ReactiveAgents } from "reactive-agents";

const agent = await ReactiveAgents.create()
  .withName("ops-agent")
  .withProvider("anthropic")
  .withReasoning()
  .withTools()
  .withGateway({
    heartbeat: {
      intervalMs: 1_800_000, // 30 minutes
      policy: "adaptive", // Skip ticks when idle
      instruction: "Check for pending tasks and take action if needed",
    },
    crons: [
      {
        schedule: "0 9 * * MON-FRI",
        instruction: "Review overnight alerts and summarize",
        priority: "high",
      },
    ],
    webhooks: [
      {
        path: "/github",
        adapter: "github",
        secret: process.env.GITHUB_WEBHOOK_SECRET,
      },
    ],
    policies: {
      dailyTokenBudget: 50_000,
      maxActionsPerHour: 20,
      heartbeatPolicy: "adaptive",
    },
  })
  .build();

All inputs normalize to a universal GatewayEvent envelope before entering the policy engine:

interface GatewayEvent {
  readonly id: string;
  readonly source: "heartbeat" | "cron" | "webhook" | "channel" | "a2a" | "state-change";
  readonly timestamp: Date;
  readonly agentId?: string;
  readonly payload: unknown;
  readonly priority: "low" | "normal" | "high" | "critical";
  readonly metadata: Record<string, unknown>;
  readonly traceId?: string;
}
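As a rough illustration of what normalization might look like, the sketch below wraps a cron firing in the envelope. The `toGatewayEvent` helper, the `GatewaySource` and `EventPriority` aliases, and the counter-based ID format are hypothetical and only mirror the interface above; they are not part of the documented API.

```typescript
type GatewaySource = "heartbeat" | "cron" | "webhook" | "channel" | "a2a" | "state-change";
type EventPriority = "low" | "normal" | "high" | "critical";

interface GatewayEvent {
  readonly id: string;
  readonly source: GatewaySource;
  readonly timestamp: Date;
  readonly agentId?: string;
  readonly payload: unknown;
  readonly priority: EventPriority;
  readonly metadata: Record<string, unknown>;
  readonly traceId?: string;
}

let nextEventNumber = 0;

// Hypothetical helper: wraps any raw input in the envelope, filling in defaults.
function toGatewayEvent(
  source: GatewaySource,
  payload: unknown,
  options: { priority?: EventPriority; metadata?: Record<string, unknown> } = {},
): GatewayEvent {
  nextEventNumber += 1;
  return {
    id: `evt-${nextEventNumber}`, // assumed ID scheme, for illustration only
    source,
    timestamp: new Date(),
    payload,
    priority: options.priority ?? "normal",
    metadata: options.metadata ?? {},
  };
}

const cronEvent = toGatewayEvent(
  "cron",
  { instruction: "Review overnight alerts and summarize" },
  { priority: "high", metadata: { schedule: "0 9 * * MON-FRI" } },
);
console.log(cronEvent.source, cronEvent.priority);
```

Whatever the source, downstream policies only ever see this one shape, which is what lets them stay deterministic.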

Periodic ticks that give agents “thinking turns” — time to check memory, review pending items, and take proactive action.

heartbeat: {
  intervalMs: 1_800_000, // Every 30 minutes
  policy: "adaptive", // Skip when nothing changed
  instruction: "Review and act on pending items",
  maxConsecutiveSkips: 6, // Force execution after 6 skips
}

| Policy | Behavior |
| --- | --- |
| "always" | Fire every tick (like OpenClaw) |
| "adaptive" | Skip when agent state hasn’t changed (no pending events, no memory updates). Typically skips 50% or more of ticks when idle |
| "conservative" | Only fire when pending events exist |

After maxConsecutiveSkips (default: 6), the heartbeat fires regardless of policy to prevent indefinite silence.
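The interaction between the three policies and the skip cap can be sketched as a pure function. This is a minimal model under stated assumptions, not the gateway's implementation: the `TickState` shape, `shouldFire`, and `simulateIdleTicks` are hypothetical names, and "state changed" is reduced to two fields.

```typescript
type HeartbeatPolicy = "always" | "adaptive" | "conservative";

// Assumed, simplified view of what the gateway knows at tick time.
interface TickState {
  pendingEvents: number; // events waiting for the agent
  memoryChanged: boolean; // memory updated since the last fired tick
  consecutiveSkips: number; // ticks skipped in a row so far
}

function shouldFire(
  policy: HeartbeatPolicy,
  tickState: TickState,
  maxConsecutiveSkips = 6,
): boolean {
  // The skip cap overrides every policy to prevent indefinite silence.
  if (tickState.consecutiveSkips >= maxConsecutiveSkips) return true;
  switch (policy) {
    case "always":
      return true;
    case "adaptive":
      return tickState.pendingEvents > 0 || tickState.memoryChanged;
    case "conservative":
      return tickState.pendingEvents > 0;
  }
}

// Simulate a fully idle agent and record which ticks fire.
function simulateIdleTicks(policy: HeartbeatPolicy, ticks: number): boolean[] {
  const fired: boolean[] = [];
  let consecutiveSkips = 0;
  for (let i = 0; i < ticks; i++) {
    const fire = shouldFire(policy, { pendingEvents: 0, memoryChanged: false, consecutiveSkips });
    consecutiveSkips = fire ? 0 : consecutiveSkips + 1;
    fired.push(fire);
  }
  return fired;
}

console.log(simulateIdleTicks("adaptive", 8));
// [false, false, false, false, false, false, true, false]
```

With a 30-minute interval and the default cap, an idle agent in this model still runs once every seventh tick, roughly every 3.5 hours.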

Standard 5-field cron expressions with attached instructions. Zero external dependencies.

crons: [
  {
    schedule: "0 9 * * MON", // 9 AM every Monday (UTC)
    instruction: "Generate weekly project status report",
    priority: "high",
  },
  {
    schedule: "*/15 * * * *", // Every 15 minutes
    instruction: "Check deployment health",
    priority: "normal",
    enabled: true,
  },
  {
    schedule: "0 0 1 * *", // Midnight on the 1st
    instruction: "Run monthly cost analysis",
  },
]

Supported syntax: *, specific values, ranges (8-17), steps (*/15), comma lists (MON,WED,FRI), day names (MON-SUN).
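To make the syntax rules concrete, here is a small matcher covering wildcards, values, ranges, steps, comma lists, and day names. It is an illustrative sketch rather than the gateway's parser: `matchesField` and `matchesCron` are hypothetical names, evaluation is pinned to UTC, wrap-around ranges are not handled, and day-of-month and day-of-week are simply ANDed together.

```typescript
const DAY_NAMES: Record<string, number | undefined> = {
  SUN: 0, MON: 1, TUE: 2, WED: 3, THU: 4, FRI: 5, SAT: 6,
};

// Accepts either a day name (MON) or a plain number (9).
function toNumber(token: string): number {
  const named = DAY_NAMES[token.toUpperCase()];
  return named !== undefined ? named : Number.parseInt(token, 10);
}

// Checks one cron field (for example "*/15" or "MON,WED,FRI") against a value.
function matchesField(field: string, value: number, min: number, max: number): boolean {
  return field.split(",").some((part) => {
    const pieces = part.split("/");
    const rangePart = pieces[0] ?? "*";
    const step = pieces[1] === undefined ? 1 : Number.parseInt(pieces[1], 10);
    let start = min;
    let end = max;
    if (rangePart !== "*") {
      const bounds = rangePart.split("-");
      start = toNumber(bounds[0] ?? "");
      end = bounds[1] === undefined ? start : toNumber(bounds[1]);
    }
    return value >= start && value <= end && (value - start) % step === 0;
  });
}

// Checks a 5-field expression against a date, using UTC components.
function matchesCron(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) throw new Error(`Expected 5 fields, got ${fields.length}`);
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields as [string, string, string, string, string];
  return (
    matchesField(minute, date.getUTCMinutes(), 0, 59) &&
    matchesField(hour, date.getUTCHours(), 0, 23) &&
    matchesField(dayOfMonth, date.getUTCDate(), 1, 31) &&
    matchesField(month, date.getUTCMonth() + 1, 1, 12) &&
    matchesField(dayOfWeek, date.getUTCDay(), 0, 6)
  );
}

// 2024-01-01 was a Monday.
console.log(matchesCron("0 9 * * MON-FRI", new Date("2024-01-01T09:00:00Z"))); // true
console.log(matchesCron("*/15 * * * *", new Date("2024-01-01T09:31:00Z"))); // false
```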

HTTP POST endpoints with pluggable adapters for signature validation and payload transformation.

webhooks: [
  {
    path: "/github",
    adapter: "github",
    secret: process.env.GITHUB_WEBHOOK_SECRET,
    events: ["push", "pull_request"], // Optional: filter by event type
  },
  {
    path: "/stripe",
    adapter: "generic",
    secret: process.env.STRIPE_WEBHOOK_SECRET,
  },
]

Built-in adapters:

| Adapter | Validation | Classification |
| --- | --- | --- |
| "github" | HMAC-SHA256 via X-Hub-Signature-256 | "push", "pull_request.opened", etc. |
| "generic" | Configurable HMAC header and algorithm | Extracted from payload or "webhook.received" |

Implement the WebhookAdapter interface for any source:

import type { WebhookAdapter } from "@reactive-agents/gateway";
import { Effect } from "effect";

const stripeAdapter: WebhookAdapter = {
  source: "stripe",
  validateSignature: (req, secret) => {
    // Verify the Stripe-Signature header.
    // verifyStripeSignature is a helper you supply; it is not defined here.
    return Effect.succeed(verifyStripeSignature(req, secret));
  },
  transform: (req) => {
    // Note: JSON.parse throws on a malformed body; wrap it (for example in
    // Effect.try) if the adapter should fail through the Effect error channel.
    const body = JSON.parse(req.body);
    return Effect.succeed({
      id: body.id,
      source: "webhook" as const,
      timestamp: new Date(),
      payload: body,
      priority: body.type.includes("failed") ? "high" as const : "normal" as const,
      metadata: { adapter: "stripe", type: body.type },
    });
  },
  classify: (event) => String((event.metadata as any).type ?? "stripe.event"),
};
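For reference, the kind of check a `validateSignature` step performs for GitHub-style webhooks can be sketched with Node's built-in crypto module. GitHub sends `X-Hub-Signature-256` as `sha256=` followed by the hex HMAC-SHA256 of the raw request body. The function names below are hypothetical, and this is not the built-in adapter's actual code.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Computes the header value a sender would attach for a given body and secret.
function signGithubPayload(rawBody: string, secret: string): string {
  return "sha256=" + createHmac("sha256", secret).update(rawBody, "utf8").digest("hex");
}

// Returns true only when the header matches the HMAC of the raw body.
function verifyGithubSignature(
  rawBody: string,
  signatureHeader: string | undefined,
  secret: string,
): boolean {
  if (!signatureHeader || !signatureHeader.startsWith("sha256=")) return false;
  const received = Buffer.from(signatureHeader, "utf8");
  const computed = Buffer.from(signGithubPayload(rawBody, secret), "utf8");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === computed.length && timingSafeEqual(received, computed);
}

const exampleBody = JSON.stringify({ action: "opened", number: 1 });
const exampleHeader = signGithubPayload(exampleBody, "example-secret");
console.log(verifyGithubSignature(exampleBody, exampleHeader, "example-secret")); // true
console.log(verifyGithubSignature(exampleBody + " ", exampleHeader, "example-secret")); // false
```

The comparison has to run against the raw request bytes, before any JSON parsing, and should use a constant-time comparison as shown.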

The policy engine evaluates a chain of policies against each incoming event. Policies are sorted by priority (lower number = evaluated first), and the first non-null decision wins. If no policy returns a decision, the event is executed.

type PolicyDecision =
  | { action: "execute"; taskDescription: string } // Run it
  | { action: "queue"; reason: string } // Defer for later
  | { action: "skip"; reason: string } // Drop it
  | { action: "merge"; mergeKey: string } // Batch with similar events
  | { action: "escalate"; reason: string } // Flag for human review

| Policy | Priority | What It Does |
| --- | --- | --- |
| Adaptive Heartbeat | 10 | Skips heartbeat ticks when agent state is unchanged |
| Cost Budget | 20 | Blocks execution when daily token budget is exhausted |
| Rate Limit | 30 | Caps actions per hour to prevent runaway execution |
| Event Merging | 50 | Batches events with the same merge key (e.g., 5 PRs = 1 review) |

Critical priority events bypass cost budget and rate limit policies.
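The evaluation order described above (sort by priority, first non-null decision wins, execute by default) can be modeled in a few lines. This is a synchronous simplification under stated assumptions: real policies return Effects, and the `SketchEvent` shape, the `decide` function, the default task description, and the choice of "skip" as the budget policy's blocking action are all hypothetical.

```typescript
type Priority = "low" | "normal" | "high" | "critical";

interface SketchEvent {
  source: string;
  priority: Priority;
  stateChanged: boolean; // assumed flag standing in for real agent state
}

type Decision =
  | { action: "execute"; taskDescription: string }
  | { action: "queue"; reason: string }
  | { action: "skip"; reason: string }
  | { action: "merge"; mergeKey: string }
  | { action: "escalate"; reason: string };

interface Policy {
  name: string;
  priority: number; // lower number = evaluated first
  evaluate: (incoming: SketchEvent) => Decision | null;
}

function decide(policies: Policy[], incoming: SketchEvent): Decision {
  const ordered = [...policies].sort((a, b) => a.priority - b.priority);
  for (const policy of ordered) {
    const decision = policy.evaluate(incoming);
    if (decision !== null) return decision; // first non-null decision wins
  }
  // No policy objected, so the event is executed.
  return { action: "execute", taskDescription: `Handle ${incoming.source} event` };
}

const adaptiveHeartbeat: Policy = {
  name: "AdaptiveHeartbeat",
  priority: 10,
  evaluate: (incoming) =>
    incoming.source === "heartbeat" && !incoming.stateChanged
      ? { action: "skip", reason: "Agent state unchanged" }
      : null,
};

// Critical events bypass the budget check, as described above.
function costBudget(tokensLeft: number): Policy {
  return {
    name: "CostBudget",
    priority: 20,
    evaluate: (incoming) =>
      tokensLeft <= 0 && incoming.priority !== "critical"
        ? { action: "skip", reason: "Daily token budget exhausted" }
        : null,
  };
}

const chain = [costBudget(0), adaptiveHeartbeat];
console.log(decide(chain, { source: "webhook", priority: "critical", stateChanged: true }).action);
// "execute"
```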

import type { SchedulingPolicy } from "@reactive-agents/gateway";
import { Effect } from "effect";

const businessHoursOnly: SchedulingPolicy = {
  _tag: "BusinessHours",
  priority: 15,
  evaluate: (event, state) => {
    const hour = new Date().getUTCHours();
    // Business hours are 09:00 to 16:59 UTC; queue everything outside them.
    if (hour < 9 || hour >= 17) {
      return Effect.succeed({ action: "queue" as const, reason: "Outside business hours" });
    }
    return Effect.succeed(null); // Pass to next policy
  },
};

Register custom policies via the PolicyEngine service:

import { PolicyEngine } from "@reactive-agents/gateway";
import { Effect } from "effect";

const program = Effect.gen(function* () {
  const engine = yield* PolicyEngine;
  yield* engine.addPolicy(businessHoursOnly);
});

The gateway is built on three principles that ensure autonomous agents remain trustworthy:

Every autonomous action is logged to the EventBus. Nothing happens in the dark.

| Event | When |
| --- | --- |
| GatewayEventReceived | An event enters the router |
| PolicyDecisionMade | A policy makes a routing decision |
| ProactiveActionInitiated | The LLM is invoked for an autonomous task |
| ProactiveActionCompleted | An autonomous task finishes |
| ProactiveActionSuppressed | A policy blocked an event from reaching the LLM |
| HeartbeatSkipped | A heartbeat tick was skipped (with reason and skip count) |
| EventsMerged | Multiple events were batched into one |
| BudgetExhausted | Daily token budget reached |

Subscribe to any of these for real-time monitoring:

await agent.subscribe("ProactiveActionSuppressed", (event) => {
  console.log(`Suppressed: ${event.reason} (event: ${event.eventId})`);
});

await agent.subscribe("BudgetExhausted", (event) => {
  console.log(`Budget hit: ${event.tokensUsed}/${event.dailyBudget} tokens`);
});

Hard limits prevent runaway execution:

  • Token budgets: Daily cap on LLM token consumption (default: 100,000)
  • Rate limits: Maximum actions per hour (default: 30)
  • Critical bypass: Only "critical" priority events can exceed limits
  • Kill switch: agent.stop() or agent.terminate() halts the entire event loop
  • Adaptive heartbeats: Idle agents skip ticks instead of burning tokens
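The first three limits can be pictured as a small gate that every action passes through before the LLM is called. The sketch below is an assumption-laden model, not the gateway's code: `SafetyLimits` is a hypothetical class, the rate limit is implemented as a sliding one-hour window, and the daily reset of the token counter is omitted for brevity.

```typescript
type ActionPriority = "low" | "normal" | "high" | "critical";

class SafetyLimits {
  private readonly dailyTokenBudget: number;
  private readonly maxActionsPerHour: number;
  private tokensUsedToday = 0;
  private actionTimestamps: number[] = [];

  constructor(dailyTokenBudget = 100_000, maxActionsPerHour = 30) {
    this.dailyTokenBudget = dailyTokenBudget;
    this.maxActionsPerHour = maxActionsPerHour;
  }

  // Returns true when an action with this priority may run at time nowMs.
  allows(priority: ActionPriority, nowMs: number): boolean {
    if (priority === "critical") return true; // critical bypass
    const oneHourAgo = nowMs - 3_600_000;
    // Keep only actions inside the sliding one-hour window.
    this.actionTimestamps = this.actionTimestamps.filter((t) => t > oneHourAgo);
    const underBudget = this.tokensUsedToday < this.dailyTokenBudget;
    const underRate = this.actionTimestamps.length < this.maxActionsPerHour;
    return underBudget && underRate;
  }

  // Call after each completed action to account for its cost.
  record(tokens: number, nowMs: number): void {
    this.tokensUsedToday += tokens;
    this.actionTimestamps.push(nowMs);
  }
}

const limits = new SafetyLimits(1_000, 2);
limits.record(100, 0);
limits.record(100, 1_000);
console.log(limits.allows("normal", 2_000)); // false: two actions already in the window
console.log(limits.allows("critical", 2_000)); // true: critical bypasses the limits
```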

Agents declare their autonomous capabilities upfront. No hidden behaviors.

policies: {
  dailyTokenBudget: 50_000, // User sets the ceiling
  maxActionsPerHour: 20, // User controls the rate
  heartbeatPolicy: "adaptive", // User chooses the mode
  requireApprovalFor: ["deploy"], // User gates sensitive actions
}

Monitor gateway health programmatically:

import { GatewayService } from "@reactive-agents/gateway";
import { Effect } from "effect";

const program = Effect.gen(function* () {
  const gw = yield* GatewayService;
  const status = yield* gw.status();
  console.log(status.isRunning); // true
  console.log(status.uptime); // 3600000 (ms)
  console.log(status.stats.heartbeatsFired); // 12
  console.log(status.stats.heartbeatsSkipped); // 36
  console.log(status.stats.webhooksReceived); // 8
  console.log(status.stats.totalTokensUsed); // 23400
  console.log(status.stats.actionsSuppressed); // 5
});

Stats tracked:

| Stat | Description |
| --- | --- |
| heartbeatsFired / heartbeatsSkipped | Heartbeat efficiency ratio |
| webhooksReceived / webhooksProcessed / webhooksMerged | Webhook throughput |
| cronsExecuted | Cron jobs completed |
| chatTurnsHandled | Incoming channel messages handled in chat mode |
| totalTokensUsed | Cumulative LLM token consumption |
| actionsSuppressed / actionsEscalated | Policy enforcement activity |
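One way to read the heartbeat pair is as a skip ratio. The helper below is a hypothetical convenience, not part of the status API; it uses the sample values from the status example (12 fired, 36 skipped).

```typescript
interface HeartbeatStats {
  heartbeatsFired: number;
  heartbeatsSkipped: number;
}

// Fraction of all ticks that were skipped; 0 when no ticks have happened yet.
function heartbeatSkipRatio(stats: HeartbeatStats): number {
  const total = stats.heartbeatsFired + stats.heartbeatsSkipped;
  return total === 0 ? 0 : stats.heartbeatsSkipped / total;
}

const skipRatio = heartbeatSkipRatio({ heartbeatsFired: 12, heartbeatsSkipped: 36 });
console.log(`${(skipRatio * 100).toFixed(0)}% of ticks skipped`); // prints "75% of ticks skipped"
```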

The gateway enhances — and is enhanced by — every existing layer:

| Layer | How It Integrates |
| --- | --- |
| Guardrails | Webhook payloads are checked for injection/PII before reaching the LLM |
| Cost | Budget policies delegate to the same CostService used by user-initiated tasks |
| Identity | Agent certificates can authenticate webhook sources |
| Memory | Heartbeats consult episodic memory for context before deciding to act |
| Observability | All gateway events stream to the metrics dashboard and tracing system |
| Kill Switch | agent.stop() halts the gateway event loop at the next phase boundary |
| Verification | Autonomous outputs are fact-checked before being sent |
| Orchestration | High-risk actions can route through approval gates |

interface GatewayConfig {
  heartbeat?: HeartbeatConfig;
  crons?: CronEntry[];
  webhooks?: WebhookConfig[];
  accessControl?: GatewayAccessControlConfig;
  policies?: PolicyConfig;
  port?: number; // Default: 3000
  persistMemoryAcrossRuns?: boolean; // Share agent ID across ticks for memory continuity
  timezone?: string; // IANA timezone for cron evaluation (default: "UTC")
}

HeartbeatConfig (heartbeat)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| intervalMs | number | | Milliseconds between heartbeat ticks |
| policy | "always" \| "adaptive" \| "conservative" | "adaptive" | Heartbeat firing strategy |
| instruction | string | | What the agent should do on each tick |
| maxConsecutiveSkips | number | 6 | Force execution after N consecutive skips |

CronEntry (crons)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| schedule | string | | 5-field cron expression |
| instruction | string | | Task for the agent when cron fires |
| agentId | string | | Override target agent |
| priority | EventPriority | "normal" | Event priority level |
| enabled | boolean | true | Toggle without removing |

GatewayAccessControlConfig (accessControl)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| accessPolicy | "allowlist" \| "blocklist" \| "open" | "allowlist" | Who can send messages |
| allowedSenders | string[] | | Phone numbers / user IDs allowed (allowlist mode) |
| blockedSenders | string[] | | Phone numbers / user IDs blocked (blocklist mode) |
| unknownSenderAction | "skip" \| "escalate" | "skip" | What to do with unauthorized senders |
| replyToUnknown | string | | Auto-reply text for unknown senders |
| mode | "chat" \| "task" | "chat" | "chat" maintains per-sender conversation history; "task" sends one-shot instructions |
| sessionTtlDays | number | 30 | Days of inactivity before a chat session is pruned |

Returned by handle.stop():

| Field | Type | Description |
| --- | --- | --- |
| heartbeatsFired | number | Heartbeat ticks that triggered an LLM run |
| totalRuns | number | Total agent executions (heartbeats + crons + channels) |
| cronChecks | number | Cron schedule evaluations |
| chatTurns | number \| undefined | Incoming channel messages handled in chat mode |
| error | string \| undefined | Fatal error if the loop exited unexpectedly |

PolicyConfig (policies)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| dailyTokenBudget | number | 100_000 | Max tokens per day |
| maxActionsPerHour | number | 30 | Max LLM invocations per hour |
| heartbeatPolicy | HeartbeatPolicy | "adaptive" | Heartbeat strategy |
| mergeWindowMs | number | 300_000 | Event merge window (5 min) |
| requireApprovalFor | string[] | | Categories requiring human approval |
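To illustrate what a merge window such as mergeWindowMs implies, the sketch below groups events that share a merge key and arrive within the window. It is only a model: `MergeableEvent` and `batchByMergeKey` are hypothetical, and anchoring the window at the first event of each batch is an assumption about behavior the documentation does not spell out.

```typescript
interface MergeableEvent {
  mergeKey: string;
  receivedAtMs: number;
  payload: unknown;
}

// Groups events by merge key; a batch stays open for mergeWindowMs after its first event.
function batchByMergeKey(items: MergeableEvent[], mergeWindowMs = 300_000): MergeableEvent[][] {
  const batches: MergeableEvent[][] = [];
  const openBatches = new Map<string, MergeableEvent[]>();
  const sorted = [...items].sort((a, b) => a.receivedAtMs - b.receivedAtMs);
  for (const item of sorted) {
    const current = openBatches.get(item.mergeKey);
    const first = current?.[0];
    if (current !== undefined && first !== undefined &&
        item.receivedAtMs - first.receivedAtMs <= mergeWindowMs) {
      current.push(item); // still inside the window: merge into the open batch
    } else {
      const freshBatch = [item]; // window expired or new key: start a new batch
      openBatches.set(item.mergeKey, freshBatch);
      batches.push(freshBatch);
    }
  }
  return batches;
}

// Five pull-request events one minute apart, then a sixth after the window closes.
const prEvents: MergeableEvent[] = [0, 1, 2, 3, 4, 6].map((minute) => ({
  mergeKey: "pr-review",
  receivedAtMs: minute * 60_000,
  payload: { minute },
}));
console.log(batchByMergeKey(prEvents).map((batch) => batch.length)); // [5, 1]
```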

The gateway enables agents to communicate via Signal and Telegram using existing MCP servers in Docker containers. No custom adapter code needed — the framework’s .withMCP() connects to the messaging servers, and the gateway heartbeat drives message polling.

See the Messaging Channels guide for setup instructions.

accessControl: {
  accessPolicy: "allowlist", // "allowlist" | "blocklist" | "open"
  allowedSenders: ["+15551234567"],
  unknownSenderAction: "skip", // "skip" | "escalate"
  replyToUnknown: "Sorry, I only respond to authorized contacts.",
}
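The sender check implied by these fields might be modeled as follows. This is a sketch under assumptions: `checkSender` and `AccessDecision` are hypothetical names, and the treatment of missing lists (an empty allowlist admits nobody, an empty blocklist blocks nobody) is a guess at sensible defaults rather than documented behavior.

```typescript
interface AccessControlSketch {
  accessPolicy: "allowlist" | "blocklist" | "open";
  allowedSenders?: string[];
  blockedSenders?: string[];
  unknownSenderAction?: "skip" | "escalate";
  replyToUnknown?: string;
}

type AccessDecision =
  | { action: "accept" }
  | { action: "skip" | "escalate"; reply?: string };

function checkSender(config: AccessControlSketch, sender: string): AccessDecision {
  const authorized =
    config.accessPolicy === "open" ||
    (config.accessPolicy === "allowlist" && (config.allowedSenders ?? []).includes(sender)) ||
    (config.accessPolicy === "blocklist" && !(config.blockedSenders ?? []).includes(sender));
  if (authorized) return { action: "accept" };

  // Unauthorized: fall back to the configured action, defaulting to "skip".
  const action = config.unknownSenderAction ?? "skip";
  return config.replyToUnknown === undefined
    ? { action }
    : { action, reply: config.replyToUnknown };
}

const accessConfig: AccessControlSketch = {
  accessPolicy: "allowlist",
  allowedSenders: ["+15551234567"],
  unknownSenderAction: "skip",
  replyToUnknown: "Sorry, I only respond to authorized contacts.",
};
console.log(checkSender(accessConfig, "+15551234567").action); // "accept"
console.log(checkSender(accessConfig, "+15550000000").action); // "skip"
```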

By default (accessControl.mode: "chat"), each incoming channel message starts a stateful per-sender conversation — not a one-shot task. The agent receives the full conversation history, recent episodic context, and a directive to respond via the channel tool.

accessControl: {
  accessPolicy: "allowlist",
  allowedSenders: ["+15551234567"],
  mode: "chat", // default: persistent per-sender history
  sessionTtlDays: 30, // prune inactive sessions after 30 days
}

What happens each turn:

  1. Session history for the sender is loaded from SQLite (or the in-memory cache for repeat turns)
  2. History is windowed to the most recent 40 turns / 8,000 characters before injection
  3. Recent gateway activity (heartbeat and cron results) is injected as episodic context — chat-turn episodes are filtered out to avoid recursive noise
  4. The enriched instruction is sent to the execution engine: episodic context → conversation history → user message → tool delivery directive
  5. After the agent run, both the user message and the assistant reply are appended to the session and persisted to SQLite
  6. GatewaySummary.chatTurns is incremented
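Step 2 (windowing) is the easiest of these to show in isolation. The sketch below keeps at most 40 turns and at most 8,000 characters, preferring the most recent turns. How the two limits actually combine is not specified above, so treat the drop-oldest-first strategy, the `ChatTurn` shape, and the `windowHistory` name as assumptions.

```typescript
interface ChatTurn {
  role: "user" | "assistant";
  content: string;
}

// Keeps the newest turns that fit within both the turn limit and the character limit.
function windowHistory(turns: ChatTurn[], maxTurns = 40, maxChars = 8_000): ChatTurn[] {
  const recent = turns.slice(-maxTurns);
  const kept: ChatTurn[] = [];
  let chars = 0;
  // Walk from newest to oldest and stop once the character budget would be exceeded.
  for (const turn of [...recent].reverse()) {
    if (chars + turn.content.length > maxChars) break;
    chars += turn.content.length;
    kept.unshift(turn); // restore chronological order
  }
  return kept;
}

// 50 turns of 1,000 characters each: only the newest 8 fit in 8,000 characters.
const longTurns: ChatTurn[] = Array.from({ length: 50 }, (_, i) => ({
  role: i % 2 === 0 ? "user" : "assistant",
  content: "x".repeat(1_000),
}));
console.log(windowHistory(longTurns).length); // 8
```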

Task mode skips all of the above and sends a direct one-shot instruction per message:

accessControl: {
  mode: "task", // stateless: no history, no session persistence
}

Use task mode when each message is an independent command and you don’t want conversation context to accumulate (e.g. automation triggers, slash-command bots).

Memory requirements: Chat mode needs .withMemory() for durable sessions and episodic context. Session persistence is backed by SessionStoreService, and episodic context injection uses EpisodicMemoryService. Without a memory layer, sessions are kept in memory only (lost on restart) and episodic context is empty.

const agent = await ReactiveAgents.create()
  .withName("signal-agent")
  .withAgentId("signal-agent") // stable ID for memory continuity across restarts
  .withProvider("ollama")
  .withMCP([{ name: "signal", transport: "stdio", command: "docker", args: [...] }])
  .withMemory({ tier: "enhanced", dbPath: "./memory.sqlite" })
  .withGateway({
    persistMemoryAcrossRuns: true,
    accessControl: {
      accessPolicy: "allowlist",
      allowedSenders: [process.env.RECIPIENT ?? ""],
      mode: "chat",
      sessionTtlDays: 30,
    },
  })
  .build();

| Error | When |
| --- | --- |
| GatewayError | General gateway failure |
| GatewayConfigError | Invalid configuration |
| WebhookValidationError | Signature verification failed (401) |
| WebhookTransformError | Payload transformation failed |
| PolicyViolationError | Policy explicitly rejected an event |
| SchedulerError | Invalid cron expression or scheduling failure |
| ChannelConnectionError | Channel adapter connection failure |

All errors are Data.TaggedError instances — pattern-matchable in Effect error handlers.
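Tagged errors carry a `_tag` discriminant, which is what makes them pattern-matchable. The dependency-free sketch below mimics that shape with plain classes to show the idea; the class bodies and `describeFailure` are illustrative stand-ins, not the library's definitions. In Effect code the same dispatch is typically written with `Effect.catchTag`.

```typescript
// Stand-ins that mimic the _tag discriminant of tagged errors.
class WebhookValidationError extends Error {
  readonly _tag = "WebhookValidationError" as const;
}

class SchedulerError extends Error {
  readonly _tag = "SchedulerError" as const;
}

type SketchGatewayError = WebhookValidationError | SchedulerError;

// The switch is exhaustive: adding a new error type to the union forces a new case.
function describeFailure(error: SketchGatewayError): string {
  switch (error._tag) {
    case "WebhookValidationError":
      return `401: ${error.message}`;
    case "SchedulerError":
      return `Check the cron expression: ${error.message}`;
  }
}

console.log(describeFailure(new WebhookValidationError("bad signature")));
// prints "401: bad signature"
```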