TrailBase provides realtime subscriptions via Server-Sent Events (SSE) and WebSocket connections, allowing your application to receive instant updates when data changes.
Overview
Realtime features:
Server-Sent Events (SSE) - HTTP-based streaming (default)
WebSocket - Bidirectional connection for lower latency
Per-record subscriptions - Listen to changes on specific rows
Bulk subscriptions - Listen to all changes in a table
Filtered subscriptions - Only receive relevant updates
Event types - Insert, Update, and Delete events
Event Types
TrailBase emits three types of change events, plus an Error variant that reports subscription failures:
type Event =
  | { Insert: Record }  // New record created
  | { Update: Record }  // Existing record updated
  | { Delete: Record }  // Record deleted
  | { Error: string };  // Subscription error
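Because each event is an object with exactly one key, the variant can be narrowed with `in` checks. The following is a minimal sketch of that pattern; `Row`, `RecordEvent`, `eventKind`, and `eventRow` are illustrative names introduced here, not part of the TrailBase client.

```typescript
// Local mirror of the event union; `Row` is a placeholder payload type (assumption).
type Row = { [column: string]: unknown };
type RecordEvent =
  | { Insert: Row }
  | { Update: Row }
  | { Delete: Row }
  | { Error: string };

type EventKind = "insert" | "update" | "delete" | "error";

// Narrow the union with `in` checks and return a tag for the variant.
function eventKind(event: RecordEvent): EventKind {
  if ("Insert" in event) return "insert";
  if ("Update" in event) return "update";
  if ("Delete" in event) return "delete";
  return "error";
}

// Extract the row payload, or undefined for Error events.
function eventRow(event: RecordEvent): Row | undefined {
  if ("Insert" in event) return event.Insert;
  if ("Update" in event) return event.Update;
  if ("Delete" in event) return event.Delete;
  return undefined;
}
```

Helpers like these keep the `in` checks in one place, so subscription loops can switch on a plain string tag.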
Subscribing to Changes
Subscribe to a Single Record
The same subscription is shown below in plain TypeScript, as a React hook, and in Dart/Flutter, in that order.
import { initClient } from "trailbase";

// Shape of a row in the "todos" table
type Todo = {
  id: number;
  text: string;
  completed: boolean;
};

const client = initClient("http://localhost:4000");
const todosApi = client.records<Todo>("todos");

// Subscribe to changes on todo with ID 1
const stream = await todosApi.subscribe(1);
const reader = stream.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  if ("Update" in value) {
    console.log("Todo updated:", value.Update);
  } else if ("Delete" in value) {
    console.log("Todo deleted:", value.Delete);
  }
}
import { useEffect , useState } from "react" ;
import { initClient } from "trailbase" ;
import type { Todo } from "./types/todo" ;
const client = initClient ( "http://localhost:4000" );
function useTodo ( id : number ) {
const [ todo , setTodo ] = useState < Todo | null >( null );
useEffect (() => {
let reader : ReadableStreamDefaultReader | null = null ;
async function subscribe () {
const api = client . records < Todo >( "todos" );
// Initial load
const initial = await api . read ( id );
setTodo ( initial );
// Subscribe to changes
const stream = await api . subscribe ( id );
reader = stream . getReader ();
while ( true ) {
const { done , value } = await reader . read ();
if ( done ) break ;
if ( "Update" in value ) {
setTodo ( value . Update as Todo );
} else if ( "Delete" in value ) {
setTodo ( null );
}
}
}
subscribe (). catch ( console . error );
return () => {
reader ?. cancel ();
};
}, [ id ]);
return todo ;
}
// Usage
function TodoDetail ({ id } : { id : number }) {
const todo = useTodo ( id );
if ( ! todo ) return < div > Loading... </ div > ;
return (
< div >
< h2 > { todo . text } </ h2 >
< p > Status: { todo . completed ? "Done" : "Pending" } </ p >
</ div >
);
}
import 'package:trailbase/trailbase.dart' ;
final client = Client ( "http://localhost:4000" );
final todosApi = client. records < Map < String , dynamic >>( "todos" );
// Subscribe to changes on todo with ID 1
final stream = await todosApi. subscribe ( 1 );
await for ( final event in stream) {
if (event is InsertEvent ) {
print ( "Todo created: ${ event . record } " );
} else if (event is UpdateEvent ) {
print ( "Todo updated: ${ event . record } " );
} else if (event is DeleteEvent ) {
print ( "Todo deleted: ${ event . record } " );
}
}
Subscribe to All Records
Receive updates for all rows in a table:
import { initClient , type Event } from "trailbase" ;
const client = initClient ( "http://localhost:4000" );
const todosApi = client . records < Todo >( "todos" );
// Subscribe to all todos
const stream = await todosApi . subscribeAll ();
const reader = stream . getReader ();
while ( true ) {
const { done , value } = await reader . read ();
if ( done ) break ;
if ( "Insert" in value ) {
console . log ( "New todo:" , value . Insert );
} else if ( "Update" in value ) {
console . log ( "Todo updated:" , value . Update );
} else if ( "Delete" in value ) {
console . log ( "Todo deleted:" , value . Delete );
}
}
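One way to consume a table-wide stream is to fold each event into a local array. The sketch below assumes a `Todo` shape with `id`, `text`, and `completed`; `TodoEvent` and `applyEvent` are illustrative names, not part of the TrailBase client.

```typescript
type Todo = { id: number; text: string; completed: boolean };
type TodoEvent =
  | { Insert: Todo }
  | { Update: Todo }
  | { Delete: Todo }
  | { Error: string };

// Fold one event into the local list without mutating the input array.
function applyEvent(todos: readonly Todo[], event: TodoEvent): Todo[] {
  if ("Insert" in event) {
    const row = event.Insert;
    // Replace in place if the id is already present, otherwise append.
    return todos.some((t) => t.id === row.id)
      ? todos.map((t) => (t.id === row.id ? row : t))
      : [...todos, row];
  }
  if ("Update" in event) {
    const row = event.Update;
    return todos.map((t) => (t.id === row.id ? row : t));
  }
  if ("Delete" in event) {
    const row = event.Delete;
    return todos.filter((t) => t.id !== row.id);
  }
  // Error events do not change the list; return a copy.
  return [...todos];
}
```

Keeping this logic in a pure function makes it reusable across frameworks: a state setter can call `applyEvent(prev, value)` for every event read from the stream.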
Filtered Subscriptions
Subscribe only to records matching specific criteria:
const todosApi = client . records < Todo >( "todos" );
// Only get updates for high-priority todos
const stream = await todosApi . subscribeAll ({
filters: [
{ column: "priority" , op: "greaterThan" , value: 5 },
],
});
const reader = stream . getReader ();
while ( true ) {
const { done , value } = await reader . read ();
if ( done ) break ;
console . log ( "High-priority todo changed:" , value );
}
Filters use the same syntax as Record API queries. See the Record API reference for available operators.
Real-World Example: Live Todo List
Here's a complete SolidJS example, adapted from the collaborative clicker demo:
import { createSignal , onMount } from "solid-js" ;
import { initClient } from "trailbase" ;
type Todo = {
id : number ;
text : string ;
completed : boolean ;
};
const client = initClient ( "http://localhost:4000" );
function App () {
const [ todos , setTodos ] = createSignal < Todo []>([]);
onMount ( async () => {
const api = client . records < Todo >( "todos" );
// Initial load
const response = await api . list ();
setTodos ( response . records );
// Subscribe to live updates
const listen = async () => {
const reader = ( await api . subscribeAll ()). getReader ();
while ( true ) {
const { done , value } = await reader . read ();
if ( done ) break ;
if ( "Insert" in value ) {
// Add new todo to list
setTodos ( prev => [ ... prev , value . Insert as Todo ]);
} else if ( "Update" in value ) {
// Update existing todo
setTodos ( prev => prev . map ( t =>
t . id === ( value . Update as Todo ). id ? value . Update as Todo : t
));
} else if ( "Delete" in value ) {
// Remove deleted todo
setTodos ( prev => prev . filter ( t =>
t . id !== ( value . Delete as Todo ). id
));
}
}
};
// Reconnect loop
while ( true ) {
await listen (). catch ( console . error );
await new Promise ( r => setTimeout ( r , 5000 ));
}
});
return (
< div >
< h1 > Live Todos </ h1 >
< ul >
{ todos (). map ( todo => (
< li key = { todo . id } >
{ todo . text } - { todo . completed ? "Done" : "Pending" }
</ li >
)) }
</ ul >
</ div >
);
}
Reconnection Strategy: Implement a reconnection loop with exponential backoff for production apps. The example above uses a simple 5-second delay.
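A minimal sketch of such a backoff calculation follows. The function name `backoffDelay`, the 1-second base, the 30-second cap, and the "equal jitter" scheme are assumptions chosen for illustration; tune them for your deployment.

```typescript
// Delay before reconnect attempt `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
// `random` is injectable so the jitter can be made deterministic in tests.
function backoffDelay(
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30000,
  random: () => number = Math.random,
): number {
  const capped = Math.min(baseMs * Math.pow(2, attempt), maxMs);
  // "Equal jitter": keep half of the delay fixed and randomize the other half,
  // so many clients do not reconnect at the same instant.
  return Math.floor(capped / 2 + random() * (capped / 2));
}
```

In a reconnect loop, the fixed `5000` would become `backoffDelay(attempt)`, with `attempt` incremented after each failure and reset to 0 once a connection succeeds.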
WebSocket Subscriptions
For lower latency, use WebSocket instead of SSE:
import { initClient } from "trailbase" ;
const client = initClient ( "http://localhost:4000" );
const todosApi = client . records < Todo >( "todos" );
// Use WebSocket instead of SSE
const stream = await todosApi . subscribeWs ( 1 );
const reader = stream . getReader ();
while ( true ) {
const { done , value } = await reader . read ();
if ( done ) break ;
console . log ( "WebSocket event:" , value );
}
When to use WebSocket:
Mobile apps (better battery life)
High-frequency updates (lower overhead)
Bidirectional communication needed
When to use SSE:
Browser-only apps (simpler)
Firewalls that block WebSocket
Server-to-client updates only
WebSocket Authentication
WebSocket connections require authentication after connecting:
// WebSocket protocol
type WsProtocol =
| { Init : { auth_token : string | null } }
| { Subscribe : { id : string } };
// subscribeWs() sends the Init message for you; this is the manual equivalent
const socket = new WebSocket ( "ws://localhost:4000/api/records/todos/subscribe/1?ws=true" );
socket . addEventListener ( "open" , () => {
// Authenticate
socket . send ( JSON . stringify ({
Init: {
auth_token: client . tokens ()?. auth_token ?? null ,
},
}));
});
socket . addEventListener ( "message" , ( event ) => {
const data = JSON . parse ( event . data );
console . log ( "Received:" , data );
});
The TrailBase client handles WebSocket authentication automatically when using subscribeWs().
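If you do manage the socket yourself, the two pieces you need are the subscription URL and the `Init` payload. The sketch below derives both; the URL path and the `ws=true` parameter are taken from the example above, while the helper names `buildInitMessage` and `subscribeWsUrl` are hypothetical and not part of the TrailBase client.

```typescript
// Message shape copied from the WsProtocol type above.
type WsInit = { Init: { auth_token: string | null } };

// Serialize the Init message; a missing token is sent as null.
function buildInitMessage(authToken?: string | null): string {
  const msg: WsInit = { Init: { auth_token: authToken ?? null } };
  return JSON.stringify(msg);
}

// Derive the WebSocket subscription URL from the HTTP base URL
// (http -> ws, https -> wss).
function subscribeWsUrl(
  baseUrl: string,
  api: string,
  recordId: string | number,
): string {
  const wsBase = baseUrl.replace(/^http/i, "ws");
  const path =
    `/api/records/${encodeURIComponent(api)}` +
    `/subscribe/${encodeURIComponent(String(recordId))}`;
  const url = new URL(path, wsBase);
  url.searchParams.set("ws", "true");
  return url.toString();
}
```

For example, `subscribeWsUrl("http://localhost:4000", "todos", 1)` yields the same URL used in the manual example, and `buildInitMessage(token)` produces the string to send from the `open` handler.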
Integration with TanStack DB
For automatic state synchronization, use @tanstack/trailbase-db-collection :
import { createCollection } from "@tanstack/react-db" ;
import { trailBaseCollectionOptions } from "@tanstack/trailbase-db-collection" ;
import { initClient } from "trailbase" ;
const client = initClient ( "http://localhost:4000" );
type Todo = {
id : number ;
text : string ;
completed : boolean ;
};
// Create a live collection
const todoCollection = createCollection (
trailBaseCollectionOptions < Todo >({
recordApi: client . records < Todo >( "todos" ),
getKey : ( item ) => item . id ,
parse: {},
serialize: {},
}),
);
// Subscribe to live data
import { useLiveQuery } from "@tanstack/react-db" ;
function TodoList () {
const { data : todos } = useLiveQuery (( q ) =>
q . from ({ todo: todoCollection })
);
return (
< ul >
{ todos . map ( todo => (
< li key = {todo. id } > {todo. text } </ li >
))}
</ ul >
);
}
See the TanStack Todo Example for a complete implementation.
Debouncing Updates
For high-frequency changes, debounce UI updates:
import { debounce } from "lodash-es" ;
const debouncedUpdate = debounce (( todos : Todo []) => {
setTodos ( todos );
}, 100 );
// Use debounced setter in subscription handler
if ( "Update" in value ) {
const updatedTodos = todos . map ( t =>
t . id === value . Update . id ? value . Update : t
);
debouncedUpdate ( updatedTodos );
}
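A debounced setter that receives a precomputed list can drop intermediate changes if that list was built from stale state. One alternative, sketched here under that assumption, is to buffer raw events and apply the whole batch when the debounce fires. `UpdateBuffer` is an illustrative helper, not a TrailBase or lodash API.

```typescript
// Collects items between flushes so a burst of events causes a single render.
class UpdateBuffer<T> {
  private pending: T[] = [];

  // Queue one item for the next flush.
  push(item: T): void {
    this.pending.push(item);
  }

  // Number of items waiting to be flushed.
  get size(): number {
    return this.pending.length;
  }

  // Return everything buffered so far, in arrival order, and reset the buffer.
  drain(): T[] {
    const out = this.pending;
    this.pending = [];
    return out;
  }
}
```

The subscription handler would call `push(value)` for every event, and the debounced function would call `drain()` and fold the returned events into state in one update.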
For large datasets, combine pagination with subscriptions:
const todosApi = client . records < Todo >( "todos" );
// Load first page
const response = await todosApi . list ({ limit: 20 });
const [ todos , setTodos ] = useState ( response . records );
// Subscribe only to currently visible todos
const visibleIds = todos . map ( t => t . id );
const stream = await todosApi . subscribeAll ({
filters: [
{ column: "id" , op: "in" , value: visibleIds },
],
});
Selective Subscriptions
Subscribe only to what the user is viewing:
function TodoDetail ({ id } : { id : number }) {
const [ todo , setTodo ] = useState < Todo | null >( null );
useEffect (() => {
let reader : ReadableStreamDefaultReader | null = null ;
async function subscribe () {
const api = client . records < Todo >( "todos" );
// Load initial data
const data = await api . read ( id );
setTodo ( data );
// Subscribe to this specific todo
const stream = await api . subscribe ( id );
reader = stream . getReader ();
// ... handle updates
}
subscribe ();
return () => {
// Cleanup: cancel subscription when component unmounts
reader ?. cancel ();
};
}, [ id ]); // Re-subscribe when ID changes
return todo ? < div >{todo. text } </ div > : null ;
}
Error Handling
Connection Errors
async function subscribeWithRetry() {
  const api = client.records<Todo>("todos");
  const maxRetries = 5;
  let retries = 0;

  while (retries < maxRetries) {
    try {
      const stream = await api.subscribeAll();
      const reader = stream.getReader();
      // Connected: only consecutive failures count toward maxRetries
      retries = 0;

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        if ("Error" in value) {
          console.error("Subscription error:", value.Error);
          await reader.cancel();
          break;
        }
        // Handle events...
      }
    } catch (error) {
      console.error("Connection failed:", error);
    }

    // The stream ended or failed: back off exponentially before reconnecting,
    // so a closed stream never triggers an immediate reconnect loop
    retries++;
    const delay = Math.min(1000 * Math.pow(2, retries), 30000);
    await new Promise((r) => setTimeout(r, delay));
  }

  console.error("Max retries exceeded");
}
Network State
Respond to network changes:
// Assigned by subscribe(), your own function that opens the stream and stores its reader
let reader: ReadableStreamDefaultReader | null = null;
window . addEventListener ( "online" , () => {
console . log ( "Network reconnected, resubscribing..." );
subscribe ();
});
window . addEventListener ( "offline" , () => {
console . log ( "Network lost, canceling subscription..." );
reader ?. cancel ();
});
Access Control
Subscriptions respect your Record API access rules:
traildepot/config.textproto
record_apis: [
{
name: "todos"
table_name: "todos"
acl_authenticated: [READ]
# Users only receive updates for their own todos
read_access_rule: "_ROW_.user = _USER_.id"
}
]
Users only receive subscription events for records they have permission to read. Access rules are evaluated on every event.
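To reason about which events a given user should see (for example, when writing test expectations), the rule above can be mirrored in client code. This is only a mental model: the real check runs on the server, and `TodoRow`, `User`, `canRead`, and `visibleTo` are hypothetical names introduced for illustration.

```typescript
type User = { id: string };
type TodoRow = { id: number; user: string; text: string };

// Client-side mirror of the rule `_ROW_.user = _USER_.id`.
// Anonymous users (null) match no rows under this rule.
function canRead(row: TodoRow, user: User | null): boolean {
  return user !== null && row.user === user.id;
}

// Rows for which a subscriber would be expected to receive events.
function visibleTo(rows: readonly TodoRow[], user: User | null): TodoRow[] {
  return rows.filter((row) => canRead(row, user));
}
```

A test can use `visibleTo` to compute the expected set of events for each signed-in user before comparing it with what the subscription actually delivers.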
Testing Subscriptions
Manual Testing with cURL
# Subscribe via SSE
curl -N http://localhost:4000/api/records/todos/subscribe/1
# Subscribe with authentication
curl -N -H "Authorization: Bearer YOUR_AUTH_TOKEN" \
http://localhost:4000/api/records/todos/subscribe/1
# Subscribe to all records
curl -N 'http://localhost:4000/api/records/todos/subscribe/*'
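The curl output is raw SSE text: events are separated by blank lines, payload lines start with `data:`, and lines starting with `:` are comments such as keep-alives. The sketch below parses that framing, assuming each payload is a JSON-encoded event as in the Event Types section; `parseSseData` is an illustrative helper, not part of the TrailBase client.

```typescript
// Parse a complete chunk of SSE text into its JSON payloads.
function parseSseData(text: string): unknown[] {
  const payloads: unknown[] = [];
  // Events are separated by a blank line.
  for (const block of text.split(/\r?\n\r?\n/)) {
    const data = block
      .split(/\r?\n/)
      // Keep only data fields; comment lines (":...") and other fields are skipped.
      .filter((line) => line.startsWith("data:"))
      // Per the SSE format, one optional space after the colon is stripped.
      .map((line) => line.slice(5).replace(/^ /, ""))
      // Multiple data lines in one event are joined with newlines.
      .join("\n");
    if (data.length > 0) payloads.push(JSON.parse(data));
  }
  return payloads;
}
```

This is handy for asserting on captured curl output in scripts; in application code the client's `subscribe()` stream already yields parsed events.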
Automated Testing
import { describe , it , expect } from "vitest" ;
import { initClient } from "trailbase" ;
describe ( "Todo Subscriptions" , () => {
it ( "receives insert events" , async () => {
const client = initClient ( "http://localhost:4000" );
const api = client . records < Todo >( "todos" );
// Start subscription
const stream = await api . subscribeAll ();
const reader = stream . getReader ();
// Insert a record
const newTodo = await api . create ({
text: "Test todo" ,
completed: false ,
});
// Wait for event
const { value } = await reader . read ();
expect ( value ). toHaveProperty ( "Insert" );
expect (( value as any ). Insert . id ). toBe ( newTodo . id );
reader . cancel ();
});
});
Common Patterns
Optimistic Updates
Update UI immediately, then reconcile with server:
function TodoList () {
const [ todos , setTodos ] = useState < Todo []>([]);
async function toggleTodo ( todo : Todo ) {
// Optimistic update
setTodos ( prev => prev . map ( t =>
t . id === todo . id ? { ... t , completed: ! t . completed } : t
));
try {
// Send to server
await todosApi . update ( todo . id , {
completed: ! todo . completed ,
});
// Subscription will confirm the change
} catch ( error ) {
// Revert on error
setTodos ( prev => prev . map ( t =>
t . id === todo . id ? todo : t
));
}
}
// ... subscription setup
}
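The optimistic step and its rollback can also be written as pure functions, which makes them easy to unit test without a server. This is a sketch under the same `Todo` shape used in this guide; `toggleOptimistic` and `rollback` are illustrative names.

```typescript
type Todo = { id: number; text: string; completed: boolean };

// Flip `completed` for one todo. Returns the new list plus the original row,
// which the caller keeps for a possible rollback.
function toggleOptimistic(
  todos: readonly Todo[],
  id: number,
): { next: Todo[]; previous: Todo | undefined } {
  const previous = todos.find((t) => t.id === id);
  const next = todos.map((t) =>
    t.id === id ? { ...t, completed: !t.completed } : t,
  );
  return { next, previous };
}

// Restore one row to its pre-update value, leaving other rows untouched.
function rollback(todos: readonly Todo[], previous: Todo): Todo[] {
  return todos.map((t) => (t.id === previous.id ? previous : t));
}
```

Rolling back a single row rather than restoring a whole snapshot preserves any unrelated changes that arrived through the subscription while the request was in flight.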
Multi-Tab Sync
Each tab opens its own subscription, so a change made in one tab reaches every other tab as a regular event:
// Tab 1: User adds a todo
await todosApi . create ({ text: "New todo" , completed: false });
// Tab 2: Automatically receives Insert event
// Both tabs stay in sync
Presence Detection
Track active users (requires custom implementation):
// Create a presence table
// migrations/main/U1234567890__create_presence.sql
/*
CREATE TABLE presence (
user_id BLOB PRIMARY KEY,
last_seen INTEGER DEFAULT (UNIXEPOCH()),
status TEXT DEFAULT 'online'
) STRICT;
*/
// Update presence periodically
setInterval ( async () => {
await client . records ( "presence" ). update ( currentUser . id , {
last_seen: Math . floor ( Date . now () / 1000 ),
});
}, 30000 ); // Every 30 seconds
// Subscribe to presence changes
const stream = await client . records ( "presence" ). subscribeAll ();
// ... show online users
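To turn presence rows into a list of online users, one option is to treat a user as online while their `last_seen` is recent. The sketch below assumes a 60-second window (two missed 30-second heartbeats) and that `user_id` reaches the client as a string; both are assumptions, and `onlineUsers` is an illustrative helper.

```typescript
type Presence = { user_id: string; last_seen: number; status: string };

// Users marked "online" whose last heartbeat is within `ttlSec` seconds of `nowSec`.
// Both timestamps are Unix seconds, matching the UNIXEPOCH() default in the table.
function onlineUsers(
  rows: readonly Presence[],
  nowSec: number,
  ttlSec: number = 60,
): string[] {
  return rows
    .filter((row) => row.status === "online" && nowSec - row.last_seen <= ttlSec)
    .map((row) => row.user_id);
}
```

Re-running `onlineUsers(rows, Math.floor(Date.now() / 1000))` on a timer lets users who stop sending heartbeats drop off the list even when no new event arrives.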
Next Steps
First App: Build a realtime todo app
Authentication: Secure your subscriptions
Database Setup: Design efficient schemas
WASM Runtime: Process events with WebAssembly
Examples