
Overview

Autonome uses a hybrid data flow architecture combining oRPC procedures for request/response patterns and Server-Sent Events (SSE) for real-time updates. This ensures type-safe data fetching while maintaining instant UI reactivity.

oRPC Request/Response Flow

Client → Server Data Fetching

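The primary pattern for fetching data from the backend runs from a component query through the oRPC client and the Hono server to the database, then back again.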

Step-by-Step Breakdown

1. Component Query

// src/components/positions-list.tsx
import { useQuery } from "@tanstack/react-query";
import { orpc } from "@/server/orpc/client";

function PositionsList() {
  const { data: positions, isLoading } = useQuery(
    orpc.trading.getPositions.queryOptions({ 
      input: { modelId: "abc-123" } 
    })
  );

  if (isLoading) return <Spinner />;
  return <Table data={positions} />;
}
What happens:
  • TanStack Query checks cache (15s staleTime for positions)
  • If stale, triggers oRPC client request
  • The component renders with isLoading set to true until data arrives (useQuery does not suspend; that would require useSuspenseQuery)

2. oRPC Client Configuration

// src/server/orpc/client.ts
import { createORPCClient } from "@orpc/client";
import { RPCLink } from "@orpc/client/fetch";
import { createTanstackQueryUtils } from "@orpc/tanstack-query";
import { getRpcUrl } from "@/core/shared/api/apiConfig";
import type { RouterClient } from "@orpc/server";
import type router from "@/server/orpc/router";

const link = new RPCLink({
  url: getRpcUrl(), // http://localhost:8081/api/rpc in dev
});

export const client: RouterClient<typeof router> = createORPCClient(link);
export const orpc = createTanstackQueryUtils(client);
Key points:
  • RouterClient<typeof router> infers full type signature from backend
  • getRpcUrl() uses VITE_API_URL env var (or Vite proxy in dev)
  • createTanstackQueryUtils() generates .queryOptions() for every procedure

3. API URL Resolution

// src/core/shared/api/apiConfig.ts
export function getApiBaseUrl(): string {
  // Production: Use explicit API URL
  if (import.meta.env.VITE_API_URL) {
    return import.meta.env.VITE_API_URL.replace(/\/$/, "");
  }

  // Development: Use Vite proxy (relative path)
  if (typeof window !== "undefined") {
    return window.location.origin; // http://localhost:5173
  }

  // SSR fallback
  return "http://localhost:8081";
}

export function getRpcUrl(): string {
  return `${getApiBaseUrl()}/api/rpc`;
}
Environment-specific behavior:
| Environment | VITE_API_URL | Resolved URL |
| --- | --- | --- |
| Development | (not set) | http://localhost:5173/api/rpc (proxied to 8081) |
| Production (Vercel) | https://api.autonome.ai | https://api.autonome.ai/api/rpc |
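
The resolution order in the table can be checked in isolation. The following is a minimal sketch, not the project's code: `resolveApiBaseUrl`, `resolveRpcUrl`, and the `UrlEnv` type are hypothetical names, and the environment is passed in explicitly so the logic runs without Vite or a browser.

```typescript
// Hypothetical pure version of getApiBaseUrl(): the environment is an argument
// instead of being read from import.meta.env and window.
type UrlEnv = {
  apiUrl?: string; // stands in for import.meta.env.VITE_API_URL
  browserOrigin?: string; // stands in for window.location.origin
};

function resolveApiBaseUrl(env: UrlEnv): string {
  // 1. An explicit API URL wins; a single trailing slash is stripped.
  if (env.apiUrl) return env.apiUrl.replace(/\/$/, "");
  // 2. In the browser, fall back to the page origin (Vite proxy in dev).
  if (env.browserOrigin) return env.browserOrigin;
  // 3. Outside the browser, use the local API server.
  return "http://localhost:8081";
}

function resolveRpcUrl(env: UrlEnv): string {
  return `${resolveApiBaseUrl(env)}/api/rpc`;
}

console.log(resolveRpcUrl({ browserOrigin: "http://localhost:5173" }));
// → http://localhost:5173/api/rpc
console.log(resolveRpcUrl({ apiUrl: "https://api.autonome.ai/" }));
// → https://api.autonome.ai/api/rpc
```

Passing the environment as a parameter is only a testing convenience; the behavior is meant to mirror the three branches of getApiBaseUrl().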

4. Vite Proxy (Development Only)

// vite.config.ts
export default defineConfig({
  server: {
    port: 5173,
    proxy: {
      "/api": {
        target: "http://localhost:8081",
        changeOrigin: true,
      },
    },
  },
});
Flow:
  • Browser requests http://localhost:5173/api/rpc/trading.getPositions
  • Vite intercepts and forwards to http://localhost:8081/api/rpc/trading.getPositions
  • Backend responds, Vite forwards back to browser
The proxy eliminates CORS issues in development. In production, the frontend directly calls the VPS API URL.

5. Hono API Server

// api/src/index.ts
import { RPCHandler } from "@orpc/server/fetch";
import { Hono } from "hono";
import router from "@/server/orpc/router";

const app = new Hono();
const rpcHandler = new RPCHandler(router);

app.all("/api/rpc/*", async (c) => {
  const { response } = await rpcHandler.handle(c.req.raw, {
    prefix: "/api/rpc",
    context: {}, // Can inject auth, DB, etc.
  });
  return response ?? c.json({ error: "Not Found" }, 404);
});
Routing logic:
  • POST /api/rpc/trading.getPositions → router.trading.getPositions
  • POST /api/rpc/simulator.placeOrder → router.simulator.placeOrder
  • Wildcard matching for nested procedures
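
To illustrate how a request path can map onto a nested router, here is a simplified sketch. It is not oRPC's implementation: `RouterNode`, `resolveProcedure`, and `exampleRouter` are hypothetical, and the router is modelled as plain nested objects whose leaves are handler functions.

```typescript
// Hypothetical model of a nested router: objects all the way down, functions at the leaves.
type Handler = (input: unknown) => unknown;
type RouterNode = { [segment: string]: RouterNode | Handler };

function resolveProcedure(
  router: RouterNode,
  pathname: string,
  prefix: string,
): Handler | undefined {
  if (!pathname.startsWith(prefix)) return undefined;
  // "/trading.getPositions" -> ["trading", "getPositions"]
  const segments = pathname
    .slice(prefix.length)
    .split(/[./]/)
    .filter((segment) => segment.length > 0);

  let node: RouterNode | Handler = router;
  for (const segment of segments) {
    if (typeof node === "function") return undefined; // path runs past a leaf
    // Only own keys count, so "constructor" and similar never resolve.
    if (!Object.prototype.hasOwnProperty.call(node, segment)) return undefined;
    const next: RouterNode | Handler | undefined = node[segment];
    if (next === undefined) return undefined;
    node = next;
  }
  return typeof node === "function" ? node : undefined;
}

const exampleRouter: RouterNode = {
  trading: { getPositions: () => "positions" },
  simulator: { placeOrder: () => "order placed" },
};
```

With this model, `resolveProcedure(exampleRouter, "/api/rpc/trading.getPositions", "/api/rpc")` returns the `getPositions` handler, and an unknown path returns `undefined`, which corresponds to the 404 fallback in the Hono route.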

6. oRPC Procedure Handler

// src/server/orpc/router/trading.ts
import "@/polyfill"; // Compatibility shims
import { os } from "@orpc/server";
import * as Sentry from "@sentry/react";
import { z } from "zod";
import { fetchPositions } from "@/server/features/trading/queries.server";

export const getPositions = os
  .input(z.object({ 
    modelId: z.string().optional() 
  }))
  .output(z.array(z.object({
    symbol: z.string(),
    side: z.enum(["LONG", "SHORT"]),
    quantity: z.string(),
    // ...
  })))
  .handler(async ({ input }) =>
    Sentry.startSpan({ name: "getPositions" }, async () => {
      const positions = await fetchPositions(input.modelId);
      return positions; // Zod validates output schema
    })
  );
Handler responsibilities:
  1. Input validation: Zod parses and validates input
  2. Business logic: Calls feature module (e.g., fetchPositions)
  3. Output validation: Zod validates return value matches schema
  4. Error handling: Input validation errors auto-convert to 400 responses
  5. Observability: Sentry span tracks execution time
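
The validate, execute, validate sequence can be sketched without any library. This example swaps Zod for hand-written parsers to stay dependency-free; `defineProcedure`, `parseModelFilter`, `parseSymbols`, and `getSymbols` are hypothetical names. It assumes input failures map to 400 and output failures to 500.

```typescript
// Hypothetical stand-ins for Zod schemas: a parser returns the typed value or a failure message.
type ParseResult<T> = { ok: true; value: T } | { ok: false; message: string };
type Parser<T> = (value: unknown) => ParseResult<T>;
type RpcResult = { status: number; body: unknown };

function defineProcedure<I, O>(
  parseInput: Parser<I>,
  parseOutput: Parser<O>,
  handler: (input: I) => unknown,
): (rawInput: unknown) => RpcResult {
  return (rawInput) => {
    const input = parseInput(rawInput); // 1. input validation
    if (!input.ok) return { status: 400, body: { error: input.message } };

    const output = parseOutput(handler(input.value)); // 2. business logic, 3. output validation
    if (!output.ok) return { status: 500, body: { error: "Output validation failed" } };

    return { status: 200, body: output.value };
  };
}

// Example parsers for a { modelId?: string } input and a string[] output.
const parseModelFilter: Parser<{ modelId?: string }> = (value) => {
  if (typeof value !== "object" || value === null) {
    return { ok: false, message: "expected an object" };
  }
  const modelId = (value as { modelId?: unknown }).modelId;
  if (modelId === undefined) return { ok: true, value: {} };
  if (typeof modelId === "string") return { ok: true, value: { modelId } };
  return { ok: false, message: "modelId must be a string" };
};

const parseSymbols: Parser<string[]> = (value) => {
  if (Array.isArray(value) && value.every((item) => typeof item === "string")) {
    return { ok: true, value: value as string[] };
  }
  return { ok: false, message: "expected string[]" };
};

const getSymbols = defineProcedure(parseModelFilter, parseSymbols, (input) =>
  input.modelId === "abc-123" ? ["BTC", "ETH"] : [],
);
```

Calling `getSymbols({ modelId: 42 })` yields a 400 result before the handler ever runs, which is the property that makes the handler body safe to write against typed input.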

7. Feature Module (Business Logic)

// src/server/features/trading/queries.server.ts
import { db } from "@/db";
import { orders, models } from "@/db/schema";
import { eq, and } from "drizzle-orm";
import { calculateUnrealizedPnl } from "@/core/shared/trading/calculations";

export async function fetchPositions(modelId?: string) {
  const openOrders = await db
    .select({
      id: orders.id,
      modelId: orders.modelId,
      modelName: models.name,
      symbol: orders.symbol,
      side: orders.side,
      quantity: orders.quantity,
      entryPrice: orders.entryPrice,
      leverage: orders.leverage,
      exitPlan: orders.exitPlan,
      openedAt: orders.openedAt,
    })
    .from(orders)
    .leftJoin(models, eq(orders.modelId, models.id))
    .where(
      and(
        eq(orders.status, "OPEN"),
        modelId ? eq(orders.modelId, modelId) : undefined
      )
    )
    .orderBy(orders.openedAt);

  // Enrich with current prices and unrealized P&L
  const positions = await Promise.all(
    openOrders.map(async (order) => {
      // getCurrentPrice is a price-lookup helper whose import is not shown in this excerpt
      const currentPrice = await getCurrentPrice(order.symbol);
      const unrealizedPnl = calculateUnrealizedPnl(
        parseFloat(order.entryPrice),
        currentPrice,
        parseFloat(order.quantity),
        order.side
      );

      return {
        ...order,
        currentPrice: currentPrice.toString(),
        unrealizedPnl: unrealizedPnl.toString(),
      };
    })
  );

  return positions;
}
Data flow:
  • Drizzle query with join (Orders + Models)
  • Filter for OPEN orders (positions, not trades)
  • Enrich with live price data (unrealized P&L)
  • Return typed array
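
The enrichment step depends on calculateUnrealizedPnl, whose body is not shown here. The sketch below uses an assumed formula (price move times quantity, sign flipped for shorts) that ignores fees and leverage; `unrealizedPnlSketch` is a hypothetical name and the real function may differ.

```typescript
// Assumed formula; the project's calculateUnrealizedPnl is not shown in this document.
type Side = "LONG" | "SHORT";

function unrealizedPnlSketch(
  entryPrice: number,
  currentPrice: number,
  quantity: number,
  side: Side,
): number {
  const priceMove = currentPrice - entryPrice;
  // A long gains when the price rises; a short gains when it falls.
  return side === "LONG" ? priceMove * quantity : -priceMove * quantity;
}

// Numeric columns arrive from the database as strings, hence parseFloat.
const longPnl = unrealizedPnlSketch(parseFloat("100"), 110, parseFloat("2"), "LONG"); // 20
const shortPnl = unrealizedPnlSketch(parseFloat("100"), 110, parseFloat("2"), "SHORT"); // -20
```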

8. Response Flow

The response travels back through the stack:
PostgreSQL result
  ↓
Drizzle ORM (typed rows)
  ↓
Feature module (enriched data)
  ↓
oRPC handler (Zod output validation)
  ↓
Hono API (JSON serialization)
  ↓
Vite proxy (dev) or direct (prod)
  ↓
oRPC client (deserialization)
  ↓
TanStack Query (cache update)
  ↓
Component (re-render with data)

TanStack Query Caching Strategy

TanStack Query caches responses to reduce server load:
// Example query configuration
const { data } = useQuery({
  ...orpc.trading.getPositions.queryOptions({ input: {} }),
  staleTime: 15_000,      // Consider fresh for 15s
  gcTime: 5 * 60_000,     // Keep in cache for 5min after unmount
  refetchOnWindowFocus: true,
});
Cache behavior:
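| Time | Action |
| --- | --- |
| 0s | Initial fetch (cache miss) |
| 5s | Component reads from cache (no request) |
| 16s | Data is stale; the next mount, window focus, or reconnect triggers a background refetch |
| 5min after unmount | Unused data becomes eligible for garbage collection |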
Stale time recommendations:
  • High-frequency updates (positions, trades): 15s
  • Moderate updates (portfolio history): 30s
  • Rarely changing (models, variants): 5min
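
The staleness rule behind these numbers is a simple age comparison. The following is a simplified model rather than TanStack Query's internals; `CachedEntry`, `STALE_TIME_MS`, and `isStale` are hypothetical names that encode the recommendations above.

```typescript
// Simplified model: an entry is stale once its age reaches staleTime.
type CachedEntry<T> = { data: T; updatedAt: number };

const STALE_TIME_MS = {
  positions: 15_000, // high-frequency updates
  portfolioHistory: 30_000, // moderate updates
  models: 5 * 60_000, // rarely changing
} as const;

function isStale(entry: CachedEntry<unknown>, staleTimeMs: number, now: number): boolean {
  return now - entry.updatedAt >= staleTimeMs;
}

const cachedPositions: CachedEntry<string[]> = { data: ["BTC"], updatedAt: 0 };
const freshAtFiveSeconds = !isStale(cachedPositions, STALE_TIME_MS.positions, 5_000); // true
const staleAtSixteenSeconds = isStale(cachedPositions, STALE_TIME_MS.positions, 16_000); // true
```

A stale entry is still served immediately; staleness only decides whether a refetch is started when something triggers one.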

Server-Sent Events (SSE) Streaming

Why SSE?

SSE provides unidirectional real-time updates from server to client:
| Feature | SSE | WebSockets | Polling |
| --- | --- | --- | --- |
| Direction | Server → Client | Bidirectional | Client → Server → Client |
| Protocol | HTTP (text/event-stream) | WebSocket (ws://) | HTTP (repeated requests) |
| Reconnection | Automatic | Manual | Manual (interval) |
| Browser Support | Native EventSource | Native WebSocket | Native fetch |
| Use Case | Real-time push (trades, positions) | Chat, multiplayer | Simple polling |
Autonome’s choice: SSE is simpler than WebSockets for one-way updates and automatically reconnects on network issues.

SSE Architecture

Backend: Event Broadcasting

EventEmitter Pattern

// src/server/features/trading/events/positionEvents.ts
import { EventEmitter } from "node:events";

export type PositionEventData = {
  modelId: string;
  modelName: string;
  positions: unknown[];
  totalUnrealizedPnl: number;
  availableCash: number;
};

export type PositionEvent = {
  type: "positions:updated";
  timestamp: string;
  data: PositionEventData[];
};

const emitter = new EventEmitter();
emitter.setMaxListeners(50); // Support many concurrent SSE connections

const EVENT_KEY = "position-update";
let currentPositionsCache: PositionEventData[] = [];
let lastPositionUpdateAt: number | null = null;

export const emitPositionEvent = (event: PositionEvent) => {
  currentPositionsCache = event.data;
  lastPositionUpdateAt = Date.now();
  emitter.emit(EVENT_KEY, event);
};

export const subscribeToPositionEvents = (
  listener: (event: PositionEvent) => void
) => {
  emitter.on(EVENT_KEY, listener);
  return () => emitter.off(EVENT_KEY, listener); // Cleanup function
};

export const getCurrentPositions = () => currentPositionsCache;
Key design choices:
  • In-memory cache: Stores latest event for immediate SSE hydration
  • Max listeners: Increased from default (10) to support many SSE clients
  • Cleanup function: Returned by subscribe for proper unsubscription

SSE Endpoint Handler

// api/src/index.ts
import { streamSSE } from "hono/streaming";
import {
  emitPositionEvent,
  getCurrentPositions,
  subscribeToPositionEvents,
} from "@/server/features/trading/events/positionEvents";
import { fetchPositions } from "@/server/features/trading/queries.server";

app.get("/api/events/positions", async (c) => {
  return streamSSE(c, async (stream) => {
    // 1. Hydrate cache with latest data
    const positions = await fetchPositions();
    emitPositionEvent({
      type: "positions:updated",
      timestamp: new Date().toISOString(),
      data: positions,
    });

    // 2. Send initial data to new subscriber
    await stream.writeSSE({ 
      data: JSON.stringify(getCurrentPositions()) 
    });

    // 3. Subscribe to future updates
    const unsubscribe = subscribeToPositionEvents((event) => {
      stream.writeSSE({ data: JSON.stringify(event.data) });
    });

    // 4. Heartbeat to keep connection alive
    const heartbeat = setInterval(() => {
      stream.writeSSE({ event: "ping", data: "" });
    }, 15_000);

    // 5. Cleanup on disconnect
    stream.onAbort(() => {
      clearInterval(heartbeat);
      unsubscribe();
    });

    // 6. Keep stream alive indefinitely
    await new Promise(() => {});
  });
});
Flow:
  1. Cache hydration: Fetch latest positions from DB
  2. Initial data: Send current cache to new subscriber
  3. Subscription: Register listener for future events
  4. Heartbeat: Send ping every 15s to prevent proxy timeout
  5. Cleanup: Remove listener when client disconnects
  6. Keep-alive: Never-resolving promise keeps stream open

Event Emission from Schedulers

// src/server/features/trading/tradeExecutor.ts
import { emitPositionEvent } from "./events/positionEvents";

let tradeSchedulerHandle: NodeJS.Timeout | null = null;

export function ensureTradeScheduler() {
  if (tradeSchedulerHandle) return;

  tradeSchedulerHandle = setInterval(async () => {
    try {
      // Execute AI trading decisions
      await runTradingWorkflow();

      // Fetch updated positions
      const positions = await fetchPositions();

      // Broadcast to all SSE subscribers
      emitPositionEvent({
        type: "positions:updated",
        timestamp: new Date().toISOString(),
        data: positions,
      });
    } catch (error) {
      console.error("[TradeScheduler] Error:", error);
    }
  }, 60_000); // Every 60s
}
Trigger points:
  • Schedulers: Price tracker (10s), trade executor (60s)
  • Simulator: Auto-close triggers (stop-loss, take-profit)
  • Manual actions: User places/closes orders

Frontend: Event Consumption

EventSource Subscription

// src/hooks/usePositionsStream.ts
import { useEffect } from "react";
import { useQueryClient } from "@tanstack/react-query";
import { getSseUrl } from "@/core/shared/api/apiConfig";
import { orpc } from "@/server/orpc/client";

export function usePositionsStream() {
  const queryClient = useQueryClient();

  useEffect(() => {
    const eventSource = new EventSource(getSseUrl("/api/events/positions"));

    eventSource.onmessage = (event) => {
      const positions = JSON.parse(event.data);
      
      // Update the TanStack Query cache under the same key that
      // orpc.trading.getPositions.queryOptions({ input: {} }) uses;
      // a hand-written key such as ["positions"] would not match it
      queryClient.setQueryData(
        orpc.trading.getPositions.queryKey({ input: {} }),
        positions
      );
    };

    eventSource.onerror = (error) => {
      console.error("[SSE] Connection error:", error);
      // EventSource auto-reconnects
    };

    return () => {
      eventSource.close();
    };
  }, [queryClient]);
}
Usage in component:
function PositionsList() {
  usePositionsStream(); // Subscribe to SSE updates

  const { data: positions } = useQuery(
    orpc.trading.getPositions.queryOptions({ input: {} })
  );

  return <Table data={positions} />;
}
How it works:
  1. Component mounts → usePositionsStream() subscribes to SSE
  2. Server emits position update → SSE pushes to client
  3. onmessage handler → updates TanStack Query cache
  4. Cache update → React re-renders component with new data
  5. Component unmounts → EventSource.close() called
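
In step 3 the handler writes whatever JSON.parse returns into the cache. A small guard can reject malformed payloads first. This is a sketch only: `parsePositionUpdates` and `isPositionUpdate` are hypothetical helpers, and the payload shape is assumed from the `modelId` and `positions` fields of PositionEventData.

```typescript
// Assumed payload shape, based on two fields of PositionEventData.
type PositionUpdate = { modelId: string; positions: unknown[] };

function isPositionUpdate(value: unknown): value is PositionUpdate {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as { modelId?: unknown; positions?: unknown };
  return typeof candidate.modelId === "string" && Array.isArray(candidate.positions);
}

function parsePositionUpdates(raw: string): PositionUpdate[] | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // malformed JSON: skip the message and keep the old cache
  }
  if (!Array.isArray(parsed)) return null;
  return parsed.every(isPositionUpdate) ? (parsed as PositionUpdate[]) : null;
}
```

An onmessage handler could call `parsePositionUpdates(event.data)` and write to the cache only when the result is not `null`.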

SSE Endpoints Overview

| Endpoint | Data | Update Frequency | Use Case |
| --- | --- | --- | --- |
| /api/events/positions | Open positions | 10s (price updates) | Position table, P&L |
| /api/events/trades | Completed trades | On trade close | Trade history |
| /api/events/conversations | AI chat history | On agent invocation | Chat interface |
| /api/events/portfolio | Portfolio snapshots | 10s | Performance chart |
| /api/events/workflow | Workflow status | On workflow event | Debug panel |

State Management Layers

Autonome uses a multi-layer state architecture:

1. Server State (TanStack Query)

For data fetched from the backend:
const { data: positions } = useQuery(
  orpc.trading.getPositions.queryOptions({ input: {} })
);
Managed by: TanStack Query cache
Persistence: In-memory (cleared on refresh)
Synchronization: oRPC + SSE

2. URL State (TanStack Router)

For navigation and shareable state:
const { variantId } = Route.useSearch(); // /dashboard?variantId=apex
const navigate = useNavigate();

navigate({ search: { variantId: "trendsurfer" } });
Managed by: TanStack Router
Persistence: URL (shareable, bookmarkable)
Use cases: Filters, tabs, pagination

3. Local State (TanStack Store)

For ephemeral UI state:
import { Store, useStore } from "@tanstack/react-store";

const variantStore = new Store({
  selectedVariant: "all" as VariantIdWithAll,
});

export function useVariant() {
  const selectedVariant = useStore(variantStore, (state) => state.selectedVariant);
  const setSelectedVariant = (variant: VariantIdWithAll) => {
    variantStore.setState((state) => ({ ...state, selectedVariant: variant }));
  };
  return { selectedVariant, setSelectedVariant };
}
Managed by: TanStack Store
Persistence: In-memory (cleared on refresh)
Use cases: Sidebar expanded state, theme preference

4. Component State (React)

For isolated component logic:
const [isOpen, setIsOpen] = useState(false);
const [searchQuery, setSearchQuery] = useState("");
Managed by: React useState/useReducer
Persistence: Component lifecycle only
Use cases: Form inputs, modals, tooltips

Optimistic Updates

For instant UI feedback on mutations:
import { useMutation, useQueryClient } from "@tanstack/react-query";
import { client, orpc } from "@/server/orpc/client";

// Same key that orpc.trading.getPositions.queryOptions({ input: {} }) uses
const positionsKey = orpc.trading.getPositions.queryKey({ input: {} });

function ClosePositionButton({ positionId }: { positionId: string }) {
  const queryClient = useQueryClient();

  const mutation = useMutation({
    // Call the procedure through the raw client; orpc only holds the query utilities
    mutationFn: () => client.trading.closePosition({ positionId }),
    onMutate: async () => {
      // Cancel outgoing refetches
      await queryClient.cancelQueries({ queryKey: positionsKey });

      // Snapshot current value
      const previousPositions = queryClient.getQueryData(positionsKey);

      // Optimistically update cache (old is undefined if nothing is cached yet)
      queryClient.setQueryData(positionsKey, (old) =>
        old?.filter((p) => p.id !== positionId)
      );

      return { previousPositions };
    },
    onError: (err, variables, context) => {
      // Rollback on error
      queryClient.setQueryData(positionsKey, context?.previousPositions);
    },
    onSettled: () => {
      // Refetch to ensure sync
      queryClient.invalidateQueries({ queryKey: positionsKey });
    },
  });

  return <button onClick={() => mutation.mutate()}>Close</button>;
}
Flow:
  1. User clicks “Close” → UI updates instantly (optimistic)
  2. Mutation sent to server → oRPC handler executes
  3. If success → SSE confirms update (cache already correct)
  4. If error → Rollback optimistic update (restore previous state)
  5. onSettled → Refetch to guarantee server truth
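
The snapshot, optimistic write, and rollback steps do not depend on React. The sketch below replays them against a plain object standing in for the query cache; `QueryCacheSketch`, `optimisticRemove`, and `closeWithRollback` are hypothetical names, and `closeOnServer` stands in for the oRPC mutation.

```typescript
// A plain object stands in for the TanStack Query cache.
type OpenPosition = { id: string; symbol: string };
type QueryCacheSketch = Record<string, OpenPosition[] | undefined>;

function optimisticRemove(
  cache: QueryCacheSketch,
  key: string,
  positionId: string,
): () => void {
  const previous = cache[key]; // snapshot
  cache[key] = (previous ?? []).filter((p) => p.id !== positionId); // optimistic write
  return () => {
    cache[key] = previous; // rollback restores the snapshot
  };
}

async function closeWithRollback(
  cache: QueryCacheSketch,
  key: string,
  positionId: string,
  closeOnServer: (id: string) => Promise<void>, // hypothetical server call
): Promise<boolean> {
  const rollback = optimisticRemove(cache, key, positionId);
  try {
    await closeOnServer(positionId);
    return true; // the optimistic state was correct
  } catch {
    rollback(); // the server rejected the change
    return false;
  }
}
```

Returning the rollback as a closure keeps the snapshot tied to the specific mutation that created it, which is the same role the `context` object plays in `onMutate` and `onError`.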

Data Consistency Guarantees

Single Source of Truth: Orders Table

The "Orders" table in PostgreSQL is the canonical source for positions:
  • OPEN orders = active positions
  • CLOSED orders = completed trades
All derived data (UI, simulator state, analytics) must reconcile with this table.

Simulator State Restoration

On server restart, the simulator rehydrates from the database:
// src/server/features/simulator/exchangeSimulator.ts
private async restorePositionsFromDb() {
  const openOrders = await getAllOpenOrders();

  for (const order of openOrders) {
    const account = this.getOrCreateAccount(order.modelId);
    
    // Restore position in memory
    account.applyExecution(
      order.symbol,
      order.side === "LONG" ? "buy" : "sell",
      {
        fills: [{ quantity: parseFloat(order.quantity), price: parseFloat(order.entryPrice) }],
        averagePrice: parseFloat(order.entryPrice),
        totalQuantity: parseFloat(order.quantity),
        totalFees: 0,
        status: "filled",
      },
      parseFloat(order.leverage ?? "1")
    );

    // Restore exit plan
    if (order.exitPlan) {
      account.setExitPlan(order.symbol, order.exitPlan);
    }
  }
}
Guarantees:
  • Simulator state matches DB on startup
  • Auto-close triggers work after restart
  • No position data loss during deployments

Cache Invalidation Strategy

Explicit invalidation after mutations:
await queryClient.invalidateQueries({ queryKey: orpc.trading.getPositions.key() });
SSE-driven cache updates (a direct write, so no refetch is needed):
eventSource.onmessage = (event) => {
  const positions = JSON.parse(event.data);
  queryClient.setQueryData(orpc.trading.getPositions.queryKey({ input: {} }), positions);
};
Time-based staleness: TanStack Query refetches stale data on the next mount, window focus, or network reconnect.

Performance Optimizations

1. Query Deduplication

TanStack Query deduplicates concurrent requests:
// Both components render simultaneously
<PositionsList />  // Triggers fetch
<PositionsSummary />  // Reuses in-flight request
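
The idea behind deduplication is to share one in-flight promise per key. The following is a minimal sketch of that idea, not TanStack Query's implementation; `dedupedFetch` and `inFlight` are hypothetical names.

```typescript
// One shared promise per key while a request is in flight.
const inFlight: Record<string, Promise<unknown> | undefined> = {};

function dedupedFetch<T>(key: string, fetcher: () => Promise<T>): Promise<T> {
  const existing = inFlight[key];
  if (existing) return existing as Promise<T>; // reuse the in-flight request

  const request = fetcher();
  inFlight[key] = request;

  // Clear the slot once the request settles, whether it succeeded or failed.
  const clear = () => {
    delete inFlight[key];
  };
  request.then(clear, clear);

  return request;
}

let fetchCount = 0;
const fetchPositionsOnce = () => {
  fetchCount += 1;
  return Promise.resolve(["BTC", "ETH"]);
};

const firstCaller = dedupedFetch("positions", fetchPositionsOnce);
const secondCaller = dedupedFetch("positions", fetchPositionsOnce);
// Both callers hold the same promise and the fetcher ran once.
```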

2. Selective Cache Updates

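When an event carries a single changed position rather than the full list, SSE can update only that entry:
eventSource.onmessage = (event) => {
  const updatedPosition = JSON.parse(event.data);
  
  // old is undefined if the query has not been fetched yet
  queryClient.setQueryData(orpc.trading.getPositions.queryKey({ input: {} }), (old) =>
    old?.map((p) => 
      p.id === updatedPosition.id ? updatedPosition : p
    )
  );
};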

3. Background Refetching

TanStack Query refetches stale data without blocking UI:
const { data, isRefetching } = useQuery({
  ...orpc.trading.getPositions.queryOptions({ input: {} }),
  staleTime: 15_000,
});

// First render: data from cache (instant)
// Background: refetch if stale (non-blocking)

4. Partial Hydration

SSE endpoints hydrate the cache before sending initial data:
// Ensures initial SSE message is never empty
const positions = await fetchPositions();
emitPositionEvent({
  type: "positions:updated",
  timestamp: new Date().toISOString(),
  data: positions,
});
await stream.writeSSE({ data: JSON.stringify(getCurrentPositions()) });

Error Handling

oRPC Errors

import { ORPCError } from "@orpc/client";

const { data, error } = useQuery(
  orpc.trading.getPositions.queryOptions({ input: {} })
);

if (error) {
  // Input validation failure (400): oRPC reports it with the BAD_REQUEST code
  if (error instanceof ORPCError && error.code === "BAD_REQUEST") {
    return <Alert>Invalid input: {error.message}</Alert>;
  }

  // Server error (500)
  return <Alert>Failed to load positions</Alert>;
}

SSE Reconnection

EventSource automatically reconnects on disconnect:
eventSource.onerror = (error) => {
  console.error("[SSE] Connection lost, reconnecting...");
  // Browser auto-reconnects after a short delay (a few seconds by default;
  // the server can change it with a retry: field). Backoff is not built in.
};

Mutation Rollback

Optimistic updates rollback on error (see Optimistic Updates section).

Summary

Autonome’s data flow architecture balances:
  • Type safety: oRPC procedures with Zod validation
  • Performance: TanStack Query caching and background refetching
  • Real-time: SSE for instant UI updates
  • Consistency: Single source of truth (Orders table)
  • Resilience: Automatic reconnection and error recovery
This hybrid approach provides the best of both worlds: type-safe request/response for data fetching and low-latency SSE for real-time updates.
