## Overview

The `workflow` tool enables agents to create, list, inspect, edit, and delete workflow definitions stored as JSON in the workspace. Workflows define a DAG (directed acyclic graph) of steps, where each step runs an agent with a specific profile and can depend on prior step outputs.
### `workflow`

Create, list, inspect, edit, and delete DAG-based multi-step workflows.

**Parameters:**

- `action`: Action to perform: `create`, `list`, `show`, `edit`, or `delete`
- `workflow_name`: Name of the workflow (for create/show/edit/delete). Must be alphanumeric with underscores/hyphens only.
- `description`: Workflow description (for create/edit)
- `steps`: Step definitions (for create/edit). Each step object supports the following fields (a minimal example follows this list):
  - `name` (required): Step identifier
  - `prompt` (required): Task for the agent to perform
  - `profile` (optional): Agent profile name (default: `"default"`)
  - `depends_on` (optional): Array of step names this step depends on
  - `timeout_seconds` (optional): Execution timeout in seconds (default: 300)
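For orientation, a minimal `steps` value might look like this (hypothetical step names and prompts; only `name` and `prompt` are required, the other fields fall back to their defaults):

```python
steps = [
    # Only "name" and "prompt" are required.
    {"name": "fetch", "prompt": "Fetch the raw data"},
    {
        "name": "summarize",
        "prompt": "Summarize: {{fetch.output}}",  # templated output reference
        "profile": "analyst",       # optional; defaults to "default"
        "depends_on": ["fetch"],    # optional; defaults to no dependencies
        "timeout_seconds": 120,     # optional; defaults to 300
    },
]
```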
## Actions

### Create Workflow

```python
result = await workflow(
    action="create",
    workflow_name="data_pipeline",
    description="Extract, transform, and load sales data",
    steps=[
        {
            "name": "extract",
            "prompt": "Download sales data from API endpoint /api/sales/export",
            "profile": "data_engineer",
            "timeout_seconds": 600
        },
        {
            "name": "transform",
            "prompt": "Clean and normalize the data: {{extract.output}}",
            "profile": "data_engineer",
            "depends_on": ["extract"]
        },
        {
            "name": "load",
            "prompt": "Load transformed data into warehouse: {{transform.output}}",
            "profile": "data_engineer",
            "depends_on": ["transform"]
        },
        {
            "name": "report",
            "prompt": "Generate summary report of loaded data",
            "profile": "analyst",
            "depends_on": ["load"]
        }
    ]
)
```
Returns:

```
Workflow 'data_pipeline' created successfully.
Steps: 4
Execution layers: 4
Saved to: /workspace/workflows/data_pipeline.json
Run with: grip workflow run data_pipeline
```
### List Workflows

```python
result = await workflow(action="list")
```

Returns:

```
## Saved Workflows
- **data_pipeline**: 4 steps, 4 layers — Extract, transform, and load sales data
- **weekly_report**: 3 steps, 3 layers — Generate and distribute weekly analytics
- **code_review**: 5 steps, 3 layers — Automated code quality analysis
```
### Show Workflow

```python
result = await workflow(
    action="show",
    workflow_name="data_pipeline"
)
```
Returns:

```
## Workflow: data_pipeline
Description: Extract, transform, and load sales data
Steps: 4
Execution layers: 4

### Steps
- **extract** [profile: data_engineer, timeout: 600s]
  Dependencies: (none)
  Prompt: Download sales data from API endpoint /api/sales/export
- **transform** [profile: data_engineer, timeout: 300s]
  Dependencies: extract
  Prompt: Clean and normalize the data: {{extract.output}}
- **load** [profile: data_engineer, timeout: 300s]
  Dependencies: transform
  Prompt: Load transformed data into warehouse: {{transform.output}}
- **report** [profile: analyst, timeout: 300s]
  Dependencies: load
  Prompt: Generate summary report of loaded data

### Execution Order
Layer 1: extract
Layer 2: transform
Layer 3: load
Layer 4: report
```
### Edit Workflow

```python
result = await workflow(
    action="edit",
    workflow_name="data_pipeline",
    description="Updated: Extract, transform, load, and analyze sales data",
    steps=[
        # ... updated steps ...
    ]
)
```
Returns:

```
Workflow 'data_pipeline' updated successfully.
Steps: 5
Execution layers: 4
Saved to: /workspace/workflows/data_pipeline.json
```
### Delete Workflow

```python
result = await workflow(
    action="delete",
    workflow_name="data_pipeline"
)
```

Returns:

```
Workflow 'data_pipeline' deleted.
```
## Step Dependencies

### Dependency Graph

Steps can depend on one or more prior steps using the `depends_on` array:

```python
steps=[
    {"name": "fetch_data", "prompt": "..."},
    {"name": "process_a", "prompt": "...", "depends_on": ["fetch_data"]},
    {"name": "process_b", "prompt": "...", "depends_on": ["fetch_data"]},
    {"name": "merge", "prompt": "...", "depends_on": ["process_a", "process_b"]}
]
```
Execution order (layer computation is sketched after this list):

- Layer 1: fetch_data (no dependencies)
- Layer 2: process_a, process_b (both depend only on fetch_data, so they run in parallel)
- Layer 3: merge (waits for both process_a and process_b)
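To make the layering concrete, here is a minimal sketch of how execution layers can be derived with a Kahn-style topological sort. This illustrates the technique only; it is not the engine's actual code:

```python
def compute_layers(steps: list[dict]) -> list[list[str]]:
    """Group steps into layers; a step runs one layer after its deepest dependency."""
    deps = {s["name"]: set(s.get("depends_on", [])) for s in steps}
    layers: list[list[str]] = []
    placed: set[str] = set()
    while len(placed) < len(deps):
        # Every unplaced step whose dependencies are all placed can run now.
        ready = [n for n, d in deps.items() if n not in placed and d <= placed]
        if not ready:
            raise ValueError("circular dependency detected")
        layers.append(ready)
        placed.update(ready)
    return layers
```

For the four steps above this returns `[["fetch_data"], ["process_a", "process_b"], ["merge"]]`.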
### Output Templating

Reference outputs from previous steps using `{{step_name.output}}` in prompts:

```json
{
  "name": "analyze",
  "prompt": "Analyze this data: {{fetch_data.output}}",
  "depends_on": ["fetch_data"]
}
```

At execution time, `{{fetch_data.output}}` is replaced with the actual output from the `fetch_data` step.
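A minimal sketch of that substitution, assuming completed outputs are collected in a dict keyed by step name (the engine's actual mechanism may differ):

```python
import re

def render_prompt(prompt: str, outputs: dict[str, str]) -> str:
    """Replace each {{step_name.output}} placeholder with that step's output."""
    def substitute(match: re.Match) -> str:
        step = match.group(1)
        if step not in outputs:
            raise KeyError(f"no recorded output for step '{step}'")
        return outputs[step]
    # Step names are alphanumeric with underscores/hyphens, per the tool's rules.
    return re.sub(r"\{\{([\w-]+)\.output\}\}", substitute, prompt)
```

For example, `render_prompt("Analyze this data: {{fetch_data.output}}", {"fetch_data": "42 rows"})` yields `"Analyze this data: 42 rows"`.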
## Validation

Workflows are validated before saving:

### Cycle Detection

```python
steps=[
    {"name": "a", "prompt": "...", "depends_on": ["b"]},
    {"name": "b", "prompt": "...", "depends_on": ["a"]}
]
# Error: workflow validation failed:
# - Circular dependency detected: a → b → a
```
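One way to detect and report such a cycle is a depth-first search that tracks the current path; the following is a sketch of the technique, not the validator's actual code:

```python
def find_cycle(deps: dict[str, list[str]]) -> list[str] | None:
    """Return a dependency cycle as a list of step names, or None if acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current path / fully explored
    color = {name: WHITE for name in deps}
    path: list[str] = []

    def visit(node: str) -> list[str] | None:
        color[node] = GRAY
        path.append(node)
        for dep in deps.get(node, []):
            if color.get(dep) == GRAY:  # back edge: we looped onto our own path
                return path[path.index(dep):] + [dep]
            if color.get(dep) == WHITE:
                if (cycle := visit(dep)) is not None:
                    return cycle
        color[node] = BLACK
        path.pop()
        return None

    for name in deps:
        if color[name] == WHITE and (cycle := visit(name)) is not None:
            return cycle
    return None
```

Here `find_cycle({'a': ['b'], 'b': ['a']})` returns `['a', 'b', 'a']`, which formats to the `a → b → a` message shown above.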
### Missing Dependencies

```python
steps=[
    {"name": "analyze", "prompt": "...", "depends_on": ["nonexistent"]}
]
# Error: workflow validation failed:
# - Step 'analyze' depends on undefined step 'nonexistent'
```
### Duplicate Step Names

```python
steps=[
    {"name": "process", "prompt": "..."},
    {"name": "process", "prompt": "..."}  # Duplicate!
]
# Error: workflow validation failed:
# - Duplicate step name: 'process'
```
### Max Steps Limit

Workflows cannot exceed 50 steps:

```python
steps=[...51 steps...]
# Error: workflow exceeds maximum of 50 steps.
```
## Execution

Workflows are executed via the CLI (not directly from the tool):

```bash
grip workflow run data_pipeline
```

The workflow engine (a simplified sketch follows this list):

- Loads the workflow definition from `workspace/workflows/data_pipeline.json`
- Computes execution layers based on dependencies
- Executes each layer sequentially; steps within a layer run in parallel
- Passes outputs from completed steps to dependent steps via template substitution
- Reports results to the console or configured channel
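A minimal sketch of that loop, reusing the `compute_layers` and `render_prompt` sketches above; `run_step` is a hypothetical stand-in for the engine's real step runner, not part of the documented API:

```python
import asyncio

async def run_step(prompt: str, profile: str, timeout: int) -> str:
    """Hypothetical placeholder: run one step's agent and return its output."""
    ...

async def run_workflow(steps: list[dict]) -> dict[str, str]:
    """Execute layers sequentially; steps within a layer run concurrently."""
    outputs: dict[str, str] = {}
    for layer in compute_layers(steps):
        layer_steps = [s for s in steps if s["name"] in layer]
        results = await asyncio.gather(*(
            run_step(
                prompt=render_prompt(s["prompt"], outputs),
                profile=s.get("profile", "default"),
                timeout=s.get("timeout_seconds", 300),
            )
            for s in layer_steps
        ))
        outputs.update(zip((s["name"] for s in layer_steps), results))
    return outputs
```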
## Storage

Workflows are stored as JSON files in `workspace/workflows/`. For example, `workspace/workflows/data_pipeline.json`:

```json
{
  "name": "data_pipeline",
  "description": "Extract, transform, and load sales data",
  "steps": [
    {
      "name": "extract",
      "prompt": "Download sales data from API endpoint /api/sales/export",
      "profile": "data_engineer",
      "depends_on": [],
      "timeout_seconds": 600
    },
    {
      "name": "transform",
      "prompt": "Clean and normalize the data: {{extract.output}}",
      "profile": "data_engineer",
      "depends_on": ["extract"],
      "timeout_seconds": 300
    }
  ]
}
```
## Best Practices

- Keep workflows under 20 steps for maintainability
- Use descriptive step names (e.g. `fetch_user_data`, not `step1`)
- Set appropriate timeouts for long-running steps
- Leverage parallelism by minimizing unnecessary dependencies
- Use profiles to assign specialized agents to different step types
- Test small workflows first before creating complex DAGs
- Document workflows with clear descriptions
## Use Cases

### Data Pipeline

```python
steps=[
    {"name": "extract", "prompt": "Download CSV from S3"},
    {"name": "validate", "prompt": "Check schema", "depends_on": ["extract"]},
    {"name": "transform", "prompt": "Apply transformations", "depends_on": ["validate"]},
    {"name": "load", "prompt": "Insert into database", "depends_on": ["transform"]}
]
```
### Code Review

```python
steps=[
    {"name": "lint", "prompt": "Run linter"},
    {"name": "test", "prompt": "Run unit tests"},
    {"name": "security", "prompt": "Run security scan"},
    {"name": "summary", "prompt": "Compile results", "depends_on": ["lint", "test", "security"]}
]
```
### Report Generation

```python
steps=[
    {"name": "sales", "prompt": "Generate sales metrics"},
    {"name": "traffic", "prompt": "Analyze web traffic"},
    {"name": "merge", "prompt": "Combine reports", "depends_on": ["sales", "traffic"]},
    {"name": "send", "prompt": "Email to stakeholders", "depends_on": ["merge"]}
]
```
## Implementation

Defined in `grip/tools/workflow.py`. Uses:

- `WorkflowDef` and `StepDef` dataclasses from `grip.workflow.models` (sketched below)
- `WorkflowStore` for JSON persistence
- Topological sort for execution layer computation
- DAG validation with cycle detection
- Template substitution for step output references
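Going by the stored JSON shape above, the model dataclasses plausibly resemble this sketch (field names and defaults mirror the storage format and parameter docs; the real definitions in `grip.workflow.models` may differ):

```python
from dataclasses import dataclass, field

@dataclass
class StepDef:
    """One node in the workflow DAG; fields mirror the stored JSON."""
    name: str
    prompt: str
    profile: str = "default"
    depends_on: list[str] = field(default_factory=list)
    timeout_seconds: int = 300

@dataclass
class WorkflowDef:
    """A named, described DAG of steps."""
    name: str
    description: str
    steps: list[StepDef] = field(default_factory=list)
```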