Overview
The SyftJobRunner class monitors job directories for new jobs, executes approved jobs, and manages job lifecycles.
Factory Function
create_runner()
Create a SyftJobRunner from a SyftBox folder.
```python
from syft_job import create_runner

runner = create_runner(
    syftbox_folder_path="/path/to/syftbox",
    email="[email protected]",
    poll_interval=5,
)
```
Parameters:
- `syftbox_folder_path`: Path to the SyftBox folder
- `email`: Email address of the user (must be explicit)
- `poll_interval`: How often to check for new jobs, in seconds

Returns: a configured `SyftJobRunner` instance
SyftJobRunner Class
Constructor
```python
from syft_job import SyftJobRunner, SyftJobConfig

config = SyftJobConfig.from_syftbox_folder(
    syftbox_folder_path="/path/to/syftbox",
    email="[email protected]",
)

runner = SyftJobRunner(config, poll_interval=5)
```
Parameters:
- `config`: Configuration object for the job system
- `poll_interval`: How often to check for new jobs, in seconds
Methods
run()
Start monitoring the inbox and approved folders for jobs.
```python
# Start the runner (blocking operation)
runner.run()
# Press Ctrl+C to stop
```
This method:
- Initializes known jobs with current inbox state
- Continuously monitors for new jobs
- Processes approved jobs automatically
- Runs until interrupted (Ctrl+C)
Output:
```text
🚀 SyftJob Runner started: version: 0.1.23
👤 Monitoring jobs for: [email protected]
📂 Job directory: /path/to/syftbox/[email protected]/app_data/job
⏱️ Poll interval: 5 seconds
⏹️ Press Ctrl+C to stop
==================================================
📭 No existing jobs found
```
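The new-job detection step of this loop can be sketched with the standard library alone. The `poll_once` helper and the single-inbox layout below are illustrative assumptions, not the library's API:

```python
import tempfile
import time  # a real loop would sleep(poll_interval) between polls
from pathlib import Path

def poll_once(inbox: Path, known: set) -> list:
    """Return names of job directories not seen before, updating `known`."""
    current = {p.name for p in inbox.iterdir() if p.is_dir()}
    new_jobs = sorted(current - known)
    known |= current
    return new_jobs

# Demo against a throwaway inbox directory
tmp = Path(tempfile.mkdtemp())
inbox = tmp / "inbox"
inbox.mkdir()
known = set()

poll_once(inbox, known)        # initialize with the current inbox state
(inbox / "job-001").mkdir()    # a new job arrives between polls
new = poll_once(inbox, known)  # detected on the next poll: ['job-001']
```

A real runner would call something like `poll_once` every `poll_interval` seconds and print the job details for each newly detected name.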
process_approved_jobs()
Process all jobs in the approved directory.
```python
runner.process_approved_jobs(
    stream_output=True,
    timeout=300,
    skip_job_names=["job1", "job2"],
    share_outputs_with_submitter=True,
    share_logs_with_submitter=True,
)
```
Parameters:
- `stream_output`: If True, stream output in real time; if False, capture output at the end (CI-friendly)
- `timeout`: Timeout in seconds per job; can also be set via the `SYFT_DEFAULT_JOB_TIMEOUT_SECONDS` environment variable
- `skip_job_names`: Optional list of job names to skip
- `share_outputs_with_submitter`: If True, grant the job submitter read access to outputs
- `share_logs_with_submitter`: If True, grant the job submitter read access to logs
Example output:
```text
📋 Found 2 job(s) in approved directory
==================================================
🚀 Executing job: Data Analysis
📁 Job directory: /path/to/job
[[email protected]][Data Analysis] Installing dependencies...
[[email protected]][Data Analysis] Running analysis...
✅ Job Data Analysis completed successfully
📄 Output written to stdout.txt
==================================================
✅ Processed 2 job(s)
```
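The capture-at-end path (`stream_output=False`) might look like the following stdlib sketch. The `execute_job` helper is hypothetical; the files it writes follow the job-files layout described under Job Execution Details:

```python
import subprocess
import tempfile
from pathlib import Path

def execute_job(job_dir: Path, timeout: int = 300) -> int:
    """Run a job's run.sh, capture output at the end, persist the
    stdout/stderr/return-code files, and drop a `done` marker."""
    result = subprocess.run(
        ["bash", str(job_dir / "run.sh")],
        cwd=job_dir,
        capture_output=True,
        text=True,
        timeout=timeout,  # raises subprocess.TimeoutExpired on overrun
    )
    (job_dir / "stdout.txt").write_text(result.stdout)
    (job_dir / "stderr.txt").write_text(result.stderr)
    (job_dir / "returncode.txt").write_text(str(result.returncode))
    (job_dir / "done").touch()
    return result.returncode

# Demo with a trivial job script
job = Path(tempfile.mkdtemp())
(job / "run.sh").write_text("#!/bin/bash\necho hello\n")
code = execute_job(job, timeout=30)
```

Streaming mode would instead forward the child's output line by line as it runs, prefixed with the submitter and job name as shown in the example output above.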
check_for_new_jobs()
Check for new jobs in the inbox and print them.
```python
runner.check_for_new_jobs()
```
This method:
- Scans inbox directory for new jobs
- Prints details for newly detected jobs
- Updates internal tracking of known jobs
Example output:
```text
🔔 NEW JOB DETECTED: [email protected]/Data Processing
📁 Location: /path/to/job
📝 Script preview:
   1: #!/bin/bash
   2: set -euo pipefail
   3: export UV_SYSTEM_PYTHON=false
   4: uv venv --python 3.12
   5: source .venv/bin/activate
   ... (more lines)
⚙️ Config:
   name: Data Processing
   submitted_by: [email protected]
   submitted_at: 2026-03-02T10:30:00+00:00
--------------------------------------------------
```
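The numbered script preview in that output takes only a few lines to produce; `preview_script` below is a hypothetical helper mirroring the format:

```python
import tempfile
from pathlib import Path

def preview_script(script: Path, max_lines: int = 5) -> str:
    """Return a numbered preview of a script's first few lines,
    in the style of the '📝 Script preview' output above."""
    lines = script.read_text().splitlines()
    shown = [f"{i}: {line}" for i, line in enumerate(lines[:max_lines], start=1)]
    if len(lines) > max_lines:
        shown.append("... (more lines)")
    return "\n".join(shown)

tmp = Path(tempfile.mkdtemp())
script = tmp / "run.sh"
script.write_text("#!/bin/bash\nset -euo pipefail\necho hi\n")
preview = preview_script(script)
```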
share_job_results()
Share job outputs and/or logs with the submitter.
```python
runner.share_job_results(
    job_name="Data Analysis",
    share_outputs=True,
    share_logs=True,
    user="[email protected]",
)
```
Parameters:
- `job_name`: Name of the job (e.g., "my.job")
- `share_outputs`: Whether to share output files
- `share_logs`: Whether to share log files
- `user`: Email of the data scientist who submitted the job; if None, all user subdirectories are searched
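The `user=None` lookup can be sketched as a search across per-user subdirectories. Both `find_job_dir` and the `jobs_root/<user>/<job_name>` layout are assumptions for illustration:

```python
import tempfile
from pathlib import Path

def find_job_dir(jobs_root, job_name, user=None):
    """Locate a job directory. With an explicit user, look only in that
    user's subdirectory; with user=None, search every user subdirectory."""
    users = [user] if user else [p.name for p in jobs_root.iterdir() if p.is_dir()]
    for u in users:
        candidate = jobs_root / u / job_name
        if candidate.is_dir():
            return candidate
    return None  # no matching job anywhere

# Demo layout: one submitter, one job
root = Path(tempfile.mkdtemp())
(root / "[email protected]" / "Data Analysis").mkdir(parents=True)

found = find_job_dir(root, "Data Analysis")                       # searches all users
missing = find_job_dir(root, "Other Job", user="[email protected]")  # no such user dir
```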
reset_all_jobs()
Delete all jobs and recreate the job folder structure.
This permanently deletes all jobs (inbox, approved, and done). This action cannot be undone.
Example output:
```text
🔄 RESETTING ALL JOBS for [email protected]
📁 Target directory: /path/to/job
📋 Found 3 jobs in inbox:
   - Job 1
   - Job 2
   - Job 3
📋 Found 2 jobs in approved:
   - Job 4
   - Job 5
⚠️ WARNING: This will permanently delete 5 jobs!
   This action cannot be undone.
🗑️ Deleting job directory: /path/to/job
📁 Recreating job folder structure...
✅ Job reset completed successfully!
📊 Summary:
   - Deleted 5 jobs total
   - inbox: 3 jobs deleted
   - approved: 2 jobs deleted
   - Clean job directory recreated
```
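The delete-and-recreate step amounts to counting jobs per stage, removing the tree, and rebuilding the empty folder structure. The `reset_jobs` helper below is illustrative (stage names taken from the inbox/approved/done layout above):

```python
import shutil
import tempfile
from pathlib import Path

def reset_jobs(job_dir):
    """Count job directories per stage, delete the whole tree,
    and recreate an empty inbox/approved/done structure."""
    stages = ("inbox", "approved", "done")
    counts = {
        stage: sum(1 for p in (job_dir / stage).iterdir() if p.is_dir())
        if (job_dir / stage).is_dir() else 0
        for stage in stages
    }
    shutil.rmtree(job_dir)        # permanent: cannot be undone
    for stage in stages:          # recreate the clean structure
        (job_dir / stage).mkdir(parents=True)
    return counts

# Demo: one job in inbox, one in approved
job_dir = Path(tempfile.mkdtemp()) / "job"
(job_dir / "inbox" / "Job 1").mkdir(parents=True)
(job_dir / "approved" / "Job 2").mkdir(parents=True)
deleted = reset_jobs(job_dir)
```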
Environment Variables
SYFT_DEFAULT_JOB_TIMEOUT_SECONDS
Set the default timeout for job execution.
```bash
export SYFT_DEFAULT_JOB_TIMEOUT_SECONDS=600  # 10 minutes
```
Default: 300 seconds (5 minutes)
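Resolving the effective timeout might look like the sketch below. The precedence order (explicit argument, then environment variable, then the 5-minute default) is an assumption based on the parameter description above:

```python
import os

DEFAULT_TIMEOUT_SECONDS = 300  # 5 minutes

def resolve_timeout(explicit=None):
    """Pick the per-job timeout: an explicit argument wins, then the
    SYFT_DEFAULT_JOB_TIMEOUT_SECONDS environment variable, then the default."""
    if explicit is not None:
        return explicit
    env_value = os.getenv("SYFT_DEFAULT_JOB_TIMEOUT_SECONDS")
    return int(env_value) if env_value else DEFAULT_TIMEOUT_SECONDS

os.environ["SYFT_DEFAULT_JOB_TIMEOUT_SECONDS"] = "600"
from_env = resolve_timeout()     # 600, from the environment
from_arg = resolve_timeout(120)  # explicit argument wins
```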
SYFT_IS_IN_JOB
Automatically set when a job is running.
```python
import os

if os.getenv("SYFT_IS_IN_JOB") == "true":
    print("Running inside a job")
```
Job Environment
When executing jobs, the runner sets these environment variables:
- Path to the SyftBox folder
- Email of the datasite owner
- `SYFT_IS_IN_JOB`, set to "true" when running inside a job
- `PYTHONUNBUFFERED`, set to "1" to disable Python output buffering
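Passing such an environment to the job process can be sketched as below. Only the two variables whose names are given here (`SYFT_IS_IN_JOB` from this page, `PYTHONUNBUFFERED` as Python's standard unbuffered-output switch) are set; the folder-path and owner-email variable names are not specified above, so they are omitted:

```python
import os
import subprocess
import sys

# Environment a runner might hand to the job process
job_env = {**os.environ, "SYFT_IS_IN_JOB": "true", "PYTHONUNBUFFERED": "1"}

# The child process sees the marker variable
out = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['SYFT_IS_IN_JOB'])"],
    env=job_env,
    capture_output=True,
    text=True,
).stdout.strip()
```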
Job Execution Details
Job Files
After execution, each job directory contains:
- `run.sh`: executable bash script
- `config.yaml`: job metadata
- `approved`: marker file (if approved)
- `done`: marker file (if completed)
- `stdout.txt`: standard output
- `stderr.txt`: standard error output
- `returncode.txt`: process return code
- `outputs/`: job output files
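The marker files make a job's state cheap to derive; `job_status` below is a hypothetical helper, not the library's API:

```python
import tempfile
from pathlib import Path

def job_status(job_dir):
    """Derive a job's state from its marker files:
    `done` takes precedence over `approved`; neither means pending."""
    if (job_dir / "done").exists():
        return "done"
    if (job_dir / "approved").exists():
        return "approved"
    return "pending"

# Walk one job through its lifecycle
job = Path(tempfile.mkdtemp())
s_pending = job_status(job)
(job / "approved").touch()
s_approved = job_status(job)
(job / "done").touch()
s_done = job_status(job)
```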
Python Job Script Generation
The runner generates run.sh scripts for Python jobs:
Without pyproject.toml:
```bash
#!/bin/bash
set -euo pipefail
export UV_SYSTEM_PYTHON=false
uv venv --python 3.12
source .venv/bin/activate
uv pip install "syft-client" "numpy" "pandas"
python script.py
```
With pyproject.toml:
```bash
#!/bin/bash
set -euo pipefail
export UV_SYSTEM_PYTHON=false
cd project_dir && uv sync --python 3.12 && cd ..
source project_dir/.venv/bin/activate
uv pip install "syft-client" "numpy"
export PYTHONPATH=project_dir:${PYTHONPATH:-}
python project_dir/main.py
```
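Composing either template is plain string assembly. The `make_run_sh` helper below is a simplified sketch of such a generator, not the runner's actual implementation:

```python
def make_run_sh(packages, python="3.12", project_dir=None, entry="script.py"):
    """Compose a run.sh in the style of the two templates above."""
    deps = " ".join(f'"{p}"' for p in packages)
    lines = ["#!/bin/bash", "set -euo pipefail", "export UV_SYSTEM_PYTHON=false"]
    if project_dir:
        # pyproject.toml present: sync the project env, then add extras
        lines += [
            f"cd {project_dir} && uv sync --python {python} && cd ..",
            f"source {project_dir}/.venv/bin/activate",
            f"uv pip install {deps}",
            f"export PYTHONPATH={project_dir}:${{PYTHONPATH:-}}",
            f"python {project_dir}/{entry}",
        ]
    else:
        # bare script: create a fresh venv and install dependencies
        lines += [
            f"uv venv --python {python}",
            "source .venv/bin/activate",
            f"uv pip install {deps}",
            f"python {entry}",
        ]
    return "\n".join(lines) + "\n"

script = make_run_sh(["syft-client", "numpy", "pandas"])
project_script = make_run_sh(["syft-client", "numpy"],
                             project_dir="project_dir", entry="main.py")
```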
Complete Example
```python
from syft_job import create_runner
import time

# Create runner
runner = create_runner(
    syftbox_folder_path="~/.syftbox",
    email="[email protected]",
    poll_interval=5,
)

# Option 1: Run continuously (blocking)
runner.run()

# Option 2: Manual control loop (use instead of run(), which blocks)
while True:
    runner.check_for_new_jobs()
    runner.process_approved_jobs(
        stream_output=True,
        share_outputs_with_submitter=True,
    )
    time.sleep(5)
```
CI/CD Integration
For non-interactive environments:
```python
# Disable streaming for CI
runner.process_approved_jobs(
    stream_output=False,  # capture output at the end
    timeout=600,          # 10-minute timeout
    share_outputs_with_submitter=True,
)
```