Skip to main content
Custom data connectors allow you to integrate Spice.ai with any data source by implementing the DataConnector and DataConnectorFactory traits. This guide covers the architecture, trait requirements, and implementation patterns.

Architecture Overview

Spice.ai uses a plugin-based architecture for data connectors built on three key components:
  1. DataConnectorFactory: Creates connector instances with configuration
  2. DataConnector: Provides data access through DataFusion TableProvider
  3. Registration: Automatic discovery via linkme-based plugin system
┌─────────────────────────────────────────┐
│  Spice Runtime                          │
│  ┌───────────────────────────────────┐ │
│  │  Data Connector Registry          │ │
│  │  (linkme distributed slice)       │ │
│  └───────────────────────────────────┘ │
│           ↓                             │
│  ┌───────────────────────────────────┐ │
│  │  DataConnectorFactory             │ │
│  │  - validates parameters           │ │
│  │  - creates connector instances    │ │
│  └───────────────────────────────────┘ │
│           ↓                             │
│  ┌───────────────────────────────────┐ │
│  │  DataConnector                    │ │
│  │  - read_provider()                │ │
│  │  - read_write_provider()          │ │
│  │  - changes_stream()               │ │
│  └───────────────────────────────────┘ │
│           ↓                             │
│  ┌───────────────────────────────────┐ │
│  │  TableProvider (DataFusion)       │ │
│  │  - scan()                         │ │
│  │  - insert_into()                  │ │
│  └───────────────────────────────────┘ │
└─────────────────────────────────────────┘

DataConnectorFactory Trait

Required Methods

use std::{any::Any, future::Future, pin::Pin, sync::Arc};
use runtime::dataconnector::{DataConnectorFactory, DataConnector, ConnectorParams};
use runtime::parameters::ParameterSpec;

/// Factory responsible for validating parameters and constructing
/// [`DataConnector`] instances. Factories are registered with the runtime
/// via the linkme-based registration described later in this guide.
pub trait DataConnectorFactory: Send + Sync {
    /// Returns a reference for downcasting
    fn as_any(&self) -> &dyn Any;

    /// Creates a new connector instance
    ///
    /// Returns a boxed future so the trait stays object-safe while still
    /// permitting async construction (e.g. opening connection pools).
    fn create(
        &self,
        params: ConnectorParams,
    ) -> Pin<Box<dyn Future<Output = NewDataConnectorResult> + Send>>;

    /// Parameter prefix (e.g., "pg" for postgres)
    fn prefix(&self) -> &'static str;

    /// List of accepted parameters
    fn parameters(&self) -> &'static [ParameterSpec];

    /// Whether unsupported_type_action is supported
    ///
    /// Defaults to `false`; override for connectors that can skip or warn on
    /// unsupported column types instead of failing.
    fn supports_unsupported_type_action(&self) -> bool {
        false
    }

    /// Reserved keywords that cannot be used as table names
    ///
    /// Defaults to no reserved keywords.
    fn reserved_keywords(&self) -> &'static [&'static str] {
        &[]
    }
}

Parameter Specification

use runtime::parameters::{ParameterSpec, ParameterType};

// Parameters accepted by the file connector.
// `ParameterSpec::connector` entries are exposed with the connector's prefix;
// `ParameterSpec::runtime` entries are unprefixed and consumed by the runtime.
const FILE_PARAMETERS: &[ParameterSpec] = &[
    ParameterSpec::connector("file_format")
        .description("File format (csv, parquet, json, etc.)")
        .required(),
    ParameterSpec::connector("delimiter")
        .description("CSV delimiter character")
        .default(Some(",")),
    ParameterSpec::runtime("client_timeout")
        .description("Client timeout in seconds")
        .default(Some("30")),
];
Parameter types:
  • ParameterType::Connector: Prefixed with the connector prefix (e.g., host becomes pg_host)
  • ParameterType::Runtime: No prefix, used by runtime systems

DataConnector Trait

Core Interface

use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use runtime::component::dataset::Dataset;
use runtime::dataconnector::{DataConnector, DataConnectorResult};

/// Provides data access to an external source through DataFusion
/// `TableProvider`s.
///
/// Only `as_any` and `read_provider` are required; every other method has a
/// conservative default (read-only, no CDC, no-op registration hook).
#[async_trait]
pub trait DataConnector: Debug + Send + Sync + 'static {
    /// Returns reference for downcasting
    fn as_any(&self) -> &dyn Any;

    /// Returns TableProvider for reading data (REQUIRED)
    async fn read_provider(
        &self,
        dataset: &Dataset,
    ) -> DataConnectorResult<Arc<dyn TableProvider>>;

    /// Returns TableProvider for reading and writing (Optional)
    ///
    /// Defaults to `None`, i.e. the connector is read-only.
    async fn read_write_provider(
        &self,
        dataset: &Dataset,
    ) -> Option<DataConnectorResult<Arc<dyn TableProvider>>> {
        None
    }

    /// Resolves the refresh mode
    ///
    /// Uses the requested mode when given, falling back to `RefreshMode::Full`.
    fn resolve_refresh_mode(&self, refresh_mode: Option<RefreshMode>) -> RefreshMode {
        refresh_mode.unwrap_or(RefreshMode::Full)
    }

    /// Whether CDC streaming is supported
    ///
    /// NOTE(review): presumably the runtime checks this before calling
    /// `changes_stream` — confirm against the runtime's refresh logic.
    fn supports_changes_stream(&self) -> bool {
        false
    }

    /// Returns CDC stream if supported
    fn changes_stream(
        &self,
        federated_table: Arc<FederatedTable>,
        dataset: &Dataset,
        accelerated_table_provider: Arc<dyn TableProvider>,
        accelerator_write_mutex: Arc<Mutex<()>>,
    ) -> Option<ChangesStream> {
        None
    }

    /// Hook called when accelerated table is registered
    ///
    /// Default is a no-op; override to attach auxiliary tasks such as file
    /// watchers (see the File Watching section).
    async fn on_accelerated_table_registration(
        &self,
        dataset: &Dataset,
        accelerated_table: &mut AcceleratedTable,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }

    /// Returns metrics provider
    fn metrics_provider(&self) -> Option<Arc<dyn MetricsProvider>> {
        None
    }

    /// Controls when connector initializes
    fn initialization(&self) -> ComponentInitialization {
        ComponentInitialization::default()
    }
}

Implementation Example: File Connector

Factory Implementation

use std::{any::Any, sync::Arc};
use runtime::dataconnector::{
    DataConnectorFactory, DataConnector, ConnectorParams,
    listing::LISTING_TABLE_PARAMETERS,
};
use runtime::parameters::Parameters;
use tokio::runtime::Handle;

#[derive(Default, Debug, Copy, Clone)]
pub struct FileFactory {}

impl FileFactory {
    pub fn new() -> Self {
        Self {}
    }

    pub fn new_arc() -> Arc<dyn DataConnectorFactory> {
        Arc::new(Self {}) as Arc<dyn DataConnectorFactory>
    }
}

impl DataConnectorFactory for FileFactory {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Builds a `File` connector from the supplied parameters.
    fn create(
        &self,
        params: ConnectorParams,
    ) -> Pin<Box<dyn Future<Output = NewDataConnectorResult> + Send>> {
        Box::pin(async move {
            let connector = File {
                params: params.parameters,
                tokio_io_runtime: params.io_runtime,
            };
            Ok(Arc::new(connector) as Arc<dyn DataConnector>)
        })
    }

    /// Connector parameters for this connector use the `file` prefix.
    fn prefix(&self) -> &'static str {
        "file"
    }

    /// Reuses the shared listing-table parameter set.
    fn parameters(&self) -> &'static [ParameterSpec] {
        LISTING_TABLE_PARAMETERS
    }
}

Connector Implementation

use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use runtime::component::dataset::Dataset;
use runtime::dataconnector::{
    DataConnector, DataConnectorResult, listing::ListingTableConnector,
};

/// File data connector; built by `FileFactory::create`.
#[derive(Debug)]
pub struct File {
    params: Parameters,       // resolved connector parameters (ConnectorParams::parameters)
    tokio_io_runtime: Handle, // handle to the runtime's I/O tokio runtime (ConnectorParams::io_runtime)
}

#[async_trait]
impl ListingTableConnector for File {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_params(&self) -> &Parameters {
        &self.params
    }

    fn get_tokio_io_runtime(&self) -> Handle {
        self.tokio_io_runtime.clone()
    }

    /// Builds a `file:` object-store URL, preferring the explicit `path`
    /// override and falling back to the dataset's `from` field.
    fn get_object_store_url(
        &self,
        dataset: &Dataset,
        path: Option<&str>,
    ) -> DataConnectorResult<Url> {
        let target = match path {
            Some(p) => p,
            None => dataset.from.as_str(),
        };
        Url::parse(&format!("file:{}", target))
            .map_err(|e| DataConnectorError::InvalidConfiguration {
                dataconnector: "file".to_string(),
                message: format!("Invalid file path: {}", e),
                connector_component: ConnectorComponent::from(dataset),
            })
    }
}

Registration with Linkme

The register_data_connector! macro automatically registers your connector:
use runtime::register_data_connector;

// Simple form (generates function and static names)
register_data_connector!("file", FileFactory);

// Explicit form (full control over names)
register_data_connector!(
    register_file_connector,
    FILE_CONNECTOR_REGISTRATION,
    "file",
    FileFactory
);

How Registration Works

  1. Macro Expansion: Creates constructor function and static registration
  2. Link-Time Collection: linkme collects all registrations into distributed slice
  3. Runtime Discovery: register_all() iterates slice and registers factories
// Expanded macro (simplified)
/// Constructor invoked by the runtime when registering the connector.
fn register_file_connector() -> Arc<dyn DataConnectorFactory> {
    FileFactory::new_arc()
}

// Each registration is appended to the distributed slice at link time;
// `register_all()` later iterates the slice to register every factory.
#[linkme::distributed_slice(DATA_CONNECTOR_REGISTRATIONS)]
pub static FILE_CONNECTOR_REGISTRATION: DataConnectorRegistration =
    DataConnectorRegistration::new("file", register_file_connector);

Error Handling

Use structured errors with SNAFU:
use snafu::prelude::*;
use runtime::dataconnector::{DataConnectorError, ConnectorComponent};

// Connection errors
return Err(DataConnectorError::UnableToConnectInternal {
    dataconnector: "mydb".to_string(),
    connector_component: ConnectorComponent::from(dataset),
    source: connection_err.into(),
});

// Configuration errors
return Err(DataConnectorError::InvalidConfiguration {
    dataconnector: "mydb".to_string(),
    connector_component: ConnectorComponent::from(dataset),
    message: "host parameter is required".to_string(),
    source: err.into(),
});

// Schema errors
return Err(DataConnectorError::UnableToGetSchema {
    dataconnector: "mydb".to_string(),
    connector_component: ConnectorComponent::from(dataset),
    table_name: dataset.name.to_string(),
});

Advanced Features

Change Data Capture (CDC)

Implement CDC streaming for real-time updates:
use data_components::cdc::ChangesStream;

impl DataConnector for MyConnector {
    /// Advertise CDC support so the runtime will request a changes stream.
    fn supports_changes_stream(&self) -> bool {
        true
    }

    /// Returns the stream of change events for this dataset.
    fn changes_stream(
        &self,
        federated_table: Arc<FederatedTable>,
        dataset: &Dataset,
        accelerated_table_provider: Arc<dyn TableProvider>,
        accelerator_write_mutex: Arc<Mutex<()>>,
    ) -> Option<ChangesStream> {
        Some(ChangesStream::new(/* ... */))
    }
}

File Watching

Trigger refreshes on file changes:
use notify::{RecommendedWatcher, RecursiveMode, Watcher};

/// Registration hook that watches the dataset's file and triggers an
/// acceleration refresh whenever it is modified.
///
/// Returns `Ok(())` without watching when the table has no refresh trigger.
/// Watcher setup errors propagate to the caller.
async fn on_accelerated_table_registration(
    &self,
    dataset: &Dataset,
    accelerated_table: &mut AcceleratedTable,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let path = get_path(dataset);
    let Some(refresh_trigger) = accelerated_table.refresh_trigger().cloned() else {
        return Ok(());
    };

    // Create and start the watcher synchronously so setup errors surface to
    // the caller via `?`. (Inside a spawned `()` task, `?` would not even
    // compile, and the errors would otherwise be lost.)
    let mut watcher = notify::recommended_watcher(
        move |res: Result<notify::Event, _>| {
            if let Ok(event) = res {
                if event.kind.is_modify() {
                    refresh_trigger.trigger();
                }
            }
        },
    )?;
    watcher.watch(&path, RecursiveMode::Recursive)?;

    // Move the watcher into a task that never completes: dropping a `notify`
    // watcher stops all notifications, so it must live as long as the
    // accelerated table. Registering the handle ties the task's lifetime to
    // the table.
    let watcher_task = tokio::spawn(async move {
        let _watcher = watcher;
        std::future::pending::<()>().await;
    });

    accelerated_table.add_join_handle(watcher_task);
    Ok(())
}

Connection Pooling

Always use connection pools for database connectors:
use deadpool::managed::Pool;

/// Example database connector holding a shared connection pool.
#[derive(Debug)]
pub struct MyConnector {
    pool: Pool<MyConnectionManager>, // connections are checked out lazily via get()
}

impl MyConnector {
    /// Builds the connector and its connection pool.
    pub async fn new(params: Parameters) -> Result<Self> {
        // Pool creation should never fail - errors only on get()
        let pool = Pool::builder(manager).build()?;
        Ok(Self { pool })
    }

    /// Checks a connection out of the pool, converting pool errors into the
    /// connector's error type.
    async fn get_connection(&self) -> Result<Connection> {
        self.pool.get().await.map_err(Into::into)
    }
}

Best Practices

DO:

  • ✅ Use connection pooling for all database connectors
  • ✅ Use SNAFU for error handling with context
  • ✅ Validate parameters in factory’s create() method
  • ✅ Use tracing:: macros for logging
  • ✅ Implement connection retries with exponential backoff
  • ✅ Use spawn_blocking for blocking I/O operations
  • ✅ Support zero-copy operations with Arrow arrays
  • ✅ Add comprehensive parameter documentation

DON’T:

  • ❌ Use .unwrap() or .expect() in production code
  • ❌ Use log:: macros (use tracing:: instead)
  • ❌ Block async runtime with synchronous operations
  • ❌ Create new connections on every request
  • ❌ Use assert!() for error handling
  • ❌ Copy Arrow data unnecessarily
  • ❌ Hold locks across .await points
  • ❌ Add newlines in log messages

Testing

Create unit tests alongside your connector code, and integration test spicepods in test/spicepods/:
#[cfg(test)]
mod tests {
    use super::*;
    use runtime::dataconnector::create_new_connector;

    /// Verifies the connector can be created through the registry and that
    /// its read provider exposes the expected schema.
    #[tokio::test]
    async fn test_connector_creation() {
        // NOTE(review): this snippet assumes a `dataset` is already in scope;
        // construct one before building the params.
        let params = ConnectorParams {
            parameters: Parameters::default(),
            component: ConnectorComponent::Dataset(dataset),
            // ...
        };

        // Outer expect: the connector name is registered;
        // inner expect: factory creation succeeded.
        let connector = create_new_connector("myconnector", params)
            .await
            .expect("should exist")
            .expect("should create");

        let provider = connector
            .read_provider(&dataset)
            .await
            .expect("should provide");

        let schema = provider.schema();
        assert_eq!(schema.fields().len(), 3);
    }
}

Feature Flags

Gate heavy dependencies behind feature flags. In Cargo.toml:
[dependencies]
mydb-client = { version = "1.0", optional = true }

[features]
mydb = ["dep:mydb-client"]
Code:
#[cfg(feature = "mydb")]
pub mod mydb;

#[cfg(feature = "mydb")]
register_data_connector!("mydb", MyDbFactory);

Next Steps

Build docs developers (and LLMs) love