Skip to main content

Overview

TrailBase includes a built-in job scheduler that executes background tasks on a cron-like schedule. Jobs can be defined in your configuration or dynamically registered via WASM components.

System Jobs

TrailBase includes several system jobs that run automatically:

Backup

Daily database backups (disabled by default)

Heartbeat

Health check every minute (for monitoring)

Log Cleaner

Hourly cleanup of old log entries

Auth Cleaner

Hourly removal of expired sessions

Query Optimizer

Daily SQLite query optimizer

File Deletions

Hourly cleanup of deleted files

System Job Configuration

Configure system jobs in your config.textproto:
# Overrides for the built-in system jobs. Each entry selects a job by `id`
# and sets its schedule and whether it is disabled.
jobs {
  system_jobs {
    id: 1  # Backup: daily database backups
    schedule: "@daily"
    disabled: false  # Backup is disabled by default; `false` turns it on
  }
  
  system_jobs {
    id: 2  # Heartbeat: health check for monitoring
    schedule: "*/5 * * * * * *"  # 7-field form (with year): every 5 seconds
    disabled: false
  }
  
  system_jobs {
    id: 3  # Log Cleaner: removes old log entries
    schedule: "@hourly"
    disabled: false
  }
  
  system_jobs {
    id: 4  # Auth Cleaner: removes expired sessions
    schedule: "@hourly"
    disabled: false
  }
  
  system_jobs {
    id: 5  # Query Optimizer: runs the SQLite query optimizer
    schedule: "@daily"
    disabled: false
  }
  
  system_jobs {
    id: 6  # File Deletions: cleans up deleted files
    schedule: "@hourly"
    disabled: false
  }
}

server {
  # Log retention used by the Log Cleaner job
  logs_retention_sec: 604800  # 7 days (7 * 86400 seconds)
}

auth {
  # Session TTL used by the Auth Cleaner job
  refresh_token_ttl_sec: 2592000  # 30 days (30 * 86400 seconds)
}

Cron Syntax

Jobs use a seconds-first cron syntax with six fields, plus an optional seventh field for the year:
sec  min  hour  day  month  weekday  year
*    *    *     *    *      *        *

Field Values

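| Field   | Range         | Special Values |
|---------|---------------|----------------|
| sec     | 0-59          | *, /           |
| min     | 0-59          | *, /           |
| hour    | 0-23          | *, /           |
| day     | 1-31          | *, /           |
| month   | 1-12          | *, /           |
| weekday | 0-6 (Sun-Sat) | *, /           |
| year    | 1970-3000     | *, /           |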

Named Schedules

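"@hourly"   // 0 0 * * * *  (second 0, minute 0 of every hour)
"@daily"    // 0 0 0 * * *  (midnight every day)
"@weekly"   // 0 0 0 * * 0  (midnight every Sunday)
"@monthly"  // 0 0 0 1 * *  (midnight on the 1st of every month)
"@yearly"   // 0 0 0 1 1 *  (midnight on January 1st)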

Examples

// Every minute
"37 * * * * *"  // At 37 seconds past every minute

// Every hour
"0 0 * * * *"  // At the start of every hour

// Every day at 2 AM
"0 0 2 * * *"

// Every Monday at 9 AM
"0 0 9 * * 1"

// Every 5 minutes
"0 */5 * * * *"

// Every 30 seconds
"*/30 * * * * * *"

// Weekdays at 8 AM
"0 0 8 * * 1-5"

// First day of every month
"0 0 0 1 * *"
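Interval-based schedules like “every 100 seconds” are not supported. Cron is time-based, not interval-based: a step such as */N restarts at every field boundary, so it only produces an even interval when N divides the field's range (for example, */30 in the seconds field). For other intervals, schedule the job at the smallest supported granularity (seconds) and implement your own timing logic inside the job if needed.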

Creating Jobs in WASM Components

use trailbase_wasm::job::Job;
use trailbase_wasm::http::HttpRoute;
use trailbase_wasm::db::{query, execute, Value};
use trailbase_wasm::{Guest, export};

/// Component type whose `Guest` implementation registers the example jobs.
struct MyJobs;

impl Guest for MyJobs {
    /// Returns every job this component wants scheduled.
    ///
    /// # Panics
    /// Panics if `Job::new` rejects one of the custom schedules below.
    fn job_handlers() -> Vec<Job> {
        // Fixed-cadence jobs built with the convenience constructors.
        let queue_job = Job::minutely("process_queue", process_queue);
        let cleanup_job = Job::hourly("cleanup_old_data", cleanup_old_data);
        let report_job = Job::daily("generate_report", generate_report);

        // Custom schedule: every 15 minutes.
        let sync_job = Job::new("sync_external_api", "0 */15 * * * *", sync_external_api).unwrap();

        // Custom schedule: every 6 hours.
        let backup_job =
            Job::new("backup_critical_data", "0 0 */6 * * *", backup_critical_data).unwrap();

        vec![queue_job, cleanup_job, report_job, sync_job, backup_job]
    }
}

/// Processes up to 100 `pending` rows from `queue`, marking each one
/// `completed` or `failed` depending on the outcome of `process_item`.
///
/// # Panics
/// Panics if a query or update fails, or if a row does not hold an integer
/// `id` and a text `data` column.
async fn process_queue() {
    println!("Processing queue at {}", chrono::Utc::now());

    let pending_rows = query(
        "SELECT id, data FROM queue WHERE status = 'pending' LIMIT 100",
        []
    ).await.unwrap();

    for entry in pending_rows {
        let item_id = entry[0].as_integer().unwrap();
        let payload = entry[1].as_text().unwrap();

        // A processing error is recorded on the row and the loop moves on.
        if let Err(failure) = process_item(payload).await {
            execute(
                "UPDATE queue SET status = 'failed', error = $1 WHERE id = $2",
                [Value::Text(failure.to_string()), Value::Integer(item_id)]
            ).await.unwrap();
        } else {
            execute(
                "UPDATE queue SET status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = $1",
                [Value::Integer(item_id)]
            ).await.unwrap();
        }
    }
}

/// Deletes `logs` rows older than 90 days and prints how many were removed.
///
/// # Panics
/// Panics if the `DELETE` statement fails.
async fn cleanup_old_data() {
    // Retention window: anything created more than 90 days ago is purged.
    const PURGE_SQL: &str = "DELETE FROM logs WHERE created_at < datetime('now', '-90 days')";

    println!("Cleaning up old data");

    let removed = execute(PURGE_SQL, []).await.unwrap();

    println!("Deleted {} old log entries", removed);
}

/// Aggregates the last day's `users` rows and stores the result as a
/// `daily` entry in `reports`.
///
/// # Panics
/// Panics if either the aggregate query or the insert fails.
async fn generate_report() {
    println!("Generating daily report");

    // Aggregate over users created within the last day.
    let daily_stats = query(
        r#"
        SELECT 
          COUNT(*) as total_users,
          COUNT(DISTINCT date(created_at)) as active_days
        FROM users
        WHERE created_at >= datetime('now', '-1 day')
        "#,
        []
    ).await.unwrap();

    // The rows are persisted in their `Debug` representation.
    let serialized = format!("{:?}", daily_stats);

    execute(
        "INSERT INTO reports (type, data, created_at) VALUES ('daily', $1, CURRENT_TIMESTAMP)",
        [Value::Text(serialized)]
    ).await.unwrap();
}

/// Fetches data from the external API and reports the payload size, or logs
/// the error to stderr when the request fails. Never panics on fetch errors.
async fn sync_external_api() {
    use trailbase_wasm::fetch::get;

    println!("Syncing with external API");

    let outcome = get("https://api.example.com/data").await;
    match outcome {
        Err(failure) => {
            eprintln!("Sync failed: {}", failure);
        }
        Ok(body) => {
            // Process and store data
            println!("Synced {} bytes", body.len());
        }
    }
}

/// Reads every `critical_data` row in `id` order and prints how many rows
/// were read. The actual export step is left as a placeholder.
///
/// # Panics
/// Panics if the query fails.
async fn backup_critical_data() {
    const SNAPSHOT_SQL: &str = "SELECT id, data FROM critical_data ORDER BY id";

    println!("Backing up critical data");

    let snapshot = query(SNAPSHOT_SQL, []).await.unwrap();

    // Export to backup location
    println!("Backed up {} records", snapshot.len());
}

export!(MyJobs);

Job Lifecycle

From crates/core/src/scheduler.rs:
/// Handle to a single scheduled job.
///
/// NOTE: this is an abridged outline of the scheduler's `Job` type; the
/// method bodies below are placeholders that only describe the behavior.
pub struct Job {
    /// Identifier of the job.
    pub id: i32,
    /// Shared, mutex-guarded job state (`JobState` is defined elsewhere).
    state: Arc<Mutex<JobState>>,
}

impl Job {
    /// Starts the job, i.e. schedules it to run.
    pub fn start(&self) {
        // Spawns a tokio task that waits for next scheduled time
    }
    
    /// Runs the job immediately instead of waiting for its schedule.
    ///
    /// Returns `Err` with a message when the run fails. Not `pub`.
    async fn run_now(&self) -> Result<(), String> {
        // Executes the job callback
    }
    
    /// Returns the next scheduled run time, if any.
    pub fn next_run(&self) -> Option<DateTime<Utc>> {
        // Returns when job will run next
    }
    
    /// Stops the job. Not `pub`.
    fn stop(&self) {
        // Aborts the scheduled task
    }
    
    /// Reports whether the job is currently scheduled.
    pub fn running(&self) -> bool {
        // Returns true if scheduled
    }
    
    /// Returns the most recent execution result, if there is one.
    ///
    /// The tuple is `(start_time, duration, error)`.
    pub fn latest(&self) -> Option<(DateTime<Utc>, Duration, Option<String>)> {
        // Returns: (start_time, duration, error)
    }
}

Error Handling

Jobs should handle errors gracefully:
/// Runs `do_work` and, on failure, logs the error to stderr and makes a
/// best-effort attempt to record it in `job_errors`. Never panics on a
/// failed run.
async fn resilient_job() {
    // Success path: report and leave early.
    let failure = match do_work().await {
        Ok(_) => {
            println!("Job completed successfully");
            return;
        }
        Err(failure) => failure,
    };

    eprintln!("Job failed: {}", failure);

    // Log error to database; a failure to write the error row is ignored
    // on purpose so the job itself never panics here.
    let _ = execute(
        "INSERT INTO job_errors (job_name, error, created_at) VALUES ($1, $2, CURRENT_TIMESTAMP)",
        [
            Value::Text("resilient_job".to_string()),
            Value::Text(failure.to_string()),
        ]
    ).await;

    // Optionally: send alert
    // send_alert(&format!("Job failed: {}", failure)).await;
}

Long-Running Jobs

Jobs should complete quickly. For long-running tasks, create work items and process them incrementally:
// BAD: Process all items at once
/// Anti-pattern example: loads the entire table and runs the expensive
/// operation on every row within a single job run.
///
/// # Panics
/// Panics if the query fails.
async fn bad_job() {
    const EVERYTHING_SQL: &str = "SELECT * FROM large_table";

    let every_row = query(EVERYTHING_SQL, []).await.unwrap();

    // No batching: the run lasts as long as the whole table takes.
    for record in every_row {
        expensive_operation(record).await;
    }
}

// GOOD: Process in batches
/// Processes at most one batch of pending `work_queue` rows per run and
/// reports how many pending rows are left for later runs.
///
/// Each row is marked `completed` or `failed`, so both outcomes remove it
/// from the pending set.
///
/// # Panics
/// Panics if a query or update fails, or if a row's `id` is not an integer.
async fn good_job() {
    const BATCH_SIZE: i64 = 100;

    let count: i64 = query(
        "SELECT COUNT(*) FROM work_queue WHERE status = 'pending'",
        []
    ).await.unwrap()[0][0].as_integer().unwrap();

    if count == 0 {
        return;  // Nothing to do
    }

    // Process only one batch per job run
    let batch = query(
        "SELECT id, data FROM work_queue WHERE status = 'pending' LIMIT $1",
        [Value::Integer(BATCH_SIZE)]
    ).await.unwrap();

    // Count the rows actually handled: the batch may be smaller than
    // BATCH_SIZE, so subtracting the constant would under-report.
    let mut handled: i64 = 0;

    for row in batch {
        let id = row[0].as_integer().unwrap();
        match process_item(&row[1]).await {
            Ok(_) => {
                execute(
                    "UPDATE work_queue SET status = 'completed' WHERE id = $1",
                    [Value::Integer(id)]
                ).await.unwrap();
            }
            Err(e) => {
                execute(
                    "UPDATE work_queue SET status = 'failed', error = $1 WHERE id = $2",
                    [Value::Text(e.to_string()), Value::Integer(id)]
                ).await.unwrap();
            }
        }
        handled += 1;
    }

    // Clamp the difference, not the batch size: rows may have been added
    // between the COUNT and the SELECT, and the result must not go negative.
    println!("Processed batch, {} remaining", (count - handled).max(0));
}

Monitoring Jobs

Admin API

List and monitor jobs via the admin API:
# List all jobs
curl -H "Authorization: Bearer $ADMIN_TOKEN" \
  https://api.example.com/_/admin/jobs

# Run a job manually
curl -X POST \
  -H "Authorization: Bearer $ADMIN_TOKEN" \
  https://api.example.com/_/admin/jobs/123/run

Job Status

// Get job execution history from database: the ten most recent executions
// of "my_job", newest first.
// NOTE(review): `job_executions` is not created anywhere in this document;
// presumably an application-defined table that jobs write to themselves.
// The trailing `?` propagates a query error, so this fragment must live
// inside a function whose return type supports `?` (e.g. a `Result`).
let history = query(
    r#"
    SELECT 
      job_name,
      status,
      started_at,
      completed_at,
      error
    FROM job_executions
    WHERE job_name = $1
    ORDER BY started_at DESC
    LIMIT 10
    "#,
    [Value::Text("my_job".to_string())]
).await?;

Best Practices

1

Idempotent jobs

Design jobs to be safely re-run. Check for existing work before processing:
/// Generates the daily report only when none exists for today, so running
/// the job again is harmless.
///
/// # Panics
/// Panics if the existence check fails.
async fn idempotent_job() {
    // Check if already processed today
    let todays_reports = query(
        "SELECT 1 FROM reports WHERE date = date('now') AND type = 'daily'",
        []
    ).await.unwrap();

    let already_generated = !todays_reports.is_empty();
    if already_generated {
        println!("Report already generated today");
        return;
    }

    // Generate report...
}
2

Transaction safety

Use transactions for multi-step operations:
use trailbase_wasm::db::Transaction;

/// Moves 100 from account 1 to account 2 inside a single transaction.
///
/// The function returns `()`, so errors cannot be propagated with `?`.
/// Each step is checked explicitly and the job stops at the first failure
/// without committing.
/// NOTE(review): this relies on an uncommitted `Transaction` being rolled
/// back when it is dropped; confirm against the `trailbase_wasm` API.
async fn transactional_job() {
    let mut tx = match Transaction::begin() {
        Ok(tx) => tx,
        Err(e) => {
            eprintln!("Failed to begin transaction: {:?}", e);
            return;
        }
    };

    if let Err(e) = tx.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1", []) {
        eprintln!("Debit failed, transaction not committed: {:?}", e);
        return;
    }

    if let Err(e) = tx.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2", []) {
        eprintln!("Credit failed, transaction not committed: {:?}", e);
        return;
    }

    if let Err(e) = tx.commit() {
        eprintln!("Commit failed: {:?}", e);
    }
}
3

Resource cleanup

Always clean up resources, even on failure:
/// Creates temporary files, processes them, and always deletes them
/// afterwards, whether or not processing succeeded.
///
/// The function returns `()`, so the processing outcome is reported here
/// instead of being returned.
/// NOTE(review): assumes `process_files` returns a `Result` whose error
/// implements `Debug`; confirm against its definition.
async fn cleanup_job() {
    let temp_files = create_temp_files().await;

    let result = process_files(&temp_files).await;

    // Always cleanup; individual deletion failures are ignored on purpose.
    for file in temp_files {
        let _ = delete_file(&file).await;
    }

    // Report the outcome only after cleanup has run.
    if let Err(e) = result {
        eprintln!("Processing temp files failed: {:?}", e);
    }
}
4

Logging

Log job execution for debugging:
/// Runs `do_work` and prints timestamped start and completion lines,
/// including the elapsed time and the `Debug` form of the result.
async fn logged_job() {
    let timer = std::time::Instant::now();
    println!("[{}] Job started", chrono::Utc::now());

    let outcome = do_work().await;

    // Measure first, then take the completion timestamp for the log line.
    let elapsed = timer.elapsed();
    let finished_at = chrono::Utc::now();
    println!("[{}] Job completed in {:?}: {:?}", finished_at, elapsed, outcome);
}
5

Rate limiting

Avoid overwhelming external APIs:
use tokio::time::{sleep, Duration};

/// Fetches up to 100 item URLs one at a time, pausing 100 ms after each
/// request so the remote service is not flooded.
///
/// # Panics
/// Panics if the query fails or a row's `url` column is not text.
async fn rate_limited_job() {
    const ITEMS_SQL: &str = "SELECT id, url FROM items LIMIT 100";

    let targets = query(ITEMS_SQL, []).await.unwrap();

    for target in targets {
        let url = target[1].as_text().unwrap();
        fetch_url(url).await;

        // Wait between requests
        let pause = Duration::from_millis(100);
        sleep(pause).await;
    }
}

Example: Email Digest Job

use trailbase_wasm::job::Job;
use trailbase_wasm::db::{query, execute, Value};
use trailbase_wasm::fetch::{fetch, Request};

/// Sends a daily digest email to every verified user who opted in and has
/// new messages or notifications today.
///
/// Prints the number of digests actually dispatched. Users with no
/// activity are skipped and not counted.
///
/// # Panics
/// Panics if a query fails or a row does not have the expected column types.
async fn send_daily_digest() {
    println!("Sending daily digest emails");

    // Get users who want daily digests
    let users = query(
        r#"
        SELECT u.id, u.email, u.name
        FROM users u
        WHERE u.email_preferences LIKE '%daily_digest%'
          AND u.verified = 1
        "#,
        []
    ).await.unwrap();

    // Counted inside the loop: the loop consumes `users`, so `users.len()`
    // is unavailable afterwards and would also include skipped users.
    let mut dispatched: usize = 0;

    for user_row in users {
        let user_id = user_row[0].as_integer().unwrap();
        let email = user_row[1].as_text().unwrap();
        let name = user_row[2].as_text().unwrap();

        // Get user's activity for the day
        let activity = query(
            r#"
            SELECT 
              COUNT(*) as new_messages,
              (SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND created_at >= date('now')) as notifications
            FROM messages
            WHERE recipient_id = $1
              AND created_at >= date('now')
            "#,
            [Value::Integer(user_id)]
        ).await.unwrap();

        if let Some(row) = activity.first() {
            let new_messages = row[0].as_integer().unwrap();
            let notifications = row[1].as_integer().unwrap();

            if new_messages > 0 || notifications > 0 {
                // Send email via external service; a delivery failure is
                // ignored on purpose so one bad address cannot stop the run.
                let _ = send_digest_email(
                    email,
                    name,
                    new_messages,
                    notifications
                ).await;
                dispatched += 1;
            }
        }
    }

    println!("Daily digest sent to {} users", dispatched);
}

/// Component type whose `Guest` implementation registers the digest job.
///
/// NOTE(review): like the `MyJobs` example, the component presumably still
/// has to be exported with `export!(DigestJobs);`, and `Guest` must be in
/// scope for the impl below to compile.
struct DigestJobs;

impl Guest for DigestJobs {
    /// Registers `send_daily_digest` to run every day at 8 AM.
    ///
    /// # Panics
    /// Panics if `Job::new` rejects the schedule string.
    fn job_handlers() -> Vec<Job> {
        vec![
            Job::new(
                "daily_digest",
                "0 0 8 * * *",  // Every day at 8 AM
                send_daily_digest
            ).unwrap(),
        ]
    }
}

Next Steps

Email

Send emails from jobs

Custom Endpoints

Trigger jobs via API

Vector Search

Batch embedding generation

WASM Components

Create job handlers

Build docs developers (and LLMs) love