Skip to main content

Overview

OpenCouncil’s task workflow system offloads resource-intensive operations—such as media transcription, AI-powered summarization, and speaker analysis—from the main Next.js application to a dedicated backend task server. This architecture ensures the web application remains responsive while complex tasks execute efficiently in the background.

Architecture components

The task system consists of three primary components working in concert:

Next.js application

User-facing web application that initiates tasks and receives status updates

Task server

Node.js backend server responsible for executing long-running tasks

PostgreSQL database

Stores task status, request payloads, and results for persistence and reprocessing

System flow

Task lifecycle

Every task progresses through a standardized lifecycle from initiation to completion.
1

Initiation

A user action or cron job triggers startTask(), which creates a TaskStatus record with status pending and sends a POST request to the task server with a callback URL.
2

Execution

The task server adds the task to its queue. A worker picks up the task and begins execution, optionally sending progress updates back to the callback URL with status processing.
3

Completion

Upon completion, the worker sends a final update:
  • Success: Status success with result data
  • Error: Status error with error message
4

Update handling

The Next.js callback endpoint receives the update, calls handleTaskUpdate() to update the database, and invokes a task-specific result handler if successful.

Core task types

OpenCouncil supports multiple task types organized by their role in the processing pipeline.
These tasks are essential for meeting processing and enforce idempotency:
  • transcribe: Convert audio/video to text with speaker identification
  • fixTranscript: AI-powered transcript correction and refinement
  • humanReview: Manual review and approval checkpoint
  • transcriptSent: Email transcript to administrative body
  • summarize: Generate AI summaries for meeting segments and subjects
These tasks can run multiple times and don’t enforce idempotency:
  • processAgenda: Extract meeting subjects from agenda documents
  • generatePodcastSpec: Create podcast episode specifications
  • generateHighlight: Generate video highlights from segments
  • splitMediaFile: Split media into smaller chunks
  • generateVoiceprint: Create voice embeddings for speaker identification
  • pollDecisions: Fetch decisions from Diavgeia transparency portal

Task handler registry

The system uses a centralized registry pattern to maintain clean, scalable code:
src/lib/tasks/registry.ts
export type TaskResultHandler = (
  taskId: string, 
  result: any, 
  options?: { force?: boolean }
) => Promise<void>;

export const taskHandlers: Record<string, TaskResultHandler> = {
  transcribe: handleTranscribeResult,
  summarize: handleSummarizeResult,
  generatePodcastSpec: handleGeneratePodcastSpecResult,
  // ... other handlers
};
All handlers follow a consistent signature, making the system predictable and maintainable.

Task reprocessing

A powerful feature of the architecture is the ability to reprocess task results without re-running the expensive backend operation.
How it works: The complete responseBody from the task server is stored in the TaskStatus table, enabling reprocessing from saved data.

Basic reprocessing

The processTaskResponse function uses the handler registry to reprocess stored results:
src/lib/tasks/tasks.ts
export const processTaskResponse = async (
  taskType: string, 
  taskId: string, 
  options?: { force?: boolean }
) => {
  const task = await prisma.taskStatus.findUnique({ where: { id: taskId } });
  if (!task) throw new Error(`Task ${taskId} not found`);

  const handler = taskHandlers[taskType];
  if (!handler) throw new Error(`Unsupported task type: ${taskType}`);

  await handler(taskId, JSON.parse(task.responseBody!), options);
}

Force mode for data cleanup

When reprocessing with force: true, the system:
  1. Deletes all SpeakerTag records (cascades to SpeakerSegment and Utterance)
  2. Recreates everything from stored response
This prevents duplicate data and ensures a clean slate.

UI integration

The TaskStatus component provides a user-friendly reprocessing interface:
  • All tasks: Dialog with explanation, loading states, and feedback messages
  • Transcribe tasks: Two action buttons
    • “Reprocess Only” - Attempts without cleanup (may create duplicates)
    • “Delete & Reprocess” - Cleans up first (recommended)
  • Other tasks: Single “Reprocess from Database” button (safe with upsert)

Automated task initiation

Some tasks run automatically on a schedule via cron-triggered API routes.
The pollDecisions task automatically fetches decisions from Greece’s transparency portal:Endpoint: GET /api/cron/poll-decisionsAuthentication: Authorization: Bearer <CRON_SECRET>Progressive backoff schedule:
Days since first pollMinimum interval
0–7 (week 1)Every cron run
7–14 (week 2)2 days
14–21 (week 3)3 days
21+ (week 4+)7 days
90+Stops
With 2x/day cron: ~14 polls week 1, ~3-4 week 2, ~2-3 week 3, then ~1/week

Setup example

# Generate secret
openssl rand -base64 32

# Add to crontab (poll every 12 hours)
0 0,12 * * * curl -H "Authorization: Bearer $CRON_SECRET" \
  https://your-domain.com/api/cron/poll-decisions

Error handling

The system implements comprehensive error handling:

Initial failures

If the initial fetch to the task server fails, the task status is immediately set to failed

Backend failures

If the task fails during execution, the server reports error status back to the callback

Processing failures

Result processing errors are caught, logged, and trigger Discord alerts to admins

No auto-retry

Failed tasks require manual reprocessing via the TaskStatus UI

Adding a new task

The registry pattern makes adding tasks straightforward:
1

Add to configuration

src/lib/tasks/types.ts
export const TASK_CONFIG = {
  // ... existing tasks
  myNewTask: {
    requiredForPipeline: false, // or true
  },
} as const;
2

Define types

src/lib/apiTypes.ts
export interface MyNewTaskRequest extends TaskRequest {
  myParam: string;
}

export interface MyNewTaskResult {
  outputData: string;
}
3

Create handler

src/lib/tasks/myNewTask.ts
export const handleMyNewTaskResult = async (
  taskId: string, 
  result: MyNewTaskResult,
  options?: { force?: boolean }
) => {
  const task = await prisma.taskStatus.findUnique({ 
    where: { id: taskId } 
  });
  
  // Process results with upsert for idempotency
  await prisma.myEntity.upsert({
    where: { /* unique identifier */ },
    update: { /* fields */ },
    create: { /* fields */ }
  });
};
4

Register handler

src/lib/tasks/registry.ts
import { handleMyNewTaskResult } from './myNewTask';

export const taskHandlers: Record<string, TaskResultHandler> = {
  // ... existing handlers
  myNewTask: handleMyNewTaskResult,
};
5

Implement backend

On the opencouncil-tasks server:
  • Create endpoint (e.g., /my-new-task)
  • Accept MyNewTaskRequest payload
  • Send status updates to callbackUrl
6

Update frontend

import { startTask } from '@/lib/tasks/tasks';

const requestBody: MyNewTaskRequest = { myParam: 'value' };
await startTask('myNewTask', requestBody, meetingId, cityId);

Key principles

Use upsert operations when possible so tasks can be safely reprocessed without side effects.
If your task creates data that can’t be upserted, implement cleanup logic when options?.force is true.
Let errors bubble up—the system handles logging and status updates automatically.
Use Prisma transactions for operations that create multiple related records to ensure data consistency.
Always follow the TaskResultHandler signature for compatibility with the registry system.

File reference

Key implementation files

  • src/lib/tasks/tasks.ts - Core task orchestration logic
  • src/lib/tasks/types.ts - Task configuration and type definitions
  • src/lib/tasks/registry.ts - Task handler registry
  • src/lib/apiTypes.ts - Request/response type definitions
  • src/app/api/cities/[cityId]/meetings/[meetingId]/taskStatuses/[taskStatusId]/route.ts - Callback endpoint
  • src/components/meetings/admin/TaskStatus.tsx - UI component for task management

Build docs developers (and LLMs) love