Skip to main content
Feature clients are accessed as properties on the Hatchet instance. Each client exposes both a synchronous and an asynchronous variant of every method; the asynchronous variant has the same name with an aio_ prefix (for example, get and aio_get).

RunsClient — hatchet.runs

Interact with task and workflow runs.

Key methods

# Get a workflow run by ID
details: V1WorkflowRunDetails = hatchet.runs.get(workflow_run_id)
details = await hatchet.runs.aio_get(workflow_run_id)

# Get the status of a workflow run
status: V1TaskStatus = hatchet.runs.get_status(workflow_run_id)
status = await hatchet.runs.aio_get_status(workflow_run_id)

# Get a single task run
task: V1TaskSummary = hatchet.runs.get_task_run(task_run_id)
task = await hatchet.runs.aio_get_task_run(task_run_id)

# List runs (auto-paginated by day)
runs: list[V1TaskSummary] = hatchet.runs.list_with_pagination(
    since=datetime(2025, 1, 1, tzinfo=timezone.utc),
    statuses=[V1TaskStatus.FAILED],
    workflow_ids=["wf-uuid"],
)
runs = await hatchet.runs.aio_list_with_pagination(...)

# Cancel a single run
hatchet.runs.cancel(run_id)
await hatchet.runs.aio_cancel(run_id)

# Bulk cancel by IDs or filter
from hatchet_sdk import BulkCancelReplayOpts, RunFilter

hatchet.runs.bulk_cancel(BulkCancelReplayOpts(ids=["id1", "id2"]))
hatchet.runs.bulk_cancel(BulkCancelReplayOpts(
    filters=RunFilter(
        since=datetime(2025, 1, 1, tzinfo=timezone.utc),
        statuses=[V1TaskStatus.RUNNING],
    )
))
await hatchet.runs.aio_bulk_cancel(opts)

# Replay a single run
hatchet.runs.replay(run_id)
await hatchet.runs.aio_replay(run_id)

# Bulk replay by IDs or filter
hatchet.runs.bulk_replay(BulkCancelReplayOpts(ids=["id1"]))
await hatchet.runs.aio_bulk_replay(opts)

# Bulk cancel / replay large ranges with automatic chunking
hatchet.runs.bulk_cancel_by_filters_with_pagination(
    since=datetime(2025, 1, 1, tzinfo=timezone.utc),
    statuses=[V1TaskStatus.RUNNING],
    chunk_size=500,
    sleep_time=3,
)
hatchet.runs.bulk_replay_by_filters_with_pagination(
    since=datetime(2025, 1, 1, tzinfo=timezone.utc),
    statuses=[V1TaskStatus.FAILED],
)

# Get the output of a completed run
result: JSONSerializableMapping = hatchet.runs.get_result(run_id)
result = await hatchet.runs.aio_get_result(run_id)

# Get a WorkflowRunRef to stream events or await completion
ref: WorkflowRunRef = hatchet.runs.get_run_ref(workflow_run_id)

BulkCancelReplayOpts

from hatchet_sdk import BulkCancelReplayOpts, RunFilter

# By explicit IDs
opts = BulkCancelReplayOpts(ids=["id1", "id2"])

# By filter
opts = BulkCancelReplayOpts(
    filters=RunFilter(
        since=datetime(2025, 1, 1, tzinfo=timezone.utc),
        until=datetime(2025, 1, 2, tzinfo=timezone.utc),
        statuses=[V1TaskStatus.FAILED],
        workflow_ids=["wf-uuid"],
        additional_metadata={"env": "production"},
    )
)
The ids and filters arguments of BulkCancelReplayOpts are mutually exclusive — exactly one must be provided.

CronClient — hatchet.cron

Create and manage cron workflow triggers.

Key methods

# Create a cron trigger
cron: CronWorkflows = hatchet.cron.create(
    workflow_name="my-workflow",
    cron_name="nightly-job",
    expression="0 2 * * *",
    input={"key": "value"},
    additional_metadata={},
    priority=1,
)
cron = await hatchet.cron.aio_create(
    workflow_name="my-workflow",
    cron_name="nightly-job",
    expression="0 2 * * *",
    input={},
    additional_metadata={},
)

# List cron triggers
result: CronWorkflowsList = hatchet.cron.list(
    workflow_name="my-workflow",
    cron_name="nightly-job",
    limit=50,
    offset=0,
)
result = await hatchet.cron.aio_list(workflow_name="my-workflow")

# Get a specific cron trigger
cron: CronWorkflows = hatchet.cron.get(cron_id)
cron = await hatchet.cron.aio_get(cron_id)

# Delete a cron trigger
hatchet.cron.delete(cron_id)
await hatchet.cron.aio_delete(cron_id)
Valid cron expression aliases: @yearly, @annually, @monthly, @weekly, @daily, @hourly.

ScheduledClient — hatchet.scheduled

Create and manage one-off scheduled workflow runs.

Key methods

from datetime import datetime, timezone

# Create a scheduled run
scheduled: ScheduledWorkflows = hatchet.scheduled.create(
    workflow_name="my-workflow",
    trigger_at=datetime(2025, 6, 1, 9, 0, tzinfo=timezone.utc),
    input={"key": "value"},
    additional_metadata={},
)
scheduled = await hatchet.scheduled.aio_create(
    workflow_name="my-workflow",
    trigger_at=datetime(2025, 6, 1, 9, 0, tzinfo=timezone.utc),
    input={},
    additional_metadata={},
)

# List scheduled runs
result: ScheduledWorkflowsList = hatchet.scheduled.list(
    workflow_id="wf-uuid",
    statuses=[ScheduledRunStatus.PENDING],
    limit=50,
)
result = await hatchet.scheduled.aio_list()

# Get a specific scheduled run
scheduled: ScheduledWorkflows = hatchet.scheduled.get(scheduled_id)
scheduled = await hatchet.scheduled.aio_get(scheduled_id)

# Reschedule a run
updated: ScheduledWorkflows = hatchet.scheduled.update(
    scheduled_id,
    trigger_at=datetime(2025, 7, 1, 9, 0, tzinfo=timezone.utc),
)
updated = await hatchet.scheduled.aio_update(scheduled_id, trigger_at=...)

# Delete a scheduled run
hatchet.scheduled.delete(scheduled_id)
await hatchet.scheduled.aio_delete(scheduled_id)

# Bulk delete
response: ScheduledWorkflowsBulkDeleteResponse = hatchet.scheduled.bulk_delete(
    scheduled_ids=["id1", "id2"],
)
# or by filter:
hatchet.scheduled.bulk_delete(workflow_id="wf-uuid")
await hatchet.scheduled.aio_bulk_delete(scheduled_ids=["id1"])

# Bulk reschedule
from hatchet_sdk.clients.rest.models.scheduled_workflows_bulk_update_item import (
    ScheduledWorkflowsBulkUpdateItem,
)
response: ScheduledWorkflowsBulkUpdateResponse = hatchet.scheduled.bulk_update(
    [("sched-id-1", datetime(2025, 8, 1, tzinfo=timezone.utc))]
)
hatchet.scheduled.bulk_update([ScheduledWorkflowsBulkUpdateItem(id="sched-id-1", triggerAt=...)])
await hatchet.scheduled.aio_bulk_update(updates)

WebhooksClient — hatchet.webhooks

Create and manage incoming webhook endpoints that trigger Hatchet workflows.

Key methods

from hatchet_sdk.clients.rest.models.v1_webhook_source_name import V1WebhookSourceName
from hatchet_sdk.clients.rest.models.v1_webhook_hmac_auth import V1WebhookHMACAuth
from hatchet_sdk.clients.rest.models.v1_webhook_hmac_algorithm import V1WebhookHMACAlgorithm
from hatchet_sdk.clients.rest.models.v1_webhook_hmac_encoding import V1WebhookHMACEncoding

# Create a webhook
webhook: V1Webhook = hatchet.webhooks.create(
    source_name=V1WebhookSourceName.GITHUB,
    name="my-github-webhook",
    event_key_expression="'github:' + body.action",
    auth=V1WebhookHMACAuth(
        secret="my-secret",
        algorithm=V1WebhookHMACAlgorithm.SHA256,
        encoding=V1WebhookHMACEncoding.HEX,
        header="X-Hub-Signature-256",
    ),
    scope_expression="body.repository.full_name",
)
webhook = await hatchet.webhooks.aio_create(
    source_name=V1WebhookSourceName.GITHUB,
    name="my-github-webhook",
    event_key_expression="'github:' + body.action",
    auth=auth_config,
)

# List webhooks
result: V1WebhookList = hatchet.webhooks.list(limit=50, webhook_names=["my-webhook"])
result = await hatchet.webhooks.aio_list()

# Get a webhook by name
webhook: V1Webhook = hatchet.webhooks.get("my-github-webhook")
webhook = await hatchet.webhooks.aio_get("my-github-webhook")

# Update a webhook
updated: V1Webhook = hatchet.webhooks.update(
    "my-github-webhook",
    event_key_expression="'github:' + body.event",
)
updated = await hatchet.webhooks.aio_update("my-github-webhook", event_key_expression=...)

# Delete a webhook
deleted: V1Webhook = hatchet.webhooks.delete("my-github-webhook")
deleted = await hatchet.webhooks.aio_delete("my-github-webhook")

WorkersClient — hatchet.workers

List and manage running worker processes.

Key methods

# List all workers in the tenant
workers: WorkerList = hatchet.workers.list()
workers = await hatchet.workers.aio_list()

# Get a specific worker by ID
worker: Worker = hatchet.workers.get(worker_id)
worker = await hatchet.workers.aio_get(worker_id)

# Pause a worker (stop accepting new tasks)
paused: Worker = hatchet.workers.pause(worker_id)
paused = await hatchet.workers.aio_pause(worker_id)

# Unpause a worker
unpaused: Worker = hatchet.workers.unpause(worker_id)
unpaused = await hatchet.workers.aio_unpause(worker_id)

# Update a worker with custom options
from hatchet_sdk.clients.rest.models.update_worker_request import UpdateWorkerRequest

updated: Worker = hatchet.workers.update(
    worker_id,
    UpdateWorkerRequest(isPaused=True),
)
updated = await hatchet.workers.aio_update(worker_id, opts)

WorkflowsClient — hatchet.workflows

Inspect and manage workflow declarations (not individual runs — use hatchet.runs for those).

Key methods

# List workflows
result: WorkflowList = hatchet.workflows.list(
    workflow_name="my-workflow",
    limit=50,
    offset=0,
)
result = await hatchet.workflows.aio_list()

# Get a workflow by ID
workflow: Workflow = hatchet.workflows.get(workflow_id)
workflow = await hatchet.workflows.aio_get(workflow_id)

# Get a specific version
version: WorkflowVersion = hatchet.workflows.get_version(workflow_id)
version = hatchet.workflows.get_version(workflow_id, version="v2")
version = await hatchet.workflows.aio_get_version(workflow_id)

# Delete a workflow (irreversible)
hatchet.workflows.delete(workflow_id)
await hatchet.workflows.aio_delete(workflow_id)
hatchet.workflows.delete() permanently deletes a workflow and all its associated data.

EventClient — hatchet.event

Push events that trigger event-subscribed workflows or durable tasks waiting for user events.

Key methods

from hatchet_sdk.clients.events import PushEventOptions, BulkPushEventWithMetadata

# Push a single event
event: Event = hatchet.event.push(
    "user:registered",
    {"user_id": "u_123", "email": "user@example.com"},
)
event = await hatchet.event.aio_push(
    "user:registered",
    {"user_id": "u_123"},
    options=PushEventOptions(
        additional_metadata={"source": "signup-form"},
        priority=2,
    ),
)

# Push multiple events in a single request
events: list[Event] = hatchet.event.bulk_push(
    [
        BulkPushEventWithMetadata(
            key="order:created",
            payload={"order_id": "o_1"},
            additional_metadata={"region": "us-east"},
        ),
        BulkPushEventWithMetadata(
            key="order:created",
            payload={"order_id": "o_2"},
        ),
    ]
)
events = await hatchet.event.aio_bulk_push(event_list)

# List events
result: V1EventList = hatchet.event.list(
    keys=["user:registered"],
    since=datetime(2025, 1, 1, tzinfo=timezone.utc),
    limit=100,
)
result = await hatchet.event.aio_list(keys=["order:created"])

# Get a specific event
ev: V1Event = hatchet.event.get(event_id)
ev = await hatchet.event.aio_get(event_id)

PushEventOptions

additional_metadata (dict, default: {}) — Arbitrary key-value metadata attached to the event.
namespace (str | None) — Override the namespace for this event only.
priority (int | None) — Scheduling priority for runs triggered by this event.
scope (str | None) — Scope value used for webhook routing.

Build docs developers (and LLMs) love