Kora’s Pub/Sub system provides real-time message delivery using a sharded broker architecture. Channels are distributed across independent shards for parallel publishing, while pattern subscriptions are replicated for universal matching.
Overview
From kora-pubsub/src/lib.rs:1-15:
Sharded Pub/Sub broker for Kōra. Provides a multi-threaded Pub/Sub message broker with per-channel sharding. Channels are hashed to independent shards so that publishes to different channels proceed in parallel without contention. Pattern subscriptions (glob-style) are replicated across all shards, since any publish could potentially match.
Features
Sharded channels for parallel message delivery
Glob patterns (*, ?) for flexible subscriptions
Push-mode delivery to connected clients
Dead subscriber cleanup for automatic resource management
Thread-safe broker whose publish path takes only a per-shard read lock
Architecture
From kora-pubsub/src/broker.rs:59-90:
pub struct PubSubBroker {
    shards: Vec<RwLock<ShardSubscriptions>>,
    shard_mask: usize,
    hash_builder: ahash::RandomState,
}

fn shard_index(&self, channel: &[u8]) -> usize {
    let hash = self.hash_builder.hash_one(channel);
    (hash as usize) & self.shard_mask
}
Channel Hashing
Each channel is hashed to a specific shard using AHash. Multiple channels distribute across shards for parallelism.
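The mask-based lookup can be sketched with the standard library alone. This is a minimal illustration, not the broker's code: `ShardPicker` is a hypothetical name, `std`'s `RandomState` stands in for `ahash::RandomState`, and rounding the shard count up to a power of two is an assumption suggested by the use of a bit mask.

```rust
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;

/// Hypothetical stand-in for the hashing state held by the broker.
pub struct ShardPicker {
    shard_mask: usize,
    hash_builder: RandomState, // std hasher used here instead of ahash
}

impl ShardPicker {
    /// Assumption: the shard count is rounded up to a power of two so that
    /// `hash & mask` selects a valid shard without a modulo.
    pub fn new(requested_shards: usize) -> Self {
        let shard_count = requested_shards.max(1).next_power_of_two();
        Self {
            shard_mask: shard_count - 1,
            hash_builder: RandomState::new(),
        }
    }

    pub fn shard_count(&self) -> usize {
        self.shard_mask + 1
    }

    /// Same shape as the `shard_index` excerpt above.
    pub fn shard_index(&self, channel: &[u8]) -> usize {
        let hash = self.hash_builder.hash_one(channel);
        (hash as usize) & self.shard_mask
    }
}
```

Within one `ShardPicker`, a given channel name always maps to the same shard, so every subscriber and publisher of that channel meets in one place.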
Subscription Storage
Exact subscriptions live in the target shard. Pattern subscriptions are replicated to all shards since any publish could match.
Message Delivery
PUBLISH takes a read lock on the target shard and iterates subscribers. Dead connections are cleaned up lazily.
Commands
Subscribe to Channels
# Subscribe to exact channel
SUBSCRIBE news
# => 1: ["subscribe", "news", 1]
# Subscribe to multiple channels
SUBSCRIBE news alerts updates
# => 1: ["subscribe", "news", 1]
# => 2: ["subscribe", "alerts", 2]
# => 3: ["subscribe", "updates", 3]
Response format:
["subscribe", "<channel>", <total_subscriptions>]
Subscribe to Patterns
# Subscribe to all channels matching pattern
PSUBSCRIBE news.*
# => ["psubscribe", "news.*", 1]
# Multiple patterns
PSUBSCRIBE user.* session.* cache.*
Pattern matching uses glob-style wildcards:
* matches zero or more characters
? matches exactly one character
Examples:
user.* matches user.1000, user.login, etc.
session.??? matches session.abc, session.123, but not session.a
Publish Messages
# Publish to a channel
PUBLISH news "Breaking: Kora v1.0 released!"
# => 42 (number of subscribers that received the message)
# Binary-safe payload
PUBLISH events "\x00\x01\x02\x03"
# => 5
Returns: Number of subscribers that received the message (exact + pattern matches).
Unsubscribe
# Unsubscribe from specific channels
UNSUBSCRIBE news alerts
# => 1: ["unsubscribe", "news", 1]
# => 2: ["unsubscribe", "alerts", 0]
# Unsubscribe from all channels
UNSUBSCRIBE
# => ["unsubscribe", null, 0]
# Unsubscribe from patterns
PUNSUBSCRIBE user.* session.*
# => 1: ["punsubscribe", "user.*", 1]
# => 2: ["punsubscribe", "session.*", 0]
Message Delivery
Exact Subscriptions
# Client 1
SUBSCRIBE notifications
# Client 2 publishes
PUBLISH notifications "New comment on your post"
# => 1
# Client 1 receives:
# ["message", "notifications", "New comment on your post"]
Message format (exact):
["message", "<channel>", "<payload>"]
Pattern Subscriptions
# Client 1
PSUBSCRIBE user.*
# Client 2 publishes
PUBLISH user.1000 "Profile updated"
# => 1
# Client 1 receives:
# ["pmessage", "user.*", "user.1000", "Profile updated"]
Message format (pattern):
["pmessage", "<pattern>", "<channel>", "<payload>"]
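On the wire, a Redis-compatible client would typically see these frames as RESP2 arrays of bulk strings. The sketch below assumes Kōra uses that encoding, which this page does not state; the function names are hypothetical and chosen only for illustration.

```rust
/// Encodes binary-safe parts as a RESP2 array of bulk strings.
pub fn encode_resp_array(parts: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(format!("*{}\r\n", parts.len()).as_bytes());
    for part in parts {
        // Each bulk string is length-prefixed, so payloads may hold any bytes.
        out.extend_from_slice(format!("${}\r\n", part.len()).as_bytes());
        out.extend_from_slice(part);
        out.extend_from_slice(b"\r\n");
    }
    out
}

/// Frame for an exact subscription: ["message", channel, payload].
pub fn encode_message(channel: &[u8], payload: &[u8]) -> Vec<u8> {
    encode_resp_array(&[&b"message"[..], channel, payload])
}

/// Frame for a pattern subscription: ["pmessage", pattern, channel, payload].
pub fn encode_pmessage(pattern: &[u8], channel: &[u8], payload: &[u8]) -> Vec<u8> {
    encode_resp_array(&[&b"pmessage"[..], pattern, channel, payload])
}
```

Because every element carries its own length, the same encoder handles the binary-safe payloads shown under Publish Messages.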
Combined Subscriptions
# Client subscribes to both exact and pattern
SUBSCRIBE alerts
PSUBSCRIBE alerts.*
# Publisher sends to exact match
PUBLISH alerts "Server down!"
# => 1 (client receives once via exact subscription)
# Publisher sends to pattern match
PUBLISH alerts.critical "Database offline!"
# => 1 (client receives once via pattern subscription)
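If a client holds both an exact subscription to a channel and a pattern that matches that same channel, it receives each message twice (once for each subscription type). In the example above, alerts.* does not match the bare channel alerts, so each publish is delivered only once.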
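The double-delivery rule can be modelled for a single client in a few lines. This is an illustrative sketch only: `Delivery` and `deliveries_for` are hypothetical names, and the matcher is deliberately simplified to a single trailing `*` rather than full glob matching.

```rust
#[derive(Debug, PartialEq)]
pub enum Delivery {
    Message { channel: String },
    PMessage { pattern: String, channel: String },
}

/// Simplified matcher for this sketch: supports only one trailing `*`.
fn matches_prefix_pattern(pattern: &str, channel: &str) -> bool {
    match pattern.strip_suffix('*') {
        Some(prefix) => channel.starts_with(prefix),
        None => pattern == channel,
    }
}

/// Lists what one client would receive for a publish to `channel`,
/// given its exact and pattern subscriptions.
pub fn deliveries_for(exact: &[&str], patterns: &[&str], channel: &str) -> Vec<Delivery> {
    let mut out = Vec::new();
    if exact.contains(&channel) {
        out.push(Delivery::Message { channel: channel.to_string() });
    }
    for pattern in patterns {
        if matches_prefix_pattern(pattern, channel) {
            out.push(Delivery::PMessage {
                pattern: pattern.to_string(),
                channel: channel.to_string(),
            });
        }
    }
    out
}
```

With `SUBSCRIBE alerts` and `PSUBSCRIBE alerts.*` each publish yields one delivery; replacing the pattern with `alert*` makes a publish to `alerts` yield two.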
Use Cases
Real-Time Notifications
Push updates to web/mobile clients.
SUBSCRIBE notifications:user:1000
# Client receives instant updates
Chat Rooms
Broadcast messages to room subscribers.
SUBSCRIBE chat:room:general
PUBLISH chat:room:general "Hello everyone!"
Cache Invalidation
Notify all app servers when cache keys change.
PSUBSCRIBE cache:invalidate:*
PUBLISH cache:invalidate:user:1000 ""
Microservice Events
Broadcast domain events across services.
PSUBSCRIBE events:order.*
PUBLISH events:order.created "{...json...}"
Pattern Matching
From kora-pubsub/src/glob.rs, the pattern matcher supports:
Wildcards
* (asterisk): matches zero or more characters. Pattern: user.*
Matches:
- user.alice
- user.bob
- user.1000
- user.
Does not match:
- user
- useralice
? (question mark): matches exactly one character. Pattern: log.?
Matches:
- log.1
- log.a
- log.!
Does not match:
- log.
- log.12
- log
Pattern Examples
# Match all user channels
PSUBSCRIBE user.*
# Matches: user.alice, user.1000, user.login
# Match specific format
PSUBSCRIBE session.????-????-????
# Matches: session.abcd-1234-xyz9
# Match nested namespaces
PSUBSCRIBE app:*:events
# Matches: app:user:events, app:order:events
# Combine wildcards
PSUBSCRIBE log.*.*
# Matches: log.info.app, log.error.db, log.warn.cache
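The matching rules above can be sketched as an iterative matcher with single-star backtracking. This is not the code in kora-pubsub/src/glob.rs, whose body is not shown here; it only covers the two wildcards this page documents and reuses the name `glob_match` for readability.

```rust
/// Returns true if `text` matches `pattern`, where `*` matches zero or
/// more bytes and `?` matches exactly one byte.
pub fn glob_match(pattern: &[u8], text: &[u8]) -> bool {
    let (mut p, mut t) = (0usize, 0usize);
    // Position of the most recent `*` and the text index it currently covers up to.
    let mut star: Option<(usize, usize)> = None;

    while t < text.len() {
        if p < pattern.len() && pattern[p] == b'*' {
            // Tentatively let `*` match nothing; remember where to retry from.
            star = Some((p, t));
            p += 1;
        } else if p < pattern.len() && (pattern[p] == b'?' || pattern[p] == text[t]) {
            p += 1;
            t += 1;
        } else if let Some((star_p, star_t)) = star {
            // Mismatch: backtrack and let the last `*` absorb one more byte.
            p = star_p + 1;
            t = star_t + 1;
            star = Some((star_p, star_t + 1));
        } else {
            return false;
        }
    }
    // The text is consumed; any pattern remainder must consist of `*` only.
    pattern[p..].iter().all(|&b| b == b'*')
}
```

Working on bytes rather than `str` keeps the matcher binary-safe, in line with the `&[u8]` channel names used by the broker.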
Implementation Details
Subscriber Interface
From kora-pubsub/src/broker.rs:26-32:
pub trait MessageSink: Send + Sync {
    /// Attempt to deliver msg to the subscriber.
    /// Returns true if accepted, false if subscriber is dead.
    fn send(&self, msg: PubSubMessage) -> bool;
}
Clients implement MessageSink to receive messages. Dead subscribers (those whose send returns false) are cleaned up lazily, on the next subscribe or unsubscribe that takes the shard's write lock.
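One way a connection could implement this trait is to forward messages into a channel and report failure once the receiving side is gone. The sketch below is an assumption-laden illustration: `ChannelSink` is a hypothetical type, the `PubSubMessage` fields are guessed because the source elides them, and the trait is redeclared locally so the block stands alone.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Assumed shape; the real enum's fields are not shown in the source.
#[derive(Debug, Clone, PartialEq)]
pub enum PubSubMessage {
    Message { channel: Vec<u8>, payload: Vec<u8> },
}

/// Local copy of the trait from the excerpt above.
pub trait MessageSink: Send + Sync {
    fn send(&self, msg: PubSubMessage) -> bool;
}

/// Hypothetical sink that forwards messages to a connection's writer task.
pub struct ChannelSink {
    // The Mutex keeps the sink `Sync` regardless of toolchain version.
    tx: Mutex<Sender<PubSubMessage>>,
}

impl ChannelSink {
    pub fn new() -> (Self, Receiver<PubSubMessage>) {
        let (tx, rx) = channel();
        (Self { tx: Mutex::new(tx) }, rx)
    }
}

impl MessageSink for ChannelSink {
    fn send(&self, msg: PubSubMessage) -> bool {
        match self.tx.lock() {
            // `Sender::send` fails once the receiver is dropped, which this
            // sketch treats as "subscriber is dead".
            Ok(tx) => tx.send(msg).is_ok(),
            Err(_) => false,
        }
    }
}
```

Dropping the `Receiver` when a client disconnects is enough for the next `send` to return false.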
Dead Subscriber Cleanup
From kora-pubsub/src/broker.rs:150-180:
pub fn publish(&self, channel: &[u8], message: &[u8]) -> usize {
    let idx = self.shard_index(channel);
    let shard = self.shards[idx].read();
    let mut live_count = 0;
    // Exact subscribers
    if let Some(subs) = shard.channels.get(channel) {
        for sub in subs {
            if sub.tx.send(PubSubMessage::Message { ... }) {
                live_count += 1;
            }
            // Dead subscribers cleaned up lazily
        }
    }
    // Pattern subscribers
    for psub in &shard.patterns {
        if glob_match(&psub.pattern, channel) {
            if psub.tx.send(PubSubMessage::PMessage { ... }) {
                live_count += 1;
            }
        }
    }
    live_count
}
Lazy cleanup avoids write locks on the publish path:
Publish takes only a read lock (parallel publishes to different shards)
Dead subscribers accumulate temporarily
Cleanup happens on next subscribe/unsubscribe (requires write lock anyway)
Trade-off: Small memory overhead for dead connections until next write operation.
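The trade-off can be made concrete with a single-shard sketch. Everything here is an assumption for illustration: the `dead` flag, the `Deliver` closure type standing in for `MessageSink`, and the choice to prune only the channel being subscribed to are not taken from the broker's source.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;

/// Hypothetical delivery callback; returns false when the subscriber is dead.
pub type Deliver = Box<dyn Fn(&[u8]) -> bool + Send + Sync>;

pub struct Subscriber {
    deliver: Deliver,
    // Atomic so that publish can flag it while holding only a read lock.
    dead: AtomicBool,
}

#[derive(Default)]
pub struct Shard {
    channels: RwLock<HashMap<Vec<u8>, Vec<Subscriber>>>,
}

impl Shard {
    /// Read lock only: failed deliveries are flagged, not removed.
    pub fn publish(&self, channel: &[u8], message: &[u8]) -> usize {
        let channels = self.channels.read().unwrap();
        let mut live = 0;
        if let Some(subs) = channels.get(channel) {
            for sub in subs {
                if (sub.deliver)(message) {
                    live += 1;
                } else {
                    sub.dead.store(true, Ordering::Relaxed);
                }
            }
        }
        live
    }

    /// Write lock is needed anyway, so flagged subscribers are pruned here.
    pub fn subscribe(&self, channel: &[u8], deliver: Deliver) {
        let mut channels = self.channels.write().unwrap();
        let subs = channels.entry(channel.to_vec()).or_default();
        subs.retain(|s| !s.dead.load(Ordering::Relaxed));
        subs.push(Subscriber { deliver, dead: AtomicBool::new(false) });
    }

    pub fn subscriber_count(&self, channel: &[u8]) -> usize {
        self.channels.read().unwrap().get(channel).map_or(0, Vec::len)
    }
}
```

Between a failed delivery and the next write operation the dead entry still occupies memory, which is the small overhead described above.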
Sharding Strategy
Exact subscriptions: Stored only in the channel’s target shard.
pub fn subscribe(&self, channel: &[u8], conn_id: u64, tx: Arc<dyn MessageSink>) {
    let idx = self.shard_index(channel); // Hash to shard
    let mut shard = self.shards[idx].write();
    shard.channels.entry(channel.to_vec()).or_default().push(Subscriber { ... });
}
Pattern subscriptions: Replicated to all shards.
pub fn psubscribe(&self, pattern: &[u8], conn_id: u64, tx: Arc<dyn MessageSink>) {
    for shard_lock in &self.shards {
        let mut shard = shard_lock.write();
        shard.patterns.push(PatternSubscriber { ... });
    }
}
Pattern subscriptions have O(P) overhead on every publish, where P = number of pattern subscriptions. Use sparingly on high-volume channels.
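The cost of replication can be seen in a small model that tracks only pattern storage. `PatternTable` and its methods are hypothetical names used for this sketch; it assumes an unsubscribe must visit every shard, mirroring how psubscribe writes to every shard.

```rust
use std::sync::RwLock;

/// Hypothetical model: each shard holds its own copy of every pattern.
pub struct PatternTable {
    shards: Vec<RwLock<Vec<(u64, Vec<u8>)>>>,
}

impl PatternTable {
    pub fn new(shard_count: usize) -> Self {
        let shards = (0..shard_count).map(|_| RwLock::new(Vec::new())).collect();
        Self { shards }
    }

    /// One write lock and one copy per shard.
    pub fn psubscribe(&self, pattern: &[u8], conn_id: u64) {
        for shard in &self.shards {
            shard.write().unwrap().push((conn_id, pattern.to_vec()));
        }
    }

    /// Removal has to visit every shard as well.
    pub fn punsubscribe(&self, pattern: &[u8], conn_id: u64) {
        for shard in &self.shards {
            shard
                .write()
                .unwrap()
                .retain(|(id, p)| !(*id == conn_id && p.as_slice() == pattern));
        }
    }

    /// Patterns a publish routed to `shard_idx` must scan: P.
    pub fn patterns_scanned(&self, shard_idx: usize) -> usize {
        self.shards[shard_idx].read().unwrap().len()
    }

    /// Total stored copies across the broker: S x P.
    pub fn total_copies(&self) -> usize {
        self.shards.iter().map(|s| s.read().unwrap().len()).sum()
    }
}
```

With S shards and P patterns, every publish scans P entries in its own shard while the broker stores S x P copies, so pattern subscriptions cost both publish time and memory.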
Prefer exact subscriptions
Exact subscriptions are an O(1) lookup; patterns are an O(P) scan:
# Fast: hash table lookup
SUBSCRIBE notifications:user:1000
# Slower: linear scan of all patterns
PSUBSCRIBE notifications:*
More specific patterns reduce false matches:
# Good: narrow scope
PSUBSCRIBE user.1000.*
# Bad: matches everything
PSUBSCRIBE *
Batch publishes when possible
Use pipelining to reduce round-trips:
(
  echo "PUBLISH chan1 msg1"
  echo "PUBLISH chan2 msg2"
  echo "PUBLISH chan3 msg3"
) | redis-cli --pipe
Track active subscriptions to detect leaks:
count = pubsub.publish(channel, message)
if count == 0:
    log.warn(f"No subscribers for {channel}")
Pub/Sub vs CDC
| Feature | Pub/Sub | CDC |
|---|---|---|
| Delivery | Push | Pull |
| Ordering | Not guaranteed | Monotonic sequence |
| History | No (live only) | Yes (ring buffer) |
| Durability | No | Configurable |
| Use case | Real-time notifications | Change streaming |
| Filtering | Pattern matching | Key patterns |
Use Pub/Sub for ephemeral, real-time messaging (chat, notifications).
Use CDC for reliable change streaming (replication, audit logs).
Next Steps
Change Data Capture: Durable change streaming with cursors
Client Libraries: Connect with Redis clients
Benchmarks: See Pub/Sub performance metrics
API Reference: Complete SUBSCRIBE/PUBLISH reference