Overview
Autonome uses a hybrid data flow architecture combining oRPC procedures for request/response patterns and Server-Sent Events (SSE) for real-time updates. This ensures type-safe data fetching while maintaining instant UI reactivity.
oRPC Request/Response Flow
Client → Server Data Fetching
The primary pattern for fetching data from the backend:
Step-by-Step Breakdown
1. Component Query
// src/components/positions-list.tsx
import { useQuery } from "@tanstack/react-query";
import { orpc } from "@/server/orpc/client";
function PositionsList() {
const { data: positions, isLoading } = useQuery(
orpc.trading.getPositions.queryOptions({
input: { modelId: "abc-123" }
})
);
if (isLoading) return <Spinner />;
return <Table data={positions} />;
}
What happens:
- TanStack Query checks cache (15s staleTime for positions)
- If stale, triggers oRPC client request
- Suspends component until data arrives
2. oRPC Client Configuration
// src/server/orpc/client.ts
import { createORPCClient } from "@orpc/client";
import { RPCLink } from "@orpc/client/fetch";
import { createTanstackQueryUtils } from "@orpc/tanstack-query";
import { getRpcUrl } from "@/core/shared/api/apiConfig";
import type router from "@/server/orpc/router";
const link = new RPCLink({
url: getRpcUrl(), // http://localhost:8081/api/rpc in dev
});
export const client: RouterClient<typeof router> = createORPCClient(link);
export const orpc = createTanstackQueryUtils(client);
Key points:
RouterClient<typeof router> infers full type signature from backend
getRpcUrl() uses VITE_API_URL env var (or Vite proxy in dev)
createTanstackQueryUtils() generates .queryOptions() for every procedure
3. API URL Resolution
// src/core/shared/api/apiConfig.ts
export function getApiBaseUrl(): string {
// Production: Use explicit API URL
if (import.meta.env.VITE_API_URL) {
return import.meta.env.VITE_API_URL.replace(/\/$/, "");
}
// Development: Use Vite proxy (relative path)
if (typeof window !== "undefined") {
return window.location.origin; // http://localhost:5173
}
// SSR fallback
return "http://localhost:8081";
}
export function getRpcUrl(): string {
return `${getApiBaseUrl()}/api/rpc`;
}
Environment-specific behavior:
| Environment | VITE_API_URL | Resolved URL |
|---|
| Development | (not set) | http://localhost:5173/api/rpc (proxied to 8081) |
| Production (Vercel) | https://api.autonome.ai | https://api.autonome.ai/api/rpc |
4. Vite Proxy (Development Only)
// vite.config.ts
export default defineConfig({
server: {
port: 5173,
proxy: {
"/api": {
target: "http://localhost:8081",
changeOrigin: true,
},
},
},
});
Flow:
- Browser requests
http://localhost:5173/api/rpc/trading.getPositions
- Vite intercepts and forwards to
http://localhost:8081/api/rpc/trading.getPositions
- Backend responds, Vite forwards back to browser
The proxy eliminates CORS issues in development. In production, the frontend directly calls the VPS API URL.
5. Hono API Server
// api/src/index.ts
import { RPCHandler } from "@orpc/server/fetch";
import { Hono } from "hono";
import router from "@/server/orpc/router";
const app = new Hono();
const rpcHandler = new RPCHandler(router);
app.all("/api/rpc/*", async (c) => {
const { response } = await rpcHandler.handle(c.req.raw, {
prefix: "/api/rpc",
context: {}, // Can inject auth, DB, etc.
});
return response ?? c.json({ error: "Not Found" }, 404);
});
Routing logic:
- POST
/api/rpc/trading.getPositions → router.trading.getPositions
- POST
/api/rpc/simulator.placeOrder → router.simulator.placeOrder
- Wildcard matching for nested procedures
6. oRPC Procedure Handler
// src/server/orpc/router/trading.ts
import "@/polyfill"; // Compatibility shims
import { os } from "@orpc/server";
import * as Sentry from "@sentry/react";
import { z } from "zod";
import { fetchPositions } from "@/server/features/trading/queries.server";
export const getPositions = os
.input(z.object({
modelId: z.string().optional()
}))
.output(z.array(z.object({
symbol: z.string(),
side: z.enum(["LONG", "SHORT"]),
quantity: z.string(),
// ...
})))
.handler(async ({ input }) =>
Sentry.startSpan({ name: "getPositions" }, async () => {
const positions = await fetchPositions(input.modelId);
return positions; // Zod validates output schema
})
);
Handler responsibilities:
- Input validation: Zod parses and validates
input
- Business logic: Calls feature module (e.g.,
fetchPositions)
- Output validation: Zod validates return value matches schema
- Error handling: Zod validation errors auto-convert to 400 responses
- Observability: Sentry span tracks execution time
7. Feature Module (Business Logic)
// src/server/features/trading/queries.server.ts
import { db } from "@/db";
import { orders, models } from "@/db/schema";
import { eq, and } from "drizzle-orm";
import { calculateUnrealizedPnl } from "@/core/shared/trading/calculations";
export async function fetchPositions(modelId?: string) {
const openOrders = await db
.select({
id: orders.id,
modelId: orders.modelId,
modelName: models.name,
symbol: orders.symbol,
side: orders.side,
quantity: orders.quantity,
entryPrice: orders.entryPrice,
leverage: orders.leverage,
exitPlan: orders.exitPlan,
openedAt: orders.openedAt,
})
.from(orders)
.leftJoin(models, eq(orders.modelId, models.id))
.where(
and(
eq(orders.status, "OPEN"),
modelId ? eq(orders.modelId, modelId) : undefined
)
)
.orderBy(orders.openedAt);
// Enrich with current prices and unrealized P&L
const positions = await Promise.all(
openOrders.map(async (order) => {
const currentPrice = await getCurrentPrice(order.symbol);
const unrealizedPnl = calculateUnrealizedPnl(
parseFloat(order.entryPrice),
currentPrice,
parseFloat(order.quantity),
order.side
);
return {
...order,
currentPrice: currentPrice.toString(),
unrealizedPnl: unrealizedPnl.toString(),
};
})
);
return positions;
}
Data flow:
- Drizzle query with join (
Orders + Models)
- Filter for
OPEN orders (positions, not trades)
- Enrich with live price data (unrealized P&L)
- Return typed array
8. Response Flow
The response travels back through the stack:
PostgreSQL result
→ Drizzle ORM (typed rows)
→ Feature module (enriched data)
→ oRPC handler (Zod output validation)
→ Hono API (JSON serialization)
→ Vite proxy (dev) or direct (prod)
→ oRPC client (deserialization)
→ TanStack Query (cache update)
→ Component (re-render with data)
TanStack Query Caching Strategy
TanStack Query caches responses to reduce server load:
// Example query configuration
const { data } = useQuery({
...orpc.trading.getPositions.queryOptions({ input: {} }),
staleTime: 15_000, // Consider fresh for 15s
gcTime: 5 * 60_000, // Keep in cache for 5min after unmount
refetchOnWindowFocus: true,
});
Cache behavior:
| Time | Action |
|---|
| 0s | Initial fetch (cache miss) |
| 5s | Component reads from cache (no request) |
| 16s | Data is stale, refetch in background |
| 5min | Component unmounts, data eligible for garbage collection |
Stale time recommendations:
- High-frequency updates (positions, trades):
15s
- Moderate updates (portfolio history):
30s
- Rarely changing (models, variants):
5min
Server-Sent Events (SSE) Streaming
Why SSE?
SSE provides unidirectional real-time updates from server to client:
| Feature | SSE | WebSockets | Polling |
|---|
| Direction | Server → Client | Bidirectional | Client → Server → Client |
| Protocol | HTTP (text/event-stream) | WebSocket (ws://) | HTTP (repeated requests) |
| Reconnection | Automatic | Manual | Manual (interval) |
| Browser Support | Native EventSource | Native WebSocket | Native fetch |
| Use Case | Real-time push (trades, positions) | Chat, multiplayer | Simple polling |
Autonome’s choice: SSE is simpler than WebSockets for one-way updates and automatically reconnects on network issues.
SSE Architecture
Backend: Event Broadcasting
EventEmitter Pattern
// src/server/features/trading/events/positionEvents.ts
import { EventEmitter } from "node:events";
export type PositionEventData = {
modelId: string;
modelName: string;
positions: unknown[];
totalUnrealizedPnl: number;
availableCash: number;
};
export type PositionEvent = {
type: "positions:updated";
timestamp: string;
data: PositionEventData[];
};
const emitter = new EventEmitter();
emitter.setMaxListeners(50); // Support many concurrent SSE connections
const EVENT_KEY = "position-update";
let currentPositionsCache: PositionEventData[] = [];
let lastPositionUpdateAt: number | null = null;
export const emitPositionEvent = (event: PositionEvent) => {
currentPositionsCache = event.data;
lastPositionUpdateAt = Date.now();
emitter.emit(EVENT_KEY, event);
};
export const subscribeToPositionEvents = (
listener: (event: PositionEvent) => void
) => {
emitter.on(EVENT_KEY, listener);
return () => emitter.off(EVENT_KEY, listener); // Cleanup function
};
export const getCurrentPositions = () => currentPositionsCache;
Key design choices:
- In-memory cache: Stores latest event for immediate SSE hydration
- Max listeners: Increased from default (10) to support many SSE clients
- Cleanup function: Returned by
subscribe for proper unsubscription
SSE Endpoint Handler
// api/src/index.ts
import { streamSSE } from "hono/streaming";
import {
emitPositionEvent,
getCurrentPositions,
subscribeToPositionEvents,
} from "@/server/features/trading/events/positionEvents";
import { fetchPositions } from "@/server/features/trading/queries.server";
app.get("/api/events/positions", async (c) => {
return streamSSE(c, async (stream) => {
// 1. Hydrate cache with latest data
const positions = await fetchPositions();
emitPositionEvent({
type: "positions:updated",
timestamp: new Date().toISOString(),
data: positions,
});
// 2. Send initial data to new subscriber
await stream.writeSSE({
data: JSON.stringify(getCurrentPositions())
});
// 3. Subscribe to future updates
const unsubscribe = subscribeToPositionEvents((event) => {
stream.writeSSE({ data: JSON.stringify(event.data) });
});
// 4. Heartbeat to keep connection alive
const heartbeat = setInterval(() => {
stream.writeSSE({ event: "ping", data: "" });
}, 15_000);
// 5. Cleanup on disconnect
stream.onAbort(() => {
clearInterval(heartbeat);
unsubscribe();
});
// 6. Keep stream alive indefinitely
await new Promise(() => {});
});
});
Flow:
- Cache hydration: Fetch latest positions from DB
- Initial data: Send current cache to new subscriber
- Subscription: Register listener for future events
- Heartbeat: Send ping every 15s to prevent proxy timeout
- Cleanup: Remove listener when client disconnects
- Keep-alive: Never-resolving promise keeps stream open
Event Emission from Schedulers
// src/server/features/trading/tradeExecutor.ts
import { emitPositionEvent } from "./events/positionEvents";
let tradeSchedulerHandle: NodeJS.Timeout | null = null;
export function ensureTradeScheduler() {
if (tradeSchedulerHandle) return;
tradeSchedulerHandle = setInterval(async () => {
try {
// Execute AI trading decisions
await runTradingWorkflow();
// Fetch updated positions
const positions = await fetchPositions();
// Broadcast to all SSE subscribers
emitPositionEvent({
type: "positions:updated",
timestamp: new Date().toISOString(),
data: positions,
});
} catch (error) {
console.error("[TradeScheduler] Error:", error);
}
}, 60_000); // Every 60s
}
Trigger points:
- Schedulers: Price tracker (10s), trade executor (60s)
- Simulator: Auto-close triggers (stop-loss, take-profit)
- Manual actions: User places/closes orders
Frontend: Event Consumption
EventSource Subscription
// src/hooks/usePositionsStream.ts
import { useEffect } from "react";
import { useQueryClient } from "@tanstack/react-query";
import { getSseUrl } from "@/core/shared/api/apiConfig";
export function usePositionsStream() {
const queryClient = useQueryClient();
useEffect(() => {
const eventSource = new EventSource(getSseUrl("/api/events/positions"));
eventSource.onmessage = (event) => {
const positions = JSON.parse(event.data);
// Update TanStack Query cache
queryClient.setQueryData(
["positions"], // Query key
positions
);
};
eventSource.onerror = (error) => {
console.error("[SSE] Connection error:", error);
// EventSource auto-reconnects
};
return () => {
eventSource.close();
};
}, [queryClient]);
}
Usage in component:
function PositionsList() {
usePositionsStream(); // Subscribe to SSE updates
const { data: positions } = useQuery(
orpc.trading.getPositions.queryOptions({ input: {} })
);
return <Table data={positions} />;
}
How it works:
- Component mounts →
usePositionsStream() subscribes to SSE
- Server emits position update → SSE pushes to client
onmessage handler → updates TanStack Query cache
- Cache update → React re-renders component with new data
- Component unmounts →
EventSource.close() called
SSE Endpoints Overview
| Endpoint | Data | Update Frequency | Use Case |
|---|
/api/events/positions | Open positions | 10s (price updates) | Position table, P&L |
/api/events/trades | Completed trades | On trade close | Trade history |
/api/events/conversations | AI chat history | On agent invocation | Chat interface |
/api/events/portfolio | Portfolio snapshots | 10s | Performance chart |
/api/events/workflow | Workflow status | On workflow event | Debug panel |
State Management Layers
Autonome uses a multi-layer state architecture:
1. Server State (TanStack Query)
For data fetched from the backend:
const { data: positions } = useQuery(
orpc.trading.getPositions.queryOptions({ input: {} })
);
Managed by: TanStack Query cache
Persistence: In-memory (cleared on refresh)
Synchronization: oRPC + SSE
2. URL State (TanStack Router)
For navigation and shareable state:
const { variantId } = Route.useSearch(); // /dashboard?variantId=apex
const navigate = useNavigate();
navigate({ search: { variantId: "trendsurfer" } });
Managed by: TanStack Router
Persistence: URL (shareable, bookmarkable)
Use cases: Filters, tabs, pagination
3. Local State (TanStack Store)
For ephemeral UI state:
import { Store, useStore } from "@tanstack/react-store";
const variantStore = new Store({
selectedVariant: "all" as VariantIdWithAll,
});
export function useVariant() {
const selectedVariant = useStore(variantStore, (state) => state.selectedVariant);
const setSelectedVariant = (variant: VariantIdWithAll) => {
variantStore.setState((state) => ({ ...state, selectedVariant: variant }));
};
return { selectedVariant, setSelectedVariant };
}
Managed by: TanStack Store
Persistence: In-memory (cleared on refresh)
Use cases: Sidebar expanded state, theme preference
4. Component State (React)
For isolated component logic:
const [isOpen, setIsOpen] = useState(false);
const [searchQuery, setSearchQuery] = useState("");
Managed by: React useState/useReducer
Persistence: Component lifecycle only
Use cases: Form inputs, modals, tooltips
Optimistic Updates
For instant UI feedback on mutations:
import { useMutation } from "@tanstack/react-query";
import { orpc } from "@/server/orpc/client";
function ClosePositionButton({ positionId }: { positionId: string }) {
const queryClient = useQueryClient();
const mutation = useMutation({
mutationFn: () => orpc.trading.closePosition({ positionId }),
onMutate: async () => {
// Cancel outgoing refetches
await queryClient.cancelQueries({ queryKey: ["positions"] });
// Snapshot current value
const previousPositions = queryClient.getQueryData(["positions"]);
// Optimistically update cache
queryClient.setQueryData(["positions"], (old) =>
old.filter((p) => p.id !== positionId)
);
return { previousPositions };
},
onError: (err, variables, context) => {
// Rollback on error
queryClient.setQueryData(["positions"], context.previousPositions);
},
onSettled: () => {
// Refetch to ensure sync
queryClient.invalidateQueries({ queryKey: ["positions"] });
},
});
return <button onClick={() => mutation.mutate()}>Close</button>;
}
Flow:
- User clicks “Close” → UI updates instantly (optimistic)
- Mutation sent to server → oRPC handler executes
- If success → SSE confirms update (cache already correct)
- If error → Rollback optimistic update (restore previous state)
onSettled → Refetch to guarantee server truth
Data Consistency Guarantees
Single Source of Truth: Orders Table
The "Orders" table in PostgreSQL is the canonical source for positions:
- OPEN orders = active positions
- CLOSED orders = completed trades
All derived data (UI, simulator state, analytics) must reconcile with this table.
Simulator State Restoration
On server restart, the simulator rehydrates from the database:
// src/server/features/simulator/exchangeSimulator.ts
private async restorePositionsFromDb() {
const openOrders = await getAllOpenOrders();
for (const order of openOrders) {
const account = this.getOrCreateAccount(order.modelId);
// Restore position in memory
account.applyExecution(
order.symbol,
order.side === "LONG" ? "buy" : "sell",
{
fills: [{ quantity: parseFloat(order.quantity), price: parseFloat(order.entryPrice) }],
averagePrice: parseFloat(order.entryPrice),
totalQuantity: parseFloat(order.quantity),
totalFees: 0,
status: "filled",
},
parseFloat(order.leverage ?? "1")
);
// Restore exit plan
if (order.exitPlan) {
account.setExitPlan(order.symbol, order.exitPlan);
}
}
}
Guarantees:
- Simulator state matches DB on startup
- Auto-close triggers work after restart
- No position data loss during deployments
Cache Invalidation Strategy
Explicit invalidation after mutations:
await queryClient.invalidateQueries({ queryKey: ["positions"] });
SSE-driven invalidation:
eventSource.onmessage = (event) => {
const positions = JSON.parse(event.data);
queryClient.setQueryData(["positions"], positions);
};
Time-based staleness: TanStack Query refetches stale data automatically.
1. Query Deduplication
TanStack Query deduplicates concurrent requests:
// Both components render simultaneously
<PositionsList /> // Triggers fetch
<PositionsSummary /> // Reuses in-flight request
2. Selective Cache Updates
SSE updates only changed data:
eventSource.onmessage = (event) => {
const updatedPosition = JSON.parse(event.data);
queryClient.setQueryData(["positions"], (old) =>
old.map((p) =>
p.id === updatedPosition.id ? updatedPosition : p
)
);
};
3. Background Refetching
TanStack Query refetches stale data without blocking UI:
const { data, isRefetching } = useQuery({
...orpc.trading.getPositions.queryOptions({ input: {} }),
staleTime: 15_000,
});
// First render: data from cache (instant)
// Background: refetch if stale (non-blocking)
4. Partial Hydration
SSE endpoints hydrate cache before sending initial data:
// Ensures initial SSE message is never empty
const positions = await fetchPositions();
emitPositionEvent({ type: "positions:updated", data: positions });
await stream.writeSSE({ data: JSON.stringify(getCurrentPositions()) });
Error Handling
oRPC Errors
const { data, error } = useQuery(
orpc.trading.getPositions.queryOptions({ input: {} })
);
if (error) {
// Zod validation error (400)
if (error.code === "VALIDATION_ERROR") {
return <Alert>Invalid input: {error.message}</Alert>;
}
// Server error (500)
return <Alert>Failed to load positions</Alert>;
}
SSE Reconnection
EventSource automatically reconnects on disconnect:
eventSource.onerror = (error) => {
console.error("[SSE] Connection lost, reconnecting...");
// Browser auto-reconnects with exponential backoff
};
Mutation Rollback
Optimistic updates rollback on error (see Optimistic Updates section).
Summary
Autonome’s data flow architecture balances:
- Type safety: oRPC procedures with Zod validation
- Performance: TanStack Query caching and background refetching
- Real-time: SSE for instant UI updates
- Consistency: Single source of truth (Orders table)
- Resilience: Automatic reconnection and error recovery
This hybrid approach provides the best of both worlds: type-safe request/response for data fetching and low-latency SSE for real-time updates.
Next Steps