Type registry pattern
All extensible components use a registry pattern. Global registries:
from syft_space.components.dataset_types.registry import DATASET_TYPE_REGISTRY
from syft_space.components.model_types.registry import MODEL_TYPE_REGISTRY
from syft_space.components.policy_types.registry import POLICY_TYPE_REGISTRY
# Register custom types
DATASET_TYPE_REGISTRY.register("my_custom_db", MyCustomDatasetType)
MODEL_TYPE_REGISTRY.register("my_custom_model", MyCustomModelType)
POLICY_TYPE_REGISTRY.register("my_custom_policy", MyCustomPolicyType)
Custom dataset types
Add support for new vector databases or data sources.

Implementation
Create a new dataset type class:

from syft_space.components.dataset_types.base import DatasetType
from pydantic import BaseModel, Field
class PineconeConfig(BaseModel):
    """Connection settings for a Pinecone vector database instance."""

    api_key: str = Field(..., description="Pinecone API key")
    environment: str = Field(..., description="Pinecone environment")
    index_name: str = Field(..., description="Index name")
class PineconeDatasetType(DatasetType):
    """Pinecone managed vector database integration.

    Implements similarity search, document upsert, and deletion against a
    Pinecone index described by ``PineconeConfig``. The ``pinecone`` package
    is imported lazily so the dependency is only required when this dataset
    type is actually used.
    """

    name = "pinecone"
    description = "Pinecone managed vector database"
    configuration_schema = PineconeConfig

    def _connect_index(self):
        """Initialize the Pinecone client and return the configured index.

        Shared by ``search``/``upsert``/``delete``, which previously each
        duplicated the init-and-lookup boilerplate.
        """
        import pinecone

        pinecone.init(
            api_key=self.config.api_key,
            environment=self.config.environment,
        )
        return pinecone.Index(self.config.index_name)

    async def search(
        self,
        query_vector: list[float],
        limit: int = 10,
        threshold: float = 0.7,
        **kwargs,
    ) -> list[dict]:
        """Search for similar vectors in Pinecone.

        Args:
            query_vector: Embedding to search with.
            limit: Maximum number of matches requested from Pinecone.
            threshold: Minimum similarity score; lower-scoring matches are
                dropped client-side after the query.

        Returns:
            A list of dicts with ``document_id``, ``content``,
            ``similarity_score`` and the full ``metadata`` of each match.
        """
        index = self._connect_index()
        results = index.query(
            vector=query_vector,
            top_k=limit,
            include_metadata=True,
        )
        # NOTE: assumes every match's metadata carries a "text" key —
        # upserts through this type must preserve that invariant.
        return [
            {
                "document_id": match["id"],
                "content": match["metadata"]["text"],
                "similarity_score": match["score"],
                "metadata": match["metadata"],
            }
            for match in results["matches"]
            if match["score"] >= threshold
        ]

    async def upsert(
        self,
        documents: list[dict],
        embeddings: list[list[float]],
    ) -> None:
        """Insert or update documents in Pinecone.

        Args:
            documents: Dicts each providing ``"id"`` and ``"metadata"`` keys.
            embeddings: Vectors paired positionally 1:1 with ``documents``.
        """
        index = self._connect_index()
        vectors = [
            (doc["id"], emb, doc["metadata"])
            for doc, emb in zip(documents, embeddings)
        ]
        # Single batch upsert call.
        index.upsert(vectors=vectors)

    async def delete(self, document_ids: list[str]) -> None:
        """Delete the given document ids from the Pinecone index."""
        index = self._connect_index()
        index.delete(ids=document_ids)
Registration
Register your custom type at startup:

# In backend/syft_space/components/dataset_types/__init__.py
from .pinecone_type import PineconeDatasetType
def register_builtin_types() -> None:
    """Register all built-in dataset types."""
    from .registry import DATASET_TYPE_REGISTRY

    # ... existing types ...

    # Expose the Pinecone integration under its public type name.
    DATASET_TYPE_REGISTRY.register("pinecone", PineconeDatasetType)
Usage
Users can now create datasets with your custom type:

curl -X POST http://localhost:8080/api/v1/datasets/ \
-H "Content-Type: application/json" \
-d '{
"name": "my-pinecone-db",
"dtype": "pinecone",
"configuration": {
"api_key": "...",
"environment": "us-east1-gcp",
"index_name": "my-index"
}
}'
Custom model types
Add support for new AI model providers.

Implementation
from syft_space.components.model_types.base import ModelType
from pydantic import BaseModel, Field
class CohereConfig(BaseModel):
    """Connection settings for the Cohere API."""

    api_key: str = Field(..., description="Cohere API key")
    model_name: str = Field(
        default="command",
        description="Model name (command, command-light, etc.)",
    )
class CohereModelType(ModelType):
    """Cohere AI model integration.

    Provides chat completions and embeddings via the Cohere SDK. The
    ``cohere`` package is imported lazily inside each method so it is only
    required when this model type is used.
    """

    name = "cohere"
    description = "Cohere language models"
    configuration_schema = CohereConfig

    async def chat(
        self,
        messages: list[dict],
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs,
    ) -> dict:
        """Generate a chat completion with Cohere.

        Args:
            messages: Conversation history; the last entry is the user turn
                to answer. Must be non-empty.
            temperature: Sampling temperature.
            max_tokens: Completion length cap.

        Returns:
            A provider-neutral response dict with ``id``, ``model``,
            ``message``, ``finish_reason`` and ``usage``.

        Raises:
            ValueError: If ``messages`` is empty (previously this surfaced
                as a bare ``IndexError`` from ``messages[-1]``).
        """
        if not messages:
            raise ValueError("messages must contain at least one entry")

        import cohere

        co = cohere.Client(self.config.api_key)

        # Split history from the current turn.
        # NOTE(review): chat_history is forwarded as-is; Cohere's API expects
        # entries shaped as {"role": ..., "message": ...} — confirm upstream
        # messages already use that shape.
        chat_history = messages[:-1]
        message = messages[-1]["content"]

        response = co.chat(
            message=message,
            chat_history=chat_history,
            model=self.config.model_name,
            temperature=temperature,
            max_tokens=max_tokens,
        )

        # Normalize the SDK response into the shared response schema.
        return {
            "id": response.generation_id,
            "model": self.config.model_name,
            "message": {
                "role": "assistant",
                "content": response.text,
                "tokens": response.meta.tokens.output_tokens,
            },
            "finish_reason": "COMPLETE",
            "usage": {
                "prompt_tokens": response.meta.tokens.input_tokens,
                "completion_tokens": response.meta.tokens.output_tokens,
                "total_tokens": response.meta.tokens.total_tokens,
            },
        }

    async def embed(
        self,
        texts: list[str],
        **kwargs,
    ) -> list[list[float]]:
        """Generate embeddings with Cohere.

        Args:
            texts: Strings to embed.
            **kwargs: ``model`` may be supplied to override the default
                embedding model (``embed-english-v3.0``).

        Returns:
            One embedding vector per input text.
        """
        import cohere

        co = cohere.Client(self.config.api_key)
        response = co.embed(
            texts=texts,
            # Previously hard-coded; now overridable while keeping the
            # original default for backward compatibility.
            model=kwargs.get("model", "embed-english-v3.0"),
        )
        return response.embeddings
Registration
# In backend/syft_space/components/model_types/__init__.py
from .cohere_type import CohereModelType
def register_builtin_types() -> None:
    """Register all built-in model types."""
    from .registry import MODEL_TYPE_REGISTRY

    # Expose the Cohere integration under its public type name.
    MODEL_TYPE_REGISTRY.register("cohere", CohereModelType)
Custom policy types
Add new access control or enforcement policies.

Implementation
from syft_space.components.policy_types.base import PolicyType
from pydantic import BaseModel, Field
class QuotaConfig(BaseModel):
    """Settings for the monthly token quota policy."""

    max_tokens_per_month: int = Field(
        ...,
        description="Maximum tokens per user per month",
    )
    # Capped at 28 so the reset day exists in every month.
    reset_day: int = Field(
        default=1,
        ge=1,
        le=28,
        description="Day of month to reset quota",
    )
class QuotaPolicyType(PolicyType):
    """Enforce monthly token quotas per user.

    A "pre" hook: runs before query execution and rejects the request when
    the user's logged token usage for the current month has reached the
    configured cap.
    """

    name = "quota"
    description = "Monthly token quota enforcement"
    configuration_schema = QuotaConfig
    hook_type = "pre"  # Run before query execution

    async def enforce(self, context: PolicyContext) -> None:
        """Raise ``PolicyViolationError`` if the user has exhausted this month's quota."""
        from datetime import datetime

        current_month = datetime.now().strftime("%Y-%m")
        used = await self._get_monthly_usage(
            user_email=context.user_email,
            month=current_month,
        )
        if used >= self.config.max_tokens_per_month:
            raise PolicyViolationError(
                f"Monthly quota exceeded. Used {used} of {self.config.max_tokens_per_month} tokens."
            )

    async def _get_monthly_usage(self, user_email: str, month: str) -> int:
        """Sum the user's logged token usage for ``month`` (``YYYY-MM``).

        Returns 0 when the user has no query-log rows for that month.
        """
        usage_query = select(func.sum(QueryLog.total_tokens)).where(
            QueryLog.user_email == user_email,
            func.strftime("%Y-%m", QueryLog.created_at) == month,
        )
        result = await self.db.execute(usage_query)
        return result.scalar() or 0
Registration
# In backend/syft_space/components/policy_types/__init__.py
from .quota_type import QuotaPolicyType
def register_builtin_types() -> None:
    """Register all built-in policy types."""
    from .registry import POLICY_TYPE_REGISTRY

    # Expose the quota policy under its public type name.
    POLICY_TYPE_REGISTRY.register("quota", QuotaPolicyType)
Testing custom types
Unit tests
import pytest
from syft_space.components.dataset_types.pinecone_type import (
PineconeDatasetType,
PineconeConfig
)
@pytest.mark.asyncio
async def test_pinecone_search():
    """Test Pinecone search result formatting against a mocked index.

    Fixes: ``patch`` was used without ever being imported, and only
    ``pinecone.Index`` was mocked — ``pinecone.init`` would still run with
    the fake credentials, so it is now mocked as well.
    """
    from unittest.mock import patch

    config = PineconeConfig(
        api_key="test-key",
        environment="test-env",
        index_name="test-index",
    )
    dataset = PineconeDatasetType(config=config)

    # Mock the Pinecone client so no network calls are made.
    with patch("pinecone.init"), patch("pinecone.Index") as mock_index:
        mock_index.return_value.query.return_value = {
            "matches": [
                {
                    "id": "doc1",
                    "score": 0.95,
                    "metadata": {"text": "Test document"},
                }
            ]
        }
        results = await dataset.search(
            query_vector=[0.1, 0.2, 0.3],
            limit=10,
        )

    assert len(results) == 1
    assert results[0]["similarity_score"] == 0.95
Integration tests
@pytest.mark.integration
async def test_custom_dataset_end_to_end():
    """Test creating and using a custom dataset type through the HTTP API.

    Fixes: ``os`` was used via ``os.getenv`` without being imported.
    """
    import os

    # NOTE(review): `client` is assumed to be an async HTTP test client
    # supplied by a fixture or module-level import not shown here — confirm.
    response = await client.post(
        "/api/v1/datasets/",
        json={
            "name": "test-pinecone",
            "dtype": "pinecone",
            "configuration": {
                "api_key": os.getenv("PINECONE_API_KEY"),
                "environment": "test",
                "index_name": "test",
            },
        },
    )
    assert response.status_code == 201

    # Use in endpoint
    # ... test query flow ...
Distribution
As package
Distribute as a separate package:

# setup.py
from setuptools import setup
# Package metadata so the custom type can ship as a standalone distribution.
setup(
    name="syft-space-pinecone",        # distribution name (PyPI / pip install)
    version="0.1.0",
    packages=["syft_space_pinecone"],  # importable package included in the dist
    install_requires=[
        "syft-space>=0.1.0",     # core framework providing the registries
        "pinecone-client>=2.0.0" # Pinecone SDK used by the dataset type
    ]
)
pip install syft-space-pinecone
# In main.py startup
# Register the packaged type with the global dataset registry at app startup,
# before any dataset with dtype "pinecone" is loaded or created.
from syft_space_pinecone import PineconeDatasetType
from syft_space.components.dataset_types.registry import DATASET_TYPE_REGISTRY
DATASET_TYPE_REGISTRY.register("pinecone", PineconeDatasetType)
Via configuration
Load from a config file:

# config.yaml
custom_types:
datasets:
- module: my_package.pinecone_type
class: PineconeDatasetType
name: pinecone
import importlib
import yaml
def load_custom_types():
    """Register every dataset type listed under ``custom_types.datasets`` in config.yaml."""
    with open("config.yaml") as f:
        settings = yaml.safe_load(f)

    # Missing sections simply mean there is nothing to register.
    for spec in settings.get("custom_types", {}).get("datasets", []):
        type_module = importlib.import_module(spec["module"])
        type_cls = getattr(type_module, spec["class"])
        DATASET_TYPE_REGISTRY.register(spec["name"], type_cls)
Best practices
Configuration validation
Configuration validation
Use Pydantic models for configuration:
- Type safety
- Automatic validation
- Clear error messages
- Documentation generation
Error handling
Error handling
Raise appropriate exceptions:
- ConfigurationError for invalid config
- ConnectionError for network issues
- AuthenticationError for credential problems
- PolicyViolationError for policy enforcement
Async operations
Async operations
Use async/await throughout:
- Database queries
- HTTP requests
- File operations
- Prevents blocking event loop
Resource cleanup
Resource cleanup
Implement cleanup methods:
- Close connections
- Release resources
- Cancel background tasks
- Use context managers
See existing type implementations in
backend/syft_space/components/ for more examples.