createDurableJobs is the main factory function for @durable-effect/jobs. It takes your job definitions, returns the Durable Object class to export for Cloudflare, and produces a typed client for interacting with job instances.
Signature
function createDurableJobs<T extends Record<string, AnyUnregisteredDefinition>>(
  definitions: T,
  options?: CreateDurableJobsOptions
): CreateDurableJobsResult<T>
Parameters
definitions
Record<string, JobDefinition>
required
An object whose keys become the job names and whose values are definitions created by Continuous.make, Debounce.make, or Task.make. You may mix all three job types in a single call.
const { Jobs, JobsClient } = createDurableJobs({
  tokenRefresher, // Continuous: name becomes "tokenRefresher"
  webhookBatcher, // Debounce: name becomes "webhookBatcher"
  orderProcessor, // Task: name becomes "orderProcessor"
});
options
CreateDurableJobsOptions
Optional configuration.
tracker
TrackerConfig
Send job lifecycle events to an external HTTP endpoint for observability.
endpoint
Full URL to the ingestion endpoint that receives batched job events.
env
Environment label attached to every event (e.g., "production", "staging").
serviceKey
Identifier for this jobs service, attached to every event.
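As a rough illustration of the tracker options, the sketch below mirrors the three fields used in the full setup example (endpoint, env, serviceKey) and checks them before they are passed to createDurableJobs. The TrackerConfigSketch interface, the string types, and the validateTracker helper are assumptions made for this example; none of them are exported by the library.

```typescript
// Hypothetical local mirror of the tracker options; not imported from the library.
interface TrackerConfigSketch {
  endpoint: string; // full URL of the ingestion endpoint
  env: string; // environment label, e.g. "production"
  serviceKey: string; // identifier for this jobs service
}

// Returns a list of problems; an empty list means the config looks usable.
function validateTracker(config: TrackerConfigSketch): string[] {
  const problems: string[] = [];
  // A "full URL" is approximated here as http(s):// followed by a host.
  if (!/^https?:\/\/[^\s/]+/.test(config.endpoint)) {
    problems.push("endpoint must be a full http(s) URL");
  }
  if (config.env.trim() === "") {
    problems.push("env must not be empty");
  }
  if (config.serviceKey.trim() === "") {
    problems.push("serviceKey must not be empty");
  }
  return problems;
}
```

Running a check like this at startup surfaces a misconfigured endpoint early, before any events would be sent.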
Return value — CreateDurableJobsResult
Jobs
The Durable Object class. Export this from your worker entry point so Cloudflare can bind it.
JobsClient
A factory with a fromBinding(binding) method. Call it inside your worker handler to get a typed client.
fromBinding
(binding: DurableObjectNamespace) => JobsClient
Creates a client from the Durable Object namespace binding on env.
const client = JobsClient.fromBinding(env.JOBS);
An Effect service tag for using the client as an Effect service via Effect.provide.
The typed job registry with full generic type information. Useful for advanced integrations.
JobsClient accessor pattern
A client is obtained with JobsClient.fromBinding(env.JOBS) and then accessed by job type and name:
const client = JobsClient.fromBinding(env.JOBS);
client.continuous("tokenRefresher") // → ContinuousClient
client.debounce("webhookBatcher")   // → DebounceClient
client.task("orderProcessor")       // → TaskClient
The name argument is type-checked against the registered definitions — TypeScript will error if you pass an unknown name or a name of the wrong job type.
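One way this kind of name checking can be expressed in TypeScript is with a mapped type that keeps only the keys whose definition matches the requested job type. The sketch below is a self-contained toy: ContinuousDef, DebounceDef, TaskDef, NamesOf, and makeClient are hypothetical stand-ins, and the library's real types are likely more involved.

```typescript
// Hypothetical stand-ins for the three definition kinds; the real library types differ.
type ContinuousDef = { readonly kind: "continuous" };
type DebounceDef = { readonly kind: "debounce" };
type TaskDef = { readonly kind: "task" };
type AnyDef = ContinuousDef | DebounceDef | TaskDef;

// Keys of T whose value matches the definition kind D.
type NamesOf<T, D> = { [K in keyof T]: T[K] extends D ? K : never }[keyof T] & string;

// A toy client that only accepts names registered under the matching kind.
function makeClient<T extends Record<string, AnyDef>>(defs: T) {
  return {
    continuous: (name: NamesOf<T, ContinuousDef>): string => `continuous:${name}`,
    debounce: (name: NamesOf<T, DebounceDef>): string => `debounce:${name}`,
    task: (name: NamesOf<T, TaskDef>): string => `task:${name}`,
    names: Object.keys(defs),
  };
}

const client = makeClient({
  tokenRefresher: { kind: "continuous" },
  webhookBatcher: { kind: "debounce" },
  orderProcessor: { kind: "task" },
} as const);

const handle = client.continuous("tokenRefresher"); // compiles
// client.continuous("webhookBatcher"); // compile error: name belongs to a debounce job
// client.task("missing");              // compile error: name is not registered
```

The two commented-out calls are the cases the paragraph above describes: a name of the wrong job type and an unknown name are both rejected at compile time rather than at runtime.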
All client methods return Effects. Run them with Effect.runPromise in a plain fetch handler or yield* them inside an Effect program.
Continuous client
Returned by client.continuous(name).
start
(options: { id: string; input: S }) => Effect<StartResult>
Creates a new continuous job instance and schedules its first run. If an instance with this id already exists, returns its current status without modifying it.
created
true if a new instance was created.
instanceId
Full Durable Object instance ID: continuous:{jobName}:{id}.
The result also includes the current status of the instance.
trigger
(id: string) => Effect<void>
Executes the job immediately, bypassing the scheduled alarm. Useful for on-demand runs or testing.
status
(id: string) => Effect<StatusResult>
Returns the current status of a job instance.
status
"running" | "stopped" | "not_found"
Current lifecycle status.
runCount
How many times execute has run (if the instance exists).
nextRunAt
Unix timestamp (ms) of the next scheduled execution, if available.
getState
(id: string) => Effect<{ state: S | null }>
Returns the current persisted state of the instance. state is null if the instance does not exist or has no state.
terminate
(id: string, options?: { reason?: string }) => Effect<void>
Cancels the scheduled alarm and deletes all state. The same id can be reused to start a fresh instance afterward.
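The start and terminate semantics described above (start leaves an existing instance untouched, terminate frees the id for reuse) can be pictured with a small in-memory model. This is only a sketch: ContinuousModel and its Map-based storage are hypothetical, it does not model alarms or the "stopped" status, and reporting "not_found" after terminate is an assumption based on all state being deleted.

```typescript
type ContinuousStatus = "running" | "stopped" | "not_found";

interface StartResultSketch {
  created: boolean;
  instanceId: string;
  status: ContinuousStatus;
}

// In-memory stand-in for Durable Object storage; purely illustrative.
class ContinuousModel<S> {
  private readonly jobName: string;
  private readonly instances = new Map<string, { state: S; runCount: number }>();

  constructor(jobName: string) {
    this.jobName = jobName;
  }

  // Creates the instance only if the id is unused; an existing instance is reported unchanged.
  start(id: string, input: S): StartResultSketch {
    const instanceId = `continuous:${this.jobName}:${id}`;
    if (this.instances.has(id)) {
      return { created: false, instanceId, status: "running" };
    }
    this.instances.set(id, { state: input, runCount: 0 });
    return { created: true, instanceId, status: "running" };
  }

  status(id: string): ContinuousStatus {
    return this.instances.has(id) ? "running" : "not_found";
  }

  getState(id: string): { state: S | null } {
    return { state: this.instances.get(id)?.state ?? null };
  }

  // Deletes everything stored for the id so it can be started fresh later.
  terminate(id: string): void {
    this.instances.delete(id);
  }
}
```

In this model a second start with the same id returns created: false and keeps the original input, which is why calling start repeatedly (for example on every login) is safe.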
Continuous client example
const c = client.continuous("tokenRefresher");

// Start
const { created, instanceId } = await Effect.runPromise(
  c.start({ id: "user-123", input: { accessToken: "", refreshToken: "rt_abc", expiresAt: 0 } })
);

// Force immediate execution
await Effect.runPromise(c.trigger("user-123"));

// Check status
const { status, runCount, nextRunAt } = await Effect.runPromise(c.status("user-123"));

// Read state
const { state } = await Effect.runPromise(c.getState("user-123"));

// Terminate
await Effect.runPromise(c.terminate("user-123", { reason: "User deleted" }));
Debounce client
Returned by client.debounce(name).
add
(options: { id: string; event: I }) => Effect<AddResult>
Adds an event to the debounce window. Creates the instance if it does not exist.
created
true if this was the first event (instance just created).
eventCount
Total events accumulated in this window after this add.
willFlushAt
Unix timestamp (ms) when the flush will fire, or null if already flushing.
flush
(id: string) => Effect<void>
Forces an immediate flush, triggering execute with flushReason: "manual".
clear
(id: string) => Effect<void>
Discards all accumulated events without calling execute.
status
(id: string) => Effect<StatusResult>
Returns the current debounce status.
status
"debouncing" | "idle" | "not_found"
Current lifecycle status.
eventCount
Events accumulated so far.
The result also carries the scheduled flush timestamp (ms) while the instance is debouncing.
getState
(id: string) => Effect<{ state: S | null }>
Returns the current accumulated state.
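The difference between add, flush, and clear can be sketched with a toy in-memory model of a single debounce instance. DebounceModel, its delayMs parameter, and the explicit now argument are hypothetical. Fixing the flush time at the first event is a simplification; whether later events extend the window is not stated here. Only the manual flush path is modelled.

```typescript
// Illustrative in-memory model of one debounce instance; not the library's implementation.
class DebounceModel<E> {
  private events: E[] = [];
  private willFlushAt: number | null = null;
  private readonly delayMs: number;
  private readonly execute: (events: E[], flushReason: "manual") => void;

  constructor(delayMs: number, execute: (events: E[], flushReason: "manual") => void) {
    this.delayMs = delayMs;
    this.execute = execute;
  }

  // Accumulates an event; the first event opens the window and sets the flush time.
  add(event: E, now: number): { created: boolean; eventCount: number; willFlushAt: number | null } {
    const created = this.events.length === 0;
    if (created) {
      this.willFlushAt = now + this.delayMs;
    }
    this.events.push(event);
    return { created, eventCount: this.events.length, willFlushAt: this.willFlushAt };
  }

  // Hands the accumulated batch to execute, then resets the window.
  flush(): void {
    if (this.events.length === 0) return;
    const batch = this.events;
    this.reset();
    this.execute(batch, "manual");
  }

  // Drops the accumulated batch without calling execute.
  clear(): void {
    this.reset();
  }

  status(): "debouncing" | "idle" {
    return this.events.length > 0 ? "debouncing" : "idle";
  }

  private reset(): void {
    this.events = [];
    this.willFlushAt = null;
  }
}
```

The key contrast is in the last two public methods: flush processes what has been collected, while clear throws it away, and both leave the instance idle.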
Debounce client example
const d = client.debounce("webhookBatcher");

// Add events
const result = await Effect.runPromise(
  d.add({ id: "contact-456", event: { type: "contact.updated", contactId: "456", data: {} } })
);
// result.created, result.eventCount, result.willFlushAt

// Manual flush
await Effect.runPromise(d.flush("contact-456"));

// Discard without processing
await Effect.runPromise(d.clear("contact-456"));

// Check status
const { status, eventCount } = await Effect.runPromise(d.status("contact-456"));

// Read state
const { state } = await Effect.runPromise(d.getState("contact-456"));
Task client
Returned by client.task(name).
send
(options: { id: string; event: E }) => Effect<SendResult>
Sends an event to a task instance, creating it if it does not exist. The event is validated against eventSchema and passed to onEvent.
created
true if this is the first event (instance just created).
instanceId
Full Durable Object instance ID: task:{jobName}:{id}.
scheduledAt
Unix timestamp (ms) of the next scheduled execution, or null if none is set.
trigger
(id: string) => Effect<void>
Triggers immediate execution of execute, bypassing any scheduled alarm.
status
(id: string) => Effect<StatusResult>
Returns the current task status.
status
"active" | "idle" | "not_found"
"active" when an alarm is scheduled; "idle" when no alarm is set; "not_found" when the instance does not exist.
scheduledAt
Next scheduled execution timestamp (ms).
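The three task statuses follow directly from two facts: whether the instance exists, and whether an alarm is set. A minimal sketch of that rule, using a hypothetical TaskSnapshot shape to stand in for whatever the Durable Object actually stores:

```typescript
type TaskStatus = "active" | "idle" | "not_found";

// Hypothetical snapshot of a task instance; undefined means the instance does not exist.
interface TaskSnapshot {
  scheduledAt: number | null; // Unix timestamp (ms) of the pending alarm, if any
}

// Applies the rule from the status description: no instance, no alarm, or alarm scheduled.
function deriveTaskStatus(snapshot: TaskSnapshot | undefined): TaskStatus {
  if (snapshot === undefined) return "not_found";
  return snapshot.scheduledAt === null ? "idle" : "active";
}
```

An "idle" task still exists and still holds state; it simply will not run again until another event or an explicit trigger arrives.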
getState
(id: string) => Effect<GetStateResult>
Returns the current persisted state.
state
Current state, or null if no state has been set.
scheduledAt
Next scheduled execution timestamp (ms).
terminate
(id: string) => Effect<void>
Cancels any pending alarm and deletes all state. The same id can be reused afterward.
Task client example
const t = client.task("orderProcessor");

// Send an event
const { created, instanceId, scheduledAt } = await Effect.runPromise(
  t.send({ id: "order-789", event: { _tag: "OrderPlaced", orderId: "order-789" } })
);

// Trigger immediate execution
await Effect.runPromise(t.trigger("order-789"));

// Check status
const { status, scheduledAt: nextRun } = await Effect.runPromise(t.status("order-789"));

// Read state and scheduled time
const { state, scheduledAt: at } = await Effect.runPromise(t.getState("order-789"));

// Terminate
await Effect.runPromise(t.terminate("order-789"));
Instance ID scheme
Every job instance is identified by a Durable Object ID derived from:
{jobType}:{jobName}:{userProvidedId}
| Component | Example |
| --- | --- |
| jobType | continuous, debounce, task |
| jobName | tokenRefresher (your object key) |
| userProvidedId | user-123 (the id you pass to start, add, or send) |
Full example: continuous:tokenRefresher:user-123
This means the same id value can be safely reused across different job names without collision.
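To make the scheme concrete, here is a small sketch that builds and parses IDs of this shape. buildInstanceId and parseInstanceId are hypothetical helpers written for this example, not library exports, and the parser assumes job names never contain a colon.

```typescript
type JobType = "continuous" | "debounce" | "task";

function isJobType(value: string): value is JobType {
  return value === "continuous" || value === "debounce" || value === "task";
}

// Joins the three components in the documented order.
function buildInstanceId(jobType: JobType, jobName: string, id: string): string {
  return `${jobType}:${jobName}:${id}`;
}

// Splits on the first two colons only, so a user-provided id containing ":" survives intact.
function parseInstanceId(
  instanceId: string
): { jobType: JobType; jobName: string; id: string } | null {
  const first = instanceId.indexOf(":");
  const second = instanceId.indexOf(":", first + 1);
  if (first < 0 || second < 0) return null;
  const jobType = instanceId.slice(0, first);
  if (!isJobType(jobType)) return null;
  return {
    jobType,
    jobName: instanceId.slice(first + 1, second),
    id: instanceId.slice(second + 1),
  };
}
```

Because the job type and job name are both part of the ID, buildInstanceId("continuous", "tokenRefresher", "user-123") and buildInstanceId("task", "orderProcessor", "user-123") produce different strings even though the user-provided id is identical.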
Error types
All errors are typed Effect errors. Import them from @durable-effect/jobs:
import {
  JobNotFoundError,      // Job name not registered in definitions
  InstanceNotFoundError, // Instance has no metadata (never started)
  InvalidStateError,     // Invalid state transition
  ValidationError,       // Schema validation failed on read or write
  ExecutionError,        // User's execute/onEvent function threw
  DuplicateEventError,   // Idempotency check rejected a duplicate
  StorageError,          // Durable Object storage operation failed
  SchedulerError,        // Alarm scheduling failed
} from "@durable-effect/jobs";
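A common use of typed errors in a fetch handler is translating them into HTTP status codes. The sketch below assumes each error exposes a `_tag` discriminant equal to its class name, which is conventional for Effect tagged errors but is not confirmed here, so check it against the library before relying on it. JobsErrorLike and toHttpStatus are hypothetical, and the status code choices are one reasonable mapping rather than a prescribed one.

```typescript
// Local stand-in for the error union; the real classes come from @durable-effect/jobs.
type JobsErrorTag =
  | "JobNotFoundError"
  | "InstanceNotFoundError"
  | "InvalidStateError"
  | "ValidationError"
  | "ExecutionError"
  | "DuplicateEventError"
  | "StorageError"
  | "SchedulerError";

// Assumption: every error carries a _tag matching its class name.
interface JobsErrorLike {
  readonly _tag: JobsErrorTag;
}

// Maps each error kind to an HTTP status; the switch is exhaustive over JobsErrorTag.
function toHttpStatus(error: JobsErrorLike): number {
  switch (error._tag) {
    case "JobNotFoundError":
    case "InstanceNotFoundError":
      return 404; // the job or instance does not exist
    case "ValidationError":
      return 400; // caller sent data that failed schema validation
    case "InvalidStateError":
    case "DuplicateEventError":
      return 409; // request conflicts with the current instance state
    case "ExecutionError":
    case "StorageError":
    case "SchedulerError":
      return 500; // failure inside the job or its infrastructure
  }
}
```

Because the switch covers every member of the union, adding a new tag to JobsErrorTag makes the function fail to compile until the new case is handled.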
Telemetry events
When tracker is configured, the following events are sent to the endpoint:
| Event | When |
| --- | --- |
| job.started | A job instance is created |
| job.executed | execute completes successfully |
| job.failed | execute throws an error |
| job.retryExhausted | All retry attempts are exhausted |
| job.terminated | A job instance is terminated |
| debounce.started | The first event is added to a debounce |
| debounce.flushed | A debounce batch is processed |
| task.scheduled | A task execution is scheduled |
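On the receiving side, it can help to treat the event names as a closed set. The sketch below encodes the eight names listed above as a union type and groups them by namespace prefix. The payload shape of the batched events is not described here, so this example works on names only. TELEMETRY_EVENTS, isTelemetryEventName, and countByNamespace are hypothetical helpers.

```typescript
// The event names listed above, as a readonly tuple.
const TELEMETRY_EVENTS = [
  "job.started",
  "job.executed",
  "job.failed",
  "job.retryExhausted",
  "job.terminated",
  "debounce.started",
  "debounce.flushed",
  "task.scheduled",
] as const;

type TelemetryEventName = (typeof TELEMETRY_EVENTS)[number];

// Narrows an arbitrary string (e.g. from an incoming request) to a known event name.
function isTelemetryEventName(value: string): value is TelemetryEventName {
  return (TELEMETRY_EVENTS as readonly string[]).includes(value);
}

// Counts event names by the prefix before the dot: "job", "debounce", or "task".
function countByNamespace(names: readonly TelemetryEventName[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const name of names) {
    const namespace = name.slice(0, name.indexOf("."));
    counts[namespace] = (counts[namespace] ?? 0) + 1;
  }
  return counts;
}
```

An ingestion endpoint could use the type guard to reject unknown names and the namespace counts as a coarse health signal, for example alerting when job.failed events appear.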
Full setup example
import { Effect, Schema } from "effect";
import { createDurableJobs, Continuous, Debounce, Task } from "@durable-effect/jobs";

// Define jobs
const tokenRefresher = Continuous.make({ /* ... */ });
const webhookBatcher = Debounce.make({ /* ... */ });
const orderProcessor = Task.make({ /* ... */ });

// Create engine and typed client
const { Jobs, JobsClient } = createDurableJobs(
  { tokenRefresher, webhookBatcher, orderProcessor },
  {
    tracker: {
      endpoint: "https://events.example.com/ingest",
      env: "production",
      serviceKey: "my-jobs-service",
    },
  }
);

// Export for Cloudflare (required)
export { Jobs };

// Worker environment: JOBS must match the binding name in wrangler.jsonc
interface Env {
  JOBS: DurableObjectNamespace;
}

// Use the client in a worker
export default {
  async fetch(request: Request, env: Env) {
    const client = JobsClient.fromBinding(env.JOBS);
    await Effect.runPromise(
      client.continuous("tokenRefresher").start({
        id: "user-123",
        input: { accessToken: "", refreshToken: "rt_abc", expiresAt: 0 },
      })
    );
    return new Response("OK");
  },
};
Configure wrangler.jsonc:
{
  "durable_objects": {
    "bindings": [{ "name": "JOBS", "class_name": "Jobs" }]
  },
  "migrations": [{ "tag": "v1", "new_classes": ["Jobs"] }]
}