Overview

The CronJob class wraps any callable (including Swarms agents) and turns it into a scheduled job that runs at a fixed interval. It provides scheduling, error handling, retry logic, and execution tracking, plus an optional callback for customizing the output of each run.

Constructor

Create a CronJob instance to schedule recurring agent executions.
from swarms.structs.cron_job import CronJob
from swarms import Agent

# Create an agent
agent = Agent(
    agent_name="Financial-Analyst",
    system_prompt="You are a financial analyst...",
    model_name="gpt-4"
)

# Create scheduled job
cron = CronJob(
    agent=agent,
    interval="10minutes",
    job_id="financial_analysis_job"
)

Parameters

agent
Union[Any, Callable]
default:"None"
The Swarms Agent instance or callable to be scheduled
interval
str
default:"None"
The interval string (e.g., “5seconds”, “10minutes”, “1hour”)
job_id
str
default:"None"
Optional unique identifier for the job. If not provided, one will be generated.
callback
Callable[[Any, str, dict], Any]
default:"None"
Optional callback function to customize output processing. Signature: callback(output: Any, task: str, metadata: dict) -> Any
  • output: The original output from the agent
  • task: The task that was executed
  • metadata: Dictionary containing job_id, timestamp, execution_count, etc.
  • Returns: The customized output
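As a minimal sketch of a callback that matches the signature above, the function below wraps the raw output together with a few metadata fields. The name `tag_output` and the sample values are hypothetical; only the `(output, task, metadata)` signature and the `job_id`, `timestamp`, and `execution_count` keys come from this page.

```python
from typing import Any


def tag_output(output: Any, task: str, metadata: dict) -> dict:
    """Wrap the raw output with a few metadata fields (hypothetical helper)."""
    return {
        "job_id": metadata.get("job_id"),
        "run": metadata.get("execution_count"),
        "task": task,
        "output": output,
    }


# Call the callback directly with made-up values to show the shape it returns.
sample = tag_output(
    "Revenue grew 8% quarter over quarter.",
    "Analyze Q4 earnings for AAPL",
    {
        "job_id": "financial_analysis_job",
        "timestamp": 1234567890.123,
        "execution_count": 1,
    },
)
print(sample["run"], sample["job_id"])
```

A function like this could then be passed as `callback=tag_output` to the constructor or to `set_callback`.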

Attributes

job_id
str
Unique identifier for the job
is_running
bool
Flag indicating if the job is currently running
execution_count
int
Number of times the job has been executed
start_time
float
Timestamp when the job was started

Methods

run

Schedule and run the job with a specified task.
cron.run(
    task="Analyze Q4 earnings for AAPL",
    img="/path/to/chart.png"  # Optional image parameter
)
task
str
required
The task string to be executed by the agent
**kwargs
Any
Additional parameters to pass to the agent’s run method (e.g., img, imgs, correct_answer, streaming_callback)
Raises:
  • CronJobConfigError: If agent or interval is not configured
  • CronJobExecutionError: If task execution fails
Behavior:
  • Schedules the task according to the configured interval
  • Starts the background execution thread
  • Blocks until KeyboardInterrupt (Ctrl+C) is received
  • Automatically stops on interrupt

batched_run

Run multiple tasks sequentially with the same schedule.
tasks = [
    "Analyze AAPL stock",
    "Analyze GOOGL stock",
    "Analyze MSFT stock"
]

results = cron.batched_run(tasks)
tasks
List[str]
required
List of task strings to execute
**kwargs
Any
Additional parameters to pass to the agent’s run method
return
List[Any]
List of results from each task execution

start

Manually start the scheduled job.
cron.start()
Raises:
  • CronJobExecutionError: If the job fails to start
Behavior:
  • Creates a daemon thread for job execution
  • Sets is_running to True
  • Records start_time
  • If already running, logs a warning

stop

Stop the scheduled job.
cron.stop()
Raises:
  • CronJobExecutionError: If the job fails to stop properly
Behavior:
  • Sets is_running to False
  • Waits up to 5 seconds for thread to terminate
  • Clears the schedule
  • Logs warning if thread doesn’t terminate gracefully
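The start and stop behavior described above follows a common pattern: a daemon thread runs a loop, a flag tells it to exit, and `stop()` joins the thread with a timeout. The toy class below sketches that pattern with the standard library only. `TinyJob` is a hypothetical stand-in written for this illustration, not the CronJob source, and it takes its interval in seconds rather than as an interval string.

```python
import threading
import time


class TinyJob:
    """Toy stand-in (hypothetical, not the CronJob source) for the start/stop lifecycle."""

    def __init__(self, func, interval_seconds: float):
        self.func = func
        self.interval_seconds = interval_seconds
        self.is_running = False
        self.execution_count = 0
        self.start_time = None
        self._stop_event = threading.Event()
        self._thread = None

    def _loop(self):
        # Run until stop() sets the event; wait() doubles as an interruptible sleep.
        while not self._stop_event.is_set():
            self.func()
            self.execution_count += 1
            self._stop_event.wait(self.interval_seconds)

    def start(self):
        if self.is_running:
            print("warning: job already running")
            return
        self._stop_event.clear()
        self.is_running = True
        self.start_time = time.time()
        # A daemon thread does not keep the interpreter alive at exit.
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self, timeout: float = 5.0):
        self.is_running = False
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=timeout)  # wait up to `timeout` seconds
            if self._thread.is_alive():
                print("warning: thread did not terminate in time")


job = TinyJob(lambda: None, interval_seconds=0.01)
job.start()
time.sleep(0.05)
job.stop()
print(job.is_running, job.execution_count >= 1)
```

Using an `Event` instead of `time.sleep` inside the loop lets `stop()` wake the thread immediately, which keeps the join well under the timeout in the normal case.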

set_callback

Set or update the callback function for output customization.
def custom_callback(output, task, metadata):
    # Process output (cast to str first, since the agent output may not be a string)
    processed = str(output).upper()
    
    # Log execution info
    print(f"Execution #{metadata['execution_count']}")
    print(f"Task: {task}")
    print(f"Timestamp: {metadata['timestamp']}")
    
    return processed

cron.set_callback(custom_callback)
callback
Callable[[Any, str, dict], Any]
required
Callback function with signature: callback(output, task, metadata) -> Any

get_execution_stats

Get execution statistics for the cron job.
stats = cron.get_execution_stats()
print(stats)
# {
#   "job_id": "financial_analysis_job",
#   "is_running": True,
#   "execution_count": 42,
#   "start_time": 1234567890.123,
#   "uptime": 3600.5,
#   "interval": "10minutes"
# }
return
dict
Dictionary containing:
  • job_id: Job identifier
  • is_running: Current running status
  • execution_count: Number of executions
  • start_time: Start timestamp
  • uptime: Time elapsed since start (seconds)
  • interval: Configured interval string
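The fields above can be combined into derived metrics. As a small sketch, the helper below computes the average number of executions per hour from a stats dictionary. `executions_per_hour` is a hypothetical name, and the sample dictionary simply repeats the illustrative values shown in the example output above.

```python
def executions_per_hour(stats: dict) -> float:
    """Average executions per hour from a stats dictionary (hypothetical helper)."""
    uptime = stats.get("uptime") or 0.0
    if uptime <= 0:
        # Job not started yet, or no time has elapsed: avoid dividing by zero.
        return 0.0
    return stats["execution_count"] / uptime * 3600.0


sample_stats = {
    "job_id": "financial_analysis_job",
    "is_running": True,
    "execution_count": 42,
    "start_time": 1234567890.123,
    "uptime": 3600.5,
    "interval": "10minutes",
}
rate = executions_per_hour(sample_stats)
print(f"{rate:.2f} executions/hour")
```

Comparing this rate with the configured interval is one way to notice a job that is falling behind its schedule.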

Interval Formats

The interval parameter accepts strings in the format <number><unit>:

Supported Units

  • Seconds: "5seconds", "30second"
  • Minutes: "10minutes", "1minute"
  • Hours: "2hours", "1hour"
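The `<number><unit>` format can be modeled with a short regular expression. The following is a sketch of one way to parse it, not the library's own parser: `parse_interval` and `interval_to_seconds` are hypothetical names, and rejecting unrecognized input with `ValueError` is an assumption made for the example.

```python
import re
from typing import Tuple

# Seconds per unit for the three units listed above.
_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600}


def parse_interval(interval: str) -> Tuple[int, str]:
    """Split '<number><unit>' into (number, singular unit). Illustrative only."""
    match = re.fullmatch(r"(\d+)(seconds?|minutes?|hours?)", interval.strip().lower())
    if match is None:
        raise ValueError(f"Unrecognized interval: {interval!r}")
    number = int(match.group(1))
    unit = match.group(2).rstrip("s")  # "minutes" -> "minute"
    return number, unit


def interval_to_seconds(interval: str) -> int:
    """Convert an interval string to a number of seconds."""
    number, unit = parse_interval(interval)
    return number * _UNIT_SECONDS[unit]


for text in ("5seconds", "30second", "10minutes", "2hours"):
    print(text, "->", parse_interval(text), interval_to_seconds(text))
```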

Examples

# Every 5 seconds
CronJob(agent=agent, interval="5seconds")

# Every 30 seconds
CronJob(agent=agent, interval="30seconds")

# Every 10 minutes
CronJob(agent=agent, interval="10minutes")

# Every 2 hours
CronJob(agent=agent, interval="2hours")

Complete Examples

Basic Scheduled Analysis

from swarms import Agent
from swarms.structs.cron_job import CronJob

# Create financial analyst agent
agent = Agent(
    agent_name="Market-Analyst",
    system_prompt="""You are an expert market analyst.
    Provide concise daily market updates focusing on:
    1. Major index movements
    2. Notable sector performance
    3. Key market drivers
    """,
    model_name="gpt-4",
    max_loops=1,
    verbose=True
)

# Schedule to run every hour
cron = CronJob(
    agent=agent,
    interval="1hour",
    job_id="hourly_market_update"
)

# Run the scheduled job
try:
    cron.run(task="Provide a market update for the current hour")
except KeyboardInterrupt:
    print("Stopping scheduled job...")
    cron.stop()

Multi-Stock Analysis with Batching

from swarms import Agent
from swarms.structs.cron_job import CronJob

# Create stock analysis agent
stock_agent = Agent(
    agent_name="Stock-Analyzer",
    system_prompt="Analyze stock performance and provide investment insights.",
    model_name="gpt-4",
    max_loops=2
)

# Schedule every 30 minutes
cron = CronJob(
    agent=stock_agent,
    interval="30minutes",
    job_id="stock_analysis"
)

# Analyze multiple stocks
stocks = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
tasks = [f"Analyze {symbol} stock performance" for symbol in stocks]

try:
    results = cron.batched_run(tasks)
    for symbol, result in zip(stocks, results):
        print(f"\n{symbol} Analysis:")
        print(result)
except KeyboardInterrupt:
    print("Analysis stopped.")
    cron.stop()

Custom Callback for Output Processing

from swarms import Agent
from swarms.structs.cron_job import CronJob
import json
from datetime import datetime

def save_analysis_callback(output, task, metadata):
    """Save analysis results to file with metadata."""
    result = {
        "job_id": metadata["job_id"],
        "execution_number": metadata["execution_count"],
        "timestamp": datetime.fromtimestamp(metadata["timestamp"]).isoformat(),
        "task": task,
        "analysis": output,
        # Seconds elapsed between job start and this execution
        "uptime_seconds": metadata["timestamp"] - (metadata.get("start_time") or metadata["timestamp"])
    }
    
    # Save to file
    filename = f"analysis_{metadata['execution_count']}.json"
    with open(filename, 'w') as f:
        json.dump(result, f, indent=2)
    
    print(f"Saved analysis #{metadata['execution_count']} to {filename}")
    return output

# Create agent
agent = Agent(
    agent_name="Crypto-Analyst",
    system_prompt="Analyze cryptocurrency market trends.",
    model_name="gpt-4"
)

# Create cron with callback
cron = CronJob(
    agent=agent,
    interval="15minutes",
    job_id="crypto_analysis",
    callback=save_analysis_callback
)

try:
    cron.run(task="Analyze Bitcoin and Ethereum market trends")
except KeyboardInterrupt:
    cron.stop()

Image Analysis with Scheduling

from swarms import Agent
from swarms.structs.cron_job import CronJob

# Create vision-capable agent
vision_agent = Agent(
    agent_name="Chart-Analyzer",
    system_prompt="Analyze financial charts and provide technical insights.",
    model_name="gpt-4o",  # Vision-capable model
    max_loops=1
)

# Schedule chart analysis every 2 hours
cron = CronJob(
    agent=vision_agent,
    interval="2hours",
    job_id="chart_analysis"
)

try:
    cron.run(
        task="Analyze this chart for support/resistance levels and trend direction",
        img="/path/to/chart.png"
    )
except KeyboardInterrupt:
    cron.stop()

Monitoring Execution Stats

from swarms import Agent
from swarms.structs.cron_job import CronJob
import time

agent = Agent(
    agent_name="Monitor-Agent",
    system_prompt="Monitor system metrics.",
    model_name="gpt-4"
)

cron = CronJob(
    agent=agent,
    interval="10seconds",
    job_id="monitor"
)

# Start the job in background
cron.start()

try:
    # Monitor stats every 30 seconds
    while True:
        time.sleep(30)
        stats = cron.get_execution_stats()
        print("\nExecution Stats:")
        print(f"  Running: {stats['is_running']}")
        print(f"  Executions: {stats['execution_count']}")
        print(f"  Uptime: {stats['uptime']:.2f}s")
except KeyboardInterrupt:
    print("Stopping monitor...")
    cron.stop()

Exception Handling

The CronJob class defines several custom exceptions:

CronJobError

Base exception class for all CronJob errors.

CronJobConfigError

Raised for configuration errors.
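from swarms.structs.cron_job import CronJob, CronJobConfigError

try:
    # A missing agent is a configuration error
    cron = CronJob(agent=None, interval="10minutes")
except CronJobConfigError as e:
    print(f"Configuration error: {e}")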

CronJobScheduleError

Raised for scheduling related errors.
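from swarms.structs.cron_job import CronJob, CronJobConfigError, CronJobScheduleError

try:
    cron = CronJob(agent=agent, interval="invalid_format")
except (CronJobScheduleError, CronJobConfigError) as e:
    # An invalid interval string may surface as a configuration error
    print(f"Invalid interval: {e}")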

CronJobExecutionError

Raised for execution related errors.
from swarms.structs.cron_job import CronJobExecutionError

try:
    cron.run(task="")
except CronJobExecutionError as e:
    print(f"Execution failed: {e}")

Best Practices

  1. Use descriptive job IDs: Make job identifiers meaningful for tracking
  2. Set appropriate intervals: Choose intervals based on task complexity and resource availability
  3. Implement callbacks: Use callbacks for logging, saving results, or sending notifications
  4. Monitor execution stats: Regularly check stats to ensure jobs are running as expected
  5. Handle interrupts: Always wrap run() in try-except to handle KeyboardInterrupt
  6. Consider agent limits: Ensure your agent’s max_loops is appropriate for scheduled tasks
  7. Log failures: Enable verbose mode and implement proper error logging
  8. Test intervals: Start with longer intervals and optimize based on performance
  9. Resource management: Be mindful of API rate limits and costs with frequent scheduling
  10. Graceful shutdown: Always call stop() when terminating scheduled jobs
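Practices 5 and 10 can be combined in one small wrapper. The sketch below relies only on the `run()` and `stop()` methods documented on this page. `run_until_interrupted` is a hypothetical helper, `FakeJob` exists only so the example runs without a real agent, and the sketch assumes that calling `stop()` on a job that has already stopped is harmless.

```python
def run_until_interrupted(job, task: str, **kwargs) -> None:
    """Run `job` and always call stop() afterwards (hypothetical helper)."""
    try:
        job.run(task=task, **kwargs)
    except KeyboardInterrupt:
        print("Interrupted, shutting down...")
    finally:
        # Runs on normal return, on Ctrl+C, and on any other exception.
        job.stop()


class FakeJob:
    """Stand-in used only to exercise the helper without a real agent."""

    def __init__(self):
        self.stopped = False

    def run(self, task, **kwargs):
        raise KeyboardInterrupt  # simulate Ctrl+C during run()

    def stop(self):
        self.stopped = True


fake = FakeJob()
run_until_interrupted(fake, "Provide a market update")
print(fake.stopped)
```

With a real job, the call would look like `run_until_interrupted(cron, "Provide a market update for the current hour")`.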

Thread Safety

CronJob uses threading for background execution:
  • Daemon threads are used to prevent blocking program exit
  • Thread-safe scheduling with the schedule library
  • Proper cleanup on stop() with timeout handling
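Because execution happens on a background thread, a callback that writes to state shared with the main thread may need its own synchronization. The sketch below guards a shared list with a lock and simulates concurrent invocations with plain threads. It assumes callbacks are invoked on the job's background thread, which this page does not state explicitly; `collecting_callback` is a hypothetical name.

```python
import threading

results = []
results_lock = threading.Lock()


def collecting_callback(output, task, metadata):
    """Append each output to a shared list under a lock (hypothetical helper)."""
    with results_lock:
        results.append((metadata.get("execution_count"), output))
    return output


# Simulate concurrent invocations from several threads.
threads = [
    threading.Thread(
        target=collecting_callback,
        args=(f"result {i}", "demo task", {"execution_count": i}),
    )
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results))
```

The main thread should take the same lock before reading `results` while the job is still running.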

Callback Metadata

The callback function receives a metadata dictionary with:
{
    "job_id": str,              # Job identifier
    "timestamp": float,         # Unix timestamp of execution
    "execution_count": int,     # Number of times executed
    "task": str,                # The task that was run
    "kwargs": dict,             # Additional kwargs passed
    "start_time": float,        # Job start timestamp
    "is_running": bool          # Current running status
}
Use this metadata for:
  • Logging execution history
  • Conditional processing based on execution count
  • Time-based analysis
  • Debugging and monitoring
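As a sketch of conditional processing based on execution count, the factory below builds a callback that triggers an action only on every n-th execution and passes along the time elapsed since the job started. `every_nth` and its `action` parameter are hypothetical, and the metadata values in the loop are made up. Only the `execution_count`, `timestamp`, and `start_time` keys come from the dictionary above.

```python
def every_nth(n: int, action):
    """Build a callback that calls `action` on every n-th execution (hypothetical helper)."""

    def callback(output, task, metadata):
        count = metadata.get("execution_count", 0)
        if count and count % n == 0:
            # Elapsed seconds between job start and this execution.
            started = metadata.get("start_time") or metadata["timestamp"]
            action(count, metadata["timestamp"] - started, output)
        return output  # pass the output through unchanged

    return callback


seen = []
cb = every_nth(3, lambda count, elapsed, output: seen.append((count, elapsed)))

start = 1_700_000_000.0  # fixed, made-up start timestamp
for i in range(1, 7):
    cb(
        f"output {i}",
        "demo task",
        {"execution_count": i, "timestamp": start + 10 * i, "start_time": start},
    )

print(seen)
```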
