The Action Cable subscription provider delivers real-time GraphQL updates through WebSocket connections. It uses Action Cable’s pub/sub mechanism to broadcast subscription results and supports multiple storage backends for tracking active subscriptions.
Architecture Overview
The subscription system consists of three main components:
Provider
Manages subscription lifecycle and delivery through Action Cable
- Accepts subscriptions from channels
- Broadcasts updates via WebSocket streams
- Handles asynchronous event processing
Store
Persists subscription metadata and enables fast lookups
- Indexes by field, scope, and arguments
- Supports searching for subscriptions to update
- Memory or custom storage backends
Channel
Connects clients via WebSocket and routes messages
- Receives GraphQL operations from clients
- Streams subscription updates back to clients
- Manages connection lifecycle
Quick Start
Ensure Action Cable is properly configured:
config/cable.yml

```yaml
development:
  adapter: redis
  url: redis://localhost:6379/1
  channel_prefix: myapp_development

production:
  adapter: redis
  url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/1" } %>
  channel_prefix: myapp_production
```
Mount the cable endpoint:
config/routes.rb

```ruby
Rails.application.routes.draw do
  mount ActionCable.server => '/cable'
end
```
The Action Cable provider is configured on your GraphQL schema:
app/graphql/app_schema.rb

```ruby
module GraphQL
  class AppSchema < Rails::GraphQL::Schema
    # Configure subscription provider
    use_subscription_provider :action_cable,
      cable: ::ActionCable,
      prefix: 'rails-graphql',
      store: Rails::GraphQL::Subscription::Store::Memory.new,
      logger: Rails.logger
  end
end
```
Provider Options
| Option | Default | Description |
|---|---|---|
| `cable` | `::ActionCable` | The Action Cable class to use |
| `prefix` | `'rails-graphql'` | Stream name prefix |
| `store` | `Memory.new` | Subscription storage backend |
| `logger` | `Rails::GraphQL.logger` | Logger instance |
How It Works
The provider uses Action Cable’s internal pub/sub for coordination (lib/rails/graphql/subscription/provider/action_cable.rb:15-29):
```ruby
class ActionCable < Base
  INTERNAL_CHANNEL = 'rails-graphql:events'

  def initialize(*args, **options)
    @cable = options.fetch(:cable, ::ActionCable)
    @prefix = options.fetch(:prefix, 'rails-graphql')
    @event_callback = ->(message) do
      method_name, args, xargs = Marshal.load(message)
      @mutex.synchronize { send(method_name, *args, **xargs) }
    end
    super
  end
end
```
Stream Management
Each subscription gets its own stream (lib/rails/graphql/subscription/provider/action_cable.rb:74-82):
```ruby
def stream_name(item)
  "#{prefix}:#{item.sid}"
end

protected

def stream_from(item)
  item.origin.stream_from(stream_name(item))
end
```
Adding Subscriptions
When a client subscribes (lib/rails/graphql/subscription/provider/action_cable.rb:39-47):
```ruby
def add(*subscriptions)
  with_pubsub do
    subscriptions.each do |item|
      log(:added, item)
      store.add(item)
      stream_from(item)
    end
  end
end
```
Broadcasting Updates
Updates are broadcast to subscribers (lib/rails/graphql/subscription/provider/action_cable.rb:57-72):
```ruby
def async_update(item, data = nil, **xargs)
  return if (item = store.fetch(item)).nil?

  removing = false
  log(:updated, item) do
    data = execute(item, **xargs) if data.nil?
    store.update!(item)

    unless (removing = unsubscribing?(data))
      data = { 'result' => data, 'more' => true }
      cable.server.broadcast(stream_name(item), data)
    end
  end

  async_remove(item) if removing
end
```
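The broadcast flow can be sketched with a tiny in-memory broker. This is illustrative only: `TinyBroker` and `deliver_update` are not part of the library; they merely mimic the envelope shape and stream naming shown above.

```ruby
# Minimal stand-in for ActionCable.server: each stream is just an array of
# delivered messages, keyed by stream name.
class TinyBroker
  def initialize
    @streams = Hash.new { |h, k| h[k] = [] }
  end

  def broadcast(stream, message)
    @streams[stream] << message
  end

  def messages(stream)
    @streams[stream]
  end
end

# Mirrors the envelope built in async_update: the payload is wrapped with
# 'more' => true while the subscription stays active.
def deliver_update(broker, prefix, sid, data)
  broker.broadcast("#{prefix}:#{sid}", 'result' => data, 'more' => true)
end

broker = TinyBroker.new
deliver_update(broker, 'rails-graphql', 'abc123', { 'messageAdded' => { 'id' => 1 } })

# Prints the single enveloped message queued for stream "rails-graphql:abc123"
p broker.messages('rails-graphql:abc123')
```

A final message with `'more' => false` (or a removal) would follow when the client unsubscribes.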
The provider automatically re-executes the subscription query to generate fresh data unless you pass the `data` argument explicitly.
Memory Store
The default store keeps subscriptions in memory using concurrent data structures (lib/rails/graphql/subscription/store/memory.rb:15-32):
```ruby
class Memory < Base
  attr_reader :list, :index

  def initialize
    # The list stores a simple association between sid and subscription
    @list = Concurrent::Map.new

    # This stores the index in a way that makes it possible to search
    # subscriptions quickly
    @index = Concurrent::Map.new do |h1, key1|            # Fields
      scopes = Concurrent::Map.new do |h2, key2|          # Scopes
        arguments = Concurrent::Map.new do |h3, key3|     # Arguments
          h3.fetch_or_store(key3, Concurrent::Array.new)  # SIDs
        end
        h2.fetch_or_store(key2, arguments)
      end
      h1.fetch_or_store(key1, scopes)
    end
  end
end
```
Three-Level Index
Subscriptions are indexed for efficient searching:
- Field - The subscription field (e.g., `messageAdded`)
- Scope - Optional scope identifier (e.g., `User#1`)
- Arguments - Query arguments hash
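The same three-level layout can be sketched with plain nested hashes. This is a simplified stand-in for the store's `Concurrent::Map` structure; the `ThreeLevelIndex` class and its method names are illustrative, not library API.

```ruby
# Simplified three-level index: field => scope => arguments => [sids].
# A plain Hash with default procs stands in for Concurrent::Map and
# auto-vivifies each level on first access.
class ThreeLevelIndex
  def initialize
    @index = Hash.new do |fields, field|
      fields[field] = Hash.new do |scopes, scope|
        scopes[scope] = Hash.new do |args, arg_key|
          args[arg_key] = []
        end
      end
    end
  end

  # Register a subscription id under its (field, scope, arguments) triple
  def add(field, scope, args, sid)
    @index[field][scope][args] << sid
  end

  # Find all sids matching the triple; returns [] when nothing matches
  def search(field, scope, args)
    @index.dig(field, scope, args) || []
  end
end

index = ThreeLevelIndex.new
index.add('messageAdded', 'User#1', { 'channelId' => '42' }, 'sid-1')
index.add('messageAdded', 'User#1', { 'channelId' => '42' }, 'sid-2')
index.add('messageAdded', 'User#2', { 'channelId' => '42' }, 'sid-3')

p index.search('messageAdded', 'User#1', 'channelId' => '42') # => ["sid-1", "sid-2"]
```

Because every level is a map lookup, a fully specified trigger touches only the subscriptions that actually match, instead of scanning the whole list.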
Adding to Store
```ruby
# lib/rails/graphql/subscription/store/memory.rb:47-61
def add(subscription)
  if has?(subscription.sid)
    raise ::ArgumentError, +"SID #{subscription.sid} is already taken."
  end

  # Rewrite the scope, to save memory
  scope = possible_scopes(subscription.scope)&.first
  subscription.instance_variable_set(:@scope, scope)

  # Save to the list and to the index
  list[subscription.sid] = subscription
  index_set = subscription_to_index(subscription).reduce(index, &:[])
  index_set << subscription.sid
  subscription.sid
end
```
Searching Subscriptions
```ruby
# lib/rails/graphql/subscription/store/memory.rb:98-120
def search(**xargs, &block)
  xargs = serialize(**xargs)
  field, scope, args = xargs.values_at(:field, :scope, :args)

  if field.nil? && args.nil? && scope.nil?
    list.each_value(&block) unless block.nil?
    return all
  end

  [].tap do |result|
    GraphQL.enumerate(field || index.keys).each do |key1|
      GraphQL.enumerate(scope || index[key1].keys).each do |key2|
        GraphQL.enumerate(args || index[key1][key2].keys).each do |key3|
          items = index.fetch(key1, nil)&.fetch(key2, nil)&.fetch(key3, nil)
          next if items.nil?

          items.map(&list.method(:[])).each(&block) unless block.nil?
          result.concat(items)
        end
      end
    end
  end
end
```
Triggering Updates
Broadcast updates to active subscriptions:
Basic Trigger
```ruby
class Message < ApplicationRecord
  after_create_commit :broadcast_creation

  private

  def broadcast_creation
    field = GraphQL::AppSchema[:subscription][:message_added]
    field.trigger
  end
end
```
Scoped Updates
Use scopes to target specific subscriptions:
```ruby
class Message < ApplicationRecord
  belongs_to :channel

  after_create_commit :broadcast_creation

  private

  def broadcast_creation
    field = GraphQL::AppSchema[:subscription][:message_added]

    # Only notify subscriptions for this channel
    field.trigger(scope: channel)
  end
end
```
Hash-Based Scope Optimization
Avoid database queries when triggering (from docs/guides/subscriptions/memory-store.md:27-36):
```ruby
field = GraphQL::AppSchema[:subscription][:user]

# These are equivalent:
field.trigger(scope: { User => 1 })
field.trigger(scope: User.find(1))
field.trigger(scope: User.find(1).hash)

# They all reduce to the same value via #hash:
User.hash ^ 1.hash
```
The memory store uses Ruby’s #hash method to minimize memory footprint. This is fully compatible with ActiveRecord and other Rails objects.
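A sketch of why these three forms coincide, using a plain Ruby stand-in for a model. The `FakeRecord` class and `scope_hash` helper are illustrative only; they assume the `klass.hash ^ id.hash` combination stated above, and are not the library's actual implementation.

```ruby
# Stand-in for a model whose #hash combines class and id, matching the
# scheme described above (User.hash ^ 1.hash).
FakeRecord = Struct.new(:id) do
  def hash
    self.class.hash ^ id.hash
  end
end

class User < FakeRecord; end

# Normalize any supported scope form down to a single Integer hash key
def scope_hash(scope)
  case scope
  when Hash    then scope.map { |klass, id| klass.hash ^ id.hash }.first
  when Integer then scope          # a precomputed #hash value
  else              scope.hash     # a record instance
  end
end

a = scope_hash(User => 1)        # hash-based form, no record needed
b = scope_hash(User.new(1))      # record instance
c = scope_hash(User.new(1).hash) # precomputed hash

p a == b && b == c # => true
```

This is why the `{ User => 1 }` form is an optimization: it produces the same index key without ever loading the record from the database.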
Custom Providers
Create custom providers by inheriting from Rails::GraphQL::Subscription::Provider::Base (lib/rails/graphql/subscription/provider/base.rb:18-68):
```ruby
class Base
  class_attribute :abstract, instance_accessor: false, default: false
  delegate :fetch, :search, :find_each, to: :store

  def initialize(**options)
    @store = options.fetch(:store, Store::Memory.new)
    @logger = options.fetch(:logger, GraphQL.logger)
    @mutex = Mutex.new
    validate!
  end

  # Before even generating the item, check if the operation can be subscribed
  def accepts?(operation)
    raise NotImplementedError, +"#{self.class.name} does not implement accepts?"
  end

  # Add one or more subscriptions to the provider
  def add(*subscriptions)
    raise NotImplementedError, +"#{self.class.name} does not implement add"
  end

  # Remove one subscription from the provider
  async_exec def remove(item)
    raise NotImplementedError, +"#{self.class.name} does not implement remove"
  end

  # Update one single subscription
  async_exec def update(item, data = nil, **xargs)
    raise NotImplementedError, +"#{self.class.name} does not implement update"
  end
end
```
Example: HTTP Webhook Provider
lib/graphql/subscription/provider/webhook.rb
```ruby
module GraphQL
  module Subscription
    module Provider
      class Webhook < Rails::GraphQL::Subscription::Provider::Base
        self.abstract = false

        attr_reader :http_client

        def initialize(**options)
          @http_client = options.fetch(:http_client, HTTP)
          super
        end

        def accepts?(operation)
          operation.request.context[:webhook_url].present?
        end

        def add(*subscriptions)
          subscriptions.each do |item|
            log(:added, item)
            store.add(item)
          end
        end

        def async_remove(item)
          return if (item = store.fetch(item)).nil?

          store.remove(item)
          log(:removed, item)
        end

        def async_update(item, data = nil, **xargs)
          return if (item = store.fetch(item)).nil?

          log(:updated, item) do
            data = execute(item, **xargs) if data.nil?
            store.update!(item)

            if unsubscribing?(data)
              async_remove(item)
            else
              webhook_url = item.context[:webhook_url]
              http_client.post(webhook_url, json: { result: data })
            end
          end
        end
      end
    end
  end
end
```
Register the custom provider:
app/graphql/app_schema.rb
```ruby
module GraphQL
  class AppSchema < Rails::GraphQL::Schema
    use_subscription_provider :webhook,
      store: Rails::GraphQL::Subscription::Store::Memory.new,
      http_client: HTTP.timeout(5)
  end
end
```
Custom Stores
Implement custom storage backends by inheriting from Rails::GraphQL::Subscription::Store::Base:
lib/graphql/subscription/store/redis.rb
```ruby
module GraphQL
  module Subscription
    module Store
      class Redis < Rails::GraphQL::Subscription::Store::Base
        self.abstract = false

        attr_reader :redis

        def initialize(redis_client = ::Redis.new)
          @redis = redis_client
        end

        def all
          redis.smembers('subscriptions:sids')
        end

        def add(subscription)
          sid = subscription.sid

          # Store subscription data
          redis.hset("subscription:#{sid}", subscription.to_h)
          redis.sadd('subscriptions:sids', sid)

          # Index for searching
          field_key = "index:field:#{subscription.field.hash}"
          redis.sadd(field_key, sid)

          sid
        end

        def fetch(*sids)
          return nil if sids.empty?

          items = sids.map do |sid|
            data = redis.hgetall("subscription:#{sid}")
            data.empty? ? nil : deserialize_subscription(data)
          end

          sids.one? ? items.first : items
        end

        def remove(item)
          sid = instance?(item) ? item.sid : item

          # Remove from the index
          data = redis.hgetall("subscription:#{sid}")
          if data.present?
            field_key = "index:field:#{data['field_hash']}"
            redis.srem(field_key, sid)
          end

          # Remove subscription data
          redis.del("subscription:#{sid}")
          redis.srem('subscriptions:sids', sid)
        end

        def has?(item)
          sid = instance?(item) ? item.sid : item
          redis.sismember('subscriptions:sids', sid)
        end

        def search(**options, &block)
          # Implement search logic based on the indexed fields and
          # return the matching SIDs
        end
      end
    end
  end
end
```
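The key layout above can be exercised without a running Redis server. The `FakeRedis` stub below is illustrative only: it implements just the handful of commands the store uses, with the same signatures the store calls them with.

```ruby
# In-memory stub covering only the Redis commands the store relies on
class FakeRedis
  def initialize
    @sets = Hash.new { |h, k| h[k] = [] }
    @hashes = Hash.new { |h, k| h[k] = {} }
  end

  def sadd(key, member)
    @sets[key] << member.to_s unless @sets[key].include?(member.to_s)
  end

  def srem(key, member)
    @sets[key].delete(member.to_s)
  end

  def smembers(key)
    @sets[key].dup
  end

  def sismember(key, member)
    @sets[key].include?(member.to_s)
  end

  def hset(key, hash)
    @hashes[key].merge!(hash.transform_values(&:to_s))
  end

  def hgetall(key)
    @hashes[key].dup
  end

  def del(key)
    @hashes.delete(key)
  end
end

redis = FakeRedis.new

# Same key layout as the Redis store sketch above
sid = 'sid-1'
redis.hset("subscription:#{sid}", 'field' => 'messageAdded')
redis.sadd('subscriptions:sids', sid)
p redis.sismember('subscriptions:sids', sid) # => true

redis.del("subscription:#{sid}")
redis.srem('subscriptions:sids', sid)
p redis.smembers('subscriptions:sids') # => []
```

A stub like this also makes the custom store unit-testable without network dependencies.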
Async Execution
Providers use async execution for thread safety (lib/rails/graphql/subscription/provider/base.rb:38-58):
```ruby
def async_exec(*method_names)
  method_names.each do |method_name|
    async_method_name = :"async_#{method_name}"

    class_eval do
      alias_method async_method_name, method_name
      define_method(method_name) do |*args, **xargs|
        if @mutex.owned?
          send(async_method_name, *args, **xargs)
        else
          async_exec(async_method_name, *args, **xargs)
        end
      end
    end
  end
end
```
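The aliasing trick can be demonstrated in isolation. This is a simplified sketch: the `AsyncExec` module and `Provider` class below are illustrative, and the non-owned path simply takes the lock inline, whereas the real provider routes it through pub/sub as shown next.

```ruby
# Simplified version of the async_exec macro: alias the original method to
# async_<name>, then redefine <name> to dispatch based on lock ownership.
module AsyncExec
  def async_exec(*method_names)
    method_names.each do |method_name|
      async_name = :"async_#{method_name}"
      alias_method async_name, method_name

      define_method(method_name) do |*args, **xargs|
        if @mutex.owned?
          # Already inside the lock: call the real implementation directly
          send(async_name, *args, **xargs)
        else
          # Simplification: take the lock here instead of going via pub/sub
          @mutex.synchronize { send(async_name, *args, **xargs) }
        end
      end
    end
  end
end

class Provider
  extend AsyncExec

  def initialize
    @mutex = Mutex.new
  end

  def remove(item)
    "removed #{item}"
  end
  async_exec :remove
end

p Provider.new.remove('sid-1') # => "removed sid-1"
```

Callers keep using the public name (`remove`), while `async_remove` remains available as the "already synchronized" entry point.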
The Action Cable provider overrides async execution to use pub/sub (lib/rails/graphql/subscription/provider/action_cable.rb:88-92):
```ruby
def async_exec(method_name, *args, **xargs)
  payload = [method_name, args, store.serialize(**xargs)]
  with_pubsub { @pubsub.broadcast(INTERNAL_CHANNEL, Marshal.dump(payload)) }
  nil
end
```
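The pub/sub hand-off can be simulated end to end with `Marshal` alone (illustrative; no Action Cable involved, and the `Receiver` class is a stand-in for the provider's `@event_callback`):

```ruby
# Serialize a method call the same way the provider does: name, positional
# args, and keyword args travel together as one Marshal payload.
payload = Marshal.dump([:update, ['sid-1'], { force: true }])

# Receiving side: what @event_callback does with an incoming message
class Receiver
  attr_reader :calls

  def initialize
    @calls = []
  end

  def update(sid, force: false)
    @calls << [sid, force]
  end

  def handle(message)
    method_name, args, xargs = Marshal.load(message)
    send(method_name, *args, **xargs)
  end
end

receiver = Receiver.new
receiver.handle(payload)

p receiver.calls # => [["sid-1", true]]
```

Because every process subscribed to the internal channel replays the same payload, the instance that holds the subscription in its store is the one whose dispatch actually finds and updates it.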
Monitoring
Subscription events emit ActiveSupport notifications (lib/rails/graphql/subscription/provider/base.rb:154-158):
```ruby
def log(event, item = nil, &block)
  data = { item: item, type: event, provider: self }
  ActiveSupport::Notifications.instrument('subscription.graphql', **data, &block)
end
```
Subscribe to events:
config/initializers/graphql_monitoring.rb
```ruby
ActiveSupport::Notifications.subscribe('subscription.graphql') do |name, start, finish, id, payload|
  duration = finish - start

  Rails.logger.info({
    event: 'subscription',
    type: payload[:type],
    sid: payload[:item]&.sid,
    duration: duration
  }.to_json)
end
```
Testing
Test subscriptions in your specs:
spec/graphql/subscriptions/message_added_spec.rb
```ruby
require 'rails_helper'

RSpec.describe 'MessageAdded subscription' do
  let(:channel) { create(:channel) }
  let(:mock_channel) { double('ActionCable channel', stream_from: nil) }

  let(:subscription_query) do
    <<~GQL
      subscription MessageAdded($channelId: ID!) {
        messageAdded(channelId: $channelId) {
          id
          body
        }
      }
    GQL
  end

  before { allow(ActionCable.server).to receive(:broadcast) }

  it 'triggers when message is created' do
    # Subscribe
    result = GraphQL::AppSchema.execute(
      subscription_query,
      variables: { channelId: channel.id },
      context: { channel: mock_channel }
    )
    expect(result.dig('data', 'messageAdded')).to be_nil

    # Trigger update
    create(:message, channel: channel)

    # Verify the broadcast was sent
    expect(ActionCable.server).to have_received(:broadcast)
      .with(matching(/rails-graphql:/), hash_including('result'))
  end
end
```
The memory store works well for single-instance deployments but requires all updates to be handled by the same Rails instance that created the subscription.
For multi-instance deployments:
- Use Redis for Action Cable - Ensures pub/sub works across instances
- Implement a distributed store - Share subscription data across instances
- Consider webhook providers - For external clients
Reference
Provider Methods
| Method | Description |
|---|---|
| `accepts?(operation)` | Check whether the provider can handle the operation |
| `add(*subscriptions)` | Register new subscriptions |
| `remove(item)` | Unsubscribe and clean up |
| `update(item, data)` | Broadcast an update to a subscription |
| `search(**options)` | Find subscriptions matching the criteria |
Store Methods
| Method | Description |
|---|---|
| `all` | Return all subscription IDs |
| `add(subscription)` | Store a new subscription |
| `fetch(*sids)` | Retrieve subscriptions by ID |
| `remove(item)` | Delete a subscription |
| `search(**options)` | Find subscriptions |
| `has?(item)` | Check whether a subscription exists |
For complete implementation details, see: