A child workflow is a workflow triggered from inside a running task. The parent task waits for the child to complete and receives its output, or it can fire-and-forget if the result is not needed. Child workflows are useful when you need to:
  • Break a large workload into independently observable units
  • Fan out to a dynamic number of parallel workers
  • Reuse a workflow definition across multiple callers
  • Apply separate concurrency or retry policies to a sub-unit of work

Define a child and a parent

Any task or workflow can be used as a child. Define it the same way as any other task, then call it from the parent with await child_task.aio_run(input=...).
worker.py
from pydantic import BaseModel
from hatchet_sdk import Context, Hatchet

hatchet = Hatchet(debug=True)

class SimpleInput(BaseModel):
    message: str

class SimpleOutput(BaseModel):
    transformed_message: str

child_task = hatchet.workflow(name="SimpleWorkflow", input_validator=SimpleInput)

@child_task.task(name="step1")
def step1(input: SimpleInput, ctx: Context) -> SimpleOutput:
    print("executed step1: ", input.message)
    return SimpleOutput(transformed_message=input.message.upper())

Spawn a child workflow from a task

Call await child_task.aio_run(input=...) from inside any async task function. The call returns the child’s typed output.
simple_fanout.py
from typing import Any
from hatchet_sdk import Context, Hatchet
from hatchet_sdk.runnables.types import EmptyModel
from worker import SimpleInput, child_task

hatchet = Hatchet(debug=True)

@hatchet.task(name="SpawnTask")
async def spawn(input: EmptyModel, ctx: Context) -> dict[str, Any]:
    result = await child_task.aio_run(
        input=SimpleInput(message="Hello, World!"),
    )
    return {"results": result}
For a single child with a blocking call:
from worker import SimpleInput, child_task

# Synchronous — blocks until the child completes
result = child_task.run(SimpleInput(message="Hello, World!"))
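The fire-and-forget option mentioned above can be sketched in plain asyncio — `child` here is a stand-in coroutine, not the Hatchet API:

```python
import asyncio

# Stand-in for a child workflow: transforms its input like step1 does.
async def child(message: str) -> str:
    return message.upper()

async def parent() -> str:
    # Awaited: the parent blocks until the child's result is available.
    awaited = await child("Hello, World!")

    # Fire-and-forget: schedule the child and continue without using its result.
    task = asyncio.create_task(child("Hello, Moon!"))

    # Let the background child finish before the event loop closes.
    await task
    return awaited

result = asyncio.run(parent())
print(result)  # HELLO, WORLD!
```
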

Fan-out: spawn multiple children in parallel

Fan-out spawns many child workflows concurrently and collects all results before continuing.
Use asyncio.gather to run multiple children simultaneously:
import asyncio
from worker import SimpleInput, child_task

result1 = child_task.aio_run(SimpleInput(message="Hello, World!"))
result2 = child_task.aio_run(SimpleInput(message="Hello, Moon!"))

results = await asyncio.gather(result1, result2)

print(results[0]["transformed_message"])
print(results[1]["transformed_message"])
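The same gather pattern scales to a dynamic list of inputs; a plain-asyncio sketch, with a stand-in `child` coroutine in place of `aio_run`:

```python
import asyncio

# Stand-in for child_task.aio_run: uppercases like step1.
async def child(message: str) -> str:
    return message.upper()

async def fan_out(messages: list[str]) -> list[str]:
    # One coroutine per input; gather runs them concurrently
    # and returns results in the same order as the inputs.
    return await asyncio.gather(*(child(m) for m in messages))

results = asyncio.run(fan_out(["Hello, World!", "Hello, Moon!"]))
print(results)  # ['HELLO, WORLD!', 'HELLO, MOON!']
```
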
Or submit a batch in one call with aio_run_many:
greetings = ["Hello, World!", "Hello, Moon!", "Hello, Mars!"]

results = await child_task.aio_run_many(
    [
        child_task.create_bulk_run_item(
            input=SimpleInput(message=greeting),
        )
        for greeting in greetings
    ]
)

print(results)
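For very large fan-outs, submitting in fixed-size batches keeps each bulk call bounded. A simple chunking helper (hypothetical, not part of the SDK) that could feed successive `aio_run_many` calls:

```python
from typing import Iterator, TypeVar

T = TypeVar("T")

def chunked(items: list[T], size: int) -> Iterator[list[T]]:
    # Yield successive slices of at most `size` items.
    for i in range(0, len(items), size):
        yield items[i : i + size]

batches = list(chunked([f"msg-{i}" for i in range(7)], size=3))
print([len(b) for b in batches])  # [3, 3, 1]
```
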

Register child and parent on the same worker

Both the child and parent must be registered on a worker for the workflow to execute. The worker needs enough slots to run children concurrently.
main.py
from hatchet_sdk import Hatchet
from simple_fanout import spawn
from worker import child_task

hatchet = Hatchet(debug=True)

def main() -> None:
    # Register both the parent and the child, with enough slots
    # for them to run concurrently
    worker = hatchet.worker("test-worker", slots=10, workflows=[child_task, spawn])
    worker.start()

if __name__ == "__main__":
    main()

Trigger the parent

trigger.py
from hatchet_sdk.runnables.types import EmptyModel
from simple_fanout import spawn

spawn.run(EmptyModel())
If the parent and child are on the same worker with limited slots, a deadlock can occur: the parent holds a slot and waits for a child that cannot start because all slots are occupied. Increase slots or run child workflows on a separate worker to avoid this.

Next steps

Durable execution

Spawn children from durable tasks that survive restarts.

DAG workflows

Build task graphs with explicit dependencies.
