Deterministic Event Pipelines for AI Agents
1. The Problem: The "Forever Loop" Fallacy
In many AI tutorials, agents are implemented as a single blocking loop:
// The "Forever Loop" Pattern
while (true) {
  const plan = await llm.think(history);
  if (plan.isFinished) break;
  const result = await tool.execute(plan.action); // BLOCKING
  history.push(result);
}
This pattern is acceptable for prototypes but catastrophic for production systems.
Critical Failures:
- Concurrency Deadlocks: The loop is suspended for the whole tool call. If your agent needs to fetch data (500ms) and simultaneously listen for a user interrupt, nothing in the loop can react to the interrupt until the fetch returns.
- Observability Black Holes: State exists only within the closure of the loop. If the process crashes on iteration #4, you lose the stack. You cannot "replay" the session.
- Testing Impossibility: You cannot unit test the "Tool Execution" logic without mocking the entire LLM response chain.
The Solution: Treat the Agent not as a function, but as a State Machine driven by a discrete Event Pipeline.
2. The Solution: The Cognitive Event Bus
We decouple Inference (Brain) from Execution (Hands). The components communicate only via typed events.
The Pipeline:
- Ingestion: User input is normalized into an INPUT_RECEIVED event.
- Cognition: The LLM listens to events, updates internal context, and emits INTENT_DETECTED.
- Router: A deterministic layer validates the intent and emits EXECUTION_REQUESTED.
- Effect: Workers execute the tool and emit TOOL_RESULT.
- Loopback: The TOOL_RESULT is fed back into the Cognition layer.
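The five stages above form a small state machine, and one way to make that explicit is a transition table keyed by event name. The sketch below reuses the event names from the list; `Stage`, `NEXT`, and `trace` are hypothetical names introduced only for illustration, and the table ignores the case where Cognition produces a final answer instead of an intent.

```typescript
// Event names come from the pipeline list; everything else is illustrative.
type Stage =
  | 'INPUT_RECEIVED'
  | 'INTENT_DETECTED'
  | 'EXECUTION_REQUESTED'
  | 'TOOL_RESULT';

// For each incoming event, the event that the handling stage emits next.
const NEXT: Record<Stage, Stage> = {
  INPUT_RECEIVED: 'INTENT_DETECTED',      // Cognition
  INTENT_DETECTED: 'EXECUTION_REQUESTED', // Router
  EXECUTION_REQUESTED: 'TOOL_RESULT',     // Effect
  TOOL_RESULT: 'INTENT_DETECTED',         // Loopback into Cognition
};

// Follow the table for a fixed number of hops, starting from user input.
function trace(hops: number): Stage[] {
  let current: Stage = 'INPUT_RECEIVED';
  const path: Stage[] = [current];
  for (let i = 0; i < hops; i++) {
    current = NEXT[current];
    path.push(current);
  }
  return path;
}

console.log(trace(4).join(' -> '));
// INPUT_RECEIVED -> INTENT_DETECTED -> EXECUTION_REQUESTED -> TOOL_RESULT -> INTENT_DETECTED
```

Because the table is plain data, the legal transitions can be checked in a unit test without running an LLM or a tool.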
3. Project Structure
Separation of concerns is enforced at the directory level.
/src
├── /core
│ ├── bus.ts # Typed EventEmitter Singleton
│ ├── types.ts # Zod Schemas for Event Payloads
│ └── store.ts # Redis/Postgres State Persistence
├── /reactors
│ ├── cognition.ts # Logic: Event -> LLM -> Intent
│ └── execution.ts # Logic: Intent -> Tool -> Result
├── /tools
│ ├── network.ts # Axios/Fetch Wrappers
│ └── filesystem.ts # Safe FS Operations
└── index.ts # Pipeline Composition
4. Code Implementation
We use TypeScript to enforce strict contracts on the event bus. This eliminates "magic string" bugs.
A. The Typed Event Bus
// src/core/bus.ts
import { EventEmitter } from 'events';

export enum AgentEvent {
  INPUT = 'AGENT:INPUT',
  INTENT = 'AGENT:INTENT',
  EXECUTE = 'AGENT:EXECUTE',
  RESULT = 'AGENT:RESULT',
  ERROR = 'AGENT:ERROR'
}

// Strict Payload Definitions
interface EventPayloads {
  [AgentEvent.INPUT]: { id: string; content: string };
  [AgentEvent.INTENT]: { id: string; tool: string; args: unknown };
  [AgentEvent.EXECUTE]: { id: string; tool: string; args: unknown };
  [AgentEvent.RESULT]: { id: string; output: string };
  [AgentEvent.ERROR]: { id: string; error: Error };
}

class TypedBus extends EventEmitter {
  emit<K extends keyof EventPayloads>(event: K, payload: EventPayloads[K]): boolean {
    console.log(`[BUS] ${event}`, payload); // Free Observability
    return super.emit(event, payload);
  }

  on<K extends keyof EventPayloads>(event: K, fn: (p: EventPayloads[K]) => void): this {
    return super.on(event, fn);
  }
}

export const bus = new TypedBus();
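A short usage sketch may make the "magic string" claim concrete. It uses a cut-down copy of the bus above with only two events; the session id `s1`, the misspelled event name, and the `seen` array are made up for the example.

```typescript
import { EventEmitter } from 'events';

// Cut-down copy of the bus above: two events are enough to show the checks.
enum AgentEvent {
  INPUT = 'AGENT:INPUT',
  RESULT = 'AGENT:RESULT',
}

interface EventPayloads {
  [AgentEvent.INPUT]: { id: string; content: string };
  [AgentEvent.RESULT]: { id: string; output: string };
}

class TypedBus extends EventEmitter {
  emit<K extends keyof EventPayloads>(event: K, payload: EventPayloads[K]): boolean {
    return super.emit(event, payload);
  }
  on<K extends keyof EventPayloads>(event: K, fn: (p: EventPayloads[K]) => void): this {
    return super.on(event, fn);
  }
}

const bus = new TypedBus();

// The two calls below are rejected by the compiler. They run before any
// listener is registered, so they do nothing at runtime.
// @ts-expect-error a raw, misspelled string is not a key of EventPayloads
bus.emit('AGENT:INPTU', { id: 's1', content: 'hello' });
// @ts-expect-error INPUT payloads carry `content`, not `output`
bus.emit(AgentEvent.INPUT, { id: 's1', output: 'hello' });

const seen: string[] = [];

// `p` is inferred as { id: string; content: string }; no cast is needed.
bus.on(AgentEvent.INPUT, (p) => seen.push(`${p.id}:${p.content}`));

// emit() returns true because a listener was registered for this event.
const delivered = bus.emit(AgentEvent.INPUT, { id: 's1', content: 'hello' });
console.log(delivered, seen);
```

The checks are compile-time only. Payloads that arrive from outside the process, such as LLM output, still need runtime validation.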
B. The Cognition Reactor (The Brain)
This component never touches a tool. It calls the LLM, turns the current context into an intent, and emits that intent as an event; executing it is someone else's job.
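// src/reactors/cognition.ts
import { bus, AgentEvent } from '../core/bus';
import { llm } from '../lib/llm';

type Message = { role: 'user' | 'tool'; content: string };

// Per-session history, so each cycle sees the whole conversation rather than
// only the latest event. Kept in memory here for brevity; a production
// version would persist it (see core/store.ts).
const histories = new Map<string, Message[]>();

export const registerCognition = () => {
  const cycle = async (sessionId: string, message: Message) => {
    const context = [...(histories.get(sessionId) ?? []), message];
    histories.set(sessionId, context);
    try {
      const response = await llm.generate(context);
      // An empty tool_calls array means "no tool call", so check the first entry.
      const call = response.tool_calls?.[0];
      if (call) {
        // Emit intent, do NOT execute
        bus.emit(AgentEvent.INTENT, {
          id: sessionId,
          tool: call.function.name,
          args: JSON.parse(call.function.arguments)
        });
      } else {
        // Handle final answer
      }
    } catch (err) {
      bus.emit(AgentEvent.ERROR, { id: sessionId, error: err as Error });
    }
  };

  // Trigger on Input OR Tool Results (The Loopback)
  bus.on(AgentEvent.INPUT, (p) => cycle(p.id, { role: 'user', content: p.content }));
  bus.on(AgentEvent.RESULT, (p) => cycle(p.id, { role: 'tool', content: p.output }));
};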
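The pipeline in section 2 places a Router between Cognition and Effect: Cognition emits AGENT:INTENT, while execution is triggered by AGENT:EXECUTE, so something has to bridge the two. A minimal sketch of such a router follows, assuming an allowlist is the validation policy. `registerRouter`, `ALLOWED_TOOLS`, and the tool names are hypothetical, and a plain `EventEmitter` stands in for the typed bus so the block runs on its own.

```typescript
import { EventEmitter } from 'events';

// Stand-in for the typed bus; the string values match the AgentEvent enum.
const bus = new EventEmitter();
const INTENT = 'AGENT:INTENT';
const EXECUTE = 'AGENT:EXECUTE';
const RESULT = 'AGENT:RESULT';

type Intent = { id: string; tool: string; args: unknown };

// Hypothetical policy: the only tool names the agent is allowed to call.
const ALLOWED_TOOLS = new Set(['network', 'filesystem']);

const registerRouter = () => {
  bus.on(INTENT, ({ id, tool, args }: Intent) => {
    if (!ALLOWED_TOOLS.has(tool)) {
      // Refusals go back as a RESULT so Cognition can choose another action.
      bus.emit(RESULT, { id, output: `Error: tool "${tool}" is not allowed` });
      return;
    }
    // Deterministic hand-off: the same intent always yields the same request.
    bus.emit(EXECUTE, { id, tool, args });
  });
};

// Usage: record what the router emits for one allowed and one refused intent.
registerRouter();
const log: string[] = [];
bus.on(EXECUTE, (p: Intent) => log.push(`EXECUTE ${p.tool}`));
bus.on(RESULT, (p: { output: string }) => log.push(`RESULT ${p.output}`));

bus.emit(INTENT, { id: 's1', tool: 'network', args: { url: 'https://example.com' } });
bus.emit(INTENT, { id: 's1', tool: 'shell', args: {} });

console.log(log.join('\n'));
// EXECUTE network
// RESULT Error: tool "shell" is not allowed
```

Keeping the policy in its own listener means it can be tested and audited without involving either the LLM or the tools.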
C. The Execution Reactor (The Hands)
This component is "dumb." It knows nothing of the LLM. It simply executes commands.
// src/reactors/execution.ts
import { bus, AgentEvent } from '../core/bus';
import { tools } from '../tools';

export const registerExecution = () => {
  bus.on(AgentEvent.EXECUTE, async ({ id, tool, args }) => {
    try {
      // 1. Validation Logic
      if (!tools[tool]) throw new Error(`Unknown tool: ${tool}`);
      // 2. Execution
      const output = await tools[tool](args);
      // 3. Emit Result
      bus.emit(AgentEvent.RESULT, { id, output: JSON.stringify(output) });
    } catch (err) {
      // Even errors are events, allowing the LLM to self-correct.
      // `err` is `unknown` under strict TypeScript, so narrow it before reading `.message`.
      const message = err instanceof Error ? err.message : String(err);
      bus.emit(AgentEvent.RESULT, { id, output: `Error: ${message}` });
    }
  });
};
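Because the execution reactor only sees events, it can be exercised in a test with no LLM involved. The sketch below assumes a variant of the reactor that receives its bus and tool table as arguments. That signature, the `runOnce` helper, and the `add` tool are inventions of this example and not part of the code above.

```typescript
import { EventEmitter } from 'events';

const EXECUTE = 'AGENT:EXECUTE';
const RESULT = 'AGENT:RESULT';

type Tool = (args: unknown) => Promise<unknown>;
type ExecuteRequest = { id: string; tool: string; args: unknown };
type ToolResult = { id: string; output: string };

// Same logic as the reactor above, with the bus and tools injected so a
// test can pass fakes. The injection is an assumption of this sketch.
function registerExecution(bus: EventEmitter, tools: Record<string, Tool>): void {
  bus.on(EXECUTE, async ({ id, tool, args }: ExecuteRequest) => {
    try {
      const fn = tools[tool];
      if (!fn) throw new Error(`Unknown tool: ${tool}`);
      const output = await fn(args);
      bus.emit(RESULT, { id, output: JSON.stringify(output) });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      bus.emit(RESULT, { id, output: `Error: ${message}` });
    }
  });
}

// Test helper: emit one EXECUTE event and resolve with the RESULT it produces.
function runOnce(tools: Record<string, Tool>, req: ExecuteRequest): Promise<ToolResult> {
  const bus = new EventEmitter();
  registerExecution(bus, tools);
  return new Promise<ToolResult>((resolve) => {
    bus.once(RESULT, resolve);
    bus.emit(EXECUTE, req);
  });
}

// A fake tool; no network, filesystem, or model is touched.
const fakeTools: Record<string, Tool> = {
  add: async (args) => {
    const { a, b } = args as { a: number; b: number };
    return a + b;
  },
};

async function main(): Promise<void> {
  const ok = await runOnce(fakeTools, { id: 's1', tool: 'add', args: { a: 2, b: 3 } });
  console.log(ok.output); // 5
  const bad = await runOnce(fakeTools, { id: 's1', tool: 'missing', args: {} });
  console.log(bad.output); // Error: Unknown tool: missing
}

main().catch(console.error);
```

The same helper works with any test runner: await `runOnce` and assert on the returned `output`.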
5. The Strategic Advantage
| Metric | Monolithic Loop | Event Pipeline |
|---|---|---|
| Scalability | Vertical (Single Process) | Horizontal (Worker Pools can listen to EXECUTE events once the bus is backed by a shared broker rather than an in-process EventEmitter) |
| Debugging | Parsing 50MB log files | Time-travel replay of the event stream |
| Reliability | One unhandled exception kills the agent | Errors are just data; the pipeline persists |
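The replay row presupposes that events are recorded somewhere. A minimal sketch of the idea follows, assuming an in-memory array as the log; `RecordingBus`, `Recorded`, and `replay` are hypothetical names, and a real system would write the log to durable storage.

```typescript
import { EventEmitter } from 'events';

type Recorded = { seq: number; event: string; payload: unknown };

// An EventEmitter that appends every emitted event to a log before dispatch.
class RecordingBus extends EventEmitter {
  readonly log: Recorded[] = [];

  emit(event: string, payload?: unknown): boolean {
    this.log.push({ seq: this.log.length, event, payload });
    return super.emit(event, payload);
  }
}

// Re-emit a recorded log on another emitter, up to and including `untilSeq`.
function replay(log: Recorded[], target: EventEmitter, untilSeq = Infinity): void {
  for (const entry of log) {
    if (entry.seq > untilSeq) break;
    target.emit(entry.event, entry.payload);
  }
}

// Record a short session.
const live = new RecordingBus();
live.emit('AGENT:INPUT', { id: 's1', content: 'weather in Oslo?' });
live.emit('AGENT:INTENT', { id: 's1', tool: 'network', args: {} });
live.emit('AGENT:RESULT', { id: 's1', output: '"4C"' });

// Replay only the first two events into a separate, debugger-side emitter.
const debugBus = new EventEmitter();
const seen: string[] = [];
for (const name of ['AGENT:INPUT', 'AGENT:INTENT', 'AGENT:RESULT']) {
  debugBus.on(name, () => seen.push(name));
}
replay(live.log, debugBus, 1);

console.log(seen.join(', ')); // AGENT:INPUT, AGENT:INTENT
```

Replaying into an emitter that has the real reactors attached would re-run their side effects, so a replay target should normally carry only inspection listeners or stubbed tools.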