Skip to main content

Overview

NATS Server provides native MQTT protocol support, allowing MQTT clients to connect and communicate seamlessly with NATS. This bridging enables MQTT devices to leverage NATS’ performance and scalability while maintaining standard MQTT compatibility.
NATS supports MQTT 3.1.1 protocol specification with Quality of Service (QoS) levels 0, 1, and 2.

Why MQTT on NATS?

Integrating MQTT with NATS provides unique advantages:
  • Protocol Bridging: MQTT clients communicate directly with NATS clients
  • Unified Infrastructure: Single server for both MQTT and NATS protocols
  • JetStream Persistence: MQTT messages leverage JetStream for reliability
  • Scalability: NATS clustering and gateway capabilities for MQTT
  • IoT Ready: Native support for resource-constrained devices
  • No External Broker: Built directly into NATS Server

MQTT Protocol Support

Protocol Version

NATS implements MQTT v3.1.1 (protocol level 4) as specified in the OASIS MQTT specification.
Protocol Constants
// MQTT protocol level (mqtt.go:59)
const mqttProtoLevel = byte(0x4)  // MQTT 3.1.1

Quality of Service Levels

MQTT QoS levels determine message delivery guarantees:
Fire and forget delivery with no acknowledgments.
  • Single delivery attempt
  • No persistence or retries
  • Lowest overhead
  • Best for telemetry and non-critical data
Acknowledged delivery with possible duplicates.
  • Message persisted to JetStream
  • Requires PUBACK acknowledgment
  • Redelivered if not acknowledged
  • Default acknowledgment wait: 30 seconds
Four-way handshake for exactly-once semantics.
  • Message deduplication with JetStream
  • PUBREC → PUBREL → PUBCOMP flow
  • Highest reliability, highest overhead
  • Ideal for critical commands

MQTT Packets

NATS implements all standard MQTT packet types (mqtt.go:42-56):
MQTT Packets
CONNECT     - Client connection request
CONNACK     - Connection acknowledgment
PUBLISH     - Publish message
PUBACK      - QoS 1 acknowledgment
PUBREC      - QoS 2 delivery part 1
PUBREL      - QoS 2 delivery part 2
PUBCOMP     - QoS 2 delivery part 3
SUBSCRIBE   - Subscribe to topics
SUBACK      - Subscribe acknowledgment
UNSUBSCRIBE - Unsubscribe from topics
UNSUBACK    - Unsubscribe acknowledgment
PINGREQ     - Keepalive ping
PINGRESP    - Ping response
DISCONNECT  - Clean disconnect

MQTT to NATS Bridging

Subject Translation

MQTT topics are automatically translated to NATS subjects:
Topic Mapping
MQTT Topic         NATS Subject
──────────────     ──────────────
sensors/temp       sensors.temp
device/+/status    device.*.status
alerts/#           alerts.>
Wildcard Translation:
  • MQTT + (single-level) → NATS *
  • MQTT # (multi-level) → NATS >
  • MQTT / (separator) → NATS .
The # wildcard in MQTT matches the current level AND all sub-levels. NATS creates two subscriptions: one for the exact subject and one with > wildcard (see mqtt.go:104-105).

Message Flow

MQTT and NATS clients can communicate seamlessly:

JetStream Integration

MQTT uses JetStream for QoS 1 and QoS 2 persistence (see README-MQTT.md:125-177): Streams Created:
1

Session State

$MQTT_sess - Stores session state and subscriptions
2

Message Delivery

$MQTT_msgs - QoS 1 and 2 message delivery and retries
3

Retained Messages

$MQTT_rmsgs - Persistent retained messages per topic
4

QoS 2 Incoming

$MQTT_qos2in - Deduplicates incoming QoS 2 messages
5

QoS 2 Outgoing

$MQTT_out - Stores PUBREL packets for QoS 2 completion
JetStream must be enabled for MQTT QoS 1 and QoS 2 to function. QoS 0 works without JetStream.

Configuration

Basic MQTT Setup

Enable MQTT by configuring the MQTT port:
nats.conf
mqtt {
    # Listen on standard MQTT port
    port: 1883
    
    # TLS configuration (optional)
    # tls {
    #     cert_file: "/path/to/cert.pem"
    #     key_file: "/path/to/key.pem"
    # }
}

# JetStream required for QoS 1 and 2
jetstream {
    store_dir: "/var/lib/nats/jetstream"
}

Advanced Configuration

mqtt-advanced.conf
mqtt {
    port: 1883
    
    # Authentication
    # Supports users, tokens, or account authentication
    
    # TLS configuration
    tls {
        cert_file: "/path/to/mqtt-cert.pem"
        key_file: "/path/to/mqtt-key.pem"
        ca_file: "/path/to/ca.pem"
        verify: true
    }
    
    # QoS settings
    ack_wait: "30s"              # Time to wait for QoS acks
    max_ack_pending: 1024        # Max unacknowledged messages
    
    # Session settings
    # Note: Session persistence uses JetStream
    
    # WebSocket support (see websocket section)
    # Clients connect to /mqtt on the WebSocket port
}

jetstream {
    store_dir: "/var/lib/nats/jetstream"
    max_memory_store: 1GB
    max_file_store: 10GB
}
port
int
default:"1883"
MQTT listener port
ack_wait
duration
default:"30s"
How long to wait for QoS acknowledgments before redelivery (mqtt.go:145)
max_ack_pending
int
default:"1024"
Maximum outstanding unacknowledged QoS 1 messages per session (mqtt.go:149)

MQTT over WebSocket

MQTT clients can connect via WebSocket:
mqtt-websocket.conf
websocket {
    port: 8080
    no_tls: true
}

mqtt {
    port: 1883
}
Clients connect to: ws://localhost:8080/mqtt (mqtt.go:191)
The /mqtt path is automatically recognized. Use Sec-Websocket-Protocol: mqtt header.

Use Cases

IoT Device Integration

MQTT Device Example
import paho.mqtt.client as mqtt

# Connect MQTT device to NATS
client = mqtt.Client("sensor-001")
client.connect("nats-server.local", 1883, 60)

# Publish sensor data (QoS 1 for reliability)
client.publish("sensors/temperature", "22.5", qos=1)

# Subscribe to commands
def on_message(client, userdata, msg):
    print(f"Command: {msg.payload.decode()}")

client.on_message = on_message
client.subscribe("devices/sensor-001/commands", qos=1)
client.loop_forever()

Hybrid MQTT/NATS Applications

NATS Client Bridge
// NATS application receives MQTT sensor data
nc, _ := nats.Connect("nats://localhost:4222")

// Subscribe to MQTT sensor topics
nc.Subscribe("sensors.>", func(msg *nats.Msg) {
    // Process data from MQTT devices
    fmt.Printf("Sensor data: %s from %s\n", msg.Data, msg.Subject)
    
    // Send commands back to MQTT devices
    nc.Publish("devices.sensor-001.commands", []byte("RESET"))
})

Retained Messages

MQTT retained messages persist the last value:
Retained Messages
# Publish with retain flag
client.publish("status/service", "online", retain=True)

# New subscribers immediately receive the retained message
client.subscribe("status/#")  # Gets "online" immediately
Retained messages are stored in the $MQTT_rmsgs JetStream stream.

Last Will and Testament

Configure automatic notification on disconnect:
Last Will
client.will_set(
    topic="status/sensor-001",
    payload="offline",
    qos=1,
    retain=True
)

client.connect("nats-server.local", 1883)
# Will message sent automatically on unexpected disconnect

Session Management

MQTT sessions track subscriptions and unacknowledged messages:

Clean Sessions

Session Types
# Clean session (default)
client = mqtt.Client("device-001", clean_session=True)
# Session state discarded on disconnect

# Persistent session
client = mqtt.Client("device-001", clean_session=False)
# Subscriptions and QoS messages persist across reconnects
Persistent sessions are stored in the $MQTT_sess JetStream stream and survive server restarts.

Client ID Requirements

Every MQTT client must have a unique client ID:
  • Used as session identifier
  • Hashed for internal storage (mqtt.go:60-63)
  • Duplicate IDs cause previous connection to disconnect
  • Protection against connection flapping (mqtt.go:179-182)

Monitoring

Monitor MQTT connections via /connz endpoint:
Monitor MQTT Connections
curl http://localhost:8222/connz?mqtt_client=sensor-001

# List all MQTT connections
curl http://localhost:8222/connz | jq '.connections[] | select(.mqtt_client)'
Check JetStream MQTT streams:
MQTT Streams
curl http://localhost:8222/jsz?accounts=true | \
  jq '.account_details[].stream_detail[] | select(.name | startswith("$MQTT"))'

Implementation Details

From the MQTT implementation documentation (README-MQTT.md):
  • Connection Handling: MQTT connections create a NATS client with mqtt field set
  • Packet Processing: Custom mqttParse() processes MQTT wire protocol
  • Account Isolation: MQTT streams created per-account with session manager
  • Stream Replication: MQTT streams support clustering with configurable replicas
  • Subject Mapping: Automatic translation between MQTT topics and NATS subjects

Performance Considerations

QoS 0 for Throughput

Use QoS 0 for high-volume telemetry where occasional message loss is acceptable.

Tune Ack Wait

Adjust ack_wait based on client connectivity. Longer for unreliable networks.

Session Limits

Persistent sessions consume memory. Use clean sessions for transient clients.

Max Ack Pending

Increase max_ack_pending for high-throughput QoS 1/2 subscriptions.

Limitations

  • MQTT 5.0 is not yet supported (only MQTT 3.1.1)
  • QoS 2 messages may be redelivered out of original order
  • Shared subscriptions not currently supported
  • Some MQTT extensions not implemented

Next Steps

JetStream

Learn about JetStream persistence used by MQTT

WebSocket

Connect MQTT clients over WebSocket

Configuration

Detailed MQTT configuration options

Monitoring

Monitor MQTT connections and streams

Build docs developers (and LLMs) love