FastrAPI provides full WebSocket support for real-time, bidirectional communication between clients and servers. Built on top of fastwebsockets and Tokio, it offers high-performance WebSocket handling with an easy-to-use async API.
Basic WebSocket endpoint
Create a WebSocket endpoint using the @app.websocket() decorator:
from fastrapi import FastrAPI
from fastrapi.websocket import websocket

app = FastrAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            print(f"Client said: {data}")
            await ws.send_json({"reply": "Message received", "echo": data})
    except Exception as e:
        print(f"Connection closed: {e}")

if __name__ == "__main__":
    app.serve(host="0.0.0.0", port=8000)
WebSocket handlers must be async functions. They receive a WebSocket object that provides methods for sending and receiving data.
Testing WebSocket connections
Test your WebSocket endpoint with a simple HTML client:
<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Test</title>
</head>
<body>
    <h1>WebSocket Test Client</h1>
    <input id="messageInput" type="text" placeholder="Enter message">
    <button onclick="sendMessage()">Send</button>
    <div id="messages"></div>
    <script>
        const ws = new WebSocket('ws://localhost:8000/ws');

        ws.onopen = () => {
            console.log('Connected');
            addMessage('Connected to server');
        };

        ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            addMessage('Server: ' + JSON.stringify(data));
        };

        ws.onerror = (error) => {
            console.error('WebSocket error:', error);
        };

        ws.onclose = () => {
            addMessage('Disconnected from server');
        };

        function sendMessage() {
            const input = document.getElementById('messageInput');
            ws.send(input.value);
            addMessage('You: ' + input.value);
            input.value = '';
        }

        function addMessage(msg) {
            // Use textContent so message text is never interpreted as HTML
            const p = document.createElement('p');
            p.textContent = msg;
            document.getElementById('messages').appendChild(p);
        }
    </script>
</body>
</html>
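The browser client above needs a running server. For quick checks of handler logic, a handler can also be driven by a stand-in object. The sketch below assumes only that the handler calls `accept()`, `receive_text()`, and `send_json()` as in the basic example, and that a disconnect surfaces as a `ConnectionError`. `FakeWebSocket`, `echo_handler`, and `run_echo` are hypothetical names for illustration, not part of FastrAPI.

```python
import asyncio


class FakeWebSocket:
    """Hypothetical test double; it is not part of FastrAPI."""

    def __init__(self, incoming):
        self._incoming = list(incoming)  # messages the "client" will send
        self.accepted = False
        self.sent = []                   # payloads the handler sent back

    async def accept(self):
        self.accepted = True

    async def receive_text(self):
        if not self._incoming:
            # Simulate the client going away once the script is exhausted.
            raise ConnectionError("client disconnected")
        return self._incoming.pop(0)

    async def send_json(self, payload):
        self.sent.append(payload)


async def echo_handler(ws):
    """Same logic as the basic endpoint, without the decorator."""
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_json({"reply": "Message received", "echo": data})
    except ConnectionError:
        pass  # the scripted client has disconnected


def run_echo(messages):
    """Drive the handler with scripted messages and return the fake socket."""
    ws = FakeWebSocket(messages)
    asyncio.run(echo_handler(ws))
    return ws
```

Calling `run_echo(["hi"])` returns the fake socket, whose `sent` list holds every reply the handler produced.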
WebSocket API
The WebSocket object provides several methods for communication:
Accepting connections
@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    # Connection is now open
Accept the WebSocket connection. Call this before sending or receiving data.
Receiving data
Incoming messages can be read as text, binary data, or JSON. The text variant looks like this:
@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    text = await ws.receive_text()
    print(f"Received: {text}")
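When clients send JSON inside text frames, one way to decode it using only the `receive_text()` call shown above is to parse the string with the standard `json` module. This is a minimal sketch: `receive_json_message` and `StubSocket` are hypothetical names, and a dedicated JSON receive method, if the library provides one, would replace it.

```python
import asyncio
import json


async def receive_json_message(ws):
    """Read one text frame and decode it as JSON.

    Returns (payload, None) on success or (None, error_message) when the
    frame is not valid JSON, so the caller can decide how to respond.
    """
    raw = await ws.receive_text()
    try:
        return json.loads(raw), None
    except json.JSONDecodeError as exc:
        return None, f"invalid JSON: {exc.msg}"


class StubSocket:
    """Hypothetical stand-in that returns one canned text frame."""

    def __init__(self, frame):
        self._frame = frame

    async def receive_text(self):
        return self._frame
```

Returning the error instead of raising keeps the receive loop alive when a single client message is malformed.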
Sending data
Outgoing messages can be sent as text, binary data, or JSON. The text variant looks like this:
@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    await ws.send_text("Hello, client!")
Closing connections
@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    # ... do work ...
    await ws.close(code=1000)  # Normal closure
The code argument is the WebSocket close code; 1000 indicates normal closure. See RFC 6455 for other codes.
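For reference, the commonly used close codes from RFC 6455 (section 7.4.1) can be given readable names. The enum below is a sketch: `CloseCode` and `describe_close` are hypothetical helpers, not part of FastrAPI, and the list is a subset of the registered codes.

```python
from enum import IntEnum


class CloseCode(IntEnum):
    """A subset of the status codes defined in RFC 6455, section 7.4.1."""
    NORMAL_CLOSURE = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    INVALID_PAYLOAD_DATA = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    INTERNAL_ERROR = 1011


def describe_close(code: int) -> str:
    """Return a readable name for a close code, or a fallback for unknown ones."""
    try:
        return CloseCode(code).name.replace("_", " ").lower()
    except ValueError:
        return f"unregistered code {code}"
```

Assuming `close()` takes a plain integer as in the example above, a call such as `await ws.close(code=int(CloseCode.POLICY_VIOLATION))` keeps the intent visible at the call site.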
Broadcast pattern
Implement a broadcast server that sends messages to multiple connected clients:
from fastrapi import FastrAPI
from typing import Set

app = FastrAPI()
connections: Set = set()

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    connections.add(ws)
    try:
        while True:
            data = await ws.receive_text()
            # Broadcast to all connected clients. Iterate over a copy,
            # because failed connections are removed inside the loop.
            for connection in list(connections):
                try:
                    await connection.send_json({
                        "message": data,
                        "total_clients": len(connections)
                    })
                except Exception:
                    connections.discard(connection)
    except Exception as e:
        print(f"Connection closed: {e}")
    finally:
        connections.discard(ws)

if __name__ == "__main__":
    app.serve("0.0.0.0", 8000)
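The loop above sends to each client in turn, so one slow client delays the rest. A possible variation sends to all clients concurrently with `asyncio.gather` and removes the connections that failed afterwards. This sketch assumes only that each connection has an async `send_json()` method; `broadcast` and `FakeClient` are hypothetical names.

```python
import asyncio


async def broadcast(connections: set, payload: dict) -> int:
    """Send payload to every connection concurrently.

    Connections whose send raises are removed from the set.
    Returns the number of successful deliveries.
    """
    targets = list(connections)  # snapshot: the set may change while we await
    results = await asyncio.gather(
        *(conn.send_json(payload) for conn in targets),
        return_exceptions=True,  # collect failures instead of raising the first
    )
    delivered = 0
    for conn, result in zip(targets, results):
        if isinstance(result, BaseException):
            connections.discard(conn)
        else:
            delivered += 1
    return delivered


class FakeClient:
    """Hypothetical stand-in for a connected WebSocket."""

    def __init__(self, broken=False):
        self.broken = broken
        self.received = []

    async def send_json(self, payload):
        if self.broken:
            raise ConnectionError("socket closed")
        self.received.append(payload)
```

Inside a handler this would be called as `await broadcast(connections, {"message": data})`.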
Chat room example
Build a simple chat room with user identification:
from fastrapi import FastrAPI
from typing import Dict

app = FastrAPI()
rooms: Dict[str, set] = {}

@app.websocket("/chat/{room}")
async def chat_room(ws, room: str):
    await ws.accept()
    # Placeholder username; a real application would read it from
    # a query parameter
    username = "Anonymous"
    # Add to room
    if room not in rooms:
        rooms[room] = set()
    rooms[room].add(ws)
    # Notify others of new user. Each loop iterates over a copy,
    # because failed connections are discarded inside the loop.
    for connection in list(rooms[room]):
        if connection != ws:
            try:
                await connection.send_json({
                    "type": "user_joined",
                    "username": username,
                    "room": room
                })
            except Exception:
                rooms[room].discard(connection)
    try:
        while True:
            data = await ws.receive_text()
            # Broadcast message to room
            for connection in list(rooms[room]):
                try:
                    await connection.send_json({
                        "type": "message",
                        "username": username,
                        "message": data
                    })
                except Exception:
                    rooms[room].discard(connection)
    except Exception as e:
        print(f"Connection closed: {e}")
    finally:
        rooms[room].discard(ws)
        # Notify others of user leaving
        for connection in list(rooms[room]):
            try:
                await connection.send_json({
                    "type": "user_left",
                    "username": username
                })
            except Exception:
                rooms[room].discard(connection)

if __name__ == "__main__":
    app.serve("0.0.0.0", 8000)
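The room bookkeeping in the example above can be pulled into a small helper so that empty rooms are removed and every loop works on a snapshot. This is a sketch under the assumption that connection objects are hashable, as the `set` usage above implies; `RoomRegistry` is a hypothetical name, not part of FastrAPI.

```python
from collections import defaultdict


class RoomRegistry:
    """Tracks which connections belong to which room (hypothetical helper)."""

    def __init__(self):
        self._rooms = defaultdict(set)

    def join(self, room, conn):
        self._rooms[room].add(conn)

    def leave(self, room, conn):
        members = self._rooms.get(room)
        if members is None:
            return
        members.discard(conn)
        if not members:
            # Drop empty rooms so the registry does not grow without bound.
            del self._rooms[room]

    def peers(self, room, exclude=None):
        """Return a snapshot list of members, optionally skipping one."""
        return [c for c in self._rooms.get(room, ()) if c is not exclude]

    def room_names(self):
        return sorted(self._rooms)
```

A handler would call `join()` after `accept()`, loop over `peers(room, exclude=ws)` when notifying others, and call `leave()` in its `finally` block.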
Connection state
Check the connection state with the client_state property:
@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    while ws.client_state == 1:  # 1 = Connected
        data = await ws.receive_text()
        await ws.send_text(f"Echo: {data}")
Client states:
1: Connected
3: Disconnected
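To avoid bare numbers in handler code, the two documented values can be given names. The enum below is a hypothetical convenience, not something FastrAPI exports, and it covers only the states listed above.

```python
from enum import IntEnum


class ClientState(IntEnum):
    """Names for the two client_state values documented above."""
    CONNECTED = 1
    DISCONNECTED = 3


def is_connected(state: int) -> bool:
    """True when the given client_state value means the socket is open."""
    return state == ClientState.CONNECTED
```

Assuming `client_state` returns a plain integer, the loop condition could then read `while is_connected(ws.client_state):`.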
Error handling
Handle connection errors and closures gracefully:
from fastrapi import FastrAPI, WebSocketException

app = FastrAPI()

@app.websocket("/ws")
async def websocket_endpoint(ws):
    try:
        await ws.accept()
        while True:
            try:
                data = await ws.receive_text()
                await ws.send_json({"echo": data})
            except ConnectionError:
                print("Client disconnected")
                break
            except ValueError as e:
                await ws.send_json({"error": str(e)})
    except WebSocketException as e:
        print(f"WebSocket error: {e.code} - {e.reason}")
    finally:
        # Cleanup code
        print("Connection closed")
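The `ValueError` branch above is useful when the handler validates incoming messages itself. As an illustration, a validation function might look like the sketch below. The message format, the `ALLOWED_ACTIONS` set, and `parse_command` are assumptions made for this example only.

```python
import json

ALLOWED_ACTIONS = {"subscribe", "unsubscribe", "ping"}  # illustrative only


def parse_command(raw: str) -> dict:
    """Validate a text frame and return it as a dict.

    Raises ValueError with a client-safe message when the input is malformed.
    """
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        raise ValueError("message must be valid JSON") from None
    if not isinstance(message, dict):
        raise ValueError("message must be a JSON object")
    action = message.get("action")
    if action not in ALLOWED_ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    return message
```

Calling `parse_command(data)` inside the inner `try` block would route malformed input to the `except ValueError` branch, which reports the problem to the client without closing the connection.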
How WebSockets work in FastrAPI
FastrAPI’s WebSocket implementation is built on several high-performance components:
Connection upgrade
When a WebSocket request arrives, FastrAPI uses the fastwebsockets crate to upgrade the HTTP connection to a WebSocket connection (src/websocket.rs:42-55).
Message routing
Messages are routed through async channels using tokio::sync::mpsc, enabling efficient bidirectional communication (src/websocket.rs:78-79).
Python handler
Your Python handler runs in a Tokio task, with the WebSocket object providing an async API that bridges to the Rust implementation (src/websocket.rs:92-96).
Frame processing
The socket_pump function handles low-level frame reading/writing, ping/pong frames, and connection lifecycle (src/websocket.rs:123-175).
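The channel-based flow described above can be pictured with a rough Python analogy. This is not the Rust implementation: it swaps `tokio::sync::mpsc` channels for `asyncio.Queue`, and `socket_pump_analogy` and `demo` are hypothetical names used only to show how a pump task sits between the socket and the handler.

```python
import asyncio


async def socket_pump_analogy(inbound: asyncio.Queue, outbound: asyncio.Queue, handler):
    """Move messages from the inbound queue through the handler to the outbound queue.

    A value of None on the inbound queue stands in for a close frame.
    """
    while True:
        frame = await inbound.get()
        if frame is None:
            break
        reply = await handler(frame)
        await outbound.put(reply)


async def demo():
    inbound, outbound = asyncio.Queue(), asyncio.Queue()

    async def handler(text):
        return f"Echo: {text}"

    # The pump runs as its own task, so producers are never blocked by the handler.
    pump = asyncio.create_task(socket_pump_analogy(inbound, outbound, handler))
    for frame in ("a", "b", None):
        await inbound.put(frame)
    await pump
    return [outbound.get_nowait() for _ in range(outbound.qsize())]
```

Because each side only touches its own queue, the reader, the handler, and the writer can all make progress independently.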
WebSocket connections are long-lived and use minimal resources when idle
Messages are processed asynchronously without blocking other connections
The Rust-based implementation provides microsecond-level latency
Binary frames are more efficient than text for large data transfers
For maximum throughput, use send_bytes() and receive_bytes() instead of JSON when transferring large amounts of data.
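To make the size difference concrete, the sketch below encodes the same list of floats as JSON and as a packed binary payload using the standard `struct` module. The wire format (a 4-byte count followed by 8-byte doubles) is an assumption chosen for this example; the resulting bytes would be passed to `send_bytes()`.

```python
import json
import struct


def encode_json(samples):
    """Encode the samples as UTF-8 JSON text."""
    return json.dumps(samples).encode("utf-8")


def encode_binary(samples):
    # Little-endian: a 4-byte count followed by one 8-byte double per sample.
    return struct.pack(f"<I{len(samples)}d", len(samples), *samples)


def decode_binary(blob):
    """Inverse of encode_binary: read the count, then that many doubles."""
    (count,) = struct.unpack_from("<I", blob, 0)
    return list(struct.unpack_from(f"<{count}d", blob, 4))
```

The binary form has a fixed cost of 8 bytes per value, while the JSON form spends one byte per digit plus separators, so the gap grows with the precision of the numbers.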
Best practices
Always handle disconnections
Wrap your WebSocket logic in try/except to handle unexpected disconnections:
try:
    while True:
        data = await ws.receive_text()
        await ws.send_text(data)
except Exception:
    # Cleanup
    pass
Send periodic pings to detect dead connections:
import asyncio

async def heartbeat(ws):
    while True:
        await asyncio.sleep(30)
        try:
            await ws.send_json({"type": "ping"})
        except Exception:
            # Sending failed, so treat the connection as dead
            break
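A heartbeat only helps if it runs alongside the receive loop and stops when the connection ends. One way to arrange that is sketched below, assuming that a disconnect surfaces as a `ConnectionError` from `receive_text()`. `serve_with_heartbeat` and `FakeSocket` are hypothetical names, and the interval is a parameter so that the sketch can be tried with short delays.

```python
import asyncio


async def heartbeat(ws, interval: float):
    """Send an application-level ping every `interval` seconds until a send fails."""
    while True:
        await asyncio.sleep(interval)
        try:
            await ws.send_json({"type": "ping"})
        except Exception:
            break


async def serve_with_heartbeat(ws, interval: float = 30.0):
    """Run the receive loop with a heartbeat task beside it."""
    task = asyncio.create_task(heartbeat(ws, interval))
    try:
        while True:
            data = await ws.receive_text()
            await ws.send_json({"echo": data})
    except ConnectionError:
        pass  # client went away
    finally:
        task.cancel()  # always stop the heartbeat together with the connection
        await asyncio.gather(task, return_exceptions=True)


class FakeSocket:
    """Hypothetical stand-in: stays idle for `idle` seconds, then disconnects."""

    def __init__(self, idle: float):
        self.idle = idle
        self.sent = []

    async def receive_text(self):
        await asyncio.sleep(self.idle)
        raise ConnectionError("client disconnected")

    async def send_json(self, payload):
        self.sent.append(payload)
```

Cancelling the task in `finally` prevents heartbeat tasks from piling up as clients connect and disconnect.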
Limit connection lifetime
For public APIs, consider limiting connection duration:
import asyncio

@app.websocket("/ws")
async def websocket_endpoint(ws):
    await ws.accept()
    try:
        # handle_messages is your own coroutine containing the receive loop
        await asyncio.wait_for(handle_messages(ws), timeout=3600)
    except asyncio.TimeoutError:
        await ws.close()
Use rooms/channels for scalability
Organize connections into logical groups rather than broadcasting to all:
connections = {"room1": set(), "room2": set()}