Architecture¶
Reflex is an event-driven AI agent framework using PostgreSQL LISTEN/NOTIFY for real-time pub/sub.
🔍 System Overview¶
flowchart TB
subgraph Server["FastAPI Server"]
WS["WebSocket Handler"]
HTTP["HTTP Endpoints"]
Health["Health Checks"]
Loop["Agent Loop (Background)"]
end
subgraph Store["EventStore"]
Publish["publish()"]
Subscribe["subscribe() via LISTEN/NOTIFY"]
Ack["ack() / nack() with exponential backoff"]
end
subgraph DB["PostgreSQL"]
Events[("events table: id, type, payload, status, attempts")]
end
WS --> Publish
HTTP --> Publish
Loop --> Subscribe
Loop --> Ack
Store --> DB 🔄 Event Flow¶
sequenceDiagram
participant Client
participant API
participant EventStore
participant PostgreSQL
participant Agent
Client->>API: Send event (WS/HTTP)
API->>EventStore: publish(event)
EventStore->>PostgreSQL: INSERT + NOTIFY
PostgreSQL-->>EventStore: LISTEN notification
EventStore->>Agent: subscribe() yields event
Agent->>Agent: Process with PydanticAI
Agent->>EventStore: ack(event) or nack(event) - Ingestion - Events arrive via WebSocket or HTTP
- Persistence - EventStore persists to PostgreSQL and fires NOTIFY
- Processing - Agent loop receives events via subscribe()
- Execution - PydanticAI agent processes with tools
- Completion - Events are ack'd (completed) or nack'd (retry with backoff)
🧩 Key Components¶
| Component | Location | Purpose |
|---|---|---|
| EventStore | src/reflex/infra/store.py | Persistent event queue with pub/sub |
| Events | src/reflex/core/events.py | Pydantic models with discriminated unions |
| EventRegistry | src/reflex/core/events.py | Runtime event type registration |
| Triggers | src/reflex/agent/triggers.py | Filter + agent connections |
| Filters | src/reflex/agent/filters.py | Event matching predicates |
| Dependencies | src/reflex/core/deps.py | Dependency injection containers |
| Agent | src/reflex/agent/agents.py | PydanticAI agent with tools |
| API | src/reflex/api/app.py | FastAPI with WebSocket support |
| Config | src/reflex/config.py | Pydantic-settings configuration |
📁 Module Organization¶
Extension Points
The modules are designed with different stability guarantees:
Keep stable - Core infrastructure that rarely changes.
database.py- PostgreSQL connection setupstore.py- EventStore implementation
Extend carefully - Foundational types used throughout.
events.py- Event definitions & registrydeps.py- Dependency containerserrors.py- Structured error handlingtypes.py- Protocol definitions
Primary extension point - Where you build your agent logic.
agents.py- PydanticAI agent and toolstriggers.py- Trigger definitionsfilters.py- Event filters
Modify as needed - HTTP/WebSocket interface.
app.py- FastAPI application
📊 Event States¶
Events progress through these states:
stateDiagram-v2
[*] --> pending
pending --> processing: claim event
processing --> completed: ack()
processing --> failed: nack()
failed --> pending: retry
failed --> dead_letter: max retries exceeded
completed --> [*]
dead_letter --> pending: manual retry | State | Description |
|---|---|
pending | Waiting to be processed |
processing | Currently being handled by an agent |
completed | Successfully processed (ack()) |
failed | Processing failed, will retry (nack()) |
dead_letter | Max retries exceeded, requires manual intervention |