The Action Cable subscription provider delivers real-time GraphQL updates through WebSocket connections. It uses Action Cable’s pub/sub mechanism to broadcast subscription results and supports multiple storage backends for tracking active subscriptions.

Architecture Overview

The subscription system consists of three main components:
1. Provider

Manages subscription lifecycle and delivery through Action Cable
  • Accepts subscriptions from channels
  • Broadcasts updates via WebSocket streams
  • Handles asynchronous event processing

2. Store

Persists subscription metadata and enables fast lookups
  • Indexes by field, scope, and arguments
  • Supports searching for subscriptions to update
  • Memory or custom storage backends

3. Channel

Connects clients via WebSocket and routes messages
  • Receives GraphQL operations from clients
  • Streams subscription updates back to clients
  • Manages connection lifecycle

Quick Start

Ensure Action Cable is properly configured:
config/cable.yml
development:
  adapter: redis
  url: redis://localhost:6379/1
  channel_prefix: myapp_development

production:
  adapter: redis
  url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/1" } %>
  channel_prefix: myapp_production
Mount the cable endpoint:
config/routes.rb
Rails.application.routes.draw do
  mount ActionCable.server => '/cable'
end

Configure the Provider

The Action Cable provider is configured on your GraphQL schema:
app/graphql/app_schema.rb
module GraphQL
  class AppSchema < Rails::GraphQL::Schema
    # Configure subscription provider
    use_subscription_provider :action_cable,
      cable: ::ActionCable,
      prefix: 'rails-graphql',
      store: Rails::GraphQL::Subscription::Store::Memory.new,
      logger: Rails.logger
  end
end

Provider Options

| Option | Default | Description |
| --- | --- | --- |
| cable | ::ActionCable | The Action Cable class to use |
| prefix | 'rails-graphql' | Stream name prefix |
| store | Memory.new | Subscription storage backend |
| logger | Rails::GraphQL.logger | Logger instance |

How It Works

The provider uses Action Cable’s internal pub/sub for coordination (lib/rails/graphql/subscription/provider/action_cable.rb:15-29):
class ActionCable < Base
  INTERNAL_CHANNEL = 'rails-graphql:events'

  def initialize(*args, **options)
    @cable = options.fetch(:cable, ::ActionCable)
    @prefix = options.fetch(:prefix, 'rails-graphql')

    @event_callback = ->(message) do
      method_name, args, xargs = Marshal.load(message)
      @mutex.synchronize { send(method_name, *args, **xargs) }
    end

    super
  end
end

Stream Management

Each subscription gets its own stream (lib/rails/graphql/subscription/provider/action_cable.rb:74-82):
def stream_name(item)
  "#{prefix}:#{item.sid}"
end

protected

def stream_from(item)
  item.origin.stream_from(stream_name(item))
end

Adding Subscriptions

When a client subscribes (lib/rails/graphql/subscription/provider/action_cable.rb:39-47):
def add(*subscriptions)
  with_pubsub do
    subscriptions.each do |item|
      log(:added, item)
      store.add(item)
      stream_from(item)
    end
  end
end

Broadcasting Updates

Updates are broadcast to subscribers (lib/rails/graphql/subscription/provider/action_cable.rb:57-72):
def async_update(item, data = nil, **xargs)
  return if (item = store.fetch(item)).nil?
  removing = false

  log(:updated, item) do
    data = execute(item, **xargs) if data.nil?
    store.update!(item)

    unless (removing = unsubscribing?(data))
      data = { 'result' => data, 'more' => true }
      cable.server.broadcast(stream_name(item), data)
    end
  end

  async_remove(item) if removing
end
The provider automatically executes the subscription query to generate fresh data unless you provide the data parameter.
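
To see the shape of what a subscriber receives, here is a minimal sketch of the non-removal path. It assumes a recording stand-in for `ActionCable.server`; `RecordingServer`, `Item`, and `deliver` are hypothetical names used only for illustration and are not part of the library.

```ruby
# Stand-in for ActionCable.server: records broadcasts instead of sending them.
class RecordingServer
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  def broadcast(stream, payload)
    @broadcasts << [stream, payload]
  end
end

# Hypothetical subscription record; only the sid matters here.
Item = Struct.new(:sid)

PREFIX = 'rails-graphql'

# Same naming rule as the provider: "<prefix>:<sid>".
def stream_name(item)
  "#{PREFIX}:#{item.sid}"
end

# Mirrors the non-removal path of async_update: wrap the result and
# broadcast it on the subscription's own stream.
def deliver(server, item, data)
  server.broadcast(stream_name(item), { 'result' => data, 'more' => true })
end

server = RecordingServer.new
deliver(server, Item.new('abc123'), { 'data' => { 'messageAdded' => { 'id' => '1' } } })

server.broadcasts.first.first  # => "rails-graphql:abc123"
```

Because every subscription has its own stream, each broadcast reaches one subscriber, and the 'more' flag tells the client that further updates may follow.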

Memory Store

The default store keeps subscriptions in memory using concurrent data structures (lib/rails/graphql/subscription/store/memory.rb:15-32):
class Memory < Base
  attr_reader :list, :index

  def initialize
    # The list store a simple association between sid and
    @list = Concurrent::Map.new

    # This store the index in a way that is possible to search
    # subscriptions in a fast manner
    @index = Concurrent::Map.new do |h1, key1|               # Fields
      scopes = Concurrent::Map.new do |h2, key2|             # Scopes
        arguments = Concurrent::Map.new do |h3, key3|        # Arguments
          h3.fetch_or_store(key3, Concurrent::Array.new)     # SIDs
        end

        h2.fetch_or_store(key2, arguments)
      end

      h1.fetch_or_store(key1, scopes)
    end
  end
end

Three-Level Index

Subscriptions are indexed for efficient searching:
  1. Field - The subscription field (e.g., messageAdded)
  2. Scope - Optional scope identifier (e.g., User#1)
  3. Arguments - Query arguments hash
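
To make the three levels concrete, here is a minimal sketch that uses plain Ruby Hash objects in place of Concurrent::Map (so it is not thread-safe). The `build_index` helper and the sample keys are illustrative assumptions, not part of the library.

```ruby
# Plain-Hash stand-in for the nested index:
# field => scope => arguments => [sids]
def build_index
  Hash.new do |fields, field|
    fields[field] = Hash.new do |scopes, scope|
      scopes[scope] = Hash.new do |arguments, args|
        arguments[args] = []
      end
    end
  end
end

index = build_index

# Walking the three keys with reduce creates missing levels on demand,
# the same idea as reducing over the index with &:[].
path = [:message_added, 'User#1', { channel_id: '7' }]
path.reduce(index, &:[]) << 'sid-1'
path.reduce(index, &:[]) << 'sid-2'

index[:message_added]['User#1'][{ channel_id: '7' }]  # => ["sid-1", "sid-2"]
```

Looking up all subscriptions for a given field, scope, and arguments is then three hash lookups, regardless of how many subscriptions exist in total.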

Adding to Store

# lib/rails/graphql/subscription/store/memory.rb:47-61
def add(subscription)
  if has?(subscription.sid)
    raise ::ArgumentError, +"SID #{subscription.sid} is already taken."
  end

  # Rewrite the scope, to save memory
  scope = possible_scopes(subscription.scope)&.first
  subscription.instance_variable_set(:@scope, scope)

  # Save to the list and to the index
  list[subscription.sid] = subscription
  index_set = subscription_to_index(subscription).reduce(index, &:[])
  index_set << subscription.sid
  subscription.sid
end

Searching Subscriptions

# lib/rails/graphql/subscription/store/memory.rb:98-120
def search(**xargs, &block)
  xargs = serialize(**xargs)
  field, scope, args = xargs.values_at(:field, :scope, :args)

  if field.nil? && args.nil? && scope.nil?
    list.each(&block) unless block.nil?
    return all
  end

  [].tap do |result|
    GraphQL.enumerate(field || index.keys).each do |key1|
      GraphQL.enumerate(scope || index[key1].keys).each do |key2|
        GraphQL.enumerate(args || index[key1][key2].keys).each do |key3|
          items = index.fetch(key1, nil)&.fetch(key2, nil)&.fetch(key3, nil)
          items.each(&list.method(:[])).each(&block) unless block.nil?
          result.concat(items || EMPTY_ARRAY)
        end
      end
    end
  end
end
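
The wildcard behaviour of the search can be sketched without the library. The version below works on a plain nested Hash and treats a nil criterion as "every key at this level". The `enumerate` helper is an assumption about what `GraphQL.enumerate` does (wrap a single value so it can be iterated), and `search_index` is a hypothetical name.

```ruby
# Assumed behaviour of an enumerate helper: wrap single values so they can
# be iterated; arrays pass through unchanged.
def enumerate(value)
  value.is_a?(Array) ? value : [value]
end

# index is a plain nested Hash: field => scope => args => [sids].
# A nil criterion acts as a wildcard for that level.
def search_index(index, field: nil, scope: nil, args: nil)
  result = []
  enumerate(field || index.keys).each do |f|
    scopes = index.fetch(f, {})
    enumerate(scope || scopes.keys).each do |s|
      arguments = scopes.fetch(s, {})
      enumerate(args || arguments.keys).each do |a|
        result.concat(arguments.fetch(a, []))
      end
    end
  end
  result
end

index = {
  message_added: {
    'Channel#1' => { { urgent: true } => ['sid-1'], {} => ['sid-2'] },
    'Channel#2' => { {} => ['sid-3'] }
  },
  user_updated: { nil => { {} => ['sid-4'] } }
}

search_index(index, field: :message_added, scope: 'Channel#1')  # => ["sid-1", "sid-2"]
search_index(index, scope: 'Channel#2')                         # => ["sid-3"]
```

Using `fetch` with a default at every level keeps a lookup for an unknown field or scope from raising or from creating empty entries in the index.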

Triggering Updates

Broadcast updates to active subscriptions:

Basic Trigger

app/models/message.rb
class Message < ApplicationRecord
  after_create_commit :broadcast_creation
  
  private
  
  def broadcast_creation
    field = GraphQL::AppSchema[:subscription][:message_added]
    # Notify every active subscription on this field
    field.trigger
  end
end

Scoped Updates

Use scopes to target specific subscriptions:
app/models/message.rb
class Message < ApplicationRecord
  belongs_to :channel
  
  after_create_commit :broadcast_creation
  
  private
  
  def broadcast_creation
    field = GraphQL::AppSchema[:subscription][:message_added]
    
    # Only notify subscriptions for this channel
    field.trigger(scope: channel)
  end
end

Hash-Based Scope Optimization

Pass the scope as a class-to-ID hash, or as a precomputed hash value, to avoid loading the record when triggering (from docs/guides/subscriptions/memory-store.md:27-36):
field = GraphQL::AppSchema[:subscription][:user]

# These are equivalent:
field.trigger(scope: { User => 1 })
field.trigger(scope: User.find(1))
field.trigger(scope: User.find(1).hash)

# All three resolve to the same value, computed as:
User.hash ^ 1.hash
The memory store uses Ruby’s #hash method to minimize memory footprint. This is fully compatible with ActiveRecord and other Rails objects.
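
The equivalence can be illustrated without ActiveRecord. In this sketch, `User` is a stand-in class whose `#hash` follows the same class-hash-XOR-id-hash expression, and `scope_key` is a hypothetical helper that normalizes the three scope forms; neither is the library's actual implementation.

```ruby
# Stand-in for a model; #hash follows the expression User.hash ^ 1.hash.
class User
  attr_reader :id

  def initialize(id)
    @id = id
  end

  def hash
    self.class.hash ^ id.hash
  end
end

# Hypothetical normalizer: reduce any accepted scope form to one Integer.
def scope_key(scope)
  case scope
  when Integer
    scope                      # already a hash value
  when Hash
    klass, id = scope.first    # { User => 1 }
    klass.hash ^ id.hash
  else
    scope.hash                 # record-like object
  end
end

from_pair   = scope_key({ User => 1 })
from_record = scope_key(User.new(1))
from_value  = scope_key(User.new(1).hash)

from_pair == from_record && from_record == from_value  # => true
```

Ruby does not guarantee that `#hash` values are identical across process invocations, so keys like these are only meaningful inside the process that computed them. That fits a store that lives in a single process's memory.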

Custom Providers

Create custom providers by inheriting from Rails::GraphQL::Subscription::Provider::Base (lib/rails/graphql/subscription/provider/base.rb:18-68):
class Base
  class_attribute :abstract, instance_accessor: false, default: false

  delegate :fetch, :search, :find_each, to: :store

  def initialize(**options)
    @store = options.fetch(:store, Store::Memory.new)
    @logger = options.fetch(:logger, GraphQL.logger)
    @mutex = Mutex.new

    validate!
  end

  # Before even generating the item, check if the operation can be subscribed
  def accepts?(operation)
    raise NotImplementedError, +"#{self.class.name} does not implement accepts?"
  end

  # Add one or more subscriptions to the provider
  def add(*subscriptions)
    raise NotImplementedError, +"#{self.class.name} does not implement add"
  end

  # Remove one subscription from the provider
  async_exec def remove(item)
    raise NotImplementedError, +"#{self.class.name} does not implement remove"
  end

  # Update one single subscription
  async_exec def update(item, data = nil, **xargs)
    raise NotImplementedError, +"#{self.class.name} does not implement update"
  end
end

Example: HTTP Webhook Provider

lib/graphql/subscription/provider/webhook.rb
module GraphQL
  module Subscription
    module Provider
      class Webhook < Rails::GraphQL::Subscription::Provider::Base
        self.abstract = false
        
        attr_reader :http_client
        
        def initialize(**options)
          @http_client = options.fetch(:http_client, HTTP)
          super
        end
        
        def accepts?(operation)
          operation.request.context[:webhook_url].present?
        end
        
        def add(*subscriptions)
          subscriptions.each do |item|
            log(:added, item)
            store.add(item)
          end
        end
        
        def async_remove(item)
          return if (item = store.fetch(item)).nil?
          store.remove(item)
          log(:removed, item)
        end
        
        def async_update(item, data = nil, **xargs)
          return if (item = store.fetch(item)).nil?
          
          log(:updated, item) do
            data = execute(item, **xargs) if data.nil?
            store.update!(item)
            
            if unsubscribing?(data)
              async_remove(item)
            else
              webhook_url = item.context[:webhook_url]
              http_client.post(webhook_url, json: { result: data })
            end
          end
        end
      end
    end
  end
end
Register the custom provider:
app/graphql/app_schema.rb
module GraphQL
  class AppSchema < Rails::GraphQL::Schema
    use_subscription_provider :webhook,
      store: Rails::GraphQL::Subscription::Store::Memory.new,
      http_client: HTTP.timeout(5)
  end
end

Custom Stores

Implement custom storage backends by inheriting from Rails::GraphQL::Subscription::Store::Base:
lib/graphql/subscription/store/redis.rb
module GraphQL
  module Subscription
    module Store
      class Redis < Rails::GraphQL::Subscription::Store::Base
        self.abstract = false
        
        attr_reader :redis
        
        def initialize(redis_client = ::Redis.new)
          @redis = redis_client
        end
        
        def all
          redis.smembers('subscriptions:sids')
        end
        
        def add(subscription)
          sid = subscription.sid
          
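          # Store subscription data together with the field hash, so that
          # #remove can find the matching index entry later
          field_hash = subscription.field.hash
          redis.hset("subscription:#{sid}", subscription.to_h.merge('field_hash' => field_hash))
          redis.sadd('subscriptions:sids', sid)

          # Index for searching
          redis.sadd("index:field:#{field_hash}", sid)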
          
          sid
        end
        
        def fetch(*sids)
          return nil if sids.empty?
          
          items = sids.map do |sid|
            data = redis.hgetall("subscription:#{sid}")
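            # deserialize_subscription is a helper you need to provide; it
            # rebuilds a subscription instance from the stored hash
            data.empty? ? nil : deserialize_subscription(data)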
          end
          
          items.one? ? items.first : items
        end
        
        def remove(item)
          sid = instance?(item) ? item.sid : item
          
          # Remove from index
          data = redis.hgetall("subscription:#{sid}")
          if data.present?
            field_key = "index:field:#{data['field_hash']}"
            redis.srem(field_key, sid)
          end
          
          # Remove subscription data
          redis.del("subscription:#{sid}")
          redis.srem('subscriptions:sids', sid)
        end
        
        def has?(item)
          sid = instance?(item) ? item.sid : item
          redis.sismember('subscriptions:sids', sid)
        end
        
        def search(**options, &block)
          # Implement search logic based on indexed fields
          # Return matching SIDs
        end
      end
    end
  end
end
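
The search method above is left as a stub. One possible approach, sketched here under stated assumptions, is to keep one set of SIDs per criterion value and intersect the sets that match the query. `FakeRedis` is an in-memory stand-in that mimics the `sadd`, `smembers`, and `sinter` commands; the key layout and the helper names are hypothetical. Keys are built from literal names rather than `#hash` values, because Ruby hash values are not stable across processes.

```ruby
require 'set'

# In-memory stand-in for the Redis set commands used below.
class FakeRedis
  def initialize
    @sets = Hash.new { |hash, key| hash[key] = Set.new }
  end

  def sadd(key, member)
    @sets[key] << member
  end

  def smembers(key)
    @sets.key?(key) ? @sets[key].to_a : []
  end

  # Intersection of all named sets, like Redis SINTER.
  def sinter(*keys)
    keys.map { |key| @sets.key?(key) ? @sets[key] : Set.new }.reduce(:&).to_a
  end
end

# Hypothetical index layout: one set of SIDs per criterion value.
def index_keys(field: nil, scope: nil)
  keys = []
  keys << "index:field:#{field}" unless field.nil?
  keys << "index:scope:#{scope}" unless scope.nil?
  keys
end

def index_subscription(redis, sid, field:, scope: nil)
  redis.sadd('subscriptions:sids', sid)
  index_keys(field: field, scope: scope).each { |key| redis.sadd(key, sid) }
end

# No criteria returns every SID; otherwise intersect the matching sets.
def search_sids(redis, **criteria)
  keys = index_keys(**criteria)
  keys.empty? ? redis.smembers('subscriptions:sids') : redis.sinter(*keys)
end

redis = FakeRedis.new
index_subscription(redis, 'sid-1', field: 'message_added', scope: 'Channel#1')
index_subscription(redis, 'sid-2', field: 'message_added', scope: 'Channel#2')

search_sids(redis, field: 'message_added', scope: 'Channel#1')  # => ["sid-1"]
```

With a real Redis client the intersection runs on the server, so only the matching SIDs travel over the network.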

Async Execution

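Providers wrap their mutating operations with async_exec for thread safety: a call runs immediately when the current thread already owns the provider's mutex, and is otherwise handed to the instance-level async_exec for deferred execution (lib/rails/graphql/subscription/provider/base.rb:38-58):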
def async_exec(*method_names)
  method_names.each do |method_name|
    async_method_name = :"async_#{method_name}"

    class_eval do
      alias_method async_method_name, method_name

      define_method(method_name) do |*args, **xargs|
        if @mutex.owned?
          send(async_method_name, *args, **xargs)
        else
          async_exec(async_method_name, *args, **xargs)
        end
      end
    end
  end
end
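
The wrapping pattern can be tried in isolation. The sketch below assumes Ruby 3 keyword-argument semantics and uses a hypothetical `InlineProvider` whose instance-level `async_exec` simply runs the work inline while holding the mutex; a real provider may dispatch it elsewhere.

```ruby
module AsyncExec
  # Class-level macro: keeps the original body as async_<name> and replaces
  # <name> with a dispatcher.
  def async_exec(*method_names)
    method_names.each do |method_name|
      async_method_name = :"async_#{method_name}"
      alias_method async_method_name, method_name

      define_method(method_name) do |*args, **xargs|
        if @mutex.owned?
          send(async_method_name, *args, **xargs)
        else
          async_exec(async_method_name, *args, **xargs)
        end
      end
    end
  end
end

class InlineProvider
  extend AsyncExec

  attr_reader :calls

  def initialize
    @mutex = Mutex.new
    @calls = []
  end

  # def returns the method name, so the macro can wrap it directly.
  async_exec def update(item, data = nil)
    @calls << [:update, item, data, @mutex.owned?]
  end

  private

  # Instance-level hook: run the work inline while holding the mutex.
  def async_exec(method_name, *args, **xargs)
    @mutex.synchronize { send(method_name, *args, **xargs) }
  end
end

provider = InlineProvider.new
provider.update('sid-1', 'payload')
provider.calls  # => [[:update, "sid-1", "payload", true]]
```

The last element of the recorded call shows that the original body always runs with the mutex held, whichever path the dispatcher took.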
The Action Cable provider overrides async execution to use pub/sub (lib/rails/graphql/subscription/provider/action_cable.rb:88-92):
def async_exec(method_name, *args, **xargs)
  payload = [method_name, args, store.serialize(**xargs)]
  with_pubsub { @pubsub.broadcast(INTERNAL_CHANNEL, Marshal.dump(payload)) }
  nil
end
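
The round trip between this override and the event callback shown earlier can be sketched as follows. `encode_call` and `Receiver` are hypothetical names, and the sketch passes the keyword arguments as they are instead of running them through `store.serialize`.

```ruby
# Encode a call as one marshalled array: method name, positional
# arguments, and keyword arguments.
def encode_call(method_name, *args, **xargs)
  Marshal.dump([method_name, args, xargs])
end

# Receiving side: decode the message and dispatch it under a mutex.
class Receiver
  attr_reader :log

  def initialize
    @mutex = Mutex.new
    @log = []
  end

  def handle(message)
    method_name, args, xargs = Marshal.load(message)
    @mutex.synchronize { send(method_name, *args, **xargs) }
  end

  private

  def async_update(sid, data = nil, **xargs)
    @log << [sid, data, xargs]
  end
end

receiver = Receiver.new
receiver.handle(encode_call(:async_update, 'sid-1', nil, force: true))
receiver.log  # => [["sid-1", nil, {:force=>true}]]
```

Marshal.load can instantiate arbitrary objects, so this pattern is only appropriate when every publisher on the internal channel is trusted.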

Monitoring

Subscription events emit ActiveSupport notifications (lib/rails/graphql/subscription/provider/base.rb:154-158):
def log(event, item = nil, &block)
  data = { item: item, type: event, provider: self }
  ActiveSupport::Notifications.instrument('subscription.graphql', **data, &block)
end
Subscribe to events:
config/initializers/graphql_monitoring.rb
ActiveSupport::Notifications.subscribe('subscription.graphql') do |name, start, finish, id, payload|
  duration = finish - start
  
  Rails.logger.info({
    event: 'subscription',
    type: payload[:type],
    sid: payload[:item]&.sid,
    duration: duration
  }.to_json)
end

Testing

Test subscriptions in your specs:
spec/graphql/subscriptions/message_added_spec.rb
require 'rails_helper'

RSpec.describe 'MessageAdded subscription' do
  let(:channel) { create(:channel) }
  let(:subscription_query) do
    <<~GQL
      subscription MessageAdded($channelId: ID!) {
        messageAdded(channelId: $channelId) {
          id
          body
        }
      }
    GQL
  end
  
  it 'triggers when message is created' do
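    # Spy on broadcasts so that have_received can verify them below
    allow(ActionCable.server).to receive(:broadcast)

    # Subscribe (mock_channel is assumed to be a test double for the
    # Action Cable channel, defined elsewhere in your spec setup)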
    result = GraphQL::AppSchema.execute(
      subscription_query,
      variables: { channelId: channel.id },
      context: { channel: mock_channel }
    )
    
    expect(result.dig('data', 'messageAdded')).to be_nil
    
    # Trigger update
    message = create(:message, channel: channel)
    
    # Verify broadcast was called
    expect(ActionCable.server).to have_received(:broadcast)
      .with(matching(/rails-graphql:/), hash_including('result'))
  end
end

Performance Considerations

The memory store works well for single-instance deployments but requires all updates to be handled by the same Rails instance that created the subscription.
For multi-instance deployments:
  1. Use Redis for Action Cable - Ensures pub/sub works across instances
  2. Implement a distributed store - Share subscription data across instances
  3. Consider webhook providers - For external clients

Reference

Provider Methods

| Method | Description |
| --- | --- |
| accepts?(operation) | Check if provider can handle operation |
| add(*subscriptions) | Register new subscriptions |
| remove(item) | Unsubscribe and cleanup |
| update(item, data) | Broadcast update to subscription |
| search(**options) | Find subscriptions matching criteria |

Store Methods

| Method | Description |
| --- | --- |
| all | Return all subscription IDs |
| add(subscription) | Store a new subscription |
| fetch(*sids) | Retrieve subscriptions by ID |
| remove(item) | Delete subscription |
| search(**options) | Find subscriptions |
| has?(item) | Check if subscription exists |
