Workflow Architecture
The main workflow (pentestPipelineWorkflow) orchestrates Shannon’s five-phase pipeline with parallel execution in vulnerability and exploitation phases.
Workflow Structure
// src/temporal/workflows.ts:135-495
/**
 * Top-level Temporal workflow orchestrating the five-phase pentest pipeline:
 * preflight → pre-recon → recon → parallel vuln/exploit pipelines → report.
 * Progress is exposed live via the `getProgress` query handler.
 *
 * NOTE(review): `activityInput`, `preflightActs`, `a`, `runSequentialPhase`,
 * `pipelineThunks`, `maxConcurrent` and `aggregatePipelineResults` are
 * defined in lines elided from this excerpt — confirm against the full file.
 */
export async function pentestPipelineWorkflow(
  input: PipelineInput
): Promise<PipelineState> {
  const { workflowId } = workflowInfo();
  // Mutable progress state; every phase updates it and the query reads it.
  const state: PipelineState = {
    status: 'running',
    currentPhase: null,
    currentAgent: null,
    completedAgents: [],
    failedAgent: null,
    error: null,
    startTime: Date.now(),
    agentMetrics: {},
    summary: null,
  };
  // Enable queryable progress
  setHandler(getProgress, (): PipelineProgress => ({
    ...state,
    workflowId,
    elapsedMs: Date.now() - state.startTime,
  }));
  try {
    // Preflight validation
    await preflightActs.runPreflightValidation(activityInput);
    // Phase 1: Pre-Reconnaissance
    await runSequentialPhase('pre-recon', 'pre-recon', a.runPreReconAgent);
    // Phase 2: Reconnaissance
    await runSequentialPhase('recon', 'recon', a.runReconAgent);
    // Phases 3-4: Vulnerability + Exploitation (5 pipelined pairs in parallel)
    const pipelineResults = await runWithConcurrencyLimit(pipelineThunks, maxConcurrent);
    aggregatePipelineResults(pipelineResults);
    // Phase 5: Reporting
    await a.assembleReportActivity(activityInput);
    state.agentMetrics['report'] = await a.runReportAgent(activityInput);
    await a.injectReportMetadataActivity(activityInput);
    state.status = 'completed';
    return state;
  } catch (error) {
    // Record the failure in queryable state, then rethrow so Temporal marks
    // the workflow execution as failed.
    state.status = 'failed';
    state.error = formatWorkflowError(error, state.currentPhase, state.currentAgent);
    throw error;
  }
}
src/temporal/workflows.ts:135-495
Activity Proxy Configuration
Temporal activities are proxied with different retry configurations for different modes:
// src/temporal/workflows.ts:49-119
// Production retry (default)
// Long, doubling backoff sized for multi-hour agent runs that may hit
// provider rate limits; up to 50 attempts before giving up.
const PRODUCTION_RETRY = {
  initialInterval: '5 minutes',
  maximumInterval: '30 minutes',
  backoffCoefficient: 2,
  maximumAttempts: 50,
  // Failures retrying cannot fix — Temporal fails the activity immediately.
  nonRetryableErrorTypes: [
    'AuthenticationError',
    'PermissionError',
    'InvalidRequestError',
    'RequestTooLargeError',
    'ConfigurationError',
    'InvalidTargetError',
    'ExecutionLimitError',
  ],
};
const acts = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 hours',
  heartbeatTimeout: '60 minutes', // Extended for sub-agent execution
  retry: PRODUCTION_RETRY,
});
// Testing retry (fast iteration)
// Short intervals and few attempts so test pipelines fail fast.
const TESTING_RETRY = {
  initialInterval: '10 seconds',
  maximumInterval: '30 seconds',
  backoffCoefficient: 2,
  maximumAttempts: 5,
  nonRetryableErrorTypes: PRODUCTION_RETRY.nonRetryableErrorTypes,
};
const testActs = proxyActivities<typeof activities>({
  startToCloseTimeout: '30 minutes',
  heartbeatTimeout: '30 minutes',
  retry: TESTING_RETRY,
});
// Subscription retry (extended for 5h+ rate limit windows)
// Backoff may stretch to 6 hours so a rate-limited run can outlast the
// subscription quota window instead of failing.
const SUBSCRIPTION_RETRY = {
  initialInterval: '5 minutes',
  maximumInterval: '6 hours',
  backoffCoefficient: 2,
  maximumAttempts: 100,
  nonRetryableErrorTypes: PRODUCTION_RETRY.nonRetryableErrorTypes,
};
const subscriptionActs = proxyActivities<typeof activities>({
  startToCloseTimeout: '8 hours',
  heartbeatTimeout: '2 hours',
  retry: SUBSCRIPTION_RETRY,
});
/**
 * Picks the activity proxy matching the pipeline's execution mode.
 * Testing mode wins over everything; otherwise the configured retry
 * preset decides between subscription and production proxies.
 */
function selectActivityProxy(pipelineInput: PipelineInput) {
  if (pipelineInput.pipelineTestingMode) {
    return testActs;
  }
  const preset = pipelineInput.pipelineConfig?.retry_preset;
  if (preset === 'subscription') {
    return subscriptionActs;
  }
  return acts;
}
src/temporal/workflows.ts:49-147
Pipelined Execution
Vulnerability and exploitation phases run as independent pipelines with no synchronization barrier.
Pipeline Implementation
// src/temporal/workflows.ts:389-428
// src/temporal/workflows.ts:389-428
/**
 * Runs one vuln-analysis → exploitation pipeline for a single vulnerability
 * class. Exploitation only runs when the queue check reports actionable
 * findings; either agent is skipped when a resumed run already completed it.
 *
 * NOTE(review): `shouldSkip`, `a` (activity proxy), `activityInput` and
 * `log` come from the enclosing workflow scope, elided from this excerpt.
 */
async function runVulnExploitPipeline(
  vulnType: VulnType,
  runVulnAgent: () => Promise<AgentMetrics>,
  runExploitAgent: () => Promise<AgentMetrics>
): Promise<VulnExploitPipelineResult> {
  const vulnAgentName = `${vulnType}-vuln`;
  const exploitAgentName = `${vulnType}-exploit`;
  // 1. Run vulnerability analysis (or skip if resumed)
  let vulnMetrics: AgentMetrics | null = null;
  if (!shouldSkip(vulnAgentName)) {
    vulnMetrics = await runVulnAgent();
  } else {
    log.info(`Skipping ${vulnAgentName} (already complete)`);
  }
  // 2. Check exploitation queue for actionable findings
  // Runs even when the vuln agent was skipped, so resumed runs still get
  // an exploitation decision based on previously queued findings.
  const decision = await a.checkExploitationQueue(activityInput, vulnType);
  // 3. Conditionally run exploitation agent
  let exploitMetrics: AgentMetrics | null = null;
  if (decision.shouldExploit) {
    if (!shouldSkip(exploitAgentName)) {
      exploitMetrics = await runExploitAgent();
    } else {
      log.info(`Skipping ${exploitAgentName} (already complete)`);
    }
  }
  // Metrics stay null for skipped/not-run agents; the decision is echoed
  // back so the aggregation step can report why exploitation did not run.
  return {
    vulnType,
    vulnMetrics,
    exploitMetrics,
    exploitDecision: {
      shouldExploit: decision.shouldExploit,
      vulnerabilityCount: decision.vulnerabilityCount,
    },
    error: null,
  };
}
src/temporal/workflows.ts:389-428
Concurrency Control
// src/temporal/workflows.ts:341-363
/**
 * Runs async thunks with at most `limit` in flight at once.
 *
 * Generalized: the algorithm is type-agnostic, so the hard-coded
 * `VulnExploitPipelineResult` is now a type parameter `T` (backward
 * compatible — existing call sites infer T from their thunks).
 *
 * @param thunks  Lazy promise factories; each is invoked at most once.
 * @param limit   Maximum number of thunks running concurrently.
 * @returns Settled results in COMPLETION order, not input order. A rejected
 *   thunk is captured as a 'rejected' entry and never blocks the others.
 */
async function runWithConcurrencyLimit<T>(
  thunks: Array<() => Promise<T>>,
  limit: number
): Promise<PromiseSettledResult<T>[]> {
  const results: PromiseSettledResult<T>[] = [];
  const inFlight = new Set<Promise<void>>();
  for (const thunk of thunks) {
    // `slot` always fulfills (both outcomes are captured into `results`),
    // so Promise.race below can never reject.
    const slot = thunk().then(
      (value) => { results.push({ status: 'fulfilled', value }); },
      (reason: unknown) => { results.push({ status: 'rejected', reason }); }
    ).finally(() => { inFlight.delete(slot); });
    inFlight.add(slot);
    if (inFlight.size >= limit) {
      await Promise.race(inFlight); // Wait for any slot to complete
    }
  }
  // Drain the remaining slots before returning.
  await Promise.allSettled(inFlight);
  return results;
}
// Usage (src/temporal/workflows.ts:430-447)
const maxConcurrent = input.pipelineConfig?.max_concurrent_pipelines ?? 5;
const pipelineResults = await runWithConcurrencyLimit(pipelineThunks, maxConcurrent);
aggregratePipelineResults(pipelineResults);
src/temporal/workflows.ts:341-447
Key Design Points:
- Default concurrency: 5 pipelines in parallel
- Configurable via pipelineConfig.max_concurrent_pipelines
- Results in completion order, not input order
- Graceful degradation: failed pipelines don’t block others
Activities Implementation
Activities are thin wrappers around service calls, handling Temporal-specific concerns.
Core Activity Pattern
// src/temporal/activities.ts:105-197
/**
 * Shared driver for every agent activity: executes one agent inside the
 * workflow's container, heartbeats while it runs, and converts failures
 * into Temporal ApplicationFailures with retryable/non-retryable intent.
 *
 * NOTE(review): `workflowId`, `webUrl`, `repoPath`, `configPath` and
 * `pipelineTestingMode` are not declared in this excerpt — presumably
 * destructured from `input` in elided lines; confirm against the full file.
 *
 * @param agentName Which agent to run (e.g. 'recon', 'injection-vuln').
 * @param input     Shared activity input for the pipeline.
 * @returns Duration/cost/model metrics for this single execution.
 */
async function runAgentActivity(
  agentName: AgentName,
  input: ActivityInput
): Promise<AgentMetrics> {
  const startTime = Date.now();
  const attemptNumber = Context.current().info.attempt;
  // 1. Heartbeat loop - signals worker is alive
  const heartbeatInterval = setInterval(() => {
    const elapsed = Math.floor((Date.now() - startTime) / 1000);
    heartbeat({ agent: agentName, elapsedSeconds: elapsed, attempt: attemptNumber });
  }, HEARTBEAT_INTERVAL_MS);
  try {
    const logger = createActivityLogger();
    // 2. Get or create container
    const sessionMetadata = buildSessionMetadata(input);
    const container = getOrCreateContainer(workflowId, sessionMetadata);
    // 3. Create audit session for THIS agent execution
    const auditSession = new AuditSession(sessionMetadata);
    await auditSession.initialize(workflowId);
    // 4. Execute agent via service (throws PentestError on failure)
    const endResult = await container.agentExecution.executeOrThrow(
      agentName,
      { webUrl, repoPath, configPath, pipelineTestingMode, attemptNumber },
      auditSession,
      logger
    );
    // 5. Return metrics
    // Token/turn counts are not surfaced by this result shape, hence null.
    return {
      durationMs: Date.now() - startTime,
      inputTokens: null,
      outputTokens: null,
      costUsd: endResult.cost_usd,
      numTurns: null,
      model: endResult.model,
    };
  } catch (error) {
    // 6. Classify error for Temporal retry behavior
    if (error instanceof ApplicationFailure) {
      throw error; // Already classified
    }
    // Check output validation retry limit
    // After the cap, stop retrying: more attempts won't fix bad output.
    if (
      error instanceof PentestError &&
      error.code === ErrorCode.OUTPUT_VALIDATION_FAILED &&
      attemptNumber >= MAX_OUTPUT_VALIDATION_RETRIES
    ) {
      throw ApplicationFailure.nonRetryable(
        `Agent ${agentName} failed output validation after ${attemptNumber} attempts`,
        'OutputValidationError'
      );
    }
    // Classify and throw ApplicationFailure
    const classified = classifyErrorForTemporal(error);
    const message = truncateErrorMessage(error.message);
    if (classified.retryable) {
      throw ApplicationFailure.create({
        message,
        type: classified.type,
        details: [{ agentName, attemptNumber, elapsed: Date.now() - startTime }],
      });
    } else {
      throw ApplicationFailure.nonRetryable(message, classified.type);
    }
  } finally {
    // Always stop heartbeating, on success and on throw alike.
    clearInterval(heartbeatInterval);
  }
}
src/temporal/activities.ts:105-197
Activity Functions
All agent activities follow the same pattern:
// src/temporal/activities.ts:199-249
/** Activity entry point for the pre-reconnaissance agent. */
export async function runPreReconAgent(input: ActivityInput): Promise<AgentMetrics> {
  const metrics = await runAgentActivity('pre-recon', input);
  return metrics;
}
/** Activity entry point for the reconnaissance agent. */
export async function runReconAgent(input: ActivityInput): Promise<AgentMetrics> {
  const metrics = await runAgentActivity('recon', input);
  return metrics;
}
/** Activity entry point for the injection vulnerability-analysis agent. */
export async function runInjectionVulnAgent(input: ActivityInput): Promise<AgentMetrics> {
  const metrics = await runAgentActivity('injection-vuln', input);
  return metrics;
}
// ... 10 more agent activities
src/temporal/activities.ts:199-249
Specialized Activities
Preflight Validation
// src/temporal/activities.ts:261-315
// src/temporal/activities.ts:261-315
/**
 * Validates the target repo/config before any agent runs, heartbeating so
 * the worker is not considered stalled during slow checks.
 *
 * Failures from the preflight Result are classified into retryable vs
 * non-retryable ApplicationFailures so Temporal applies the right policy.
 */
export async function runPreflightValidation(input: ActivityInput): Promise<void> {
  const startTime = Date.now();
  const attemptNumber = Context.current().info.attempt;
  // Periodic heartbeat so Temporal knows this activity is still alive.
  const heartbeatInterval = setInterval(() => {
    const elapsed = Math.floor((Date.now() - startTime) / 1000);
    heartbeat({ phase: 'preflight', elapsedSeconds: elapsed, attempt: attemptNumber });
  }, HEARTBEAT_INTERVAL_MS);
  try {
    const logger = createActivityLogger();
    logger.info('Running preflight validation...', { attempt: attemptNumber });
    // runPreflightChecks returns a Result; errors are mapped to Temporal
    // failure types here rather than thrown raw.
    const result = await runPreflightChecks(input.repoPath, input.configPath, logger);
    if (isErr(result)) {
      const classified = classifyErrorForTemporal(result.error);
      const message = truncateErrorMessage(result.error.message);
      if (classified.retryable) {
        throw ApplicationFailure.create({ message, type: classified.type });
      } else {
        throw ApplicationFailure.nonRetryable(message, classified.type);
      }
    }
    logger.info('Preflight validation passed');
  } finally {
    // Stop heartbeating whether validation passed or threw.
    clearInterval(heartbeatInterval);
  }
}
src/temporal/activities.ts:261-315
Exploitation Queue Check
// src/temporal/activities.ts:354-367
// src/temporal/activities.ts:354-367
/**
 * Decides whether the exploitation agent for `vulnType` should run, based
 * on the findings the corresponding vulnerability agent queued on disk.
 */
export async function checkExploitationQueue(
  input: ActivityInput,
  vulnType: VulnType
): Promise<ExploitationDecision> {
  const logger = createActivityLogger();
  // Prefer the checker owned by this workflow's container (created by a
  // prior vuln agent run); otherwise fall back to a standalone instance.
  const existing = getContainer(input.workflowId)?.exploitationChecker;
  const checker = existing ?? new ExploitationCheckerService();
  return checker.checkQueue(vulnType, input.repoPath, logger);
}
src/temporal/activities.ts:354-367
Resume Mechanism
Shannon supports resuming interrupted workflows from named workspaces.
Resume State Loading
// src/temporal/activities.ts:391-489
export async function loadResumeState(
workspaceName: string,
expectedUrl: string,
expectedRepoPath: string
): Promise<ResumeState> {
// 1. Validate workspace exists
const sessionPath = path.join('./audit-logs', workspaceName, 'session.json');
if (!(await fileExists(sessionPath))) {
throw ApplicationFailure.nonRetryable(
`Workspace not found: ${workspaceName}`,
'WorkspaceNotFoundError'
);
}
// 2. Parse session.json and validate URL match
const session = await readJson<SessionJson>(sessionPath);
if (session.session.webUrl !== expectedUrl) {
throw ApplicationFailure.nonRetryable(
`URL mismatch with workspace\n Workspace URL: ${session.session.webUrl}\n Provided URL: ${expectedUrl}`,
'URLMismatchError'
);
}
// 3. Cross-check agent status with deliverables on disk
const completedAgents: string[] = [];
for (const agentName of ALL_AGENTS) {
const agentData = session.metrics.agents[agentName];
if (!agentData || agentData.status !== 'success') continue;
const deliverableFilename = AGENTS[agentName].deliverableFilename;
const deliverablePath = `${expectedRepoPath}/deliverables/${deliverableFilename}`;
const deliverableExists = await fileExists(deliverablePath);
if (!deliverableExists) {
logger.warn(`Agent ${agentName} shows success but deliverable missing, will re-run`);
continue;
}
completedAgents.push(agentName);
}
// 4. Collect git checkpoints and find latest
const checkpoints = completedAgents
.map((name) => session.metrics.agents[name]?.checkpoint)
.filter((hash): hash is string => hash != null);
if (checkpoints.length === 0) {
throw ApplicationFailure.nonRetryable(
`Cannot resume workspace ${workspaceName}: No checkpoints found`,
'NoCheckpointsError'
);
}
const checkpointHash = await findLatestCommit(expectedRepoPath, checkpoints);
const originalWorkflowId = session.session.originalWorkflowId || session.session.id;
return {
workspaceName,
originalUrl: session.session.webUrl,
completedAgents,
checkpointHash,
originalWorkflowId,
};
}
src/temporal/activities.ts:391-489
Workspace Restoration
// src/temporal/activities.ts:518-552
// src/temporal/activities.ts:518-552
/**
 * Restores the repo working tree to a known checkpoint for a resumed run:
 * hard-resets to the checkpoint commit, removes untracked files, then
 * deletes any partial deliverables written by agents that never finished.
 */
export async function restoreGitCheckpoint(
  repoPath: string,
  checkpointHash: string,
  incompleteAgents: AgentName[]
): Promise<void> {
  const logger = createActivityLogger();
  logger.info(`Restoring git workspace to ${checkpointHash}...`);

  // Hard-reset the working tree to the checkpoint commit.
  await executeGitCommandWithRetry(
    ['git', 'reset', '--hard', checkpointHash],
    repoPath,
    'reset to checkpoint for resume'
  );

  // Drop untracked files/directories left behind by the interrupted run.
  await executeGitCommandWithRetry(
    ['git', 'clean', '-fd'],
    repoPath,
    'clean untracked files for resume'
  );

  // Incomplete agents may have written partial deliverables; remove them so
  // the resumed run regenerates each from scratch. Deletion failures are
  // logged and tolerated (best-effort cleanup).
  for (const agentName of incompleteAgents) {
    const deliverablePath =
      `${repoPath}/deliverables/${AGENTS[agentName].deliverableFilename}`;
    try {
      const present = await fileExists(deliverablePath);
      if (present) {
        logger.warn(`Cleaning partial deliverable: ${agentName}`);
        await fs.unlink(deliverablePath);
      }
    } catch (error) {
      logger.info(`Note: Failed to delete ${deliverablePath}: ${error}`);
    }
  }

  logger.info('Workspace restored to clean state');
}
src/temporal/activities.ts:518-552
Resume Flow in Workflow
// src/temporal/workflows.ts:186-229
// NOTE(review): `resumeState`, `a`, `activityInput`, `state` and `log` are
// declared in elided lines of the workflow; this excerpt shows only the
// resume branch executed when a workspace name is supplied.
if (input.resumeFromWorkspace) {
  // 1. Load resume state (validates workspace, cross-checks deliverables)
  resumeState = await a.loadResumeState(
    input.resumeFromWorkspace,
    input.webUrl,
    input.repoPath
  );
  // 2. Restore git workspace and clean up incomplete deliverables
  const incompleteAgents = ALL_AGENTS.filter(
    (agentName) => !resumeState!.completedAgents.includes(agentName)
  ) as AgentName[];
  await a.restoreGitCheckpoint(
    input.repoPath,
    resumeState.checkpointHash,
    incompleteAgents
  );
  // 3. Short-circuit if all agents already completed
  if (resumeState.completedAgents.length === ALL_AGENTS.length) {
    log.info(`All ${ALL_AGENTS.length} agents already completed. Nothing to resume.`);
    state.status = 'completed';
    state.completedAgents = [...resumeState.completedAgents];
    return state;
  }
  // 4. Record this resume attempt in session.json and workflow.log
  await a.recordResumeAttempt(
    activityInput,
    input.terminatedWorkflows || [],
    resumeState.checkpointHash,
    resumeState.originalWorkflowId,
    resumeState.completedAgents
  );
}
// Agents already completed in the resumed workspace are skipped downstream;
// in a fresh (non-resume) run resumeState is undefined and nothing skips.
const shouldSkip = (agentName: string): boolean => {
  return resumeState?.completedAgents.includes(agentName) ?? false;
};
src/temporal/workflows.ts:186-229
Queryable Progress
Workflows expose real-time progress through Temporal queries:
// src/temporal/workflows.ts:161-165
// Register the query handler: each query returns a fresh snapshot of the
// workflow's mutable state plus derived timing fields.
setHandler(getProgress, (): PipelineProgress => ({
  ...state,
  workflowId,
  elapsedMs: Date.now() - state.startTime,
}));
// Query definition (src/temporal/shared.ts)
export const getProgress = defineQuery<PipelineProgress>('getProgress');
/** Snapshot returned by the `getProgress` query. */
export interface PipelineProgress extends PipelineState {
  workflowId: string; // id of the workflow execution serving the query
  elapsedMs: number; // wall-clock ms since the workflow started
}
/** Mutable pipeline state tracked inside the workflow. */
export interface PipelineState {
  status: 'running' | 'completed' | 'failed';
  currentPhase: string | null; // null before the first phase begins
  currentAgent: string | null; // null when no agent is executing
  completedAgents: string[];
  failedAgent: string | null; // set alongside status 'failed'
  error: string | null; // formatted error message on failure
  startTime: number; // epoch ms at workflow start
  agentMetrics: Record<string, AgentMetrics>;
  summary: PipelineSummary | null;
}
# Query workflow progress
temporal workflow query \
--workflow-id=shannon-example-com-20240315-120000 \
--name=getProgress
Related Documentation
- Architecture Overview - System design patterns
- Core Modules - Service layer details
- MCP Integration - Tool infrastructure
- Audit System - Logging and metrics
