Overview

IHP DataSync provides real-time data synchronization between your PostgreSQL database and frontend clients via WebSockets. It automatically streams database changes to subscribed clients and enforces row-level security policies.

Key Features:
  • Real-time subscriptions to database queries
  • Automatic change notifications (INSERT, UPDATE, DELETE)
  • Built-in row-level security (RLS) enforcement
  • Transaction support with rollback/commit
  • TypeScript type generation from your database schema

Installation

Add ihp-datasync to your project’s default.nix:
haskellDeps = p: with p; [
    ihp-datasync
];

Core Types

DataSyncMessage

Messages sent from the client to the server:
data DataSyncMessage
    = DataSyncQuery 
        { query :: !DynamicSQLQuery
        , requestId :: !Int
        , transactionId :: !(Maybe UUID)
        }
    | CreateDataSubscription 
        { query :: !DynamicSQLQuery
        , requestId :: !Int
        }
    | CreateCountSubscription 
        { query :: !DynamicSQLQuery
        , requestId :: !Int
        }
    | DeleteDataSubscription 
        { subscriptionId :: !UUID
        , requestId :: !Int
        }
    | CreateRecordMessage 
        { table :: !Text
        , record :: !(HashMap Text Value)
        , requestId :: !Int
        , transactionId :: !(Maybe UUID)
        }
    | UpdateRecordMessage 
        { table :: !Text
        , id :: !UUID
        , patch :: !(HashMap Text Value)
        , requestId :: !Int
        , transactionId :: !(Maybe UUID)
        }
    | DeleteRecordMessage 
        { table :: !Text
        , id :: !UUID
        , requestId :: !Int
        , transactionId :: !(Maybe UUID)
        }
    | StartTransaction { requestId :: !Int }
    | RollbackTransaction { requestId :: !Int, id :: !UUID }
    | CommitTransaction { requestId :: !Int, id :: !UUID }
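
For client authors, the constructors above map naturally onto a TypeScript discriminated union. The sketch below is illustrative only: it assumes the JSON encoding carries the constructor name in a `tag` field, which this page does not state, and it models just three of the constructors. `DynamicQuery`, `ClientMessage`, and `makeRequestIdCounter` are hypothetical names, not part of the library.

```typescript
// Hypothetical, heavily simplified stand-in for DynamicSQLQuery.
type DynamicQuery = { table: string; limit: number | null; offset: number | null };

// Assumption: each message is tagged with its Haskell constructor name.
type ClientMessage =
    | { tag: 'DataSyncQuery'; query: DynamicQuery; requestId: number; transactionId: string | null }
    | { tag: 'CreateDataSubscription'; query: DynamicQuery; requestId: number }
    | { tag: 'DeleteDataSubscription'; subscriptionId: string; requestId: number };

// requestId appears on both requests and responses, so it is presumably
// used to match a response to the request that caused it.
function makeRequestIdCounter(): () => number {
    let next = 0;
    return () => next++;
}

const nextRequestId = makeRequestIdCounter();

const fetchTasks: ClientMessage = {
    tag: 'DataSyncQuery',
    query: { table: 'tasks', limit: 10, offset: null },
    requestId: nextRequestId(),
    transactionId: null, // not part of a transaction
};

const subscribeTasks: ClientMessage = {
    tag: 'CreateDataSubscription',
    query: { table: 'tasks', limit: null, offset: null },
    requestId: nextRequestId(),
};
```

Modelling `transactionId` as `string | null` mirrors the `Maybe UUID` field: a message outside a transaction simply carries no id.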

DataSyncResponse

Responses sent from the server to the client:
data DataSyncResponse
    = DataSyncResult 
        { result :: ![[Field]]
        , requestId :: !Int
        }
    | DidCreateDataSubscription 
        { requestId :: !Int
        , subscriptionId :: !UUID
        , result :: ![[Field]]
        }
    | DidInsert 
        { subscriptionId :: !UUID
        , record :: ![Field]
        }
    | DidUpdate 
        { subscriptionId :: !UUID
        , id :: UUID
        , changeSet :: !(Maybe Value)
        , appendSet :: !(Maybe Value)
        }
    | DidDelete 
        { subscriptionId :: !UUID
        , id :: !UUID
        }
    | DidChangeCount 
        { subscriptionId :: !UUID
        , count :: !Int
        }
    | DataSyncError 
        { requestId :: !Int
        , errorMessage :: !Text
        }
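
To show how a client might fold `DidInsert`, `DidUpdate`, and `DidDelete` into a locally cached result set, here is a minimal sketch. It assumes records and change sets arrive as plain JSON objects and that `appendSet` holds suffixes to append to existing string fields; neither is stated on this page. `Row`, `ChangeEvent`, and `applyChange` are hypothetical names, and `subscriptionId` is omitted on the assumption that events were already routed to the right subscription.

```typescript
interface Row {
    id: string;
    [field: string]: unknown;
}

type ChangeEvent =
    | { tag: 'DidInsert'; record: Row }
    | { tag: 'DidUpdate'; id: string; changeSet: Record<string, unknown> | null; appendSet: Record<string, string> | null }
    | { tag: 'DidDelete'; id: string };

// Returns a new array; the input array is never mutated.
function applyChange(rows: Row[], event: ChangeEvent): Row[] {
    switch (event.tag) {
        case 'DidInsert':
            return [...rows, event.record];
        case 'DidDelete':
            return rows.filter(row => row.id !== event.id);
        case 'DidUpdate':
            return rows.map(row => {
                if (row.id !== event.id) return row;
                // Overwrite the changed fields first.
                const updated: Row = { ...row, ...(event.changeSet ?? {}) };
                // Assumption: appendSet values are suffixes for string fields.
                for (const [field, suffix] of Object.entries(event.appendSet ?? {})) {
                    const current = updated[field];
                    updated[field] = (typeof current === 'string' ? current : '') + suffix;
                }
                return updated;
            });
    }
}

let cachedRows: Row[] = [{ id: 'a', title: 'Draft' }];
cachedRows = applyChange(cachedRows, {
    tag: 'DidUpdate',
    id: 'a',
    changeSet: { isCompleted: true },
    appendSet: { title: ' v2' },
});
cachedRows = applyChange(cachedRows, { tag: 'DidInsert', record: { id: 'b', title: 'Second' } });
```

Returning a fresh array on every event keeps the function easy to use with state containers that compare by reference.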

DynamicSQLQuery

Represents a database query with filtering, sorting, and pagination:
data DynamicSQLQuery = DynamicSQLQuery
    { table :: !Text
    , selectedColumns :: SelectedColumns
    , whereCondition :: !(Maybe ConditionExpression)
    , orderByClause :: ![OrderByClause]
    , distinctOnColumn :: !(Maybe ByteString)
    , limit :: !(Maybe Int)
    , offset :: !(Maybe Int)
    }

data ConditionExpression
    = ColumnExpression { field :: !Text }
    | InfixOperatorExpression
        { left :: !ConditionExpression
        , op :: !ConditionOperator
        , right :: !ConditionExpression
        }
    | LiteralExpression { value :: !Value }
    | ListExpression { values :: ![Value] }
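
The recursive shape of `ConditionExpression` is easiest to see with a small tree walk. The following sketch mirrors three of the constructors in TypeScript and renders a condition to SQL text with positional placeholders. It is not how DataSync compiles queries; the operator names in `Operator` are hypothetical, since the constructors of `ConditionOperator` are not listed here, and `Condition` and `render` are illustrative names.

```typescript
// Hypothetical operator names, for illustration only.
type Operator = 'OpEqual' | 'OpAnd' | 'OpGreaterThan';

type Condition =
    | { tag: 'ColumnExpression'; field: string }
    | { tag: 'LiteralExpression'; value: string | number | boolean | null }
    | { tag: 'InfixOperatorExpression'; left: Condition; op: Operator; right: Condition };

const sqlOperators: Record<Operator, string> = { OpEqual: '=', OpAnd: 'AND', OpGreaterThan: '>' };

// Literal values are collected as parameters and never spliced into the SQL text.
function render(condition: Condition, params: unknown[] = []): { sql: string; params: unknown[] } {
    switch (condition.tag) {
        case 'ColumnExpression':
            // Quote the identifier and escape embedded double quotes.
            return { sql: '"' + condition.field.replace(/"/g, '""') + '"', params };
        case 'LiteralExpression':
            params.push(condition.value);
            return { sql: '$' + params.length, params };
        case 'InfixOperatorExpression': {
            const left = render(condition.left, params).sql;
            const right = render(condition.right, params).sql;
            return { sql: `(${left} ${sqlOperators[condition.op]} ${right})`, params };
        }
    }
}

// status = 'active' AND priority > 3
const condition: Condition = {
    tag: 'InfixOperatorExpression',
    left: {
        tag: 'InfixOperatorExpression',
        left: { tag: 'ColumnExpression', field: 'status' },
        op: 'OpEqual',
        right: { tag: 'LiteralExpression', value: 'active' },
    },
    op: 'OpAnd',
    right: {
        tag: 'InfixOperatorExpression',
        left: { tag: 'ColumnExpression', field: 'priority' },
        op: 'OpGreaterThan',
        right: { tag: 'LiteralExpression', value: 3 },
    },
};

const rendered = render(condition);
// rendered.sql    is (("status" = $1) AND ("priority" > $2))
// rendered.params is ['active', 3]
```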

Server-Side API

runDataSyncController

Main entry point for setting up the DataSync WebSocket controller:
runDataSyncController ::
    ( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
    , ?context :: ControllerContext
    , ?modelContext :: ModelContext
    , ?request :: Request
    , ?state :: IORef DataSyncController
    , Typeable CurrentUserRecord
    , HasNewSessionUrl CurrentUserRecord
    , Show (PrimaryKey (GetTableName CurrentUserRecord))
    ) => Hasql.Pool.Pool 
      -> EnsureRLSEnabledFn 
      -> InstallTableChangeTriggerFn 
      -> IO ByteString 
      -> SendJSONFn 
      -> HandleCustomMessageFn 
      -> (Text -> Renamer) 
      -> IO ()
Parameters:
  • hasqlPool (Hasql.Pool.Pool): PostgreSQL connection pool for executing queries
  • ensureRLSEnabled (Text -> IO TableWithRLS): Function to verify row-level security is enabled on a table
  • installTableChangeTriggers (TableWithRLS -> IO ()): Function to install change notification triggers on a table
  • receiveData (IO ByteString): Function to receive incoming WebSocket messages
  • sendJSON (DataSyncResponse -> IO ()): Function to send responses back to the client
  • handleCustomMessage ((DataSyncResponse -> IO ()) -> DataSyncMessage -> IO ()): Handler for custom message types
  • renamer (Text -> Renamer): Function to convert between database column names and field names
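
The role of the renamer can be illustrated outside Haskell. The examples on this page use camelCase field names on the client (`createdAt`, `userId`) and snake_case column names in SQL (`user_id`), so the sketch below assumes a renamer is a pair of conversions between those two styles, selected per table. The `Renamer` interface, its field names, and `renamerFor` are hypothetical and only model the idea.

```typescript
// Hypothetical model of a renamer: one conversion in each direction.
interface Renamer {
    fieldToColumn: (field: string) => string;
    columnToField: (column: string) => string;
}

const camelSnakeRenamer: Renamer = {
    // "createdAt" becomes "created_at"
    fieldToColumn: field => field.replace(/[A-Z]/g, letter => '_' + letter.toLowerCase()),
    // "created_at" becomes "createdAt"
    columnToField: column => column.replace(/_([a-z0-9])/g, (_match, letter: string) => letter.toUpperCase()),
};

// Mirrors the `Text -> Renamer` shape: the table name selects the renamer.
// In this sketch every table shares the same rules.
const renamerFor = (_table: string): Renamer => camelSnakeRenamer;

const tasksRenamer = renamerFor('tasks');
const columnName = tasksRenamer.fieldToColumn('createdAt');
const fieldName = tasksRenamer.columnToField('user_id');
```

Taking the table name as an argument leaves room for tables whose columns do not follow the default naming rules.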

Row-Level Security

ensureRLSEnabled

Verifies that row-level security is enabled on a table:
ensureRLSEnabled :: Hasql.Pool.Pool -> Text -> IO TableWithRLS
Parameters:
  • pool (Hasql.Pool.Pool): Database connection pool
  • table (Text): Name of the table to check
Returns:
  • TableWithRLS: Proof that RLS is enabled on the table
Example:
tableWithRLS <- ensureRLSEnabled pool "projects"
-- Throws an error if RLS is not enabled

sqlQueryWithRLS

Execute a query with row-level security policies applied:
sqlQueryWithRLS ::
    ( ?context :: ControllerContext
    , Show (PrimaryKey (GetTableName CurrentUserRecord))
    , HasNewSessionUrl CurrentUserRecord
    , Typeable CurrentUserRecord
    , HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
    ) => Hasql.Pool.Pool -> Statement.Statement () [result] -> IO [result]
Automatically sets the RLS session variables based on the current user.
Example:
result <- sqlQueryWithRLS hasqlPool statement
-- Query executes with RLS policies for current user

makeCachedEnsureRLSEnabled

Create a memoized version of ensureRLSEnabled:
makeCachedEnsureRLSEnabled :: Hasql.Pool.Pool -> IO (Text -> IO TableWithRLS)
Example:
ensureRLSEnabled <- makeCachedEnsureRLSEnabled hasqlPool

ensureRLSEnabled "projects" -- Runs database query
ensureRLSEnabled "projects" -- Returns cached result
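
The caching behaviour described above is ordinary memoization. As a language-neutral illustration, here is a minimal TypeScript sketch of the same idea. `checkRLSEnabled` is a hypothetical stand-in for the database check, and the choice not to cache failures is a decision of this sketch, not documented behaviour of `makeCachedEnsureRLSEnabled`.

```typescript
// Hypothetical stand-in for the database check; counts how often it runs.
let databaseChecks = 0;
function checkRLSEnabled(table: string): string {
    databaseChecks++;
    if (table === 'table_without_rls') {
        throw new Error(`Row level security is not enabled on ${table}`);
    }
    return table; // stands in for the TableWithRLS proof value
}

// Wraps a check so that each distinct argument is only checked once.
function makeCached<A, B>(check: (arg: A) => B): (arg: A) => B {
    const cache = new Map<A, B>();
    return arg => {
        if (cache.has(arg)) return cache.get(arg) as B;
        const result = check(arg); // a thrown error propagates and is not cached
        cache.set(arg, result);
        return result;
    };
}

const cachedCheck = makeCached(checkRLSEnabled);
cachedCheck('projects'); // runs the check
cachedCheck('projects'); // served from the cache
```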

TypeScript Client API

Query Building

import { query } from 'ihp-datasync';

// Fetch all tasks
const tasks = await query('tasks').fetch();

// Filter and order
const activeTasks = await query('tasks')
    .where('status', 'active')
    .orderBy('createdAt')
    .limit(10)
    .fetch();

Real-Time Subscriptions

// Subscribe to query results
const unsubscribe = query('tasks')
    .where('userId', currentUserId)
    .subscribe(tasks => {
        console.log('Tasks updated:', tasks);
    });

// Later: clean up subscription
unsubscribe();

CRUD Operations

import { createRecord, updateRecord, deleteRecord } from 'ihp-datasync';

// Create
const task = await createRecord('tasks', {
    title: 'New Task',
    userId: currentUserId
});

// Update
const updated = await updateRecord('tasks', task.id, {
    isCompleted: true
});

// Delete
await deleteRecord('tasks', task.id);

Transactions

import { withTransaction } from 'ihp-datasync';

const result = await withTransaction(async (transaction) => {
    const project = await transaction.createRecord('projects', {
        name: 'New Project'
    });
    
    const task = await transaction.createRecord('tasks', {
        projectId: project.id,
        title: 'First Task'
    });
    
    return { project, task };
});
// Commits automatically on success; rolls back automatically if the callback throws
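
The commit-or-rollback behaviour can be pictured as a try/catch around the callback, built on the `StartTransaction`, `CommitTransaction`, and `RollbackTransaction` messages listed under Core Types. The sketch below is a model under that assumption, not the library's implementation; `Transport`, `withTransactionSketch`, and `recordingTransport` are hypothetical names.

```typescript
// Hypothetical transport; a real client would send these over the WebSocket.
interface Transport {
    start(): Promise<string>; // resolves to a transaction id
    commit(transactionId: string): Promise<void>;
    rollback(transactionId: string): Promise<void>;
}

async function withTransactionSketch<T>(
    transport: Transport,
    body: (transactionId: string) => Promise<T>,
): Promise<T> {
    const transactionId = await transport.start();
    let result: T;
    try {
        result = await body(transactionId);
    } catch (error) {
        // The callback failed: undo its writes, then surface the original error.
        await transport.rollback(transactionId);
        throw error;
    }
    await transport.commit(transactionId);
    return result;
}

// A transport that only records which calls were made.
const transactionLog: string[] = [];
const recordingTransport: Transport = {
    start: async () => { transactionLog.push('start'); return 'tx-1'; },
    commit: async id => { transactionLog.push(`commit ${id}`); },
    rollback: async id => { transactionLog.push(`rollback ${id}`); },
};
```

Keeping the commit outside the `try` block means a failed commit is reported as is, rather than triggering a rollback of a transaction whose state is unknown.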

React Hooks

useQuery

Real-time query hook for React:
import { useQuery } from 'ihp-datasync/react';
import { query } from 'ihp-datasync';

function TasksList() {
    const tasks = useQuery(query('tasks').orderBy('createdAt'));
    
    if (tasks === null) return <div>Loading...</div>;
    
    return (
        <div>
            {tasks.map(task => <div key={task.id}>{task.title}</div>)}
        </div>
    );
}

useQuerySingleResult

Fetch a single record:
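// Assumes useQuerySingleResult is exported alongside useQuery
import { useQuerySingleResult } from 'ihp-datasync/react';
import { query } from 'ihp-datasync';

function TaskDetail({ taskId }) {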
    const task = useQuerySingleResult(
        query('tasks').where('id', taskId)
    );
    
    if (!task) return <div>Loading...</div>;
    
    return <div>{task.title}</div>;
}

useCurrentUser

Access the authenticated user:
function Profile() {
    const user = useCurrentUser();
    
    if (!user) return <div>Not logged in</div>;
    
    return <div>Hello, {user.email}</div>;
}

Type Generation

Generate TypeScript types from your database schema:
generate-datasync-types Application/Schema.sql Frontend/types/ihp-datasync/index.d.ts
This creates type-safe interfaces for all your database tables:
interface Task {
    id: UUID;
    title: string;
    isCompleted: boolean;
    userId: UUID;
    createdAt: string;
}

interface NewTask {
    title: string;
    isCompleted?: boolean; // Optional if it has a default
    userId: UUID;
}

Configuration Limits

maxSubscriptionsPerConnection

Limit concurrent subscriptions per WebSocket connection:
instance FrontController WebApplication where
    controllers = 
        [ startPage WelcomeAction
        ]

instance InitControllerContext WebApplication where
    initContext = do
        setField @"layout" defaultLayout
        -- Limit to 50 subscriptions per connection
        putContext (DataSyncMaxSubscriptionsPerConnection 50)

maxTransactionsPerConnection

Limit concurrent transactions per connection:
putContext (DataSyncMaxTransactionsPerConnection 10)
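
To make the effect of such a limit concrete, here is a small model of per-connection bookkeeping. How the server actually enforces the limit is not described on this page, so `SubscriptionRegistry` and its error message are hypothetical and only illustrate the idea of rejecting new subscriptions once the cap is reached.

```typescript
// Hypothetical per-connection registry that enforces a subscription cap.
class SubscriptionRegistry {
    private readonly active = new Set<string>();
    private readonly maxSubscriptions: number;

    constructor(maxSubscriptions: number) {
        this.maxSubscriptions = maxSubscriptions;
    }

    // Registers a subscription, or throws once the cap has been reached.
    add(subscriptionId: string): void {
        if (this.active.size >= this.maxSubscriptions) {
            throw new Error(`Subscription limit of ${this.maxSubscriptions} reached`);
        }
        this.active.add(subscriptionId);
    }

    // Frees a slot; removing an unknown id is a no-op.
    remove(subscriptionId: string): void {
        this.active.delete(subscriptionId);
    }

    get count(): number {
        return this.active.size;
    }
}

const registry = new SubscriptionRegistry(2);
registry.add('subscription-a');
registry.add('subscription-b');
// A third registry.add(...) call would throw until a slot is freed with remove().
```

On the client side, the practical consequence is to call the unsubscribe function returned by `subscribe` as soon as a view no longer needs its data.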

Best Practices

  1. Always enable RLS: DataSync requires row-level security on all tables
    ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
    
    CREATE POLICY user_access ON projects
        USING (user_id = ihp_user_id());
    
  2. Use transactions for related writes: Group related operations
    await withTransaction(async (tx) => {
        const project = await tx.createRecord('projects', {...});
        await tx.createRecord('tasks', { projectId: project.id, ... });
    });
    
  3. Limit subscription counts: Set reasonable limits to prevent resource exhaustion
  4. Generate types regularly: Re-run generate-datasync-types after schema changes
