Skip to main content

Workflow Tool

The Workflow tool enables users to create, manage, and execute automated workflows with scheduled and event-based triggers.

Overview

Workflow capabilities:
  • Search for integration triggers
  • Create workflows from natural language
  • Save conversations as workflows
  • List and manage workflows
  • Execute workflows on-demand
  • Schedule recurring workflows
# Location: apps/api/app/agents/tools/workflow_tool.py

Core Concepts

Trigger Types

  1. Manual: User initiates execution
  2. Scheduled: Cron-based recurring execution
  3. Integration: Event-based (new email, calendar event, PR created, etc.)

Workflow Modes

  • new: Create workflow from description
  • from_conversation: Save current chat session as reusable workflow

Available Tools

Search Triggers

@tool
async def search_triggers(
    config: RunnableConfig,
    query: Annotated[str, "Describe when the workflow should trigger"],
    limit: Annotated[int, "Max results"] = 15,
) -> dict:
    """
    Search for integration triggers matching your description.
    
    Examples:
    - "when I receive an email"
    - "when calendar event starts"
    - "when someone sends a slack message"
    - "when a github issue is created"
    
    Returns matching triggers with config_fields embedded.
    """
    user_id = get_user_id(config)
    
    results = await TriggerSearchService.search(
        query=query,
        user_id=user_id,
        limit=limit,
    )
    
    connected = [t for t in results if t.get("is_connected")]
    not_connected = [t for t in results if not t.get("is_connected")]
    
    return success_response({
        "triggers": results,
        "connected_count": len(connected),
        "not_connected_count": len(not_connected),
    })
Response:
{
  "success": true,
  "data": {
    "triggers": [
      {
        "trigger_slug": "gmail_new_email",
        "trigger_name": "New Email Received",
        "description": "Triggers when a new email arrives in Gmail",
        "is_connected": true,
        "config_fields": [
          {
            "name": "label_ids",
            "type": "array",
            "required": false,
            "description": "Filter by Gmail labels"
          }
        ]
      }
    ],
    "connected_count": 3,
    "not_connected_count": 2
  }
}

Create Workflow

@tool
@with_rate_limiting("workflow_operations")
async def create_workflow(
    config: RunnableConfig,
    user_request: Annotated[str, "User's exact words describing workflow"],
    mode: Annotated[
        Literal["new", "from_conversation"],
        "Mode: 'new' for description, 'from_conversation' to save session"
    ] = "new",
) -> dict:
    """
    Create workflow by delegating to workflow assistant.
    
    IMPORTANT: Pass user's request EXACTLY as stated. Do not interpret,
    parse schedules, extract steps, or determine trigger types.
    
    Examples:
    - create_workflow("checks my email every morning", mode="new")
    - create_workflow("notifies me on Slack when I get a GitHub PR", mode="new")
    - create_workflow("save this as a workflow", mode="from_conversation")
    - create_workflow("runs every Monday", mode="from_conversation")
    """
    from app.services.workflow.workflow_subagent import WorkflowSubagentRunner
    
    writer = get_stream_writer()
    user_id = get_user_id(config)
    thread_id = get_thread_id(config) or ""
    user_time = get_user_time(config)
    user_timezone = get_user_timezone(config)
    
    # Build task based on mode
    if mode == "new":
        task_description = _build_new_workflow_task(user_request.strip())
    elif mode == "from_conversation":
        # Extract workflow from conversation context
        context = await WorkflowContextExtractor.extract_from_thread(thread_id)
        if not context or not context.workflow_steps:
            return error_response(
                "extraction_failed",
                "Could not extract workflow steps from conversation."
            )
        task_description = _build_from_conversation_task(context, user_request)
    
    # Execute workflow subagent
    subagent_response = await WorkflowSubagentRunner.execute(
        task=task_description,
        user_id=user_id,
        thread_id=thread_id,
        user_time=user_time,
        stream_writer=writer,
    )
    
    # Parse response
    result = parse_subagent_response(subagent_response)
    
    if result.mode == "finalized" and result.draft:
        draft = result.draft
        
        # Check if can create directly (simple workflows)
        if can_create_directly(draft):
            # Create workflow immediately
            workflow = await WorkflowService.create_workflow(
                request=CreateWorkflowRequest(
                    title=draft.title,
                    description=draft.description,
                    trigger_config=TriggerConfig(
                        type=draft.trigger_type,
                        enabled=True,
                        cron_expression=draft.cron_expression,
                        trigger_name=draft.trigger_slug,
                        timezone=user_timezone,
                    ),
                    generate_immediately=True,
                ),
                user_id=user_id,
                user_timezone=user_timezone,
            )
            
            # Stream workflow_created event
            writer({"workflow_created": workflow.model_dump()})
            
            return success_response(
                {"status": "created", "workflow_id": workflow.id},
                f"Workflow '{workflow.title}' created and activated."
            )
        else:
            # Stream draft for user confirmation
            writer(draft.to_stream_payload())
            return success_response(
                {"status": "draft_sent"},
                "Workflow draft sent for confirmation."
            )
    
    return success_response({"status": "completed"})

List Workflows

@tool
async def list_workflows(config: RunnableConfig) -> dict:
    """List all workflows for the current user."""
    user_id = get_user_id(config)
    workflows = await WorkflowService.list_workflows(user_id)
    
    workflow_summaries = [
        {
            "id": w.id,
            "title": w.title,
            "description": w.description[:100] + "...",
            "trigger_type": w.trigger_config.type,
            "activated": w.activated,
            "step_count": len(w.steps),
            "total_executions": w.total_executions,
        }
        for w in workflows
    ]
    
    writer = get_stream_writer()
    writer({"workflow_list": {"workflows": workflow_summaries, "total": len(workflows)}})
    
    return success_response({"workflows": workflow_summaries, "total": len(workflows)})

Execute Workflow

@tool
async def execute_workflow(
    config: RunnableConfig,
    workflow_id: Annotated[str, "The ID of the workflow to execute"],
) -> dict:
    """Execute a workflow immediately (run now)."""
    user_id = get_user_id(config)
    
    result = await WorkflowService.execute_workflow(
        workflow_id,
        WorkflowExecutionRequest(),
        user_id
    )
    
    data = {
        "workflow_id": workflow_id,
        "execution_id": result.execution_id,
        "message": result.message,
    }
    
    writer = get_stream_writer()
    writer({"workflow_execution": {"action": "started", "execution": data}})
    
    return success_response(data)

Workflow Subagent

Workflows are created by a specialized subagent:
# Location: apps/api/app/services/workflow/workflow_subagent.py
class WorkflowSubagentRunner:
    @staticmethod
    async def execute(
        task: str,
        user_id: str,
        thread_id: str,
        user_time: Optional[datetime] = None,
        stream_writer=None,
    ) -> str:
        """
        Execute workflow subagent with specialized tools.
        """
        llm = init_llm()
        
        # Create subagent with workflow tools
        subagent_graph = await SubAgentFactory.create_provider_subagent(
            provider="workflow",
            name="workflow_subagent",
            llm=llm,
            tool_space="workflow_delegated",
            use_direct_tools=True,
        )
        
        # Execute
        result = await subagent_graph.ainvoke(
            {"messages": [HumanMessage(content=task)]},
            config=build_subagent_config(...),
        )
        
        return extract_response(result)

Context Extraction

For from_conversation mode:
# Location: apps/api/app/services/workflow/context_extractor.py
class WorkflowContextExtractor:
    @staticmethod
    async def extract_from_thread(thread_id: str) -> WorkflowContext:
        """
        Extract workflow-relevant information from conversation.
        
        Returns:
            WorkflowContext with:
            - suggested_title
            - summary
            - workflow_steps
            - integrations_used
        """
        # Fetch conversation messages
        messages = await get_thread_messages(thread_id)
        
        # Use LLM to extract workflow steps
        extraction_prompt = f"""
        Analyze this conversation and extract:
        1. A concise workflow title
        2. A brief summary
        3. Ordered list of steps performed
        4. Integrations/tools used
        
        Conversation:
        {format_messages(messages)}
        """
        
        llm = init_llm(use_free=True)
        response = await llm.ainvoke([HumanMessage(content=extraction_prompt)])
        
        return parse_workflow_context(response.content)

Workflow Draft Format

# Location: apps/api/app/services/workflow/subagent_output.py
class FinalizedOutput(BaseModel):
    title: str
    description: str
    trigger_type: Literal["manual", "scheduled", "integration"]
    trigger_slug: Optional[str] = None
    cron_expression: Optional[str] = None
    prompt: str  # Workflow steps in natural language
    direct_create: bool = False
    
    def to_stream_payload(self) -> dict:
        """Convert to frontend-compatible format."""
        return {
            "workflow_draft": {
                "title": self.title,
                "description": self.description,
                "triggerType": self.trigger_type,
                "triggerSlug": self.trigger_slug,
                "cronExpression": self.cron_expression,
                "prompt": self.prompt,
            }
        }

Direct Creation Logic

def can_create_directly(draft: FinalizedOutput) -> bool:
    """
    Check if workflow can be created without user confirmation.
    
    Returns True if:
    - direct_create flag is True
    - Trigger type is manual or scheduled (no config_fields needed)
    
    Returns False if:
    - Trigger type is integration (requires user to specify config)
    """
    if not draft.direct_create:
        return False
    
    # Integration triggers ALWAYS need confirmation
    if draft.trigger_type == "integration":
        return False
    
    return True

Usage Examples

Creating Simple Scheduled Workflow

# User: "Create a workflow that checks my email every morning"

# Agent delegates to workflow assistant
await create_workflow(
    config,
    user_request="checks my email every morning",
    mode="new"
)

# Workflow assistant:
# 1. Understands "every morning" = daily at 9am
# 2. Creates scheduled trigger with cron: "0 9 * * *"
# 3. Generates workflow prompt: "Check Gmail inbox and summarize new emails"
# 4. Returns finalized draft
# 5. Direct creation succeeds (scheduled, no config needed)
# 6. Frontend receives workflow_created event

Creating Integration-Based Workflow

# User: "Notify me on Slack when I get a GitHub PR"

# Agent delegates
await create_workflow(
    config,
    user_request="notifies me on Slack when I get a GitHub PR",
    mode="new"
)

# Workflow assistant:
# 1. Calls search_triggers("when GitHub PR created")
# 2. Finds "github_pull_request_opened" trigger
# 3. Generates draft with:
#    - trigger_type: "integration"
#    - trigger_slug: "github_pull_request_opened"
#    - config_fields: ["repository_ids"]
# 4. Streams draft to frontend for confirmation
# 5. User selects which repos to monitor
# 6. Frontend calls workflow creation API with config

Saving Conversation as Workflow

# User has conversation about creating todos
User: "Add a task for the budget report"
Agent: *creates todo*
User: "Set it to high priority"
Agent: *updates priority*
User: "Add a reminder for next week"
Agent: *adds reminder*

# User: "Save this as a workflow that runs every month"

await create_workflow(
    config,
    user_request="runs every month",
    mode="from_conversation"
)

# Workflow assistant:
# 1. Extracts context from thread
# 2. Identifies steps: create todo, set priority, add reminder
# 3. Asks: "When in the month should this run?"
# User: "First day of the month"
# 4. Creates scheduled workflow with cron: "0 9 1 * *"
The workflow assistant uses a specialized prompt that guides it to search for triggers, ask clarifying questions, and generate properly formatted workflow drafts.

Best Practices

1. Pass User Requests Verbatim

# Good: Pass exactly as stated
await create_workflow(config, user_request="email me every morning")

# Avoid: Interpreting or parsing
await create_workflow(config, user_request="0 9 * * *")  # Don't parse cron

2. Let Workflow Assistant Handle Ambiguity

# User: "Create a workflow for my standup updates"

# Good: Pass to assistant to clarify
await create_workflow(config, "for my standup updates")
# Assistant asks: "When should this run? Daily at 9am before standup?"

# Avoid: Guessing
# Don't assume schedule or steps

3. Use from_conversation for Repeatable Tasks

# User just completed a multi-step task
# Suggest saving as workflow for future automation

if complex_task_completed:
    suggest_to_user("Would you like to save this as a reusable workflow?")

Next Steps

Build docs developers (and LLMs) love