Overview

AI Studio uses Trigger.dev v4 for background job orchestration. All AI processing runs asynchronously, which lets the UI show real-time progress and accommodates long-running tasks (up to 30 minutes for video generation).

Why Trigger.dev?

  • Long-running tasks - AI processing can take minutes, which makes it unsuitable for API routes
  • Real-time progress - Metadata updates allow UI to show live progress
  • Retry logic - Automatic retries with exponential backoff
  • Batch operations - Process multiple clips in parallel
  • Observability - Built-in logging and error tracking

Workflow Architecture

trigger/
├── process-image.ts            # Image enhancement (Nano Banana Pro)
├── inpaint-image.ts            # Inpainting for edits
├── video-orchestrator.ts       # Main video coordinator
├── generate-video-clip.ts      # Individual clip generation (Kling)
├── generate-transition-clip.ts # Seamless transitions
└── compile-video.ts            # FFmpeg compilation

Image Processing Workflow

process-image.ts

Enhances real estate images using Fal.ai’s Nano Banana Pro model.
// trigger/process-image.ts:30
export const processImageTask = task({
  id: "process-image",
  maxDuration: 300, // 5 minutes
  retry: {
    maxAttempts: 3,
    minTimeoutInMs: 1000,
    maxTimeoutInMs: 10_000,
    factor: 2,
  },
  run: async (payload: ProcessImagePayload) => {
    const { imageId } = payload;

    // Step 1: Fetch image record
    metadata.set("status", {
      step: "fetching",
      label: "Loading image…",
      progress: 10,
    });

    const image = await getImageGenerationById(imageId);
    if (!image) {
      throw new Error(`Image not found: ${imageId}`);
    }

    // Update status to processing
    await updateImageGeneration(imageId, { status: "processing" });

    // Step 2: Upload to Fal.ai storage
    metadata.set("status", {
      step: "uploading",
      label: "Preparing for AI…",
      progress: 25,
    });

    const imageResponse = await fetch(image.originalImageUrl);
    const imageBlob = await imageResponse.blob();
    const falImageUrl = await fal.storage.upload(
      new File([imageBlob], "input.jpg", { type: imageBlob.type })
    );

    // Step 3: Call Fal.ai API
    metadata.set("status", {
      step: "processing",
      label: "Enhancing image…",
      progress: 50,
    });

    const result = await fal.subscribe(NANO_BANANA_PRO_EDIT, {
      input: {
        prompt: image.prompt,
        image_urls: [falImageUrl],
        num_images: 1,
        output_format: "jpeg",
      },
    });

    const resultImageUrl = result.images[0].url;

    // Step 4: Save to Supabase
    metadata.set("status", {
      step: "saving",
      label: "Saving result…",
      progress: 80,
    });

    const resultImageResponse = await fetch(resultImageUrl);
    const resultImageBuffer = await resultImageResponse.arrayBuffer();
    
    const storedResultUrl = await uploadImage(
      new Uint8Array(resultImageBuffer),
      resultPath,
      "image/jpeg"
    );

    // Update image record
    await updateImageGeneration(imageId, {
      status: "completed",
      resultImageUrl: storedResultUrl,
    });

    metadata.set("status", {
      step: "completed",
      label: "Complete",
      progress: 100,
    });

    return { success: true, resultUrl: storedResultUrl };
  },
});
Status Updates:
export interface ProcessImageStatus {
  step: "fetching" | "uploading" | "processing" | "saving" | "completed" | "failed";
  label: string;
  progress?: number; // 0-100
}
Error Handling:
catch (error) {
  metadata.set("status", {
    step: "failed",
    label: "Processing failed",
    progress: 0,
  });

  await updateImageGeneration(imageId, {
    status: "failed",
    errorMessage: error instanceof Error ? error.message : "Unknown error",
  });

  throw error; // Trigger retry logic
}

Video Generation Workflow

video-orchestrator.ts

Coordinates the entire video generation pipeline.
// trigger/video-orchestrator.ts:29
export const generateVideoTask = task({
  id: "generate-video",
  queue: {
    name: "video-generation",
    concurrencyLimit: 1, // Process one video at a time
  },
  maxDuration: 1800, // 30 minutes total
  retry: {
    maxAttempts: 1, // Don't retry the orchestrator itself
  },
  run: async (payload: GenerateVideoPayload) => {
    const { videoProjectId } = payload;

    // Step 1: Fetch video project and clips
    metadata.set("status", {
      step: "starting",
      label: "Starting video generation…",
      progress: 5,
    });

    const projectData = await getVideoProjectById(videoProjectId);
    const clips = await getVideoClips(videoProjectId);

    await updateVideoProject(videoProjectId, {
      status: "generating",
      clipCount: clips.length,
      estimatedCost: calculateVideoCost(clips.length, 5, true),
    });

    // Step 2: Generate all clips in parallel
    metadata.set("status", {
      step: "generating",
      label: `Generating ${clips.length} clips…`,
      progress: 10,
    });

    const clipResults = await generateVideoClipTask.batchTriggerAndWait(
      clips.map(clip => ({
        payload: {
          clipId: clip.id,
          tailImageUrl: clip.endImageUrl || clip.sourceImageUrl,
          targetRoomLabel: clip.roomLabel || clip.roomType,
        },
      }))
    );

    // Check results
    const successfulClips = clipResults.runs.filter(r => r.ok);
    const failedClips = clipResults.runs.filter(r => !r.ok);

    if (successfulClips.length === 0) {
      throw new Error("All clip generations failed");
    }

    // Step 2.5: Generate transition clips for seamless transitions
    const clipsWithTransitions = clips.filter((clip, index) => 
      clip.transitionType === "seamless" && index < clips.length - 1
    );

    if (clipsWithTransitions.length > 0) {
      const transitionResults = await generateTransitionClipTask.batchTriggerAndWait(
        clipsWithTransitions.map(clip => {
          const clipIndex = clips.findIndex(c => c.id === clip.id);
          const nextClip = clips[clipIndex + 1];

          return {
            payload: {
              clipId: clip.id,
              fromImageUrl: clip.endImageUrl || clip.sourceImageUrl,
              toImageUrl: nextClip.sourceImageUrl,
              videoProjectId,
              workspaceId: projectData.videoProject.workspaceId,
              aspectRatio: projectData.videoProject.aspectRatio,
            },
          };
        })
      );
    }

    // Step 3: Compile video
    metadata.set("status", {
      step: "compiling",
      label: "Compiling video…",
      progress: 70,
    });

    await updateVideoProject(videoProjectId, {
      status: "compiling",
    });

    const compileResult = await compileVideoTask.triggerAndWait({
      videoProjectId,
    });

    if (!compileResult.ok) {
      throw new Error("Video compilation failed");
    }

    // Update final status
    await updateVideoProject(videoProjectId, {
      actualCost: calculateVideoCost(successfulClips.length, 5, true),
    });

    metadata.set("status", {
      step: "completed",
      label: "Complete",
      progress: 100,
    });

    return {
      success: true,
      finalVideoUrl: compileResult.output.finalVideoUrl,
      successfulClips: successfulClips.length,
      failedClips: failedClips.length,
    };
  },
});
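
The transition step in the orchestrator (Step 2.5) pairs each seamless clip with the clip that follows it. As a minimal sketch, that selection can be written as a pure function that is easy to unit test. The ClipLike and TransitionPair types and the planSeamlessTransitions name are hypothetical and only mirror the fields used above.

```typescript
// Hypothetical types: only the fields the orchestrator excerpt reads.
interface ClipLike {
  id: string;
  sourceImageUrl: string;
  endImageUrl?: string | null;
  transitionType?: string | null;
}

interface TransitionPair {
  clipId: string;
  fromImageUrl: string;
  toImageUrl: string;
}

// Pair every "seamless" clip with its successor; the last clip never gets a transition.
function planSeamlessTransitions(clips: ClipLike[]): TransitionPair[] {
  const pairs: TransitionPair[] = [];
  clips.forEach((clip, index) => {
    const next = clips[index + 1];
    if (!next || clip.transitionType !== "seamless") return;
    pairs.push({
      clipId: clip.id,
      // Same fallback as the orchestrator: prefer the end frame, else the source image.
      fromImageUrl: clip.endImageUrl || clip.sourceImageUrl,
      toImageUrl: next.sourceImageUrl,
    });
  });
  return pairs;
}
```

Walking the array once with its index avoids the findIndex lookup per clip, which matters little at this scale but keeps the pairing logic in one place.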

generate-video-clip.ts

Generates individual 5-second clips using the Kling Video API.
// trigger/generate-video-clip.ts:38
export const generateVideoClipTask = task({
  id: "generate-video-clip",
  maxDuration: 300, // 5 minutes - Kling can take a while
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 2000,
    maxTimeoutInMs: 30_000,
    factor: 2,
  },
  run: async (payload: GenerateVideoClipPayload) => {
    const { clipId, tailImageUrl, targetRoomLabel } = payload;

    const clip = await getVideoClipById(clipId);
    const videoProjectData = await getVideoProjectById(clip.videoProjectId);

    // Upload source and tail images to Fal.ai storage
    const imageBlob = await fetch(clip.sourceImageUrl).then(r => r.blob());
    const falImageUrl = await fal.storage.upload(
      new File([imageBlob], "source.jpg", { type: imageBlob.type })
    );

    let falTailImageUrl = falImageUrl;
    if (tailImageUrl) {
      const tailBlob = await fetch(tailImageUrl).then(r => r.blob());
      falTailImageUrl = await fal.storage.upload(
        new File([tailBlob], "tail.jpg", { type: tailBlob.type })
      );
    }

    // Generate motion prompt
    let motionPrompt = clip.motionPrompt || getMotionPrompt(
      clip.roomType,
      targetRoomLabel
    );

    // Add native audio prompt if enabled
    if (videoProjectData.videoProject.generateNativeAudio) {
      const track = videoProjectData.musicTrack;
      const audioPrompt = track
        ? `Background audio: ${track.mood} ${track.category} music inspired by "${track.name}".`
        : "Background cinematic ambient music.";
      
      motionPrompt = `${motionPrompt} ${audioPrompt} Ambient environmental sounds of a ${targetRoomLabel}.`;
    }

    // Call Kling Video API
    const result = await fal.subscribe(KLING_VIDEO_PRO, {
      input: {
        image_url: falImageUrl,
        tail_image_url: falTailImageUrl,
        prompt: motionPrompt,
        duration: "5",
        aspect_ratio: videoProjectData.videoProject.aspectRatio,
        generate_audio: videoProjectData.videoProject.generateNativeAudio,
        negative_prompt: DEFAULT_NEGATIVE_PROMPT,
      },
      onQueueUpdate: (update) => {
        if (update.status === "IN_PROGRESS") {
          metadata.set("status", {
            step: "generating",
            label: "Generating video…",
            progress: 50,
          });
        }
      },
    });

    // Save to Supabase
    const resultVideoBuffer = await fetch(result.video.url).then(r => r.arrayBuffer());
    const storedVideoUrl = await uploadVideo(
      new Uint8Array(resultVideoBuffer),
      getVideoPath(videoProjectData.videoProject.workspaceId, clip.videoProjectId, `${clipId}.mp4`),
      "video/mp4"
    );

    await updateVideoClip(clipId, {
      status: "completed",
      clipUrl: storedVideoUrl,
    });

    return { success: true, clipUrl: storedVideoUrl };
  },
});
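
The prompt assembly in this task (motion prompt, optional music description, ambient sound hint) can be isolated from the API call. The sketch below assumes a hypothetical buildClipPrompt helper and TrackLike type; the string templates are copied from the task above.

```typescript
// Hypothetical shape: only the track fields the task interpolates.
interface TrackLike {
  name: string;
  mood: string;
  category: string;
}

// Returns the motion prompt unchanged when native audio is off,
// otherwise appends the audio description used in the task above.
function buildClipPrompt(
  motionPrompt: string,
  roomLabel: string,
  nativeAudio: boolean,
  track?: TrackLike | null
): string {
  if (!nativeAudio) return motionPrompt;
  const audioPrompt = track
    ? `Background audio: ${track.mood} ${track.category} music inspired by "${track.name}".`
    : "Background cinematic ambient music.";
  return `${motionPrompt} ${audioPrompt} Ambient environmental sounds of a ${roomLabel}.`;
}
```

Keeping this as a pure function makes it possible to check prompt wording without spending credits on a video generation call.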

compile-video.ts

Concatenates clips with FFmpeg and adds background music.
// trigger/compile-video.ts:35
export const compileVideoTask = task({
  id: "compile-video",
  maxDuration: 600, // 10 minutes for FFmpeg compilation
  machine: "medium-1x", // More CPU for FFmpeg
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 5000,
    maxTimeoutInMs: 60_000,
    factor: 2,
  },
  run: async (payload: CompileVideoPayload) => {
    const { videoProjectId } = payload;
    const workDir = join(tmpdir(), `video-compile-${videoProjectId}`);

    try {
      // Fetch project and clips
      const projectData = await getVideoProjectById(videoProjectId);
      const clips = await getVideoClips(videoProjectId);
      const completedClips = clips
        .filter(c => c.status === "completed" && c.clipUrl)
        .sort((a, b) => a.sequenceOrder - b.sequenceOrder);

      // Create work directory
      mkdirSync(workDir, { recursive: true });

      // Download all clips and transitions
      const clipPaths = [];
      for (let i = 0; i < completedClips.length; i++) {
        const clip = completedClips[i];

        // Download main clip
        const clipPath = join(workDir, `clip_${i.toString().padStart(3, "0")}.mp4`);
        const buffer = await fetch(clip.clipUrl).then(r => r.arrayBuffer());
        writeFileSync(clipPath, Buffer.from(buffer));
        clipPaths.push(clipPath);

        // Download transition clip if seamless
        if (i < completedClips.length - 1 && clip.transitionType === "seamless" && clip.transitionClipUrl) {
          const transitionPath = join(workDir, `transition_${i}.mp4`);
          const transitionBuffer = await fetch(clip.transitionClipUrl).then(r => r.arrayBuffer());
          writeFileSync(transitionPath, Buffer.from(transitionBuffer));
          clipPaths.push(transitionPath);
        }
      }

      // Download music track if selected
      let musicPath = null;
      const { musicTrack } = projectData; // same shape as read in generate-video-clip.ts
      if (musicTrack?.audioUrl) {
        const musicBuffer = await fetch(musicTrack.audioUrl).then(r => r.arrayBuffer());
        musicPath = join(workDir, "music.mp3");
        writeFileSync(musicPath, Buffer.from(musicBuffer));
      }

      // Create concat file for FFmpeg
      const concatListPath = join(workDir, "concat.txt");
      const concatContent = clipPaths.map(p => `file '${p}'`).join("\n");
      writeFileSync(concatListPath, concatContent);

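      // Build FFmpeg command
      // The output location is not shown in this excerpt; a file inside workDir is assumed here.
      const outputPath = join(workDir, "output.mp4");
      let ffmpegCmd;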
      if (musicPath) {
        // With music: concatenate clips and add audio
        const musicVolume = (projectData.videoProject.musicVolume ?? 50) / 100;
        ffmpegCmd = `ffmpeg -f concat -safe 0 -i "${concatListPath}" -i "${musicPath}" -filter_complex "[1:a]volume=${musicVolume}[music];[0:a][music]amix=inputs=2:duration=first:dropout_transition=2[aout]" -map 0:v -map "[aout]" -c:v libx264 -preset fast -crf 23 -c:a aac -b:a 192k -shortest -y "${outputPath}"`;
      } else {
        // Without music: just concatenate clips
        ffmpegCmd = `ffmpeg -f concat -safe 0 -i "${concatListPath}" -c:v libx264 -preset fast -crf 23 -c:a aac -b:a 192k -y "${outputPath}"`;
      }

      execSync(ffmpegCmd, {
        cwd: workDir,
        timeout: 300_000, // 5 minute timeout
      });

      // Upload compiled video
      const outputBuffer = readFileSync(outputPath);
      const finalVideoUrl = await uploadVideo(
        new Uint8Array(outputBuffer),
        getVideoPath(projectData.videoProject.workspaceId, videoProjectId, "final.mp4"),
        "video/mp4"
      );

      // Update project
      await updateVideoProject(videoProjectId, {
        status: "completed",
        finalVideoUrl,
        durationSeconds: calculateTotalDuration(completedClips),
      });

      // Cleanup
      rmSync(workDir, { recursive: true });

      return { success: true, finalVideoUrl };
    } catch (error) {
      // Cleanup on error
      if (existsSync(workDir)) {
        rmSync(workDir, { recursive: true });
      }
      throw error;
    }
  },
});
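
The concat list written for FFmpeg wraps each path in single quotes, so a path that itself contains a single quote would break the file. A minimal sketch of a safer builder follows; escapeConcatPath and buildConcatList are hypothetical names, and the escaping follows FFmpeg's general quoting rule (close the quote, emit an escaped quote, reopen).

```typescript
// Inside single quotes FFmpeg reads everything literally,
// so a quote in the path is written as: '\''
function escapeConcatPath(path: string): string {
  return path.replace(/'/g, "'\\''");
}

// One "file '<path>'" directive per line, in playback order.
function buildConcatList(paths: string[]): string {
  return paths.map((p) => `file '${escapeConcatPath(p)}'`).join("\n");
}
```

Paths under tmpdir() built from a project ID are unlikely to contain quotes, so this is a defensive measure instead of a fix for an observed failure.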

Batch Operations

Trigger.dev supports batch operations for parallel processing:
// Trigger multiple tasks in parallel
const results = await generateVideoClipTask.batchTriggerAndWait(
  clips.map(clip => ({
    payload: { clipId: clip.id }
  }))
);

// Check results
const successful = results.runs.filter(r => r.ok);
const failed = results.runs.filter(r => !r.ok);

console.log(`Success: ${successful.length}, Failed: ${failed.length}`);
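
When the outputs of successful runs are needed, not just counts, the results can be split in one pass. The BatchRun type below is an assumed, simplified model of a batch result entry (an ok flag plus either an output or an error); only the ok flag is confirmed by the snippet above, and the other field names are assumptions.

```typescript
// Assumed result shape: a discriminated union keyed on `ok`.
type BatchRun<T> =
  | { ok: true; id: string; output: T }
  | { ok: false; id: string; error: unknown };

// Collect outputs of successful runs and the IDs of failed ones.
function partitionRuns<T>(runs: BatchRun<T>[]): { outputs: T[]; failedIds: string[] } {
  const outputs: T[] = [];
  const failedIds: string[] = [];
  for (const run of runs) {
    if (run.ok) {
      outputs.push(run.output); // narrowed to the success branch
    } else {
      failedIds.push(run.id);
    }
  }
  return { outputs, failedIds };
}
```

The discriminated union lets TypeScript narrow each entry, so output is only accessible after the ok check.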

Real-Time Progress

Frontend components use @trigger.dev/react-hooks to display progress:
import { useRealtimeRun } from "@trigger.dev/react-hooks";

function ProcessingStatus({ runId }) {
  const { run } = useRealtimeRun(runId, {
    enabled: !!runId,
    refreshInterval: 1000, // Poll every second
  });

  const status = run?.metadata?.status as ProcessImageStatus;

  return (
    <div>
      <p>{status?.label || "Processing…"}</p>
      <ProgressBar value={status?.progress || 0} />
    </div>
  );
}
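
The component above casts run metadata straight to ProcessImageStatus. Since metadata is loosely typed at runtime, a small validation step can guard the UI against missing or malformed values. This is a sketch under that assumption; parseStatus is a hypothetical helper, and the type repeats the ProcessImageStatus definition shown earlier.

```typescript
type ProcessImageStep =
  | "fetching" | "uploading" | "processing" | "saving" | "completed" | "failed";

interface ProcessImageStatus {
  step: ProcessImageStep;
  label: string;
  progress?: number; // 0-100
}

const KNOWN_STEPS: readonly string[] = [
  "fetching", "uploading", "processing", "saving", "completed", "failed",
];

// Returns a validated status, or undefined when the value does not match the shape.
function parseStatus(value: unknown): ProcessImageStatus | undefined {
  if (typeof value !== "object" || value === null) return undefined;
  const v = value as Record<string, unknown>;
  const step = v["step"];
  const label = v["label"];
  const progress = v["progress"];
  if (typeof step !== "string" || KNOWN_STEPS.indexOf(step) === -1) return undefined;
  if (typeof label !== "string") return undefined;
  const status: ProcessImageStatus = { step: step as ProcessImageStep, label };
  if (typeof progress === "number" && isFinite(progress)) {
    // Clamp so the progress bar never receives an out-of-range value.
    status.progress = Math.min(100, Math.max(0, progress));
  }
  return status;
}
```

In the component, `parseStatus(run?.metadata?.status)` could then replace the cast, with the existing fallbacks covering the undefined case.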

Queue Configuration

Video generation uses a dedicated queue with concurrency limits:
queue: {
  name: "video-generation",
  concurrencyLimit: 1, // Process one video at a time
}
This keeps the app from overwhelming the AI APIs and keeps costs predictable.

Retry Strategies

Image Processing

retry: {
  maxAttempts: 3,
  minTimeoutInMs: 1000,
  maxTimeoutInMs: 10_000,
  factor: 2, // Exponential backoff
}

Video Generation

retry: {
  maxAttempts: 2, // Fewer retries for expensive operations
  minTimeoutInMs: 2000,
  maxTimeoutInMs: 30_000,
  factor: 2,
}

Orchestrator

retry: {
  maxAttempts: 1, // Don't retry orchestrator (child tasks handle retries)
}
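
To see what these three configurations imply in practice, the wait before each retry can be estimated as minTimeoutInMs multiplied by factor for each prior retry, capped at maxTimeoutInMs. The sketch below is a simplified model, not Trigger.dev's implementation: it ignores any randomized jitter the scheduler may apply, and retryDelays is a hypothetical helper.

```typescript
interface RetryOptions {
  maxAttempts: number;
  minTimeoutInMs?: number;
  maxTimeoutInMs?: number;
  factor?: number;
}

// Delay before each retry; maxAttempts counts the first attempt,
// so there are at most maxAttempts - 1 retries.
function retryDelays(opts: RetryOptions): number[] {
  const min = opts.minTimeoutInMs ?? 1000;
  const max = opts.maxTimeoutInMs ?? Infinity;
  const factor = opts.factor ?? 2;
  const delays: number[] = [];
  for (let retry = 0; retry < opts.maxAttempts - 1; retry++) {
    delays.push(Math.min(min * Math.pow(factor, retry), max));
  }
  return delays;
}

// Image processing: two retries, after roughly 1 s and 2 s.
const imageDelays = retryDelays({ maxAttempts: 3, minTimeoutInMs: 1000, maxTimeoutInMs: 10_000, factor: 2 });
// Clip generation: a single retry after roughly 2 s.
const clipDelays = retryDelays({ maxAttempts: 2, minTimeoutInMs: 2000, maxTimeoutInMs: 30_000, factor: 2 });
// Orchestrator: maxAttempts of 1 means no retries at all.
const orchestratorDelays = retryDelays({ maxAttempts: 1 });
```

With so few attempts, the maxTimeoutInMs cap is never reached in any of these configurations; it would only matter if maxAttempts were raised.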

Error Handling

All tasks follow the same error handling pattern:
try {
  // Task logic...
} catch (error) {
  const errorMessage = error instanceof Error ? error.message : "Unknown error";

  // Update metadata for UI
  metadata.set("status", {
    step: "failed",
    label: `Failed: ${errorMessage}`,
    progress: 0,
  });

  // Update database
  await updateRecord(id, {
    status: "failed",
    errorMessage,
  });

  // Re-throw to trigger retry logic
  throw error;
}
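
Because every task repeats the same message extraction and failed-status shape, those two pieces could be factored into shared helpers. The following is one possible sketch; toErrorMessage and failedStatus are hypothetical names, not functions from the codebase.

```typescript
// A caught value is `unknown` in TypeScript, so narrow before reading `.message`.
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : "Unknown error";
}

interface FailedStatus {
  step: "failed";
  label: string;
  progress: number;
}

// Builds the metadata payload used in the catch block above.
function failedStatus(error: unknown): FailedStatus {
  return {
    step: "failed",
    label: `Failed: ${toErrorMessage(error)}`,
    progress: 0,
  };
}
```

A catch block would then call `metadata.set("status", failedStatus(error))` and store `toErrorMessage(error)` in the database before re-throwing.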

Local Development

Run Trigger.dev locally for testing:
# Terminal 1: Start Next.js dev server
pnpm dev

# Terminal 2: Start Trigger.dev worker
pnpm trigger
The local worker connects to Trigger.dev cloud for job coordination.

Deployment

Deploy workflows to Trigger.dev cloud:
pnpm trigger:deploy
This builds and uploads all tasks in the trigger/ directory.

Monitoring

The Trigger.dev dashboard provides:
  • Run history - All task executions
  • Logs - Structured logging output
  • Metrics - Success rate, duration, errors
  • Retry tracking - Automatic retry attempts
Access at: https://cloud.trigger.dev

AI Model Configuration

Fal.ai Models

// lib/fal.ts
export const NANO_BANANA_PRO_EDIT = "fal-ai/nano-banana-pro/edit";
export const KLING_VIDEO_PRO = "fal-ai/kling-video/v2.6/pro/image-to-video";
export const QWEN_IMAGE_EDIT_INPAINT = "fal-ai/qwen-image-edit/inpaint";

Motion Prompts

Room-specific motion prompts are generated in lib/video/motion-prompts.ts:
export function getMotionPrompt(
  roomType: VideoRoomType,
  roomLabel?: string
): string {
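  // Partial: not every room type needs its own prompt; the fallback below covers the rest
  const prompts: Partial<Record<VideoRoomType, string>> = {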
    "living-room": "Slow camera pan across a beautifully staged living room. Gentle ambient lighting, subtle shadows moving.",
    "kitchen": "Smooth dolly through modern kitchen. Natural light streaming through windows.",
    "bedroom": "Gentle camera movement through serene bedroom. Soft curtains swaying.",
    // ... more rooms
  };

  return prompts[roomType] || "Cinematic camera movement through the space.";
}

Cost Tracking

Video costs are calculated and stored:
// trigger/video-orchestrator.ts:68
const estimatedCost = costToCents(
  calculateVideoCost(
    clips.length,
    VIDEO_DEFAULTS.CLIP_DURATION,
    videoProject.generateNativeAudio
  )
);

await updateVideoProject(videoProjectId, {
  estimatedCost, // Set before generation
});

// ... after generation

await updateVideoProject(videoProjectId, {
  actualCost, // Set after completion
});
See Database Schema for payment and billing tables.
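
The implementations of calculateVideoCost and costToCents are not shown here. As a rough illustration of why costs are stored in cents, the sketch below multiplies clip count, clip duration, and a per-second rate, then rounds once to a whole number of cents. The estimateCostCents name and the rate passed in are hypothetical; real pricing depends on the model and audio settings.

```typescript
// Hypothetical helper: the rate is supplied by the caller, not taken from any real price list.
function estimateCostCents(
  clipCount: number,
  clipSeconds: number,
  usdPerSecond: number
): number {
  const dollars = clipCount * clipSeconds * usdPerSecond;
  // Round once at the end so the stored value is an integer number of cents.
  return Math.round(dollars * 100);
}
```

Storing integer cents avoids accumulating floating-point error when estimated and actual costs are later summed or compared.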
