Grip AI’s scheduling system enables automated execution of prompts on a recurring basis using cron expressions. Jobs can run silently or publish results to messaging channels like Telegram, making it ideal for monitoring, reporting, and automated workflows.
Job persistence — JSON storage in workspace/cron/jobs.json
Scheduler loop — Checks every 30 seconds for due jobs
Execution engine — Runs jobs through the agent engine with timeouts
Message bus integration — Routes results to channels (Telegram, Slack, etc.)
```python
# From grip/cron/service.py:46-54
class CronService:
    """Manages cron job persistence and periodic execution.

    On start(), spawns an asyncio loop that checks every 30 seconds
    whether any job is due. When a job fires, it calls engine.run()
    with the job's prompt as the user message. If the job has a reply_to
    session key (e.g. "telegram:12345"), the result is published to the
    message bus so it reaches the originating channel.
    """
```
```
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *
```
```
# Every 5 minutes
*/5 * * * *

# Every hour at minute 0
0 * * * *

# Every day at 9:00 AM
0 9 * * *

# Every Monday at 8:00 AM
0 8 * * 1

# First day of every month at midnight
0 0 1 * *

# Every weekday at 6:00 PM
0 18 * * 1-5
```
Grip AI requires the `croniter` package for full cron expression support. Install it with `pip install croniter`.
```python
# From grip/cron/service.py:101-129
def add_job(self, name: str, schedule: str, prompt: str, reply_to: str = "") -> CronJob:
    """Create and persist a new cron job.

    Args:
        name: Human-readable job name.
        schedule: Cron expression (e.g. "*/5 * * * *").
        prompt: The prompt to send to the engine when the job fires.
        reply_to: Session key to route results to (e.g. "telegram:12345").
            When set, job output is published to the message bus.

    Raises:
        ValueError: If reply_to is set but not in "channel:chat_id" format.
    """
    if reply_to and ":" not in reply_to:
        raise ValueError(
            f"Invalid reply_to format: '{reply_to}'. "
            "Expected 'channel:chat_id' (e.g. 'telegram:12345')."
        )
    job = CronJob(
        id=f"cron_{uuid.uuid4().hex[:8]}",
        name=name,
        schedule=schedule,
        prompt=prompt,
        reply_to=reply_to,
    )
    self._jobs[job.id] = job
    self._save_jobs()
    logger.info("Cron job added: {} ({})", name, schedule)
    return job
```
```python
# From grip/cron/service.py:237-283
async def _execute_job(self, job: CronJob) -> None:
    """Run a single cron job through the engine with a timeout.

    If the job has a reply_to session key and a message bus is available,
    the result is published to the bus so the originating channel
    (e.g. Telegram) receives the response.

    Uses self._executing to prevent the same job from being re-fired
    while a previous execution is still in progress.
    """
    self._executing.add(job.id)
    try:
        logger.info("Executing cron job: {} ({})", job.name, job.id)
        job.last_run = datetime.now(UTC).isoformat()
        self._save_jobs()

        timeout = self._config.exec_timeout_minutes * 60
        session_key = f"cron:{job.id}"

        try:
            result = await asyncio.wait_for(
                self._engine.run(job.prompt, session_key=session_key),
                timeout=timeout,
            )
            logger.info(
                "Cron job {} completed: {} iterations, response length {}",
                job.id,
                result.iterations,
                len(result.response),
            )
            if job.reply_to and self._bus and result.response:
                await self._publish_result(job, result.response)
        except TimeoutError:
            logger.error("Cron job {} timed out after {}s", job.id, timeout)
            if job.reply_to and self._bus:
                await self._publish_result(
                    job,
                    f"Cron job '{job.name}' timed out after {self._config.exec_timeout_minutes} minutes.",
                )
        except Exception as exc:
            logger.error("Cron job {} failed: {}", job.id, exc)
            if job.reply_to and self._bus:
                await self._publish_result(job, f"Cron job '{job.name}' failed: {exc}")
    finally:
        self._executing.discard(job.id)
```
Key features:
Per-job timeout (configurable via config.cron.exec_timeout_minutes)
```python
# From grip/cron/service.py:285-305
async def _publish_result(self, job: CronJob, text: str) -> None:
    """Publish a cron job result to the message bus for channel delivery."""
    from grip.bus.events import OutboundMessage

    parts = job.reply_to.split(":", 1)
    if len(parts) != 2:
        logger.warning("Invalid reply_to format for cron job {}: {}", job.id, job.reply_to)
        return
    channel, chat_id = parts

    try:
        await self._bus.publish_outbound(
            OutboundMessage(
                channel=channel,
                chat_id=chat_id,
                text=text,
            )
        )
        logger.info("Cron job {} result published to {}:{}", job.id, channel, chat_id)
    except Exception as exc:
        logger.error("Failed to publish cron result for {}: {}", job.id, exc)
```
```python
# From grip/cron/service.py:93-99
def _save_jobs(self) -> None:
    """Persist all jobs to the JSON file atomically."""
    self._cron_dir.mkdir(parents=True, exist_ok=True)
    data = [job.to_dict() for job in self._jobs.values()]
    tmp = self._jobs_file.with_suffix(".tmp")
    tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
    tmp.rename(self._jobs_file)  # Atomic rename
```
The scheduler runs continuously with a 30-second check interval:
```python
# From grip/cron/service.py:159-170
async def start(self) -> None:
    """Start the cron scheduler loop. Runs until cancelled."""
    self._running = True
    logger.info("Cron service started ({} jobs loaded)", len(self._jobs))
    while self._running:
        await self._check_and_run_due_jobs()
        await asyncio.sleep(self._check_interval)

async def stop(self) -> None:
    """Signal the scheduler to stop."""
    self._running = False
    logger.info("Cron service stopped")
```
The 30-second interval is hardcoded. For finer-grained scheduling precision, modify `_check_interval` in the source.