Skip to main content
TrailBase provides realtime subscriptions via Server-Sent Events (SSE) and WebSocket connections, allowing your application to receive instant updates when data changes.

Overview

Realtime features:
  • Server-Sent Events (SSE) - HTTP-based streaming (default)
  • WebSocket - Bidirectional connection for lower latency
  • Per-record subscriptions - Listen to changes on specific rows
  • Bulk subscriptions - Listen to all changes in a table
  • Filtered subscriptions - Only receive relevant updates
  • Event types - Insert, Update, and Delete events

Event Types

TrailBase emits three types of events:
type Event =
  | { Insert: Record }  // New record created
  | { Update: Record }  // Existing record updated  
  | { Delete: Record }  // Record deleted
  | { Error: string };  // Subscription error

Subscribing to Changes

Subscribe to a Single Record

import { initClient, type Event } from "trailbase";

const client = initClient("http://localhost:4000");
const todosApi = client.records<Todo>("todos");

// Subscribe to changes on todo with ID 1
const stream = await todosApi.subscribe(1);
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  if ("Update" in value) {
    console.log("Todo updated:", value.Update);
  } else if ("Delete" in value) {
    console.log("Todo deleted:", value.Delete);
  }
}

Subscribe to All Records

Receive updates for all rows in a table:
import { initClient, type Event } from "trailbase";

const client = initClient("http://localhost:4000");
const todosApi = client.records<Todo>("todos");

// Subscribe to all todos
const stream = await todosApi.subscribeAll();
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  if ("Insert" in value) {
    console.log("New todo:", value.Insert);
  } else if ("Update" in value) {
    console.log("Todo updated:", value.Update);
  } else if ("Delete" in value) {
    console.log("Todo deleted:", value.Delete);
  }
}

Filtered Subscriptions

Subscribe only to records matching specific criteria:
const todosApi = client.records<Todo>("todos");

// Only get updates for high-priority todos
const stream = await todosApi.subscribeAll({
  filters: [
    { column: "priority", op: "greaterThan", value: 5 },
  ],
});

const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  console.log("High-priority todo changed:", value);
}
Filters use the same syntax as Record API queries. See the Record API reference for available operators.

Real-World Example: Live Todo List

Here’s a complete example from the collaborative clicker demo:
src/App.tsx
import { createSignal, onMount } from "solid-js";
import { initClient } from "trailbase";

type Todo = {
  id: number;
  text: string;
  completed: boolean;
};

const client = initClient("http://localhost:4000");

function App() {
  const [todos, setTodos] = createSignal<Todo[]>([]);

  onMount(async () => {
    const api = client.records<Todo>("todos");

    // Initial load
    const response = await api.list();
    setTodos(response.records);

    // Subscribe to live updates
    const listen = async () => {
      const reader = (await api.subscribeAll()).getReader();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        if ("Insert" in value) {
          // Add new todo to list
          setTodos(prev => [...prev, value.Insert as Todo]);
        } else if ("Update" in value) {
          // Update existing todo
          setTodos(prev => prev.map(t => 
            t.id === (value.Update as Todo).id ? value.Update as Todo : t
          ));
        } else if ("Delete" in value) {
          // Remove deleted todo
          setTodos(prev => prev.filter(t => 
            t.id !== (value.Delete as Todo).id
          ));
        }
      }
    };

    // Reconnect loop
    while (true) {
      await listen().catch(console.error);
      await new Promise(r => setTimeout(r, 5000));
    }
  });

  return (
    <div>
      <h1>Live Todos</h1>
      <ul>
        {todos().map(todo => (
          <li key={todo.id}>
            {todo.text} - {todo.completed ? "Done" : "Pending"}
          </li>
        ))}
      </ul>
    </div>
  );
}
Reconnection Strategy: Implement a reconnection loop with exponential backoff for production apps. The example above uses a simple 5-second delay.

WebSocket Subscriptions

For lower latency, use WebSocket instead of SSE:
import { initClient } from "trailbase";

const client = initClient("http://localhost:4000");
const todosApi = client.records<Todo>("todos");

// Use WebSocket instead of SSE
const stream = await todosApi.subscribeWs(1);
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  console.log("WebSocket event:", value);
}
When to use WebSocket:
  • Mobile apps (better battery life)
  • High-frequency updates (lower overhead)
  • Bidirectional communication needed
When to use SSE:
  • Browser-only apps (simpler)
  • Firewalls that block WebSocket
  • Server-to-client updates only

WebSocket Authentication

WebSocket connections require authentication after connecting:
// WebSocket protocol
type WsProtocol = 
  | { Init: { auth_token: string | null } }
  | { Subscribe: { id: string } };

// The client automatically sends Init message with token
const socket = new WebSocket("ws://localhost:4000/api/records/todos/subscribe/1?ws=true");

socket.addEventListener("open", () => {
  // Authenticate
  socket.send(JSON.stringify({
    Init: {
      auth_token: client.tokens()?.auth_token ?? null,
    },
  }));
});

socket.addEventListener("message", (event) => {
  const data = JSON.parse(event.data);
  console.log("Received:", data);
});
The TrailBase client handles WebSocket authentication automatically when using subscribeWs().

Integration with TanStack DB

For automatic state synchronization, use @tanstack/trailbase-db-collection:
import { createCollection } from "@tanstack/react-db";
import { trailBaseCollectionOptions } from "@tanstack/trailbase-db-collection";
import { initClient } from "trailbase";

const client = initClient("http://localhost:4000");

type Todo = {
  id: number;
  text: string;
  completed: boolean;
};

// Create a live collection
const todoCollection = createCollection(
  trailBaseCollectionOptions<Todo>({
    recordApi: client.records<Todo>("todos"),
    getKey: (item) => item.id,
    parse: {},
    serialize: {},
  }),
);

// Subscribe to live data
import { useLiveQuery } from "@tanstack/react-db";

function TodoList() {
  const { data: todos } = useLiveQuery((q) =>
    q.from({ todo: todoCollection })
  );

  return (
    <ul>
      {todos.map(todo => (
        <li key={todo.id}>{todo.text}</li>
      ))}
    </ul>
  );
}
See the TanStack Todo Example for a complete implementation.

Performance Considerations

Debouncing Updates

For high-frequency changes, debounce UI updates:
import { debounce } from "lodash-es";

const debouncedUpdate = debounce((todos: Todo[]) => {
  setTodos(todos);
}, 100);

// Use debounced setter in subscription handler
if ("Update" in value) {
  const updatedTodos = todos.map(t => 
    t.id === value.Update.id ? value.Update : t
  );
  debouncedUpdate(updatedTodos);
}

Pagination with Subscriptions

For large datasets, combine pagination with subscriptions:
const todosApi = client.records<Todo>("todos");

// Load first page
const response = await todosApi.list({ limit: 20 });
const [todos, setTodos] = useState(response.records);

// Subscribe only to currently visible todos
const visibleIds = todos.map(t => t.id);
const stream = await todosApi.subscribeAll({
  filters: [
    { column: "id", op: "in", value: visibleIds },
  ],
});

Selective Subscriptions

Subscribe only to what the user is viewing:
function TodoDetail({ id }: { id: number }) {
  const [todo, setTodo] = useState<Todo | null>(null);

  useEffect(() => {
    let reader: ReadableStreamDefaultReader | null = null;

    async function subscribe() {
      const api = client.records<Todo>("todos");
      
      // Load initial data
      const data = await api.read(id);
      setTodo(data);

      // Subscribe to this specific todo
      const stream = await api.subscribe(id);
      reader = stream.getReader();

      // ... handle updates
    }

    subscribe();

    return () => {
      // Cleanup: cancel subscription when component unmounts
      reader?.cancel();
    };
  }, [id]);  // Re-subscribe when ID changes

  return todo ? <div>{todo.text}</div> : null;
}

Error Handling

Connection Errors

async function subscribeWithRetry() {
  const api = client.records<Todo>("todos");
  let retries = 0;
  const maxRetries = 5;

  while (retries < maxRetries) {
    try {
      const stream = await api.subscribeAll();
      const reader = stream.getReader();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        if ("Error" in value) {
          console.error("Subscription error:", value.Error);
          break;
        }

        // Handle events...
      }
    } catch (error) {
      console.error("Connection failed:", error);
      retries++;
      
      // Exponential backoff
      const delay = Math.min(1000 * Math.pow(2, retries), 30000);
      await new Promise(r => setTimeout(r, delay));
    }
  }

  console.error("Max retries exceeded");
}

Network State

Respond to network changes:
let reader: ReadableStreamDefaultReader | null = null;

window.addEventListener("online", () => {
  console.log("Network reconnected, resubscribing...");
  subscribe();
});

window.addEventListener("offline", () => {
  console.log("Network lost, canceling subscription...");
  reader?.cancel();
});

Access Control

Subscriptions respect your Record API access rules:
traildepot/config.textproto
record_apis: [
  {
    name: "todos"
    table_name: "todos"
    acl_authenticated: [READ]
    # Users only receive updates for their own todos
    read_access_rule: "_ROW_.user = _USER_.id"
  }
]
Users only receive subscription events for records they have permission to read. Access rules are evaluated on every event.

Testing Subscriptions

Manual Testing with cURL

# Subscribe via SSE
curl -N http://localhost:4000/api/records/todos/subscribe/1

# Subscribe with authentication
curl -N -H "Authorization: Bearer YOUR_AUTH_TOKEN" \
  http://localhost:4000/api/records/todos/subscribe/1

# Subscribe to all records
curl -N http://localhost:4000/api/records/todos/subscribe/*

Automated Testing

import { describe, it, expect } from "vitest";
import { initClient } from "trailbase";

describe("Todo Subscriptions", () => {
  it("receives insert events", async () => {
    const client = initClient("http://localhost:4000");
    const api = client.records<Todo>("todos");

    // Start subscription
    const stream = await api.subscribeAll();
    const reader = stream.getReader();

    // Insert a record
    const newTodo = await api.create({
      text: "Test todo",
      completed: false,
    });

    // Wait for event
    const { value } = await reader.read();
    
    expect(value).toHaveProperty("Insert");
    expect((value as any).Insert.id).toBe(newTodo.id);

    reader.cancel();
  });
});

Common Patterns

Optimistic Updates

Update UI immediately, then reconcile with server:
function TodoList() {
  const [todos, setTodos] = useState<Todo[]>([]);

  async function toggleTodo(todo: Todo) {
    // Optimistic update
    setTodos(prev => prev.map(t => 
      t.id === todo.id ? { ...t, completed: !t.completed } : t
    ));

    try {
      // Send to server
      await todosApi.update(todo.id, {
        completed: !todo.completed,
      });
      // Subscription will confirm the change
    } catch (error) {
      // Revert on error
      setTodos(prev => prev.map(t => 
        t.id === todo.id ? todo : t
      ));
    }
  }

  // ... subscription setup
}

Multi-Tab Sync

Subscriptions automatically sync across browser tabs:
// Tab 1: User adds a todo
await todosApi.create({ text: "New todo", completed: false });

// Tab 2: Automatically receives Insert event
// Both tabs stay in sync

Presence Detection

Track active users (requires custom implementation):
// Create a presence table
// migrations/main/U1234567890__create_presence.sql
/*
CREATE TABLE presence (
  user_id BLOB PRIMARY KEY,
  last_seen INTEGER DEFAULT (UNIXEPOCH()),
  status TEXT DEFAULT 'online'
) STRICT;
*/

// Update presence periodically
setInterval(async () => {
  await client.records("presence").update(currentUser.id, {
    last_seen: Math.floor(Date.now() / 1000),
  });
}, 30000); // Every 30 seconds

// Subscribe to presence changes
const stream = await client.records("presence").subscribeAll();
// ... show online users

Next Steps

First App

Build a realtime todo app

Authentication

Secure your subscriptions

Database Setup

Design efficient schemas

WASM Runtime

Process events with WebAssembly

Examples

Build docs developers (and LLMs) love