Durable jobs run on Cloudflare Durable Objects. This page covers the end-to-end setup for your Worker project.
Creating the engine
Use createDurableJobs to register your job definitions. It returns the Jobs Durable Object class and the JobsClient factory.
import { createDurableJobs } from "@durable-effect/jobs";
import { tokenRefresher, webhookBatcher, orderProcessor } from "./jobs";
const { Jobs, JobsClient } = createDurableJobs({
tokenRefresher,
webhookBatcher,
orderProcessor,
});
// Export the Durable Object class — required by Cloudflare
export { Jobs };
You must export the Jobs class from your Worker’s entry point. Cloudflare uses this export to locate the Durable Object class when routing requests.
Using the client in your fetch handler
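import { Effect } from "effect";
// Env is your Worker's environment type; it must include the JOBS binding.
export default {
async fetch(request: Request, env: Env) {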
const client = JobsClient.fromBinding(env.JOBS);
await Effect.runPromise(
client.continuous("tokenRefresher").start({
id: "user-123",
input: { accessToken: "", refreshToken: "rt_abc", expiresAt: 0 },
})
);
return new Response("OK");
},
};
wrangler.jsonc configuration
Add the Durable Object binding and migration to your wrangler.jsonc:
{
"$schema": "node_modules/wrangler/config-schema.json",
"name": "my-worker",
"main": "src/worker.ts",
"compatibility_date": "2024-11-27",
"compatibility_flags": ["nodejs_compat"],
"durable_objects": {
"bindings": [
{
"name": "JOBS",
"class_name": "Jobs"
}
]
},
"migrations": [
{
"tag": "v1",
"new_classes": ["Jobs"]
}
]
}
The name in durable_objects.bindings must match the property you access on env (e.g. env.JOBS). The class_name must match the exported class name from your Worker entry point.
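The matching rules above are easy to get wrong when names drift apart, so a small consistency check can help. The following is a minimal sketch only: `checkBindings`, `WranglerConfig`, and the parameter names are hypothetical helpers written for this page, not part of wrangler or @durable-effect/jobs, and the check only looks at `new_classes` in migrations.

```typescript
// Hypothetical types covering only the fields used on this page.
interface DurableObjectBinding {
  name: string;
  class_name: string;
}

interface WranglerConfig {
  durable_objects: { bindings: DurableObjectBinding[] };
  migrations: { tag: string; new_classes?: string[] }[];
}

// Returns a list of human-readable problems; an empty list means the names line up.
function checkBindings(
  config: WranglerConfig,
  exportedClasses: string[], // class names exported from the Worker entry point
  envProperties: string[], // properties the Worker reads from env
): string[] {
  const problems: string[] = [];
  const migrated = new Set(config.migrations.flatMap((m) => m.new_classes ?? []));
  for (const binding of config.durable_objects.bindings) {
    if (!exportedClasses.includes(binding.class_name)) {
      problems.push(`class_name "${binding.class_name}" is not exported from the entry point`);
    }
    if (!migrated.has(binding.class_name)) {
      problems.push(`class_name "${binding.class_name}" is missing from migrations.new_classes`);
    }
    if (!envProperties.includes(binding.name)) {
      problems.push(`binding "${binding.name}" does not match any env property`);
    }
  }
  return problems;
}

// The configuration from this page, reduced to the relevant fields.
const config: WranglerConfig = {
  durable_objects: { bindings: [{ name: "JOBS", class_name: "Jobs" }] },
  migrations: [{ tag: "v1", new_classes: ["Jobs"] }],
};

const consistent = checkBindings(config, ["Jobs"], ["JOBS"]);
const mismatched = checkBindings(config, ["JobsDO"], ["JOBS"]);
console.log(consistent, mismatched);
```

With the configuration shown on this page the first call reports no problems, while the second reports that `Jobs` is not exported, because the entry point in that call exports a differently named class.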
Telemetry
Send job lifecycle events to an external endpoint for observability:
const { Jobs, JobsClient } = createDurableJobs(
{ tokenRefresher, webhookBatcher },
{
tracker: {
endpoint: "https://events.example.com/ingest",
env: "production",
serviceKey: "my-jobs-service",
},
}
);
Emitted events
| Event | When |
|---|---|
| job.started | Job instance created |
| job.executed | execute completed successfully |
| job.failed | execute threw an error |
| job.retryExhausted | All retry attempts failed |
| job.terminated | Job instance terminated |
| debounce.started | First event added to a debounce instance |
| debounce.flushed | Batch processed |
| task.scheduled | Execution scheduled via ctx.schedule() |
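On the receiving side, the event names listed above can be modelled as a string union so that consumers reject unknown names and aggregate the rest. This is a rough sketch: the event names come from the table, but the payload shape (`type` and `jobName` fields) is an assumption made for illustration, and the actual wire format sent to the endpoint may differ.

```typescript
// Event names taken from the table above.
const EVENT_NAMES = [
  "job.started",
  "job.executed",
  "job.failed",
  "job.retryExhausted",
  "job.terminated",
  "debounce.started",
  "debounce.flushed",
  "task.scheduled",
] as const;

type JobEventName = (typeof EVENT_NAMES)[number];

// Hypothetical payload shape; the real one is not documented on this page.
interface JobEvent {
  type: JobEventName;
  jobName: string;
}

// Type guard for names arriving as untrusted strings.
function isJobEventName(value: string): value is JobEventName {
  return (EVENT_NAMES as readonly string[]).includes(value);
}

// Counts failures per job, treating job.failed and job.retryExhausted as failures.
function countFailures(events: JobEvent[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const event of events) {
    if (event.type === "job.failed" || event.type === "job.retryExhausted") {
      counts.set(event.jobName, (counts.get(event.jobName) ?? 0) + 1);
    }
  }
  return counts;
}

const sample: JobEvent[] = [
  { type: "job.started", jobName: "tokenRefresher" },
  { type: "job.failed", jobName: "tokenRefresher" },
  { type: "job.retryExhausted", jobName: "tokenRefresher" },
  { type: "debounce.flushed", jobName: "webhookBatcher" },
];

const failures = countFailures(sample);
console.log(failures.get("tokenRefresher"));
```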
Logging
Control log verbosity globally or per job. Configure per job using the logging option on each job definition:
import { LogLevel } from "effect";
import { Continuous } from "@durable-effect/jobs";
const myJob = Continuous.make({
// ...
// Set logging once, using one of these values:
// logging: true, // LogLevel.Debug (all logs)
// logging: false, // LogLevel.Error (failures only), the default
// logging: LogLevel.None, // Silent
logging: LogLevel.Warning,
});
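The mapping from the `logging` option to a log level can be summarised in a few lines. The sketch below mirrors only the behaviour described in the comments above (true means Debug, false or omitted means Error, an explicit level passes through). `resolveLogLevel` and the string-based `Level` type are hypothetical stand-ins so the example runs without dependencies; they are not the library's internals, and `Level` is not Effect's `LogLevel`.

```typescript
// Stand-in for the log levels mentioned on this page.
type Level = "Debug" | "Warning" | "Error" | "None";

// The option accepts a boolean shorthand, an explicit level, or nothing at all.
type LoggingOption = boolean | Level | undefined;

function resolveLogLevel(option: LoggingOption): Level {
  if (option === undefined || option === false) {
    return "Error"; // default: failures only
  }
  if (option === true) {
    return "Debug"; // all logs
  }
  return option; // explicit level is used as given
}

const resolved = {
  omitted: resolveLogLevel(undefined),
  enabled: resolveLogLevel(true),
  disabled: resolveLogLevel(false),
  warning: resolveLogLevel("Warning"),
  silent: resolveLogLevel("None"),
};
console.log(resolved);
```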
Error types
All errors are typed Effect errors exported from @durable-effect/jobs:
import {
JobNotFoundError, // Job name not found in registry
InstanceNotFoundError, // Instance has no metadata
InvalidStateError, // Invalid state transition
ValidationError, // Schema validation failed
ExecutionError, // User function threw an error
DuplicateEventError, // Idempotency check failed
StorageError, // Durable Object storage error
SchedulerError, // Alarm scheduling error
} from "@durable-effect/jobs";
Use these in your Effect error channels to handle specific failure modes:
// Inside an Effect.gen generator, with initialState defined by the caller:
yield* client.continuous("tokenRefresher").start({ id: "user-123", input: initialState })
.pipe(
Effect.catchTag("JobNotFoundError", (e) =>
Effect.logError("Job not registered", e)
),
Effect.catchTag("ValidationError", (e) =>
Effect.logError("Invalid initial state", e)
),
);
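Tag-based handling works because each error carries a `_tag` discriminant that `Effect.catchTag` matches against. The dependency-free sketch below illustrates the same idea with a plain `switch`, mapping each error to an HTTP status in a fetch handler. It assumes every error listed above uses its class name as `_tag` (the page only shows this for JobNotFoundError and ValidationError), and both `statusFor` and the chosen status codes are hypothetical.

```typescript
// Tags assumed to equal the exported error class names.
type JobsErrorTag =
  | "JobNotFoundError"
  | "InstanceNotFoundError"
  | "InvalidStateError"
  | "ValidationError"
  | "ExecutionError"
  | "DuplicateEventError"
  | "StorageError"
  | "SchedulerError";

// Only the discriminant matters here; the real errors carry more fields.
interface TaggedJobsError {
  _tag: JobsErrorTag;
}

// Illustrative mapping from failure mode to HTTP status.
function statusFor(error: TaggedJobsError): number {
  switch (error._tag) {
    case "JobNotFoundError":
    case "InstanceNotFoundError":
      return 404; // nothing registered or stored under that name or id
    case "ValidationError":
      return 400; // caller sent input that fails the schema
    case "InvalidStateError":
    case "DuplicateEventError":
      return 409; // request conflicts with current instance state
    case "ExecutionError":
    case "StorageError":
    case "SchedulerError":
      return 500; // failure inside the job or the platform
  }
}

const notFound = statusFor({ _tag: "JobNotFoundError" });
const invalid = statusFor({ _tag: "ValidationError" });
const storage = statusFor({ _tag: "StorageError" });
console.log(notFound, invalid, storage);
```

Because the `switch` covers every member of the union, the compiler flags any tag that is added to `JobsErrorTag` later without a matching case.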
Full example
Define your jobs
// src/jobs.ts
import { Effect, Schema } from "effect";
import { Continuous } from "@durable-effect/jobs";
export const tokenRefresher = Continuous.make({
stateSchema: Schema.Struct({
accessToken: Schema.String,
refreshToken: Schema.String,
expiresAt: Schema.Number,
}),
schedule: Continuous.every("30 minutes"),
execute: (ctx) =>
Effect.gen(function* () {
const state = yield* ctx.state;
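// refreshAccessToken is your own Effect-returning function (not shown here)
const newToken = yield* refreshAccessToken(state.refreshToken);
// Record an expiry 30 minutes from now
yield* ctx.setState({ ...state, accessToken: newToken, expiresAt: Date.now() + 30 * 60 * 1000 });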
}),
});
Create the engine in your Worker entry point
// src/worker.ts
import { Effect } from "effect";
import { createDurableJobs } from "@durable-effect/jobs";
import { tokenRefresher } from "./jobs";
const { Jobs, JobsClient } = createDurableJobs({ tokenRefresher });
export { Jobs };
export default {
async fetch(request: Request, env: Env) {
const client = JobsClient.fromBinding(env.JOBS);
await Effect.runPromise(
client.continuous("tokenRefresher").start({
id: "user-123",
input: { accessToken: "", refreshToken: "rt_abc", expiresAt: 0 },
})
);
return new Response("OK");
},
};
Configure wrangler.jsonc
{
"$schema": "node_modules/wrangler/config-schema.json",
"name": "my-worker",
"main": "src/worker.ts",
"compatibility_date": "2024-11-27",
"compatibility_flags": ["nodejs_compat"],
"durable_objects": {
"bindings": [{ "name": "JOBS", "class_name": "Jobs" }]
},
"migrations": [{ "tag": "v1", "new_classes": ["Jobs"] }]
}