Overview
Adding support for a new database to DBHub involves implementing two interfaces:- DSNParser: Parses connection strings into database-specific configuration
- Connector: Implements database operations (connect, query, schema discovery)
Connector Interface
All connectors must implement theConnector interface.
Location: src/connectors/interface.ts:106-205
export interface Connector {
/** Unique identifier (e.g., "postgres", "mysql") */
id: ConnectorType;
/** Human-readable name */
name: string;
/** DSN parser for this connector */
dsnParser: DSNParser;
/** Get the source ID for this connector instance */
getId(): string;
/** Create a new instance (for multi-source support) */
clone(): Connector;
/** Connect to database */
connect(dsn: string, initScript?: string, config?: ConnectorConfig): Promise<void>;
/** Close connection */
disconnect(): Promise<void>;
/** Get all schemas */
getSchemas(): Promise<string[]>;
/** Get tables in a schema */
getTables(schema?: string): Promise<string[]>;
/** Get table column information */
getTableSchema(tableName: string, schema?: string): Promise<TableColumn[]>;
/** Check if table exists */
tableExists(tableName: string, schema?: string): Promise<boolean>;
/** Get table indexes */
getTableIndexes(tableName: string, schema?: string): Promise<TableIndex[]>;
/** Get stored procedures/functions */
getStoredProcedures(schema?: string, routineType?: "procedure" | "function"): Promise<string[]>;
/** Get stored procedure details */
getStoredProcedureDetail(procedureName: string, schema?: string): Promise<StoredProcedure>;
/** Get table row count (optional - uses statistics) */
getTableRowCount?(tableName: string, schema?: string): Promise<number | null>;
/** Get table comment (optional) */
getTableComment?(tableName: string, schema?: string): Promise<string | null>;
/** Execute SQL query */
executeSQL(sql: string, options: ExecuteOptions, parameters?: any[]): Promise<SQLResult>;
}
DSNParser Interface
Location:src/connectors/interface.ts:82-104
export interface DSNParser {
/**
* Parse a connection string into connector-specific configuration
* @param dsn - Database connection string
* @param config - Optional database-specific configuration options
*/
parse(dsn: string, config?: ConnectorConfig): Promise<any>;
/** Generate a sample DSN string for documentation */
getSampleDSN(): string;
/** Validate DSN format */
isValidDSN(dsn: string): boolean;
}
Step-by-Step: Building a Connector
Step 1: Create Directory Structure
mkdir -p src/connectors/mydb
touch src/connectors/mydb/index.ts
Step 2: Implement DSNParser
// src/connectors/mydb/index.ts
import { DSNParser, ConnectorConfig } from "../interface.js";
import { SafeURL } from "../../utils/safe-url.js";
import { obfuscateDSNPassword } from "../../utils/dsn-obfuscate.js";
class MyDBDSNParser implements DSNParser {
async parse(dsn: string, config?: ConnectorConfig): Promise<any> {
// Validate DSN format
if (!this.isValidDSN(dsn)) {
const obfuscatedDSN = obfuscateDSNPassword(dsn);
const expectedFormat = this.getSampleDSN();
throw new Error(
`Invalid MyDB DSN format.\nProvided: ${obfuscatedDSN}\nExpected: ${expectedFormat}`
);
}
// Parse DSN using SafeURL (handles special characters)
const url = new SafeURL(dsn);
const dbConfig = {
host: url.hostname,
port: url.port ? parseInt(url.port) : 5000,
database: url.pathname ? url.pathname.substring(1) : '',
user: url.username,
password: url.password,
};
// Handle query parameters
url.forEachSearchParam((value, key) => {
if (key === "sslmode") {
dbConfig.ssl = value !== "disable";
}
// Add other parameters as needed
});
// Apply connection timeout if specified
if (config?.connectionTimeoutSeconds) {
dbConfig.connectionTimeout = config.connectionTimeoutSeconds * 1000;
}
// Apply query timeout if specified
if (config?.queryTimeoutSeconds) {
dbConfig.queryTimeout = config.queryTimeoutSeconds * 1000;
}
return dbConfig;
}
getSampleDSN(): string {
return "mydb://user:password@localhost:5000/dbname?sslmode=require";
}
isValidDSN(dsn: string): boolean {
return dsn.startsWith('mydb://');
}
}
Step 3: Implement Connector Class
import { Connector, ConnectorType, SQLResult, TableColumn, TableIndex, StoredProcedure, ExecuteOptions } from "../interface.js";
import MyDBClient from "mydb-client"; // Your database client library
export class MyDBConnector implements Connector {
id: ConnectorType = "mydb";
name = "MyDB";
dsnParser = new MyDBDSNParser();
private client: MyDBClient | null = null;
private sourceId: string = "default";
getId(): string {
return this.sourceId;
}
clone(): Connector {
return new MyDBConnector();
}
async connect(dsn: string, initScript?: string, config?: ConnectorConfig): Promise<void> {
try {
const dbConfig = await this.dsnParser.parse(dsn, config);
// Apply read-only mode if configured
if (config?.readonly) {
dbConfig.readOnly = true;
}
this.client = new MyDBClient(dbConfig);
await this.client.connect();
// Run init script if provided
if (initScript) {
await this.client.query(initScript);
}
// Test connection
await this.client.query('SELECT 1');
} catch (err) {
console.error("Failed to connect to MyDB:", err);
throw err;
}
}
async disconnect(): Promise<void> {
if (this.client) {
await this.client.close();
this.client = null;
}
}
async executeSQL(sql: string, options: ExecuteOptions, parameters?: any[]): Promise<SQLResult> {
if (!this.client) {
throw new Error("Not connected to database");
}
try {
// Apply row limit if specified
let query = sql;
if (options.maxRows) {
query = `${sql} LIMIT ${options.maxRows}`;
}
const result = await this.client.query(query, parameters);
return {
rows: result.rows,
rowCount: result.rowCount,
};
} catch (error) {
throw new Error(`Query failed: ${(error as Error).message}`);
}
}
async getSchemas(): Promise<string[]> {
const result = await this.executeSQL(
'SELECT schema_name FROM information_schema.schemata ORDER BY schema_name',
{}
);
return result.rows.map(row => row.schema_name);
}
async getTables(schema?: string): Promise<string[]> {
const schemaToUse = schema || 'public';
const result = await this.executeSQL(
`SELECT table_name FROM information_schema.tables
WHERE table_schema = $1 ORDER BY table_name`,
{},
[schemaToUse]
);
return result.rows.map(row => row.table_name);
}
async getTableSchema(tableName: string, schema?: string): Promise<TableColumn[]> {
const schemaToUse = schema || 'public';
const result = await this.executeSQL(
`SELECT
column_name,
data_type,
is_nullable,
column_default
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2
ORDER BY ordinal_position`,
{},
[schemaToUse, tableName]
);
return result.rows.map(row => ({
column_name: row.column_name,
data_type: row.data_type,
is_nullable: row.is_nullable,
column_default: row.column_default,
description: null,
}));
}
async tableExists(tableName: string, schema?: string): Promise<boolean> {
const schemaToUse = schema || 'public';
const result = await this.executeSQL(
`SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = $1 AND table_name = $2
) as exists`,
{},
[schemaToUse, tableName]
);
return result.rows[0]?.exists === true;
}
async getTableIndexes(tableName: string, schema?: string): Promise<TableIndex[]> {
// Implementation depends on your database's system catalog
// This is a simplified example
const schemaToUse = schema || 'public';
const result = await this.executeSQL(
`SELECT
index_name,
column_names,
is_unique,
is_primary
FROM mydb_indexes
WHERE schema_name = $1 AND table_name = $2`,
{},
[schemaToUse, tableName]
);
return result.rows.map(row => ({
index_name: row.index_name,
column_names: row.column_names,
is_unique: row.is_unique,
is_primary: row.is_primary,
}));
}
async getStoredProcedures(schema?: string, routineType?: "procedure" | "function"): Promise<string[]> {
// If your database doesn't support stored procedures, return empty array
return [];
}
async getStoredProcedureDetail(procedureName: string, schema?: string): Promise<StoredProcedure> {
throw new Error("Stored procedures not supported by MyDB");
}
// Optional: Implement for better performance
async getTableRowCount(tableName: string, schema?: string): Promise<number | null> {
try {
const schemaToUse = schema || 'public';
const result = await this.executeSQL(
`SELECT row_count FROM mydb_statistics
WHERE schema_name = $1 AND table_name = $2`,
{},
[schemaToUse, tableName]
);
return result.rows[0]?.row_count || null;
} catch {
return null; // Fallback to COUNT(*) in search-objects
}
}
// Optional: Implement if database supports table comments
async getTableComment(tableName: string, schema?: string): Promise<string | null> {
try {
const schemaToUse = schema || 'public';
const result = await this.executeSQL(
`SELECT comment FROM mydb_table_comments
WHERE schema_name = $1 AND table_name = $2`,
{},
[schemaToUse, tableName]
);
return result.rows[0]?.comment || null;
} catch {
return null;
}
}
}
Step 4: Register the Connector
Connectors are auto-registered when imported.// src/connectors/mydb/index.ts (add at bottom)
import { ConnectorRegistry } from "../interface.js";
// Auto-register on import
ConnectorRegistry.register(new MyDBConnector());
// src/connectors/index.ts
import "./postgres/index.js";
import "./mysql/index.js";
import "./mydb/index.js"; // Add your connector
Step 5: Update Type Definitions
Add your connector type to theConnectorType union:
// src/connectors/interface.ts
export type ConnectorType =
| "postgres"
| "mysql"
| "mariadb"
| "sqlite"
| "sqlserver"
| "mydb"; // Add your type
DSN Format Design
Best Practices
- Follow URI Standards: Use
scheme://user:password@host:port/database?paramsformat - Unique Scheme: Choose a scheme that doesn’t conflict (e.g.,
mydb://, nothttp://) - Sensible Defaults: Default port, SSL mode, etc.
- Query Parameters: Support SSL, timeouts, and database-specific options
Example DSN Formats
// Basic connection
mydb://user:password@localhost:5000/dbname
// With SSL
mydb://user:password@localhost:5000/dbname?sslmode=require
// With custom parameters
mydb://user:password@localhost:5000/dbname?sslmode=require&pool_size=10
// Local file (like SQLite)
mydb:///path/to/database.db
mydb:///:memory:
Writing Integration Tests
Create Test File
touch src/connectors/__tests__/mydb.integration.test.ts
Implement Test Using IntegrationTestBase
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { GenericContainer, StartedTestContainer } from 'testcontainers';
import { MyDBConnector } from '../mydb/index.js';
import { IntegrationTestBase, type TestContainer, type DatabaseTestConfig } from './shared/integration-test-base.js';
import type { Connector } from '../interface.js';
class MyDBTestContainer implements TestContainer {
constructor(private container: StartedTestContainer) {}
getConnectionUri(): string {
const host = this.container.getHost();
const port = this.container.getMappedPort(5000);
return `mydb://testuser:testpass@${host}:${port}/testdb`;
}
async stop(): Promise<void> {
await this.container.stop();
}
}
class MyDBIntegrationTest extends IntegrationTestBase<MyDBTestContainer> {
constructor() {
const config: DatabaseTestConfig = {
expectedSchemas: ['public', 'test_schema'],
expectedTables: ['users', 'orders'],
expectedTestSchemaTable: 'products',
testSchema: 'test_schema',
supportsStoredProcedures: false,
supportsComments: true,
};
super(config);
}
async createContainer(): Promise<MyDBTestContainer> {
const container = await new GenericContainer('mydb:latest')
.withExposedPorts(5000)
.withEnvironment({
MYDB_USER: 'testuser',
MYDB_PASSWORD: 'testpass',
MYDB_DATABASE: 'testdb',
})
.start();
return new MyDBTestContainer(container);
}
createConnector(): Connector {
return new MyDBConnector();
}
async setupTestData(connector: Connector): Promise<void> {
// Create test schema
await connector.executeSQL('CREATE SCHEMA test_schema', {});
// Create users table
await connector.executeSQL(`
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100) UNIQUE
)
`, {});
// Insert test data
await connector.executeSQL(`
INSERT INTO users (id, name, email) VALUES
(1, 'John Doe', '[email protected]'),
(2, 'Jane Smith', '[email protected]')
`, {});
}
// Add database-specific tests
createDatabaseSpecificTests(): void {
describe('MyDB Specific Features', () => {
it('should support MyDB-specific data type', async () => {
const connector = this.connector!;
await connector.executeSQL(
'CREATE TABLE test_types (id INT, special_col MYDB_TYPE)',
{}
);
const schema = await connector.getTableSchema('test_types');
expect(schema.find(c => c.column_name === 'special_col')?.data_type)
.toBe('MYDB_TYPE');
});
});
}
}
const test = new MyDBIntegrationTest();
test.run();
Run Your Tests
# Run your connector tests
pnpm test src/connectors/__tests__/mydb.integration.test.ts
# Run all integration tests
pnpm test:integration
Best Practices
1. Resource Management
Always release database resources:const client = await this.pool.acquire();
try {
const result = await client.query(sql);
return result;
} finally {
await this.pool.release(client); // Always release
}
2. Error Handling
Provide context in error messages:try {
await this.client.connect();
} catch (error) {
throw new Error(
`Failed to connect to MyDB at ${host}:${port}: ${error.message}`
);
}
3. Schema Defaults
Handle missing schema parameters gracefully:async getTables(schema?: string): Promise<string[]> {
// Use sensible default if schema not provided
const schemaToUse = schema || this.getDefaultSchema();
// ...
}
4. SQL Injection Prevention
Always use parameterized queries:// Bad - SQL injection risk
const sql = `SELECT * FROM ${tableName}`;
// Good - parameterized
const sql = 'SELECT * FROM ?? WHERE id = ?';
await client.query(sql, [tableName, userId]);
5. Connection Pooling
Use connection pools for better performance:import { Pool } from 'mydb-client';
private pool: Pool | null = null;
async connect(dsn: string): Promise<void> {
const config = await this.dsnParser.parse(dsn);
this.pool = new Pool(config);
}
async executeSQL(sql: string): Promise<SQLResult> {
const client = await this.pool!.acquire();
try {
return await client.query(sql);
} finally {
await this.pool!.release(client);
}
}
Handling Database-Specific Features
Custom Data Types
Map database-specific types to standard types:function normalizeDataType(dbType: string): string {
const typeMap: Record<string, string> = {
'int4': 'integer',
'int8': 'bigint',
'varchar': 'character varying',
// Add your database's type mappings
};
return typeMap[dbType.toLowerCase()] || dbType;
}
Identifier Quoting
Use the identifier quoter utility:import { quoteIdentifier, quoteQualifiedIdentifier } from "../../utils/identifier-quoter.js";
// Quote a single identifier
const quoted = quoteIdentifier('my table', 'mydb');
// Result: "my table" or `my table` depending on database
// Quote schema.table
const qualified = quoteQualifiedIdentifier('users', 'public', 'mydb');
// Result: "public"."users"
Read-Only Mode
Implement both SDK-level and application-level read-only enforcement:async connect(dsn: string, initScript?: string, config?: ConnectorConfig): Promise<void> {
const dbConfig = await this.dsnParser.parse(dsn, config);
// SDK-level: Set database session to read-only
if (config?.readonly) {
dbConfig.readOnly = true;
// Or use SQL: SET SESSION TRANSACTION READ ONLY
}
this.client = new MyDBClient(dbConfig);
await this.client.connect();
}
async executeSQL(sql: string, options: ExecuteOptions): Promise<SQLResult> {
// Application-level: Validate SQL keywords
if (options.readonly) {
// Validation happens in tool handler via allowed-keywords.ts
}
// ...
}
Example: Complete Minimal Connector
Here’s a simplified example for reference:// src/connectors/mydb/index.ts
import {
Connector,
ConnectorType,
DSNParser,
ConnectorRegistry,
SQLResult,
TableColumn,
TableIndex,
StoredProcedure,
ExecuteOptions,
ConnectorConfig,
} from "../interface.js";
import { SafeURL } from "../../utils/safe-url.js";
class MyDBDSNParser implements DSNParser {
async parse(dsn: string, config?: ConnectorConfig) {
const url = new SafeURL(dsn);
return {
host: url.hostname,
port: url.port ? parseInt(url.port) : 5000,
database: url.pathname.substring(1),
user: url.username,
password: url.password,
};
}
getSampleDSN(): string {
return "mydb://user:password@localhost:5000/dbname";
}
isValidDSN(dsn: string): boolean {
return dsn.startsWith('mydb://');
}
}
export class MyDBConnector implements Connector {
id: ConnectorType = "mydb";
name = "MyDB";
dsnParser = new MyDBDSNParser();
private client: any = null;
private sourceId = "default";
getId() { return this.sourceId; }
clone() { return new MyDBConnector(); }
async connect(dsn: string) {
const config = await this.dsnParser.parse(dsn);
// Connect to database...
}
async disconnect() {
await this.client?.close();
}
async executeSQL(sql: string, options: ExecuteOptions): Promise<SQLResult> {
const result = await this.client.query(sql);
return { rows: result.rows, rowCount: result.rowCount };
}
async getSchemas(): Promise<string[]> {
const result = await this.executeSQL('SELECT schema_name FROM ...', {});
return result.rows.map(r => r.schema_name);
}
async getTables(schema?: string): Promise<string[]> {
// Implementation
}
async getTableSchema(tableName: string, schema?: string): Promise<TableColumn[]> {
// Implementation
}
async tableExists(tableName: string, schema?: string): Promise<boolean> {
// Implementation
}
async getTableIndexes(tableName: string, schema?: string): Promise<TableIndex[]> {
// Implementation
}
async getStoredProcedures(): Promise<string[]> {
return []; // If not supported
}
async getStoredProcedureDetail(): Promise<StoredProcedure> {
throw new Error("Not supported");
}
}
// Auto-register
ConnectorRegistry.register(new MyDBConnector());
Checklist
Before submitting your connector:- Implements all required
Connectorinterface methods - Implements
DSNParserinterface - Registered with
ConnectorRegistry - Added to
ConnectorTypeunion type - DSN validation works correctly
- Connection pooling implemented
- Error messages are descriptive
- Resource cleanup in
disconnect() - Integration tests written
- All tests pass (
pnpm test) - Documentation updated (README, docs site)
- Example DSN in
CLAUDE.md
Next Steps
- Testing Guide - Write comprehensive tests
- Architecture - Understand the connector system
- Development Overview - Submit your contribution