ext.tasks extension provides decorators and classes for creating background tasks that run at specified intervals.
Installation
from discord.ext import tasks
Creating Background Tasks
Basic Loop
import discord
from discord.ext import tasks
class MyClient(discord.Client):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.counter = 0
self.my_task.start() # Start the task
@tasks.loop(seconds=60)
async def my_task(self):
"""Runs every 60 seconds."""
channel = self.get_channel(123456789)
self.counter += 1
await channel.send(f"Task run #{self.counter}")
@my_task.before_loop
async def before_my_task(self):
await self.wait_until_ready()
client = MyClient()
client.run("TOKEN")
With Commands Extension
from discord.ext import commands, tasks
bot = commands.Bot(command_prefix="!")
@tasks.loop(minutes=5)
async def status_task():
"""Updates bot status every 5 minutes."""
await bot.change_presence(
activity=discord.Game(name="with tasks!")
)
@status_task.before_loop
async def before_status_task():
await bot.wait_until_ready()
@bot.event
async def on_ready():
status_task.start()
print(f"Logged in as {bot.user}")
bot.run("TOKEN")
Loop Decorators
Time-Based Intervals
- Seconds
- Minutes
- Hours
- Combined
@tasks.loop(seconds=30)
async def half_minute_task():
print("Runs every 30 seconds")
@tasks.loop(minutes=10)
async def ten_minute_task():
print("Runs every 10 minutes")
@tasks.loop(hours=1)
async def hourly_task():
print("Runs every hour")
@tasks.loop(hours=2, minutes=30)
async def combined_task():
print("Runs every 2 hours and 30 minutes")
Specific Time Scheduling
from datetime import time, timezone
import discord
from discord.ext import tasks
@tasks.loop(time=time(hour=9, minute=0, tzinfo=timezone.utc))
async def morning_announcement():
"""Runs every day at 9:00 AM UTC."""
channel = bot.get_channel(123456789)
await channel.send("Good morning! โ๏ธ")
@tasks.loop(time=[
time(hour=9, tzinfo=timezone.utc),
time(hour=12, tzinfo=timezone.utc),
time(hour=18, tzinfo=timezone.utc)
])
async def multiple_times_task():
"""Runs at 9 AM, 12 PM, and 6 PM UTC."""
channel = bot.get_channel(123456789)
await channel.send("Scheduled message!")
When using
time parameter, duplicate times are automatically removed and the list is sorted.Limited Iterations
@tasks.loop(seconds=5, count=10)
async def limited_task():
"""Runs 10 times, then stops."""
print("This will run exactly 10 times")
Loop Control
Starting and Stopping
from discord.ext import commands, tasks
class TaskCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.task_running = False
@commands.command()
async def start_task(self, ctx):
"""Starts the background task."""
if not self.background_task.is_running():
self.background_task.start()
await ctx.send("Task started!")
else:
await ctx.send("Task is already running.")
@commands.command()
async def stop_task(self, ctx):
"""Stops the background task."""
if self.background_task.is_running():
self.background_task.stop()
await ctx.send("Task stopped gracefully.")
else:
await ctx.send("Task is not running.")
@commands.command()
async def cancel_task(self, ctx):
"""Cancels the background task immediately."""
self.background_task.cancel()
await ctx.send("Task cancelled.")
@commands.command()
async def restart_task(self, ctx):
"""Restarts the background task."""
self.background_task.restart()
await ctx.send("Task restarted!")
@tasks.loop(seconds=30)
async def background_task(self):
channel = self.bot.get_channel(123456789)
await channel.send("Task is running!")
async def setup(bot):
await bot.add_cog(TaskCog(bot))
Loop Status Methods
@bot.command()
async def task_status(ctx):
"""Check task status."""
status = []
if my_task.is_running():
status.append("โ
Task is running")
else:
status.append("โ Task is not running")
if my_task.is_being_cancelled():
status.append("โ ๏ธ Task is being cancelled")
if my_task.failed():
status.append("โ Task has failed")
# Get next iteration time
next_iteration = my_task.next_iteration
if next_iteration:
status.append(f"โฐ Next run: {next_iteration}")
# Get current loop count
status.append(f"๐ข Iteration: {my_task.current_loop}")
await ctx.send("\n".join(status))
Hooks
Before Loop
@tasks.loop(minutes=5)
async def my_task():
print("Task running...")
@my_task.before_loop
async def before_my_task():
"""Runs once before the loop starts."""
await bot.wait_until_ready()
print("Task initialized!")
After Loop
@tasks.loop(seconds=30)
async def my_task():
print("Task running...")
@my_task.after_loop
async def after_my_task():
"""Runs after the loop ends."""
if my_task.is_being_cancelled():
print("Task was cancelled")
else:
print("Task completed naturally")
Error Handling
Custom Error Handler
@tasks.loop(seconds=60)
async def risky_task():
# Might raise an exception
result = await some_api_call()
await process_result(result)
@risky_task.error
async def task_error_handler(error):
"""Handles errors that occur in the task."""
print(f"Task error: {error}")
# Log to a channel
channel = bot.get_channel(123456789)
await channel.send(f"โ ๏ธ Task encountered an error: {error}")
Reconnect Handling
@tasks.loop(minutes=1, reconnect=True)
async def resilient_task():
"""Automatically retries on connection errors."""
# This task will automatically retry if it encounters:
# - OSError
# - discord.GatewayNotFound
# - discord.ConnectionClosed
# - aiohttp.ClientError
# - asyncio.TimeoutError
await bot.fetch_user(123456789)
Custom Exception Types
from aiohttp import ClientError
@tasks.loop(seconds=30)
async def api_task():
# Make API calls
pass
# Add custom exception types to handle
api_task.add_exception_type(ClientError, TimeoutError)
# Remove specific exception types
api_task.remove_exception_type(TimeoutError)
# Clear all exception types
api_task.clear_exception_types()
Advanced Features
Dynamic Interval Changes
@tasks.loop(seconds=60)
async def dynamic_task():
print("Running at current interval")
@bot.command()
async def change_interval(ctx, seconds: int):
"""Changes the task interval."""
dynamic_task.change_interval(seconds=seconds)
await ctx.send(f"Interval changed to {seconds} seconds")
@bot.command()
async def set_specific_time(ctx, hour: int):
"""Sets task to run at a specific hour."""
from datetime import time, timezone
dynamic_task.change_interval(time=time(hour=hour, tzinfo=timezone.utc))
await ctx.send(f"Task will now run at {hour}:00 UTC")
Overlap Control
- No Overlap (Default)
- Unlimited Overlap
- Limited Overlap
@tasks.loop(seconds=10, overlap=False)
async def sequential_task():
"""Waits for previous run to complete."""
await asyncio.sleep(15) # Takes longer than interval
print("This run completes before next starts")
@tasks.loop(seconds=10, overlap=True)
async def concurrent_task():
"""Allows unlimited concurrent runs."""
await asyncio.sleep(15)
print("Multiple instances can run simultaneously")
@tasks.loop(seconds=10, overlap=2)
async def limited_concurrent_task():
"""Allows up to 2 concurrent runs."""
await asyncio.sleep(20)
print("At most 2 instances run at once")
Passing Arguments
class MyCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.status_messages = ["Playing", "Watching", "Listening"]
self.rotating_status.start()
@tasks.loop(seconds=30)
async def rotating_status(self):
"""The self parameter is automatically injected."""
message = self.status_messages[self.rotating_status.current_loop % 3]
await self.bot.change_presence(
activity=discord.Game(name=message)
)
Complete Example
import discord
from discord.ext import commands, tasks
from datetime import time, timezone
import asyncio
import random
bot = commands.Bot(command_prefix="!")
class BackgroundTasks(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.counter = 0
# Start all tasks
self.status_updater.start()
self.daily_reminder.start()
self.data_cleanup.start()
@tasks.loop(seconds=60)
async def status_updater(self):
"""Updates bot status every minute."""
member_count = sum(g.member_count for g in self.bot.guilds)
await self.bot.change_presence(
activity=discord.Game(name=f"with {member_count} users")
)
@tasks.loop(time=time(hour=9, minute=0, tzinfo=timezone.utc))
async def daily_reminder(self):
"""Sends daily reminder at 9 AM UTC."""
for guild in self.bot.guilds:
channel = guild.system_channel
if channel:
await channel.send("๐
Daily reminder: Don't forget to vote!")
@tasks.loop(hours=24)
async def data_cleanup(self):
"""Cleans up old data every 24 hours."""
print("Running data cleanup...")
# Cleanup logic here
await asyncio.sleep(1)
print("Cleanup complete!")
@status_updater.before_loop
@daily_reminder.before_loop
@data_cleanup.before_loop
async def before_tasks(self):
"""Wait for bot to be ready before starting tasks."""
await self.bot.wait_until_ready()
@status_updater.error
@daily_reminder.error
@data_cleanup.error
async def task_error(self, error):
"""Handles errors in tasks."""
print(f"Task error: {error}")
# Log to error channel
error_channel = self.bot.get_channel(123456789)
if error_channel:
await error_channel.send(f"โ ๏ธ Task error: {error}")
def cog_unload(self):
"""Clean up tasks when cog is unloaded."""
self.status_updater.cancel()
self.daily_reminder.cancel()
self.data_cleanup.cancel()
async def setup(bot):
await bot.add_cog(BackgroundTasks(bot))
bot.run("TOKEN")
Best Practices
Always Use before_loop
Use
@task.before_loop with await bot.wait_until_ready() to ensure the bot is fully ready before the task starts.Handle Errors Gracefully
Implement error handlers using
@task.error to prevent tasks from silently failing.Use Reconnect for Network Tasks
Enable
reconnect=True for tasks that make network requests to handle temporary connection issues.Donโt perform heavy blocking operations in tasks. Use
asyncio.sleep() instead of time.sleep() and use asynchronous libraries when possible.