
Overview

Database synchronization in LiveSync ensures that changes in your backend database are reliably propagated to all connected clients in realtime. This page covers synchronization patterns, the Models SDK, and best practices for maintaining consistent state across distributed systems.

Synchronization Patterns

Outbox Pattern (Postgres)

The outbox pattern provides transactional guarantees when publishing database changes:
BEGIN;

-- Update your application data
UPDATE users 
SET email = 'alice@example.com' 
WHERE id = 123;

-- Write change event to outbox
INSERT INTO outbox (
  mutation_id,
  channel,
  name,
  data
) VALUES (
  'mutation-uuid-123',
  'users:123',
  'user.updated',
  jsonb_build_object(
    'id', 123,
    'email', 'alice@example.com'
  )
);

COMMIT;
Benefits:
  • Transactional consistency between database writes and message publishing
  • Exactly-once delivery guarantees
  • Automatic retry on failures
  • Message ordering preserved per channel
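
The ordering guarantee comes from a monotonically increasing sequence ID on each outbox row (the same `sequence_id` the sync endpoint reads later). As an illustration only (this is not the actual connector implementation), a relay draining the outbox could sort rows by `sequence_id` and group them per channel before publishing:

```javascript
// Hypothetical rows read from the outbox table, in arbitrary order.
const rows = [
  { sequence_id: 3, channel: 'users:123', name: 'user.updated', data: { id: 123 } },
  { sequence_id: 1, channel: 'users:123', name: 'user.created', data: { id: 123 } },
  { sequence_id: 2, channel: 'users:456', name: 'user.created', data: { id: 456 } }
];

// Group rows by channel, ordered by sequence_id, so messages can be
// published in order on a per-channel basis.
function groupByChannel(rows) {
  const groups = new Map();
  for (const row of [...rows].sort((a, b) => a.sequence_id - b.sequence_id)) {
    if (!groups.has(row.channel)) groups.set(row.channel, []);
    groups.get(row.channel).push(row);
  }
  return groups;
}

const batches = groupByChannel(rows);
// batches.get('users:123') holds user.created before user.updated
```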

Change Streams (MongoDB)

MongoDB uses Change Streams to watch for document modifications:
// MongoDB automatically detects changes
db.users.updateOne(
  { _id: ObjectId('507f191e810c19729de860ea') },
  { $set: { email: 'alice@example.com' } }
);

// Change stream pipeline routes this to Ably
// No additional code needed!
Benefits:
  • Native MongoDB integration
  • Real-time change detection
  • Full document access with pre/post images
  • Flexible filtering via aggregation pipelines
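
Conceptually, each change event carries enough context to route it to a channel. The sketch below maps a change stream event to a channel message; the field names follow MongoDB's change event format, but the `<collection>:<id>` routing convention and the event-name mapping are illustrative assumptions, not the connector's actual behavior:

```javascript
// Maps a MongoDB change stream event to a hypothetical channel message.
function routeChangeEvent(change) {
  const id = change.documentKey._id;
  const resource = change.ns.coll.replace(/s$/, ''); // naive singularization
  const action = change.operationType === 'insert' ? 'created' : 'updated';
  return {
    channel: `${change.ns.coll}:${id}`,
    name: `${resource}.${action}`,
    data: change.fullDocument
  };
}

const msg = routeChangeEvent({
  operationType: 'update',
  ns: { db: 'app', coll: 'users' },
  documentKey: { _id: '507f191e810c19729de860ea' },
  fullDocument: { _id: '507f191e810c19729de860ea', email: 'alice@example.com' }
});
// msg.channel is 'users:507f191e810c19729de860ea', msg.name is 'user.updated'
```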

Frontend Data Models

The Models SDK provides a high-level abstraction for managing synchronized state in frontend applications.

Installing the Models SDK

npm install ably @ably-labs/models

Creating a Model

A model represents a synchronized data structure in your application:
import ModelsClient from '@ably-labs/models';
import { Realtime } from 'ably';

// Initialize Ably
const ably = new Realtime({ key: 'YOUR_API_KEY' });

// Create Models client
const modelsClient = new ModelsClient({ ably });

// Define a model
const postModel = modelsClient.models.get({
  channelName: 'posts:123',
  sync: async () => {
    // Fetch initial state from backend
    const response = await fetch('/api/posts/123');
    const { sequenceId, data } = await response.json();
    return { sequenceId, data };
  },
  merge: (state, event) => {
    // Merge incoming changes into state
    switch (event.name) {
      case 'comment.added':
        return {
          ...state,
          comments: [...state.comments, event.data]
        };
      case 'comment.deleted':
        return {
          ...state,
          comments: state.comments.filter(c => c.id !== event.data.id)
        };
      default:
        return state;
    }
  }
});

The Sync Function

The sync function fetches the current state from your backend:
async function sync() {
  // Query your database within a transaction
  const response = await fetch('/api/posts/123');
  const result = await response.json();
  
  return {
    // Current version of the data
    sequenceId: result.sequenceId,
    
    // The actual data
    data: result.post
  };
}
Backend implementation:
app.get('/api/posts/:id', async (req, res) => {
  const { id } = req.params;
  
  // Use a transaction for consistency
  const result = await db.transaction(async (tx) => {
    // Fetch the post data
    const post = await tx.query(
      'SELECT * FROM posts WHERE id = $1',
      [id]
    );
    
    // Get the latest outbox sequence ID
    const seqResult = await tx.query(
      'SELECT COALESCE(MAX(sequence_id), 0) AS sequence_id FROM outbox'
    );
    
    return {
      sequenceId: seqResult.rows[0].sequence_id.toString(),
      data: post.rows[0]
    };
  });
  
  res.json(result);
});

The Merge Function

The merge function combines incoming changes with existing state:
function merge(state, event) {
  // Must be pure and deterministic
  // No side effects or external dependencies
  // Return new objects rather than mutating the existing state
  
  switch (event.name) {
    case 'task.created':
      return {
        ...state,
        tasks: [...state.tasks, event.data]
      };
      
    case 'task.updated':
      return {
        ...state,
        tasks: state.tasks.map(t =>
          t.id === event.data.id ? { ...t, ...event.data } : t
        )
      };
      
    case 'task.deleted':
      return {
        ...state,
        tasks: state.tasks.filter(t => t.id !== event.data.id)
      };
      
    default:
      return state;
  }
}
Important: The merge function must be:
  • Pure: Same input always produces same output
  • Deterministic: No randomness or external state
  • Side-effect free: No API calls, mutations, or logging
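
Because these properties are easy to break accidentally, it is worth asserting them in tests: calling merge twice with the same input should produce identical output and leave the input untouched. A minimal sketch using a hypothetical task-list merge:

```javascript
// A pure merge: returns new state rather than mutating the input.
function merge(state, event) {
  if (event.name === 'task.created') {
    return { ...state, tasks: [...state.tasks, event.data] };
  }
  return state;
}

const state = { tasks: [{ id: 1 }] };
const event = { name: 'task.created', data: { id: 2 } };

const a = merge(state, event);
const b = merge(state, event);

console.log(JSON.stringify(a) === JSON.stringify(b)); // true: deterministic
console.log(state.tasks.length);                      // 1: input not mutated
```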

Subscribing to Changes

Subscribe to model updates to react to state changes:
// Subscribe to model changes
postModel.subscribe((err, state) => {
  if (err) {
    console.error('Model error:', err);
    return;
  }
  
  // Update UI with new state
  updateUI(state);
});

// Subscribe to confirmed changes only
postModel.subscribe(
  (err, state) => {
    updateUI(state);
  },
  { optimistic: false }
);

Optimistic Updates

Optimistic updates provide instant feedback by applying changes locally before server confirmation.

Implementing Optimistic Updates

async function addComment(postId, commentText) {
  const mutationId = crypto.randomUUID();
  
  // Apply optimistically
  const [confirmation, cancel] = await postModel.optimistic({
    mutationId,
    name: 'comment.added',
    data: {
      id: mutationId,
      text: commentText,
      createdAt: new Date().toISOString(),
      status: 'pending'
    }
  });
  
  try {
    // Send to backend
    const response = await fetch(`/api/posts/${postId}/comments`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        mutationId,
        text: commentText
      })
    });
    
    if (!response.ok) {
      throw new Error('Failed to add comment');
    }
    
    // Wait for confirmation
    await confirmation;
    
  } catch (error) {
    // Cancel optimistic update on error
    cancel();
    throw error;
  }
}

Backend Confirmation

app.post('/api/posts/:id/comments', async (req, res) => {
  const { id } = req.params;
  const { mutationId, text } = req.body;
  
  try {
    await db.transaction(async (tx) => {
      // Create the comment
      const comment = await tx.query(
        'INSERT INTO comments (post_id, text) VALUES ($1, $2) RETURNING *',
        [id, text]
      );
      
      // Write to outbox with mutation_id
      await tx.query(`
        INSERT INTO outbox (mutation_id, channel, name, data)
        VALUES ($1, $2, $3, $4)
      `, [
        mutationId,
        `posts:${id}`,
        'comment.added',
        JSON.stringify(comment.rows[0])
      ]);
    });
    
    res.json({ success: true });
    
  } catch (error) {
    // Write rejection to outbox
    await db.query(`
      INSERT INTO outbox (mutation_id, channel, name, rejected)
      VALUES ($1, $2, $3, true)
    `, [mutationId, `posts:${id}`, 'comment.added']);
    
    res.status(500).json({ error: error.message });
  }
});

Synchronization Strategies

Full State Synchronization

Send complete object state with each update:
{
  "name": "task.updated",
  "data": {
    "id": 123,
    "title": "Complete project",
    "status": "in_progress",
    "assignee": "alice",
    "dueDate": "2026-03-15"
  }
}
Pros:
  • Simple to implement
  • No merge logic needed
  • Always consistent state
Cons:
  • Higher bandwidth usage
  • May exceed message size limits
  • Less efficient for large objects
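
With full state synchronization, the merge logic reduces to wholesale replacement. A sketch, assuming each event carries the complete task object:

```javascript
function merge(state, event) {
  if (event.name === 'task.updated') {
    // The event carries the full task, so replace it outright.
    return {
      ...state,
      tasks: state.tasks.map(t => (t.id === event.data.id ? event.data : t))
    };
  }
  return state;
}

const next = merge(
  { tasks: [{ id: 123, title: 'Draft', status: 'todo' }] },
  {
    name: 'task.updated',
    data: { id: 123, title: 'Complete project', status: 'in_progress' }
  }
);
// next.tasks[0] is the full replacement object from the event
```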

Delta Synchronization

Send only the changed fields:
{
  "name": "task.updated",
  "data": {
    "id": 123,
    "changes": {
      "status": "in_progress"
    }
  }
}
Pros:
  • Efficient bandwidth usage
  • Works with large objects
  • Clear change intent
Cons:
  • Requires merge logic
  • More complex implementation
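
The merge logic a delta event requires is a shallow merge of the `changes` object onto the existing record; fields not listed in `changes` are preserved. A sketch under the event shape shown above:

```javascript
function merge(state, event) {
  if (event.name === 'task.updated') {
    // Apply only the fields listed in `changes`; keep everything else.
    return {
      ...state,
      tasks: state.tasks.map(t =>
        t.id === event.data.id ? { ...t, ...event.data.changes } : t
      )
    };
  }
  return state;
}

const next = merge(
  { tasks: [{ id: 123, title: 'Complete project', status: 'todo' }] },
  { name: 'task.updated', data: { id: 123, changes: { status: 'in_progress' } } }
);
// status is updated, title is preserved
```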

JSON Patch Synchronization

Use standardized JSON Patch format:
{
  "name": "task.updated",
  "data": [
    {
      "op": "replace",
      "path": "/status",
      "value": "in_progress"
    }
  ]
}
Pros:
  • Standardized format
  • Automatic merge with libraries
  • Precise change tracking
Cons:
  • More complex to generate
  • Requires JSON Patch library

Implementation Example

import { applyPatch } from 'fast-json-patch';

function merge(state, event) {
  if (event.name.endsWith('.patched')) {
    // Apply JSON Patch
    const result = applyPatch(state, event.data);
    return result.newDocument;
  }
  
  // Handle other event types
  // ...
}

Handling History and Replay

Message History

Retrieve historical changes when a client reconnects:
const channel = ably.channels.get('posts:123');

// Get last 50 messages
const history = await channel.history({ limit: 50 });

for (const message of history.items) {
  // Replay changes
  applyChange(message.data);
}

// Subscribe to live updates
await channel.subscribe((message) => {
  applyChange(message.data);
});

Automatic History Replay

The Models SDK automatically handles history replay:
const model = modelsClient.models.get({
  channelName: 'posts:123',
  sync,
  merge
});

// Model automatically:
// 1. Calls sync() to get initial state
// 2. Rewinds through history to sequenceId
// 3. Replays missed events
// 4. Subscribes to live updates

model.subscribe((err, state) => {
  // State is always up-to-date
});
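
The rewind step can be pictured as filtering history for events newer than the synced `sequenceId` and folding them into the state with merge. A simplified sketch of logic the SDK handles internally, assuming each event carries its outbox sequence ID:

```javascript
// Replays only events newer than the sequence ID returned by sync().
function replay(syncResult, history, merge) {
  return history
    .filter(e => Number(e.sequenceId) > Number(syncResult.sequenceId))
    .reduce(merge, syncResult.data);
}

const synced = { sequenceId: '2', data: { comments: ['a', 'b'] } };
const history = [
  { sequenceId: '1', name: 'comment.added', data: 'a' }, // already in sync state
  { sequenceId: '2', name: 'comment.added', data: 'b' }, // already in sync state
  { sequenceId: '3', name: 'comment.added', data: 'c' }  // missed; must be replayed
];

const state = replay(synced, history, (s, e) => ({ comments: [...s.comments, e.data] }));
// state.comments holds 'a', 'b', 'c' with no duplicates
```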

Performance Optimization

Channel Design

Design channels for optimal performance:
// Good: Fine-grained channels
'users:123'           // User-specific updates
'workspace:abc:tasks' // Workspace tasks
'post:456:comments'   // Post comments

// Avoid: Overly broad channels
'all-users'           // Too many subscribers
'global-updates'      // Too much traffic

Message Batching

Batch multiple changes into single messages:
await db.transaction(async (tx) => {
  const taskIds = [1, 2, 3];
  
  // Make multiple changes
  await tx.query('UPDATE tasks SET status = $1 WHERE id = ANY($2)', ['done', taskIds]);
  await tx.query('UPDATE tasks SET completed_at = NOW() WHERE id = ANY($1)', [taskIds]);
  
  // Single outbox entry describing the whole batch
  await tx.query(`
    INSERT INTO outbox (mutation_id, channel, name, data)
    VALUES ($1, $2, $3, $4)
  `, [
    mutationId,
    'tasks:updates',
    'tasks.batch-updated',
    JSON.stringify({ taskIds, status: 'done' })
  ]);
});

Event Buffering

Buffer events to reduce merge operations:
const modelsClient = new ModelsClient({
  ably,
  eventBufferOptions: {
    bufferMs: 100, // Buffer events for 100ms
    eventOrderer: (a, b) => {
      // Custom ordering logic
      return a.timestamp - b.timestamp;
    }
  }
});
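
Buffering can be pictured as collecting events for `bufferMs`, sorting the batch with the `eventOrderer`, then applying merge once per event in order. A sketch of the flush step alone (timers omitted; this is an illustration, not the SDK internals):

```javascript
// Sorts a buffered batch with the orderer, then folds it into the state.
function flush(state, buffer, orderer, merge) {
  return [...buffer].sort(orderer).reduce(merge, state);
}

const buffer = [
  { timestamp: 200, name: 'comment.added', data: 'second' },
  { timestamp: 100, name: 'comment.added', data: 'first' }
];

const state = flush(
  { comments: [] },
  buffer,
  (a, b) => a.timestamp - b.timestamp,            // same shape as eventOrderer
  (s, e) => ({ comments: [...s.comments, e.data] })
);
// events are applied in timestamp order regardless of arrival order
```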

Error Handling

Connection Errors

ably.connection.on('failed', (stateChange) => {
  console.error('Connection failed:', stateChange.reason);
  // Show offline UI
  showOfflineIndicator();
});

ably.connection.on('connected', () => {
  // Hide offline UI
  hideOfflineIndicator();
});

Sync Errors

const model = modelsClient.models.get({
  channelName: 'posts:123',
  sync: async () => {
    try {
      return await fetchInitialState();
    } catch (error) {
      // Log error and retry
      console.error('Sync failed:', error);
      throw error; // Models SDK will retry
    }
  },
  merge
});

// Listen for model errors
model.on('errored', (error) => {
  console.error('Model error:', error);
  showErrorNotification('Failed to sync data');
});

Optimistic Update Failures

let cancel;

try {
  const [confirmation, cancelOptimistic] = await model.optimistic(event);
  cancel = cancelOptimistic;
  
  await fetch('/api/update', {
    method: 'POST',
    body: JSON.stringify(event.data)
  });
  
  // Wait for confirmation, with a timeout
  await Promise.race([
    confirmation,
    new Promise((_, reject) => 
      setTimeout(() => reject(new Error('Timeout')), 5000)
    )
  ]);
  
} catch (error) {
  // Roll back the optimistic update
  if (cancel) cancel();
  console.error('Update failed:', error);
  showErrorNotification('Failed to save changes');
}

Best Practices

1. Design Clear Channel Hierarchies

// Resource-based
'posts:123'
'posts:123:comments'
'posts:123:reactions'

// User-scoped
'users:alice:notifications'
'users:alice:messages'

// Workspace-scoped
'workspace:acme:tasks'
'workspace:acme:users'

2. Use Consistent Event Names

// Good naming convention
'resource.action'
'post.created'
'post.updated'
'post.deleted'
'comment.added'
'comment.edited'

// Avoid ambiguity
// Bad: 'update', 'change', 'modified'
// Good: 'updated', 'status-changed', 'content-modified'
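
A convention like `resource.action` is easy to enforce mechanically. A hypothetical helper (the regex and its rules are an assumption, not part of any SDK):

```javascript
// Validates that an event name follows the 'resource.action' convention,
// allowing hyphenated words such as 'task.status-changed'.
const EVENT_NAME = /^[a-z]+(-[a-z]+)*\.[a-z]+(-[a-z]+)*$/;

function isValidEventName(name) {
  return EVENT_NAME.test(name);
}

console.log(isValidEventName('post.created'));        // true
console.log(isValidEventName('task.status-changed')); // true
console.log(isValidEventName('update'));              // false: no resource prefix
```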

3. Implement Idempotent Operations

function merge(state, event) {
  // Use IDs to ensure idempotency
  switch (event.name) {
    case 'item.added':
      // Ignore the event if the item already exists
      if (state.items.some(i => i.id === event.data.id)) {
        return state;
      }
      return { ...state, items: [...state.items, event.data] };
    default:
      return state;
  }
}

4. Handle Concurrent Updates

function merge(state, event) {
  // Use timestamps for conflict resolution
  const existing = state.items.find(i => i.id === event.data.id);
  
  if (!existing) {
    return { ...state, items: [...state.items, event.data] };
  }
  
  // Keep whichever version is newer
  if (event.data.updatedAt <= existing.updatedAt) {
    return state;
  }
  
  return {
    ...state,
    items: state.items.map(i =>
      i.id === event.data.id ? { ...i, ...event.data } : i
    )
  };
}

5. Monitor and Log

const model = modelsClient.models.get({
  channelName: 'posts:123',
  sync,
  merge: (state, event) => {
    // Log events for debugging
    console.log('Merging event:', event.name, event.data);
    return actualMerge(state, event);
  }
});

// Monitor model lifecycle
model.on('syncing', () => console.log('Model syncing...'));
model.on('ready', () => console.log('Model ready'));
model.on('paused', () => console.log('Model paused'));
model.on('disposed', () => console.log('Model disposed'));

Next Steps

Conflict Resolution

Learn strategies for resolving conflicts in distributed systems

Postgres Models

Explore the Models SDK for Postgres in depth

Quickstart

Build a complete LiveSync application from scratch
