Skip to main content
The ext.tasks extension provides decorators and classes for creating background tasks that run at specified intervals.

Installation

from discord.ext import tasks

Creating Background Tasks

Basic Loop

import discord
from discord.ext import tasks

class MyClient(discord.Client):
    """Client that posts a running counter to a channel once a minute."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.counter = 0

    async def setup_hook(self):
        """Start the background task once the event loop is running."""
        # Loop.start() schedules an asyncio task, which needs a running
        # event loop; __init__ is executed before one exists.
        self.my_task.start()

    @tasks.loop(seconds=60)
    async def my_task(self):
        """Runs every 60 seconds."""
        channel = self.get_channel(123456789)
        # get_channel() is a cache lookup and returns None when the
        # channel is unknown, so skip this tick instead of crashing.
        if channel is None:
            return
        self.counter += 1
        await channel.send(f"Task run #{self.counter}")

    @my_task.before_loop
    async def before_my_task(self):
        """Delay the first iteration until the client is ready."""
        await self.wait_until_ready()

# discord.py 2.0+ requires intents to be passed explicitly.
client = MyClient(intents=discord.Intents.default())
client.run("TOKEN")

With Commands Extension

import discord
from discord.ext import commands, tasks

# discord.py 2.0+ requires intents to be passed explicitly.
bot = commands.Bot(command_prefix="!", intents=discord.Intents.default())

@tasks.loop(minutes=5)
async def status_task():
    """Updates bot status every 5 minutes."""
    await bot.change_presence(
        activity=discord.Game(name="with tasks!")
    )

@status_task.before_loop
async def before_status_task():
    """Hold the first iteration until the bot has finished connecting."""
    await bot.wait_until_ready()

@bot.event
async def on_ready():
    """Start the status loop the first time the bot becomes ready."""
    # on_ready can fire more than once (for example after a reconnect);
    # calling start() on a loop that is already running raises RuntimeError.
    if not status_task.is_running():
        status_task.start()
    print(f"Logged in as {bot.user}")

bot.run("TOKEN")

Loop Decorators

Time-Based Intervals

@tasks.loop(seconds=30)
async def half_minute_task():
    """Print a fixed line on each 30-second tick."""
    message = "Runs every 30 seconds"
    print(message)

Specific Time Scheduling

from datetime import time, timezone
import discord
from discord.ext import tasks

@tasks.loop(time=time(hour=9, minute=0, tzinfo=timezone.utc))
async def morning_announcement():
    """Runs every day at 9:00 AM UTC."""
    channel = bot.get_channel(123456789)
    # get_channel() is a cache lookup and returns None for an unknown or
    # not-yet-cached channel; skip the announcement rather than crash.
    if channel is None:
        return
    await channel.send("Good morning! ☀️")

@tasks.loop(time=[
    time(hour=9, tzinfo=timezone.utc),
    time(hour=12, tzinfo=timezone.utc),
    time(hour=18, tzinfo=timezone.utc),
])
async def multiple_times_task():
    """Runs at 9 AM, 12 PM, and 6 PM UTC."""
    channel = bot.get_channel(123456789)
    # get_channel() returns None when the channel is not in the cache;
    # skip this run instead of raising AttributeError on None.send().
    if channel is None:
        return
    await channel.send("Scheduled message!")
When using the `time` parameter, duplicate times are automatically removed and the list is sorted.

Limited Iterations

@tasks.loop(seconds=5, count=10)
async def limited_task():
    """Print a line on each of the loop's 10 iterations (count=10)."""
    message = "This will run exactly 10 times"
    print(message)

Loop Control

Starting and Stopping

from discord.ext import commands, tasks

class TaskCog(commands.Cog):
    """Cog exposing commands to start, stop, cancel and restart a loop."""

    def __init__(self, bot):
        self.bot = bot
        # Not read by this cog; the commands below rely on the loop's own
        # is_running() state instead.
        self.task_running = False

    def cog_unload(self):
        """Cancel the loop on unload so it does not outlive the cog."""
        self.background_task.cancel()

    @commands.command()
    async def start_task(self, ctx):
        """Starts the background task."""
        if not self.background_task.is_running():
            self.background_task.start()
            await ctx.send("Task started!")
        else:
            await ctx.send("Task is already running.")

    @commands.command()
    async def stop_task(self, ctx):
        """Stops the background task."""
        if self.background_task.is_running():
            self.background_task.stop()
            await ctx.send("Task stopped gracefully.")
        else:
            await ctx.send("Task is not running.")

    @commands.command()
    async def cancel_task(self, ctx):
        """Cancels the background task immediately."""
        if self.background_task.is_running():
            self.background_task.cancel()
            await ctx.send("Task cancelled.")
        else:
            await ctx.send("Task is not running.")

    @commands.command()
    async def restart_task(self, ctx):
        """Restarts the background task, starting it if it is not running."""
        if self.background_task.is_running():
            self.background_task.restart()
        else:
            # restart() only acts on a live loop; without this fallback the
            # command would report success while nothing is running.
            self.background_task.start()
        await ctx.send("Task restarted!")

    @tasks.loop(seconds=30)
    async def background_task(self):
        """Post a heartbeat message to the configured channel."""
        channel = self.bot.get_channel(123456789)
        # get_channel() returns None when the channel is not cached.
        if channel is None:
            return
        await channel.send("Task is running!")

    @background_task.before_loop
    async def before_background_task(self):
        """Delay the first iteration until the bot is ready."""
        await self.bot.wait_until_ready()

async def setup(bot):
    """Extension entry point: build a TaskCog and add it to ``bot``."""
    cog = TaskCog(bot)
    await bot.add_cog(cog)

Loop Status Methods

@bot.command()
async def task_status(ctx):
    """Check task status and send a multi-line summary to the channel."""
    status = []

    if my_task.is_running():
        status.append("✅ Task is running")
    else:
        status.append("❌ Task is not running")

    if my_task.is_being_cancelled():
        status.append("⚠️ Task is being cancelled")

    if my_task.failed():
        status.append("⛔ Task has failed")

    # Only report the next run when one is scheduled.
    next_iteration = my_task.next_iteration
    if next_iteration is not None:
        status.append(f"⏰ Next run: {next_iteration}")

    # Current loop count.
    status.append(f"🔢 Iteration: {my_task.current_loop}")

    await ctx.send("\n".join(status))

Hooks

Before Loop

@tasks.loop(minutes=5)
async def my_task():
    """Print a progress line on every five-minute tick."""
    message = "Task running..."
    print(message)

@my_task.before_loop
async def before_my_task():
    """Wait for the bot to be ready, then announce initialization."""
    # Block here until the bot reports ready.
    await bot.wait_until_ready()
    announcement = "Task initialized!"
    print(announcement)

After Loop

@tasks.loop(seconds=30)
async def my_task():
    """Print a progress line on every 30-second tick."""
    message = "Task running..."
    print(message)

@my_task.after_loop
async def after_my_task():
    """Report whether the loop ended by cancellation or on its own."""
    outcome = (
        "Task was cancelled"
        if my_task.is_being_cancelled()
        else "Task completed naturally"
    )
    print(outcome)

Error Handling

Custom Error Handler

@tasks.loop(seconds=60)
async def risky_task():
    """Fetch a result from the API and hand it to the processor."""
    # Either call might raise an exception.
    await process_result(await some_api_call())

@risky_task.error
async def task_error_handler(error):
    """Handles errors that occur in the task."""
    print(f"Task error: {error}")
    # Log to a channel, but never let the error handler itself fail when
    # the channel is missing from the cache.
    channel = bot.get_channel(123456789)
    if channel is not None:
        await channel.send(f"⚠️ Task encountered an error: {error}")

Reconnect Handling

@tasks.loop(minutes=1, reconnect=True)
async def resilient_task():
    """Fetch a user once a minute, retrying on connection errors."""
    # With reconnect enabled, the loop retries when it encounters:
    # - OSError
    # - discord.GatewayNotFound
    # - discord.ConnectionClosed
    # - aiohttp.ClientError
    # - asyncio.TimeoutError
    user_id = 123456789
    await bot.fetch_user(user_id)

Custom Exception Types

from aiohttp import ClientError

@tasks.loop(seconds=30)
async def api_task():
    """Placeholder loop body; runs every 30 seconds and does nothing yet."""
    # Make API calls
    pass

# Add custom exception types to handle
# (TimeoutError here is the built-in, since no other name is imported).
api_task.add_exception_type(ClientError, TimeoutError)

# Remove specific exception types
api_task.remove_exception_type(TimeoutError)

# Clear all exception types
# NOTE(review): presumably this also drops the library's default handled
# exceptions, not just the ones added above — confirm before copying.
api_task.clear_exception_types()

Advanced Features

Dynamic Interval Changes

@tasks.loop(seconds=60)
async def dynamic_task():
    """Print a line on each tick; the interval is changed by commands."""
    message = "Running at current interval"
    print(message)

@bot.command()
async def change_interval(ctx, seconds: int):
    """Changes the task interval.

    Rejects non-positive values: the number comes straight from chat, and
    a zero interval would make the loop run without any delay.
    """
    if seconds <= 0:
        await ctx.send("Interval must be a positive number of seconds.")
        return
    dynamic_task.change_interval(seconds=seconds)
    await ctx.send(f"Interval changed to {seconds} seconds")

@bot.command()
async def set_specific_time(ctx, hour: int):
    """Sets task to run at a specific hour (0-23, UTC)."""
    from datetime import time, timezone

    # datetime.time() raises ValueError outside 0..23; answer the user
    # instead of letting the command fail.
    if not 0 <= hour <= 23:
        await ctx.send("Hour must be between 0 and 23.")
        return
    dynamic_task.change_interval(time=time(hour=hour, tzinfo=timezone.utc))
    await ctx.send(f"Task will now run at {hour}:00 UTC")

Overlap Control

# NOTE(review): confirm that the installed library's tasks.loop() accepts an
# `overlap` keyword; its support cannot be verified from this snippet.
@tasks.loop(seconds=10, overlap=False)
async def sequential_task():
    """Waits for previous run to complete.

    The body sleeps for 15 seconds, which is longer than the 10-second
    interval requested in the decorator.
    """
    # NOTE(review): `asyncio` is used here but not imported in this snippet.
    await asyncio.sleep(15)  # Takes longer than interval
    print("This run completes before next starts")

Passing Arguments

class MyCog(commands.Cog):
    """Cog that cycles the bot's presence through a list of messages."""

    def __init__(self, bot):
        self.bot = bot
        self.status_messages = ["Playing", "Watching", "Listening"]
        self.rotating_status.start()

    def cog_unload(self):
        """Cancel the loop on unload so it does not outlive the cog."""
        self.rotating_status.cancel()

    @tasks.loop(seconds=30)
    async def rotating_status(self):
        """The self parameter is automatically injected."""
        # Index by the list's real length so adding or removing a message
        # cannot cause an IndexError or leave entries unused.
        index = self.rotating_status.current_loop % len(self.status_messages)
        message = self.status_messages[index]
        await self.bot.change_presence(
            activity=discord.Game(name=message)
        )

Complete Example

import discord
from discord.ext import commands, tasks
from datetime import time, timezone
import asyncio
import random

# discord.py 2.0+ requires intents to be passed explicitly.
bot = commands.Bot(command_prefix="!", intents=discord.Intents.default())

class BackgroundTasks(commands.Cog):
    """Cog bundling a status updater, a daily reminder and a cleanup job."""

    def __init__(self, bot):
        self.bot = bot
        self.counter = 0

        # Start all tasks
        self.status_updater.start()
        self.daily_reminder.start()
        self.data_cleanup.start()

    @tasks.loop(seconds=60)
    async def status_updater(self):
        """Updates bot status every minute."""
        # member_count may be None for a guild whose data is incomplete;
        # count those as 0 so sum() does not raise TypeError.
        member_count = sum(g.member_count or 0 for g in self.bot.guilds)
        await self.bot.change_presence(
            activity=discord.Game(name=f"with {member_count} users")
        )

    @tasks.loop(time=time(hour=9, minute=0, tzinfo=timezone.utc))
    async def daily_reminder(self):
        """Sends daily reminder at 9 AM UTC."""
        for guild in self.bot.guilds:
            channel = guild.system_channel
            if channel is None:
                continue
            try:
                await channel.send("📅 Daily reminder: Don't forget to vote!")
            except discord.HTTPException as exc:
                # One guild rejecting the message (e.g. missing permission)
                # must not stop the reminder from reaching the others.
                print(f"Could not send reminder to {guild}: {exc}")

    @tasks.loop(hours=24)
    async def data_cleanup(self):
        """Cleans up old data every 24 hours."""
        print("Running data cleanup...")
        # Cleanup logic here
        await asyncio.sleep(1)
        print("Cleanup complete!")

    @status_updater.before_loop
    @daily_reminder.before_loop
    @data_cleanup.before_loop
    async def before_tasks(self):
        """Wait for bot to be ready before starting tasks."""
        await self.bot.wait_until_ready()

    @status_updater.error
    @daily_reminder.error
    @data_cleanup.error
    async def task_error(self, error):
        """Handles errors in tasks."""
        print(f"Task error: {error}")
        # Log to error channel
        error_channel = self.bot.get_channel(123456789)
        if error_channel:
            await error_channel.send(f"⚠️ Task error: {error}")

    def cog_unload(self):
        """Clean up tasks when cog is unloaded."""
        self.status_updater.cancel()
        self.daily_reminder.cancel()
        self.data_cleanup.cancel()

async def setup(bot):
    """Extension entry point: register the BackgroundTasks cog on ``bot``."""
    await bot.add_cog(BackgroundTasks(bot))

async def main():
    """Load the cog, then connect.

    Calling bot.run() directly never invokes setup(), so the cog and its
    tasks would not be loaded.
    """
    async with bot:
        await setup(bot)
        await bot.start("TOKEN")

asyncio.run(main())

Best Practices

1

Always Use before_loop

Use @task.before_loop with await bot.wait_until_ready() to ensure the bot is fully ready before the task starts.
2

Handle Errors Gracefully

Implement error handlers using @task.error to prevent tasks from silently failing.
3

Clean Up on Exit

Cancel tasks when cogs are unloaded or when the bot shuts down.
4

Use Reconnect for Network Tasks

Enable reconnect=True for tasks that make network requests to handle temporary connection issues.
5

Monitor Task Status

Regularly check task status and log important events for debugging.
Don't perform heavy blocking operations in tasks. Use asyncio.sleep() instead of time.sleep() and use asynchronous libraries when possible.

Build docs developers (and LLMs) love