Skip to main content
Shannon uses Temporal for durable workflow orchestration, providing crash recovery, queryable progress, and intelligent retry with exponential backoff.

Workflow Architecture

The main workflow (pentestPipelineWorkflow) orchestrates Shannon’s five-phase pipeline with parallel execution in vulnerability and exploitation phases.

Workflow Structure

// src/temporal/workflows.ts:135-495
/**
 * Main Shannon workflow: runs preflight validation, then the pipeline phases
 * (pre-recon, recon, vulnerability + exploitation pipelines, reporting) in that
 * order, exposing live progress through the `getProgress` query.
 *
 * Abridged excerpt: `preflightActs`, `a`, `activityInput`, `runSequentialPhase`,
 * `pipelineThunks`, `maxConcurrent` and `aggregatePipelineResults` are defined
 * in code not shown here.
 *
 * @param input - Configuration for this pipeline run.
 * @returns The final pipeline state, with `status: 'completed'`.
 * @throws Rethrows whatever a phase throws, after recording `status: 'failed'`
 *   and a formatted `error` on the state.
 */
export async function pentestPipelineWorkflow(
  input: PipelineInput
): Promise<PipelineState> {
  const { workflowId } = workflowInfo();
  
  // Mutable run state; phases update it in place and the query handler reads it.
  // NOTE(review): relies on Temporal's workflow sandbox making Date.now() replay-safe — confirm.
  const state: PipelineState = {
    status: 'running',
    currentPhase: null,
    currentAgent: null,
    completedAgents: [],
    failedAgent: null,
    error: null,
    startTime: Date.now(),
    agentMetrics: {},
    summary: null,
  };

  // Enable queryable progress. Registered before the first await so queries work
  // for the whole run; `state` is spread at query time, so answers reflect its
  // latest contents.
  setHandler(getProgress, (): PipelineProgress => ({
    ...state,
    workflowId,
    elapsedMs: Date.now() - state.startTime,
  }));

  // Keep the activity call order stable: Temporal replays workflow history and
  // expects the same sequence of commands, so reordering these calls breaks
  // replay of in-flight workflows.
  try {
    // Preflight validation
    await preflightActs.runPreflightValidation(activityInput);

    // Phase 1: Pre-Reconnaissance
    await runSequentialPhase('pre-recon', 'pre-recon', a.runPreReconAgent);

    // Phase 2: Reconnaissance
    await runSequentialPhase('recon', 'recon', a.runReconAgent);

    // Phases 3-4: Vulnerability + Exploitation (5 pipelined pairs, at most maxConcurrent at a time)
    const pipelineResults = await runWithConcurrencyLimit(pipelineThunks, maxConcurrent);
    aggregatePipelineResults(pipelineResults);

    // Phase 5: Reporting (assemble, run the report agent, then inject metadata)
    await a.assembleReportActivity(activityInput);
    state.agentMetrics['report'] = await a.runReportAgent(activityInput);
    await a.injectReportMetadataActivity(activityInput);

    state.status = 'completed';
    return state;
  } catch (error) {
    // Record failure details for the getProgress query, then rethrow so the
    // failure propagates to Temporal.
    state.status = 'failed';
    state.error = formatWorkflowError(error, state.currentPhase, state.currentAgent);
    throw error;
  }
}
From src/temporal/workflows.ts:135-495

Activity Proxy Configuration

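Temporal activities are proxied with three retry configurations, selected per run: production (the default), testing (when pipelineTestingMode is set), and subscription (when retry_preset is 'subscription'):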
// src/temporal/workflows.ts:49-119
// Production retry (default)
const PRODUCTION_RETRY = {
  initialInterval: '5 minutes',
  maximumInterval: '30 minutes',
  backoffCoefficient: 2,
  maximumAttempts: 50,
  // Error types Temporal must never retry; shared by every preset below.
  nonRetryableErrorTypes: [
    'AuthenticationError',
    'PermissionError',
    'InvalidRequestError',
    'RequestTooLargeError',
    'ConfigurationError',
    'InvalidTargetError',
    'ExecutionLimitError',
  ],
};

const acts = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 hours',
  heartbeatTimeout: '60 minutes', // Extended for sub-agent execution
  retry: PRODUCTION_RETRY,
});

// Testing retry (fast iteration): production policy with short intervals and few attempts.
const TESTING_RETRY = {
  ...PRODUCTION_RETRY,
  initialInterval: '10 seconds',
  maximumInterval: '30 seconds',
  maximumAttempts: 5,
};

const testActs = proxyActivities<typeof activities>({
  startToCloseTimeout: '30 minutes',
  heartbeatTimeout: '30 minutes',
  retry: TESTING_RETRY,
});

// Subscription retry: production policy stretched to ride out 5h+ rate-limit windows.
const SUBSCRIPTION_RETRY = {
  ...PRODUCTION_RETRY,
  maximumInterval: '6 hours',
  maximumAttempts: 100,
};

const subscriptionActs = proxyActivities<typeof activities>({
  startToCloseTimeout: '8 hours',
  heartbeatTimeout: '2 hours',
  retry: SUBSCRIPTION_RETRY,
});

/**
 * Picks the activity proxy (and therefore the timeout/retry policy) for a run.
 * Testing mode takes precedence over the configured `retry_preset`; any preset
 * other than `'subscription'` falls back to the production proxy.
 */
function selectActivityProxy(pipelineInput: PipelineInput) {
  if (pipelineInput.pipelineTestingMode) {
    return testActs;
  }
  const wantsSubscription = pipelineInput.pipelineConfig?.retry_preset === 'subscription';
  return wantsSubscription ? subscriptionActs : acts;
}
From src/temporal/workflows.ts:49-147

Pipelined Execution

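Vulnerability and exploitation phases run as independent per-vulnerability-type pipelines with no synchronization barrier. Each exploitation agent starts as soon as its own vulnerability analysis finishes, and only if the exploitation queue contains actionable findings. It does not wait for the other vulnerability agents.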

Pipeline Implementation

// src/temporal/workflows.ts:389-428
/**
 * Runs one vulnerability type end-to-end: the analysis agent, an exploitation
 * queue check, then (conditionally) the exploitation agent.
 *
 * Agents that already completed in a resumed workspace (`shouldSkip`) are not
 * re-run and contribute `null` metrics. The exploitation queue is checked even
 * when the analysis agent was skipped.
 *
 * Workflow code: keep the await order unchanged, since it determines the
 * sequence of Temporal commands replayed from history.
 *
 * @param vulnType - Vulnerability type; agent names are `<vulnType>-vuln` and `<vulnType>-exploit`.
 * @param runVulnAgent - Starts the vulnerability-analysis activity.
 * @param runExploitAgent - Starts the exploitation activity.
 * @returns Per-agent metrics (`null` when skipped or not run) plus the
 *   exploitation decision. `error` is always `null` here: failures are thrown
 *   and recorded by the caller (`runWithConcurrencyLimit` stores them as rejections).
 */
async function runVulnExploitPipeline(
  vulnType: VulnType,
  runVulnAgent: () => Promise<AgentMetrics>,
  runExploitAgent: () => Promise<AgentMetrics>
): Promise<VulnExploitPipelineResult> {
  const vulnAgentName = `${vulnType}-vuln`;
  const exploitAgentName = `${vulnType}-exploit`;

  // 1. Run vulnerability analysis (or skip if resumed)
  let vulnMetrics: AgentMetrics | null = null;
  if (!shouldSkip(vulnAgentName)) {
    vulnMetrics = await runVulnAgent();
  } else {
    log.info(`Skipping ${vulnAgentName} (already complete)`);
  }

  // 2. Check exploitation queue for actionable findings
  const decision = await a.checkExploitationQueue(activityInput, vulnType);

  // 3. Conditionally run exploitation agent: only when the queue check says to
  //    exploit AND the agent has not already completed in a resumed workspace.
  let exploitMetrics: AgentMetrics | null = null;
  if (decision.shouldExploit) {
    if (!shouldSkip(exploitAgentName)) {
      exploitMetrics = await runExploitAgent();
    } else {
      log.info(`Skipping ${exploitAgentName} (already complete)`);
    }
  }

  return {
    vulnType,
    vulnMetrics,
    exploitMetrics,
    exploitDecision: {
      shouldExploit: decision.shouldExploit,
      vulnerabilityCount: decision.vulnerabilityCount,
    },
    error: null,
  };
}
From src/temporal/workflows.ts:389-428

Concurrency Control

// src/temporal/workflows.ts:341-363
/**
 * Runs pipeline thunks with at most `limit` of them in flight at once.
 *
 * Thunks are started in input order. Whenever the in-flight set reaches
 * `limit`, the loop waits for any one of them to settle before starting the
 * next. A thunk that rejects does not reject this function; it is recorded as a
 * `{ status: 'rejected' }` entry, mirroring `Promise.allSettled`.
 *
 * Workflow code: only Promise combinators are used (no timers), and the
 * start/await order must stay stable for Temporal replay.
 *
 * @param thunks - Deferred pipeline runs; each is invoked exactly once.
 * @param limit - Maximum concurrency. Values <= 1 (including 0 or negative)
 *   run thunks one at a time; `NaN` disables throttling, because the `>=`
 *   comparison is never true.
 * @returns One settled result per thunk, in completion order (not input order).
 */
async function runWithConcurrencyLimit(
  thunks: Array<() => Promise<VulnExploitPipelineResult>>,
  limit: number
): Promise<PromiseSettledResult<VulnExploitPipelineResult>[]> {
  const results: PromiseSettledResult<VulnExploitPipelineResult>[] = [];
  const inFlight = new Set<Promise<void>>();

  for (const thunk of thunks) {
    // The `.then` handlers convert rejection into a recorded result, so `slot`
    // itself resolves either way. `slot` is referenced inside its own `.finally`
    // callback; that is safe because the callback runs only after the assignment.
    // NOTE(review): a thunk that throws synchronously (instead of returning a
    // rejected promise) escapes this function.
    const slot = thunk().then(
      (value) => { results.push({ status: 'fulfilled', value }); },
      (reason: unknown) => { results.push({ status: 'rejected', reason }); }
    ).finally(() => { inFlight.delete(slot); });

    inFlight.add(slot);

    if (inFlight.size >= limit) {
      await Promise.race(inFlight);  // Wait for any slot to complete
    }
  }

  // Drain the slots still running after the last thunk was started.
  await Promise.allSettled(inFlight);
  return results;
}

// Usage (src/temporal/workflows.ts:430-447)
// Default concurrency is 5 pipelines unless pipelineConfig overrides it.
const maxConcurrent = input.pipelineConfig?.max_concurrent_pipelines ?? 5;
const pipelineResults = await runWithConcurrencyLimit(pipelineThunks, maxConcurrent);
// Same aggregation helper the main workflow calls after Phases 3-4.
aggregatePipelineResults(pipelineResults);
From src/temporal/workflows.ts:341-447

Key Design Points:
  • Default concurrency: 5 pipelines in parallel
  • Configurable via pipelineConfig.max_concurrent_pipelines
  • Results in completion order, not input order
  • Graceful degradation: a failed pipeline is recorded as a rejected result and does not block the others

Activities Implementation

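Activities are thin wrappers around service calls. They handle the Temporal-specific concerns: periodic heartbeats, attempt tracking, and classifying errors as retryable or non-retryable.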

Core Activity Pattern

// src/temporal/activities.ts:105-197
/**
 * Shared body of every `run*Agent` activity.
 *
 * Sends periodic heartbeats, runs the agent through the per-workflow service
 * container with a fresh audit session, and translates any failure into a
 * Temporal `ApplicationFailure`, so the proxy's retry policy decides whether
 * to retry.
 *
 * @param agentName - Agent to execute.
 * @param input - Activity input for the current workflow run.
 * @returns Metrics for this attempt. Token counts and turn count are not
 *   collected here and are reported as `null`.
 * @throws ApplicationFailure
 *   - Already-classified failures are rethrown as-is.
 *   - Output-validation failures become non-retryable once `attemptNumber`
 *     reaches `MAX_OUTPUT_VALIDATION_RETRIES`.
 *   - Everything else is classified by `classifyErrorForTemporal`.
 */
async function runAgentActivity(
  agentName: AgentName,
  input: ActivityInput
): Promise<AgentMetrics> {
  const startTime = Date.now();
  const attemptNumber = Context.current().info.attempt;
  // NOTE(review): assumes ActivityInput exposes all five fields — confirm against its definition.
  const { workflowId, webUrl, repoPath, configPath, pipelineTestingMode } = input;

  // 1. Heartbeat loop - signals worker is alive while the agent runs
  const heartbeatInterval = setInterval(() => {
    const elapsed = Math.floor((Date.now() - startTime) / 1000);
    heartbeat({ agent: agentName, elapsedSeconds: elapsed, attempt: attemptNumber });
  }, HEARTBEAT_INTERVAL_MS);

  try {
    const logger = createActivityLogger();

    // 2. Get or create the container for this workflow
    const sessionMetadata = buildSessionMetadata(input);
    const container = getOrCreateContainer(workflowId, sessionMetadata);

    // 3. Create audit session for THIS agent execution
    const auditSession = new AuditSession(sessionMetadata);
    await auditSession.initialize(workflowId);

    // 4. Execute agent via service (throws PentestError on failure)
    const endResult = await container.agentExecution.executeOrThrow(
      agentName,
      { webUrl, repoPath, configPath, pipelineTestingMode, attemptNumber },
      auditSession,
      logger
    );

    // 5. Return metrics
    return {
      durationMs: Date.now() - startTime,
      inputTokens: null,
      outputTokens: null,
      costUsd: endResult.cost_usd,
      numTurns: null,
      model: endResult.model,
    };
  } catch (error: unknown) {
    // 6. Classify error for Temporal retry behavior
    if (error instanceof ApplicationFailure) {
      throw error;  // Already classified
    }

    // Stop retrying output-validation failures after the configured number of attempts
    if (
      error instanceof PentestError &&
      error.code === ErrorCode.OUTPUT_VALIDATION_FAILED &&
      attemptNumber >= MAX_OUTPUT_VALIDATION_RETRIES
    ) {
      throw ApplicationFailure.nonRetryable(
        `Agent ${agentName} failed output validation after ${attemptNumber} attempts`,
        'OutputValidationError'
      );
    }

    // Classify and throw ApplicationFailure
    const classified = classifyErrorForTemporal(error);
    // Catch variables are `unknown`; only Error instances are guaranteed a `.message`.
    const rawMessage = error instanceof Error ? error.message : String(error);
    const message = truncateErrorMessage(rawMessage);

    if (classified.retryable) {
      throw ApplicationFailure.create({
        message,
        type: classified.type,
        details: [{ agentName, attemptNumber, elapsed: Date.now() - startTime }],
      });
    } else {
      throw ApplicationFailure.nonRetryable(message, classified.type);
    }
  } finally {
    // Always stop heartbeating, whether the agent succeeded or failed.
    clearInterval(heartbeatInterval);
  }
}
From src/temporal/activities.ts:105-197

Activity Functions

All agent activities follow the same pattern:
// src/temporal/activities.ts:199-249
/** Activity: runs the `pre-recon` agent via the shared agent-activity body. */
export function runPreReconAgent(input: ActivityInput): Promise<AgentMetrics> {
  return runAgentActivity('pre-recon', input);
}

/** Activity: runs the `recon` agent via the shared agent-activity body. */
export function runReconAgent(input: ActivityInput): Promise<AgentMetrics> {
  return runAgentActivity('recon', input);
}

/** Activity: runs the `injection-vuln` agent via the shared agent-activity body. */
export function runInjectionVulnAgent(input: ActivityInput): Promise<AgentMetrics> {
  return runAgentActivity('injection-vuln', input);
}

// ... 10 more agent activities
From src/temporal/activities.ts:199-249

Specialized Activities

Preflight Validation

// src/temporal/activities.ts:261-315
/**
 * Activity: validates the repository and config before any agent runs.
 *
 * Heartbeats while `runPreflightChecks` executes. An error result is converted
 * into a retryable or non-retryable `ApplicationFailure`, according to
 * `classifyErrorForTemporal`.
 *
 * @param input - Activity input; only `repoPath` and `configPath` are used.
 * @throws ApplicationFailure when the preflight checks return an error result.
 */
export async function runPreflightValidation(input: ActivityInput): Promise<void> {
  const startTime = Date.now();
  const attemptNumber = Context.current().info.attempt;

  const sendHeartbeat = (): void => {
    heartbeat({
      phase: 'preflight',
      elapsedSeconds: Math.floor((Date.now() - startTime) / 1000),
      attempt: attemptNumber,
    });
  };
  const heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL_MS);

  try {
    const logger = createActivityLogger();
    logger.info('Running preflight validation...', { attempt: attemptNumber });

    const outcome = await runPreflightChecks(input.repoPath, input.configPath, logger);
    if (!isErr(outcome)) {
      logger.info('Preflight validation passed');
      return;
    }

    // Failure path: let the classifier decide whether Temporal may retry.
    const { retryable, type } = classifyErrorForTemporal(outcome.error);
    const message = truncateErrorMessage(outcome.error.message);
    throw retryable
      ? ApplicationFailure.create({ message, type })
      : ApplicationFailure.nonRetryable(message, type);
  } finally {
    clearInterval(heartbeatTimer);
  }
}
From src/temporal/activities.ts:261-315

Exploitation Queue Check

// src/temporal/activities.ts:354-367
/**
 * Activity: decides whether the exploitation agent for `vulnType` should run.
 *
 * Delegates to `checkQueue` on the exploitation checker. It reuses the checker
 * from this workflow's container when one exists (for example, one created by an
 * earlier agent activity on this worker); otherwise it constructs a standalone
 * `ExploitationCheckerService`.
 *
 * @param input - Activity input; `workflowId` and `repoPath` are used.
 * @param vulnType - Vulnerability type whose queue is inspected.
 * @returns The checker's exploitation decision.
 */
export async function checkExploitationQueue(
  input: ActivityInput,
  vulnType: VulnType
): Promise<ExploitationDecision> {
  const logger = createActivityLogger();

  const container = getContainer(input.workflowId);
  const checker = container?.exploitationChecker ?? new ExploitationCheckerService();

  return checker.checkQueue(vulnType, input.repoPath, logger);
}
From src/temporal/activities.ts:354-367

Resume Mechanism

Shannon supports resuming interrupted workflows from named workspaces.

Resume State Loading

// src/temporal/activities.ts:391-489
/**
 * Activity: loads and validates the state needed to resume a previous run
 * from a named workspace.
 *
 * Reads `./audit-logs/<workspaceName>/session.json`, resolved relative to the
 * worker's current working directory. It then checks that the workspace was
 * created for the same target URL. An agent counts as completed only if the
 * session records `status: 'success'` AND its deliverable still exists under
 * `<expectedRepoPath>/deliverables/`.
 *
 * @param workspaceName - Workspace directory name under `./audit-logs`.
 * @param expectedUrl - Target URL of the new run; must equal the workspace's URL.
 * @param expectedRepoPath - Repository whose deliverables and git history are inspected.
 * @returns Completed agents, the checkpoint chosen by `findLatestCommit` among
 *   their checkpoints, and the original workflow id.
 * @throws ApplicationFailure (non-retryable) with type `WorkspaceNotFoundError`,
 *   `URLMismatchError`, or `NoCheckpointsError`.
 */
export async function loadResumeState(
  workspaceName: string,
  expectedUrl: string,
  expectedRepoPath: string
): Promise<ResumeState> {
  // Previously `logger` was used below without being defined in this function.
  const logger = createActivityLogger();

  // 1. Validate workspace exists
  const sessionPath = path.join('./audit-logs', workspaceName, 'session.json');
  if (!(await fileExists(sessionPath))) {
    throw ApplicationFailure.nonRetryable(
      `Workspace not found: ${workspaceName}`,
      'WorkspaceNotFoundError'
    );
  }

  // 2. Parse session.json and validate URL match
  const session = await readJson<SessionJson>(sessionPath);
  if (session.session.webUrl !== expectedUrl) {
    throw ApplicationFailure.nonRetryable(
      `URL mismatch with workspace\n  Workspace URL: ${session.session.webUrl}\n  Provided URL:  ${expectedUrl}`,
      'URLMismatchError'
    );
  }

  // 3. Cross-check agent status with deliverables on disk: a recorded success
  //    whose deliverable has disappeared is treated as incomplete.
  const completedAgents: string[] = [];
  for (const agentName of ALL_AGENTS) {
    const agentData = session.metrics.agents[agentName];
    if (!agentData || agentData.status !== 'success') continue;

    const deliverableFilename = AGENTS[agentName].deliverableFilename;
    const deliverablePath = `${expectedRepoPath}/deliverables/${deliverableFilename}`;
    const deliverableExists = await fileExists(deliverablePath);

    if (!deliverableExists) {
      logger.warn(`Agent ${agentName} shows success but deliverable missing, will re-run`);
      continue;
    }

    completedAgents.push(agentName);
  }

  // 4. Collect git checkpoints of completed agents and find the latest
  const checkpoints = completedAgents
    .map((name) => session.metrics.agents[name]?.checkpoint)
    .filter((hash): hash is string => hash != null);

  if (checkpoints.length === 0) {
    throw ApplicationFailure.nonRetryable(
      `Cannot resume workspace ${workspaceName}: No checkpoints found`,
      'NoCheckpointsError'
    );
  }

  const checkpointHash = await findLatestCommit(expectedRepoPath, checkpoints);
  // Falls back to the session id when originalWorkflowId is missing or empty.
  const originalWorkflowId = session.session.originalWorkflowId || session.session.id;

  return {
    workspaceName,
    originalUrl: session.session.webUrl,
    completedAgents,
    checkpointHash,
    originalWorkflowId,
  };
}
From src/temporal/activities.ts:391-489

Workspace Restoration

// src/temporal/activities.ts:518-552
/**
 * Activity: resets the repository to a resume checkpoint and removes leftovers
 * from agents that did not finish.
 *
 * Runs `git reset --hard <checkpointHash>` and then `git clean -fd`. After that,
 * it deletes the deliverable file of each incomplete agent, if one exists.
 * Deletion is best-effort: failures are logged and do not fail the activity.
 *
 * @param repoPath - Repository to restore.
 * @param checkpointHash - Commit to reset to.
 * @param incompleteAgents - Agents whose deliverables must not survive the restore.
 */
export async function restoreGitCheckpoint(
  repoPath: string,
  checkpointHash: string,
  incompleteAgents: AgentName[]
): Promise<void> {
  const logger = createActivityLogger();
  logger.info(`Restoring git workspace to ${checkpointHash}...`);

  // Tracked files back to the checkpoint, then drop untracked files and directories.
  await executeGitCommandWithRetry(
    ['git', 'reset', '--hard', checkpointHash],
    repoPath,
    'reset to checkpoint for resume'
  );
  await executeGitCommandWithRetry(
    ['git', 'clean', '-fd'],
    repoPath,
    'clean untracked files for resume'
  );

  for (const agent of incompleteAgents) {
    const partialPath = `${repoPath}/deliverables/${AGENTS[agent].deliverableFilename}`;
    try {
      if (!(await fileExists(partialPath))) {
        continue;
      }
      logger.warn(`Cleaning partial deliverable: ${agent}`);
      await fs.unlink(partialPath);
    } catch (error) {
      // Best-effort cleanup: note the failure and keep going.
      logger.info(`Note: Failed to delete ${partialPath}: ${error}`);
    }
  }

  logger.info('Workspace restored to clean state');
}
From src/temporal/activities.ts:518-552

Resume Flow in Workflow

// src/temporal/workflows.ts:186-229
// Resume path (workflow code). Keep the call order stable: it determines the
// Temporal command sequence replayed from history.
if (input.resumeFromWorkspace) {
  // 1. Load resume state (validates workspace, cross-checks deliverables)
  resumeState = await a.loadResumeState(
    input.resumeFromWorkspace,
    input.webUrl,
    input.repoPath
  );

  // 2. Restore git workspace and clean up incomplete deliverables.
  //    Incomplete = every agent in ALL_AGENTS not reported as completed.
  //    `!` is needed because TypeScript does not carry the narrowing of the
  //    outer `resumeState` variable into this callback.
  const incompleteAgents = ALL_AGENTS.filter(
    (agentName) => !resumeState!.completedAgents.includes(agentName)
  ) as AgentName[];

  await a.restoreGitCheckpoint(
    input.repoPath,
    resumeState.checkpointHash,
    incompleteAgents
  );

  // 3. Short-circuit if all agents already completed. This runs after the git
  //    restore above and returns before recordResumeAttempt, so such a
  //    "nothing to resume" run is not recorded.
  if (resumeState.completedAgents.length === ALL_AGENTS.length) {
    log.info(`All ${ALL_AGENTS.length} agents already completed. Nothing to resume.`);
    state.status = 'completed';
    state.completedAgents = [...resumeState.completedAgents];
    return state;
  }

  // 4. Record this resume attempt in session.json and workflow.log
  await a.recordResumeAttempt(
    activityInput,
    input.terminatedWorkflows || [],
    resumeState.checkpointHash,
    resumeState.originalWorkflowId,
    resumeState.completedAgents
  );
}

// True when `agentName` already completed in the resumed workspace; always
// false for a fresh (non-resume) run.
const shouldSkip = (agentName: string): boolean => {
  return resumeState?.completedAgents.includes(agentName) ?? false;
};
From src/temporal/workflows.ts:186-229

Queryable Progress

Workflows expose real-time progress through Temporal queries:
// src/temporal/workflows.ts:161-165
setHandler(getProgress, (): PipelineProgress => {
  // Computed per query, so the elapsed time is always current.
  const elapsedMs = Date.now() - state.startTime;
  return { ...state, workflowId, elapsedMs };
});

// Query definition (src/temporal/shared.ts)
/** Pipeline execution state, updated in place by the workflow as phases run. */
export interface PipelineState {
  /** Lifecycle of the whole run. */
  status: 'running' | 'completed' | 'failed';
  currentPhase: string | null;
  currentAgent: string | null;
  completedAgents: string[];
  failedAgent: string | null;
  /** Formatted error message once the run has failed. */
  error: string | null;
  /** Epoch milliseconds at which the workflow started. */
  startTime: number;
  /** Metrics keyed by agent name (e.g. `'report'`). */
  agentMetrics: Record<string, AgentMetrics>;
  summary: PipelineSummary | null;
}

/** Result of the `getProgress` query: the live state plus identity and elapsed time. */
export interface PipelineProgress extends PipelineState {
  workflowId: string;
  /** Milliseconds since `startTime`, computed when the query is answered. */
  elapsedMs: number;
}

/** Temporal query exposing live pipeline progress. */
export const getProgress = defineQuery<PipelineProgress>('getProgress');
Usage:
# Query workflow progress
temporal workflow query \
  --workflow-id=shannon-example-com-20240315-120000 \
  --name=getProgress

Build docs developers (and LLMs) love