DiegoVallejo

Deterministic Event Pipelines for AI Agents

1. The Problem: The "Forever Loop" Fallacy

In 95% of AI tutorials, agents are implemented as recursive loops:

// The "Forever Loop" Pattern
while (true) {
  const plan = await llm.think(history);
  if (plan.isFinished) break;
  const result = await tool.execute(plan.action); // BLOCKING
  history.push(result);
}

This pattern is acceptable for prototypes but catastrophic for production systems.

Critical Failures:

  1. Concurrency Deadlocks: The runtime is blocked during tool execution. If your agent needs to fetch data (500ms) and simultaneously listen for a user interrupt, the loop architecture fails.
  2. Observability Black Holes: State exists only within the closure of the loop. If the process crashes on iteration #4, you lose the stack. You cannot "replay" the session.
  3. Testing Impossibility: You cannot unit test the "Tool Execution" logic without mocking the entire LLM response chain.

The Solution: Treat the Agent not as a function, but as a State Machine driven by a discrete Event Pipeline.

2. The Solution: The Cognitive Event Bus

We decouple Inference (Brain) from Execution (Hands). The components communicate only via typed events.

The Pipeline:

  1. Ingestion: User input is normalized into an INPUT_RECEIVED event.
  2. Cognition: The LLM listens to events, updates internal context, and emits INTENT_DETECTED.
  3. Router: A deterministic layer validates the intent and emits EXECUTION_REQUESTED.
  4. Effect: Workers execute the tool and emit TOOL_RESULT.
  5. Loopback: The TOOL_RESULT is fed back into the Cognition layer.

3. Project Structure

Separation of concerns is enforced at the directory level.

/src
├── /core
│   ├── bus.ts             # Typed EventEmitter Singleton
│   ├── types.ts           # Zod Schemas for Event Payloads
│   └── store.ts           # Redis/Postgres State Persistence
├── /reactors
│   ├── cognition.ts       # Logic: Event -> LLM -> Intent
│   └── execution.ts       # Logic: Intent -> Tool -> Result
├── /tools
│   ├── network.ts         # Axios/Fetch Wrappers
│   └── filesystem.ts      # Safe FS Operations
└── index.ts               # Pipeline Composition

4. Code Implementation

We use TypeScript to enforce strict contracts on the event bus. This eliminates "magic string" bugs.

A. The Typed Event Bus

// src/core/bus.ts
import { EventEmitter } from 'events';

export enum AgentEvent {
  INPUT = 'AGENT:INPUT',
  INTENT = 'AGENT:INTENT',
  EXECUTE = 'AGENT:EXECUTE',
  RESULT = 'AGENT:RESULT',
  ERROR = 'AGENT:ERROR'
}

// Strict Payload Definitions
interface EventPayloads {
  [AgentEvent.INPUT]: { id: string; content: string };
  [AgentEvent.INTENT]: { id: string; tool: string; args: unknown };
  [AgentEvent.EXECUTE]: { id: string; tool: string; args: unknown };
  [AgentEvent.RESULT]: { id: string; output: string };
  [AgentEvent.ERROR]: { id: string; error: Error };
}

class TypedBus extends EventEmitter {
  emit<K extends keyof EventPayloads>(event: K, payload: EventPayloads[K]): boolean {
    console.log(`[BUS] ${event}`, payload); // Free Observability
    return super.emit(event, payload);
  }

  on<K extends keyof EventPayloads>(event: K, fn: (p: EventPayloads[K]) => void): this {
    return super.on(event, fn);
  }
}

export const bus = new TypedBus();

B. The Cognition Reactor (The Brain)

This component is pure. It creates no side effects. It simply transforms state into intent.

// src/reactors/cognition.ts
import { bus, AgentEvent } from '../core/bus';
import { llm } from '../lib/llm';

export const registerCognition = () => {
  const cycle = async (sessionId: string, context: any[]) => {
    try {
      const response = await llm.generate(context);
      
      if (response.tool_calls) {
        // Emit intent, do NOT execute
        bus.emit(AgentEvent.INTENT, {
          id: sessionId,
          tool: response.tool_calls[0].function.name,
          args: JSON.parse(response.tool_calls[0].function.arguments)
        });
      } else {
        // Handle final answer
      }
    } catch (err) {
      bus.emit(AgentEvent.ERROR, { id: sessionId, error: err as Error });
    }
  };

  // Trigger on Input OR Tool Results (The Loopback)
  bus.on(AgentEvent.INPUT, (p) => cycle(p.id, [{ role: 'user', content: p.content }]));
  bus.on(AgentEvent.RESULT, (p) => cycle(p.id, [{ role: 'tool', content: p.output }]));
};

C. The Execution Reactor (The Hands)

This component is "dumb." It knows nothing of the LLM. It simply executes commands.

// src/reactors/execution.ts
import { bus, AgentEvent } from '../core/bus';
import { tools } from '../tools';

export const registerExecution = () => {
  bus.on(AgentEvent.EXECUTE, async ({ id, tool, args }) => {
    try {
      // 1. Validation Logic
      if (!tools[tool]) throw new Error(`Unknown tool: ${tool}`);

      // 2. Execution
      const output = await tools[tool](args);

      // 3. Emit Result
      bus.emit(AgentEvent.RESULT, { id, output: JSON.stringify(output) });
      
    } catch (err) {
      // Even errors are events, allowing the LLM to self-correct
      bus.emit(AgentEvent.RESULT, { id, output: `Error: ${err.message}` });
    }
  });
};

5. The Strategic Advantage

Metric Monolithic Loop Event Pipeline
Scalability Vertical (Single Process) Horizontal (Worker Pools can listen to EXECUTE events)
Debugging Parsing 50MB log files Time-travel replay of the event stream
Reliability One unhandled exception kills the agent Errors are just data; the pipeline persists
​> Architectural Axiom: Determinism in AI systems is not about controlling the model's output (which is impossible). It is about controlling the state transitions that result from that output. ​By adopting an Event Bus, you move from "scripting a bot" to orchestrating a distributed system. This is the only path to production-grade reliability.