The TaskQueue manages task lifecycle with a priority-based min-heap and enforces valid state transitions.
TaskQueue Class
Priority queue implementation where lower priority numbers are dequeued first. Tasks with equal priority are ordered by creation time (FIFO).
Constructor
Creates an empty task queue with an internal min-heap.
Methods
enqueue()
Adds a task to the queue and registers it in the lifecycle tracker.
enqueue(task: Task): void
dequeue()
Removes and returns the highest priority task (lowest priority number).
dequeue(): Task | undefined
The next task to execute, or undefined if queue is empty
peek()
Returns the highest priority task without removing it.
updateStatus()
Updates a task’s status with state transition validation.
updateStatus(taskId: string, newStatus: TaskStatus): void
New status: pending, assigned, running, complete, failed, or cancelled
Throws an error if the transition is invalid according to the state machine.
getTask()
Retrieves a task by ID.
getTask(taskId: string): Task | undefined
Query Methods
isEmpty(): boolean
size(): number
getPendingCount(): number
getRunningCount(): number
getCompletedCount(): number
getFailedCount(): number
getAllTasks(): Task[]
State Machine
The queue enforces these valid state transitions:
const VALID_TRANSITIONS: Record<TaskStatus, TaskStatus[]> = {
pending: ["assigned", "cancelled"],
assigned: ["running", "cancelled"],
running: ["complete", "failed", "cancelled"],
complete: [],
failed: ["pending"], // Allows retry
cancelled: [],
};
State Transition Rules
- pending → assigned: Worker picks up task
- assigned → running: Worker starts execution
- running → complete: Task succeeds
- running → failed: Task fails (can be retried)
- failed → pending: Retry failed task
- Any → cancelled: Task is cancelled
Failed tasks can transition back to pending for retry, but completed/cancelled tasks are terminal states.
Callbacks
onStatusChange()
Registers a callback that fires on every status change.
onStatusChange(
callback: (task: Task, oldStatus: TaskStatus, newStatus: TaskStatus) => void
): void
Priority Ordering
Tasks are ordered by:
- Priority number (1 = highest, 10 = lowest)
- Creation time (older tasks first if priority is equal)
Example:
// These tasks would be dequeued in this order:
queue.enqueue({ priority: 1, createdAt: 1000, ... }); // First
queue.enqueue({ priority: 2, createdAt: 900, ... }); // Third
queue.enqueue({ priority: 1, createdAt: 1100, ... }); // Second
queue.enqueue({ priority: 5, createdAt: 500, ... }); // Fourth
Usage Example
import { TaskQueue } from "@longshot/orchestrator";
import type { Task } from "@longshot/core";
const queue = new TaskQueue();
// Register status change callback
queue.onStatusChange((task, oldStatus, newStatus) => {
console.log(`Task ${task.id}: ${oldStatus} → ${newStatus}`);
});
// Add tasks
queue.enqueue({
id: "task-1",
description: "Implement authentication",
scope: ["src/auth"],
acceptance: "Tests pass",
branch: "worker/task-1",
status: "pending",
priority: 1,
createdAt: Date.now(),
});
queue.enqueue({
id: "task-2",
description: "Add logging",
scope: ["src/logger.ts"],
acceptance: "Logs work",
branch: "worker/task-2",
status: "pending",
priority: 2,
createdAt: Date.now(),
});
// Process queue
while (!queue.isEmpty()) {
const task = queue.dequeue();
if (!task) break;
console.log(`Processing: ${task.description}`);
queue.updateStatus(task.id, "assigned");
queue.updateStatus(task.id, "running");
// ... execute task ...
queue.updateStatus(task.id, "complete");
}
console.log(`Completed: ${queue.getCompletedCount()} tasks`);
Error Handling
Invalid Transitions
try {
queue.updateStatus(taskId, "running");
queue.updateStatus(taskId, "pending"); // ❌ Invalid: running can't go to pending
} catch (error) {
console.error("Invalid state transition:", error.message);
}
Retry Failed Tasks
const task = queue.getTask(taskId);
if (task && task.status === "failed") {
// Failed tasks can be retried
queue.updateStatus(taskId, "pending");
task.retryCount = (task.retryCount || 0) + 1;
}