Skip to main content
FastrAPI provides full WebSocket support for real-time, bidirectional communication between clients and servers. Built on top of fastwebsockets and Tokio, it offers high-performance WebSocket handling with an easy-to-use async API.

Basic WebSocket endpoint

Create a WebSocket endpoint using the @app.websocket() decorator:
main.py
from fastrapi import FastrAPI
from fastrapi.websocket import websocket

app = FastrAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    
    try:
        while True:
            data = await ws.receive_text()
            print(f"Client said: {data}")
            
            await ws.send_json({"reply": "Message received", "echo": data})
            
    except Exception as e:
        print(f"Connection closed: {e}")

if __name__ == "__main__":
    app.serve(host="0.0.0.0", port=8000)
WebSocket handlers must be async functions. They receive a WebSocket object that provides methods for sending and receiving data.

Testing WebSocket connections

Test your WebSocket endpoint with a simple HTML client:
client.html
<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Test</title>
</head>
<body>
    <h1>WebSocket Test Client</h1>
    <input id="messageInput" type="text" placeholder="Enter message">
    <button onclick="sendMessage()">Send</button>
    <div id="messages"></div>

    <script>
        const ws = new WebSocket('ws://localhost:8000/ws');
        
        ws.onopen = () => {
            console.log('Connected');
            addMessage('Connected to server');
        };
        
        ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            addMessage('Server: ' + JSON.stringify(data));
        };
        
        ws.onerror = (error) => {
            console.error('WebSocket error:', error);
        };
        
        ws.onclose = () => {
            addMessage('Disconnected from server');
        };
        
        function sendMessage() {
            const input = document.getElementById('messageInput');
            ws.send(input.value);
            addMessage('You: ' + input.value);
            input.value = '';
        }
        
        function addMessage(msg) {
            const div = document.getElementById('messages');
            div.innerHTML += '<p>' + msg + '</p>';
        }
    </script>
</body>
</html>

WebSocket API

The WebSocket object provides several methods for communication:

Accepting connections

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    # Connection is now open
ws.accept()
async method
Accept the WebSocket connection. Call this before sending or receiving data.

Receiving data

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    text = await ws.receive_text()
    print(f"Received: {text}")

Sending data

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    await ws.send_text("Hello, client!")

Closing connections

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    # ... do work ...
    await ws.close(code=1000)  # Normal closure
code
int
default:"1000"
WebSocket close code. 1000 indicates normal closure. See RFC 6455 for other codes.

Broadcast pattern

Implement a broadcast server that sends messages to multiple connected clients:
broadcast.py
from fastrapi import FastrAPI
from typing import Set
import asyncio

app = FastrAPI()
connections: Set = set()

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    connections.add(ws)
    
    try:
        while True:
            data = await ws.receive_text()
            
            # Broadcast to all connected clients
            for connection in connections:
                try:
                    await connection.send_json({
                        "message": data,
                        "total_clients": len(connections)
                    })
                except:
                    connections.discard(connection)
                    
    except Exception as e:
        print(f"Connection closed: {e}")
    finally:
        connections.discard(ws)

if __name__ == "__main__":
    app.serve("0.0.0.0", 8000)

Chat room example

Build a simple chat room with user identification:
chat.py
from fastrapi import FastrAPI
from typing import Dict
import json

app = FastrAPI()
rooms: Dict[str, set] = {}

@app.websocket("/chat/{room}")
async def chat_room(ws, room: str):
    await ws.accept()
    
    # Get username from query parameter
    username = "Anonymous"
    
    # Add to room
    if room not in rooms:
        rooms[room] = set()
    rooms[room].add(ws)
    
    # Notify others of new user
    for connection in rooms[room]:
        if connection != ws:
            try:
                await connection.send_json({
                    "type": "user_joined",
                    "username": username,
                    "room": room
                })
            except:
                rooms[room].discard(connection)
    
    try:
        while True:
            data = await ws.receive_text()
            
            # Broadcast message to room
            for connection in rooms[room]:
                try:
                    await connection.send_json({
                        "type": "message",
                        "username": username,
                        "message": data
                    })
                except:
                    rooms[room].discard(connection)
                    
    except Exception as e:
        print(f"Connection closed: {e}")
    finally:
        rooms[room].discard(ws)
        
        # Notify others of user leaving
        for connection in rooms[room]:
            try:
                await connection.send_json({
                    "type": "user_left",
                    "username": username
                })
            except:
                rooms[room].discard(connection)

if __name__ == "__main__":
    app.serve("0.0.0.0", 8000)

Connection state

Check the connection state with the client_state property:
@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    
    while ws.client_state == 1:  # 1 = Connected
        data = await ws.receive_text()
        await ws.send_text(f"Echo: {data}")
Client states:
  • 1: Connected
  • 3: Disconnected

Error handling

Handle connection errors and closures gracefully:
from fastrapi import FastrAPI, WebSocketException

app = FastrAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws):
    try:
        await ws.accept()
        
        while True:
            try:
                data = await ws.receive_text()
                await ws.send_json({"echo": data})
            except ConnectionError:
                print("Client disconnected")
                break
            except ValueError as e:
                await ws.send_json({"error": str(e)})
                
    except WebSocketException as e:
        print(f"WebSocket error: {e.code} - {e.reason}")
    finally:
        # Cleanup code
        print("Connection closed")

How WebSockets work in FastrAPI

FastrAPI’s WebSocket implementation is built on several high-performance components:
1

Connection upgrade

When a WebSocket request arrives, FastrAPI uses the fastwebsockets crate to upgrade the HTTP connection to a WebSocket connection (src/websocket.rs:42-55).
2

Message routing

Messages are routed through async channels using tokio::sync::mpsc, enabling efficient bidirectional communication (src/websocket.rs:78-79).
3

Python handler

Your Python handler runs in a Tokio task, with the WebSocket object providing an async API that bridges to the Rust implementation (src/websocket.rs:92-96).
4

Frame processing

The socket_pump function handles low-level frame reading/writing, ping/pong frames, and connection lifecycle (src/websocket.rs:123-175).

Performance considerations

  • WebSocket connections are long-lived and use minimal resources when idle
  • Messages are processed asynchronously without blocking other connections
  • The Rust-based implementation provides microsecond-level latency
  • Binary frames are more efficient than text for large data transfers
For maximum throughput, use send_bytes() and receive_bytes() instead of JSON when transferring large amounts of data.

Best practices

Wrap your WebSocket logic in try/except to handle unexpected disconnections:
try:
    while True:
        data = await ws.receive_text()
        await ws.send_text(data)
except Exception:
    # Cleanup
    pass
Send periodic pings to detect dead connections:
async def heartbeat(ws):
    while True:
        await asyncio.sleep(30)
        try:
            await ws.send_json({"type": "ping"})
        except:
            break
For public APIs, consider limiting connection duration:
@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    timeout = asyncio.create_task(asyncio.sleep(3600))
    
    try:
        await asyncio.wait_for(handle_messages(ws), timeout=3600)
    except asyncio.TimeoutError:
        await ws.close()
Organize connections into logical groups rather than broadcasting to all:
connections = {"room1": set(), "room2": set()}

API reference

Complete WebSocket API documentation

Request handling

Understand async request processing

Error handling

Handle WebSocket errors

Background tasks

Run tasks alongside WebSockets

Build docs developers (and LLMs) love