
Overview

Adding support for a new database to DBHub involves implementing two interfaces:
  1. DSNParser: Parses connection strings into database-specific configuration
  2. Connector: Implements database operations (connect, query, schema discovery)
This guide walks through creating a new connector from scratch.

Connector Interface

All connectors must implement the Connector interface. Location: src/connectors/interface.ts:106-205
export interface Connector {
  /** Unique identifier (e.g., "postgres", "mysql") */
  id: ConnectorType;

  /** Human-readable name */
  name: string;

  /** DSN parser for this connector */
  dsnParser: DSNParser;

  /** Get the source ID for this connector instance */
  getId(): string;

  /** Create a new instance (for multi-source support) */
  clone(): Connector;

  /** Connect to database */
  connect(dsn: string, initScript?: string, config?: ConnectorConfig): Promise<void>;

  /** Close connection */
  disconnect(): Promise<void>;

  /** Get all schemas */
  getSchemas(): Promise<string[]>;

  /** Get tables in a schema */
  getTables(schema?: string): Promise<string[]>;

  /** Get table column information */
  getTableSchema(tableName: string, schema?: string): Promise<TableColumn[]>;

  /** Check if table exists */
  tableExists(tableName: string, schema?: string): Promise<boolean>;

  /** Get table indexes */
  getTableIndexes(tableName: string, schema?: string): Promise<TableIndex[]>;

  /** Get stored procedures/functions */
  getStoredProcedures(schema?: string, routineType?: "procedure" | "function"): Promise<string[]>;

  /** Get stored procedure details */
  getStoredProcedureDetail(procedureName: string, schema?: string): Promise<StoredProcedure>;

  /** Get table row count (optional - uses statistics) */
  getTableRowCount?(tableName: string, schema?: string): Promise<number | null>;

  /** Get table comment (optional) */
  getTableComment?(tableName: string, schema?: string): Promise<string | null>;

  /** Execute SQL query */
  executeSQL(sql: string, options: ExecuteOptions, parameters?: any[]): Promise<SQLResult>;
}

DSNParser Interface

Location: src/connectors/interface.ts:82-104
export interface DSNParser {
  /**
   * Parse a connection string into connector-specific configuration
   * @param dsn - Database connection string
   * @param config - Optional database-specific configuration options
   */
  parse(dsn: string, config?: ConnectorConfig): Promise<any>;

  /** Generate a sample DSN string for documentation */
  getSampleDSN(): string;

  /** Validate DSN format */
  isValidDSN(dsn: string): boolean;
}

Step-by-Step: Building a Connector

Step 1: Create Directory Structure

mkdir -p src/connectors/mydb
touch src/connectors/mydb/index.ts

Step 2: Implement DSNParser

// src/connectors/mydb/index.ts
import { DSNParser, ConnectorConfig } from "../interface.js";
import { SafeURL } from "../../utils/safe-url.js";
import { obfuscateDSNPassword } from "../../utils/dsn-obfuscate.js";

class MyDBDSNParser implements DSNParser {
  async parse(dsn: string, config?: ConnectorConfig): Promise<any> {
    // Validate DSN format
    if (!this.isValidDSN(dsn)) {
      const obfuscatedDSN = obfuscateDSNPassword(dsn);
      const expectedFormat = this.getSampleDSN();
      throw new Error(
        `Invalid MyDB DSN format.\nProvided: ${obfuscatedDSN}\nExpected: ${expectedFormat}`
      );
    }

    // Parse DSN using SafeURL (handles special characters)
    const url = new SafeURL(dsn);

    // Typed loosely so optional keys (ssl, timeouts) can be added below
    const dbConfig: Record<string, any> = {
      host: url.hostname,
      port: url.port ? parseInt(url.port, 10) : 5000,
      database: url.pathname ? url.pathname.substring(1) : '',
      user: url.username,
      password: url.password,
    };

    // Handle query parameters
    url.forEachSearchParam((value, key) => {
      if (key === "sslmode") {
        dbConfig.ssl = value !== "disable";
      }
      // Add other parameters as needed
    });

    // Apply connection timeout if specified
    if (config?.connectionTimeoutSeconds) {
      dbConfig.connectionTimeout = config.connectionTimeoutSeconds * 1000;
    }

    // Apply query timeout if specified
    if (config?.queryTimeoutSeconds) {
      dbConfig.queryTimeout = config.queryTimeoutSeconds * 1000;
    }

    return dbConfig;
  }

  getSampleDSN(): string {
    return "mydb://user:password@localhost:5000/dbname?sslmode=require";
  }

  isValidDSN(dsn: string): boolean {
    return dsn.startsWith('mydb://');
  }
}
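
To make the shape of the parsed result concrete, here is a minimal standalone sketch of the same parsing steps applied to the sample DSN. It uses the built-in WHATWG `URL` class as a stand-in for `SafeURL` (an assumption for illustration only; `SafeURL` is the utility the guide uses to cope with special characters), and `MyDBOptions` and `parseMyDbDsn` are hypothetical names, not part of DBHub.

```typescript
// Hypothetical result shape; a real connector may carry more fields.
interface MyDBOptions {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  ssl?: boolean;
}

// Stand-in for the SafeURL-based parser, using the built-in URL class.
function parseMyDbDsn(dsn: string): MyDBOptions {
  if (!dsn.startsWith("mydb://")) {
    // The DSN itself is not echoed here, so no password can leak into logs.
    throw new Error("Invalid MyDB DSN: expected mydb://user:password@host:port/dbname");
  }
  const url = new URL(dsn);
  const options: MyDBOptions = {
    host: url.hostname,
    port: url.port ? parseInt(url.port, 10) : 5000, // 5000 is this guide's example default
    database: url.pathname.replace(/^\//, ""),
    user: decodeURIComponent(url.username),
    password: decodeURIComponent(url.password),
  };
  const sslmode = url.searchParams.get("sslmode");
  if (sslmode !== null) {
    options.ssl = sslmode !== "disable";
  }
  return options;
}

const parsed = parseMyDbDsn("mydb://user:password@localhost:5000/dbname?sslmode=require");
const withDefaults = parseMyDbDsn("mydb://user:password@db.internal/app");
```

With the sample DSN, `parsed.ssl` is `true` and `parsed.port` is `5000`; `withDefaults` falls back to the default port and leaves `ssl` unset.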

Step 3: Implement Connector Class

import { Connector, ConnectorType, SQLResult, TableColumn, TableIndex, StoredProcedure, ExecuteOptions } from "../interface.js";
import MyDBClient from "mydb-client"; // Your database client library

export class MyDBConnector implements Connector {
  id: ConnectorType = "mydb";
  name = "MyDB";
  dsnParser = new MyDBDSNParser();

  private client: MyDBClient | null = null;
  private sourceId: string = "default";

  getId(): string {
    return this.sourceId;
  }

  clone(): Connector {
    return new MyDBConnector();
  }

  async connect(dsn: string, initScript?: string, config?: ConnectorConfig): Promise<void> {
    try {
      const dbConfig = await this.dsnParser.parse(dsn, config);

      // Apply read-only mode if configured
      if (config?.readonly) {
        dbConfig.readOnly = true;
      }

      this.client = new MyDBClient(dbConfig);
      await this.client.connect();

      // Run init script if provided
      if (initScript) {
        await this.client.query(initScript);
      }

      // Test connection
      await this.client.query('SELECT 1');
    } catch (err) {
      console.error("Failed to connect to MyDB:", err);
      throw err;
    }
  }

  async disconnect(): Promise<void> {
    if (this.client) {
      await this.client.close();
      this.client = null;
    }
  }

  async executeSQL(sql: string, options: ExecuteOptions, parameters?: any[]): Promise<SQLResult> {
    if (!this.client) {
      throw new Error("Not connected to database");
    }

    try {
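      // Apply row limit if specified. Only SELECT statements get a LIMIT,
      // and a trailing semicolon is stripped first. Queries that already
      // contain their own LIMIT need extra handling.
      let query = sql;
      if (options.maxRows && /^\s*select\b/i.test(sql)) {
        query = `${sql.trim().replace(/;\s*$/, "")} LIMIT ${options.maxRows}`;
      }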

      const result = await this.client.query(query, parameters);

      return {
        rows: result.rows,
        rowCount: result.rowCount,
      };
    } catch (error) {
      throw new Error(`Query failed: ${(error as Error).message}`);
    }
  }

  async getSchemas(): Promise<string[]> {
    const result = await this.executeSQL(
      'SELECT schema_name FROM information_schema.schemata ORDER BY schema_name',
      {}
    );
    return result.rows.map(row => row.schema_name);
  }

  async getTables(schema?: string): Promise<string[]> {
    const schemaToUse = schema || 'public';
    const result = await this.executeSQL(
      `SELECT table_name FROM information_schema.tables 
       WHERE table_schema = $1 ORDER BY table_name`,
      {},
      [schemaToUse]
    );
    return result.rows.map(row => row.table_name);
  }

  async getTableSchema(tableName: string, schema?: string): Promise<TableColumn[]> {
    const schemaToUse = schema || 'public';
    const result = await this.executeSQL(
      `SELECT 
         column_name,
         data_type,
         is_nullable,
         column_default
       FROM information_schema.columns
       WHERE table_schema = $1 AND table_name = $2
       ORDER BY ordinal_position`,
      {},
      [schemaToUse, tableName]
    );

    return result.rows.map(row => ({
      column_name: row.column_name,
      data_type: row.data_type,
      is_nullable: row.is_nullable,
      column_default: row.column_default,
      description: null,
    }));
  }

  async tableExists(tableName: string, schema?: string): Promise<boolean> {
    const schemaToUse = schema || 'public';
    const result = await this.executeSQL(
      `SELECT EXISTS (
         SELECT 1 FROM information_schema.tables
         WHERE table_schema = $1 AND table_name = $2
       ) as exists`,
      {},
      [schemaToUse, tableName]
    );
    return result.rows[0]?.exists === true;
  }

  async getTableIndexes(tableName: string, schema?: string): Promise<TableIndex[]> {
    // Implementation depends on your database's system catalog
    // This is a simplified example
    const schemaToUse = schema || 'public';
    const result = await this.executeSQL(
      `SELECT 
         index_name,
         column_names,
         is_unique,
         is_primary
       FROM mydb_indexes
       WHERE schema_name = $1 AND table_name = $2`,
      {},
      [schemaToUse, tableName]
    );

    return result.rows.map(row => ({
      index_name: row.index_name,
      column_names: row.column_names,
      is_unique: row.is_unique,
      is_primary: row.is_primary,
    }));
  }

  async getStoredProcedures(schema?: string, routineType?: "procedure" | "function"): Promise<string[]> {
    // If your database doesn't support stored procedures, return empty array
    return [];
  }

  async getStoredProcedureDetail(procedureName: string, schema?: string): Promise<StoredProcedure> {
    throw new Error("Stored procedures not supported by MyDB");
  }

  // Optional: Implement for better performance
  async getTableRowCount(tableName: string, schema?: string): Promise<number | null> {
    try {
      const schemaToUse = schema || 'public';
      const result = await this.executeSQL(
        `SELECT row_count FROM mydb_statistics 
         WHERE schema_name = $1 AND table_name = $2`,
        {},
        [schemaToUse, tableName]
      );
      return result.rows[0]?.row_count ?? null; // ?? keeps a legitimate count of 0
    } catch {
      return null; // Fallback to COUNT(*) in search-objects
    }
  }

  // Optional: Implement if database supports table comments
  async getTableComment(tableName: string, schema?: string): Promise<string | null> {
    try {
      const schemaToUse = schema || 'public';
      const result = await this.executeSQL(
        `SELECT comment FROM mydb_table_comments 
         WHERE schema_name = $1 AND table_name = $2`,
        {},
        [schemaToUse, tableName]
      );
      return result.rows[0]?.comment || null;
    } catch {
      return null;
    }
  }
}

Step 4: Register the Connector

Connectors are auto-registered when imported.
// src/connectors/mydb/index.ts (add at bottom)
import { ConnectorRegistry } from "../interface.js";

// Auto-register on import
ConnectorRegistry.register(new MyDBConnector());
Then import in the main connector index:
// src/connectors/index.ts
import "./postgres/index.js";
import "./mysql/index.js";
import "./mydb/index.js"; // Add your connector
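
For intuition about why a bare import is enough, here is a minimal sketch of how an import-time registry could work. `SketchRegistry`, `SketchConnector`, and `forDSN` are hypothetical names for illustration; the real `ConnectorRegistry` in `interface.ts` may differ in both shape and method names.

```typescript
// Minimal shapes for illustration; the real interfaces have more members.
interface SketchParser {
  isValidDSN(dsn: string): boolean;
}

interface SketchConnector {
  id: string;
  dsnParser: SketchParser;
}

// Hypothetical registry: a static table keyed by connector id.
class SketchRegistry {
  private static connectors: Record<string, SketchConnector> = {};

  static register(connector: SketchConnector): void {
    SketchRegistry.connectors[connector.id] = connector;
  }

  // Return the first registered connector whose parser accepts the DSN.
  static forDSN(dsn: string): SketchConnector | undefined {
    for (const id of Object.keys(SketchRegistry.connectors)) {
      const connector = SketchRegistry.connectors[id];
      if (connector.dsnParser.isValidDSN(dsn)) return connector;
    }
    return undefined;
  }
}

// A top-level call like this runs once, when the module is first imported.
SketchRegistry.register({
  id: "mydb",
  dsnParser: { isValidDSN: (dsn: string) => dsn.startsWith("mydb://") },
});

const match = SketchRegistry.forDSN("mydb://user:password@localhost:5000/dbname");
const noMatch = SketchRegistry.forDSN("postgres://user:password@localhost:5432/dbname");
```

Because registration is a side effect of module evaluation, a connector whose module is never imported is simply invisible to lookups, which is why the import in the connector index matters.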

Step 5: Update Type Definitions

Add your connector type to the ConnectorType union:
// src/connectors/interface.ts
export type ConnectorType = 
  | "postgres" 
  | "mysql" 
  | "mariadb" 
  | "sqlite" 
  | "sqlserver"
  | "mydb"; // Add your type

DSN Format Design

Best Practices

  1. Follow URI Standards: Use scheme://user:password@host:port/database?params format
  2. Unique Scheme: Choose a scheme that doesn’t conflict (e.g., mydb://, not http://)
  3. Sensible Defaults: Provide defaults for the port, SSL mode, and similar settings so that a minimal DSN still works
  4. Query Parameters: Support SSL, timeouts, and database-specific options

Example DSN Formats

// Basic connection
mydb://user:password@localhost:5000/dbname

// With SSL
mydb://user:password@localhost:5000/dbname?sslmode=require

// With custom parameters
mydb://user:password@localhost:5000/dbname?sslmode=require&pool_size=10

// Local file (like SQLite)
mydb:///path/to/database.db
mydb:///:memory:
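
If a connector accepts both network and local-file DSNs, the parser needs to tell them apart before extracting a host. The following is a minimal sketch of one way to do that; `classifyMyDbDsn` and `DsnKind` are hypothetical names, and treating everything after the empty authority as a path is an assumption of this sketch rather than a DBHub rule.

```typescript
type DsnKind =
  | { kind: "memory" }
  | { kind: "file"; path: string }
  | { kind: "network" };

function classifyMyDbDsn(dsn: string): DsnKind {
  const prefix = "mydb://";
  if (!dsn.startsWith(prefix)) {
    throw new Error(`Unsupported scheme in DSN (expected ${prefix})`);
  }
  const rest = dsn.slice(prefix.length);
  // An empty authority ("mydb:///...") means there is no host: treat it as local.
  if (rest.startsWith("/")) {
    return rest === "/:memory:" ? { kind: "memory" } : { kind: "file", path: rest };
  }
  return { kind: "network" };
}

const fileDsn = classifyMyDbDsn("mydb:///path/to/database.db");
const memoryDsn = classifyMyDbDsn("mydb:///:memory:");
const networkDsn = classifyMyDbDsn("mydb://user:password@localhost:5000/dbname");
```

Branching on the DSN kind first keeps the host, port, and credential parsing out of the local-file path, where those fields have no meaning.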

Writing Integration Tests

Create Test File

touch src/connectors/__tests__/mydb.integration.test.ts

Implement Test Using IntegrationTestBase

import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { GenericContainer, StartedTestContainer } from 'testcontainers';
import { MyDBConnector } from '../mydb/index.js';
import { IntegrationTestBase, type TestContainer, type DatabaseTestConfig } from './shared/integration-test-base.js';
import type { Connector } from '../interface.js';

class MyDBTestContainer implements TestContainer {
  constructor(private container: StartedTestContainer) {}
  
  getConnectionUri(): string {
    const host = this.container.getHost();
    const port = this.container.getMappedPort(5000);
    return `mydb://testuser:testpass@${host}:${port}/testdb`;
  }
  
  async stop(): Promise<void> {
    await this.container.stop();
  }
}

class MyDBIntegrationTest extends IntegrationTestBase<MyDBTestContainer> {
  constructor() {
    const config: DatabaseTestConfig = {
      expectedSchemas: ['public', 'test_schema'],
      expectedTables: ['users', 'orders'],
      expectedTestSchemaTable: 'products',
      testSchema: 'test_schema',
      supportsStoredProcedures: false,
      supportsComments: true,
    };
    super(config);
  }

  async createContainer(): Promise<MyDBTestContainer> {
    const container = await new GenericContainer('mydb:latest')
      .withExposedPorts(5000)
      .withEnvironment({
        MYDB_USER: 'testuser',
        MYDB_PASSWORD: 'testpass',
        MYDB_DATABASE: 'testdb',
      })
      .start();
    
    return new MyDBTestContainer(container);
  }

  createConnector(): Connector {
    return new MyDBConnector();
  }

  async setupTestData(connector: Connector): Promise<void> {
    // Create test schema
    await connector.executeSQL('CREATE SCHEMA test_schema', {});
    
    // Create users table
    await connector.executeSQL(`
      CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100) UNIQUE
      )
    `, {});

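    // Insert test data
    await connector.executeSQL(`
      INSERT INTO users (id, name, email) VALUES
      (1, 'John Doe', 'john@example.com'),
      (2, 'Jane Smith', 'jane@example.com')
    `, {});

    // Also create the orders table and test_schema.products,
    // which the DatabaseTestConfig above expects to find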
  }

  // Add database-specific tests
  createDatabaseSpecificTests(): void {
    describe('MyDB Specific Features', () => {
      it('should support MyDB-specific data type', async () => {
        const connector = this.connector!;
        await connector.executeSQL(
          'CREATE TABLE test_types (id INT, special_col MYDB_TYPE)',
          {}
        );
        const schema = await connector.getTableSchema('test_types');
        expect(schema.find(c => c.column_name === 'special_col')?.data_type)
          .toBe('MYDB_TYPE');
      });
    });
  }
}

const test = new MyDBIntegrationTest();
test.run();

Run Your Tests

# Run your connector tests
pnpm test src/connectors/__tests__/mydb.integration.test.ts

# Run all integration tests
pnpm test:integration

Best Practices

1. Resource Management

Always release database resources:
const client = await this.pool.acquire();
try {
  const result = await client.query(sql);
  return result;
} finally {
  await this.pool.release(client); // Always release
}

2. Error Handling

Provide context in error messages:
try {
  await this.client.connect();
} catch (error) {
  throw new Error(
    `Failed to connect to MyDB at ${host}:${port}: ${(error as Error).message}`
  );
}
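
A caught value in TypeScript is typed `unknown` and is not guaranteed to be an `Error`, so a small helper can build the contextual message safely while keeping the original failure attached. This is a minimal sketch; `ConnectorError` and `describeConnectFailure` are hypothetical names, not part of DBHub.

```typescript
// Hypothetical error type that keeps the original failure for debugging.
class ConnectorError extends Error {
  readonly originalError: unknown;

  constructor(message: string, originalError: unknown) {
    super(message);
    this.name = "ConnectorError";
    this.originalError = originalError;
  }
}

// Build a contextual message without assuming the thrown value is an Error.
function describeConnectFailure(host: string, port: number, error: unknown): ConnectorError {
  const detail = error instanceof Error ? error.message : String(error);
  return new ConnectorError(`Failed to connect to MyDB at ${host}:${port}: ${detail}`, error);
}

const wrapped = describeConnectFailure("localhost", 5000, new Error("ECONNREFUSED"));
const wrappedString = describeConnectFailure("db.internal", 5000, "timeout");
```

Only the host and port go into the message here; including the full DSN would risk leaking the password into logs.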

3. Schema Defaults

Handle missing schema parameters gracefully:
async getTables(schema?: string): Promise<string[]> {
  // Use sensible default if schema not provided
  const schemaToUse = schema || this.getDefaultSchema();
  // ...
}

4. SQL Injection Prevention

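Always bind values as query parameters. Most drivers cannot bind identifiers (table or column names), so quote those with the identifier quoter utility instead of interpolating them raw:
// Bad - SQL injection risk
const sql = `SELECT * FROM ${tableName}`;

// Good - parameterized (?? is the identifier placeholder in mysql-style drivers; ? binds a value)
const sql = 'SELECT * FROM ?? WHERE id = ?';
await client.query(sql, [tableName, userId]);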
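
For drivers without an identifier placeholder, one common approach is to quote the identifier into the SQL text and bind only the values. The sketch below assumes ANSI-style double-quote quoting and `$1`-style placeholders; `quoteIdent` and `selectById` are hypothetical local helpers, not the project's `quoteIdentifier` utility.

```typescript
// ANSI-style identifier quoting: wrap in double quotes and double any embedded quote.
function quoteIdent(name: string): string {
  if (name.length === 0 || name.indexOf("\0") !== -1) {
    throw new Error("Invalid identifier");
  }
  return `"${name.replace(/"/g, '""')}"`;
}

interface PreparedQuery {
  sql: string;
  parameters: unknown[];
}

// Identifiers are quoted into the SQL text; values travel separately as parameters.
function selectById(table: string, id: number): PreparedQuery {
  return {
    sql: `SELECT * FROM ${quoteIdent(table)} WHERE id = $1`,
    parameters: [id],
  };
}

const normal = selectById("users", 42);
const hostile = selectById('users"; DROP TABLE users; --', 42);
```

In the hostile case the embedded quote is doubled, so the whole string stays inside a single quoted identifier and cannot terminate the statement.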

5. Connection Pooling

Use connection pools for better performance:
import { Pool } from 'mydb-client';

private pool: Pool | null = null;

async connect(dsn: string): Promise<void> {
  const config = await this.dsnParser.parse(dsn);
  this.pool = new Pool(config);
}

async executeSQL(sql: string): Promise<SQLResult> {
  const client = await this.pool!.acquire();
  try {
    return await client.query(sql);
  } finally {
    await this.pool!.release(client);
  }
}
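
The acquire/try/finally pattern can be factored into a single helper so that no call site can forget the release. This is a minimal sketch with in-memory stand-ins; `SketchPool`, `SketchClient`, and `withClient` are hypothetical, and a real driver's pool API may use different method names.

```typescript
// Minimal in-memory stand-ins so the sketch runs without a real driver.
interface SketchClient {
  query(sql: string): Promise<string[]>;
}

class SketchPool {
  inUse = 0;

  async acquire(): Promise<SketchClient> {
    this.inUse++;
    return {
      query: async (sql: string) => {
        if (sql.trim() === "") throw new Error("empty query");
        return [`ran: ${sql}`];
      },
    };
  }

  async release(_client: SketchClient): Promise<void> {
    this.inUse--;
  }
}

// Acquire a client, run the callback, and release even if the callback throws.
async function withClient<T>(
  pool: SketchPool,
  fn: (client: SketchClient) => Promise<T>
): Promise<T> {
  const client = await pool.acquire();
  try {
    return await fn(client);
  } finally {
    await pool.release(client);
  }
}
```

A call such as `withClient(pool, (c) => c.query("SELECT 1"))` then returns the query result, and `pool.inUse` drops back to zero whether the callback resolves or rejects.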

Handling Database-Specific Features

Custom Data Types

Map database-specific types to standard types:
function normalizeDataType(dbType: string): string {
  const typeMap: Record<string, string> = {
    'int4': 'integer',
    'int8': 'bigint',
    'varchar': 'character varying',
    // Add your database's type mappings
  };
  return typeMap[dbType.toLowerCase()] || dbType;
}
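
Catalog queries often return types with a modifier, such as `varchar(100)`, which a plain map lookup would miss. The sketch below extends the idea by normalizing only the base name and keeping the modifier; `normalizeWithModifier` is a hypothetical helper and the map entries are the same illustrative ones as above.

```typescript
const TYPE_MAP: Record<string, string> = {
  int4: "integer",
  int8: "bigint",
  varchar: "character varying",
};

// Normalize the base type name and keep any "(...)" modifier such as a length.
function normalizeWithModifier(dbType: string): string {
  const match = /^\s*([A-Za-z_][A-Za-z0-9_ ]*?)\s*(\(.*\))?\s*$/.exec(dbType);
  if (!match) return dbType;
  const base = match[1].toLowerCase();
  const modifier = match[2] ?? "";
  const mapped = TYPE_MAP[base];
  // Unmapped types are returned as given (trimmed), matching the fallback above.
  return mapped ? mapped + modifier : dbType.trim();
}

const plain = normalizeWithModifier("INT4");
const sized = normalizeWithModifier("varchar(100)");
const custom = normalizeWithModifier("MYDB_TYPE");
```

Here `plain` becomes `integer`, `sized` becomes `character varying(100)`, and `custom` is returned unchanged because it has no mapping.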

Identifier Quoting

Use the identifier quoter utility:
import { quoteIdentifier, quoteQualifiedIdentifier } from "../../utils/identifier-quoter.js";

// Quote a single identifier
const quoted = quoteIdentifier('my table', 'mydb');
// Result: "my table" or `my table` depending on database

// Quote schema.table
const qualified = quoteQualifiedIdentifier('users', 'public', 'mydb');
// Result: "public"."users"

Read-Only Mode

Implement both SDK-level and application-level read-only enforcement:
async connect(dsn: string, initScript?: string, config?: ConnectorConfig): Promise<void> {
  const dbConfig = await this.dsnParser.parse(dsn, config);

  // SDK-level: Set database session to read-only
  if (config?.readonly) {
    dbConfig.readOnly = true;
    // Or use SQL: SET SESSION TRANSACTION READ ONLY
  }

  this.client = new MyDBClient(dbConfig);
  await this.client.connect();
}

async executeSQL(sql: string, options: ExecuteOptions): Promise<SQLResult> {
  // Application-level: Validate SQL keywords
  if (options.readonly) {
    // Validation happens in tool handler via allowed-keywords.ts
  }
  // ...
}
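
To illustrate what application-level keyword validation can look like, here is a minimal sketch that checks the first keyword of a statement against an allow-list. The list and the names `looksReadOnly` and `stripLeadingComments` are assumptions for illustration; they are not the contents or API of `allowed-keywords.ts`.

```typescript
// Illustrative allow-list; NOT the contents of allowed-keywords.ts.
const READ_ONLY_KEYWORDS = ["select", "show", "describe", "explain", "with"];

// Drop leading whitespace, "-- line" comments, and "/* block */" comments.
function stripLeadingComments(sql: string): string {
  let rest = sql;
  for (;;) {
    const trimmed = rest.replace(/^\s+/, "");
    if (trimmed.startsWith("--")) {
      const newline = trimmed.indexOf("\n");
      rest = newline === -1 ? "" : trimmed.slice(newline + 1);
    } else if (trimmed.startsWith("/*")) {
      const end = trimmed.indexOf("*/");
      rest = end === -1 ? "" : trimmed.slice(end + 2);
    } else {
      return trimmed;
    }
  }
}

// True when the first keyword of the statement is on the allow-list.
function looksReadOnly(sql: string): boolean {
  const firstWord = stripLeadingComments(sql).split(/[\s(;]+/)[0].toLowerCase();
  return READ_ONLY_KEYWORDS.indexOf(firstWord) !== -1;
}

const allowed = looksReadOnly("/* report */ SELECT * FROM users");
const blocked = looksReadOnly("-- cleanup\nDELETE FROM users");
```

A first-keyword check is deliberately coarse: it does not catch a second statement after a semicolon, and in some databases a `WITH` clause can wrap a data-modifying statement. That gap is one reason to keep the session-level read-only setting as well.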

Example: Complete Minimal Connector

Here’s a simplified example for reference:
// src/connectors/mydb/index.ts
import {
  Connector,
  ConnectorType,
  DSNParser,
  ConnectorRegistry,
  SQLResult,
  TableColumn,
  TableIndex,
  StoredProcedure,
  ExecuteOptions,
  ConnectorConfig,
} from "../interface.js";
import { SafeURL } from "../../utils/safe-url.js";

class MyDBDSNParser implements DSNParser {
  async parse(dsn: string, config?: ConnectorConfig) {
    const url = new SafeURL(dsn);
    return {
      host: url.hostname,
      port: url.port ? parseInt(url.port, 10) : 5000,
      database: url.pathname.substring(1),
      user: url.username,
      password: url.password,
    };
  }

  getSampleDSN(): string {
    return "mydb://user:password@localhost:5000/dbname";
  }

  isValidDSN(dsn: string): boolean {
    return dsn.startsWith('mydb://');
  }
}

export class MyDBConnector implements Connector {
  id: ConnectorType = "mydb";
  name = "MyDB";
  dsnParser = new MyDBDSNParser();
  private client: any = null;
  private sourceId = "default";

  getId() { return this.sourceId; }
  clone() { return new MyDBConnector(); }

  async connect(dsn: string) {
    const config = await this.dsnParser.parse(dsn);
    // Connect to database...
  }

  async disconnect() {
    await this.client?.close();
  }

  async executeSQL(sql: string, options: ExecuteOptions): Promise<SQLResult> {
    const result = await this.client.query(sql);
    return { rows: result.rows, rowCount: result.rowCount };
  }

  async getSchemas(): Promise<string[]> {
    const result = await this.executeSQL('SELECT schema_name FROM ...', {});
    return result.rows.map(r => r.schema_name);
  }

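  async getTables(schema?: string): Promise<string[]> {
    throw new Error("Not implemented"); // Replace with your implementation
  }

  async getTableSchema(tableName: string, schema?: string): Promise<TableColumn[]> {
    throw new Error("Not implemented"); // Replace with your implementation
  }

  async tableExists(tableName: string, schema?: string): Promise<boolean> {
    throw new Error("Not implemented"); // Replace with your implementation
  }

  async getTableIndexes(tableName: string, schema?: string): Promise<TableIndex[]> {
    throw new Error("Not implemented"); // Replace with your implementation
  }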

  async getStoredProcedures(): Promise<string[]> {
    return []; // If not supported
  }

  async getStoredProcedureDetail(): Promise<StoredProcedure> {
    throw new Error("Not supported");
  }
}

// Auto-register
ConnectorRegistry.register(new MyDBConnector());

Checklist

Before submitting your connector:
  • Implements all required Connector interface methods
  • Implements DSNParser interface
  • Registered with ConnectorRegistry
  • Added to ConnectorType union type
  • DSN validation works correctly
  • Connection pooling implemented
  • Error messages are descriptive
  • Resource cleanup in disconnect()
  • Integration tests written
  • All tests pass (pnpm test)
  • Documentation updated (README, docs site)
  • Example DSN in CLAUDE.md
