Todo Tool
The Todo tool provides comprehensive task and project management capabilities with semantic search, bulk operations, and workflow integration.
Overview
Features:
- Create, read, update, and delete todos
- Project organization
- Label-based categorization
- Priority levels (high, medium, low, none)
- Due date management with timezone support
- Subtask tracking
- Bulk operations
- Semantic and keyword search
- Statistics and analytics
- Workflow automation integration
# Location: apps/api/app/agents/tools/todo_tool.py
Core Operations
Create Todo
@tool
@with_rate_limiting("todo_operations")
async def create_todo(
    config: RunnableConfig,
    title: Annotated[str, "Title of the todo item (required)"],
    description: Annotated[Optional[str], "Detailed description"] = None,
    labels: Annotated[Optional[List[str]], "List of labels/tags"] = None,
    due_date: Annotated[Optional[datetime], "When the task should be completed"] = None,
    due_date_timezone: Annotated[Optional[str], "Timezone (e.g., 'America/New_York')"] = None,
    priority: Annotated[Optional[str], "Priority: high, medium, low, or none"] = None,
    project_id: Annotated[Optional[str], "Project ID to assign the todo to"] = None,
) -> Dict[str, Any]:
    """
    Create one todo for the user resolved from ``config``.

    A missing or empty ``priority`` becomes ``Priority.NONE``; any other value
    is handed to the ``Priority`` constructor. A missing ``labels`` becomes an
    empty list. The created todo is pushed to the stream writer and returned.

    Returns:
        {"todo": {...}, "error": None}
    """
    owner_id = get_user_id_from_config(config)

    # Map the optional priority string onto the enum.
    if priority:
        resolved_priority = Priority(priority)
    else:
        resolved_priority = Priority.NONE

    new_todo = TodoModel(
        title=title,
        description=description,
        labels=labels or [],
        due_date=due_date,
        due_date_timezone=due_date_timezone,
        priority=resolved_priority,
        project_id=project_id,
    )
    created = await create_todo_service(new_todo, owner_id)
    serialized = created.model_dump(mode="json")

    # Stream the new todo to the frontend.
    emit = get_stream_writer()
    payload = {
        "todos": [serialized],
        "action": "create",
        "message": f"Created task: {title}",
    }
    emit({"todo_data": payload})

    return {"todo": serialized, "error": None}
- title (str, required): Task title
- description (optional str): Detailed description
- labels (optional list[str]): Tags for categorization
- due_date (optional datetime): Completion deadline
- due_date_timezone (optional str): Timezone for due date
- priority (optional str): “high”, “medium”, “low”, “none”
- project_id (optional str): Assign to project
# Example: create a high-priority todo with two labels. The due date is passed
# as a naive datetime and its timezone is supplied separately through
# due_date_timezone; project_id is omitted, so it keeps its default of None.
await create_todo(
config,
title="Finish quarterly report",
description="Complete Q4 financial analysis",
labels=["work", "finance"],
due_date=datetime(2024, 1, 31, 17, 0, 0),
due_date_timezone="America/New_York",
priority="high",
)
List Todos
@tool
async def list_todos(
    config: RunnableConfig,
    project_id: Annotated[Optional[str], "Filter by project"] = None,
    completed: Annotated[Optional[bool], "Filter by completion status"] = None,
    priority: Annotated[Optional[str], "Filter by priority"] = None,
    has_due_date: Annotated[Optional[bool], "Filter todos with/without due dates"] = None,
    overdue: Annotated[Optional[bool], "Filter overdue uncompleted todos"] = None,
    skip: Annotated[int, "Pagination offset"] = 0,
    limit: Annotated[int, "Max results (max 100)"] = 50,
) -> Dict[str, Any]:
    """
    List todos with filtering and pagination.

    All filters are optional and are forwarded unchanged to
    ``get_all_todos_service``. Pagination values are sanitized first:
    ``limit`` is clamped to the range 1..100 and a negative ``skip`` is
    treated as 0, so out-of-range values never reach the service.

    Returns:
        {"todos": [...], "count": N, "error": None}
    """
    user_id = get_user_id_from_config(config)

    # Enforce limits on both ends. The original only capped the upper bound,
    # letting zero/negative page sizes and negative offsets through.
    limit = max(1, min(limit, 100))
    skip = max(0, skip)

    results = await get_all_todos_service(
        user_id,
        project_id=project_id,
        completed=completed,
        priority=priority,
        has_due_date=has_due_date,
        overdue=overdue,
        skip=skip,
        limit=limit,
    )
    todos_data = [todo.model_dump(mode="json") for todo in results]

    # Stream to frontend
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": todos_data,
            "action": "list",
            "message": f"Found {len(results)} task{'s' if len(results) != 1 else ''}",
        }
    })

    return {"todos": todos_data, "count": len(results), "error": None}
Update Todo
@tool
async def update_todo(
    config: RunnableConfig,
    todo_id: Annotated[str, "ID of the todo to update (required)"],
    title: Annotated[Optional[str], "New title"] = None,
    description: Annotated[Optional[str], "New description"] = None,
    labels: Annotated[Optional[List[str]], "New labels"] = None,
    due_date: Annotated[Optional[datetime], "New due date"] = None,
    due_date_timezone: Annotated[Optional[str], "New timezone"] = None,
    priority: Annotated[Optional[str], "New priority"] = None,
    project_id: Annotated[Optional[str], "Move to different project"] = None,
    completed: Annotated[Optional[bool], "Mark as complete/incomplete"] = None,
) -> Dict[str, Any]:
    """
    Update an existing todo. Only specified fields are updated.

    Arguments left as ``None`` are forwarded as ``None`` in the
    ``TodoUpdateRequest``. A non-``None`` ``priority`` is converted with the
    ``Priority`` constructor. The updated todo is streamed to the frontend.

    Returns:
        {"todo": {...}, "error": None}
    """
    owner_id = get_user_id_from_config(config)

    # Convert the priority only when the caller supplied one.
    new_priority = None
    if priority is not None:
        new_priority = Priority(priority)

    changes = TodoUpdateRequest(
        title=title,
        description=description,
        labels=labels,
        due_date=due_date,
        due_date_timezone=due_date_timezone,
        priority=new_priority,
        project_id=project_id,
        completed=completed,
    )
    result = await update_todo_service(todo_id, changes, owner_id)
    serialized = result.model_dump(mode="json")

    # Stream the updated todo to the frontend.
    emit = get_stream_writer()
    payload = {
        "todos": [serialized],
        "action": "update",
        "message": f"Updated task: {result.title}",
    }
    emit({"todo_data": payload})

    return {"todo": serialized, "error": None}
Search Operations
Keyword Search
@tool
async def search_todos(
    config: RunnableConfig,
    query: Annotated[str, "Search query (required)"],
) -> Dict[str, Any]:
    """
    Search todos by keyword matching in title and description.

    The matches returned by ``search_todos_service`` are serialized,
    streamed to the frontend, and returned together with their count.

    Returns:
        {"todos": [...], "count": N, "error": None}
    """
    owner_id = get_user_id_from_config(config)
    results = await search_todos_service(query, owner_id)

    serialized = []
    for match in results:
        serialized.append(match.model_dump(mode="json"))

    # Stream the matches to the frontend.
    emit = get_stream_writer()
    payload = {
        "todos": serialized,
        "action": "search",
        "message": f"Found {len(results)} task{'s' if len(results) != 1 else ''} matching '{query}'",
    }
    emit({"todo_data": payload})

    return {"todos": serialized, "count": len(results), "error": None}
Semantic Search
@tool
async def semantic_search_todos(
    config: RunnableConfig,
    query: Annotated[str, "Natural language search query (required)"],
    limit: Annotated[int, "Max results"] = 20,
    project_id: Annotated[Optional[str], "Filter by project"] = None,
    completed: Annotated[Optional[bool], "Filter by completion"] = None,
    priority: Annotated[Optional[str], "Filter by priority"] = None,
) -> Dict[str, Any]:
    """
    Search todos using AI-powered semantic search.

    ``limit`` is clamped to the range 1..50 before the service is called, so
    zero or negative values never reach ``semantic_search_todos_service``.
    The optional filters are forwarded unchanged.

    Example queries:
    - "Important finance tasks"
    - "Things related to the marketing campaign"
    - "Urgent items for this week"

    Returns:
        {"todos": [...], "count": N, "search_type": "semantic", "error": None}
    """
    user_id = get_user_id_from_config(config)

    # Clamp on both ends; the original only capped the upper bound.
    limit = max(1, min(limit, 50))

    results = await semantic_search_todos_service(
        query=query,
        user_id=user_id,
        limit=limit,
        project_id=project_id,
        completed=completed,
        priority=priority,
    )
    todos_data = [todo.model_dump(mode="json") for todo in results]

    # Stream to frontend
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": todos_data,
            "action": "search",
            "message": f"Found {len(results)} task{'s' if len(results) != 1 else ''} using AI search",
        }
    })

    return {"todos": todos_data, "count": len(results), "search_type": "semantic", "error": None}
Project Management
Create Project
@tool
async def create_project(
    config: RunnableConfig,
    name: Annotated[str, "Name of the project (required)"],
    description: Annotated[Optional[str], "Project description"] = None,
    color: Annotated[Optional[str], "Hex color code (e.g., #FF5733)"] = None,
) -> Dict[str, Any]:
    """
    Create a new project for organizing todos.

    The created project is streamed to the frontend under the ``projects``
    key and returned.

    Returns:
        {"project": {...}, "error": None}
    """
    owner_id = get_user_id_from_config(config)

    new_project = ProjectCreate(name=name, description=description, color=color)
    created = await create_project_service(new_project, owner_id)
    serialized = created.model_dump(mode="json")

    # Stream the new project to the frontend.
    emit = get_stream_writer()
    payload = {
        "projects": [serialized],
        "action": "create",
        "message": f"Created project: {name}",
    }
    emit({"todo_data": payload})

    return {"project": serialized, "error": None}
List Projects
@tool
async def list_projects(config: RunnableConfig) -> Dict[str, Any]:
    """
    List all projects for the user.

    The projects returned by ``get_all_projects_service`` are serialized,
    streamed to the frontend, and returned together with their count.

    Returns:
        {"projects": [...], "count": N, "error": None}
    """
    owner_id = get_user_id_from_config(config)
    results = await get_all_projects_service(owner_id)

    serialized = []
    for entry in results:
        serialized.append(entry.model_dump(mode="json"))

    # Stream the project list to the frontend.
    emit = get_stream_writer()
    payload = {
        "projects": serialized,
        "action": "list",
        "message": f"You have {len(results)} project{'s' if len(results) != 1 else ''}",
    }
    emit({"todo_data": payload})

    return {"projects": serialized, "count": len(results), "error": None}
Bulk Operations
Bulk Complete
@tool
async def bulk_complete_todos(
    config: RunnableConfig,
    todo_ids: Annotated[List[str], "List of todo IDs to mark as complete (required)"],
) -> Dict[str, Any]:
    """
    Mark multiple todos as complete in one operation.

    The todos returned by ``bulk_complete_service`` are serialized, streamed
    to the frontend as an ``update`` action, and returned with their count.

    Returns:
        {"todos": [...], "count": N, "error": None}
    """
    owner_id = get_user_id_from_config(config)
    results = await bulk_complete_service(todo_ids, owner_id)

    serialized = []
    for item in results:
        serialized.append(item.model_dump(mode="json"))

    # Stream the completed todos to the frontend.
    emit = get_stream_writer()
    payload = {
        "todos": serialized,
        "action": "update",
        "message": f"Completed {len(results)} task{'s' if len(results) != 1 else ''}",
    }
    emit({"todo_data": payload})

    return {"todos": serialized, "count": len(results), "error": None}
Bulk Move
@tool
async def bulk_move_todos(
    config: RunnableConfig,
    todo_ids: Annotated[List[str], "List of todo IDs to move (required)"],
    project_id: Annotated[str, "Target project ID (required)"],
) -> Dict[str, Any]:
    """
    Move multiple todos to a different project.

    The todos returned by ``bulk_move_service`` are serialized, streamed to
    the frontend as an ``update`` action, and returned with their count.

    Returns:
        {"todos": [...], "count": N, "error": None}
    """
    owner_id = get_user_id_from_config(config)
    results = await bulk_move_service(todo_ids, project_id, owner_id)

    serialized = []
    for item in results:
        serialized.append(item.model_dump(mode="json"))

    # Stream the moved todos to the frontend.
    emit = get_stream_writer()
    payload = {
        "todos": serialized,
        "action": "update",
        "message": f"Moved {len(results)} task{'s' if len(results) != 1 else ''} to project",
    }
    emit({"todo_data": payload})

    return {"todos": serialized, "count": len(results), "error": None}
Bulk Delete
@tool
async def bulk_delete_todos(
    config: RunnableConfig,
    todo_ids: Annotated[List[str], "List of todo IDs to delete (required)"],
) -> Dict[str, Any]:
    """
    Delete multiple todos permanently.

    The streamed message reports the number of IDs requested
    (``len(todo_ids)``); the return value of ``bulk_delete_service`` is not
    inspected.

    Returns:
        {"success": True, "error": None}
    """
    owner_id = get_user_id_from_config(config)
    await bulk_delete_service(todo_ids, owner_id)

    # Tell the frontend about the deletion; no todo payload is attached.
    emit = get_stream_writer()
    payload = {
        "action": "delete",
        "message": f"Deleted {len(todo_ids)} task{'s' if len(todo_ids) != 1 else ''}",
    }
    emit({"todo_data": payload})

    return {"success": True, "error": None}
Subtask Management
Add Subtask
@tool
async def add_subtask(
    config: RunnableConfig,
    todo_id: Annotated[str, "Parent todo ID (required)"],
    title: Annotated[str, "Subtask title (required)"],
) -> Dict[str, Any]:
    """
    Add a subtask to an existing todo.

    The parent todo is loaded, a new uncompleted ``SubTask`` with a fresh
    UUID4 id is appended to its subtasks, and the full list is written back
    through ``update_todo_service``. A parent whose ``subtasks`` is ``None``
    is treated as having no subtasks.

    Returns:
        {"todo": {...}, "error": None}
    """
    user_id = get_user_id_from_config(config)
    todo = await get_todo_service(todo_id, user_id)

    new_subtask = SubTask(
        id=str(uuid.uuid4()),
        title=title,
        completed=False,
    )

    # Guard against a parent stored without a subtasks list; `None + [...]`
    # would raise TypeError.
    existing_subtasks = list(todo.subtasks or [])
    update_data = TodoUpdateRequest(
        subtasks=[*existing_subtasks, new_subtask]
    )
    result = await update_todo_service(todo_id, update_data, user_id)

    return {"todo": result.model_dump(mode="json"), "error": None}
Statistics
Get Todo Summary
@tool
async def get_todos_summary(config: RunnableConfig) -> Dict[str, Any]:
    """
    Get comprehensive productivity snapshot.

    All date windows are derived from a single timezone-aware UTC ``now`` so
    that the "today" range is comparable with the other ranges (the original
    built it from naive local ``datetime.today()``).

    Returns:
        {"summary": {...}, "error": None} where the summary contains:
        - Today's tasks
        - Overdue tasks
        - Upcoming week tasks
        - High priority tasks
        - Recently completed
        - Next deadline
        - Statistics
        - Per-project breakdown
    """
    user_id = get_user_id_from_config(config)

    # Calculate date ranges, all timezone-aware and anchored to the same UTC
    # instant.
    now = datetime.now(timezone.utc)
    today_start = datetime.combine(now.date(), time.min, tzinfo=timezone.utc)
    today_end = datetime.combine(now.date(), time.max, tzinfo=timezone.utc)
    week_end = now + timedelta(days=7)
    yesterday = now - timedelta(days=1)

    # Fetch data in parallel
    today_todos, upcoming_todos, all_todos, all_projects = await asyncio.gather(
        get_todos_by_date_range(user_id, today_start, today_end),
        get_todos_by_date_range(user_id, now, week_end),
        # Only the first 100 todos are fetched, so the lists and statistics
        # below are computed over at most 100 items.
        get_all_todos_service(user_id, limit=100),
        get_all_projects_service(user_id),
    )

    # Filter and categorize
    # NOTE(review): these comparisons assume due_date / completed_at are
    # timezone-aware; comparing a naive value with `now` raises TypeError.
    overdue = [t for t in all_todos if t.due_date and not t.completed and t.due_date < now]
    high_priority = [t for t in all_todos if t.priority == Priority.HIGH and not t.completed]
    recently_completed = [t for t in all_todos if t.completed and t.completed_at and t.completed_at > yesterday]

    # Calculate stats
    total = len(all_todos)
    completed = len([t for t in all_todos if t.completed])
    completion_rate = round((completed / total * 100), 1) if total > 0 else 0

    # NOTE(review): `next_deadline`, `project_breakdown` and `serialize_todos`
    # are not defined in this excerpt, and the `[...]` below is an elision
    # placeholder. `all_projects` is presumably the input to
    # `project_breakdown` - confirm against the full source file.
    summary = {
        "today": serialize_todos(today_todos),
        "overdue": serialize_todos(overdue),
        "upcoming_week": serialize_todos(upcoming_todos),
        "high_priority": serialize_todos(high_priority),
        "recently_completed": {"count": len(recently_completed), "todos": [...]},
        "next_deadline": next_deadline,
        "stats": {
            "total": total,
            "completed": completed,
            "pending": total - completed,
            # Counts completions within the last 24 hours (see `yesterday`),
            # not strictly within the current calendar day.
            "completed_today": len(recently_completed),
            "overdue": len(overdue),
            "completion_rate": completion_rate,
        },
        "by_project": project_breakdown,
    }

    return {"summary": summary, "error": None}
Workflow Integration
Todos can trigger workflows:
# Example: AI-generated workflow for complex todos
# Serialize the service result; a "workflow" entry is present only when a
# workflow plan was attached to the todo.
todo_dict = result.model_dump(mode="json")
if todo_dict.get("workflow"):
    # Surface the plan to the caller and adjust the message accordingly.
    response_data["workflow"] = todo_dict["workflow"]
    response_data["message"] = "Created task with workflow plan"
Streaming Updates
All todo operations stream to the frontend:
writer = get_stream_writer()
# Payload shape used by the todo tools: everything is nested under the
# "todo_data" key, with the affected items, the action name, and a
# human-readable message.
writer({
"todo_data": {
"todos": todos_data,
"action": "create", # or "update", "delete", "list", "search"
"message": "Created task: ...",
}
})
The get_stream_writer() function returns a writer that sends data to the frontend in real time, enabling instant UI updates.
Best Practices
1. Use Semantic Search for Intent
# User: "Show me finance stuff"
# The request describes intent rather than exact wording, so use semantic
# search, not keyword search
await semantic_search_todos(config, query="finance stuff")
# Example matches: "Q4 budget", "Tax preparation", "Invoice review"
2. Batch Operations for Efficiency
# Good: Bulk complete
await bulk_complete_todos(config, todo_ids=["id1", "id2", "id3"])
# Avoid: Loop with individual updates
for todo_id in ["id1", "id2", "id3"]:
await update_todo(config, todo_id, completed=True)
3. Always Include Filters When Listing
# Good: Filtered list - narrow by status and priority and keep the page small
await list_todos(config, completed=False, priority="high", limit=20)
# Avoid: Unfiltered large lists - 100 is the largest page list_todos allows
await list_todos(config, limit=100) # May return too much data
Next Steps
- Calendar Tool - Event scheduling
- Memory Tool - Memory storage
- Workflow Tool - Automation
- Code Execution - Run code