
Overview

The @oneglanse/services package contains the business logic and data access functions used by both the web app and the agent worker. It provides a shared, type-safe interface for interacting with PostgreSQL, ClickHouse, Redis, and external APIs.

Purpose

The services layer serves as:
  1. Shared Business Logic - Reusable functions across web and agent
  2. Data Access Abstraction - Centralized database queries
  3. Type Safety - Enforced input/output types with Zod
  4. Error Handling - Consistent error patterns
  5. External Integrations - LLM APIs, Redis, queue management

Package Structure

packages/services/src/
├── agent/
│   ├── jobs.ts          # Job submission and management
│   ├── queue.ts         # BullMQ queue configuration
│   └── redis.ts         # Redis client for agents
├── analysis/
│   ├── runAnalysis.ts   # LLM-based brand analysis
│   ├── fetchAnalysedPrompts.ts
│   └── resetWorkspaceAnalysis.ts
├── prompt/
│   ├── fetchUserPromptsForWorkspace.ts
│   ├── fetchPromptResponsesForWorkspace.ts
│   ├── fetchPromptSourcesForWorkspace.ts
│   └── scheduler.ts     # Cron scheduling for prompts
├── workspace/
│   ├── create.ts        # Workspace creation
│   ├── query.ts         # Workspace queries
│   ├── members.ts       # Member management
│   └── settings.ts      # Workspace settings
├── llm/
│   └── index.ts         # ChatGPT client
└── index.ts             # Public exports

Key Services

Agent Services

Submit Agent Job Group

Submits one job per enabled provider to run the workspace's prompts, grouped under a shared job group ID:
packages/services/src/agent/jobs.ts
import { submitAgentJobGroup } from "@oneglanse/services";

const result = await submitAgentJobGroup({
  workspaceId: "workspace_123",
  userId: "user_456",
});

if (result.status === "queued") {
  console.log("Job group ID:", result.jobGroupId);
}
Implementation:
packages/services/src/agent/jobs.ts
export async function submitAgentJobGroup(args: {
  workspaceId: string;
  userId: string;
}): Promise<SubmitAgentJobResult> {
  const { workspaceId, userId } = args;
  
  // Fetch workspace prompts
  const prompts = await fetchUserPromptsForWorkspace({ workspaceId });
  if (!prompts || prompts.length === 0) {
    return { status: "empty" };
  }
  
  const jobGroupId = randomUUID();
  const workspace = await getWorkspaceById({ workspaceId });
  const enabledProviders = JSON.parse(
    workspace.enabledProviders ?? ALL_PROVIDERS_JSON
  ) as Provider[];
  
  // Initialize progress tracking
  const progress = {
    status: "pending" as const,
    updateId: 0,
    providers: Object.fromEntries(
      enabledProviders.map((p) => [p, "pending"])
    ),
    results: Object.fromEntries(
      enabledProviders.map((p) => [p, 0])
    ),
    stats: {
      totalPrompts: prompts.length,
      expectedResponses: prompts.length * enabledProviders.length,
      actualResponses: 0,
    },
  };
  
  await redis.set(
    `job:${jobGroupId}:result`,
    JSON.stringify(progress),
    "EX",
    60 * 60
  );
  
  // Submit one job per provider
  await Promise.all(
    enabledProviders.map((provider) =>
      getProviderQueue(provider).add("run-agent", {
        jobGroupId,
        provider,
        prompts,
        user_id: userId,
        workspace_id: workspaceId,
      })
    )
  );
  
  return { status: "queued", jobGroupId };
}
Reference: packages/services/src/agent/jobs.ts:18-72
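
The progress record stored under `job:${jobGroupId}:result` can be read back to report how far a job group has come. The following is a minimal sketch, not code from the package: `JobProgress`, `progressKey`, and `completionRatio` are hypothetical names, and it assumes workers increment `stats.actualResponses` as responses are stored.

```typescript
// Hypothetical mirror of the progress object written by submitAgentJobGroup.
interface JobProgress {
  status: string;
  updateId: number;
  providers: Record<string, string>;
  results: Record<string, number>;
  stats: {
    totalPrompts: number;
    expectedResponses: number;
    actualResponses: number;
  };
}

// Builds the same Redis key format used in the implementation above.
function progressKey(jobGroupId: string): string {
  return `job:${jobGroupId}:result`;
}

// Hypothetical helper: converts the stored JSON string into a 0..1 ratio.
function completionRatio(rawProgress: string): number {
  const progress = JSON.parse(rawProgress) as JobProgress;
  const { expectedResponses, actualResponses } = progress.stats;
  if (expectedResponses === 0) return 0; // avoid dividing by zero
  return Math.min(1, actualResponses / expectedResponses);
}

// Example: 3 prompts across 2 providers, 3 responses stored so far.
const sampleProgress: JobProgress = {
  status: "pending",
  updateId: 4,
  providers: { chatgpt: "pending", claude: "pending" },
  results: { chatgpt: 2, claude: 1 },
  stats: { totalPrompts: 3, expectedResponses: 6, actualResponses: 3 },
};

const sampleRatio = completionRatio(JSON.stringify(sampleProgress));
console.log(progressKey("abc"), sampleRatio);
```

In a real caller the JSON string would come from a Redis `GET` on the key; the sketch takes the string directly so it runs without a Redis connection.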

Queue Management

Each provider has a dedicated BullMQ queue:
packages/services/src/agent/queue.ts
import { getProviderQueue } from "@oneglanse/services";

const chatgptQueue = getProviderQueue("chatgpt");
const claudeQueue = getProviderQueue("claude");

// Add job to queue
await chatgptQueue.add("run-agent", {
  jobGroupId: "job_123",
  provider: "chatgpt",
  prompts: [...],
  user_id: "user_456",
  workspace_id: "workspace_789",
});
Implementation:
packages/services/src/agent/queue.ts
import { Queue } from "bullmq";
import type { Provider } from "@oneglanse/types";

const connection = {
  host: env.REDIS_HOST,
  port: env.REDIS_PORT,
  password: env.REDIS_PASSWORD,
};

const queues = new Map<Provider, Queue>();

export function getQueueName(provider: Provider): string {
  return `oneglanse-agent-${provider}`;
}

export function getProviderQueue(provider: Provider): Queue {
  let q = queues.get(provider);
  if (!q) {
    q = new Queue(getQueueName(provider), {
      connection,
      defaultJobOptions: {
        attempts: 1,
        removeOnComplete: true,
        removeOnFail: false,
      },
    });
    queues.set(provider, q);
  }
  return q;
}
Reference: packages/services/src/agent/queue.ts:5-33
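
The lazy `Map` cache in `getProviderQueue` means each provider's queue is constructed at most once per process. The sketch below illustrates that behaviour with a stand-in class so it runs without Redis; `FakeQueue`, `getFakeProviderQueue`, and `constructedCount` are hypothetical names, and only the two providers named on this page are modelled.

```typescript
// Stand-in for a BullMQ Queue so the sketch needs no Redis connection.
class FakeQueue {
  constructor(public readonly queueName: string) {}
}

// Assumption: only the providers mentioned in this document.
type ProviderName = "chatgpt" | "claude";

const queueCache = new Map<ProviderName, FakeQueue>();
let constructedCount = 0;

// Same create-on-first-use pattern as getProviderQueue.
function getFakeProviderQueue(provider: ProviderName): FakeQueue {
  let q = queueCache.get(provider);
  if (!q) {
    constructedCount += 1;
    q = new FakeQueue(`oneglanse-agent-${provider}`);
    queueCache.set(provider, q);
  }
  return q;
}

const firstLookup = getFakeProviderQueue("chatgpt");
const secondLookup = getFakeProviderQueue("chatgpt");
const claudeLookup = getFakeProviderQueue("claude");

// The second lookup reuses the cached instance instead of constructing a new one.
console.log(firstLookup === secondLookup, claudeLookup.queueName, constructedCount);
```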

Analysis Services

Run Analysis

Analyzes a prompt response for brand mentions using GPT-4.1:
packages/services/src/analysis/runAnalysis.ts
import { runAnalysis } from "@oneglanse/services";

const result = await runAnalysis({
  prompt: "Best CRM software for startups",
  response: "HubSpot is a great choice...",
  brand_name: "HubSpot",
  brand_domain: "hubspot.com",
  model_provider: "chatgpt",
});

console.log(result.presence);        // true/false
console.log(result.rank);            // 1-10 or null
console.log(result.sentiment_score); // -1 to 1
console.log(result.competitors);     // ["Salesforce", ...]
Implementation:
packages/services/src/analysis/runAnalysis.ts
import { chatgpt } from "../llm/index.js";
import { analysisPrompt } from "./analysisPrompt.js";

export async function runAnalysis(
  input: AnalysisInputSingle,
): Promise<BrandAnalysisResult> {
  const prompt = analysisPrompt(input);
  
  const systemPrompt =
    "You are an expert brand intelligence analyst. " +
    "You respond ONLY with valid JSON — no markdown, no code fences. " +
    "Be precise, evidence-based, and conservative in your scoring.";
  
  let response;
  try {
    response = await chatgpt.responses.create({
      model: "gpt-4.1",
      temperature: 0,
      input: [
        { role: "system", content: systemPrompt },
        { role: "user", content: prompt },
      ],
      text: {
        format: { type: "json_object" },
      },
    });
  } catch (err) {
    throw new ExternalServiceError(
      "ChatGPT",
      "Failed to analyze response.",
      502,
      { responseLength: input.response.length },
      err
    );
  }
  
  const text = response.output_text?.trim() || "";
  const parsed = JSON.parse(text);
  
  return parsed as BrandAnalysisResult;
}
Reference: packages/services/src/analysis/runAnalysis.ts:6-64
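
The excerpt above parses the model output with `JSON.parse` and casts the result, so an empty or malformed response would surface as a raw `SyntaxError`. One way to guard against that is sketched below. This is an assumption-laden illustration, not the package's code: `BrandAnalysisShape` and `parseBrandAnalysis` are hypothetical, the four fields are taken from the usage example above, and the real `BrandAnalysisResult` type may contain more.

```typescript
// Hypothetical subset of BrandAnalysisResult, based on the usage example.
interface BrandAnalysisShape {
  presence: boolean;
  rank: number | null;
  sentiment_score: number;
  competitors: string[];
}

// Parses and checks the model's JSON output before it is returned to callers.
function parseBrandAnalysis(outputText: string | undefined): BrandAnalysisShape {
  const text = (outputText ?? "").trim();
  if (text === "") {
    throw new Error("Model returned an empty response.");
  }

  let data: unknown;
  try {
    data = JSON.parse(text);
  } catch {
    throw new Error("Model response was not valid JSON.");
  }
  if (typeof data !== "object" || data === null) {
    throw new Error("Model response was not a JSON object.");
  }

  const record = data as Record<string, unknown>;
  const { presence, rank, sentiment_score, competitors } = record;

  if (typeof presence !== "boolean") {
    throw new Error("presence must be a boolean.");
  }
  if (!(rank === null || typeof rank === "number")) {
    throw new Error("rank must be a number or null.");
  }
  if (typeof sentiment_score !== "number") {
    throw new Error("sentiment_score must be a number.");
  }
  if (!Array.isArray(competitors) || !competitors.every((c) => typeof c === "string")) {
    throw new Error("competitors must be an array of strings.");
  }

  return {
    presence,
    rank: rank as number | null,
    sentiment_score,
    competitors: competitors as string[],
  };
}

const parsedSample = parseBrandAnalysis(
  '{"presence":true,"rank":1,"sentiment_score":0.8,"competitors":["Salesforce"]}',
);
console.log(parsedSample.presence, parsedSample.rank);
```

In the service itself, the plain `Error`s would more naturally be one of the typed errors from @oneglanse/errors.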

Fetch Analysed Prompts

Retrieves analyzed prompt responses for a workspace:
import { fetchAnalysedPromptsForWorkspace } from "@oneglanse/services";

const prompts = await fetchAnalysedPromptsForWorkspace({
  workspaceId: "workspace_123",
});

// Returns array of AnalysisRecord with joined data
prompts.forEach(p => {
  console.log(p.prompt);              // Original prompt
  console.log(p.response);            // AI response
  console.log(p.brand_analysis);      // Parsed analysis JSON
  console.log(p.model_provider);      // chatgpt, claude, etc.
});
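
Records like these lend themselves to simple roll-ups. As a rough sketch, the function below computes how often the brand was present per provider. `AnalysedRecordSketch` and `presenceRateByProvider` are hypothetical, and the shape assumes `brand_analysis` exposes the `presence` boolean returned by `runAnalysis`.

```typescript
// Hypothetical, trimmed-down view of an analysed record.
interface AnalysedRecordSketch {
  model_provider: string;
  brand_analysis: { presence: boolean };
}

// Returns, per provider, the fraction of responses that mention the brand.
function presenceRateByProvider(
  records: AnalysedRecordSketch[],
): Record<string, number> {
  const totals = new Map<string, { seen: number; present: number }>();

  for (const r of records) {
    const entry = totals.get(r.model_provider) ?? { seen: 0, present: 0 };
    entry.seen += 1;
    if (r.brand_analysis.presence) entry.present += 1;
    totals.set(r.model_provider, entry);
  }

  const rates: Record<string, number> = {};
  totals.forEach((value, provider) => {
    rates[provider] = value.present / value.seen;
  });
  return rates;
}

const sampleRecords: AnalysedRecordSketch[] = [
  { model_provider: "chatgpt", brand_analysis: { presence: true } },
  { model_provider: "chatgpt", brand_analysis: { presence: false } },
  { model_provider: "claude", brand_analysis: { presence: true } },
];

const sampleRates = presenceRateByProvider(sampleRecords);
console.log(sampleRates);
```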

Prompt Services

Fetch User Prompts

Retrieves all prompts for a workspace:
packages/services/src/prompt/fetchUserPromptsForWorkspace.ts
import { fetchUserPromptsForWorkspace } from "@oneglanse/services";

const prompts = await fetchUserPromptsForWorkspace({
  workspaceId: "workspace_123",
});

// Returns: [{ id: "...", prompt: "...", created_at: ... }, ...]
Implementation:
export async function fetchUserPromptsForWorkspace(
  args: FetchUserPromptsForWorkspaceArgs,
): Promise<UserPrompt[]> {
  const { workspaceId } = args;
  
  const result = await clickhouse.query({
    query: `
      SELECT DISTINCT
        id,
        prompt,
        created_at
      FROM analytics.user_prompts
      WHERE workspace_id = {workspaceId:String}
      ORDER BY created_at DESC
    `,
    query_params: { workspaceId },
    format: "JSONEachRow",
  });
  
  return (await result.json()) as UserPrompt[];
}

Fetch Prompt Responses

Retrieves all prompt responses for a workspace:
packages/services/src/prompt/fetchPromptResponsesForWorkspace.ts
import { fetchPromptResponsesForWorkspace } from "@oneglanse/services";

const responses = await fetchPromptResponsesForWorkspace({
  workspaceId: "workspace_123",
});

responses.forEach(r => {
  console.log(r.prompt);          // Original prompt
  console.log(r.response);        // AI response
  console.log(r.model_provider);  // chatgpt, claude, etc.
  console.log(r.sources);         // Extracted sources
});
Implementation:
packages/services/src/prompt/fetchPromptResponsesForWorkspace.ts
export async function fetchPromptResponsesForWorkspace(
  args: FetchPromptResponsesForWorkspaceArgs,
): Promise<PromptResponse[]> {
  const { workspaceId } = args;
  
  const result = await clickhouse.query({
    query: `
      SELECT *
      FROM analytics.prompt_responses
      WHERE workspace_id = {workspaceId:String}
    `,
    query_params: { workspaceId },
    format: "JSONEachRow",
  });
  
  return (await result.json()) as PromptResponse[];
}
Reference: packages/services/src/prompt/fetchPromptResponsesForWorkspace.ts:7-24

Fetch Prompt Sources

Retrieves aggregated source citations:
import { fetchPromptSourcesForWorkspace } from "@oneglanse/services";

const sources = await fetchPromptSourcesForWorkspace({
  workspaceId: "workspace_123",
});

// Returns sources with citation counts
sources.forEach(s => {
  console.log(s.domain);      // "hubspot.com"
  console.log(s.count);       // 5 (times cited)
  console.log(s.title);       // Latest article title
});
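
To make the aggregation concrete, here is an in-memory sketch of how per-domain citation counts could be derived from raw source URLs. The service presumably aggregates in the database; `SourceCitation`, `DomainCount`, and `countCitationsByDomain` are hypothetical, and the sketch assumes its input is ordered oldest to newest so the last title seen is the latest.

```typescript
// Hypothetical raw citation as extracted from a response.
interface SourceCitation {
  url: string;
  title: string;
}

// Hypothetical aggregated row, matching the fields shown above.
interface DomainCount {
  domain: string;
  count: number;
  title: string;
}

function countCitationsByDomain(citations: SourceCitation[]): DomainCount[] {
  const byDomain = new Map<string, DomainCount>();

  for (const c of citations) {
    // Normalise "www.hubspot.com" and "hubspot.com" to the same key.
    const domain = new URL(c.url).hostname.replace(/^www\./, "");
    const existing = byDomain.get(domain);
    if (existing) {
      existing.count += 1;
      existing.title = c.title; // later entries overwrite, so the last title wins
    } else {
      byDomain.set(domain, { domain, count: 1, title: c.title });
    }
  }

  // Most-cited domains first.
  return Array.from(byDomain.values()).sort((a, b) => b.count - a.count);
}

const sampleCitations: SourceCitation[] = [
  { url: "https://www.hubspot.com/a", title: "CRM guide" },
  { url: "https://salesforce.com/x", title: "Overview" },
  { url: "https://hubspot.com/b", title: "Pricing" },
];

const domainCounts = countCitationsByDomain(sampleCitations);
console.log(domainCounts);
```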

Workspace Services

Get Workspace by ID

Retrieves a single workspace:
packages/services/src/workspace/query.ts
import { getWorkspaceById } from "@oneglanse/services";

const workspace = await getWorkspaceById({
  workspaceId: "workspace_123",
});

console.log(workspace.name);             // "Acme Corp"
console.log(workspace.domain);           // "acme.com"
console.log(workspace.enabledProviders); // '["chatgpt","claude"]'
console.log(workspace.schedule);         // "0 9 * * *"
Implementation:
packages/services/src/workspace/query.ts
import { db, schema } from "@oneglanse/db";
import { NotFoundError, ValidationError } from "@oneglanse/errors";
import { and, eq, isNull } from "drizzle-orm";

export async function getWorkspaceById(
  args: GetWorkspaceByIdArgs,
): Promise<Workspace> {
  const { workspaceId } = args;
  
  if (!workspaceId || workspaceId.trim() === "") {
    throw new ValidationError("Workspace ID is undefined.");
  }
  
  const [workspace] = await db
    .select()
    .from(schema.workspaces)
    .where(
      and(
        eq(schema.workspaces.id, workspaceId),
        isNull(schema.workspaces.deletedAt)
      )
    )
    .execute();
  
  if (!workspace) {
    throw new NotFoundError(`Workspace with ID ${workspaceId} not found.`);
  }
  
  return workspace;
}
Reference: packages/services/src/workspace/query.ts:19-44
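
Note that `workspace.enabledProviders` is a JSON string, not an array, so callers have to parse it before use. A defensive parser might look like the sketch below. `KNOWN_PROVIDERS` and `parseEnabledProviders` are hypothetical, the provider list is limited to the two named on this page, and falling back to all providers on invalid JSON is an assumption (the code shown earlier only falls back when the value is null or undefined).

```typescript
// Assumption: only the providers mentioned in this document.
const KNOWN_PROVIDERS = ["chatgpt", "claude"] as const;
type KnownProvider = (typeof KNOWN_PROVIDERS)[number];

// Parses the stored JSON string and drops anything that is not a known provider.
function parseEnabledProviders(raw: string | null | undefined): KnownProvider[] {
  if (!raw) return [...KNOWN_PROVIDERS]; // nothing stored: enable everything

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return [...KNOWN_PROVIDERS]; // assumed fallback for corrupt values
  }
  if (!Array.isArray(parsed)) return [...KNOWN_PROVIDERS];

  const known = KNOWN_PROVIDERS as readonly string[];
  return parsed.filter(
    (p): p is KnownProvider => typeof p === "string" && known.indexOf(p) !== -1,
  );
}

console.log(parseEnabledProviders('["chatgpt","claude"]'));
console.log(parseEnabledProviders('["chatgpt","unknown"]'));
console.log(parseEnabledProviders(null));
```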

Get Workspaces for User

Retrieves all workspaces a user has access to:
import { getWorkspacesForUser } from "@oneglanse/services";

const workspaces = await getWorkspacesForUser({
  tenantId: "org_123",
  userId: "user_456",
});

// Returns array of workspaces within the organization
Implementation:
packages/services/src/workspace/query.ts
export async function getWorkspacesForUser(
  args: GetWorkspacesForUserArgs,
): Promise<Workspace[]> {
  const { tenantId, userId } = args;
  
  const workspaces = await db
    .select({
      id: schema.workspaces.id,
      name: schema.workspaces.name,
      slug: schema.workspaces.slug,
      domain: schema.workspaces.domain,
      tenantId: schema.workspaces.tenantId,
      schedule: schema.workspaces.schedule,
      enabledProviders: schema.workspaces.enabledProviders,
      createdAt: schema.workspaces.createdAt,
      deletedAt: schema.workspaces.deletedAt,
    })
    .from(schema.workspaces)
    .innerJoin(
      schema.workspaceMembers,
      and(
        eq(schema.workspaceMembers.workspaceId, schema.workspaces.id),
        eq(schema.workspaceMembers.userId, userId),
        isNull(schema.workspaceMembers.deletedAt)
      )
    )
    .where(
      and(
        eq(schema.workspaces.tenantId, tenantId),
        isNull(schema.workspaces.deletedAt)
      )
    )
    .execute();
  
  return workspaces;
}
Reference: packages/services/src/workspace/query.ts:46-85

How Apps Use Services

Web App (tRPC)

The web app calls services through tRPC routers:
apps/web/src/server/api/routers/prompt/prompt.ts
import { z } from "zod";
import {
  fetchUserPromptsForWorkspace,
  storePromptsForWorkspace,
} from "@oneglanse/services";

export const promptRouter = createTRPCRouter({
  store: authorizedWorkspaceProcedure
    .input(
      z.object({
        prompts: z.array(z.string().min(1).max(500)).min(1).max(100),
      })
    )
    .mutation(async ({ input, ctx }) => {
      const { prompts } = input;
      const { user: { id: userId }, workspaceId } = ctx;
      
      // Call service function
      return storePromptsForWorkspace({
        prompts,
        workspaceId,
        userId,
      });
    }),
  
  fetchUserPrompts: authorizedWorkspaceProcedure
    .query(async ({ ctx }) => {
      // Call service function
      return fetchUserPromptsForWorkspace({
        workspaceId: ctx.workspaceId,
      });
    }),
});
Reference: apps/web/src/server/api/routers/prompt/prompt.ts:14-54

Agent Worker

The agent worker calls services directly:
apps/agent/src/worker/jobHandler.ts
import {
  storePromptResponses,
  getWorkspaceById,
} from "@oneglanse/services";

export async function handleJob(job: Job<ProviderJobData>) {
  // The job payload uses snake_case keys (see submitAgentJobGroup)
  const { provider, workspace_id: workspaceId, user_id: userId } = job.data;
  
  // Fetch workspace settings
  const workspace = await getWorkspaceById({ workspaceId });
  
  // ... run agent ...
  
  // Store results
  await storePromptResponses({
    results: { [provider]: wrapped },
    userId,
    workspaceId,
    promptRunAt: executionTime,
  });
}

Error Handling

Services use typed errors from @oneglanse/errors:
import {
  ValidationError,
  NotFoundError,
  ExternalServiceError,
} from "@oneglanse/errors";

// Validation error
if (!workspaceId) {
  throw new ValidationError("Workspace ID is required");
}

// Not found error
if (!workspace) {
  throw new NotFoundError(`Workspace ${workspaceId} not found`);
}

// External service error
try {
  const result = await chatgpt.responses.create(...);
} catch (err) {
  throw new ExternalServiceError(
    "ChatGPT",
    "Failed to analyze response",
    502,
    { responseLength: input.response.length },
    err
  );
}
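
Typed errors let a caller translate failures into responses without inspecting message strings. The sketch below shows the general idea with stand-in classes; it does not reproduce @oneglanse/errors. All `Sketch*` classes and `toHttpStatus` are hypothetical, the 400 and 404 codes are assumptions, and only the 502 for external services comes from the example above.

```typescript
// Hypothetical base class carrying an HTTP-style status code.
class SketchServiceError extends Error {
  constructor(message: string, public readonly statusCode: number) {
    super(message);
    // Keeps instanceof working even when compiled to older JS targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = new.target.name;
  }
}

class SketchValidationError extends SketchServiceError {
  constructor(message: string) {
    super(message, 400); // assumed status
  }
}

class SketchNotFoundError extends SketchServiceError {
  constructor(message: string) {
    super(message, 404); // assumed status
  }
}

class SketchExternalServiceError extends SketchServiceError {
  constructor(public readonly service: string, message: string, statusCode: number) {
    super(message, statusCode);
  }
}

// Maps any thrown value to a status code; unknown errors become 500.
function toHttpStatus(err: unknown): number {
  if (err instanceof SketchServiceError) return err.statusCode;
  return 500;
}

console.log(toHttpStatus(new SketchValidationError("Workspace ID is required")));
console.log(toHttpStatus(new SketchExternalServiceError("ChatGPT", "Failed to analyze response", 502)));
console.log(toHttpStatus(new Error("unexpected")));
```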

Type Safety

All services have strict input/output types:
import type {
  GetWorkspaceByIdArgs,
  Workspace,
  AnalysisInputSingle,
  BrandAnalysisResult,
} from "@oneglanse/types";

export async function getWorkspaceById(
  args: GetWorkspaceByIdArgs
): Promise<Workspace> {
  // ...
}

export async function runAnalysis(
  input: AnalysisInputSingle
): Promise<BrandAnalysisResult> {
  // ...
}

Testing Services

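Because services are plain async functions, they can be tested without the web app or the agent worker, for example with Vitest:
import { getWorkspaceById } from "@oneglanse/services";
import { ValidationError } from "@oneglanse/errors";
import { expect, test } from "vitest";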

test("getWorkspaceById returns workspace", async () => {
  const workspace = await getWorkspaceById({
    workspaceId: "test_workspace",
  });
  
  expect(workspace).toBeDefined();
  expect(workspace.id).toBe("test_workspace");
});

test("getWorkspaceById throws on invalid ID", async () => {
  await expect(
    getWorkspaceById({ workspaceId: "" })
  ).rejects.toThrow(ValidationError);
});

Development Commands

# Type checking
pnpm typecheck

# Build services package
pnpm build

# Lint
pnpm lint
