
System Overview

Open Wearables is a self-hosted health data aggregation platform built with a modern microservices architecture. Each deployment serves a single organization with full control over data and infrastructure.

Tech Stack

Backend

FastAPI - High-performance Python web framework
  • Python 3.13+
  • SQLAlchemy 2.0 ORM
  • Pydantic for validation
  • Alembic for migrations

Frontend

React + TypeScript - Modern developer portal
  • React 19
  • TanStack Router
  • TanStack Query
  • Tailwind CSS 4.0
  • shadcn/ui components

Database

PostgreSQL 18 - Relational data storage
  • User profiles
  • Health metrics
  • Provider connections
  • OAuth tokens

Task Queue

Celery + Redis - Background processing
  • Data synchronization
  • Scheduled tasks
  • Webhook processing
  • Provider API calls

Core Components

Backend API (FastAPI)

The backend serves as the central hub for all platform operations:
# app/main.py - Application entry point
from fastapi import FastAPI
from app.api import head_router
from app.config import settings

api = FastAPI(title=settings.api_name)
api.include_router(head_router)
Key responsibilities:
  • RESTful API endpoints at /api/v1/*
  • OAuth flow orchestration for wearable providers
  • User authentication and authorization
  • Data normalization and validation
  • Real-time health data queries
Architectural layers:
1. Routes Layer

HTTP endpoints defined in app/api/routes/v1/
@router.get("/users/{user_id}")
async def get_user(user_id: UUID, db: DbSession) -> UserRead:
    return user_service.get(db, user_id, raise_404=True)
2. Services Layer

Business logic in app/services/
class UserService(AppService):
    def create(self, db: DbSession, payload: UserCreate) -> User:
        internal = UserCreateInternal(**payload.model_dump())
        return super().create(db, internal)
3. Repositories Layer

Database operations in app/repositories/
class UserRepository(CrudRepository):
    def get_by_email(self, db: DbSession, email: str) -> User | None:
        return db.query(self.model).filter(
            self.model.email == email
        ).one_or_none()
4. Models Layer

SQLAlchemy models in app/models/
class User(BaseDbModel):
    id: Mapped[PrimaryKey[UUID]]
    email: Mapped[Unique[email]]
    created_at: Mapped[datetime_tz]

Frontend Portal (React)

Web-based dashboard for managing your integration.

File-based routing structure:
src/routes/
├── __root.tsx              # Root layout with providers
├── _authenticated.tsx      # Protected route guard
├── _authenticated/
│   ├── dashboard.tsx       # Main dashboard
│   ├── users.tsx           # User management
│   └── settings.tsx        # Configuration
└── login.tsx               # Public login page
State management pattern:
// src/hooks/api/use-users.ts
import { useQuery } from '@tanstack/react-query';
import { queryKeys } from '@/lib/query/keys';
import { usersService } from '@/lib/api';

export function useUsers() {
  return useQuery({
    queryKey: queryKeys.users.list(),
    queryFn: () => usersService.getAll(),
  });
}

PostgreSQL Database

Relational database storing all platform data with a normalized schema.

Core tables:
| Table | Purpose |
| --- | --- |
| developer | Admin accounts for the developer portal |
| application | OAuth applications (future multi-app support) |
| api_key | API credentials for accessing the platform |
| user | End users who connect their wearables |
| user_connection | Provider OAuth tokens and connection status |
| provider_settings | Provider-specific configuration |
| event_record | Normalized health events (workouts, sleep) |
| data_point_series | Time series data (heart rate, steps) |
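For illustration only, the shape of a normalized event_record row can be sketched with a plain dataclass. The field names below are assumptions for the sketch, not the actual column names:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4

@dataclass
class EventRecord:
    """Hypothetical sketch of a normalized health event row."""
    user_id: UUID
    event_type: str              # e.g. "workout" or "sleep"
    start_time: datetime
    duration_seconds: int
    source_provider: str         # e.g. "garmin"
    id: UUID = field(default_factory=uuid4)

# A workout synced from a provider would normalize to something like:
record = EventRecord(
    user_id=uuid4(),
    event_type="workout",
    start_time=datetime(2025, 1, 15, 7, 30, tzinfo=timezone.utc),
    duration_seconds=1800,
    source_provider="garmin",
)
```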
Migrations managed with Alembic:
# Create a new migration
make create_migration m="Add heart rate table"

# Apply migrations
make migrate

# Rollback last migration
make downgrade

Celery Task Queue

Background job processor handling asynchronous operations. The main task types are data synchronization, webhook processing, scheduled jobs, and provider API calls.

Periodic sync automatically pulls data from connected providers on a schedule:
@celery_app.task
def periodic_sync_task():
    """Runs every hour (configurable via SYNC_INTERVAL_SECONDS)"""
    for connection in active_connections:
        sync_vendor_data_task.delay(connection.id)
Configured interval in .env:
SYNC_INTERVAL_SECONDS=3600  # 1 hour
Worker configuration:
# docker-compose.yml
celery-worker:
  command: scripts/start/worker.sh
  environment:
    - DB_HOST=db
    - REDIS_HOST=redis

celery-beat:
  command: scripts/start/beat.sh
  # Scheduler for periodic tasks

Redis Cache

In-memory data store serving multiple purposes:
  • Message broker for Celery task queue
  • Result backend for task status and return values
  • Session cache for OAuth flows (state parameters)
  • Rate limiting for API endpoints (future)

Data Flow

OAuth Connection Flow

1. Initiate Connection

User clicks “Connect Garmin” in the developer portal or your app calls the SDK:
POST /api/v1/oauth/garmin/authorize
Backend generates OAuth state and returns authorization URL.
2. User Authorization

User is redirected to Garmin’s OAuth page, logs in, and grants permissions.
3. Callback Handling

Garmin redirects back to:
GET /api/v1/oauth/garmin/callback?code=xxx&state=yyy
Backend exchanges code for access/refresh tokens and stores in user_connection table.
4. Initial Sync

Celery task is triggered to backfill historical data:
sync_vendor_data_task.delay(connection_id)
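Steps 3 and 4 above can be sketched end to end. The token exchange and storage layer are stubbed out, and the function names are illustrative, not the project's actual implementation:

```python
def exchange_code_for_tokens(provider: str, code: str) -> dict:
    """Stub for the provider token-endpoint call (step 3)."""
    return {"access_token": "acc-token", "refresh_token": "ref-token"}

def handle_oauth_callback(provider: str, code: str, state: str,
                          pending_states: dict, connections: list) -> dict:
    """Validate state, exchange the code, and persist the connection."""
    user_id = pending_states.pop(state, None)
    if user_id is None:
        raise ValueError("Invalid or expired OAuth state")
    tokens = exchange_code_for_tokens(provider, code)
    connection = {
        "user_id": user_id,
        "provider": provider,
        "access_token": tokens["access_token"],
        "refresh_token": tokens["refresh_token"],
        "status": "connected",
    }
    connections.append(connection)  # stands in for the user_connection table
    # Step 4 would then enqueue the backfill:
    # sync_vendor_data_task.delay(connection_id)
    return connection
```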

Data Synchronization Flow

1. Scheduled Trigger

Celery Beat triggers periodic sync every hour (configurable):
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        settings.sync_interval_seconds,
        periodic_sync_task.s(),
    )
2. Strategy Selection

Provider-specific strategy handles API calls:
strategy = ProviderFactory.get_strategy('garmin')
activities = strategy.fetch_activities(connection, since=last_sync)
3. Data Normalization

Raw provider data is transformed to unified schema:
normalized = {
    'type': 'workout',
    'start_time': parse_datetime(raw['startTime']),
    'duration_seconds': raw['duration'],
    'calories': raw['activeKilocalories'],
}
4. Database Storage

Normalized data is stored in event_record and data_point_series tables.
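The normalization in step 3 can be written as a small self-contained function. The raw field names mirror the snippet above; the use of `datetime.fromisoformat` as the parser is an assumption for this sketch:

```python
from datetime import datetime

def normalize_activity(raw: dict) -> dict:
    """Transform a raw provider payload into the unified event schema."""
    return {
        "type": "workout",
        "start_time": datetime.fromisoformat(raw["startTime"]),
        "duration_seconds": raw["duration"],
        "calories": raw["activeKilocalories"],
    }

raw = {
    "startTime": "2025-01-15T07:30:00+00:00",
    "duration": 1800,
    "activeKilocalories": 320,
}
event = normalize_activity(raw)
```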

API Query Flow

Provider Strategy Pattern

Extensible architecture for adding new wearable integrations:
# app/services/providers/base_strategy.py
class BaseProviderStrategy(ABC):
    @abstractmethod
    def fetch_activities(self, connection: UserConnection) -> list[dict]:
        """Fetch workout data from provider API"""
        pass
    
    @abstractmethod
    def normalize_activity(self, raw: dict) -> EventRecord:
        """Transform provider data to unified schema"""
        pass
Implemented providers:
# app/services/providers/garmin/strategy.py
class GarminStrategy(BaseProviderStrategy):
    api_base_url = "https://apis.garmin.com"
    
    def fetch_activities(self, connection, since=None):
        response = self.client.get(
            f"{self.api_base_url}/wellness-api/rest/activities",
            headers={"Authorization": f"Bearer {connection.access_token}"}
        )
        return response.json()
See the Provider Development Guide to add support for new wearables.
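The `ProviderFactory.get_strategy('garmin')` call above implies a registry mapping provider slugs to strategy classes. One plausible sketch uses a registration decorator; this mechanism is an assumption, not the project's actual factory:

```python
class ProviderFactory:
    """Minimal registry mapping provider slugs to strategy classes."""
    _strategies: dict[str, type] = {}

    @classmethod
    def register(cls, slug: str):
        def decorator(strategy_cls: type) -> type:
            cls._strategies[slug] = strategy_cls
            return strategy_cls
        return decorator

    @classmethod
    def get_strategy(cls, slug: str):
        try:
            return cls._strategies[slug]()  # fresh instance per lookup
        except KeyError:
            raise ValueError(f"No strategy registered for provider '{slug}'")

@ProviderFactory.register("garmin")
class GarminStrategy:
    api_base_url = "https://apis.garmin.com"

strategy = ProviderFactory.get_strategy("garmin")
```

With this shape, adding a new wearable is just a new strategy module plus one decorator line; no factory code changes.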

Security Architecture

Authentication

JWT-based authentication for admin users:
@router.post("/token")
def login(credentials: OAuth2PasswordRequestForm):
    developer = verify_credentials(credentials)
    access_token = create_access_token(developer.id)
    return {"access_token": access_token, "token_type": "bearer"}
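`create_access_token` is not shown on this page; in practice a JWT library such as PyJWT would handle it. As a standard-library-only sketch of what an HS256 token encoder does (the claim names and TTL are assumptions):

```python
import base64
import hashlib
import hmac
import json
import time

SECRET_KEY = "change-me-in-production"  # comes from settings in the real app

def _b64(data: bytes) -> str:
    """URL-safe base64 without padding, as JWT requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def create_access_token(developer_id: str, ttl_seconds: int = 3600) -> str:
    """Encode {sub, exp} claims as an HS256-signed JWT."""
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64(json.dumps(
        {"sub": developer_id, "exp": int(time.time()) + ttl_seconds}
    ).encode())
    signing_input = f"{header}.{payload}".encode()
    signature = _b64(hmac.new(SECRET_KEY.encode(), signing_input,
                              hashlib.sha256).digest())
    return f"{header}.{payload}.{signature}"

token = create_access_token("dev-42")
```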

Data Isolation

Each API key is scoped to a single application:
# All queries automatically filter by API key scope
@router.get("/users")
def list_users(api_key: ApiKeyDep, db: DbSession):
    return db.query(User).filter(
        User.application_id == api_key.application_id
    ).all()

Monitoring & Observability

Celery Flower Dashboard

Real-time monitoring of background tasks:
# Access at http://localhost:5555
docker compose logs flower
Features:
  • Task execution history
  • Worker status and performance
  • Task success/failure rates
  • Real-time task monitoring

Application Logs

Structured logging to stdout:
# app/main.py
import logging

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s - %(name)s] (%(levelname)s) %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
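If JSON lines are preferred over the plain-text format above (e.g. for log aggregators), a small custom formatter can be dropped in. This is a sketch, not part of the shipped configuration:

```python
import json
import logging
import sys

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "time": self.formatTime(record),
            "name": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
# logging.basicConfig(level=logging.INFO, handlers=[handler])
```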
View logs:
docker compose logs -f app          # Backend
docker compose logs -f celery-worker # Workers
docker compose logs -f frontend     # Frontend

Sentry Integration

Error tracking and performance monitoring:
# backend/config/.env
SENTRY_ENABLED=True
SENTRY_DSN=your-sentry-dsn
SENTRY_ENV=production
SENTRY_SAMPLES_RATE=0.5

Deployment Considerations

Docker Compose (Production)

The docker-compose.yml is production-ready with health checks:
db:
  healthcheck:
    test: ["CMD-SHELL", "pg_isready -U open-wearables"]
    interval: 5s
    timeout: 5s
    retries: 5

app:
  depends_on:
    db:
      condition: service_healthy
  restart: on-failure

Environment Variables

Security checklist of critical settings to change for production:
  • Generate a strong SECRET_KEY (64+ characters)
  • Change ADMIN_PASSWORD from default
  • Set ENVIRONMENT=production
  • Update CORS_ORIGINS to your frontend domain
  • Enable and configure SENTRY_DSN
  • Set provider credentials (Garmin, Polar, etc.)
  • Configure email settings with real SMTP/Resend credentials

Scaling

Add more Celery workers for increased throughput:
docker compose up --scale celery-worker=3

Next Steps

API Reference

Explore all available endpoints and data models

Provider Setup

Configure OAuth for Garmin, Polar, Suunto, and more

Add New Provider

Extend the platform with custom integrations

Contributing

Join the open-source community
