
Overview

Monitoring services poll SyftBox for changes and trigger actions:
  • JobMonitor - Detects new jobs, approvals, and completions
  • PeerMonitor - Detects peer connection requests and approvals
  • DaemonManager - Manages background service lifecycle
All monitors support foreground polling and background threading.

JobMonitor

Monitors job lifecycle events using a hybrid approach:
  1. Google Drive API - Polls for new job submissions (lightweight, no sync)
  2. Local filesystem - Checks for status changes (approved, executed)

Quick Start

import syft_client as sc  # assumed alias for the sc.login_do() call below
from syft_client.notifications import JobMonitor

client = sc.login_do(email="[email protected]", token_path="token.json")
monitor = JobMonitor.from_client(client)

# Run in foreground
monitor.check(interval=30)  # Poll every 30 seconds

# Run in background thread
thread = monitor.start(interval=30)

Factory Methods

from_client
classmethod
Create a monitor from an authenticated client.
Parameters:
  • client: SyftboxManager - Client from sc.login_do()
  • gmail_token_path: Optional[str] - Gmail token path (default: ~/.syft-notifications/gmail_token.json)
  • drive_token_path: Optional[str] - Drive token path (auto-detected from credentials dir)
  • notifications: bool = True - Enable email notifications
Returns: JobMonitor instance
client = sc.login_do(email="[email protected]", token_path="token.json")
monitor = JobMonitor.from_client(
    client,
    gmail_token_path="~/.syft-creds/gmail_token.json",
    notifications=True
)
from_config
classmethod
Create a monitor from a YAML configuration file.
Parameters:
  • config_path: str - Path to config.yaml
Returns: JobMonitor instance
Config format:
syftbox_root: ~/SyftBox
do_email: [email protected]
drive_token_path: ~/.syft-creds/token_do.json
gmail_token_path: ~/.syft-creds/gmail_token.json
state_file: ~/.syft-creds/state.json
notify_on_new_job: true
notify_on_approved: true
notify_on_executed: true

Constructor

from pathlib import Path
from syft_client.notifications import JobMonitor, GmailSender, JsonStateManager

# gmail_sender (a GmailSender) and state_manager (a JsonStateManager) are
# assumed to be constructed already; their constructors are not shown here.
monitor = JobMonitor(
    syftbox_root=Path("~/SyftBox"),
    do_email="[email protected]",
    sender=gmail_sender,
    state=state_manager,
    config={
        "notify_on_new_job": True,
        "notify_on_approved": True,
        "notify_on_executed": True
    },
    drive_token_path=Path("~/.syft-creds/token_do.json"),
    client=None  # Optional fallback
)
syftbox_root
Path
Path to SyftBox directory
do_email
str
Data Owner email address
sender
NotificationSender
Email sender implementation (e.g., GmailSender)
state
StateManager
State tracker to prevent duplicate notifications
config
dict
Configuration with notification toggles:
  • notify_on_new_job: bool
  • notify_on_approved: bool
  • notify_on_executed: bool
drive_token_path
Optional[Path]
Google Drive token for direct polling (enables lightweight mode)
client
Optional[SyftboxManager]
Fallback client if no drive_token_path (uses full sync)

Monitoring Methods

check
method
Run monitoring checks (blocking).
Parameters:
  • interval: Optional[int] - If None, runs once. If set, polls every N seconds
  • duration: Optional[int] - How long to run in seconds (None = infinite)
# Single check
monitor.check()

# Poll every 30s forever
monitor.check(interval=30)

# Poll for 1 hour
monitor.check(interval=30, duration=3600)
start
method
Start monitoring in a background thread (non-blocking).
Parameters:
  • interval: int = 10 - Check interval in seconds
Returns: threading.Thread - Background thread handle
thread = monitor.start(interval=30)

# Do other work while monitor runs in background
client.sync()

# Check if still running
print(thread.is_alive())
stop
method
Stop background monitoring thread
monitor.stop()

Detection Logic

New Jobs (via Drive API):
  1. Find inbox folders: syft_outbox_inbox_{sender}_to_{recipient}
  2. List message files: msgv2_*
  3. Download and parse messages
  4. Extract job info from ProposedFileChangesMessage
  5. Send notification if not already processed
Status Changes (via filesystem):
  1. Scan {syftbox_root}/{do_email}/app_data/job/
  2. For each job directory:
    • Check for approved marker file → Send approval notification
    • Check for done marker file → Send completion notification
  3. Only notify for jobs previously detected as “new”
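The filesystem check above can be sketched with plain pathlib. The directory layout and the "approved"/"done" marker names are taken from this section, but the helper itself is illustrative, not the monitor's actual implementation:

```python
from pathlib import Path
import tempfile

def scan_job_statuses(syftbox_root: Path, do_email: str) -> dict:
    """Map job directory name -> status using the marker files above."""
    statuses = {}
    job_root = syftbox_root / do_email / "app_data" / "job"
    if not job_root.exists():
        return statuses
    for job_dir in job_root.iterdir():
        if not job_dir.is_dir():
            continue
        if (job_dir / "done").exists():        # completion marker
            statuses[job_dir.name] = "executed"
        elif (job_dir / "approved").exists():  # approval marker
            statuses[job_dir.name] = "approved"
        else:
            statuses[job_dir.name] = "new"
    return statuses

# Demo against a throwaway directory tree
root = Path(tempfile.mkdtemp())
job = root / "[email protected]" / "app_data" / "job" / "job-123"
job.mkdir(parents=True)
(job / "approved").touch()
print(scan_job_statuses(root, "[email protected]"))  # {'job-123': 'approved'}
```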

PeerMonitor

Monitors peer connection requests using Google Drive API.

Quick Start

import syft_client as sc  # assumed alias for the sc.login_do() call below
from syft_client.notifications import PeerMonitor

client = sc.login_do(email="[email protected]", token_path="token.json")
monitor = PeerMonitor.from_client(client)

# Run in foreground
monitor.check(interval=30)

# Run in background
thread = monitor.start(interval=30)

Factory Method

from_client
classmethod
Create a monitor from an authenticated client.
Parameters:
  • client: SyftboxManager - Client from sc.login_do()
  • gmail_token_path: Optional[str] - Gmail token path
  • notifications: bool = True - Enable notifications
Returns: PeerMonitor instance
monitor = PeerMonitor.from_client(client, notifications=True)

Constructor

from pathlib import Path
from syft_client.notifications import PeerMonitor

# gmail_sender and state_manager are assumed to be constructed already.
monitor = PeerMonitor(
    do_email="[email protected]",
    drive_token_path=Path("~/.syft-creds/token_do.json"),
    sender=gmail_sender,
    state=state_manager,
    config={
        "notify_on_new_peer": True,
        "notify_on_peer_granted": True
    }
)
do_email
str
Data Owner email address
drive_token_path
Path
Google Drive OAuth token path (required for Drive polling)
sender
NotificationSender
Email sender implementation
state
StateManager
State tracker for notification history
config
dict
Configuration:
  • notify_on_new_peer: bool - Send notification for new peer requests
  • notify_on_peer_granted: bool - Send notification when peer approved

Methods

notify_peer_granted
method
Manually trigger a peer-approval notification.
Parameters:
  • ds_email: str - Data Scientist email
Call this after approving a peer:
client.approve_peer_request("[email protected]")
monitor.notify_peer_granted("[email protected]")
Note: Automatic detection of peer approvals is not possible via the Drive API alone, because add_peer_as_do() does not create new folders; this method must be called manually.

Detection Logic

New Peer Requests:
  1. Query Drive for folders: syft_outbox_inbox_{sender}_to_{do_email}
  2. Filter folders where sender ≠ DO (external peer requests)
  3. Compare with previous snapshot
  4. Send notifications:
    • To DO: "New peer request from …"
    • To DS: "Peer request sent to …"

DaemonManager

Manages background daemon processes with PID tracking and log rotation.

Quick Start

from syft_client.notifications import DaemonManager

manager = DaemonManager(config_path="~/.syft-creds/daemon.yaml")

# Start daemon
manager.start(interval=30)

# Check status
manager.status()

# View logs
manager.logs(follow=True)

# Stop daemon
manager.stop()

Constructor

__init__
constructor
Parameters:
  • config_path: Path - Path to daemon configuration YAML
Creates files in ~/.syft-creds/:
  • syft-notify.pid - Process ID file
  • syft-notify.log - Main log file
  • syft-notify.error.log - Error log file

Daemon Control

start
method
Start the daemon in the background.
Parameters:
  • interval: Optional[int] - Check interval (overrides config)
Returns: bool - True if started successfully
Forks into the background and redirects stdout/stderr to the log files.
stop
method
Stop the running daemon.
Returns: bool - True if stopped successfully
Sends SIGTERM for a graceful shutdown, waits up to 10 seconds, then sends SIGKILL if needed.
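The shutdown sequence described above (SIGTERM, grace period, SIGKILL) can be sketched as follows on POSIX systems; the helper name and structure are illustrative, not the actual DaemonManager code:

```python
import os
import signal
import time

def stop_daemon(pid: int, grace: float = 10.0) -> bool:
    """SIGTERM first; escalate to SIGKILL if the process outlives `grace`."""
    try:
        os.kill(pid, signal.SIGTERM)  # request graceful shutdown
    except ProcessLookupError:
        return True  # already gone
    deadline = time.monotonic() + grace
    while time.monotonic() < deadline:
        try:
            os.kill(pid, 0)  # probe: raises ProcessLookupError once it exits
        except ProcessLookupError:
            return True
        time.sleep(0.1)
    try:
        os.kill(pid, signal.SIGKILL)  # force kill after the grace period
    except ProcessLookupError:
        pass
    return True
```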
restart
method
Restart the daemon.
Parameters:
  • interval: Optional[int] - Check interval
Returns: bool - True if restarted successfully
status
method
Check daemon status and display recent activity.
Returns: bool - True if running
Prints:
  • PID and config path
  • Log file locations
  • Last 5 log lines

Log Management

logs
method
Display daemon logs.
Parameters:
  • follow: bool = False - Follow logs (like tail -f)
  • lines: int = 50 - Number of lines to show (if not following)
# Show last 50 lines
manager.logs()

# Follow in real-time
manager.logs(follow=True)

Log Rotation

Automatic log rotation:
  • Max file size: 10 MB
  • Backup count: 7 files
  • Format: YYYY-MM-DD HH:MM:SS - logger - level - message
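The stated policy matches what Python's stdlib RotatingFileHandler provides. A sketch of an equivalent setup (the daemon's actual handler code is not shown in this doc, and the paths here are throwaway):

```python
import logging
import tempfile
from logging.handlers import RotatingFileHandler
from pathlib import Path

log_path = Path(tempfile.mkdtemp()) / "syft-notify.log"

# 10 MB per file, 7 rotated backups, timestamped format as described above
handler = RotatingFileHandler(log_path, maxBytes=10 * 1024 * 1024, backupCount=7)
handler.setFormatter(
    logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)

logger = logging.getLogger("syft-notify-demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("daemon started")  # written to log_path, rotated at the size cap
```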

Signal Handling

  • SIGTERM - Graceful shutdown
  • SIGHUP - Config reload (not yet implemented)

PID Management

get_pid
method
Get the daemon's process ID.
Returns: Optional[int] - PID, or None if not running
is_running
method
Check whether the daemon process exists.
Returns: bool - True if the process is alive
Uses os.kill(pid, 0) to probe the process without sending an actual signal.
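The signal-0 probe can be sketched directly; the permission-error branch is an assumption covering processes owned by other users:

```python
import os

def is_running(pid: int) -> bool:
    """Probe a PID with signal 0: checks existence without delivering anything."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True   # exists, but owned by another user
    return True

print(is_running(os.getpid()))  # True: our own process certainly exists
```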

Monitor Base Class

All monitors inherit from abstract Monitor base class.

Abstract Methods

from syft_client.notifications.base import Monitor

class CustomMonitor(Monitor):
    def __init__(self, sender, state, config):
        super().__init__(sender, state, config)
    
    def _check_all_entities(self):
        # Implement monitoring logic
        pass
_check_all_entities
abstract method
Check all entities for events (must be implemented by subclasses).
This is where monitor-specific logic goes:
  • Query data sources (Drive API, filesystem, database)
  • Detect changes
  • Send notifications
  • Update state

Common Configuration

do_email: [email protected]
syftbox_root: ~/SyftBox

# Token paths
gmail_token_path: ~/.syft-creds/gmail_token.json
drive_token_path: ~/.syft-creds/token_do.json
state_file: ~/.syft-creds/state.json

# Email format: "html", "text", or "both"
email_format: html

# Job notifications
notify_on_new_job: true
notify_on_approved: true
notify_on_executed: true

# Peer notifications
notify_on_new_peer: true
notify_on_peer_granted: true

CLI Integration

The syft-bg CLI wraps these monitoring services:
# Start all monitors
syft-bg start

# Start specific monitor
syft-bg start notify

# View monitor status
syft-bg status

# View logs
syft-bg logs notify
syft-bg logs notify -f  # Follow

# Stop monitors
syft-bg stop

Service Registry

Services are registered in syft_bg/services/registry.py:
from syft_bg.services.manager import ServiceManager

manager = ServiceManager()

# List available services
services = manager.list_services()  # ["notify", "approve"]

# Get service status
status = manager.get_all_status()

# Start/stop services
manager.start_service("notify")
manager.stop_service("approve")

Drive API Integration

Scopes Required

DRIVE_SCOPES = ["https://www.googleapis.com/auth/drive"]
Used by the monitors for read-only operations:
  • List folders
  • Read file metadata
  • Download file contents

Building Drive Service

import os

from google.oauth2.credentials import Credentials
from syft_client.sync.connections.drive.gdrive_transport import build_drive_service

DRIVE_SCOPES = ["https://www.googleapis.com/auth/drive"]

credentials = Credentials.from_authorized_user_file(
    os.path.expanduser("~/.syft-creds/token_do.json"),  # ~ must be expanded manually
    DRIVE_SCOPES
)
service = build_drive_service(credentials)

Thread Safety

Important: googleapiclient.discovery.build() is not thread-safe. Monitors therefore create the Drive service on the main thread in __init__:
class JobMonitor(Monitor):
    def __init__(self, ...):
        # Called on main thread - safe
        self._drive_service = self._create_drive_service()
    
    def _check_all_entities(self):
        # Uses pre-created service - safe
        results = self._drive_service.files().list(...).execute()

Best Practices

  1. Use from_client for notebooks - Simplest setup with existing client
  2. Use from_config for daemons - Better for production services
  3. Enable Drive token - Enables lightweight polling without full sync
  4. Set reasonable intervals:
    • Notifications: 30 seconds
    • Approvals: 5 seconds
  5. Monitor logs regularly - Use syft-bg logs -f to catch issues
  6. Use state management - Prevents duplicate notifications
  7. Run in background threads - Non-blocking for interactive notebooks

Error Handling

Monitors catch and log errors without crashing:
try:
    self._check_all_entities()
except Exception as e:
    print(f"⚠️ {self.__class__.__name__} error: {e}")
    # Continues monitoring on next interval
Errors are logged to:
  • Console (if running in foreground)
  • ~/.syft-creds/syft-notify.log (if running as daemon)
  • ~/.syft-creds/syft-notify.error.log (stderr)

Performance Considerations

Lightweight Polling

JobMonitor with Drive API:
  • No full sync required
  • Only downloads message headers
  • Parses only job-related messages
  • Skips non-job messages without processing

State Caching

PeerMonitor snapshots:
  • Stores previous peer list in state
  • Only processes new peers (delta detection)
  • Avoids re-processing known peers
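A sketch of that snapshot cycle, assuming the state file simply stores a JSON list of known peers (JsonStateManager's real schema is not documented here):

```python
import json
import tempfile
from pathlib import Path

state_file = Path(tempfile.mkdtemp()) / "state.json"

def load_snapshot() -> set:
    """Previous peer list; empty on first run."""
    if state_file.exists():
        return set(json.loads(state_file.read_text()))
    return set()

def save_snapshot(peers: set) -> None:
    state_file.write_text(json.dumps(sorted(peers)))

previous = load_snapshot()
current = {"[email protected]"}       # e.g. freshly listed from Drive
new_peers = current - previous      # delta: only these trigger notifications
save_snapshot(current)
print(new_peers)  # {'[email protected]'}
```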

Interval Tuning

# Conservative (low API usage)
monitor.check(interval=60)  # Every minute

# Balanced (recommended)
monitor.check(interval=30)  # Every 30 seconds

# Aggressive (high responsiveness)
monitor.check(interval=5)   # Every 5 seconds
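To put the intervals in perspective, assuming one Drive API call per check, the daily call volume per monitor works out to:

```python
# Rough per-monitor API budget at each polling interval
for interval in (60, 30, 5):
    checks_per_day = 24 * 3600 // interval
    print(f"interval={interval}s -> {checks_per_day} checks/day")
# interval=60s -> 1440 checks/day
# interval=30s -> 2880 checks/day
# interval=5s -> 17280 checks/day
```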
