Overview
IHP DataSync provides real-time data synchronization between your PostgreSQL database and frontend clients via WebSockets. It automatically streams database changes to subscribed clients and enforces row-level security policies.
Key Features:
- Real-time subscriptions to database queries
- Automatic change notifications (INSERT, UPDATE, DELETE)
- Built-in row-level security (RLS) enforcement
- Transaction support with rollback/commit
- TypeScript type generation from your database schema
Installation
Add ihp-datasync to your project’s default.nix:
haskellDeps = p: with p; [
ihp-datasync
];
Core Types
DataSyncMessage
Messages sent from the client to the server:
data DataSyncMessage
= DataSyncQuery
{ query :: !DynamicSQLQuery
, requestId :: !Int
, transactionId :: !(Maybe UUID)
}
| CreateDataSubscription
{ query :: !DynamicSQLQuery
, requestId :: !Int
}
| CreateCountSubscription
{ query :: !DynamicSQLQuery
, requestId :: !Int
}
| DeleteDataSubscription
{ subscriptionId :: !UUID
, requestId :: !Int
}
| CreateRecordMessage
{ table :: !Text
, record :: !(HashMap Text Value)
, requestId :: !Int
, transactionId :: !(Maybe UUID)
}
| UpdateRecordMessage
{ table :: !Text
, id :: !UUID
, patch :: !(HashMap Text Value)
, requestId :: !Int
, transactionId :: !(Maybe UUID)
}
| DeleteRecordMessage
{ table :: !Text
, id :: !UUID
, requestId :: !Int
, transactionId :: !(Maybe UUID)
}
| StartTransaction { requestId :: !Int }
| RollbackTransaction { requestId :: !Int, id :: !UUID }
| CommitTransaction { requestId :: !Int, id :: !UUID }
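Every client message carries a requestId, which lets the client pair each server reply with the call that triggered it. The following TypeScript sketch shows one way that correlation could work. The `tag` discriminator field, the `RequestTracker` class, and the `send` callback are illustrative assumptions, not the library's actual wire format or API.

```typescript
// Hypothetical subset of client messages; the `tag` field is an assumed encoding.
type ClientMessage =
  | { tag: "StartTransaction"; requestId: number }
  | { tag: "DeleteDataSubscription"; subscriptionId: string; requestId: number };

// Distributes Omit over each union member so per-variant fields are preserved.
type WithoutRequestId<T> = T extends unknown ? Omit<T, "requestId"> : never;

type Reply = { requestId: number; [key: string]: unknown };

class RequestTracker {
  private nextRequestId = 0;
  private readonly pending = new Map<number, (reply: Reply) => void>();
  private readonly send: (json: string) => void;

  constructor(send: (json: string) => void) {
    this.send = send;
  }

  // Assigns a fresh requestId, sends the message, and resolves on the matching reply.
  request(message: WithoutRequestId<ClientMessage>): Promise<Reply> {
    const requestId = this.nextRequestId++;
    return new Promise((resolve) => {
      this.pending.set(requestId, resolve);
      this.send(JSON.stringify({ ...message, requestId }));
    });
  }

  // Call this for every incoming reply that carries a requestId.
  receive(reply: Reply): void {
    const resolve = this.pending.get(reply.requestId);
    if (resolve) {
      this.pending.delete(reply.requestId);
      resolve(reply);
    }
  }
}
```

Because replies are looked up by requestId, they can arrive in any order without being attributed to the wrong call.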
DataSyncResponse
Responses sent from the server to the client:
data DataSyncResponse
= DataSyncResult
{ result :: ![[Field]]
, requestId :: !Int
}
| DidCreateDataSubscription
{ requestId :: !Int
, subscriptionId :: !UUID
, result :: ![[Field]]
}
| DidInsert
{ subscriptionId :: !UUID
, record :: ![Field]
}
| DidUpdate
{ subscriptionId :: !UUID
, id :: UUID
, changeSet :: !(Maybe Value)
, appendSet :: !(Maybe Value)
}
| DidDelete
{ subscriptionId :: !UUID
, id :: !UUID
}
| DidChangeCount
{ subscriptionId :: !UUID
, count :: !Int
}
| DataSyncError
{ requestId :: !Int
, errorMessage :: !Text
}
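The DidInsert, DidUpdate, and DidDelete responses describe incremental changes, so a client has to fold them into the result set it received when the subscription was created. The sketch below shows one possible reducer in TypeScript. The JSON shapes, the `tag` field, and the reading of appendSet as text to append to an existing column value are assumptions made for illustration.

```typescript
type DbRecord = { id: string; [column: string]: unknown };

// Assumed JSON shapes for the change notifications; `tag` is a hypothetical discriminator.
type ChangeEvent =
  | { tag: "DidInsert"; subscriptionId: string; record: DbRecord }
  | {
      tag: "DidUpdate";
      subscriptionId: string;
      id: string;
      changeSet: Record<string, unknown> | null;
      appendSet: Record<string, string> | null;
    }
  | { tag: "DidDelete"; subscriptionId: string; id: string };

// Returns a new array; the input is never mutated.
function applyChange(records: DbRecord[], event: ChangeEvent): DbRecord[] {
  switch (event.tag) {
    case "DidInsert":
      // Appends at the end; re-sorting by the query's ORDER BY is left out of this sketch.
      return [...records, event.record];
    case "DidUpdate":
      return records.map((record) => {
        if (record.id !== event.id) return record;
        const updated = { ...record, ...(event.changeSet ?? {}) } as DbRecord;
        // Assumption: appendSet holds text to append to the current column value.
        for (const [column, suffix] of Object.entries(event.appendSet ?? {})) {
          updated[column] = String(updated[column] ?? "") + suffix;
        }
        return updated;
      });
    case "DidDelete":
      return records.filter((record) => record.id !== event.id);
  }
}
```

Returning a fresh array on every change fits UI frameworks that detect updates by reference comparison.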
DynamicSQLQuery
Represents a database query with filtering, sorting, and pagination:
data DynamicSQLQuery = DynamicSQLQuery
{ table :: !Text
, selectedColumns :: SelectedColumns
, whereCondition :: !(Maybe ConditionExpression)
, orderByClause :: ![OrderByClause]
, distinctOnColumn :: !(Maybe ByteString)
, limit :: !(Maybe Int)
, offset :: !(Maybe Int)
}
data ConditionExpression
= ColumnExpression { field :: !Text }
| InfixOperatorExpression
{ left :: !ConditionExpression
, op :: !ConditionOperator
, right :: !ConditionExpression
}
| LiteralExpression { value :: !Value }
| ListExpression { values :: ![Value] }
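A ConditionExpression is a small tree: columns, literals, and lists are leaves, and infix operators combine two subtrees. The TypeScript sketch below mirrors that structure and renders it to parameterized SQL text. The `kind` field, the operator set, and the `render` function are hypothetical and only illustrate the shape of the data, not how the server compiles queries.

```typescript
// Hypothetical operator set; the real ConditionOperator constructors are not listed here.
type Operator = "=" | "IN" | "AND";

type ConditionExpression =
  | { kind: "column"; field: string }
  | { kind: "infix"; left: ConditionExpression; op: Operator; right: ConditionExpression }
  | { kind: "literal"; value: unknown }
  | { kind: "list"; values: unknown[] };

// Renders an expression to SQL text, collecting values as positional parameters.
function render(expr: ConditionExpression, params: unknown[]): string {
  switch (expr.kind) {
    case "column":
      // Quote the identifier so a column name is never interpreted as SQL.
      return `"${expr.field.replace(/"/g, '""')}"`;
    case "infix":
      return `(${render(expr.left, params)} ${expr.op} ${render(expr.right, params)})`;
    case "literal":
      params.push(expr.value);
      return `$${params.length}`;
    case "list": {
      const placeholders = expr.values.map((value) => {
        params.push(value);
        return `$${params.length}`;
      });
      return `(${placeholders.join(", ")})`;
    }
  }
}

// status = 'active' AND id IN (1, 2)
const condition: ConditionExpression = {
  kind: "infix",
  left: {
    kind: "infix",
    left: { kind: "column", field: "status" },
    op: "=",
    right: { kind: "literal", value: "active" },
  },
  op: "AND",
  right: {
    kind: "infix",
    left: { kind: "column", field: "id" },
    op: "IN",
    right: { kind: "list", values: [1, 2] },
  },
};

const sqlParams: unknown[] = [];
const sqlText = render(condition, sqlParams);
// sqlText is (("status" = $1) AND ("id" IN ($2, $3))) with sqlParams ["active", 1, 2]
```

Keeping values out of the SQL text and in a separate parameter list is what prevents client-supplied literals from being interpreted as SQL.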
Server-Side API
runDataSyncController
Main entry point for setting up the DataSync WebSocket controller:
runDataSyncController ::
( HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
, ?context :: ControllerContext
, ?modelContext :: ModelContext
, ?request :: Request
, ?state :: IORef DataSyncController
, Typeable CurrentUserRecord
, HasNewSessionUrl CurrentUserRecord
, Show (PrimaryKey (GetTableName CurrentUserRecord))
) => Hasql.Pool.Pool
-> EnsureRLSEnabledFn
-> InstallTableChangeTriggerFn
-> IO ByteString
-> SendJSONFn
-> HandleCustomMessageFn
-> (Text -> Renamer)
-> IO ()
Parameters, in the order they appear in the signature:
- Hasql.Pool.Pool: PostgreSQL connection pool for executing queries
- EnsureRLSEnabledFn: function to verify row-level security is enabled on a table
- installTableChangeTriggers (InstallTableChangeTriggerFn): function to install change notification triggers on a table
- IO ByteString: action that receives incoming WebSocket messages
- sendJSON (DataSyncResponse -> IO ()): function to send responses back to the client
- handleCustomMessage ((DataSyncResponse -> IO ()) -> DataSyncMessage -> IO ()): handler for custom message types
- Text -> Renamer: function to convert between database column names and field names
Row-Level Security
ensureRLSEnabled
Verifies that row-level security is enabled on a table:
ensureRLSEnabled :: Hasql.Pool.Pool -> Text -> IO TableWithRLS
Takes the name of the table to check and returns a TableWithRLS value, which serves as proof that RLS is enabled on the table.
Example:
tableWithRLS <- ensureRLSEnabled pool "projects"
-- Throws an error if RLS is not enabled
sqlQueryWithRLS
Execute a query with row-level security policies applied:
sqlQueryWithRLS ::
( ?context :: ControllerContext
, Show (PrimaryKey (GetTableName CurrentUserRecord))
, HasNewSessionUrl CurrentUserRecord
, Typeable CurrentUserRecord
, HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
) => Hasql.Pool.Pool -> Statement.Statement () [result] -> IO [result]
Automatically sets the RLS session variables based on the current user.
Example:
result <- sqlQueryWithRLS hasqlPool statement
-- Query executes with RLS policies for current user
makeCachedEnsureRLSEnabled
Create a memoized version of ensureRLSEnabled:
makeCachedEnsureRLSEnabled :: Hasql.Pool.Pool -> IO (Text -> IO TableWithRLS)
Example:
ensureRLSEnabled <- makeCachedEnsureRLSEnabled hasqlPool
ensureRLSEnabled "projects" -- Runs database query
ensureRLSEnabled "projects" -- Returns cached result
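The caching behaviour described above is ordinary memoization keyed by table name. The TypeScript sketch below shows the general pattern only; `makeCached` is a hypothetical helper, not part of the Haskell API, and dropping failed lookups from the cache is a choice made in this sketch rather than documented behaviour.

```typescript
// Wraps an async per-table check so each table name triggers at most one lookup.
function makeCached<T>(
  check: (table: string) => Promise<T>,
): (table: string) => Promise<T> {
  const cache = new Map<string, Promise<T>>();
  return (table) => {
    let result = cache.get(table);
    if (result === undefined) {
      result = check(table);
      cache.set(table, result);
      // Sketch-level choice: forget failures so a later call can retry.
      result.catch(() => cache.delete(table));
    }
    return result;
  };
}
```

Caching the promise rather than the resolved value means that concurrent calls for the same table share a single in-flight lookup.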
TypeScript Client API
Query Building
import { query } from 'ihp-datasync';
// Fetch all tasks
const tasks = await query('tasks').fetch();
// Filter and order
const activeTasks = await query('tasks')
.where('status', 'active')
.orderBy('createdAt')
.limit(10)
.fetch();
Real-Time Subscriptions
// Subscribe to query results
const unsubscribe = query('tasks')
.where('userId', currentUserId)
.subscribe(tasks => {
console.log('Tasks updated:', tasks);
});
// Later: clean up subscription
unsubscribe();
CRUD Operations
import { createRecord, updateRecord, deleteRecord } from 'ihp-datasync';
// Create
const task = await createRecord('tasks', {
title: 'New Task',
userId: currentUserId
});
// Update
const updated = await updateRecord('tasks', task.id, {
isCompleted: true
});
// Delete
await deleteRecord('tasks', task.id);
Transactions
import { withTransaction } from 'ihp-datasync';
const result = await withTransaction(async (transaction) => {
const project = await transaction.createRecord('projects', {
name: 'New Project'
});
const task = await transaction.createRecord('tasks', {
projectId: project.id,
title: 'First Task'
});
return { project, task };
});
// Commits automatically on success and rolls back automatically on error
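The commit-or-rollback behaviour maps onto the StartTransaction, CommitTransaction, and RollbackTransaction messages listed under Core Types. The sketch below shows how such a wrapper could be structured. The `Send` transport, the `tag` field, the `transactionId` field in the start reply, and the name `withTransactionSketch` are all assumptions for illustration; the library's own implementation may differ.

```typescript
// Hypothetical transport: sends one message and resolves with the server's reply.
type Send = (message: Record<string, unknown>) => Promise<Record<string, unknown>>;

async function withTransactionSketch<T>(
  send: Send,
  body: (transactionId: string) => Promise<T>,
): Promise<T> {
  // Assumption: the reply to StartTransaction carries the new transaction's id.
  const started = await send({ tag: "StartTransaction" });
  const transactionId = String(started.transactionId);

  let result: T;
  try {
    result = await body(transactionId);
  } catch (error) {
    // The callback failed: undo its writes, then surface the original error.
    await send({ tag: "RollbackTransaction", id: transactionId });
    throw error;
  }
  await send({ tag: "CommitTransaction", id: transactionId });
  return result;
}
```

The callback's error is rethrown after the rollback, so callers still see the original failure.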
React Hooks
useQuery
Real-time query hook for React:
import { useQuery } from 'ihp-datasync/react';
import { query } from 'ihp-datasync';
function TasksList() {
const tasks = useQuery(query('tasks').orderBy('createdAt'));
if (tasks === null) return <div>Loading...</div>;
return (
<div>
{tasks.map(task => <div key={task.id}>{task.title}</div>)}
</div>
);
}
useQuerySingleResult
Fetch a single record:
function TaskDetail({ taskId }) {
const task = useQuerySingleResult(
query('tasks').where('id', taskId)
);
if (!task) return <div>Loading...</div>;
return <div>{task.title}</div>;
}
useCurrentUser
Access the authenticated user:
function Profile() {
const user = useCurrentUser();
if (!user) return <div>Not logged in</div>;
return <div>Hello, {user.email}</div>;
}
Type Generation
Generate TypeScript types from your database schema:
generate-datasync-types Application/Schema.sql Frontend/types/ihp-datasync/index.d.ts
This creates type-safe interfaces for all your database tables:
interface Task {
id: UUID;
title: string;
isCompleted: boolean;
userId: UUID;
createdAt: string;
}
interface NewTask {
title: string;
isCompleted?: boolean; // Optional if it has a default
userId: UUID;
}
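The relationship between Task and NewTask can also be written by hand with TypeScript utility types, which makes the rule explicit: server-generated columns are dropped and columns with defaults become optional. This is only a sketch of that relationship. `NewRecord` and `NewTaskSketch` are hypothetical names, the choice of which columns count as generated or defaulted is an assumption, and the generator emits plain interfaces rather than this helper.

```typescript
type UUID = string;

interface Task {
  id: UUID;
  title: string;
  isCompleted: boolean;
  userId: UUID;
  createdAt: string;
}

// Drops `Generated` columns and makes `Defaulted` columns optional.
type NewRecord<T, Generated extends keyof T, Defaulted extends keyof T> =
  Omit<T, Generated | Defaulted> & Partial<Pick<T, Defaulted>>;

// Assumes id and createdAt are filled in by the database and isCompleted has a default.
type NewTaskSketch = NewRecord<Task, "id" | "createdAt", "isCompleted">;

// Compiles without isCompleted; adding an `id` property would be a type error.
const draft: NewTaskSketch = {
  title: "Write docs",
  userId: "00000000-0000-0000-0000-000000000000",
};
```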
Configuration Limits
maxSubscriptionsPerConnection
Limit concurrent subscriptions per WebSocket connection:
instance FrontController WebApplication where
controllers =
[ startPage WelcomeAction
]
instance InitControllerContext WebApplication where
initContext = do
setField @"layout" defaultLayout
-- Limit to 50 subscriptions per connection
putContext (DataSyncMaxSubscriptionsPerConnection 50)
maxTransactionsPerConnection
Limit concurrent transactions per connection:
putContext (DataSyncMaxTransactionsPerConnection 10)
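Both limits amount to a per-connection counter that is checked before a new subscription or transaction is accepted. The TypeScript sketch below illustrates the idea for subscriptions. `SubscriptionRegistry` is a hypothetical class, and throwing an error when the limit is reached is an assumption of this sketch; the server's actual response to an exceeded limit is not described here.

```typescript
// Tracks the active subscriptions of one connection and enforces a maximum.
class SubscriptionRegistry {
  private readonly active = new Set<string>();
  private readonly maxSubscriptions: number;

  constructor(maxSubscriptions: number) {
    this.maxSubscriptions = maxSubscriptions;
  }

  // Registers a subscription, or throws if the connection is already at its limit.
  add(subscriptionId: string): void {
    if (this.active.size >= this.maxSubscriptions) {
      throw new Error(`Subscription limit of ${this.maxSubscriptions} reached`);
    }
    this.active.add(subscriptionId);
  }

  // Frees a slot so the connection can subscribe again.
  remove(subscriptionId: string): void {
    this.active.delete(subscriptionId);
  }

  get count(): number {
    return this.active.size;
  }
}
```

Deleting a subscription frees its slot, which is why clients should unsubscribe from queries they no longer display.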
Best Practices
- Always enable RLS: DataSync requires row-level security on all tables:
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
CREATE POLICY user_access ON projects
USING (user_id = ihp_user_id());
- Use transactions for related writes: group operations that must succeed or fail together:
await withTransaction(async (tx) => {
const project = await tx.createRecord('projects', {...});
await tx.createRecord('tasks', { projectId: project.id, ... });
});
- Limit subscription counts: set reasonable limits to prevent resource exhaustion.
- Generate types regularly: re-run generate-datasync-types after schema changes.
See Also