Blnk integrates with Typesense, an open-source search engine optimized for speed and developer experience. The integration provides fast, typo-tolerant search across all financial records.

Architecture

The search system is implemented in /internal/search/search.go and uses an event-driven indexing model:
  • Automatic Indexing: Records are indexed asynchronously via queue
  • Batch Indexing: Dependencies (balances) are indexed before primary records (transactions)
  • Schema Migration: Automatic schema updates when new fields are added
  • Reindexing: Full reindex capability for initial setup or recovery

Collections

Blnk maintains five Typesense collections:
Collection        Purpose                 Key Fields
ledgers           Ledger accounts         ledger_id, name, created_at
balances          Balance records         balance_id, balance, currency, ledger_id
transactions      Transaction history     transaction_id, reference, status, amount
reconciliations   Reconciliation jobs     reconciliation_id, status, matched_transactions
identities        Customer identities     identity_id, first_name, email_address

Collection Schemas

Schemas are defined in search.go:524-660. The transactions schema:
Fields: []api.Field{
    {Name: "precise_amount", Type: "string", Facet: &facet},
    {Name: "amount", Type: "float", Facet: &facet},
    {Name: "transaction_id", Type: "string", Facet: &facet},
    {Name: "source", Type: "string", Reference: &sourceId, Facet: &facet},
    {Name: "destination", Type: "string", Reference: &destinationId, Facet: &facet},
    {Name: "reference", Type: "string", Facet: &facet},
    {Name: "currency", Type: "string", Facet: &facet},
    {Name: "status", Type: "string", Facet: &facet},
    {Name: "created_at", Type: "int64", Facet: &facet},
    {Name: "meta_data", Type: "object", Facet: &facet, Optional: &enableNested},
    // ... more fields
}
Key Features:
  • All fields are facetable for filtering
  • References between collections (e.g., transactions → balances)
  • Nested object support for metadata
  • Timestamp fields as Unix timestamps for range queries

Configuration

Typesense Connection

Configure Typesense in your blnk.json:
{
  "typesense": {
    "dns": "http://localhost:8108",
    "api_key": "your-api-key"
  }
}
Or via environment variables:
BLNK_TYPESENSE_DNS=http://localhost:8108
BLNK_TYPESENSE_API_KEY=your-api-key

Docker Deployment

Run Typesense alongside Blnk:
version: '3'
services:
  typesense:
    image: typesense/typesense:26.0
    ports:
      - "8108:8108"
    volumes:
      - typesense-data:/data
    environment:
      TYPESENSE_DATA_DIR: /data
      TYPESENSE_API_KEY: blnk-api-key
    command: '--data-dir /data --api-key=blnk-api-key --enable-cors'

volumes:
  typesense-data:

Indexing Operations

Automatic Indexing

Records are indexed automatically via the queue system:
// Source: search.go:229-246
func (t *TypesenseClient) HandleNotification(ctx context.Context, table string, data map[string]interface{}) error {
    config, ok := collectionConfigs[table]
    if !ok {
        return fmt.Errorf("unknown collection: %s", table)
    }
    
    // Process and normalize the data
    if err := t.processMetadata(data); err != nil {
        return err
    }
    t.convertLargeNumbers(config, data)
    t.ensureSchemaFields(config, data)
    t.normalizeTimeFields(config, data)
    
    // Upsert the document
    return t.upsertDocument(ctx, table, data)
}

Batch Indexing

Dependency-aware batch indexing ensures referential integrity:
// Source: search.go:252-278
func (t *TypesenseClient) HandleBatchNotification(ctx context.Context, batch *IndexBatch) error {
    // Deduplicate dependencies to avoid redundant indexing
    batch.Deduplicate()
    
    // Step 1: Index all dependencies first (in order)
    for _, dep := range batch.Dependencies {
        data, err := toMap(dep.Data)
        if err != nil {
            return fmt.Errorf("failed to convert dependency %s/%s to map: %w", dep.Collection, dep.DocumentID, err)
        }
        if err := t.HandleNotification(ctx, dep.Collection, data); err != nil {
            return fmt.Errorf("failed to index dependency %s/%s: %w", dep.Collection, dep.DocumentID, err)
        }
    }
    
    // Step 2: Index primary entity after dependencies exist
    if batch.Primary != nil {
        data, err := toMap(batch.Primary.Data)
        if err != nil {
            return fmt.Errorf("failed to convert primary %s/%s to map: %w", batch.Primary.Collection, batch.Primary.DocumentID, err)
        }
        if err := t.HandleNotification(ctx, batch.Primary.Collection, data); err != nil {
            return fmt.Errorf("failed to index primary %s/%s: %w", batch.Primary.Collection, batch.Primary.DocumentID, err)
        }
    }
    
    return nil
}
Example: When indexing a transaction, balances are indexed first:
Transaction T1: A → B for $100
1. Index balance A (source)
2. Index balance B (destination)  
3. Index transaction T1
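The ordering above can be sketched with simplified stand-ins for the batch types in internal/search (the `Doc` and `IndexBatch` field names here are assumptions for illustration, not the exact source definitions):

```go
package main

import "fmt"

// Doc is a simplified stand-in for a document queued for indexing.
type Doc struct {
	Collection string
	DocumentID string
}

// IndexBatch mirrors the dependency-first shape used by
// HandleBatchNotification: dependencies, then one primary entity.
type IndexBatch struct {
	Dependencies []Doc
	Primary      *Doc
}

// indexOrder returns the documents in the order they would be indexed:
// all dependencies first, the primary entity last.
func indexOrder(b *IndexBatch) []string {
	var order []string
	for _, d := range b.Dependencies {
		order = append(order, d.Collection+"/"+d.DocumentID)
	}
	if b.Primary != nil {
		order = append(order, b.Primary.Collection+"/"+b.Primary.DocumentID)
	}
	return order
}

func main() {
	batch := &IndexBatch{
		Dependencies: []Doc{
			{Collection: "balances", DocumentID: "A"},
			{Collection: "balances", DocumentID: "B"},
		},
		Primary: &Doc{Collection: "transactions", DocumentID: "T1"},
	}
	fmt.Println(indexOrder(batch))
}
```

Because balances A and B exist in the index before transaction T1 is written, the transaction's source and destination references resolve immediately.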

Search API

Search across collections using the Blnk API:
POST /search/transactions
Content-Type: application/json

{
  "q": "payment",
  "query_by": "reference,description",
  "filter_by": "status:APPLIED && currency:USD",
  "sort_by": "created_at:desc",
  "per_page": 20
}
Response:
{
  "found": 42,
  "hits": [
    {
      "document": {
        "transaction_id": "txn_abc123",
        "reference": "payment-invoice-001",
        "amount": 100.50,
        "currency": "USD",
        "status": "APPLIED",
        "created_at": 1699564800
      },
      "highlight": {
        "reference": "<mark>payment</mark>-invoice-001"
      }
    }
  ]
}
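The same request can be issued from Go. This is a minimal sketch: the `SearchRequest` struct only mirrors the fields shown in the example above, and the base URL/port are assumptions taken from the other examples on this page:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// SearchRequest mirrors the body accepted by POST /search/{collection},
// limited to the fields used in the example above.
type SearchRequest struct {
	Q        string `json:"q"`
	QueryBy  string `json:"query_by"`
	FilterBy string `json:"filter_by,omitempty"`
	SortBy   string `json:"sort_by,omitempty"`
	PerPage  int    `json:"per_page,omitempty"`
}

// buildSearchBody serializes the request into the JSON payload.
func buildSearchBody(r SearchRequest) ([]byte, error) {
	return json.Marshal(r)
}

func main() {
	body, err := buildSearchBody(SearchRequest{
		Q:        "payment",
		QueryBy:  "reference,description",
		FilterBy: "status:APPLIED && currency:USD",
		SortBy:   "created_at:desc",
		PerPage:  20,
	})
	if err != nil {
		panic(err)
	}
	resp, err := http.Post("http://localhost:5001/search/transactions",
		"application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```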

Advanced Filtering

Date Range Queries:
{
  "q": "*",
  "filter_by": "created_at:>1699564800 && created_at:<1699651200"
}
Amount Range:
{
  "filter_by": "amount:>=100 && amount:<=1000"
}
Multiple Conditions:
{
  "filter_by": "status:APPLIED && (currency:USD || currency:EUR) && amount:>100"
}
Metadata Search:
{
  "filter_by": "meta_data.customer_id:=12345"
}
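Filter strings like the ones above are easy to get wrong when composed dynamically. A small helper (a sketch, not part of the Blnk API) can join clauses with `&&` while keeping `||` groups parenthesized so operator precedence stays explicit:

```go
package main

import (
	"fmt"
	"strings"
)

// andFilters joins individual Typesense filter clauses with "&&",
// wrapping any clause that itself contains "||" in parentheses.
func andFilters(clauses ...string) string {
	out := make([]string, 0, len(clauses))
	for _, c := range clauses {
		if strings.Contains(c, "||") {
			c = "(" + c + ")"
		}
		out = append(out, c)
	}
	return strings.Join(out, " && ")
}

func main() {
	f := andFilters("status:APPLIED", "currency:USD || currency:EUR", "amount:>100")
	fmt.Println(f)
	// → status:APPLIED && (currency:USD || currency:EUR) && amount:>100
}
```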
Multi-Search

Search across multiple collections simultaneously:
// Source: search.go:225-227
func (t *TypesenseClient) MultiSearch(ctx context.Context, searchRequests api.MultiSearchSearchesParameter) (*api.MultiSearchResult, error) {
    return t.Client.MultiSearch.Perform(ctx, &api.MultiSearchParams{}, searchRequests)
}
Example:
POST /multi_search
Content-Type: application/json

{
  "searches": [
    {
      "collection": "transactions",
      "q": "john",
      "query_by": "reference"
    },
    {
      "collection": "identities",
      "q": "john",
      "query_by": "first_name,last_name,email_address"
    }
  ]
}

Reindexing

Reindex all data from PostgreSQL to Typesense.

Triggering a Reindex

Via API:
POST /admin/reindex
Content-Type: application/json

{
  "batch_size": 1000
}
Response:
{
  "message": "Reindex operation started",
  "progress": {
    "status": "in_progress",
    "phase": "indexing_ledgers",
    "total_records": 0,
    "processed_records": 0,
    "started_at": "2024-01-15T10:30:00Z"
  }
}

Monitoring Progress

GET /admin/reindex/progress
Response:
{
  "status": "in_progress",
  "phase": "indexing_transactions",
  "total_records": 150000,
  "processed_records": 75000,
  "started_at": "2024-01-15T10:30:00Z",
  "errors": []
}
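A simple poller can track a long-running reindex from Go. This is a sketch assuming the endpoint and payload shown above; note that `total_records` is 0 until counting starts, so the percentage calculation guards against division by zero:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// ReindexProgress mirrors the progress payload shown above.
type ReindexProgress struct {
	Status           string `json:"status"`
	Phase            string `json:"phase"`
	TotalRecords     int64  `json:"total_records"`
	ProcessedRecords int64  `json:"processed_records"`
}

// percentDone converts the counters into a whole-number percentage,
// returning 0 while total_records has not been populated yet.
func percentDone(p ReindexProgress) int {
	if p.TotalRecords == 0 {
		return 0
	}
	return int(p.ProcessedRecords * 100 / p.TotalRecords)
}

func main() {
	for {
		resp, err := http.Get("http://localhost:5001/admin/reindex/progress")
		if err != nil {
			fmt.Println("request failed:", err)
			return
		}
		var p ReindexProgress
		err = json.NewDecoder(resp.Body).Decode(&p)
		resp.Body.Close()
		if err != nil {
			fmt.Println("decode failed:", err)
			return
		}
		fmt.Printf("%s (%s): %d%%\n", p.Status, p.Phase, percentDone(p))
		if p.Status != "in_progress" {
			return
		}
		time.Sleep(5 * time.Second)
	}
}
```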

Reindex Process

The reindex operation follows this sequence (reindex.go:90-142):
  1. Drop Collections: Remove existing collections
  2. Create Collections: Recreate with latest schemas
  3. Index Ledgers: Index all ledger accounts
  4. Index Identities: Index customer identities
  5. Index Balances: Index all balance records
  6. Index Transactions: Index all transactions
// Source: reindex.go:90-142
func (r *ReindexService) StartReindex(ctx context.Context) (*ReindexProgress, error) {
    r.mu.Lock()
    r.progress = &ReindexProgress{
        Status:    "in_progress",
        Phase:     "starting",
        StartedAt: time.Now(),
    }
    r.mu.Unlock()
    
    if err := r.dropCollections(ctx); err != nil {
        return r.failWithError(err, "drop_collections")
    }
    
    if err := r.createCollections(ctx); err != nil {
        return r.failWithError(err, "create_collections")
    }
    
    if err := r.indexLedgers(ctx); err != nil {
        return r.failWithError(err, "indexing_ledgers")
    }
    
    if err := r.indexIdentities(ctx); err != nil {
        return r.failWithError(err, "indexing_identities")
    }
    
    if err := r.indexBalances(ctx); err != nil {
        return r.failWithError(err, "indexing_balances")
    }
    
    if err := r.indexTransactions(ctx); err != nil {
        return r.failWithError(err, "indexing_transactions")
    }
    
    r.mu.Lock()
    now := time.Now()
    r.progress.Status = "completed"
    r.progress.Phase = "done"
    r.progress.CompletedAt = &now
    r.mu.Unlock()
    
    return r.GetProgressPtr(), nil
}
Batch Processing: Data is indexed in configurable batches (default: 1000 records):
// Source: reindex.go:189-236
func (r *ReindexService) indexLedgers(ctx context.Context) error {
    var offset int
    var totalIndexed int64
    
    for {
        ledgers, err := r.datasource.GetAllLedgers(r.config.BatchSize, offset)
        if err != nil {
            return err
        }
        
        if len(ledgers) == 0 {
            break
        }
        
        batchCount := len(ledgers)
        for _, ledger := range ledgers {
            data, err := toMap(ledger)
            if err != nil {
                r.addError("ledger " + ledger.LedgerID + ": " + err.Error())
                continue
            }
            
            if err := r.client.HandleNotification(ctx, CollectionLedgers, data); err != nil {
                r.addError("ledger " + ledger.LedgerID + ": " + err.Error())
                continue
            }
            totalIndexed++
        }
        
        r.updateProgress("indexing_ledgers", totalIndexed, totalIndexed)
        offset += batchCount
    }
    
    return nil
}

Automatic Reindex on Startup

On startup, Blnk automatically triggers a one-time reindex when both of the following hold:
  • Database has data
  • Typesense is empty
// Source: reindex.go:485-506
func TryReindexIfNeeded(ctx context.Context, client *TypesenseClient, ds database.IDataSource) {
    if client == nil || ds == nil {
        return
    }
    if !shouldReindex(ctx, client, ds) {
        return
    }
    
    logrus.Info("Database has data but Typesense is empty, triggering one-time reindex")
    go func() {
        reindexCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
        defer cancel()
        
        svc := NewReindexService(client, ds, ReindexConfig{BatchSize: 1000})
        _, err := svc.StartReindex(reindexCtx)
        if err != nil {
            logrus.WithError(err).Error("reindex failed")
            return
        }
        logrus.Info("reindex completed successfully")
    }()
}

Schema Migration

Typesense schemas can be updated without reindexing:
// Source: search.go:440-474
func (t *TypesenseClient) MigrateTypeSenseSchema(ctx context.Context, collectionName string) error {
    collection := t.Client.Collection(collectionName)
    
    currentSchemaResponse, err := collection.Retrieve(ctx)
    if err != nil {
        return fmt.Errorf("failed to retrieve current schema: %w", err)
    }
    
    config, ok := collectionConfigs[collectionName]
    if !ok {
        return fmt.Errorf("unknown collection: %s", collectionName)
    }
    latestSchema := config.Schema
    
    newFields, _ := compareSchemas(currentSchemaResponse, latestSchema)
    
    for _, field := range newFields {
        updateSchema := &api.CollectionUpdateSchema{
            Fields: []api.Field{field},
        }
        
        _, err := collection.Update(ctx, updateSchema)
        if err != nil {
            return fmt.Errorf("failed to add field %s: %w", field.Name, err)
        }
        logrus.Infof("Added new field %s to collection %s", field.Name, collectionName)
    }
    
    return nil
}

Performance Tuning

Batch Size Configuration

Optimal batch sizes for reindexing:
  • Small datasets (< 100K records): 1000
  • Medium datasets (100K - 1M records): 5000
  • Large datasets (> 1M records): 10000
POST /admin/reindex
{
  "batch_size": 5000
}
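The sizing guidance above can be encoded as a small helper when triggering reindexes programmatically (a sketch; the thresholds come directly from the table above):

```go
package main

import "fmt"

// chooseBatchSize applies the sizing guidance above: 1000 below 100K
// records, 5000 up to 1M, and 10000 beyond that.
func chooseBatchSize(totalRecords int64) int {
	switch {
	case totalRecords < 100_000:
		return 1000
	case totalRecords <= 1_000_000:
		return 5000
	default:
		return 10000
	}
}

func main() {
	for _, n := range []int64{50_000, 500_000, 5_000_000} {
		fmt.Printf("%d records -> batch_size %d\n", n, chooseBatchSize(n))
	}
}
```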

Typesense Memory Settings

Configure Typesense memory based on index size:
# For production workloads
typesense-server \
  --data-dir /data \
  --api-key=your-api-key \
  --enable-cors \
  --thread-pool-size=8 \
  --log-level=INFO
Memory Requirements:
  • 1M documents ≈ 500MB RAM
  • 10M documents ≈ 5GB RAM
  • 100M documents ≈ 50GB RAM
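The figures above imply roughly 500 bytes of RAM per document, which extrapolates linearly. A quick estimator (a rule-of-thumb sketch, not an exact sizing tool):

```go
package main

import "fmt"

// estimateRAMMB extrapolates the rule of thumb above:
// ~500 MB per million documents, i.e. roughly 500 bytes per document.
func estimateRAMMB(docs int64) int64 {
	return docs * 500 / 1_000_000
}

func main() {
	fmt.Println(estimateRAMMB(10_000_000), "MB") // ≈ 5000 MB for 10M docs
}
```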

Query Performance

Use Field Weighting:
{
  "q": "john",
  "query_by": "reference,description",
  "query_by_weights": "2,1"  // Prioritize reference
}
Limit Result Size:
{
  "per_page": 20,  // Fewer results = faster
  "page": 1
}
Use Caching: Typesense caches queries automatically. For frequently-used filters, the cache provides sub-millisecond responses.

Best Practices

  1. Enable Automatic Indexing: Ensure queue system is running for real-time index updates
  2. Schedule Reindexing: Run full reindex during low-traffic periods
    # Weekly reindex via cron
    0 2 * * 0 curl -X POST http://localhost:5001/admin/reindex
    
  3. Monitor Index Health: Check collection stats regularly
    curl http://localhost:8108/collections/transactions
    
  4. Use Precise Filters: Narrow searches with filters before full-text search
    {
      "filter_by": "status:APPLIED && created_at:>1699564800",
      "q": "*"
    }
    
  5. Backup Typesense Data: Regular snapshots of /data directory

Troubleshooting

Search Returns No Results

Check Collection Exists:
curl http://localhost:8108/collections/transactions
Verify Documents Indexed:
curl "http://localhost:8108/collections/transactions/documents/search?q=*&per_page=1"
Trigger Reindex:
curl -X POST http://localhost:5001/admin/reindex
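The checks above can also be automated. This sketch parses the `found` counter from a Typesense search response to decide whether a reindex is warranted (the sample JSON below is illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// foundCount extracts the "found" counter from a Typesense search
// response, a quick way to confirm documents are actually indexed.
func foundCount(body []byte) (int64, error) {
	var resp struct {
		Found int64 `json:"found"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		return 0, err
	}
	return resp.Found, nil
}

func main() {
	sample := []byte(`{"found": 42, "hits": []}`)
	n, err := foundCount(sample)
	if err != nil {
		panic(err)
	}
	if n == 0 {
		fmt.Println("collection is empty - consider triggering a reindex")
		return
	}
	fmt.Println("documents found:", n)
}
```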

Slow Search Queries

Enable Query Logging:
typesense-server --log-level=DEBUG
Check Query Complexity:
  • Avoid wildcard prefix searches (*john)
  • Use filters to narrow result set
  • Reduce per_page size

Reindex Failures

Check Logs:
GET /admin/reindex/progress
The errors array lists the documents that failed to index:
{
  "errors": [
    "transaction txn_123: failed to convert field",
    "balance bal_456: invalid timestamp"
  ]
}
Common Issues:
  • Invalid field types (e.g., string in int64 field)
  • Missing required fields
  • Typesense connection timeout
