
Overview

The SyftJobRunner class monitors job directories for new jobs, executes approved jobs, and manages job lifecycles.

Factory Function

create_runner()

Create a SyftJobRunner from a SyftBox folder.
from syft_job import create_runner

runner = create_runner(
    syftbox_folder_path="/path/to/syftbox",
    email="[email protected]",
    poll_interval=5
)
Parameters:
  • syftbox_folder_path (str, required): Path to the SyftBox folder
  • email (str, required): Email address of the user (must be explicit)
  • poll_interval (int, default: 5): How often to check for new jobs, in seconds

Returns:
  • SyftJobRunner: Configured SyftJobRunner instance

SyftJobRunner Class

Constructor

from syft_job import SyftJobRunner, SyftJobConfig

config = SyftJobConfig.from_syftbox_folder(
    syftbox_folder_path="/path/to/syftbox",
    email="[email protected]"
)

runner = SyftJobRunner(config, poll_interval=5)
Parameters:
  • config (SyftJobConfig, required): Configuration object for the job system
  • poll_interval (int, default: 5): How often to check for new jobs, in seconds

Methods

run()

Start monitoring the inbox and approved folders for jobs.
# Start the runner (blocking operation)
runner.run()

# Press Ctrl+C to stop
This method:
  1. Initializes known jobs with current inbox state
  2. Continuously monitors for new jobs
  3. Processes approved jobs automatically
  4. Runs until interrupted (Ctrl+C)
Output:
🚀 SyftJob Runner started: version: 0.1.23
👤 Monitoring jobs for: [email protected]
📂 Job directory: /path/to/syftbox/[email protected]/app_data/job
⏱️  Poll interval: 5 seconds
⏹️  Press Ctrl+C to stop
==================================================
📭 No existing jobs found
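
The lifecycle above (seed known jobs, poll, detect, repeat until Ctrl+C) can be sketched with plain standard-library code. The helper names and directory layout here are illustrative, not the runner's actual implementation:

```python
import time
from pathlib import Path

def scan_for_new_jobs(inbox_dir: str, known: set) -> tuple:
    """One poll cycle: return newly appeared job directories
    (sorted) plus the updated set of known job names."""
    current = {p.name for p in Path(inbox_dir).iterdir() if p.is_dir()}
    return sorted(current - known), current

def watch_inbox(inbox_dir: str, poll_interval: int = 5) -> None:
    """Monitoring loop in the shape of SyftJobRunner.run()."""
    _, known = scan_for_new_jobs(inbox_dir, set())   # 1. seed with current inbox state
    try:
        while True:                                   # 2. monitor continuously
            new_jobs, known = scan_for_new_jobs(inbox_dir, known)
            for job in new_jobs:                      # 3. real runner also executes approved jobs
                print(f"NEW JOB DETECTED: {job}")
            time.sleep(poll_interval)
    except KeyboardInterrupt:                         # 4. run until interrupted
        print("Runner stopped")
```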

process_approved_jobs()

Process all jobs in the approved directory.
runner.process_approved_jobs(
    stream_output=True,
    timeout=300,
    skip_job_names=["job1", "job2"],
    share_outputs_with_submitter=True,
    share_logs_with_submitter=True
)
Parameters:
  • stream_output (bool, default: True): If True, stream output in real time; if False, capture output at the end (CI-friendly)
  • timeout (int, default: 300): Timeout in seconds per job; can also be set via the SYFT_DEFAULT_JOB_TIMEOUT_SECONDS environment variable
  • skip_job_names (List[str], optional): List of job names to skip
  • share_outputs_with_submitter (bool, default: False): If True, grant the job submitter read access to outputs
  • share_logs_with_submitter (bool, default: False): If True, grant the job submitter read access to logs
Example output:
📋 Found 2 job(s) in approved directory

==================================================
🚀 Executing job: Data Analysis
📁 Job directory: /path/to/job
[[email protected]][Data Analysis] Installing dependencies...
[[email protected]][Data Analysis] Running analysis...
✅ Job Data Analysis completed successfully
📄 Output written to stdout.txt
==================================================

✅ Processed 2 job(s)

check_for_new_jobs()

Check for new jobs in the inbox and print them.
runner.check_for_new_jobs()
This method:
  • Scans inbox directory for new jobs
  • Prints details for newly detected jobs
  • Updates internal tracking of known jobs
Example output:
🔔 NEW JOB DETECTED: [email protected]/Data Processing
📁 Location: /path/to/job
📝 Script preview:
   1: #!/bin/bash
   2: set -euo pipefail
   3: export UV_SYSTEM_PYTHON=false
   4: uv venv --python 3.12
   5: source .venv/bin/activate
   ... (more lines)
⚙️  Config:
   name: Data Processing
   submitted_by: [email protected]
   submitted_at: 2026-03-02T10:30:00+00:00
--------------------------------------------------

share_job_results()

Share job outputs and/or logs with the submitter.
runner.share_job_results(
    job_name="Data Analysis",
    share_outputs=True,
    share_logs=True,
    user="[email protected]"
)
Parameters:
  • job_name (str, required): Name of the job (e.g., “my.job”)
  • share_outputs (bool, required): Whether to share output files
  • share_logs (bool, required): Whether to share log files
  • user (str, optional): Email of the data scientist who submitted the job; if None, all user subdirectories are searched

reset_all_jobs()

Delete all jobs and recreate the job folder structure.
runner.reset_all_jobs()
This permanently deletes all jobs (inbox, approved, and done). This action cannot be undone.
Example output:
🔄 RESETTING ALL JOBS for [email protected]
📁 Target directory: /path/to/job
📋 Found 3 jobs in inbox:
   - Job 1
   - Job 2
   - Job 3
📋 Found 2 jobs in approved:
   - Job 4
   - Job 5

⚠️  WARNING: This will permanently delete 5 jobs!
   This action cannot be undone.
🗑️  Deleting job directory: /path/to/job
📁 Recreating job folder structure...
✅ Job reset completed successfully!
📊 Summary:
   - Deleted 5 jobs total
   - inbox: 3 jobs deleted
   - approved: 2 jobs deleted
   - Clean job directory recreated

Environment Variables

SYFT_DEFAULT_JOB_TIMEOUT_SECONDS

Set the default timeout for job execution.
export SYFT_DEFAULT_JOB_TIMEOUT_SECONDS=600  # 10 minutes
  • SYFT_DEFAULT_JOB_TIMEOUT_SECONDS (int, default: 300): Default job timeout in seconds (5 minutes)

SYFT_IS_IN_JOB

Automatically set when a job is running.
import os

if os.getenv("SYFT_IS_IN_JOB") == "true":
    print("Running inside a job")

Job Environment

When executing jobs, the runner sets these environment variables:
  • SYFTBOX_FOLDER (str): Path to the SyftBox folder
  • SYFTBOX_EMAIL (str): Email of the datasite owner
  • SYFT_IS_IN_JOB (str): Set to “true” when running inside a job
  • PYTHONUNBUFFERED (str): Set to “1” to disable Python output buffering
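
Inside a job script, these variables can be read with plain os.getenv. A minimal sketch (the helper name is illustrative):

```python
import os

def job_context() -> dict:
    """Collect the environment variables the runner sets for jobs.
    Outside a job, the values fall back to None/False."""
    return {
        "syftbox_folder": os.getenv("SYFTBOX_FOLDER"),
        "owner_email": os.getenv("SYFTBOX_EMAIL"),
        "in_job": os.getenv("SYFT_IS_IN_JOB") == "true",
    }

ctx = job_context()
if ctx["in_job"]:
    print(f"Running as a job for {ctx['owner_email']}")
```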

Job Execution Details

Job Files

After execution, each job directory contains:
  • run.sh - Executable bash script
  • config.yaml - Job metadata
  • approved - Marker file (if approved)
  • done - Marker file (if completed)
  • stdout.txt - Standard output
  • stderr.txt - Standard error output
  • returncode.txt - Process return code
  • outputs/ - Job output files
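
A completed job's state can be reconstructed from these files. A sketch using only the file names listed above (the helper itself is hypothetical, not part of the library):

```python
from pathlib import Path

def job_status(job_dir: str) -> dict:
    """Summarize a job from its marker and output files:
    approved/done markers, return code, and captured stdout."""
    d = Path(job_dir)
    rc = d / "returncode.txt"
    out = d / "stdout.txt"
    return {
        "approved": (d / "approved").exists(),
        "done": (d / "done").exists(),
        "returncode": int(rc.read_text().strip()) if rc.exists() else None,
        "stdout": out.read_text() if out.exists() else "",
    }
```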

Python Job Script Generation

The runner generates run.sh scripts for Python jobs.

Without pyproject.toml:
#!/bin/bash
set -euo pipefail
export UV_SYSTEM_PYTHON=false
uv venv --python 3.12
source .venv/bin/activate
uv pip install "syft-client" "numpy" "pandas"
python script.py
With pyproject.toml:
#!/bin/bash
set -euo pipefail
export UV_SYSTEM_PYTHON=false
cd project_dir && uv sync --python 3.12 && cd ..
source project_dir/.venv/bin/activate
uv pip install "syft-client" "numpy"
export PYTHONPATH=project_dir:${PYTHONPATH:-}
python project_dir/main.py
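
The "without pyproject.toml" script above follows a fixed template, so its shape can be reproduced with simple string assembly. This generator is an illustration of the output format, not the library's actual code:

```python
def build_run_script(packages: list, entrypoint: str = "script.py",
                     python: str = "3.12") -> str:
    """Assemble a run.sh in the 'without pyproject.toml' layout:
    create a uv venv, install quoted dependencies, run the entrypoint."""
    deps = " ".join(f'"{p}"' for p in packages)
    return "\n".join([
        "#!/bin/bash",
        "set -euo pipefail",
        "export UV_SYSTEM_PYTHON=false",
        f"uv venv --python {python}",
        "source .venv/bin/activate",
        f"uv pip install {deps}",
        f"python {entrypoint}",
    ])
```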

Complete Example

from syft_job import create_runner
import time

# Create runner
runner = create_runner(
    syftbox_folder_path="~/.syftbox",
    email="[email protected]",
    poll_interval=5
)

# Option 1: Run continuously (blocking)
runner.run()

# Option 2: Manual control loop
while True:
    runner.check_for_new_jobs()
    runner.process_approved_jobs(
        stream_output=True,
        share_outputs_with_submitter=True
    )
    time.sleep(5)

CI/CD Integration

For non-interactive environments:
# Disable streaming for CI
runner.process_approved_jobs(
    stream_output=False,  # Capture output at end
    timeout=600,  # 10 minute timeout
    share_outputs_with_submitter=True
)
