Skip to main content

Overview

The Transporter manages real-time, bidirectional communication between your plugin and the Hub Server using WebSockets. Built on Phoenix Channels, it handles connection lifecycle, message passing, and error recovery.

Creating a Transporter

Transporters are created automatically when you initialize a plugin:
const plugin = await createPlugin({
  name: "my-plugin",
  display_name: { en_US: "My Plugin" },
  transporterOptions: {
    heartbeatIntervalMs: 30000,
    onOpen: () => console.log("Connected!"),
    onClose: () => console.log("Disconnected!"),
    onError: (error) => console.error("Connection error:", error)
  }
})
// Internally: const transporter = createTransporter(transporterOptions)

Transporter Options

heartbeatIntervalMs
number
default:30000
Interval in milliseconds between heartbeat messages to keep the connection alive.
onOpen
function
Callback executed when the WebSocket connection is established.
onOpen: () => void
onClose
function
Callback executed when the WebSocket connection closes.
onClose: (event: CloseEvent) => void
onError
function
Callback executed when a connection error occurs.
onError: (error: Error, transport: string, establishedConnections: number) => void
onMessage
function
Callback executed for every message received from the Hub Server.
onMessage: (message: any) => void

Connection URL

The transporter automatically constructs the WebSocket URL from environment variables:
const url = `${env.HUB_WS_URL}/${env.HUB_MODE}_socket`

// Examples:
// Debug mode: "wss://api.atomemo.ai/debug_socket"
// Release mode: "wss://api.atomemo.ai/release_socket"

Environment Variables

HUB_WS_URL
string
required
Base WebSocket URL of the Hub Server (e.g., wss://api.atomemo.ai)
HUB_MODE
'debug' | 'release'
required
Runtime mode determining which socket endpoint to use
HUB_DEBUG_API_KEY
string
API key for debug mode authentication (required in non-production environments)
DEBUG
boolean
default:true
Enable debug logging for WebSocket communication

Socket Configuration

The transporter creates a Phoenix Socket with the following configuration:
const socket = new Socket(url, {
  debug: env.DEBUG,
  heartbeatIntervalMs: options.heartbeatIntervalMs ?? 30000,
  logger: customLogger,
  params: env.NODE_ENV !== "production" 
    ? { api_key: env.HUB_DEBUG_API_KEY } 
    : undefined
})

Debug Logging

When DEBUG is enabled, the transporter logs all socket events with detailed formatting:
logger(kind, message, data) {
  const timestamp = chalk.bgGrey(` ${new Date().toLocaleString()} `)
  const coloredKind = chalk.underline.bold.yellow(kind.toUpperCase())
  const coloredMessage = chalk.italic.red(message)
  const inspection = data ? Bun.inspect(data, { colors: true }) : ""
  
  console.debug(`${timestamp} ${coloredKind} ${coloredMessage}`, inspection)
}
Example output:
 3/1/2026, 10:30:45 AM  TRANSPORT Connected to wss://api.atomemo.ai/debug_socket
 3/1/2026, 10:30:45 AM  CHANNEL:JOINED Joined debug_plugin:my-plugin successfully { status: 'ok' }

Connection Lifecycle

1. Initialization

When the transporter is created:
const transporter = createTransporter(options)
// Socket is created and event handlers are attached
// Connection is established immediately with socket.connect()

2. Connection

Establishing the connection:
socket.connect()
// Triggers: onOpen callback

3. Channel Joining

Connecting to a specific channel:
const { channel, dispose } = await transporter.connect(channelName)
This:
  1. Creates a Phoenix Channel with the specified name
  2. Attempts to join the channel
  3. Returns the channel and a dispose function
  4. Throws an error if joining fails

4. Message Exchange

Once connected, the channel can send and receive messages:
// Send messages
channel.push("register_plugin", { /* data */ })

// Receive messages
channel.on("invoke_tool", (message) => {
  // Handle tool invocation
})

5. Disconnection

Graceful shutdown:
dispose()
// 1. Leaves the channel
// 2. Disconnects the socket
// 3. Exits the process

Channel Operations

Joining a Channel

The connect method returns a Promise that resolves when the channel is joined:
const { channel, dispose } = await transporter.connect("debug_plugin:my-plugin")

Implementation

connect: (channelName: string) => {
  return new Promise<{ channel: Channel; dispose: () => void }>((resolve, reject) => {
    const channel = socket.channel(channelName, {})
    
    channel
      .join()
      .receive("ok", (response) => {
        socket.log("channel:joined", `Joined ${channelName} successfully`, response)
        resolve({
          channel,
          dispose: () => {
            channel.leave()
            socket.disconnect()
            process.exit(0)
          }
        })
      })
      .receive("error", (response) => {
        socket.log("channel:error", `Failed to join ${channelName}`, response)
        reject(new Error(`Failed to join ${channelName}: ${response.reason}`))
      })
      .receive("timeout", (response) => {
        socket.log("channel:timeout", `Timeout while joining ${channelName}`, response)
        reject(new Error(`Timeout while joining ${channelName}`))
      })
  })
}

Response Handling

The Phoenix Channel join method supports three response types:
ok
success
Channel joined successfully. Returns the channel and dispose function.
error
failure
Failed to join the channel. Rejects the Promise with an error containing the reason.
timeout
failure
Channel join timed out. Rejects the Promise with a timeout error.

Error Handling

Connection Errors

The transporter includes special handling for authentication errors:
socket.onError((error, transport, establishedConnections) => {
  if (error instanceof ErrorEvent && 
      error.message.includes("failed: Expected 101 status code")) {
    
    console.error("Error: Can't connect to the Plugin Hub server.\n")
    
    if (Bun.env.NODE_ENV !== "production") {
      console.error("This is usually because the Debug API Key is missing or has expired.\n")
      console.error("Run `atomemo plugin refresh-key` to get a new key.\n")
    }
    
    process.exit(1)
  }
  
  options.onError?.(error, transport, establishedConnections)
})
This provides helpful error messages when:
  • The API key is missing
  • The API key has expired
  • The connection is refused

Custom Error Handling

You can implement custom error handling with the onError callback:
const plugin = await createPlugin({
  name: "my-plugin",
  transporterOptions: {
    onError: (error, transport, establishedConnections) => {
      // Log to monitoring service
      logger.error("WebSocket error", {
        error: error.message,
        transport,
        connections: establishedConnections
      })
      
      // Implement retry logic
      if (establishedConnections === 0) {
        setTimeout(() => reconnect(), 5000)
      }
    }
  }
})

Event Handlers

The transporter provides lifecycle event handlers:

onOpen

Called when the WebSocket connection is established:
transporterOptions: {
  onOpen: () => {
    console.log("✓ Connected to Hub Server")
    // Initialize resources
    // Log connection time
    // Update status indicators
  }
}

onClose

Called when the connection closes:
transporterOptions: {
  onClose: (event) => {
    console.log(`Connection closed: ${event.code} - ${event.reason}`)
    // Clean up resources
    // Log disconnection
    // Attempt reconnection if appropriate
  }
}

onError

Called when errors occur:
transporterOptions: {
  onError: (error, transport, establishedConnections) => {
    console.error(`Error on ${transport}:`, error.message)
    console.log(`Established connections: ${establishedConnections}`)
    // Handle specific error types
    // Implement retry strategies
    // Alert monitoring systems
  }
}

onMessage

Called for every message received (useful for logging/debugging):
transporterOptions: {
  onMessage: (message) => {
    console.log("Received:", message)
    // Log all messages
    // Monitor message patterns
    // Debug message flow
  }
}

Heartbeat Mechanism

The transporter sends periodic heartbeat messages to keep the connection alive:
heartbeatIntervalMs: 30000 // 30 seconds (default)
Heartbeats:
  • Prevent idle connection timeouts
  • Detect broken connections
  • Maintain connection state
The default heartbeat interval (30 seconds) works well for most use cases. Adjust only if you have specific requirements or experience connection issues.

Example: Custom Transporter Configuration

import { createPlugin } from "@choiceopen/atomemo-plugin-sdk-js"

const plugin = await createPlugin({
  name: "my-plugin",
  display_name: { en_US: "My Plugin" },
  description: { en_US: "A plugin with custom transporter settings" },
  
  transporterOptions: {
    // Custom heartbeat interval
    heartbeatIntervalMs: 60000, // 1 minute
    
    // Connection opened
    onOpen: () => {
      console.log("🔌 Connected to Hub Server")
      console.log(`Mode: ${process.env.HUB_MODE}`)
      console.log(`URL: ${process.env.HUB_WS_URL}`)
    },
    
    // Connection closed
    onClose: (event) => {
      const reason = event.reason || "Unknown reason"
      console.log(`🔌 Disconnected: ${reason} (Code: ${event.code})`)
      
      // Clean up resources
      cleanupResources()
    },
    
    // Connection error
    onError: (error, transport, connections) => {
      console.error(`❌ Error on ${transport}:`, error.message)
      
      if (connections === 0) {
        console.error("No active connections. Check your network and API key.")
      }
      
      // Send to error tracking service
      trackError(error, { transport, connections })
    },
    
    // Message logging
    onMessage: (message) => {
      // Log all messages in development
      if (process.env.NODE_ENV === "development") {
        console.debug("📨 Message:", JSON.stringify(message, null, 2))
      }
    }
  }
})

// Start the plugin
await plugin.run()

Best Practices

The default 30 seconds works for most cases. Longer intervals reduce network usage but increase detection time for broken connections.
Always implement the onError callback to handle connection failures gracefully and provide helpful error messages.
Use onOpen, onClose, and onMessage callbacks to log connection events during development for debugging.
Use the dispose function returned by transporter.connect() to properly clean up resources and close connections.
In production, use event callbacks to monitor connection health and send metrics to your monitoring system.

Environment Configuration

Ensure these environment variables are properly set:
# Required
HUB_WS_URL="wss://api.atomemo.ai"
HUB_MODE="debug"  # or "release"

# Required in debug mode
HUB_DEBUG_API_KEY="your-api-key-here"

# Optional
DEBUG="true"  # Enable debug logging
NODE_ENV="development"  # Development or production
Never commit API keys to version control. Use environment variables and keep your .env file in .gitignore.

Troubleshooting

Connection Refused (101 Error)

Problem: Expected 101 status code error Solution:
  1. Check that HUB_DEBUG_API_KEY is set correctly
  2. Verify the API key hasn’t expired
  3. Run atomemo plugin refresh-key to get a new key

Heartbeat Timeouts

Problem: Connection drops after period of inactivity Solution:
  1. Reduce heartbeatIntervalMs value
  2. Check network stability
  3. Verify firewall/proxy settings allow WebSocket connections

Channel Join Failures

Problem: Unable to join channel Solution:
  1. Verify channel name format matches expected pattern
  2. Check Hub Server logs for rejection reasons
  3. Ensure plugin is properly registered (debug mode)

Next Steps

Plugins

Learn about the plugin lifecycle and how transporter fits in

Registry

Understand how features are registered and transmitted

Environment Setup

Configure your development environment properly

Build docs developers (and LLMs) love