Skip to main content

Overview

AWX uses WebSockets to push real-time updates while jobs execute, which is what drives the live playbook output and status updates in the UI. This page is based on docs/websockets.md in the AWX source tree.

WebSocket Endpoint

wss://awx.example.com/websocket/
WebSocket connections must be authenticated by passing a valid API token in the URL query string (for example, ?token=YOUR_API_TOKEN).

Connection

Establish Connection

// Open an authenticated WebSocket to AWX and log each lifecycle event.
const token = "YOUR_API_TOKEN";
const ws = new WebSocket(`wss://awx.example.com/websocket/?token=${token}`);

// The handshake has completed and the socket is ready for use.
ws.addEventListener("open", () => {
    console.log("WebSocket connected");
});

// Every frame is parsed as JSON before being logged.
ws.addEventListener("message", (event) => {
    const data = JSON.parse(event.data);
    console.log("Received:", data);
});

// Log errors reported by the socket.
ws.addEventListener("error", (error) => {
    console.error("WebSocket error:", error);
});

// The connection has ended, whichever side closed it.
ws.addEventListener("close", () => {
    console.log("WebSocket closed");
});

Python Example

import websocket
import json

def on_message(ws, message):
    """Decode one JSON text frame and print the parsed payload.

    Args:
        ws: The client object that delivered the frame (unused).
        message: Raw frame text; it is parsed with ``json.loads``.
    """
    payload = json.loads(message)
    print(f"Received: {payload}")

def on_error(ws, error):
    """Print an error reported by the WebSocket client.

    Args:
        ws: The client object that raised the error (unused).
        error: The error value; it is formatted into the printed line.
    """
    print(f"Error: {error}")

def on_close(ws, close_status_code, close_msg):
    """Print a notice that the WebSocket has closed.

    Args:
        ws: The client object whose connection ended (unused).
        close_status_code: Close code supplied by the client (unused).
        close_msg: Close reason supplied by the client (unused).
    """
    print("WebSocket closed")

def on_open(ws):
    """Announce the connection, then send the initial subscription request.

    Args:
        ws: The connected client object; its ``send`` method receives the
            JSON-encoded subscription message.
    """
    print("WebSocket connected")

    # Ask for job status changes and for the event stream of job 123.
    subscription = {
        "groups": {
            "jobs": ["status_changed"],
            "job_events": [123],
        },
    }
    ws.send(json.dumps(subscription))

# Placeholder credential; substitute a real AWX API token.
token = "YOUR_API_TOKEN"
# The token travels as a query parameter on the WebSocket URL.
ws_url = f"wss://awx.example.com/websocket/?token={token}"

# Register the four callbacks defined above with the client.
ws = websocket.WebSocketApp(
    ws_url,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close
)

# Hand control to the client, which dispatches the registered callbacks.
ws.run_forever()

Subscriptions

After connecting, subscribe to event groups by sending a JSON message:
{
  "groups": {
    "jobs": ["status_changed", "summary"],
    "schedules": ["changed"],
    "ad_hoc_command_events": [1, 2, 3],
    "job_events": [123, 456],
    "workflow_events": [789],
    "project_update_events": [10],
    "inventory_update_events": [20],
    "system_job_events": [5],
    "control": ["limit_reached_1"]
  }
}

Event Groups

jobs
array
Subscribe to job status changes:
  • "status_changed" - Job status updates
  • "summary" - Job summaries
job_events
array
Subscribe to events for specific job IDs
workflow_events
array
Subscribe to workflow job events by ID
project_update_events
array
Subscribe to project update events by ID
inventory_update_events
array
Subscribe to inventory update events by ID
ad_hoc_command_events
array
Subscribe to ad hoc command events by ID
system_job_events
array
Subscribe to system job events by ID
schedules
array
Subscribe to schedule changes:
  • "changed" - Schedule modifications
control
array
Control channel messages:
  • "limit_reached_<user_id>" - Rate limit notifications
Sending a new subscription message replaces all previous subscriptions.

Event Messages

Job Status Changed

{
  "type": "job",
  "id": 123,
  "status": "running",
  "created": "2024-01-15T10:30:00Z",
  "started": "2024-01-15T10:30:05Z",
  "finished": null,
  "elapsed": 15.5,
  "job_template_id": 10,
  "inventory_id": 3,
  "project_id": 5
}

Job Event

{
  "type": "job_event",
  "job": 123,
  "id": 5678,
  "event": "runner_on_ok",
  "counter": 42,
  "stdout": "ok: [web01.example.com]",
  "start_line": 150,
  "end_line": 151,
  "created": "2024-01-15T10:30:15Z",
  "host": 10,
  "host_name": "web01.example.com",
  "task": "Deploy application",
  "play": "Configure web servers",
  "role": "webserver",
  "failed": false,
  "changed": true
}

Workflow Event

{
  "type": "workflow_job",
  "id": 50,
  "status": "running",
  "workflow_nodes": [
    {
      "id": 100,
      "identifier": "deploy_db",
      "job": 123,
      "status": "successful"
    },
    {
      "id": 101,
      "identifier": "deploy_backend",
      "job": 124,
      "status": "running"
    }
  ]
}

Event Types

Playbook Events

  • playbook_on_start - Playbook execution begins
  • playbook_on_play_start - Play starts
  • playbook_on_task_start - Task starts
  • playbook_on_stats - Final statistics
  • playbook_on_notify - Handler notification

Runner Events

  • runner_on_start - Task begins on host
  • runner_on_ok - Task succeeded
  • runner_on_failed - Task failed
  • runner_on_skipped - Task skipped
  • runner_on_unreachable - Host unreachable
  • runner_on_async_poll - Async task polling
  • runner_on_async_ok - Async task completed
  • runner_on_async_failed - Async task failed
  • runner_retry - Task retry

Item Events

  • runner_item_on_ok - Loop item succeeded
  • runner_item_on_failed - Loop item failed
  • runner_item_on_skipped - Loop item skipped

Live Job Monitoring

// Follow a single job: log its status changes and event output, and close
// the socket once the job reaches a final status.
const token = "YOUR_API_TOKEN";
const jobId = 123;
// Statuses this example treats as the end of the job.
const FINAL_STATUSES = ["successful", "failed", "error", "canceled"];
const ws = new WebSocket(`wss://awx.example.com/websocket/?token=${token}`);

ws.onopen = () => {
    // Subscribe to status changes and to the event stream of this one job
    ws.send(JSON.stringify({
        groups: {
            jobs: ["status_changed"],
            job_events: [jobId]
        }
    }));
};

ws.onmessage = (event) => {
    let data;
    try {
        data = JSON.parse(event.data);
    } catch (err) {
        // Skip frames that are not valid JSON instead of throwing from the handler.
        console.error("Ignoring non-JSON WebSocket message:", err);
        return;
    }

    if (data.type === "job" && data.id === jobId) {
        console.log(`Job status: ${data.status}`);
        console.log(`Elapsed: ${data.elapsed}s`);

        if (FINAL_STATUSES.includes(data.status)) {
            console.log("Job finished");
            ws.close();
        }
    }

    if (data.type === "job_event" && data.job === jobId) {
        // Events without output would otherwise print the text "undefined".
        console.log(`[${data.event}] ${data.stdout || ""}`);
    }
};

// Without these two handlers a failed or dropped connection is silent and
// the monitor simply appears to hang.
ws.onerror = (error) => {
    console.error("WebSocket error:", error);
};

ws.onclose = (event) => {
    console.log(`WebSocket closed (code ${event.code})`);
};

Architecture

AWX uses django-channels with Redis for WebSocket support:
  1. Task Pods - Generate events during job execution
  2. wsrelay - Relays events from task pods to web pods
  3. Web Pods - Serve WebSocket connections to clients
  4. Redis - Pub/sub backend for event distribution

Event Flow

Task Pod → wsrelay → Web Pod Redis → Django Channels → WebSocket Client

Heartbeat System

Web pods send heartbeats via pg_notify so task pods know which web pods are active and need event relays.

Security

The relay endpoint used by wsrelay is protected by a shared secret to prevent unauthorized access. Only wsrelay can connect to the relay endpoint.

Best Practices

Only subscribe to events you need. Subscribing to all job events can be overwhelming.
WebSocket connections can drop. Implement automatic reconnection with exponential backoff.
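Update your subscriptions when navigating between jobs or workflows. Because each new subscription message replaces all previous subscriptions, include every group you still need in the new message.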
Close WebSocket connections when no longer needed to conserve resources.

Complete Example

import websocket
import json
import time
import threading

class AWXWebSocket:
    """Small AWX WebSocket client that runs the socket on a background thread.

    Subclass it and override :meth:`handle_message` to process the decoded
    messages.

    Attributes:
        base_url: WebSocket base URL (``ws://`` or ``wss://``), no trailing slash.
        token: API token sent as the ``token`` query parameter.
        ws: The underlying ``websocket.WebSocketApp``, or ``None`` before
            :meth:`connect` is called.
        connected: ``True`` between the open and close callbacks.
    """

    def __init__(self, base_url, token):
        """Store the connection settings; nothing is opened yet.

        Args:
            base_url: AWX base URL such as ``https://awx.example.com``. A
                leading ``https://``/``http://`` becomes ``wss://``/``ws://``
                and trailing slashes are removed.
            token: API token used to authenticate the WebSocket.
        """
        self.base_url = self._to_ws_url(base_url)
        self.token = token
        self.ws = None
        self.connected = False

    @staticmethod
    def _to_ws_url(base_url):
        """Return *base_url* with its HTTP scheme swapped for the WS one."""
        # Drop trailing slashes so that appending "/websocket/" cannot
        # produce a double slash.
        base_url = base_url.rstrip("/")
        # Only the scheme prefix is rewritten; an "http://" that appears
        # later in the URL is left untouched.
        for http_scheme, ws_scheme in (("https://", "wss://"), ("http://", "ws://")):
            if base_url.startswith(http_scheme):
                return ws_scheme + base_url[len(http_scheme):]
        return base_url

    def connect(self, timeout=5):
        """Start the socket on a daemon thread and wait for it to open.

        Args:
            timeout: Maximum number of seconds to wait for the open callback.

        Returns:
            ``True`` if the connection opened within *timeout*, else ``False``.
        """
        ws_url = f"{self.base_url}/websocket/?token={self.token}"
        self.ws = websocket.WebSocketApp(
            ws_url,
            on_open=self._on_open,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=self._on_close
        )

        # Run in a background daemon thread so it never blocks interpreter exit
        ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        ws_thread.start()

        # Wait for the open callback; a monotonic clock keeps the deadline
        # unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout
        while not self.connected and time.monotonic() < deadline:
            time.sleep(0.1)
        return self.connected

    def _on_open(self, ws):
        """Mark the client as connected."""
        self.connected = True
        print("WebSocket connected")

    def _on_message(self, ws, message):
        """Decode a JSON frame and pass it to :meth:`handle_message`."""
        data = json.loads(message)
        self.handle_message(data)

    def _on_error(self, ws, error):
        """Print an error reported by the client."""
        print(f"WebSocket error: {error}")

    def _on_close(self, ws, close_status_code, close_msg):
        """Mark the client as disconnected."""
        self.connected = False
        print("WebSocket closed")

    def subscribe(self, groups):
        """Send a subscription message for *groups*.

        Args:
            groups: Mapping of group name to the list of items to subscribe to.

        Returns:
            ``True`` if the message was sent, ``False`` if the client is not
            connected and nothing was sent.
        """
        if not (self.ws and self.connected):
            return False
        self.ws.send(json.dumps({"groups": groups}))
        return True

    def handle_message(self, data):
        """Handle one decoded message; override this in subclasses."""
        print(f"Received: {data}")

    def close(self):
        """Close the socket if one has been created."""
        if self.ws:
            self.ws.close()

# Usage
class JobMonitor(AWXWebSocket):
    """Follow one job: print its status changes and stdout until it ends.

    Attributes:
        job_id: ID of the job being followed.
        job_finished: Set to ``True`` once a final status has been received.
    """

    # Statuses this example treats as the end of the job.
    FINAL_STATUSES = ("successful", "failed", "error", "canceled")

    def __init__(self, base_url, token, job_id):
        """Store the connection settings and the ID of the job to follow."""
        super().__init__(base_url, token)
        self.job_id = job_id
        self.job_finished = False

    def start_monitoring(self):
        """Connect and subscribe to the job's status and event groups.

        Returns:
            ``True`` if the socket is connected afterwards, ``False`` if the
            connection did not open (in which case nothing was subscribed).
        """
        self.connect()
        self.subscribe({
            "jobs": ["status_changed"],
            "job_events": [self.job_id]
        })
        return self.connected

    def handle_message(self, data):
        """Print updates for the followed job and stop at a final status.

        Args:
            data: One decoded message; messages about other jobs are ignored.
        """
        message_type = data.get("type")

        if message_type == "job" and data.get("id") == self.job_id:
            # Read the status with .get so a message without one is reported
            # instead of raising KeyError.
            status = data.get("status")
            print(f"Job {self.job_id}: {status} (elapsed: {data.get('elapsed', 0)}s)")

            if status in self.FINAL_STATUSES:
                self.job_finished = True
                self.close()

        elif message_type == "job_event" and data.get("job") == self.job_id:
            if data.get("stdout"):
                print(data["stdout"])

# Monitor a job
monitor = JobMonitor("https://awx.example.com", "YOUR_TOKEN", 123)
monitor.start_monitoring()

# Wait for the job to finish. Also stop when the socket is not (or is no
# longer) connected; otherwise a failed or dropped connection would leave
# this loop spinning forever.
while not monitor.job_finished and monitor.connected:
    time.sleep(1)

if monitor.job_finished:
    print("Job monitoring complete")
else:
    print("WebSocket disconnected before the job finished")

Build docs developers (and LLMs) love