The midPilot Connector Generator is built as a modular microservice with four primary processing modules, a PostgreSQL database layer, and an asynchronous job system. This architecture enables scalable, AI-powered connector generation from any API documentation.
System Overview
Core Components
Session Management
Sessions are the foundation of the connector generation workflow. Each session:
Has a unique UUID identifier
Stores all processing results (documentation, schemas, generated code)
Tracks associated jobs and their status
Maintains session data in a flexible JSONB structure
Key Endpoints:
POST /api/v1/session - Create a new session
GET /api/v1/session/{sessionId} - Retrieve session data
DELETE /api/v1/session/{sessionId} - Delete session and all associated data
Session Data Structure:
# Database Model: src/common/database/models/session.py
class Session(Base):
    session_id: UUID
    created_at: datetime
    updated_at: datetime
    jobs: List[Job]  # All jobs in this session
    documentation_items: List[DocumentationItem]  # Processed docs
    session_data: List[SessionData]  # Key-value storage
Sessions store results with predictable keys like objectClassesOutput, {objectClass}AttributesOutput, and {objectClass}Search for easy retrieval.
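For orientation, a minimal client sketch of this flow follows. The endpoints are the ones listed above; the response field names (sessionId, sessionData) and the exact layout of the stored values are assumptions, so check the API Reference for the authoritative shapes.
# Minimal sketch: create a session and read back stored results.
# Field names in the JSON responses are assumptions, not a documented contract.
import httpx

BASE_URL = "http://localhost:8090/api/v1"

with httpx.Client(base_url=BASE_URL) as client:
    session_id = client.post("/session").json()["sessionId"]  # assumed field name

    # After jobs have run, stored keys such as objectClassesOutput can be read back
    session = client.get(f"/session/{session_id}").json()
    print(session.get("sessionData", {}).get("objectClassesOutput"))  # assumed layout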
Module 1: Discovery
Purpose: Automatically discover relevant API documentation URLs
Location: src/modules/discovery/
The Discovery module uses web search and AI to find candidate documentation URLs for a given application.
How it works:
Search Query Generation
Uses the application name and version to construct targeted search queries
Web Search
Executes searches using configured search providers (Brave Search, DuckDuckGo)
URL Filtering
LLMs analyze search results to identify relevant documentation URLs
Result Storage
Stores candidate URLs in session as discoveryOutput
API Endpoints:
POST /api/v1/discovery/{sessionId}/discovery - Start discovery job
GET /api/v1/discovery/{sessionId}/discovery - Get discovery status
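A hedged usage sketch of these endpoints: the request payload (application name and version) is an assumption based on how discovery builds its search queries, and the job behaviour follows the Job System section below.
# Start discovery for a session and check on it later.
# The JSON payload field names are assumptions; verify them in the API Reference.
import httpx

with httpx.Client(base_url="http://localhost:8090/api/v1") as client:
    session_id = client.post("/session").json()["sessionId"]  # assumed field name

    client.post(
        f"/discovery/{session_id}/discovery",
        json={"applicationName": "GitLab", "applicationVersion": "16.x"},  # assumed fields
    )

    # Poll until the job finishes; candidate URLs end up in the session as discoveryOutput
    status = client.get(f"/discovery/{session_id}/discovery").json()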
Configuration:
# src/config.py
class SearchSettings(BaseModel):
    method_name: str = ""
    discovery_input_check_interval: timedelta = timedelta(weeks=4)

class BraveSettings(BaseModel):
    api_key: str = ""
    endpoint: str = "https://api.search.brave.com/res/v1/web/search"
Module 2: Scrape
Purpose: Scrape and process documentation from URLs
Location: src/modules/scrape/
The Scrape module fetches documentation from URLs, chunks it, and processes it with LLMs to extract structured information.
How it works:
URL Crawling
Uses Crawl4AI to fetch HTML content from documentation URLs
Content Extraction
Converts HTML to clean markdown format
Intelligent Chunking
Splits content into chunks with token overlap for LLM processing:
# src/common/chunks.py
chunks = split_text_with_token_overlap(
    doc_text,
    max_tokens=config.scrape_and_process.chunk_length,  # Default: 20000
    overlap_ratio=0.05,
)
LLM Processing
Each chunk is analyzed to extract:
Summary
Metadata (app name, version, category)
Tags for relevance matching
Database Storage
Stores processed chunks as DocumentationItem records
API Endpoints:
POST /api/v1/scrape/{sessionId}/scrape - Start scraping job
GET /api/v1/scrape/{sessionId}/scrape - Get scraping status
POST /api/v1/session/{sessionId}/documentation - Upload documentation file
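Uploading a file might look like the sketch below; the multipart field name ("file") and the content type are assumptions.
# Upload an OpenAPI spec or other documentation file into a session.
import httpx

session_id = "existing-session-uuid"  # from POST /api/v1/session

with httpx.Client(base_url="http://localhost:8090/api/v1") as client:
    with open("openapi.yaml", "rb") as f:
        resp = client.post(
            f"/session/{session_id}/documentation",
            files={"file": ("openapi.yaml", f, "application/yaml")},  # field name is an assumption
        )
    resp.raise_for_status()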
Configuration:
# src/config.py
class ScrapeAndProcessSettings(BaseModel):
    max_scraper_iterations: int = 4
    max_iterations_filter_irrelevant: int = 5
    chunk_length: int = 20000
    max_concurrent: int = 20
    chunk_categories: list[str] = [
        "spec_yaml", "spec_json", "reference_api",
        "reference_other", "overview", "index",
        "tutorial", "non-technical", "other",
    ]
Documentation can come from two sources: scraped from URLs or uploaded as files. Both are processed identically after chunking.
Module 3: Digester
Purpose: Extract structured schemas from documentation
Location: src/modules/digester/
The Digester module uses LLMs to analyze documentation chunks and extract object classes, attributes, endpoints, authentication, and relationships.
Extraction Pipeline:
Object Classes
POST /api/v1/digester/{sessionId}/classes
Identifies primary object types (User, Group, Account, etc.) from documentation.
Output:
{
  "objectClasses": [
    {
      "name": "User",
      "description": "User account object",
      "relevant": true,
      "relevantChunks": [{"docUuid": "..."}]
    }
  ]
}
Attributes
POST /api/v1/digester/{sessionId}/classes/{objectClass}/attributes
Extracts attribute schemas for each object class.
Output:
{
  "attributes": {
    "id": {
      "type": "string",
      "required": true,
      "readOnly": false,
      "multivalued": false
    }
  }
}
Endpoints
POST /api/v1/digester/{sessionId}/classes/{objectClass}/endpoints
Discovers API endpoints for CRUD operations.
Output:
{
  "endpoints": [
    {
      "operation": "list",
      "method": "GET",
      "path": "/api/v1/users"
    }
  ]
}
Authentication
POST /api/v1/digester/{sessionId}/auth
Extracts authentication methods and requirements
Metadata
POST /api/v1/digester/{sessionId}/metadata
Extracts API metadata like base URLs and versioning
Relations
POST /api/v1/digester/{sessionId}/relations
Identifies relationships between object classes
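Putting the pipeline together, a client typically identifies the object classes first and then fans out per class. The sketch below assumes the session-data layout shown earlier (objectClassesOutput with name/relevant per class) and omits job polling for brevity.
# Hedged sketch: drive the digester for every relevant object class.
import httpx

session_id = "existing-session-uuid"  # from POST /api/v1/session

with httpx.Client(base_url="http://localhost:8090/api/v1") as client:
    client.post(f"/digester/{session_id}/classes")
    # ... wait for the job to finish (see Job System), then read the result
    session = client.get(f"/session/{session_id}").json()
    classes = session["sessionData"]["objectClassesOutput"]["objectClasses"]  # assumed layout

    for obj in classes:
        if not obj.get("relevant"):
            continue
        client.post(f"/digester/{session_id}/classes/{obj['name']}/attributes")
        client.post(f"/digester/{session_id}/classes/{obj['name']}/endpoints")

    client.post(f"/digester/{session_id}/auth")
    client.post(f"/digester/{session_id}/metadata")
    client.post(f"/digester/{session_id}/relations")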
Relevancy Filtering:
The Digester uses a relevancy filtering system to focus LLM processing on the most important chunks:
# src/common/chunk_filter/filter.py
async def filter_documentation_items(
    criteria: FilterCriteria,
    session_id: UUID,
    db: AsyncSession,
) -> List[Dict[str, Any]]:
    ...
Filter Criteria:
Required tags (e.g., ["endpoint", "api"])
Allowed categories (e.g., ["spec_yaml", "reference_api"])
Minimum relevancy scores
App name/version matching
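A sketch of how these criteria might be passed to the filter; only the function signature above is taken from the source, and the FilterCriteria field names used here are hypothetical.
# Hypothetical FilterCriteria fields - check src/common/chunk_filter/ for the real model.
criteria = FilterCriteria(
    required_tags=["endpoint", "api"],
    allowed_categories=["spec_yaml", "reference_api"],
    min_relevancy_score=0.5,
    app_name="GitLab",
    app_version="16.x",
)
relevant_chunks = await filter_documentation_items(
    criteria=criteria,
    session_id=session_id,
    db=db,
)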
Module 4: CodeGen
Purpose: Generate Groovy connector code
Location: src/modules/codegen/
The CodeGen module takes extracted schemas and generates production-ready Groovy code for ConnID connectors.
Code Generation Operations:
Native Schema
POST /api/v1/codegen/{sessionId}/classes/{objectClass}/native-schema
Generates the native schema definition
ConnID Mapping
POST /api/v1/codegen/{sessionId}/classes/{objectClass}/connid
Creates attribute mappings between native and ConnID schemas
Search Operation
POST /api/v1/codegen/{sessionId}/classes/{objectClass}/search/{intent}
Generates code to list/search objects
Create Operation
POST /api/v1/codegen/{sessionId}/classes/{objectClass}/create
Generates code to create new objects
Update Operation
POST /api/v1/codegen/{sessionId}/classes/{objectClass}/update
Generates code to update existing objects
Delete Operation
POST /api/v1/codegen/{sessionId}/classes/{objectClass}/delete
Generates code to delete objects
Relation Code
POST /api/v1/codegen/{sessionId}/relations/{relationName}
Generates code for managing relationships
Code Generation Process:
# src/modules/codegen/core/generate_groovy.py
async def create_search(
    attributes: dict,
    endpoints: dict,
    session_id: UUID,
    object_class: str,
) -> dict:
    # 1. Load relevant documentation chunks
    # 2. Generate Groovy code using LLM
    # 3. Validate and format code
    # 4. Return structured result
    ...
All code generation is context-aware, using relevant documentation chunks to ensure the generated code matches the actual API behavior.
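End to end, generating a connector for one object class is a sequence of these calls. The sketch below assumes the /api/v1 prefix from the router configuration and an illustrative {intent} value; each call returns a jobId that should be polled before a later step depends on its output.
# Hedged sketch: request code generation for a single object class.
import httpx

session_id = "existing-session-uuid"  # from POST /api/v1/session

with httpx.Client(base_url="http://localhost:8090/api/v1") as client:
    cls = "User"
    client.post(f"/codegen/{session_id}/classes/{cls}/native-schema")
    client.post(f"/codegen/{session_id}/classes/{cls}/connid")
    client.post(f"/codegen/{session_id}/classes/{cls}/search/list")  # intent value is an assumption
    client.post(f"/codegen/{session_id}/classes/{cls}/create")
    client.post(f"/codegen/{session_id}/classes/{cls}/update")
    client.post(f"/codegen/{session_id}/classes/{cls}/delete")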
Database Layer
Technology: PostgreSQL 15 with async support (asyncpg, SQLAlchemy)
Location: src/common/database/
Data Models
Session
Job
DocumentationItem
SessionData
RelevantChunk
# src/common/database/models/session.py
class Session(Base):
    session_id: UUID
    created_at: datetime
    updated_at: datetime
Central entity that owns all other data

# src/common/database/models/job.py
class Job(Base):
    job_id: UUID
    session_id: UUID
    job_type: str
    status: str  # queued, running, finished, failed
    input: Dict[str, Any]
    result: Dict[str, Any]
    errors: List[str]
    progress: JobProgress
Tracks asynchronous operations

# src/common/database/models/documentation_item.py
class DocumentationItem(Base):
    id: UUID
    session_id: UUID
    page_id: UUID
    source: str  # 'scraper' or 'upload'
    content: str
    summary: str
    metadata: Dict[str, Any]
    scrape_job_ids: List[str]
Stores processed documentation chunks

# src/common/database/models/session_data.py
class SessionData(Base):
    session_id: UUID
    key: str
    value: Dict[str, Any]  # JSONB
Flexible key-value storage for session state

# src/common/database/models/relevant_chunk.py
class RelevantChunk(Base):
    session_id: UUID
    object_class: str
    doc_item_uuid: UUID
    relevance_score: float
Tracks which chunks are relevant to which object classes
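Because intermediate results live in SessionData, reading one back is a single keyed lookup. A minimal sketch, assuming standard SQLAlchemy 2.x async usage; the import path is an assumption.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.common.database.models.session_data import SessionData  # assumed import path

async def get_session_value(db: AsyncSession, session_id, key: str):
    """Fetch one key-value entry (e.g. 'objectClassesOutput') for a session."""
    result = await db.execute(
        select(SessionData).where(
            SessionData.session_id == session_id,
            SessionData.key == key,
        )
    )
    row = result.scalar_one_or_none()
    return row.value if row else None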
Database Migration
The system uses Alembic for database migrations:
# src/common/database/config.py
DATABASE_URL = "postgresql+asyncpg://user:pass@host:port/db"
# Run migrations
uv run alembic upgrade head
Job System
Purpose: Handle long-running asynchronous operations
Location: src/common/jobs.py
Job Lifecycle
Job Creation
job_id = await schedule_coroutine_job(
    job_type="digester.getObjectClass",
    input_payload={ ... },
    worker=service.extract_object_classes,
    initial_stage="queue",
    session_id=session_id,
)
Job Execution
Worker function runs asynchronously, updating progress and status
Progress Tracking
# src/common/database/models/job_progress.py
class JobProgress(Base):
    stage: str  # queue, chunking, processing, finishing
    message: str
    iteration: int
    processed_documents: int
    total_documents: int
Result Storage
Job result stored in database and optionally in session data
Job Status Values
queued - Job created, waiting for execution
running - Job currently executing
finished - Job completed successfully, result available
failed - Job encountered an error, check errors array
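Clients typically poll a module's GET status endpoint until one of the terminal states is reached. A small helper sketch follows; the response field names (status, result, errors) are assumptions consistent with the Job model above.
import time
import httpx

def wait_for_job(client: httpx.Client, status_url: str, timeout: float = 300.0) -> dict:
    """Poll a status endpoint until the job is finished or failed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = client.get(status_url).json()
        if job.get("status") == "finished":
            return job.get("result", {})
        if job.get("status") == "failed":
            raise RuntimeError(f"job failed: {job.get('errors')}")
        time.sleep(2)  # still queued or running
    raise TimeoutError(f"job at {status_url} did not finish within {timeout}s")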
Job Recovery
# src/common/jobs.py
async def recover_stale_running_jobs():
    """Reset jobs stuck in 'running' state after restart"""
On application startup, any jobs in “running” state are reset to “failed” to handle crashes.
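A minimal sketch of that behaviour; the real function in src/common/jobs.py takes no arguments and manages its own database session, so this version is simplified for illustration.
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession

from src.common.database.models.job import Job  # assumed import path

async def reset_stale_jobs(db: AsyncSession) -> None:
    """Mark jobs left in 'running' state by a previous process as failed."""
    await db.execute(
        update(Job)
        .where(Job.status == "running")
        .values(status="failed", errors=["interrupted by application restart"])
    )
    await db.commit()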
LLM Integration
Location: src/common/llm.py, src/common/chunk_processor/
LLM Provider Configuration
# src/config.py
class LLMSettings(BaseModel):
    openai_api_key: str = ""
    openai_api_base: str = "https://openrouter.ai/api/v1"
    model_name: str = "openai/gpt-oss-20b"
    request_timeout: int = 120
    provider_order: List[str] = [
        "groq", "wandb/fp4", "clarifai/fp4",
    ]
Chunk Processing Pipeline
# src/common/chunk_processor/processor.py
async def process_chunk(
    chunk: str,
    app_name: str,
    app_version: str,
) -> ProcessedChunk:
    # 1. Build prompt with context
    # 2. Call LLM with structured output
    # 3. Parse and validate response
    # 4. Return structured data
    ...
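A sketch of step 2, assuming an OpenAI-compatible client pointed at the configured openai_api_base. The prompt text, the config attribute names, and the plain-string return value are illustrative; the real processor returns a structured ProcessedChunk.
from openai import AsyncOpenAI

# config.llm attribute access is an assumption about how LLMSettings is exposed
client = AsyncOpenAI(api_key=config.llm.openai_api_key, base_url=config.llm.openai_api_base)

async def summarize_chunk(chunk: str, app_name: str, app_version: str) -> str:
    response = await client.chat.completions.create(
        model=config.llm.model_name,  # e.g. "openai/gpt-oss-20b"
        messages=[
            {"role": "system", "content": "Extract a summary, metadata, and tags from API documentation."},
            {"role": "user", "content": f"App: {app_name} {app_version}\n\n{chunk}"},
        ],
        timeout=config.llm.request_timeout,
    )
    return response.choices[0].message.content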
LLM Tasks:
Extract metadata from chunks (app name, version, category)
Identify object classes in documentation
Extract attribute schemas with types and constraints
Discover API endpoints with methods and paths
Generate Groovy code from schemas and examples
Tracing with Langfuse
Optional LLM tracing for debugging and monitoring:
# src/config.py
class LangfuseSettings(BaseModel):
    public_key: str = "emptykey"
    secret_key: str = "emptykey"
    tracing_enabled: bool = False
    environment: str = "dev-whoami"
Enable Langfuse tracing to track LLM calls, token usage, latency, and debug prompt/response pairs.
API Structure
Base URL: http://localhost:8090/api/v1
Router Configuration:
# src/router.py
root_router.include_router(session_router, prefix="/session")
root_router.include_router(discovery_router, prefix="/discovery")
root_router.include_router(scrape_router, prefix="/scrape")
root_router.include_router(digester_router, prefix="/digester")
root_router.include_router(codegen_router, prefix="/codegen")
All endpoints follow a consistent pattern:
Session-centric (most operations require {sessionId})
Asynchronous (return jobId immediately)
RESTful (standard HTTP methods)
Deployment Considerations
Environment Configuration
# .env
APP__PORT=8090
APP__HOST=0.0.0.0
# Database
DATABASE__HOST=localhost
DATABASE__PORT=5432
DATABASE__USER=user
DATABASE__PASSWORD=password
DATABASE__NAME=db
# LLM
LLM__OPENAI_API_KEY=your-key
LLM__MODEL_NAME=openai/gpt-4o
# Optional: Langfuse tracing
LANGFUSE__TRACING_ENABLED=true
LANGFUSE__PUBLIC_KEY=pk-...
LANGFUSE__SECRET_KEY=sk-...
Docker Deployment
# Build
docker compose build
# Run with database
docker compose up
The application includes:
PostgreSQL 15 database service
Automatic migration on startup
Health check endpoint at /health
Scalability
Current Implementation:
Single worker process
In-process job queue
Session-based state management
Scaling Considerations:
Add Redis for distributed job queue
Implement horizontal scaling with multiple workers
Consider caching for frequently accessed session data
Use connection pooling for database (already configured)
The current job system is in-process. For production deployments with multiple workers, implement a distributed job queue like Celery or RQ.
Next Steps
Quickstart Guide: Try the complete workflow with a real API
API Reference: Explore detailed API documentation
Configuration: Configure LLM providers and settings
Installation: Set up local development environment