Define a workflow and add tasks
- Python
- TypeScript
- Go
Call hatchet.workflow() to create a workflow object, then decorate functions with @workflow.task(). Declare dependencies with the parents argument.
worker.py
import random
from datetime import timedelta
from pydantic import BaseModel
from hatchet_sdk import Context, EmptyModel, Hatchet
# Hatchet client used to declare the workflow below; constructed with debug=True.
hatchet = Hatchet(debug=True)


# Output model returned by step1 and step2.
class StepOutput(BaseModel):
    random_number: int


# Output model with a single integer field named sum.
class RandomSum(BaseModel):
    sum: int


# Workflow object that tasks attach to through @dag_workflow.task().
dag_workflow = hatchet.workflow(name="DAGWorkflow")
# Task with no parents: emits a random integer between 1 and 100 (inclusive).
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
def step1(input: EmptyModel, ctx: Context) -> StepOutput:
    value = random.randint(1, 100)
    return StepOutput(random_number=value)
# Async task with no parents: emits a random integer between 1 and 100 (inclusive).
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
async def step2(input: EmptyModel, ctx: Context) -> StepOutput:
    value = random.randint(1, 100)
    return StepOutput(random_number=value)
Call hatchet.workflow() to create a workflow, then add tasks via workflow.task(). The parents array declares dependencies.
workflow.ts
import { hatchet } from './hatchet-client';
/** Input payload for the DAG workflow. */
type DagInput = {
  Message: string;
};

/** Workflow output; only the result of the 'reverse' task is declared here. */
type DagOutput = {
  reverse: {
    Original: string;
    Transformed: string;
  };
};

/** DAG workflow declared under the name 'simple'. */
export const dag = hatchet.workflow<DagInput, DagOutput>({
  name: 'simple',
});
// First task in the DAG: lower-cases the incoming message.
const toLower = dag.task({
  name: 'to-lower',
  fn: (input) => ({
    TransformedMessage: input.Message.toLowerCase(),
  }),
});
Call client.NewWorkflow() to create a workflow, then add tasks with workflow.NewTask(). Use hatchet.WithParents() to declare dependencies.
main.go
package main
import (
"log"
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)
type (
	// Input is the payload the workflow is run with.
	Input struct {
		Value int `json:"value"`
	}

	// StepOutput is the result type returned by each task.
	StepOutput struct {
		Step   int `json:"step"`
		Result int `json:"result"`
	}
)
// main creates a Hatchet client, declares the "dag-workflow" workflow, and
// registers its first task.
func main() {
	client, err := hatchet.NewClient()
	if err != nil {
		log.Fatalf("failed to create hatchet client: %v", err)
	}

	workflow := client.NewWorkflow("dag-workflow")

	// Handler for step-1: returns double the input value.
	doubleInput := func(ctx hatchet.Context, input Input) (StepOutput, error) {
		out := StepOutput{Step: 1, Result: input.Value * 2}
		return out, nil
	}
	step1 := workflow.NewTask("step-1", doubleInput)

	// step1 is not used yet in this snippet; blank-assign it so the file compiles.
	_ = step1
}
Add tasks with parent dependencies
- Python
- TypeScript
- Go
Pass the parent task functions in the parents list. Inside the child task, call ctx.task_output(parent_fn) to get the typed output of a parent.
worker.py
# Child of step1 and step2: adds their two random numbers together.
@dag_workflow.task(parents=[step1, step2])
async def step3(input: EmptyModel, ctx: Context) -> RandomSum:
    first = ctx.task_output(step1)
    second = ctx.task_output(step2)
    total = first.random_number + second.random_number
    return RandomSum(sum=total)
# Child of step1 and step3: prints both parent outputs and returns a fixed dict.
@dag_workflow.task(parents=[step1, step3])
async def step4(input: EmptyModel, ctx: Context) -> dict[str, str]:
    step1_output = ctx.task_output(step1)
    step3_output = ctx.task_output(step3)
    print(step1_output, step3_output)
    return {"step4": "step4"}
Pass the parent task in the parents array and call await ctx.parentOutput(parentTask) to retrieve its typed result.
workflow.ts
// Child of toLower: reverses the lower-cased message and returns it with the original.
dag.task({
  name: 'reverse',
  parents: [toLower],
  fn: async (input, ctx) => {
    const { TransformedMessage } = await ctx.parentOutput(toLower);
    const reversed = TransformedMessage.split('').reverse().join('');
    return { Original: input.Message, Transformed: reversed };
  },
});
Pass hatchet.WithParents(step1) as an option to NewTask. Inside the handler, call ctx.ParentOutput(step1, &output) to decode the parent’s result.
main.go
// step-2 declares step-1 as its parent and adds 10 to step-1's result.
step2 := workflow.NewTask("step-2",
	func(ctx hatchet.Context, input Input) (StepOutput, error) {
		var parentOut StepOutput
		err := ctx.ParentOutput(step1, &parentOut)
		if err != nil {
			return StepOutput{}, err
		}
		return StepOutput{Step: 2, Result: parentOut.Result + 10}, nil
	},
	hatchet.WithParents(step1),
)
Full example: multi-step DAG
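The following examples build a multi-step DAG in each language. In the Go example, steps 2 and 3 both depend on step 1 (so they can run in parallel), and the final step depends on both. In the Python example, steps 1 and 2 have no parents, step 3 depends on both, and step 4 depends on steps 1 and 3. The TypeScript example is a two-task chain in which reverse depends on to-lower.
- Python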
- TypeScript
- Go
worker.py
import random
from datetime import timedelta
from pydantic import BaseModel
from hatchet_sdk import Context, EmptyModel, Hatchet
# Hatchet client used to declare the workflow and create the worker; constructed with debug=True.
hatchet = Hatchet(debug=True)


# Output model returned by step1 and step2.
class StepOutput(BaseModel):
    random_number: int


# Output model returned by step3.
class RandomSum(BaseModel):
    sum: int


# Workflow object that the tasks below attach to through @dag_workflow.task().
dag_workflow = hatchet.workflow(name="DAGWorkflow")
# Task with no parents: emits a random integer between 1 and 100 (inclusive).
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
def step1(input: EmptyModel, ctx: Context) -> StepOutput:
    number = random.randint(1, 100)
    return StepOutput(random_number=number)
# Async task with no parents: emits a random integer between 1 and 100 (inclusive).
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
async def step2(input: EmptyModel, ctx: Context) -> StepOutput:
    number = random.randint(1, 100)
    return StepOutput(random_number=number)
# Child of step1 and step2: adds their two random numbers together.
@dag_workflow.task(parents=[step1, step2])
async def step3(input: EmptyModel, ctx: Context) -> RandomSum:
    left = ctx.task_output(step1)
    right = ctx.task_output(step2)
    return RandomSum(sum=left.random_number + right.random_number)
# Child of step1 and step3: prints both parent outputs and returns a fixed dict.
@dag_workflow.task(parents=[step1, step3])
async def step4(input: EmptyModel, ctx: Context) -> dict[str, str]:
    from_step1 = ctx.task_output(step1)
    from_step3 = ctx.task_output(step3)
    print(from_step1, from_step3)
    return {"step4": "step4"}
def main() -> None:
    # Create a worker named "dag-worker" that serves dag_workflow, then start it.
    hatchet.worker("dag-worker", workflows=[dag_workflow]).start()


if __name__ == "__main__":
    main()
workflow.ts
import { hatchet } from './hatchet-client';
/** Input payload for the DAG workflow. */
type DagInput = { Message: string };
/** Workflow output; only the result of the 'reverse' task is declared here. */
type DagOutput = {
  reverse: { Original: string; Transformed: string };
};
/** DAG workflow declared under the name 'simple'. */
export const dag = hatchet.workflow<DagInput, DagOutput>({
  name: 'simple',
});
// First task in the DAG: lower-cases the incoming message.
const toLower = dag.task({
  name: 'to-lower',
  fn: (input) => {
    const lowered = input.Message.toLowerCase();
    return { TransformedMessage: lowered };
  },
});
// Child of toLower: reverses the lower-cased message and returns it with the original.
dag.task({
  name: 'reverse',
  parents: [toLower],
  fn: async (input, ctx) => {
    const parent = await ctx.parentOutput(toLower);
    const chars = parent.TransformedMessage.split('');
    chars.reverse();
    return { Original: input.Message, Transformed: chars.join('') };
  },
});
worker.ts
import { hatchet } from './hatchet-client';
import { dag } from './workflow';
/**
 * Creates the 'dag-worker' worker with the dag workflow registered and starts it.
 */
async function main(): Promise<void> {
  const worker = await hatchet.worker('dag-worker', { workflows: [dag] });
  await worker.start();
}

if (require.main === module) {
  // Handle a rejected startup explicitly instead of leaving a floating promise.
  main().catch((err: unknown) => {
    console.error(err);
    process.exit(1);
  });
}
main.go
package main
import (
"context"
"log"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)
type (
	// Input is the payload the workflow is run with.
	Input struct {
		Value int `json:"value"`
	}

	// StepOutput is the result type returned by every task in the DAG.
	StepOutput struct {
		Step   int `json:"step"`
		Result int `json:"result"`
	}
)
// main declares the four-task "dag-workflow", starts a worker for it,
// triggers one run with Input{Value: 5}, and then blocks until the interrupt
// context is done.
//
// DAG shape: step-1 has no parents; step-2 and step-3 each depend on step-1;
// final-step depends on both step-2 and step-3.
func main() {
	client, err := hatchet.NewClient()
	if err != nil {
		log.Fatalf("failed to create hatchet client: %v", err)
	}

	workflow := client.NewWorkflow("dag-workflow")

	// step-1: doubles the workflow input.
	step1 := workflow.NewTask("step-1",
		func(ctx hatchet.Context, input Input) (StepOutput, error) {
			return StepOutput{Step: 1, Result: input.Value * 2}, nil
		},
	)

	// step-2: adds 10 to step-1's result.
	step2 := workflow.NewTask("step-2",
		func(ctx hatchet.Context, input Input) (StepOutput, error) {
			var step1Output StepOutput
			if err := ctx.ParentOutput(step1, &step1Output); err != nil {
				return StepOutput{}, err
			}
			return StepOutput{Step: 2, Result: step1Output.Result + 10}, nil
		},
		hatchet.WithParents(step1),
	)

	// step-3: triples step-1's result.
	step3 := workflow.NewTask("step-3",
		func(ctx hatchet.Context, input Input) (StepOutput, error) {
			var step1Output StepOutput
			if err := ctx.ParentOutput(step1, &step1Output); err != nil {
				return StepOutput{}, err
			}
			return StepOutput{Step: 3, Result: step1Output.Result * 3}, nil
		},
		hatchet.WithParents(step1),
	)

	// final-step: sums the results of step-2 and step-3.
	_ = workflow.NewTask("final-step",
		func(ctx hatchet.Context, input Input) (StepOutput, error) {
			var step2Output, step3Output StepOutput
			// Propagate decode failures, as step-2 and step-3 do, rather than
			// silently summing zero-valued outputs.
			if err := ctx.ParentOutput(step2, &step2Output); err != nil {
				return StepOutput{}, err
			}
			if err := ctx.ParentOutput(step3, &step3Output); err != nil {
				return StepOutput{}, err
			}
			return StepOutput{Step: 4, Result: step2Output.Result + step3Output.Result}, nil
		},
		hatchet.WithParents(step2, step3),
	)

	worker, err := client.NewWorker("dag-worker", hatchet.WithWorkflows(workflow))
	if err != nil {
		log.Fatalf("failed to create worker: %v", err)
	}

	interruptCtx, cancel := cmdutils.NewInterruptContext()
	defer cancel()

	// Run the worker in the background so main can go on to trigger the workflow.
	go func() {
		if err := worker.StartBlocking(interruptCtx); err != nil {
			log.Fatalf("failed to start worker: %v", err)
		}
	}()

	_, err = client.Run(context.Background(), "dag-workflow", Input{Value: 5})
	if err != nil {
		log.Fatalf("failed to run workflow: %v", err)
	}

	<-interruptCtx.Done()
}
Trigger a DAG workflow
- Python
- TypeScript
- Go
trigger.py
from worker import dag_workflow
# Trigger a run of the DAG workflow imported from worker.py.
dag_workflow.run()
run.ts
import { dag } from './workflow';
/**
 * Runs the dag workflow with a sample message and logs the 'reverse' task's
 * transformed string.
 */
async function main(): Promise<void> {
  const res = await dag.run({
    Message: 'hello world',
  });
  console.log(res.reverse.Transformed);
}

if (require.main === module) {
  // Handle a rejected run explicitly instead of leaving a floating promise.
  main().catch((err: unknown) => {
    console.error(err);
    process.exit(1);
  });
}
_, err = client.Run(context.Background(), "dag-workflow", Input{Value: 5})
if err != nil {
log.Fatalf("failed to run workflow: %v", err)
}
Conditional task execution
Use skip_if (Python) or skipIf (TypeScript) to conditionally skip a task based on a parent’s output or an external event. A skipped task is not considered a failure.
- Python
- TypeScript
ParentCondition evaluates a CEL expression against the named parent’s output. If the expression is truthy, the task is skipped.
worker.py
from hatchet_sdk import (
Context, EmptyModel, Hatchet, ParentCondition, SleepCondition, UserEventCondition, or_
)
from pydantic import BaseModel
import random
from datetime import timedelta
# Hatchet client used to declare the workflow below; constructed with debug=True.
hatchet = Hatchet(debug=True)


# Output model returned by the tasks below that produce a random number.
class StepOutput(BaseModel):
    random_number: int


# Workflow object that the conditional tasks attach to.
task_condition_workflow = hatchet.workflow(name="TaskConditionWorkflow")
# Root task: emits a random integer between 1 and 100 (inclusive).
@task_condition_workflow.task()
def start(input: EmptyModel, ctx: Context) -> StepOutput:
    number = random.randint(1, 100)
    return StepOutput(random_number=number)
# Child of start that additionally waits on a 10-second sleep condition.
@task_condition_workflow.task(
    parents=[start],
    wait_for=[SleepCondition(timedelta(seconds=10))],
)
def wait_for_sleep(input: EmptyModel, ctx: Context) -> StepOutput:
    number = random.randint(1, 100)
    return StepOutput(random_number=number)
# Skipped whenever start's random_number is greater than 0.
@task_condition_workflow.task(
    parents=[start, wait_for_sleep],
    skip_if=[
        ParentCondition(
            parent=start,
            expression="output.random_number > 0",
        )
    ],
)
def skip_with_multiple_parents(input: EmptyModel, ctx: Context) -> StepOutput:
    number = random.randint(1, 100)
    return StepOutput(random_number=number)
# Left branch: runs when wait_for_sleep's random_number is <= 50 (skipped when it is > 50).
@task_condition_workflow.task(
    parents=[wait_for_sleep],
    skip_if=[ParentCondition(parent=wait_for_sleep, expression="output.random_number > 50")],
)
def left_branch(input: EmptyModel, ctx: Context) -> StepOutput:
    number = random.randint(1, 100)
    return StepOutput(random_number=number)
# Right branch: runs when wait_for_sleep's random_number is > 50 (skipped when it is <= 50).
@task_condition_workflow.task(
    parents=[wait_for_sleep],
    skip_if=[ParentCondition(parent=wait_for_sleep, expression="output.random_number <= 50")],
)
def right_branch(input: EmptyModel, ctx: Context) -> StepOutput:
    number = random.randint(1, 100)
    return StepOutput(random_number=number)
ctx.was_skipped(task_fn) before reading a potentially-skipped task’s output:@task_condition_workflow.task(
parents=[start, wait_for_sleep, left_branch, right_branch],
)
def sum(input: EmptyModel, ctx: Context) -> RandomSum:
one = ctx.task_output(start).random_number
two = ctx.task_output(wait_for_sleep).random_number
three = (
ctx.task_output(left_branch).random_number
if not ctx.was_skipped(left_branch)
else 0
)
four = (
ctx.task_output(right_branch).random_number
if not ctx.was_skipped(right_branch)
else 0
)
return RandomSum(sum=one + two + three + four)
Pass a waitFor condition using Or, SleepCondition, or UserEventCondition from @hatchet/v1/conditions.
workflow.ts
import { Or } from '@hatchet/v1/conditions';
import { hatchet } from './hatchet-client';
/** Workflow used to demonstrate a task gated by a waitFor condition. */
export const dagWithConditions = hatchet.workflow({
  name: 'simple',
});
// Parent task: returns a completion flag and nothing else.
const firstTask = dagWithConditions.task({
  name: 'first-task',
  fn: async () => {
    return { Completed: true };
  },
});
// Condition for second-task: the 'user:event' event, or a 10s sleep.
const eventOrSleep = Or({ eventKey: 'user:event' }, { sleepFor: '10s' });

// Child of first-task, gated by the waitFor condition; logs what triggered it.
dagWithConditions.task({
  name: 'second-task',
  parents: [firstTask],
  waitFor: eventOrSleep,
  fn: async (_, ctx) => {
    const triggers = ctx.triggers();
    console.log('triggered by condition', triggers);
    return { Completed: true };
  },
});
Use wait_for to delay a task until a condition is met (e.g. a timer or external event). Use skip_if to conditionally bypass a task entirely based on parent output.
Next steps
Durable execution
Sleep and wait for events inside a task, surviving restarts.
Child workflows
Spawn sub-workflows dynamically from within a task.