Workflow Tool
The Workflow tool enables users to create, manage, and execute automated workflows with scheduled and event-based triggers.Overview
Workflow capabilities:- Search for integration triggers
- Create workflows from natural language
- Save conversations as workflows
- List and manage workflows
- Execute workflows on-demand
- Schedule recurring workflows
# Location: apps/api/app/agents/tools/workflow_tool.py
Core Concepts
Trigger Types
- Manual: User initiates execution
- Scheduled: Cron-based recurring execution
- Integration: Event-based (new email, calendar event, PR created, etc.)
Workflow Modes
new: Create workflow from descriptionfrom_conversation: Save current chat session as reusable workflow
Available Tools
Search Triggers
@tool
async def search_triggers(
config: RunnableConfig,
query: Annotated[str, "Describe when the workflow should trigger"],
limit: Annotated[int, "Max results"] = 15,
) -> dict:
"""
Search for integration triggers matching your description.
Examples:
- "when I receive an email"
- "when calendar event starts"
- "when someone sends a slack message"
- "when a github issue is created"
Returns matching triggers with config_fields embedded.
"""
user_id = get_user_id(config)
results = await TriggerSearchService.search(
query=query,
user_id=user_id,
limit=limit,
)
connected = [t for t in results if t.get("is_connected")]
not_connected = [t for t in results if not t.get("is_connected")]
return success_response({
"triggers": results,
"connected_count": len(connected),
"not_connected_count": len(not_connected),
})
{
"success": true,
"data": {
"triggers": [
{
"trigger_slug": "gmail_new_email",
"trigger_name": "New Email Received",
"description": "Triggers when a new email arrives in Gmail",
"is_connected": true,
"config_fields": [
{
"name": "label_ids",
"type": "array",
"required": false,
"description": "Filter by Gmail labels"
}
]
}
],
"connected_count": 3,
"not_connected_count": 2
}
}
Create Workflow
@tool
@with_rate_limiting("workflow_operations")
async def create_workflow(
config: RunnableConfig,
user_request: Annotated[str, "User's exact words describing workflow"],
mode: Annotated[
Literal["new", "from_conversation"],
"Mode: 'new' for description, 'from_conversation' to save session"
] = "new",
) -> dict:
"""
Create workflow by delegating to workflow assistant.
IMPORTANT: Pass user's request EXACTLY as stated. Do not interpret,
parse schedules, extract steps, or determine trigger types.
Examples:
- create_workflow("checks my email every morning", mode="new")
- create_workflow("notifies me on Slack when I get a GitHub PR", mode="new")
- create_workflow("save this as a workflow", mode="from_conversation")
- create_workflow("runs every Monday", mode="from_conversation")
"""
from app.services.workflow.workflow_subagent import WorkflowSubagentRunner
writer = get_stream_writer()
user_id = get_user_id(config)
thread_id = get_thread_id(config) or ""
user_time = get_user_time(config)
user_timezone = get_user_timezone(config)
# Build task based on mode
if mode == "new":
task_description = _build_new_workflow_task(user_request.strip())
elif mode == "from_conversation":
# Extract workflow from conversation context
context = await WorkflowContextExtractor.extract_from_thread(thread_id)
if not context or not context.workflow_steps:
return error_response(
"extraction_failed",
"Could not extract workflow steps from conversation."
)
task_description = _build_from_conversation_task(context, user_request)
# Execute workflow subagent
subagent_response = await WorkflowSubagentRunner.execute(
task=task_description,
user_id=user_id,
thread_id=thread_id,
user_time=user_time,
stream_writer=writer,
)
# Parse response
result = parse_subagent_response(subagent_response)
if result.mode == "finalized" and result.draft:
draft = result.draft
# Check if can create directly (simple workflows)
if can_create_directly(draft):
# Create workflow immediately
workflow = await WorkflowService.create_workflow(
request=CreateWorkflowRequest(
title=draft.title,
description=draft.description,
trigger_config=TriggerConfig(
type=draft.trigger_type,
enabled=True,
cron_expression=draft.cron_expression,
trigger_name=draft.trigger_slug,
timezone=user_timezone,
),
generate_immediately=True,
),
user_id=user_id,
user_timezone=user_timezone,
)
# Stream workflow_created event
writer({"workflow_created": workflow.model_dump()})
return success_response(
{"status": "created", "workflow_id": workflow.id},
f"Workflow '{workflow.title}' created and activated."
)
else:
# Stream draft for user confirmation
writer(draft.to_stream_payload())
return success_response(
{"status": "draft_sent"},
"Workflow draft sent for confirmation."
)
return success_response({"status": "completed"})
List Workflows
@tool
async def list_workflows(config: RunnableConfig) -> dict:
"""List all workflows for the current user."""
user_id = get_user_id(config)
workflows = await WorkflowService.list_workflows(user_id)
workflow_summaries = [
{
"id": w.id,
"title": w.title,
"description": w.description[:100] + "...",
"trigger_type": w.trigger_config.type,
"activated": w.activated,
"step_count": len(w.steps),
"total_executions": w.total_executions,
}
for w in workflows
]
writer = get_stream_writer()
writer({"workflow_list": {"workflows": workflow_summaries, "total": len(workflows)}})
return success_response({"workflows": workflow_summaries, "total": len(workflows)})
Execute Workflow
@tool
async def execute_workflow(
config: RunnableConfig,
workflow_id: Annotated[str, "The ID of the workflow to execute"],
) -> dict:
"""Execute a workflow immediately (run now)."""
user_id = get_user_id(config)
result = await WorkflowService.execute_workflow(
workflow_id,
WorkflowExecutionRequest(),
user_id
)
data = {
"workflow_id": workflow_id,
"execution_id": result.execution_id,
"message": result.message,
}
writer = get_stream_writer()
writer({"workflow_execution": {"action": "started", "execution": data}})
return success_response(data)
Workflow Subagent
Workflows are created by a specialized subagent:# Location: apps/api/app/services/workflow/workflow_subagent.py
class WorkflowSubagentRunner:
@staticmethod
async def execute(
task: str,
user_id: str,
thread_id: str,
user_time: Optional[datetime] = None,
stream_writer=None,
) -> str:
"""
Execute workflow subagent with specialized tools.
"""
llm = init_llm()
# Create subagent with workflow tools
subagent_graph = await SubAgentFactory.create_provider_subagent(
provider="workflow",
name="workflow_subagent",
llm=llm,
tool_space="workflow_delegated",
use_direct_tools=True,
)
# Execute
result = await subagent_graph.ainvoke(
{"messages": [HumanMessage(content=task)]},
config=build_subagent_config(...),
)
return extract_response(result)
Context Extraction
Forfrom_conversation mode:
# Location: apps/api/app/services/workflow/context_extractor.py
class WorkflowContextExtractor:
@staticmethod
async def extract_from_thread(thread_id: str) -> WorkflowContext:
"""
Extract workflow-relevant information from conversation.
Returns:
WorkflowContext with:
- suggested_title
- summary
- workflow_steps
- integrations_used
"""
# Fetch conversation messages
messages = await get_thread_messages(thread_id)
# Use LLM to extract workflow steps
extraction_prompt = f"""
Analyze this conversation and extract:
1. A concise workflow title
2. A brief summary
3. Ordered list of steps performed
4. Integrations/tools used
Conversation:
{format_messages(messages)}
"""
llm = init_llm(use_free=True)
response = await llm.ainvoke([HumanMessage(content=extraction_prompt)])
return parse_workflow_context(response.content)
Workflow Draft Format
# Location: apps/api/app/services/workflow/subagent_output.py
class FinalizedOutput(BaseModel):
title: str
description: str
trigger_type: Literal["manual", "scheduled", "integration"]
trigger_slug: Optional[str] = None
cron_expression: Optional[str] = None
prompt: str # Workflow steps in natural language
direct_create: bool = False
def to_stream_payload(self) -> dict:
"""Convert to frontend-compatible format."""
return {
"workflow_draft": {
"title": self.title,
"description": self.description,
"triggerType": self.trigger_type,
"triggerSlug": self.trigger_slug,
"cronExpression": self.cron_expression,
"prompt": self.prompt,
}
}
Direct Creation Logic
def can_create_directly(draft: FinalizedOutput) -> bool:
"""
Check if workflow can be created without user confirmation.
Returns True if:
- direct_create flag is True
- Trigger type is manual or scheduled (no config_fields needed)
Returns False if:
- Trigger type is integration (requires user to specify config)
"""
if not draft.direct_create:
return False
# Integration triggers ALWAYS need confirmation
if draft.trigger_type == "integration":
return False
return True
Usage Examples
Creating Simple Scheduled Workflow
# User: "Create a workflow that checks my email every morning"
# Agent delegates to workflow assistant
await create_workflow(
config,
user_request="checks my email every morning",
mode="new"
)
# Workflow assistant:
# 1. Understands "every morning" = daily at 9am
# 2. Creates scheduled trigger with cron: "0 9 * * *"
# 3. Generates workflow prompt: "Check Gmail inbox and summarize new emails"
# 4. Returns finalized draft
# 5. Direct creation succeeds (scheduled, no config needed)
# 6. Frontend receives workflow_created event
Creating Integration-Based Workflow
# User: "Notify me on Slack when I get a GitHub PR"
# Agent delegates
await create_workflow(
config,
user_request="notifies me on Slack when I get a GitHub PR",
mode="new"
)
# Workflow assistant:
# 1. Calls search_triggers("when GitHub PR created")
# 2. Finds "github_pull_request_opened" trigger
# 3. Generates draft with:
# - trigger_type: "integration"
# - trigger_slug: "github_pull_request_opened"
# - config_fields: ["repository_ids"]
# 4. Streams draft to frontend for confirmation
# 5. User selects which repos to monitor
# 6. Frontend calls workflow creation API with config
Saving Conversation as Workflow
# User has conversation about creating todos
User: "Add a task for the budget report"
Agent: *creates todo*
User: "Set it to high priority"
Agent: *updates priority*
User: "Add a reminder for next week"
Agent: *adds reminder*
# User: "Save this as a workflow that runs every month"
await create_workflow(
config,
user_request="runs every month",
mode="from_conversation"
)
# Workflow assistant:
# 1. Extracts context from thread
# 2. Identifies steps: create todo, set priority, add reminder
# 3. Asks: "When in the month should this run?"
# User: "First day of the month"
# 4. Creates scheduled workflow with cron: "0 9 1 * *"
The workflow assistant uses a specialized prompt that guides it to search for triggers, ask clarifying questions, and generate properly formatted workflow drafts.
Best Practices
1. Pass User Requests Verbatim
# Good: Pass exactly as stated
await create_workflow(config, user_request="email me every morning")
# Avoid: Interpreting or parsing
await create_workflow(config, user_request="0 9 * * *") # Don't parse cron
2. Let Workflow Assistant Handle Ambiguity
# User: "Create a workflow for my standup updates"
# Good: Pass to assistant to clarify
await create_workflow(config, "for my standup updates")
# Assistant asks: "When should this run? Daily at 9am before standup?"
# Avoid: Guessing
# Don't assume schedule or steps
3. Use from_conversation for Repeatable Tasks
# User just completed a multi-step task
# Suggest saving as workflow for future automation
if complex_task_completed:
suggest_to_user("Would you like to save this as a reusable workflow?")
Next Steps
- Code Execution - Run code in workflows
- Integration Tool - Connect triggers
- Memory Tool - Store workflow patterns
- LLM Prompts - Workflow assistant prompts