
Overview

The agent worker is a BullMQ-based job processor that runs headless Chromium browsers via Playwright to execute user prompts across AI providers (ChatGPT, Claude, Gemini, Perplexity, Google AI Overview). It extracts responses and sources, stores them in ClickHouse, and triggers analysis.

Tech Stack

  • Queue: BullMQ (Redis-backed)
  • Browser Automation: Playwright (Chromium)
  • Proxy Management: Custom rotating proxy pool with scoring
  • Stealth: Custom CDP-based stealth configuration
  • Language: TypeScript with ES modules

Architecture

Job Flow

┌─────────────┐
│  Web App    │ submits job group
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────────────┐
│  Redis (Job Queue + Progress Tracking)  │
└──────┬──────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────┐
│  Worker: One BullMQ worker per provider │
│  - chatgpt-worker                       │
│  - claude-worker                        │
│  - gemini-worker                        │
│  - perplexity-worker                    │
│  - ai-overview-worker                   │
└──────┬──────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────┐
│  Agent Handler                          │
│  1. Fetch proxies from pool             │
│  2. Launch CDP browser (warm or cold)   │
│  3. Navigate to provider                │
│  4. Run prompts sequentially            │
│  5. Extract responses + sources         │
└──────┬──────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────┐
│  Store Results                          │
│  - ClickHouse: prompt_responses         │
│  - Redis: Update progress               │
└──────┬──────────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────────┐
│  Trigger Analysis (Background)          │
│  - Analyzes brand mentions              │
│  - Stores in prompt_analysis table      │
└─────────────────────────────────────────┘

Worker Implementation

Worker Initialization

Each provider gets its own BullMQ worker:
apps/agent/src/worker.ts
import { Worker } from "bullmq";
import { PROVIDER_LIST } from "@oneglanse/types";
import { getQueueName } from "@oneglanse/services";
import { handleJob } from "./worker/jobHandler.js";

export let workers: Worker[] = [];

async function startWorkers() {
  await waitForRedis();
  
  const workerConcurrency = env.AGENT_WORKER_CONCURRENCY || 1;
  
  const connection = {
    host: env.REDIS_HOST,
    port: env.REDIS_PORT,
    password: env.REDIS_PASSWORD,
  };
  
  workers = PROVIDER_LIST.map((provider) => {
    const w = new Worker(getQueueName(provider), handleJob, {
      connection,
      concurrency: workerConcurrency,
      lockDuration: 15 * 60 * 1000, // 15 minutes
      stalledInterval: 60 * 1000,
      maxStalledCount: 5,
    });
    
    w.on("active", (job) => plog.log("Job started", job.id));
    w.on("completed", (job) => plog.success("Job completed", job.id));
    w.on("failed", (job, err) => plog.error("Job failed", job?.id, err));
    
    return w;
  });
}
Reference: apps/agent/src/worker.ts:12-55

Job Handler

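All provider workers share one job handler. Each invocation runs the prompts for a single provider, stores the results, triggers analysis, and reports progress. Agent errors are caught inside the handler, which still returns true, so a failed run is reported through the Redis progress key ("failed") rather than through BullMQ's failed state: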
apps/agent/src/worker/jobHandler.ts
export async function handleJob(job: Job<ProviderJobData>): Promise<boolean> {
  const { provider, jobGroupId, prompts, user_id, workspace_id } = job.data;
  
  // Generate execution timestamp
  const executionTime = new Date().toISOString();
  
  const promptPayload: PromptPayload = {
    user_id,
    workspace_id,
    prompts: prompts.map(({ id, prompt }) => ({ id, prompt })),
    created_at: executionTime,
  };
  
  // Initialize progress tracking in Redis
  const progressKey = `job:${jobGroupId}:result`;
  await updateProgress(progressKey, provider, "running", null);
  
  let wrapped: AgentResult = { status: "rejected", data: [] };
  
  // Run the agent
  try {
    const { label, factory } = providerConfig[provider];
    const result = await agentHandler(
      label,
      factory,
      promptPayload,
      provider
    );
    wrapped = {
      status: result.length > 0 ? "fulfilled" : "rejected",
      data: result,
    };
  } catch (err) {
    plog.error(`failed:`, toErrorMessage(err));
  }
  
  // Store successful results
  if (wrapped.status === "fulfilled" && wrapped.data.length > 0) {
    await storePromptResponses({
      results: { [provider]: wrapped },
      userId: user_id,
      workspaceId: workspace_id,
      promptRunAt: executionTime,
    });
    
    // Trigger analysis asynchronously
    runAnalysisInBackground({
      workspaceId: workspace_id,
      userId: user_id,
      provider,
      jobGroupId,
    });
  }
  
  // Update final progress
  const finalStatus = wrapped.status === "fulfilled" ? "completed" : "failed";
  await updateProgress(progressKey, provider, finalStatus, wrapped.data.length);
  
  return true;
}
Reference: apps/agent/src/worker/jobHandler.ts:81-188
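
The handler calls runAnalysisInBackground without awaiting it. A minimal sketch of that fire-and-forget pattern follows. The helper name fireAndForget and the error callback are assumptions for illustration, not the repository's code. The point is that a promise nobody awaits still needs a catch handler, otherwise a rejection surfaces as an unhandled rejection at process level.

```typescript
// Hypothetical helper: start an async task without awaiting it.
// The catch handler keeps a rejected task from becoming an unhandled rejection.
function fireAndForget(
  task: () => Promise<void>,
  onError: (err: unknown) => void,
): void {
  void task().catch(onError);
}

const analysisLog: string[] = [];

fireAndForget(
  async () => {
    // An async function body runs synchronously up to its first await,
    // so this entry is recorded before fireAndForget returns.
    analysisLog.push("started");
    throw new Error("analysis service unavailable");
  },
  (err) => {
    analysisLog.push(err instanceof Error ? err.message : String(err));
  },
);
```

The caller continues immediately. The error message is appended to analysisLog on a later tick, after the rejected promise has been handled.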

Playwright Browser Automation

CDP Browser Launch

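Instead of using Playwright's built-in launcher, the worker spawns Chromium itself on a free port with a per-launch user data directory, then attaches Playwright to it over the Chrome DevTools Protocol (CDP):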
apps/agent/src/lib/browser/launch.ts
export async function launchContext(
  provider: Provider,
): Promise<{
  browser: Browser;
  context: BrowserContext;
  proxy: string | null;
  cleanup: () => Promise<void>;
}> {
  await cleanupStaleCdpDirs();
  
  // Get proxy from pool
  let proxy = getNextProxy();
  if (!proxy) {
    await fetchProxies({ forceRefresh: true });
    proxy = getNextProxy();
  }
  
  // Spawn Chrome with CDP
  const port = await getFreePort();
  const userDataDir = `/tmp/cdp-${provider}-${port}`;
  
  const chromeProcess = spawnChromiumCDP(port, userDataDir);
  const wsEndpoint = await waitForCDPEndpoint(port);
  const browser = await chromium.connectOverCDP(wsEndpoint);
  
  // Create stealth context
  const context = await browser.newContext({
    viewport: { width: 1920, height: 1080 },
    ...(proxy ? { proxy: { server: proxy } } : {}),
    ...STEALTH_CONTEXT_OPTIONS,
  });
  
  await context.addInitScript(STEALTH_INIT_SCRIPT);
  
  return { browser, context, proxy, cleanup };
}
Reference: apps/agent/src/lib/browser/launch.ts:44-134
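
The helpers spawnChromiumCDP and waitForCDPEndpoint are not shown above. As a rough sketch of what such helpers need to do, the snippet below builds the command-line flags that expose a CDP port and extracts the WebSocket URL from the /json/version response that Chromium serves on that port. The names buildCdpArgs and extractWsEndpoint and the exact flag set are assumptions for illustration, not the repository's implementation.

```typescript
// Hypothetical helper: flags that make Chromium listen for CDP connections.
// --remote-debugging-port and --user-data-dir are standard Chromium switches;
// the remaining flags are an illustrative choice.
function buildCdpArgs(port: number, userDataDir: string): string[] {
  return [
    `--remote-debugging-port=${port}`,
    `--user-data-dir=${userDataDir}`,
    "--headless=new",
    "--no-first-run",
    "--no-default-browser-check",
  ];
}

// Chromium answers GET http://127.0.0.1:<port>/json/version with a JSON body
// that includes "webSocketDebuggerUrl". Returns null if the field is missing
// or the body is not valid JSON (for example while the browser is still starting).
function extractWsEndpoint(versionJson: string): string | null {
  try {
    const parsed: unknown = JSON.parse(versionJson);
    if (typeof parsed === "object" && parsed !== null) {
      const url = (parsed as Record<string, unknown>)["webSocketDebuggerUrl"];
      return typeof url === "string" ? url : null;
    }
    return null;
  } catch {
    return null;
  }
}

const cdpArgs = buildCdpArgs(9222, "/tmp/cdp-chatgpt-9222");
const wsEndpoint = extractWsEndpoint(
  '{"Browser":"Chrome/120.0","webSocketDebuggerUrl":"ws://127.0.0.1:9222/devtools/browser/abc"}',
);
console.log(cdpArgs[0], wsEndpoint);
```

A polling loop would call extractWsEndpoint on each response until it returns a non-null URL, then pass that URL to chromium.connectOverCDP.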

Warm Browser Pool

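To avoid spawning a new browser for every job, the agent keeps a pool of warm browsers. The factory below tries a warm browser first and falls back to a cold start when none is available or when navigation on the warm page fails: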
apps/agent/src/core/agentHandler.ts
const warmFactory: AgentFactory = async () => {
  const warm = await getWarmBrowser(provider).catch(() => null);
  
  if (warm) {
    const config = PROVIDER_CONFIGS[provider];
    try {
      // Reuse existing browser, just navigate to clean slate
      await navigateWithRetry(warm.page, config.url, {
        waitUntil: "domcontentloaded",
        timeout: 30_000,
      });
      
      if (config.postNavigationHook) {
        await config.postNavigationHook(warm.page);
      }
      
      return {
        browser: warm.browser,
        context: warm.context,
        page: warm.page,
        proxy: warm.proxy ?? undefined,
        cleanup: warm.cleanup ?? undefined,
      };
    } catch {
      // Navigation failed, close and fall back to cold factory
      await warm.cleanup?.().catch(() => {});
    }
  }
  
  return agentFactory(); // Cold start
};
Reference: apps/agent/src/core/agentHandler.ts:31-58
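
getWarmBrowser itself is not shown. One way such a pool could work is sketched below: at most one idle browser per provider, removed from the pool as soon as it is handed out so two jobs never share it. The class name WarmPool, the maxAgeMs expiry, and the one-entry-per-provider policy are all assumptions for illustration.

```typescript
// Hypothetical warm-pool sketch: at most one idle browser per provider.
interface WarmEntry<B> {
  browser: B;
  createdAt: number;
}

class WarmPool<B> {
  private readonly idle = new Map<string, WarmEntry<B>>();
  private readonly maxAgeMs: number;

  constructor(maxAgeMs: number) {
    this.maxAgeMs = maxAgeMs;
  }

  // Park a browser for later reuse, replacing any previous idle entry.
  put(provider: string, browser: B, now: number = Date.now()): void {
    this.idle.set(provider, { browser, createdAt: now });
  }

  // Hand out the idle browser and remove it from the pool so it cannot be
  // given to a second job. Entries older than maxAgeMs are discarded
  // (a real pool would also close the discarded browser).
  take(provider: string, now: number = Date.now()): B | null {
    const entry = this.idle.get(provider);
    if (!entry) return null;
    this.idle.delete(provider);
    return now - entry.createdAt <= this.maxAgeMs ? entry.browser : null;
  }
}

// Strings stand in for real browser handles.
const warmPool = new WarmPool<string>(60_000);
warmPool.put("chatgpt", "browser-1", 0);
const firstTake = warmPool.take("chatgpt", 1_000);
const secondTake = warmPool.take("chatgpt", 2_000);
```

Here firstTake is "browser-1" and secondTake is null, which is the case where the factory above would fall through to a cold start.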

Stealth Configuration

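To look like a regular desktop browser, the agent applies a fixed set of context options (user agent, locale, timezone, geolocation) and an init script that patches common automation fingerprints before any page script runs: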
export const STEALTH_CONTEXT_OPTIONS = {
  userAgent: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36...",
  locale: "en-US",
  timezoneId: "America/New_York",
  permissions: ["geolocation"],
  geolocation: { latitude: 40.7128, longitude: -74.0060 },
  colorScheme: "light" as const,
};

export const STEALTH_INIT_SCRIPT = `
  // Override navigator.webdriver
  Object.defineProperty(navigator, 'webdriver', { get: () => false });
  
  // Override chrome runtime
  window.chrome = { runtime: {} };
  
  // Override permissions (bind keeps the native method attached to navigator.permissions)
  const originalQuery = window.navigator.permissions.query.bind(window.navigator.permissions);
  window.navigator.permissions.query = (parameters) => (
    parameters.name === 'notifications' ?
      Promise.resolve({ state: Notification.permission }) :
      originalQuery(parameters)
  );
`;
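
To see what the init script's patches do without starting a browser, the sketch below applies the same two overrides to a plain object standing in for navigator. FakeNavigator and applyStealthPatches are hypothetical names used only for this illustration. The sketch binds the original query method so it keeps its receiver, which native browser methods require.

```typescript
type PermissionQuery = (descriptor: { name: string }) => Promise<{ state: string }>;

// Stand-in for the parts of navigator that the init script touches.
interface FakeNavigator {
  webdriver: boolean;
  permissions: { query: PermissionQuery };
}

function applyStealthPatches(nav: FakeNavigator, notificationPermission: string): void {
  // Replace the data property with a getter that always reports false.
  Object.defineProperty(nav, "webdriver", { get: () => false });

  // Answer "notifications" queries directly and delegate everything else.
  const originalQuery = nav.permissions.query.bind(nav.permissions);
  nav.permissions.query = (descriptor) =>
    descriptor.name === "notifications"
      ? Promise.resolve({ state: notificationPermission })
      : originalQuery(descriptor);
}

const fakeNav: FakeNavigator = {
  webdriver: true,
  permissions: {
    query: (descriptor) => Promise.resolve({ state: `native:${descriptor.name}` }),
  },
};

applyStealthPatches(fakeNav, "default");
console.log(fakeNav.webdriver); // false
```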

Provider Integrations

Provider Configuration

Each provider has a dedicated configuration:
apps/agent/src/core/providers/index.ts
export const PROVIDER_CONFIGS: Record<Provider, ProviderConfig> = {
  gemini: geminiConfig,
  chatgpt: chatgptConfig,
  perplexity: perplexityConfig,
  claude: claudeConfig,
  "ai-overview": aiOverviewConfig,
};
Reference: apps/agent/src/core/providers/index.ts:19-25

Provider Config Structure

Each provider config defines:
export interface ProviderConfig {
  label: string;                    // Display name
  url: string;                      // Provider URL
  requiresWarmup: boolean;          // Whether to warm up editor
  skip?: boolean;                   // Skip this provider
  postNavigationHook?: (page: Page) => Promise<void>;
  
  // Steps
  askPrompt: (page: Page, prompt: string) => Promise<void>;
  extractResponse: (page: Page) => Promise<string>;
  extractSources: (page: Page, response: string) => Promise<Source[]>;
}

Example: ChatGPT Provider

apps/agent/src/core/providers/chatgpt/index.ts
export const chatgptConfig: ProviderConfig = {
  label: "ChatGPT",
  url: "https://chatgpt.com",
  requiresWarmup: true,
  
  askPrompt: async (page, prompt) => {
    const editor = await findEditor(page);
    await editor.fill(prompt);
    const sendButton = await findSendButton(page);
    await sendButton.click();
    await waitForFinish(page);
  },
  
  extractResponse: async (page) => {
    const responseElement = await findResponseElement(page);
    return toMarkdown(await responseElement.innerHTML());
  },
  
  extractSources: extractChatGPTSources,
};

Proxy Management

Proxy Pool

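The agent maintains a scored proxy pool. Each proxy has a score between 0 and 100: a success adds 5, a failure subtracts 10, and three consecutive failures evict the proxy from the pool. getNextProxy returns the highest-scoring proxy and breaks ties in favor of the least recently used one: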
apps/agent/src/lib/browser/proxy/pool.ts
export function getNextProxy(): string | null {
  if (proxyRecords.length === 0) return null;
  
  // Sort by score (descending), then by last used time (ascending)
  proxyRecords.sort((a, b) => {
    if (b.score !== a.score) return b.score - a.score;
    return a.lastUsedAt - b.lastUsedAt;
  });
  
  const record = proxyRecords[0];
  record.lastUsedAt = Date.now();
  return record.proxy;
}

export function recordProxyResult(
  proxy: string,
  success: boolean,
  reason?: string,
  provider?: Provider,
): void {
  const record = proxyRecords.find((r) => r.proxy === proxy);
  if (!record) return;
  
  if (success) {
    record.score = Math.min(100, record.score + 5);
    record.consecutiveFailures = 0;
  } else {
    record.score = Math.max(0, record.score - 10);
    record.consecutiveFailures++;
    
    if (record.consecutiveFailures >= 3) {
      // Remove proxy from pool
      proxyRecords = proxyRecords.filter((r) => r.proxy !== proxy);
    }
  }
}
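
The scoring rules are easier to follow with a worked run. The sketch below is a self-contained re-implementation of the same rules on a two-proxy pool. The starting score of 50, the proxy URLs, and the helper names recordResult and pickBest are assumptions for illustration.

```typescript
interface ProxyRecord {
  proxy: string;
  score: number;
  lastUsedAt: number;
  consecutiveFailures: number;
}

// Assumption: new proxies start at a mid-range score of 50.
let proxyPool: ProxyRecord[] = ["http://a:8080", "http://b:8080"].map((proxy) => ({
  proxy,
  score: 50,
  lastUsedAt: 0,
  consecutiveFailures: 0,
}));

// Same rules as recordProxyResult: +5 on success, -10 on failure,
// eviction after three consecutive failures.
function recordResult(proxy: string, success: boolean): void {
  const rec = proxyPool.find((p) => p.proxy === proxy);
  if (!rec) return;
  if (success) {
    rec.score = Math.min(100, rec.score + 5);
    rec.consecutiveFailures = 0;
    return;
  }
  rec.score = Math.max(0, rec.score - 10);
  rec.consecutiveFailures += 1;
  if (rec.consecutiveFailures >= 3) {
    proxyPool = proxyPool.filter((p) => p.proxy !== proxy);
  }
}

// Highest score first, least recently used as the tie-breaker.
function pickBest(): string | null {
  const sorted = [...proxyPool].sort(
    (x, y) => y.score - x.score || x.lastUsedAt - y.lastUsedAt,
  );
  return sorted[0]?.proxy ?? null;
}

recordResult("http://a:8080", true);  // a: 55
recordResult("http://b:8080", false); // b: 40, 1 consecutive failure
const bestAfterFirstRound = pickBest();
recordResult("http://b:8080", false); // b: 30, 2 consecutive failures
recordResult("http://b:8080", false); // b: 20, 3 consecutive failures, evicted
const remainingProxies = proxyPool.map((p) => p.proxy);
```

After the first round the pool prefers proxy a. After three failures in a row, proxy b is gone and only a remains.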

Proxy Retry Logic

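A run gets up to three attempts. Each attempt asks the factory for a fresh browser (and therefore the next proxy), and the proxy list is refreshed between failed attempts. If every attempt fails, the last error is rethrown: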
export async function runWithProxyPool(
  label: string,
  factory: AgentFactory,
  payload: PromptPayload,
  provider: Provider,
  refreshProxies: () => Promise<void>,
): Promise<AskPromptResult[]> {
  const maxAttempts = 3;
  let lastError: unknown;
  
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const { browser, context, page, proxy, cleanup } = await factory();
      
      try {
        const results = await runAgents(payload, page, provider);
        
        if (proxy) {
          recordProxyResult(proxy, true, undefined, provider);
        }
        
        return results;
      } finally {
        await cleanup?.();
      }
    } catch (err) {
      lastError = err;
      
      if (attempt < maxAttempts) {
        await refreshProxies();
      }
    }
  }
  
  throw lastError;
}

Job Processing Flow

1. Submit Job Group

The web app submits a job group with one job per enabled provider:
packages/services/src/agent/jobs.ts
export async function submitAgentJobGroup(args: {
  workspaceId: string;
  userId: string;
}): Promise<SubmitAgentJobResult> {
  const { workspaceId, userId } = args;
  
  const prompts = await fetchUserPromptsForWorkspace({ workspaceId });
  if (!prompts || prompts.length === 0) {
    return { status: "empty" };
  }
  
  const jobGroupId = randomUUID();
  const workspace = await getWorkspaceById({ workspaceId });
  const enabledProviders = JSON.parse(workspace.enabledProviders) as Provider[];
  
  // Initialize progress in Redis
  const progress = {
    status: "pending",
    updateId: 0,
    providers: Object.fromEntries(enabledProviders.map(p => [p, "pending"])),
    results: Object.fromEntries(enabledProviders.map(p => [p, 0])),
    stats: {
      totalPrompts: prompts.length,
      expectedResponses: prompts.length * enabledProviders.length,
      actualResponses: 0,
    },
  };
  
  await redis.set(
    `job:${jobGroupId}:result`,
    JSON.stringify(progress),
    "EX",
    60 * 60
  );
  
  // Submit one job per provider
  await Promise.all(
    enabledProviders.map((provider) =>
      getProviderQueue(provider).add("run-agent", {
        jobGroupId,
        provider,
        prompts,
        user_id: userId,
        workspace_id: workspaceId,
      })
    )
  );
  
  return { status: "queued", jobGroupId };
}
Reference: packages/services/src/agent/jobs.ts:18-72
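
Each worker later updates its own provider entry in this progress document through updateProgress, which is not shown. The sketch below models that update as a pure function over the shape initialized above. The function name applyProviderUpdate and the rules for deriving the group-level status are assumptions for illustration.

```typescript
type ProviderStatus = "pending" | "running" | "completed" | "failed";

interface JobProgress {
  status: ProviderStatus;
  updateId: number;
  providers: Record<string, ProviderStatus>;
  results: Record<string, number>;
  stats: { totalPrompts: number; expectedResponses: number; actualResponses: number };
}

// Hypothetical pure reducer: returns a new progress document with one
// provider's status applied and the group-level fields recomputed.
function applyProviderUpdate(
  progress: JobProgress,
  provider: string,
  status: ProviderStatus,
  responseCount: number | null,
): JobProgress {
  const providers = { ...progress.providers, [provider]: status };
  // A null count (for example on "running") keeps the previous value.
  const results = {
    ...progress.results,
    [provider]: responseCount ?? progress.results[provider] ?? 0,
  };

  const states = Object.values(providers);
  const allDone = states.every((s) => s === "completed" || s === "failed");
  let overall: ProviderStatus;
  if (allDone) {
    // Assumed rule: the group counts as completed if any provider succeeded.
    overall = states.some((s) => s === "completed") ? "completed" : "failed";
  } else {
    overall = states.some((s) => s !== "pending") ? "running" : "pending";
  }

  const actualResponses = Object.values(results).reduce((sum, n) => sum + n, 0);
  return {
    status: overall,
    updateId: progress.updateId + 1,
    providers,
    results,
    stats: { ...progress.stats, actualResponses },
  };
}

const initialProgress: JobProgress = {
  status: "pending",
  updateId: 0,
  providers: { chatgpt: "pending", claude: "pending" },
  results: { chatgpt: 0, claude: 0 },
  stats: { totalPrompts: 3, expectedResponses: 6, actualResponses: 0 },
};
const afterStart = applyProviderUpdate(initialProgress, "chatgpt", "running", null);
const afterChatgpt = applyProviderUpdate(afterStart, "chatgpt", "completed", 3);
const afterClaude = applyProviderUpdate(afterChatgpt, "claude", "failed", 0);
```

Because several provider workers write to the same Redis key, the read-modify-write around a function like this would need to be atomic (for example a Lua script or a WATCH/MULTI transaction) to avoid lost updates.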

2. Process Job

The worker picks up the job and runs the agent:
1. Update Redis progress to "running"
2. Launch CDP browser with proxy (or reuse a warm browser)
3. Navigate to provider URL
4. For each prompt:
   a. Input prompt into editor
   b. Wait for response to complete
   c. Extract response text (markdown)
   d. Extract sources (title, URL, citation)
5. Store results in ClickHouse (only if at least one response was extracted)
6. Trigger analysis in background
7. Update Redis progress to "completed" or "failed"
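
The per-prompt loop in step 4 can be sketched against the three ProviderConfig step functions. This is a simplified illustration, not the repository's runAgents: the ProviderSteps interface is a cut-down stand-in for ProviderConfig, the page type is left generic so the sketch runs without Playwright, and skipping a failed prompt instead of aborting the job is an assumption.

```typescript
interface Source {
  title: string;
  url: string;
}

interface PromptResult {
  promptId: string;
  response: string;
  sources: Source[];
}

// Cut-down stand-in for ProviderConfig; P is the page type
// (Playwright's Page in the real worker).
interface ProviderSteps<P> {
  askPrompt: (page: P, prompt: string) => Promise<void>;
  extractResponse: (page: P) => Promise<string>;
  extractSources: (page: P, response: string) => Promise<Source[]>;
}

// Run prompts one at a time on the same page, in order.
async function runPromptsSequentially<P>(
  steps: ProviderSteps<P>,
  page: P,
  prompts: { id: string; prompt: string }[],
): Promise<PromptResult[]> {
  const results: PromptResult[] = [];
  for (const { id, prompt } of prompts) {
    try {
      await steps.askPrompt(page, prompt);
      const response = await steps.extractResponse(page);
      const sources = await steps.extractSources(page, response);
      results.push({ promptId: id, response, sources });
    } catch (err) {
      // Assumption: a failed prompt is skipped rather than aborting the job.
      console.error(`prompt ${id} failed:`, err);
    }
  }
  return results;
}
```

Running prompts sequentially keeps a single chat session per browser, which matters because each extraction step reads the most recent response on the page.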

3. Store Results

Results are stored in ClickHouse:
await clickhouse.insert({
  table: "analytics.prompt_responses",
  values: responses.map(r => ({
    id: randomUUID(),
    prompt_id: r.promptId,
    prompt: r.prompt,
    user_id: userId,
    workspace_id: workspaceId,
    model: r.model,
    model_provider: provider,
    response: r.response,
    sources: r.sources,
    is_analysed: false,
    prompt_run_at: executionTime,
    created_at: new Date(),
  })),
  format: "JSONEachRow",
});

Adding New Providers

To add a new AI provider:

1. Create Provider Config

Create apps/agent/src/core/providers/myprovider/index.ts:
import type { ProviderConfig } from "../types.js";
import { extractMyProviderSources } from "./lib/extractSources.js";
// toMarkdown is the HTML-to-markdown helper used by the existing providers;
// import it from wherever those providers import it (path not shown here).

export const myProviderConfig: ProviderConfig = {
  label: "MyProvider",
  url: "https://myprovider.com/chat",
  requiresWarmup: false,
  
  askPrompt: async (page, prompt) => {
    // Find input element (page.locator is synchronous, so no await is needed)
    const input = page.locator('[data-testid="chat-input"]');
    await input.fill(prompt);
    
    // Submit
    const sendBtn = page.locator('button[type="submit"]');
    await sendBtn.click();
    
    // Wait for the element that marks the response as complete
    await page.waitForSelector('.response-complete');
  },
  
  extractResponse: async (page) => {
    // Take the most recent response on the page
    const responseEl = page.locator('.response-content').last();
    const html = await responseEl.innerHTML();
    return toMarkdown(html);
  },
  
  extractSources: extractMyProviderSources,
};

2. Register Provider

Add to apps/agent/src/core/providers/index.ts:
import { myProviderConfig } from "./myprovider/index.js";

export const PROVIDER_CONFIGS: Record<Provider, ProviderConfig> = {
  // ... existing providers
  myprovider: myProviderConfig,
};

3. Add to Type Definitions

Update packages/types/src/provider.ts:
export const PROVIDER_LIST = [
  "chatgpt",
  "claude",
  "gemini",
  "perplexity",
  "ai-overview",
  "myprovider", // Add here
] as const;

export type Provider = (typeof PROVIDER_LIST)[number];
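
The reason steps 2 and 3 go together is that Provider is derived from the as const tuple, and Record<Provider, ProviderConfig> must contain a key for every member of that union. The sketch below shows the mechanism on a shortened list. PROVIDERS, ProviderId, LABELS, and isProviderId are illustrative names, not identifiers from the repository.

```typescript
const PROVIDERS = ["chatgpt", "claude", "myprovider"] as const;

// Union of the tuple's literal members: "chatgpt" | "claude" | "myprovider"
type ProviderId = (typeof PROVIDERS)[number];

// Record<ProviderId, ...> requires a key for every member of the union.
// Deleting the "myprovider" entry below becomes a compile-time error.
const LABELS: Record<ProviderId, string> = {
  chatgpt: "ChatGPT",
  claude: "Claude",
  myprovider: "MyProvider",
};

// Runtime guard for untrusted input, such as a provider list parsed from JSON.
function isProviderId(value: unknown): value is ProviderId {
  return typeof value === "string" && (PROVIDERS as readonly string[]).includes(value);
}

const parsedProviders: unknown[] = JSON.parse('["claude","bard","myprovider"]');
const enabledIds = parsedProviders.filter(isProviderId);
const enabledLabels = enabledIds.map((id) => LABELS[id]);
```

In this run enabledIds contains "claude" and "myprovider", and the unknown "bard" entry is dropped. The same kind of guard could be applied where enabledProviders is parsed from the workspace row, since a type assertion alone does not validate the stored JSON.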

4. Update Database

Add the provider to the default list of enabled providers in the schema, so that new workspaces have it enabled:
packages/db/src/schema/workspace.ts
const DEFAULT_PROVIDERS_JSON =
  '["chatgpt","claude","perplexity","gemini","ai-overview","myprovider"]';

Environment Variables

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

# Worker concurrency
AGENT_WORKER_CONCURRENCY=1

# ClickHouse
CLICKHOUSE_URL=http://localhost:8123
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=
CLICKHOUSE_DB=analytics

# Proxy (optional)
PROXY_API_URL=https://proxy-api.example.com
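
Environment values always arrive as strings, so numeric settings such as REDIS_PORT and AGENT_WORKER_CONCURRENCY need parsing and a fallback. The sketch below shows one way to do that. The AgentEnv shape and the helper names positiveInt and loadAgentEnv are assumptions for illustration; the repository's own env module is not shown in this document.

```typescript
interface AgentEnv {
  redisHost: string;
  redisPort: number;
  redisPassword: string | undefined;
  workerConcurrency: number;
}

// Parse a positive integer, falling back when the value is missing or invalid.
function positiveInt(raw: string | undefined, fallback: number): number {
  const n = Number.parseInt(raw ?? "", 10);
  return Number.isInteger(n) && n > 0 ? n : fallback;
}

function loadAgentEnv(source: Record<string, string | undefined>): AgentEnv {
  return {
    redisHost: source["REDIS_HOST"] || "localhost",
    redisPort: positiveInt(source["REDIS_PORT"], 6379),
    // An empty REDIS_PASSWORD= line is treated as "no password".
    redisPassword: source["REDIS_PASSWORD"] || undefined,
    workerConcurrency: positiveInt(source["AGENT_WORKER_CONCURRENCY"], 1),
  };
}

const agentEnv = loadAgentEnv({
  REDIS_PORT: "6380",
  REDIS_PASSWORD: "",
  AGENT_WORKER_CONCURRENCY: "abc",
});
console.log(agentEnv);
```

In a Node process the argument would be process.env. In this run the invalid concurrency value falls back to 1 and the missing host falls back to localhost.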

Development Commands

# Run worker locally
pnpm dev

# Build worker
pnpm build

# Type checking
pnpm typecheck

Graceful Shutdown

The worker handles SIGTERM/SIGINT gracefully:
apps/agent/src/index.ts
const shutdown = async (signal: string) => {
  logger.log(`Received ${signal}. Starting graceful shutdown...`);
  
  // 1. Close warm browser pool
  await closeAllWarm();
  
  // 2. Close workers (wait for current jobs to finish)
  await Promise.all(workers.map((w) => w.close()));
  
  // 3. Close Redis connection
  await redis.quit();
  
  process.exit(0);
};

process.on("SIGTERM", () => void shutdown("SIGTERM"));
process.on("SIGINT", () => void shutdown("SIGINT"));
Reference: apps/agent/src/index.ts:7-48
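
With the handlers as shown, a second signal during shutdown (for example an impatient second Ctrl+C) starts the routine again. Whether the real code guards against this is not visible here. The sketch below shows one possible guard: a wrapper that lets repeated calls share a single in-flight run. The name runOnce is an assumption.

```typescript
// Wrap an async routine so that repeated calls share one in-flight run.
function runOnce<T>(fn: () => Promise<T>): () => Promise<T> {
  let inFlight: Promise<T> | null = null;
  return () => {
    if (inFlight === null) inFlight = fn();
    return inFlight;
  };
}

let shutdownRuns = 0;
const guardedShutdown = runOnce(async () => {
  // Stands in for closing the warm pool, the workers and Redis.
  shutdownRuns += 1;
});

const firstSignal = guardedShutdown();  // first SIGTERM
const secondSignal = guardedShutdown(); // second signal reuses the first run
console.log(shutdownRuns, firstSignal === secondSignal); // 1 true
```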
