Skip to main content

Overview

Workflows enable complex, multi-step automation by orchestrating multiple agent invocations as a directed acyclic graph (DAG). Each step runs an agent with a specific profile and prompt, with support for dependency management, parallel execution, and output templating.

Architecture

Workflows are defined as JSON files stored in workspace/workflows/ and consist of:
  1. Metadata — Name and description
  2. Steps — Individual agent invocations with dependencies
  3. Execution layers — Parallel batches computed from the dependency graph
# From grip/workflow/models.py:100-113
@dataclass(slots=True)
class WorkflowDef:
    """A named workflow: a collection of steps forming a DAG.

    Load-time validation guarantees that the workflow name is non-empty,
    at least one step exists, every step name matches the allowed pattern
    (alphanumeric, underscore, hyphen), timeouts are positive, step names
    are unique, each depends_on entry names an existing step, and the
    dependency graph contains no cycles.
    """

    # Unique workflow identifier; also used as the on-disk filename stem.
    name: str
    # Human-readable summary of what the workflow does.
    description: str = ""
    # Step definitions; execution order is derived from their depends_on edges.
    steps: list[StepDef] = field(default_factory=list)

JSON Format

{
  "name": "feature-pipeline",
  "description": "Complete feature development workflow",
  "steps": [
    {
      "name": "research",
      "prompt": "Research similar implementations in the codebase",
      "profile": "researcher",
      "timeout_seconds": 300
    },
    {
      "name": "design",
      "prompt": "Design the feature based on: {{research.output}}",
      "profile": "architect",
      "depends_on": ["research"],
      "timeout_seconds": 600
    },
    {
      "name": "implement",
      "prompt": "Implement the design: {{design.output}}",
      "profile": "developer",
      "depends_on": ["design"],
      "timeout_seconds": 1200
    },
    {
      "name": "test",
      "prompt": "Write tests for the implemented feature",
      "profile": "tester",
      "depends_on": ["implement"],
      "timeout_seconds": 600
    },
    {
      "name": "document",
      "prompt": "Document the new feature",
      "profile": "writer",
      "depends_on": ["implement"],
      "timeout_seconds": 300
    }
  ]
}

Step Definition

Each step supports the following fields:
# From grip/workflow/models.py:32-44
@dataclass(slots=True)
class StepDef:
    """Definition of a single workflow step.
    
    depends_on lists step names that must complete before this step runs.
    prompt can contain {{step_name.output}} placeholders that are resolved
    at execution time from prior step results.
    """
    name: str              # Unique identifier (alphanumeric, underscore, hyphen)
    prompt: str            # Task description sent to the agent
    profile: str = "default"  # Agent profile to use
    depends_on: list[str] = field(default_factory=list)  # Step dependencies
    timeout_seconds: int = 300  # Per-step timeout
  • name: Must match ^[\w-]+$ (alphanumeric, underscore, hyphen only)
  • prompt: Cannot be empty or whitespace-only
  • profile: Must exist in config.agents.profiles
  • depends_on: All referenced steps must exist in the workflow
  • timeout_seconds: Must be >= 1

Dependency Resolution

The engine computes execution layers using topological sort:
# From grip/workflow/models.py:159-179
def get_execution_order(self) -> list[list[str]]:
    """Group step names into layers that can execute in parallel.

    Kahn-style topological layering: a step is placed in a layer once
    all of its dependencies appear in earlier layers, so every step
    within one layer is safe to run concurrently. Each layer is sorted
    alphabetically for deterministic output.
    """
    adj, in_degree = _build_graph(self.steps)
    ordered_layers: list[list[str]] = []
    ready = deque(name for name, degree in in_degree.items() if degree == 0)

    while ready:
        current_layer = sorted(ready)
        ordered_layers.append(current_layer)
        ready.clear()
        # Completing a layer may drop successors' in-degree to zero,
        # making them eligible for the next layer.
        for finished in current_layer:
            for successor in adj[finished]:
                in_degree[successor] -= 1
                if in_degree[successor] == 0:
                    ready.append(successor)

    return ordered_layers
Example execution order:
# For the feature-pipeline workflow above:
Layer 1: ["research"]                                # No dependencies
Layer 2: ["design"]                                  # Depends on research
Layer 3: ["implement"]                               # Depends on design
Layer 4: ["test", "document"]                        # Both depend on implement

Parallel Execution

Steps within the same layer execute concurrently via asyncio.gather:
# From grip/workflow/engine.py:76-93
# Execute dependency layers in order; steps inside one layer are
# mutually independent, so they can run concurrently.
for layer_idx, layer_names in enumerate(layers, 1):
    # Exclude steps already marked SKIPPED (e.g. an upstream dependency failed).
    runnable = [
        name for name in layer_names
        if result.step_results[name].status != StepStatus.SKIPPED
    ]
    if not runnable:
        continue
    
    logger.info("Executing layer {}/{}: {}", layer_idx, len(layers), runnable)
    
    tasks = []
    for step_name in runnable:
        step_def = step_map[step_name]
        step_result = result.step_results[step_name]
        # Resolve {{step.output}} placeholders against prior step results
        # before the prompt is handed to the agent.
        resolved_prompt = self._resolve_template(step_def.prompt, result.step_results)
        tasks.append(self._execute_step(step_def, step_result, resolved_prompt))
    
    # Run the whole layer concurrently; each coroutine records its
    # outcome on its own StepResult.
    await asyncio.gather(*tasks)
Parallel execution significantly reduces total workflow duration when steps have no interdependencies.

Template Syntax

Step prompts can reference outputs from previous steps using {{step_name.output}} syntax:
# From grip/workflow/engine.py:166-181
@staticmethod
def _resolve_template(prompt: str, step_results: dict[str, StepResult]) -> str:
    """Substitute {{step_name.output}} placeholders in a step prompt.

    Placeholders referring to successfully completed steps are replaced
    with that step's sanitized output wrapped in explicit delimiters;
    any other placeholder is left untouched. Sanitizing strips nested
    template patterns and truncates oversized outputs to guard against
    injection and context overflow.
    """
    def substitute(match: re.Match) -> str:
        step_name = match.group(1)
        prior = step_results.get(step_name)
        if prior is None or prior.status != StepStatus.COMPLETED:
            # Unknown or unfinished step: keep the placeholder verbatim.
            return match.group(0)
        sanitized = WorkflowEngine._sanitize_output(prior.output)
        return f"[output from {step_name}]\n{sanitized}\n[/output from {step_name}]"

    return TEMPLATE_PATTERN.sub(substitute, prompt)
Template features:
  • Outputs are truncated to 50,000 characters to prevent context overflow
  • Nested template references are stripped to prevent injection attacks
  • Output is wrapped in clear delimiters for LLM parsing

Error Handling

Validation occurs before execution:
# From grip/workflow/models.py:117-157
errors = workflow.validate()
if errors:
    raise ValueError(f"Invalid workflow: {'; '.join(errors)}")

# Possible validation errors:
- "Workflow name cannot be empty"
- "Workflow must have at least one step"
- "Step name 'invalid name' is invalid (must be alphanumeric/underscore/hyphen)"
- "Step 'step1' has an empty prompt"
- "Step 'step1' has invalid timeout (0s); must be >= 1"
- "Duplicate step names found"
- "Step 'step2' depends on unknown step 'step1'"
- "Circular dependency detected in workflow steps"
When a step fails, dependent steps are automatically skipped:
# From grip/workflow/engine.py:184-200
@staticmethod
def _skip_dependents(
    failed_layer: list[str],
    remaining_layers: list[list[str]],
    result: WorkflowRunResult,
    step_map: dict[str, StepDef],
) -> None:
    """Mark steps that depend on failed steps as skipped."""
    # Seed with the steps that actually failed in the current layer.
    unavailable = {
        name for name in failed_layer
        if result.step_results[name].status == StepStatus.FAILED
    }

    for layer in remaining_layers:
        for step_name in layer:
            deps = step_map[step_name].depends_on
            if unavailable.isdisjoint(deps):
                continue
            result.step_results[step_name].mark_skipped("Skipped due to dependency failure")
            # A skipped step never produces output, so its own
            # dependents must be skipped transitively as well.
            unavailable.add(step_name)
Possible step statuses:
  • PENDING — Waiting to execute
  • RUNNING — Currently executing
  • COMPLETED — Successfully finished
  • FAILED — Encountered an error or timeout
  • SKIPPED — Skipped due to dependency failure

Workflow Results

Execution produces a comprehensive result object:
# From grip/workflow/models.py:219-259
@dataclass(slots=True)
class WorkflowRunResult:
    """Complete result of a workflow execution."""

    workflow_name: str
    # Overall lifecycle state: pending, running, completed, failed, partial.
    status: str = "pending"
    # Per-step results keyed by step name.
    step_results: dict[str, StepResult] = field(default_factory=dict)
    started_at: str = ""
    completed_at: str = ""
    total_duration_seconds: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-friendly dict, truncating long step outputs."""
        steps: dict[str, Any] = {}
        for name, r in self.step_results.items():
            output = r.output
            if len(output) > 500:
                # Cap serialized output so result payloads stay small.
                output = output[:500] + "... [truncated]"
            steps[name] = {
                "status": r.status.value,
                "output": output,
                "error": r.error,
                "iterations": r.iterations,
                "duration_seconds": r.duration_seconds,
            }
        return {
            "workflow_name": self.workflow_name,
            "status": self.status,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "total_duration_seconds": self.total_duration_seconds,
            "steps": steps,
        }

Persistence

Workflows are stored as JSON files in workspace/workflows/:
# From grip/workflow/store.py:31-42
def save(self, workflow: WorkflowDef) -> Path:
    """Save a workflow definition to disk as pretty-printed JSON.

    Writes to a temporary sibling file first, then atomically swaps it
    into place so concurrent readers never observe a partially written
    workflow file.

    Args:
        workflow: Definition to persist; its name becomes the filename stem.

    Returns:
        Path to the saved ``<name>.json`` file.
    """
    self._validate_name(workflow.name)
    path = self._dir / f"{workflow.name}.json"
    tmp = path.with_suffix(".tmp")
    tmp.write_text(
        json.dumps(workflow.to_dict(), indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    # Path.rename raises FileExistsError on Windows when the destination
    # already exists; Path.replace overwrites atomically on all platforms.
    tmp.replace(path)
    logger.debug("Saved workflow: {}", workflow.name)
    return path

CLI Usage

# List workflows
grip workflow list

# Run a workflow
grip workflow run feature-pipeline

# Create from template
grip workflow create my-workflow --template parallel

# Delete a workflow
grip workflow delete old-workflow
Related features:
  • Task Tracking — Persistent todo lists for multi-step work
  • Scheduling — Automated periodic workflow execution
  • Skills — Specialized agent profiles for workflow steps

Build docs developers (and LLMs) love