Overview
TrailBase includes a built-in job scheduler that executes background tasks on a cron-like schedule. Jobs can be defined in your configuration or dynamically registered via WASM components.

System Jobs
TrailBase includes several system jobs that run automatically:

Backup
Daily database backups (disabled by default)
Heartbeat
Health check every minute (for monitoring)
Log Cleaner
Hourly cleanup of old log entries
Auth Cleaner
Hourly removal of expired sessions
Query Optimizer
Daily SQLite query optimizer
File Deletions
Hourly cleanup of deleted files
System Job Configuration
Configure system jobs in your config.textproto:
jobs {
system_jobs {
id: 1 # Backup job
schedule: "@daily"
disabled: false
}
system_jobs {
id: 2 # Heartbeat
schedule: "*/5 * * * * * *" # Every 5 seconds
disabled: false
}
system_jobs {
id: 3 # Log Cleaner
schedule: "@hourly"
disabled: false
}
system_jobs {
id: 4 # Auth Cleaner
schedule: "@hourly"
disabled: false
}
system_jobs {
id: 5 # Query Optimizer
schedule: "@daily"
disabled: false
}
system_jobs {
id: 6 # File Deletions
schedule: "@hourly"
disabled: false
}
}
server {
# Log retention for cleanup job
logs_retention_sec: 604800 # 7 days
}
auth {
# Session TTL for cleanup job
refresh_token_ttl_sec: 2592000 # 30 days
}
Cron Syntax
Jobs use cron syntax with up to 7 fields (seconds come first; the trailing year field is optional): sec min hour day month weekday year
* * * * * * *
Field Values
| Field | Range | Special Values |
|---|---|---|
| sec | 0-59 | *, / |
| min | 0-59 | *, / |
| hour | 0-23 | *, / |
| day | 1-31 | *, / |
| month | 1-12 | *, / |
| weekday | 0-6 (Sun-Sat) | *, / |
| year | 1970-3000 | *, / |
Named Schedules
"@hourly" // 0 * * * * *
"@daily" // 0 0 * * * *
"@weekly" // 0 0 * * 0 *
"@monthly" // 0 0 1 * * *
"@yearly" // 0 0 1 1 * *
Examples
// Every minute
"37 * * * * *" // At 37 seconds past every minute
// Every hour
"0 0 * * * *" // At the start of every hour
// Every day at 2 AM
"0 0 2 * * *"
// Every Monday at 9 AM
"0 0 9 * * 1"
// Every 5 minutes
"0 */5 * * * *"
// Every 30 seconds
"*/30 * * * * * *"
// Weekdays at 8 AM
"0 0 8 * * 1-5"
// First day of every month
"0 0 0 1 * *"
Interval-based schedules like “every 100 seconds” are not supported. Cron is time-based, not interval-based. Use the smallest supported interval (seconds) and implement your own timing logic if needed.
Creating Jobs in WASM Components
use trailbase_wasm::job::Job;
use trailbase_wasm::http::HttpRoute;
use trailbase_wasm::db::{query, execute, Value};
use trailbase_wasm::{Guest, export};
// Entry-point type whose Guest impl exports this component's job handlers.
struct MyJobs;

impl Guest for MyJobs {
    // Returns every scheduled job this component registers with the host.
    fn job_handlers() -> Vec<Job> {
        vec![
            // Convenience constructors for common fixed intervals.
            Job::minutely("process_queue", process_queue),
            Job::hourly("cleanup_old_data", cleanup_old_data),
            Job::daily("generate_report", generate_report),
            // Custom cron schedules; Job::new validates the expression,
            // hence the unwrap().
            Job::new(
                "sync_external_api",
                "0 */15 * * * *", // Every 15 minutes
                sync_external_api
            ).unwrap(),
            Job::new(
                "backup_critical_data",
                "0 0 */6 * * *", // Every 6 hours
                backup_critical_data
            ).unwrap(),
        ]
    }
}
// Drains up to 100 pending queue rows, marking each one 'completed' or
// 'failed' in the database as it goes.
async fn process_queue() {
    println!("Processing queue at {}", chrono::Utc::now());
    // Bounded batch so a single run stays short; anything left over is
    // picked up on the next scheduled run.
    let pending = query(
        "SELECT id, data FROM queue WHERE status = 'pending' LIMIT 100",
        []
    ).await.unwrap();
    for row in pending {
        let id = row[0].as_integer().unwrap();
        let data = row[1].as_text().unwrap();
        // Process item
        match process_item(data).await {
            Ok(_) => {
                // Success: flip the status and record when it finished.
                execute(
                    "UPDATE queue SET status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = $1",
                    [Value::Integer(id)]
                ).await.unwrap();
            }
            Err(e) => {
                // Failure: persist the error text so the row can be
                // inspected or retried later.
                execute(
                    "UPDATE queue SET status = 'failed', error = $1 WHERE id = $2",
                    [Value::Text(e.to_string()), Value::Integer(id)]
                ).await.unwrap();
            }
        }
    }
}
// Purges log rows older than 90 days and reports how many were removed.
async fn cleanup_old_data() {
    println!("Cleaning up old data");
    let deleted = execute(
        "DELETE FROM logs WHERE created_at < datetime('now', '-90 days')",
        [],
    )
    .await
    .unwrap();
    println!("Deleted {} old log entries", deleted);
}
// Computes daily user statistics and stores them as a 'daily' report row.
async fn generate_report() {
    println!("Generating daily report");
    let stats = query(
        r#"
SELECT
COUNT(*) as total_users,
COUNT(DISTINCT date(created_at)) as active_days
FROM users
WHERE created_at >= datetime('now', '-1 day')
"#,
        []
    ).await.unwrap();
    // Generate and store report
    // NOTE: stores the Debug formatting of the raw rows — adequate for an
    // example; real code would serialize a structured payload.
    execute(
        "INSERT INTO reports (type, data, created_at) VALUES ('daily', $1, CURRENT_TIMESTAMP)",
        [Value::Text(format!("{:?}", stats))]
    ).await.unwrap();
}
// Pulls data from the external API, logging the payload size on success
// and the error on failure.
async fn sync_external_api() {
    use trailbase_wasm::fetch::get;
    println!("Syncing with external API");
    let response = get("https://api.example.com/data").await;
    match response {
        Ok(payload) => {
            // Process and store data
            println!("Synced {} bytes", payload.len());
        }
        Err(err) => eprintln!("Sync failed: {}", err),
    }
}
// Reads all critical rows in id order and reports how many were backed up.
async fn backup_critical_data() {
    println!("Backing up critical data");
    let rows = query("SELECT id, data FROM critical_data ORDER BY id", [])
        .await
        .unwrap();
    // Export to backup location
    println!("Backed up {} records", rows.len());
}
// Registers MyJobs as this WASM component's exported Guest implementation.
export!(MyJobs);
Job Lifecycle
From crates/core/src/scheduler.rs:
// Illustrative skeleton of the scheduler-side job handle (bodies elided).
pub struct Job {
    // Stable job identifier.
    pub id: i32,
    // Shared mutable state, locked for access from the spawned task.
    state: Arc<Mutex<JobState>>,
}

impl Job {
    // Start the job (schedule it to run)
    pub fn start(&self) {
        // Spawns a tokio task that waits for next scheduled time
    }
    // Run the job immediately
    async fn run_now(&self) -> Result<(), String> {
        // Executes the job callback
    }
    // Get next scheduled run time
    pub fn next_run(&self) -> Option<DateTime<Utc>> {
        // Returns when job will run next
    }
    // Stop the job
    fn stop(&self) {
        // Aborts the scheduled task
    }
    // Check if job is running
    pub fn running(&self) -> bool {
        // Returns true if scheduled
    }
    // Get last execution result
    pub fn latest(&self) -> Option<(DateTime<Utc>, Duration, Option<String>)> {
        // Returns: (start_time, duration, error)
    }
}
Error Handling
Jobs should handle errors gracefully:async fn resilient_job() {
    // Run the work, then record any failure instead of letting it escape.
    match do_work().await {
        Ok(_) => {
            println!("Job completed successfully");
        }
        Err(e) => {
            eprintln!("Job failed: {}", e);
            // Log error to database
            // Result is deliberately ignored: error logging must not
            // itself be able to crash the job.
            let _ = execute(
                "INSERT INTO job_errors (job_name, error, created_at) VALUES ($1, $2, CURRENT_TIMESTAMP)",
                [
                    Value::Text("resilient_job".to_string()),
                    Value::Text(e.to_string()),
                ]
            ).await;
            // Optionally: send alert
            // send_alert(&format!("Job failed: {}", e)).await;
        }
    }
}
Long-Running Jobs
Jobs should complete quickly. For long-running tasks, create work items and process them incrementally:
// BAD: Process all items at once
async fn bad_job() {
    // Loads the entire table into memory and processes every row in a
    // single run — unbounded runtime and memory use.
    let all_items = query("SELECT * FROM large_table", []).await.unwrap();
    for item in all_items {
        expensive_operation(item).await;
    }
}
// GOOD: Process in batches
// Handles at most BATCH_SIZE pending rows per run; leftover work is picked
// up by subsequent scheduled runs, keeping each execution short.
async fn good_job() {
    const BATCH_SIZE: i64 = 100;
    let count: i64 = query(
        "SELECT COUNT(*) FROM work_queue WHERE status = 'pending'",
        []
    ).await.unwrap()[0][0].as_integer().unwrap();
    if count == 0 {
        return; // Nothing to do
    }
    // Process only one batch per job run
    let batch = query(
        "SELECT id, data FROM work_queue WHERE status = 'pending' LIMIT $1",
        [Value::Integer(BATCH_SIZE)]
    ).await.unwrap();
    for row in batch {
        let id = row[0].as_integer().unwrap();
        match process_item(&row[1]).await {
            Ok(_) => {
                execute(
                    "UPDATE work_queue SET status = 'completed' WHERE id = $1",
                    [Value::Integer(id)]
                ).await.unwrap();
            }
            Err(e) => {
                execute(
                    "UPDATE work_queue SET status = 'failed', error = $1 WHERE id = $2",
                    [Value::Text(e.to_string()), Value::Integer(id)]
                ).await.unwrap();
            }
        }
    }
    // Fix: the original wrote `count - BATCH_SIZE.max(0)`, which applies
    // max() to the constant (a no-op) and could print a negative count.
    // Clamp the difference instead.
    println!("Processed batch, {} remaining", (count - BATCH_SIZE).max(0));
}
Monitoring Jobs
Admin API
List and monitor jobs via the admin API:
# List all jobs
curl -H "Authorization: Bearer $ADMIN_TOKEN" \
https://api.example.com/_/admin/jobs
# Run a job manually
curl -X POST \
-H "Authorization: Bearer $ADMIN_TOKEN" \
https://api.example.com/_/admin/jobs/123/run
Job Status
// Get job execution history from database
let history = query(
r#"
SELECT
job_name,
status,
started_at,
completed_at,
error
FROM job_executions
WHERE job_name = $1
ORDER BY started_at DESC
LIMIT 10
"#,
[Value::Text("my_job".to_string())]
).await?;
Best Practices
Idempotent jobs
Design jobs to be safely re-run. Check for existing work before processing:
// Generates the daily report only if one has not already been written
// today, so re-running the job is always safe.
async fn idempotent_job() {
    // Check if already processed today
    let already_done = !query(
        "SELECT 1 FROM reports WHERE date = date('now') AND type = 'daily'",
        []
    ).await.unwrap().is_empty();
    if already_done {
        println!("Report already generated today");
        return;
    }
    // Generate report...
}
Transaction safety
Use transactions for multi-step operations:
use trailbase_wasm::db::Transaction;
// Moves 100 units between accounts atomically: both updates commit
// together or not at all.
// Fix: the original used `?` inside an async fn returning `()`, which does
// not compile. Panicking via expect() keeps the signature; the uncommitted
// transaction is abandoned on panic (presumably rolled back when the
// Transaction is dropped — confirm its drop semantics).
async fn transactional_job() {
    let mut tx = Transaction::begin().unwrap();
    tx.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1", [])
        .expect("debit failed");
    tx.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2", [])
        .expect("credit failed");
    tx.commit().expect("commit failed");
}
Resource cleanup
Always clean up resources, even on failure:
// Processes temp files, then deletes them regardless of the outcome.
async fn cleanup_job() {
    let temp_files = create_temp_files().await;
    let result = process_files(&temp_files).await;
    // Always cleanup
    // Deletion errors are ignored on purpose: cleanup is best-effort.
    for file in temp_files {
        let _ = delete_file(&file).await;
    }
    // NOTE(review): `result` is returned from a fn declared without a
    // return type — this only compiles if process_files() yields `()`;
    // confirm against its definition.
    result
}
Logging
Log job execution for debugging:
// Wraps a unit of work with start/end log lines and a wall-clock duration.
async fn logged_job() {
    let started_at = std::time::Instant::now();
    println!("[{}] Job started", chrono::Utc::now());
    let outcome = do_work().await;
    println!("[{}] Job completed in {:?}: {:?}",
        chrono::Utc::now(), started_at.elapsed(), outcome);
}
Rate limiting
Avoid overwhelming external APIs:
use tokio::time::{sleep, Duration};
// Fetches up to 100 stored URLs, pausing 100 ms between requests so the
// remote service is not hammered.
async fn rate_limited_job() {
    let items = query("SELECT id, url FROM items LIMIT 100", []).await.unwrap();
    for item in items {
        fetch_url(item[1].as_text().unwrap()).await;
        // Throttle: wait before the next request.
        sleep(Duration::from_millis(100)).await;
    }
}
Example: Email Digest Job
use trailbase_wasm::job::Job;
use trailbase_wasm::db::{query, execute, Value};
use trailbase_wasm::fetch::{fetch, Request};
// Emails a daily activity digest to every verified user who opted in,
// skipping users with no new messages or notifications.
async fn send_daily_digest() {
    println!("Sending daily digest emails");
    // Get users who want daily digests
    let users = query(
        r#"
SELECT u.id, u.email, u.name
FROM users u
WHERE u.email_preferences LIKE '%daily_digest%'
AND u.verified = 1
"#,
        []
    ).await.unwrap();
    // Fix: the loop below consumes `users`, so capture the count up front.
    // (The original called users.len() after the move, which doesn't compile.)
    let user_count = users.len();
    for user_row in users {
        let user_id = user_row[0].as_integer().unwrap();
        let email = user_row[1].as_text().unwrap();
        let name = user_row[2].as_text().unwrap();
        // Get user's activity for the day
        let activity = query(
            r#"
SELECT
COUNT(*) as new_messages,
(SELECT COUNT(*) FROM notifications WHERE user_id = $1 AND created_at >= date('now')) as notifications
FROM messages
WHERE recipient_id = $1
AND created_at >= date('now')
"#,
            [Value::Integer(user_id)]
        ).await.unwrap();
        if let Some(row) = activity.first() {
            let new_messages = row[0].as_integer().unwrap();
            let notifications = row[1].as_integer().unwrap();
            // Only email users who actually have something to read.
            if new_messages > 0 || notifications > 0 {
                // Send email via external service; best-effort, so the
                // result is deliberately ignored.
                let _ = send_digest_email(
                    email,
                    name,
                    new_messages,
                    notifications
                ).await;
            }
        }
    }
    println!("Daily digest sent to {} users", user_count);
}
impl Guest for DigestJobs {
    // Registers the digest job with the host scheduler.
    fn job_handlers() -> Vec<Job> {
        vec![
            // Job::new validates the cron expression, hence the unwrap().
            Job::new(
                "daily_digest",
                "0 0 8 * * *", // Every day at 8 AM
                send_daily_digest
            ).unwrap(),
        ]
    }
}
Next Steps
Send emails from jobs
Custom Endpoints
Trigger jobs via API
Vector Search
Batch embedding generation
WASM Components
Create job handlers