Skip to main content

Todo Tool

The Todo tool provides comprehensive task and project management capabilities with semantic search, bulk operations, and workflow integration.

Overview

Features:
  • Create, read, update, and delete todos
  • Project organization
  • Label-based categorization
  • Priority levels (high, medium, low, none)
  • Due date management with timezone support
  • Subtask tracking
  • Bulk operations
  • Semantic and keyword search
  • Statistics and analytics
  • Workflow automation integration
# Location: apps/api/app/agents/tools/todo_tool.py

Core Operations

Create Todo

@tool
@with_rate_limiting("todo_operations")
async def create_todo(
    config: RunnableConfig,
    title: Annotated[str, "Title of the todo item (required)"],
    description: Annotated[Optional[str], "Detailed description"] = None,
    labels: Annotated[Optional[List[str]], "List of labels/tags"] = None,
    due_date: Annotated[Optional[datetime], "When the task should be completed"] = None,
    due_date_timezone: Annotated[Optional[str], "Timezone (e.g., 'America/New_York')"] = None,
    priority: Annotated[Optional[str], "Priority: high, medium, low, or none"] = None,
    project_id: Annotated[Optional[str], "Project ID to assign the todo to"] = None,
) -> Dict[str, Any]:
    """
    Create a new todo task.
    
    Returns:
        {"todo": {...}, "error": None}
    """
    user_id = get_user_id_from_config(config)
    
    # Convert priority string to enum
    priority_enum = Priority(priority) if priority else Priority.NONE
    
    todo_data = TodoModel(
        title=title,
        description=description,
        labels=labels or [],
        due_date=due_date,
        due_date_timezone=due_date_timezone,
        priority=priority_enum,
        project_id=project_id,
    )
    
    result = await create_todo_service(todo_data, user_id)
    todo_dict = result.model_dump(mode="json")
    
    # Stream to frontend
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": [todo_dict],
            "action": "create",
            "message": f"Created task: {title}",
        }
    })
    
    return {"todo": todo_dict, "error": None}
Parameters:
  • title (str, required): Task title
  • description (optional str): Detailed description
  • labels (optional list[str]): Tags for categorization
  • due_date (optional datetime): Completion deadline
  • due_date_timezone (optional str): Timezone for due date
  • priority (optional str): “high”, “medium”, “low”, “none”
  • project_id (optional str): Assign to project
Example:
await create_todo(
    config,
    title="Finish quarterly report",
    description="Complete Q4 financial analysis",
    labels=["work", "finance"],
    due_date=datetime(2024, 1, 31, 17, 0, 0),
    due_date_timezone="America/New_York",
    priority="high",
)

List Todos

@tool
async def list_todos(
    config: RunnableConfig,
    project_id: Annotated[Optional[str], "Filter by project"] = None,
    completed: Annotated[Optional[bool], "Filter by completion status"] = None,
    priority: Annotated[Optional[str], "Filter by priority"] = None,
    has_due_date: Annotated[Optional[bool], "Filter todos with/without due dates"] = None,
    overdue: Annotated[Optional[bool], "Filter overdue uncompleted todos"] = None,
    skip: Annotated[int, "Pagination offset"] = 0,
    limit: Annotated[int, "Max results (max 100)"] = 50,
) -> Dict[str, Any]:
    """
    List todos with filtering and pagination.
    
    Returns:
        {"todos": [...], "count": N, "error": None}
    """
    user_id = get_user_id_from_config(config)
    
    # Enforce limits
    if limit > 100:
        limit = 100
    
    results = await get_all_todos_service(
        user_id,
        project_id=project_id,
        completed=completed,
        priority=priority,
        has_due_date=has_due_date,
        overdue=overdue,
        skip=skip,
        limit=limit,
    )
    
    todos_data = [todo.model_dump(mode="json") for todo in results]
    
    # Stream to frontend
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": todos_data,
            "action": "list",
            "message": f"Found {len(results)} task{'s' if len(results) != 1 else ''}",
        }
    })
    
    return {"todos": todos_data, "count": len(results), "error": None}

Update Todo

@tool
async def update_todo(
    config: RunnableConfig,
    todo_id: Annotated[str, "ID of the todo to update (required)"],
    title: Annotated[Optional[str], "New title"] = None,
    description: Annotated[Optional[str], "New description"] = None,
    labels: Annotated[Optional[List[str]], "New labels"] = None,
    due_date: Annotated[Optional[datetime], "New due date"] = None,
    due_date_timezone: Annotated[Optional[str], "New timezone"] = None,
    priority: Annotated[Optional[str], "New priority"] = None,
    project_id: Annotated[Optional[str], "Move to different project"] = None,
    completed: Annotated[Optional[bool], "Mark as complete/incomplete"] = None,
) -> Dict[str, Any]:
    """
    Update an existing todo. Only specified fields are updated.
    """
    user_id = get_user_id_from_config(config)
    
    update_request = TodoUpdateRequest(
        title=title,
        description=description,
        labels=labels,
        due_date=due_date,
        due_date_timezone=due_date_timezone,
        priority=Priority(priority) if priority is not None else None,
        project_id=project_id,
        completed=completed,
    )
    
    result = await update_todo_service(todo_id, update_request, user_id)
    todo_dict = result.model_dump(mode="json")
    
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": [todo_dict],
            "action": "update",
            "message": f"Updated task: {result.title}",
        }
    })
    
    return {"todo": todo_dict, "error": None}

Search Operations

@tool
async def search_todos(
    config: RunnableConfig,
    query: Annotated[str, "Search query (required)"],
) -> Dict[str, Any]:
    """
    Search todos by keyword matching in title and description.
    """
    user_id = get_user_id_from_config(config)
    
    results = await search_todos_service(query, user_id)
    todos_data = [todo.model_dump(mode="json") for todo in results]
    
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": todos_data,
            "action": "search",
            "message": f"Found {len(results)} task{'s' if len(results) != 1 else ''} matching '{query}'",
        }
    })
    
    return {"todos": todos_data, "count": len(results), "error": None}
@tool
async def semantic_search_todos(
    config: RunnableConfig,
    query: Annotated[str, "Natural language search query (required)"],
    limit: Annotated[int, "Max results"] = 20,
    project_id: Annotated[Optional[str], "Filter by project"] = None,
    completed: Annotated[Optional[bool], "Filter by completion"] = None,
    priority: Annotated[Optional[str], "Filter by priority"] = None,
) -> Dict[str, Any]:
    """
    Search todos using AI-powered semantic search.
    
    Example queries:
    - "Important finance tasks"
    - "Things related to the marketing campaign"
    - "Urgent items for this week"
    """
    user_id = get_user_id_from_config(config)
    
    if limit > 50:
        limit = 50
    
    results = await semantic_search_todos_service(
        query=query,
        user_id=user_id,
        limit=limit,
        project_id=project_id,
        completed=completed,
        priority=priority,
    )
    
    todos_data = [todo.model_dump(mode="json") for todo in results]
    
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": todos_data,
            "action": "search",
            "message": f"Found {len(results)} task{'s' if len(results) != 1 else ''} using AI search",
        }
    })
    
    return {"todos": todos_data, "count": len(results), "search_type": "semantic", "error": None}

Project Management

Create Project

@tool
async def create_project(
    config: RunnableConfig,
    name: Annotated[str, "Name of the project (required)"],
    description: Annotated[Optional[str], "Project description"] = None,
    color: Annotated[Optional[str], "Hex color code (e.g., #FF5733)"] = None,
) -> Dict[str, Any]:
    """
    Create a new project for organizing todos.
    """
    user_id = get_user_id_from_config(config)
    
    project_data = ProjectCreate(
        name=name,
        description=description,
        color=color,
    )
    
    result = await create_project_service(project_data, user_id)
    project_dict = result.model_dump(mode="json")
    
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "projects": [project_dict],
            "action": "create",
            "message": f"Created project: {name}",
        }
    })
    
    return {"project": project_dict, "error": None}

List Projects

@tool
async def list_projects(config: RunnableConfig) -> Dict[str, Any]:
    """
    List all projects for the user.
    """
    user_id = get_user_id_from_config(config)
    
    results = await get_all_projects_service(user_id)
    projects_data = [project.model_dump(mode="json") for project in results]
    
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "projects": projects_data,
            "action": "list",
            "message": f"You have {len(results)} project{'s' if len(results) != 1 else ''}",
        }
    })
    
    return {"projects": projects_data, "count": len(results), "error": None}

Bulk Operations

Bulk Complete

@tool
async def bulk_complete_todos(
    config: RunnableConfig,
    todo_ids: Annotated[List[str], "List of todo IDs to mark as complete (required)"],
) -> Dict[str, Any]:
    """
    Mark multiple todos as complete in one operation.
    """
    user_id = get_user_id_from_config(config)
    
    results = await bulk_complete_service(todo_ids, user_id)
    todos_data = [todo.model_dump(mode="json") for todo in results]
    
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": todos_data,
            "action": "update",
            "message": f"Completed {len(results)} task{'s' if len(results) != 1 else ''}",
        }
    })
    
    return {"todos": todos_data, "count": len(results), "error": None}

Bulk Move

@tool
async def bulk_move_todos(
    config: RunnableConfig,
    todo_ids: Annotated[List[str], "List of todo IDs to move (required)"],
    project_id: Annotated[str, "Target project ID (required)"],
) -> Dict[str, Any]:
    """
    Move multiple todos to a different project.
    """
    user_id = get_user_id_from_config(config)
    
    results = await bulk_move_service(todo_ids, project_id, user_id)
    todos_data = [todo.model_dump(mode="json") for todo in results]
    
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "todos": todos_data,
            "action": "update",
            "message": f"Moved {len(results)} task{'s' if len(results) != 1 else ''} to project",
        }
    })
    
    return {"todos": todos_data, "count": len(results), "error": None}

Bulk Delete

@tool
async def bulk_delete_todos(
    config: RunnableConfig,
    todo_ids: Annotated[List[str], "List of todo IDs to delete (required)"],
) -> Dict[str, Any]:
    """
    Delete multiple todos permanently.
    """
    user_id = get_user_id_from_config(config)
    
    await bulk_delete_service(todo_ids, user_id)
    
    writer = get_stream_writer()
    writer({
        "todo_data": {
            "action": "delete",
            "message": f"Deleted {len(todo_ids)} task{'s' if len(todo_ids) != 1 else ''}",
        }
    })
    
    return {"success": True, "error": None}

Subtask Management

Add Subtask

@tool
async def add_subtask(
    config: RunnableConfig,
    todo_id: Annotated[str, "Parent todo ID (required)"],
    title: Annotated[str, "Subtask title (required)"],
) -> Dict[str, Any]:
    """
    Add a subtask to an existing todo.
    """
    user_id = get_user_id_from_config(config)
    
    todo = await get_todo_service(todo_id, user_id)
    
    new_subtask = SubTask(
        id=str(uuid.uuid4()),
        title=title,
        completed=False
    )
    
    update_data = TodoUpdateRequest(
        subtasks=todo.subtasks + [new_subtask]
    )
    
    result = await update_todo_service(todo_id, update_data, user_id)
    
    return {"todo": result.model_dump(mode="json"), "error": None}

Statistics

Get Todo Summary

@tool
async def get_todos_summary(config: RunnableConfig) -> Dict[str, Any]:
    """
    Get comprehensive productivity snapshot.
    
    Returns:
    - Today's tasks
    - Overdue tasks
    - Upcoming week tasks
    - High priority tasks
    - Recently completed
    - Next deadline
    - Statistics
    - Per-project breakdown
    """
    user_id = get_user_id_from_config(config)
    
    # Calculate date ranges
    now = datetime.now(timezone.utc)
    today_start = datetime.combine(datetime.today(), time.min)
    today_end = datetime.combine(datetime.today(), time.max)
    week_end = now + timedelta(days=7)
    yesterday = now - timedelta(days=1)
    
    # Fetch data in parallel
    today_todos, upcoming_todos, all_todos, all_projects = await asyncio.gather(
        get_todos_by_date_range(user_id, today_start, today_end),
        get_todos_by_date_range(user_id, now, week_end),
        get_all_todos_service(user_id, limit=100),
        get_all_projects_service(user_id),
    )
    
    # Filter and categorize
    overdue = [t for t in all_todos if t.due_date and not t.completed and t.due_date < now]
    high_priority = [t for t in all_todos if t.priority == Priority.HIGH and not t.completed]
    recently_completed = [t for t in all_todos if t.completed and t.completed_at and t.completed_at > yesterday]
    
    # Calculate stats
    total = len(all_todos)
    completed = len([t for t in all_todos if t.completed])
    completion_rate = round((completed / total * 100), 1) if total > 0 else 0
    
    summary = {
        "today": serialize_todos(today_todos),
        "overdue": serialize_todos(overdue),
        "upcoming_week": serialize_todos(upcoming_todos),
        "high_priority": serialize_todos(high_priority),
        "recently_completed": {"count": len(recently_completed), "todos": [...]},
        "next_deadline": next_deadline,
        "stats": {
            "total": total,
            "completed": completed,
            "pending": total - completed,
            "completed_today": len(recently_completed),
            "overdue": len(overdue),
            "completion_rate": completion_rate,
        },
        "by_project": project_breakdown,
    }
    
    return {"summary": summary, "error": None}

Workflow Integration

Todos can trigger workflows:
# Example: AI-generated workflow for complex todos
todo_dict = result.model_dump(mode="json")

if todo_dict.get("workflow"):
    response_data["workflow"] = todo_dict["workflow"]
    response_data["message"] = "Created task with workflow plan"

Streaming Updates

All todo operations stream to the frontend:
writer = get_stream_writer()
writer({
    "todo_data": {
        "todos": todos_data,
        "action": "create",  # or "update", "delete", "list", "search"
        "message": "Created task: ...",
    }
})
The get_stream_writer() function returns a writer that sends data to the frontend in real-time, enabling instant UI updates.

Best Practices

1. Use Semantic Search for Intent

# User: "Show me finance stuff"
# Use semantic search, not keyword search
await semantic_search_todos(config, query="finance stuff")
# Matches: "Q4 budget", "Tax preparation", "Invoice review"

2. Batch Operations for Efficiency

# Good: Bulk complete
await bulk_complete_todos(config, todo_ids=["id1", "id2", "id3"])

# Avoid: Loop with individual updates
for todo_id in ["id1", "id2", "id3"]:
    await update_todo(config, todo_id, completed=True)

3. Always Include Filters When Listing

# Good: Filtered list
await list_todos(config, completed=False, priority="high", limit=20)

# Avoid: Unfiltered large lists
await list_todos(config, limit=100)  # May return too much data

Next Steps

Build docs developers (and LLMs) love