The dispatcher system is responsible for executing background tasks in AWX. It manages worker processes, consumes messages from queues, and runs Python code to accomplish various tasks.

Overview

AWX uses the dispatcherd library for all task management. This is a dedicated task queue system that distributes work across machines in an AWX installation.

Key Components

┌───────────────────────────────┐
│        Task Publishers        │
│     (Django Views/Signals)    │
└───────────────┬───────────────┘
                │ publish task
┌───────────────────────────────┐
│        Message Queues         │
│    (PostgreSQL pg_notify)     │
└───────────────┬───────────────┘
                │ consume
┌───────────────────────────────┐
│    awx-manage dispatcherd     │
│     (Dispatcher Process)      │
└───────────────┬───────────────┘
                │ distribute to
┌───────────────────────────────┐
│          Worker Pool          │
│       (Child Processes)       │
│                               │
│   Worker 1   Worker 2   ...   │
└───────────────────────────────┘

Dispatcherd Library

The library provides a decorator-based API for defining and publishing tasks:

Task Decorator

Tasks are registered with the @task() decorator:
from dispatcherd.publish import task

@task()
def my_task(arg1, arg2):
    """A simple task function"""
    return arg1 + arg2

Task Queues and Workers

Task Queue Abstraction

AWX uses a Task Queue abstraction to distribute work:
  • Input: a unit of work called a Task
  • Workers: dedicated processes on every AWX node that monitor queues
  • Communication: distributed queues built on PostgreSQL’s pg_notify

Clustered Installations

Clustered AWX installations run workers on every node, which provides:
  • High availability
  • Horizontal scaling

Message Types

Direct Messages

Direct messages are bound to a specific named queue. Example: launching a Job Template:
  1. AWX looks at available capacity
  2. Chooses an Execution Node
  3. Publishes message to node-specific queue
  4. Dispatcher on that node listens for events
Characteristics:
  • Targeted to specific node
  • Consumed by one worker process
  • Used for job execution, inventory updates, etc.

Shared Direct Queues

Some direct queues are consumed by every AWX node. Example: an inventory deletion task:
  • Any available node may perform the work
  • First available worker processes the task

Fanout Messages

Fanout messages are broadcast to all nodes. Example: changing a setting through the AWX API:
  • Message broadcast to every AWX node
  • Code runs on every node
  • Used for cache invalidation, configuration updates
Characteristics:
  • Broadcast to all nodes
  • Every node processes the message
  • Ensures cluster-wide consistency

Defining Tasks

Function-Based Tasks

Simple functions decorated with @task():
from dispatcherd.publish import task

@task()
def add(a, b):
    """Add two numbers"""
    return a + b

Class-Based Tasks

Classes with a run() method:
from dispatcherd.publish import task

@task()
class Adder:
    def run(self, a, b):
        """Add two numbers"""
        return a + b

Task Location

Tasks are defined in awx.main.tasks module:
awx/main/tasks/
├── __init__.py
├── jobs.py          # Job execution tasks
├── system.py        # System tasks
├── callback.py      # Callback processing
├── policy.py        # Cleanup/maintenance
└── ...

Running Tasks

Publishing Tasks

To run a task in the background:
# Function-based task
add.apply_async([1, 1])

# Class-based task
Adder.apply_async([1, 1])

# With keyword arguments
add.apply_async(args=[1], kwargs={'b': 2})

# To specific queue
add.apply_async([1, 1], queue='awx_node_1')

Message Format

When you run apply_async(), a JSON message is composed:
{
    "uuid": "550e8400-e29b-41d4-a716-446655440000",
    "args": [1, 1],
    "kwargs": {},
    "task": "awx.main.tasks.system.add",
    "time_pub": 1234567890.123
}
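The message above can be composed with standard-library tools. This is an illustrative sketch, not the actual dispatcherd serialization code; the helper name compose_task_message is hypothetical, and only the field names mirror the example message:

```python
import json
import time
import uuid

def compose_task_message(task_path, args=None, kwargs=None):
    """Build a JSON task message shaped like the example above."""
    body = {
        "uuid": str(uuid.uuid4()),   # unique ID for tracking this run
        "args": args or [],
        "kwargs": kwargs or {},
        "task": task_path,           # dotted import path of the callable
        "time_pub": time.time(),     # publish timestamp (epoch seconds)
    }
    return json.dumps(body)

message = compose_task_message("awx.main.tasks.system.add", args=[1, 1])
print(message)
```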

Task Execution

When a worker receives the message:
  1. Deserialize: Parse JSON message
  2. Import: Import the task callable
  3. Execute: Run the Python code
  4. Return: Task completes, result may be stored
# Worker executes
awx.main.tasks.system.add(1, 1)

Dispatcher Implementation

The Dispatcher Process

Every node runs awx-manage dispatcherd:
# Start dispatcher
awx-manage dispatcherd

# With custom pool size
awx-manage dispatcherd --workers 8
Responsibilities:
  • Listens for messages via PostgreSQL pg_notify
  • Consumes from appropriate queues for the node:
    • Default shared queue
    • Node-specific queue (by hostname)
    • Broadcast queue
  • Manages pool of child processes
  • Distributes inbound messages to workers
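The set of queues a node binds can be sketched as a small helper. The naming convention (awx, awx_&lt;hostname&gt;, broadcast) is an assumption based on the examples in this document, not dispatcherd's actual queue names:

```python
def channels_for_node(hostname):
    """Return the queue names a node's dispatcher would listen on.

    Illustrative only; queue names are assumed from this document's
    routing examples.
    """
    return [
        "awx",               # default shared queue (any node)
        f"awx_{hostname}",   # node-specific queue
        "broadcast",         # fanout queue, bound by every node
    ]

print(channels_for_node("node1"))
```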

Worker Pool

The dispatcher manages a pool of child processes:
from multiprocessing import Process

class WorkerPool:
    def __init__(self):
        self.workers = []
        self.min_workers = 4
        self.max_workers = 60

    def init_workers(self, work_loop):
        """Spawn the minimum number of worker processes"""
        for i in range(self.min_workers):
            worker = Process(target=work_loop)
            worker.start()
            self.workers.append(worker)
Worker scaling:
  • Minimum workers: 4 (default)
  • Maximum workers: 60 (default)
  • Auto-scales based on load
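The scaling decision can be sketched as a pure function. This is a hypothetical rule for illustration (grow while work is queued, shrink when idle, stay within bounds); the dispatcher's real heuristic may differ:

```python
def desired_workers(current, backlog, min_workers=4, max_workers=60):
    """Pick the next pool size from the current queue backlog.

    Illustrative scaling rule, not AWX's actual algorithm.
    """
    if backlog > 0:
        # Work is waiting: add a worker, up to the configured maximum
        return min(current + 1, max_workers)
    # Idle: shrink back toward the configured minimum
    return max(current - 1, min_workers)
```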

Task Resolution

The dispatcher resolves tasks by dotted path:
import importlib

def resolve_callable(task):
    """
    Transform dotted notation into a callable:
    awx.main.tasks.system.delete_inventory
    awx.main.tasks.jobs.RunProjectUpdate
    """
    if not task.startswith('awx.'):
        raise ValueError(f'{task} is not a valid awx task')

    module, target = task.rsplit('.', 1)
    module = importlib.import_module(module)
    _call = getattr(module, target, None)

    if not hasattr(_call, 'apply_async'):
        raise ValueError(f'{task} is not decorated with @task()')

    return _call

Running Tasks

Workers execute tasks via run_callable():
import logging

logger = logging.getLogger(__name__)

def run_callable(body):
    """Execute a task from a decoded message body"""
    task = body['task']
    uuid = body.get('uuid', '<unknown>')
    args = body.get('args', [])
    kwargs = body.get('kwargs', {})

    # Import the callable, log the start, then execute
    _call = resolve_callable(task)
    logger.info(f'task {uuid} starting {task}(*{args})')

    return _call(*args, **kwargs)

Dispatcher Control

dispatcherctl Command

The awx-manage dispatcherctl command provides debugging capabilities:

Check Status

$ awx-manage dispatcherctl status

awx[pid:9610] workers total=4 min=4 max=60
.  worker[pid:9758] sent=12 finished=12 qsize=0 rss=106.730MB [IDLE]
.  worker[pid:9769] sent=5 finished=5 qsize=0 rss=105.141MB [IDLE]
.  worker[pid:9782] sent=5 finished=4 qsize=1 rss=110.430MB
     - running 0c1deb4d-25ae-49a9-804f-a8afd05aff29 RunJob(*[9])
.  worker[pid:9787] sent=3 finished=3 qsize=0 rss=101.824MB [IDLE]
Information shown:
  • Worker PIDs
  • Tasks sent to each worker
  • Tasks completed
  • Queue size per worker
  • Memory usage (RSS)
  • Currently running tasks with UUIDs

List Running Tasks

$ awx-manage dispatcherctl running

['eb3b0a83-86da-413d-902a-16d7d530a6b2', 'f447266a-23da-42b4-8025-fe379d2db96f']
Returns the UUIDs of currently running tasks; these correspond to the main_unifiedjob.celery_task_id column in the database.

Task Categories

Housekeeping Tasks

Background maintenance and scheduling:
  • run_task_manager: Periodic task that schedules jobs
  • run_dependency_manager: Creates job dependencies
  • run_workflow_manager: Manages workflow execution
See Task Manager documentation for details.

Heartbeats and Capacity

Periodic tasks running on every node:
from django.conf import settings
from django.utils.timezone import now

from awx.main.models import Instance
from dispatcherd.publish import task

@task()
def heartbeat():
    """Record this node's heartbeat and capacity"""
    instance = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
    instance.last_seen = now()
    instance.capacity = calculate_capacity()  # illustrative capacity helper
    instance.save()
Purpose:
  • Record node heartbeat
  • Calculate and report capacity
  • Reap jobs from dead nodes

Job Execution Tasks

Run Ansible playbooks and commands:
@task()
class RunJob:
    """Execute a job template"""
    def run(self, job_id):
        job = Job.objects.get(id=job_id)
        # Prepare ansible-runner
        # Execute playbook
        # Stream events
        pass

@task()
class RunProjectUpdate:
    """Update a project from SCM"""
    def run(self, project_update_id):
        # Clone/update git repo
        pass

@task()
class RunInventoryUpdate:
    """Sync inventory from external source"""
    def run(self, inventory_update_id):
        # Run ansible-inventory
        # Parse and import results
        pass

Administrative Tasks

Maintenance and cleanup:
@task()
def purge_old_stdout_files():
    """Clean up old job output files"""
    pass

@task()
def delete_project_files(project_path):
    """Delete project files from filesystem"""
    pass

@task()
def handle_setting_changes(setting_name):
    """Invalidate cache when setting changes"""
    pass

Notification Tasks

Send notifications:
@task()
def send_notifications(notification_list):
    """Send job notifications"""
    for notification in notification_list:
        # Send via email, Slack, webhook, etc.
        pass

Callback Receiver

Special dispatcher process for handling Ansible callback events:
class AWXConsumerRedis:
    """Consumer for callback events from Redis"""
    def __init__(self, name, worker):
        self.name = name
        self.pool = WorkerPool()
        self.redis = get_redis_client()
    
    def run(self):
        """Consume callback events"""
        while True:
            # Read events from Redis
            # Process and save to database
            pass
Purpose:
  • Receive Ansible events from running jobs
  • Process event data
  • Save to database for UI display

Task Routing

Queue Selection

Tasks can be routed to specific queues:
# Route to specific node
task.apply_async([args], queue=f'awx_{instance.hostname}')

# Route to default queue (any node)
task.apply_async([args], queue='awx')

# Fanout to all nodes
task.apply_async([args], exchange='broadcast')

Execution Node Selection

For job execution:
  1. Task Manager selects execution node
  2. Considers capacity and instance groups
  3. Routes task to node-specific queue
  4. Dispatcher on that node processes task
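The selection step can be sketched as picking the instance with the most remaining capacity that still fits the job. This is a simplified illustration; the real Task Manager also weighs instance groups and job dependencies:

```python
def pick_execution_node(instances, task_impact):
    """Return the hostname with the most spare capacity, or None.

    Each instance is a (hostname, capacity, consumed) tuple; this is
    a sketch, not AWX's actual selection code.
    """
    candidates = [
        (hostname, capacity - consumed)
        for hostname, capacity, consumed in instances
        if capacity - consumed >= task_impact
    ]
    if not candidates:
        return None  # no node has room; the job stays pending
    hostname, _ = max(candidates, key=lambda c: c[1])
    return hostname
```

The chosen hostname would then determine the node-specific queue, e.g. queue=f'awx_{hostname}'.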

Monitoring and Debugging

Check Dispatcher Health

# Check if dispatcher is running
ps aux | grep dispatcherd

# Check worker status
awx-manage dispatcherctl status

# View dispatcher logs
tail -f /var/log/tower/dispatcher.log

Task Tracking

Tasks have UUIDs that can be tracked:
# In Django shell
from awx.main.models import UnifiedJob

# Find job by task UUID
job = UnifiedJob.objects.get(celery_task_id='550e8400-...')
print(f"Status: {job.status}")
print(f"Node: {job.execution_node}")

Common Issues

Dispatcher not running:
# Start dispatcher
awx-manage dispatcherd

# Check for errors
journalctl -u awx-dispatcher
Workers stuck:
# Check worker status
awx-manage dispatcherctl status

# Restart dispatcher if needed
systemctl restart awx-dispatcher
High memory usage:
# Check worker memory
awx-manage dispatcherctl status

# Adjust worker pool size
awx-manage dispatcherd --workers 4

Performance Tuning

Worker Pool Size

# In settings
DISPATCHER_MIN_WORKERS = 4   # Minimum workers
DISPATCHER_MAX_WORKERS = 60  # Maximum workers
Considerations:
  • More workers = more concurrency
  • Each worker consumes memory
  • Balance based on workload and resources
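One way to reason about the memory side of this trade-off is to divide usable memory by per-worker RSS. This is a hypothetical sizing rule, not an official AWX formula; the ~110 MB figure comes from the dispatcherctl status sample earlier in this document:

```python
import os

def suggest_max_workers(mem_per_worker_mb=110, reserve_mb=2048,
                        total_mem_mb=None, hard_cap=60):
    """Suggest an upper bound on workers from available memory.

    Illustrative heuristic only: reserve some memory for the OS and
    other services, then fit workers into what remains.
    """
    if total_mem_mb is None:
        # Total physical memory, in MB (Linux)
        total_mem_mb = (os.sysconf("SC_PAGE_SIZE")
                        * os.sysconf("SC_PHYS_PAGES")) // (1024 * 1024)
    usable = max(total_mem_mb - reserve_mb, 0)
    return max(1, min(hard_cap, usable // mem_per_worker_mb))
```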

Queue Backlog

Monitor queue depth:
from awx.main.dispatch.pool import WorkerPool

pool = WorkerPool()
for worker in pool.workers:
    print(f"Worker {worker.pid}: Queue size {worker.qsize}")
Large queue sizes indicate workers are overloaded.

Task Prioritization

Currently AWX uses FIFO (First In, First Out) for task processing. Future enhancements may include:
  • Priority queues
  • Task preemption
  • Resource-based scheduling

Security Considerations

Task Validation

Only tasks starting with awx. are allowed:
if not task.startswith('awx.'):
    raise ValueError(f'{task} is not a valid awx task')
This prevents arbitrary code execution.

Credential Handling

Credentials are never passed in task arguments:
  • Retrieved from database within task
  • Decrypted at runtime
  • Never logged
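The pattern can be sketched as follows: tasks receive only database IDs, and secrets are looked up and decrypted inside the worker. Here CREDENTIAL_STORE and decrypt are stand-ins for the database and AWX's field encryption, not real AWX APIs:

```python
# Stand-in for the credentials table; values stay encrypted at rest
CREDENTIAL_STORE = {42: {"username": "deploy", "password": "<encrypted>"}}

def decrypt(value):
    """Placeholder for real field decryption."""
    return value

def run_with_credential(credential_id):
    """A task body that receives an ID, never the secret itself."""
    record = CREDENTIAL_STORE[credential_id]
    password = decrypt(record["password"])  # decrypted at runtime only
    # ... use the password here; never log or re-publish it ...
    return record["username"]
```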

Process Isolation

Worker processes are isolated:
  • Tasks run in separate child processes
  • A failure in one task doesn’t affect others
  • Resource limits via cgroups (in containers)
