Skip to main content

Overview

Grip AI’s scheduling system enables automated execution of prompts on a recurring basis using cron expressions. Jobs can run silently or publish results to messaging channels like Telegram, making it ideal for monitoring, reporting, and automated workflows.

Architecture

The cron service consists of:
  1. Job persistence — JSON storage in workspace/cron/jobs.json
  2. Scheduler loop — Checks every 30 seconds for due jobs
  3. Execution engine — Runs jobs through the agent engine with timeouts
  4. Message bus integration — Routes results to channels (Telegram, Slack, etc.)
# From grip/cron/service.py:46-54
class CronService:
    """Manages cron job persistence and periodic execution.
    
    On start(), spawns an asyncio loop that checks every 30 seconds
    whether any job is due. When a job fires, it calls engine.run()
    with the job's prompt as the user message. If the job has a reply_to
    session key (e.g. "telegram:12345"), the result is published to
    the message bus so it reaches the originating channel.
    """

Job Definition

# From grip/cron/service.py:26-36
@dataclass(slots=True)
class CronJob:
    """A single scheduled task definition."""
    id: str                    # Unique identifier (auto-generated)
    name: str                  # Human-readable name
    schedule: str              # Cron expression
    prompt: str                # Task to execute
    enabled: bool = True       # Active/inactive toggle
    last_run: str | None = None  # ISO timestamp of last execution
    created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
    reply_to: str = ""         # Channel routing (e.g., "telegram:12345")

Cron Expressions

Jobs use standard cron syntax for scheduling:
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *
# Every 5 minutes
*/5 * * * *

# Every hour at minute 0
0 * * * *

# Every day at 9:00 AM
0 9 * * *

# Every Monday at 8:00 AM
0 8 * * 1

# First day of every month at midnight
0 0 1 * *

# Every weekday at 6:00 PM
0 18 * * 1-5
Grip AI requires the croniter package for full cron expression support. Install with pip install croniter.

Creating Jobs

Programmatic API

# From grip/cron/service.py:101-129
def add_job(self, name: str, schedule: str, prompt: str, reply_to: str = "") -> CronJob:
    """Create and persist a new cron job.
    
    Args:
        name: Human-readable job name.
        schedule: Cron expression (e.g. "*/5 * * * *").
        prompt: The prompt to send to the engine when the job fires.
        reply_to: Session key to route results to (e.g. "telegram:12345").
                  When set, job output is published to the message bus.
    
    Raises:
        ValueError: If reply_to is set but not in "channel:chat_id" format.
    """
    if reply_to and ":" not in reply_to:
        raise ValueError(
            f"Invalid reply_to format: '{reply_to}'. "
            "Expected 'channel:chat_id' (e.g. 'telegram:12345')."
        )
    job = CronJob(
        id=f"cron_{uuid.uuid4().hex[:8]}",
        name=name,
        schedule=schedule,
        prompt=prompt,
        reply_to=reply_to,
    )
    self._jobs[job.id] = job
    self._save_jobs()
    logger.info("Cron job added: {} ({})", name, schedule)
    return job

CLI Usage

# Add a job
grip cron add "daily-report" "0 9 * * *" "Generate daily metrics report"

# Add with channel routing
grip cron add "alerts" "*/15 * * * *" "Check system health" --reply-to telegram:12345

# List all jobs
grip cron list

# Disable a job
grip cron disable cron_a1b2c3d4

# Enable a job
grip cron enable cron_a1b2c3d4

# Remove a job
grip cron remove cron_a1b2c3d4

Execution Flow

When a job becomes due:
# From grip/cron/service.py:237-283
async def _execute_job(self, job: CronJob) -> None:
    """Run a single cron job through the engine with a timeout.
    
    If the job has a reply_to session key and a message bus is available,
    the result is published to the bus so the originating channel
    (e.g. Telegram) receives the response.
    
    Uses self._executing to prevent the same job from being re-fired
    while a previous execution is still in progress.
    """
    self._executing.add(job.id)
    try:
        logger.info("Executing cron job: {} ({})", job.name, job.id)
        job.last_run = datetime.now(UTC).isoformat()
        self._save_jobs()
        
        timeout = self._config.exec_timeout_minutes * 60
        session_key = f"cron:{job.id}"
        
        try:
            result = await asyncio.wait_for(
                self._engine.run(job.prompt, session_key=session_key),
                timeout=timeout,
            )
            logger.info(
                "Cron job {} completed: {} iterations, response length {}",
                job.id,
                result.iterations,
                len(result.response),
            )
            
            if job.reply_to and self._bus and result.response:
                await self._publish_result(job, result.response)
        
        except TimeoutError:
            logger.error("Cron job {} timed out after {}s", job.id, timeout)
            if job.reply_to and self._bus:
                await self._publish_result(
                    job,
                    f"Cron job '{job.name}' timed out after {self._config.exec_timeout_minutes} minutes.",
                )
        except Exception as exc:
            logger.error("Cron job {} failed: {}", job.id, exc)
            if job.reply_to and self._bus:
                await self._publish_result(job, f"Cron job '{job.name}' failed: {exc}")
    finally:
        self._executing.discard(job.id)
Key features:
  • Per-job timeout (configurable via config.cron.exec_timeout_minutes)
  • Isolated session keys (cron:{job_id})
  • Automatic retry prevention (via _executing set)
  • Error reporting to channels

Channel Delivery

Jobs can publish results to messaging channels:
# From grip/cron/service.py:285-305
async def _publish_result(self, job: CronJob, text: str) -> None:
    """Publish a cron job result to the message bus for channel delivery."""
    from grip.bus.events import OutboundMessage
    
    parts = job.reply_to.split(":", 1)
    if len(parts) != 2:
        logger.warning("Invalid reply_to format for cron job {}: {}", job.id, job.reply_to)
        return
    
    channel, chat_id = parts
    try:
        await self._bus.publish_outbound(
            OutboundMessage(
                channel=channel,
                chat_id=chat_id,
                text=text,
            )
        )
        logger.info("Cron job {} result published to {}:{}", job.id, channel, chat_id)
    except Exception as exc:
        logger.error("Failed to publish cron result for {}: {}", job.id, exc)
Supported channels:
  • telegram:chat_id
  • slack:channel_id
  • discord:channel_id
  1. Create a Telegram bot via @BotFather
  2. Get your chat ID by sending a message to @userinfobot
  3. Configure the bot token in config.yaml:
    channels:
      telegram:
        bot_token: "your-bot-token"
    
  4. Create a job with reply_to="telegram:YOUR_CHAT_ID"

Persistence

Jobs are stored in workspace/cron/jobs.json:
[
  {
    "id": "cron_a1b2c3d4",
    "name": "daily-report",
    "schedule": "0 9 * * *",
    "prompt": "Generate daily metrics report",
    "enabled": true,
    "last_run": "2026-02-28T09:00:15.123456+00:00",
    "created_at": "2026-02-01T12:00:00.000000+00:00",
    "reply_to": "telegram:12345"
  }
]
Saves are atomic to prevent corruption:
# From grip/cron/service.py:93-99
def _save_jobs(self) -> None:
    """Persist all jobs to the JSON file atomically."""
    self._cron_dir.mkdir(parents=True, exist_ok=True)
    data = [job.to_dict() for job in self._jobs.values()]
    tmp = self._jobs_file.with_suffix(".tmp")
    tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
    tmp.rename(self._jobs_file)  # Atomic rename

Heartbeat Service

The scheduler runs continuously with a 30-second check interval:
# From grip/cron/service.py:159-170
async def start(self) -> None:
    """Start the cron scheduler loop. Runs until cancelled."""
    self._running = True
    logger.info("Cron service started ({} jobs loaded)", len(self._jobs))
    
    while self._running:
        await self._check_and_run_due_jobs()
        await asyncio.sleep(self._check_interval)

async def stop(self) -> None:
    """Signal the scheduler to stop."""
    self._running = False
    logger.info("Cron service stopped")
The 30-second interval is hardcoded. For sub-minute precision, modify _check_interval in the source.

Configuration

# config.yaml
cron:
  enabled: true
  exec_timeout_minutes: 10  # Per-job execution timeout

Best Practices

  • Keep prompts focused: Each job should do one thing well
  • Set realistic timeouts: Complex tasks need longer timeouts
  • Use idempotent prompts: Jobs should be safe to re-run
  • Test before scheduling: Run prompts manually first
  • Jobs that fail repeatedly remain enabled (manual intervention required)
  • Check logs at workspace/logs/ for failure details
  • Use reply_to for automatic failure notifications
  • Monitor last_run timestamps to detect stuck jobs
  • Avoid overlapping executions (jobs block themselves via _executing)
  • Stagger jobs to prevent resource contention
  • Use longer intervals for expensive operations
  • Monitor total execution time via logs

Example Use Cases

{
  "name": "standup",
  "schedule": "0 9 * * 1-5",
  "prompt": "Review yesterday's git commits and generate a standup report",
  "reply_to": "slack:team-standup"
}

Build docs developers (and LLMs) love