Kora’s Pub/Sub system provides real-time message delivery using a sharded broker architecture. Channels are distributed across independent shards for parallel publishing, while pattern subscriptions are replicated for universal matching.

Overview

From kora-pubsub/src/lib.rs:1-15:
Sharded Pub/Sub broker for Kōra. Provides a multi-threaded Pub/Sub message broker with per-channel sharding. Channels are hashed to independent shards so that publishes to different channels proceed in parallel without contention. Pattern subscriptions (glob-style) are replicated across all shards, since any publish could potentially match.

Features

  • Sharded channels for parallel message delivery
  • Glob patterns (*, ?) for flexible subscriptions
  • Push-mode delivery to connected clients
  • Dead subscriber cleanup for automatic resource management
  • Thread-safe broker whose publish path takes only a per-shard read lock

Architecture

From kora-pubsub/src/broker.rs:59-90:
pub struct PubSubBroker {
    shards: Vec<RwLock<ShardSubscriptions>>,
    shard_mask: usize,
    hash_builder: ahash::RandomState,
}

fn shard_index(&self, channel: &[u8]) -> usize {
    let hash = self.hash_builder.hash_one(channel);
    (hash as usize) & self.shard_mask
}

1. Channel Hashing

Each channel is hashed to a specific shard using AHash. Multiple channels distribute across shards for parallelism.
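
The masking step can be sketched on its own as follows. This is a minimal illustration, not the broker's code: it assumes the shard count is a power of two (which is what makes `hash & mask` equivalent to a modulo), and it uses the standard library's `RandomState` as a stand-in for `ahash::RandomState`. The `shard_count` parameter is hypothetical.

```rust
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;

/// Maps a channel name to a shard index in `0..shard_count`.
/// Assumption: `shard_count` is a power of two, so `hash & (shard_count - 1)`
/// selects the same shard as `hash % shard_count` would.
pub fn shard_index(hasher: &RandomState, shard_count: usize, channel: &[u8]) -> usize {
    assert!(shard_count.is_power_of_two(), "shard count must be a power of two");
    let mask = shard_count - 1;
    // Stand-in for the AHash call in the excerpt above.
    let hash = hasher.hash_one(channel);
    (hash as usize) & mask
}
```

With a single hasher instance, the same channel always lands on the same shard, which is what lets exact subscriptions live in one shard only.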

2. Subscription Storage

Exact subscriptions live in the target shard. Pattern subscriptions are replicated to all shards since any publish could match.

3. Message Delivery

PUBLISH takes a read lock on the target shard and iterates subscribers. Dead connections are cleaned up lazily.

Commands

Subscribe to Channels

# Subscribe to exact channel
SUBSCRIBE news
# => 1: ["subscribe", "news", 1]

# Subscribe to multiple channels
SUBSCRIBE news alerts updates
# => 1: ["subscribe", "news", 1]
# => 2: ["subscribe", "alerts", 2]
# => 3: ["subscribe", "updates", 3]
Response format:
["subscribe", "<channel>", <total_subscriptions>]
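
The running total in each reply can be sketched as below. This is an illustration only: `ClientSubscriptions` is a hypothetical per-connection type, it counts exact channels only, and it assumes that re-subscribing to an already subscribed channel leaves the total unchanged.

```rust
use std::collections::HashSet;

/// One reply per requested channel: ("subscribe", channel, total_subscriptions).
pub type SubscribeReply = (&'static str, String, usize);

/// Hypothetical per-connection state: the channels this client is subscribed to.
#[derive(Default)]
pub struct ClientSubscriptions {
    channels: HashSet<String>,
}

impl ClientSubscriptions {
    /// Adds each channel in order and records the total after each one.
    pub fn subscribe(&mut self, channels: &[&str]) -> Vec<SubscribeReply> {
        channels
            .iter()
            .map(|ch| {
                // Inserting an existing channel is a no-op, so the total stays put.
                self.channels.insert(ch.to_string());
                ("subscribe", ch.to_string(), self.channels.len())
            })
            .collect()
    }
}
```

Calling `subscribe(&["news", "alerts", "updates"])` on a fresh value yields totals 1, 2, and 3, mirroring the three replies in the example above.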

Subscribe to Patterns

# Subscribe to all channels matching pattern
PSUBSCRIBE news.*
# => ["psubscribe", "news.*", 1]

# Multiple patterns
PSUBSCRIBE user.* session.* cache.*
Pattern matching uses glob-style wildcards:
  • * matches zero or more characters
  • ? matches exactly one character
Examples:
  • user.* matches user.1000, user.login, etc.
  • session.??? matches session.abc, session.123, but not session.a

Publish Messages

# Publish to a channel
PUBLISH news "Breaking: Kora v1.0 released!"
# => 42 (number of subscribers that received the message)

# Binary-safe payload
PUBLISH events "\x00\x01\x02\x03"
# => 5
Returns: Number of subscribers that received the message (exact + pattern matches).

Unsubscribe

# Unsubscribe from specific channels
UNSUBSCRIBE news alerts
# => 1: ["unsubscribe", "news", 1]
# => 2: ["unsubscribe", "alerts", 0]

# Unsubscribe from all channels
UNSUBSCRIBE
# => ["unsubscribe", null, 0]

# Unsubscribe from patterns
PUNSUBSCRIBE user.* session.*
# => 1: ["punsubscribe", "user.*", 1]
# => 2: ["punsubscribe", "session.*", 0]

Message Delivery

Exact Subscriptions

# Client 1
SUBSCRIBE notifications

# Client 2 publishes
PUBLISH notifications "New comment on your post"
# => 1

# Client 1 receives:
# ["message", "notifications", "New comment on your post"]
Message format (exact):
["message", "<channel>", "<payload>"]

Pattern Subscriptions

# Client 1
PSUBSCRIBE user.*

# Client 2 publishes
PUBLISH user.1000 "Profile updated"
# => 1

# Client 1 receives:
# ["pmessage", "user.*", "user.1000", "Profile updated"]
Message format (pattern):
["pmessage", "<pattern>", "<channel>", "<payload>"]
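
The two message shapes can be modelled as an enum that serializes into the arrays shown above. The variant names follow the source excerpts, but the field names and the `to_frame` helper are assumptions made for this sketch.

```rust
/// Hypothetical message type. Variant names follow the source excerpts;
/// the field names are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub enum PubSubMessage {
    Message { channel: Vec<u8>, payload: Vec<u8> },
    PMessage { pattern: Vec<u8>, channel: Vec<u8>, payload: Vec<u8> },
}

impl PubSubMessage {
    /// Returns the array elements in wire order, as raw bytes.
    pub fn to_frame(&self) -> Vec<Vec<u8>> {
        match self {
            PubSubMessage::Message { channel, payload } => {
                vec![b"message".to_vec(), channel.clone(), payload.clone()]
            }
            PubSubMessage::PMessage { pattern, channel, payload } => vec![
                b"pmessage".to_vec(),
                pattern.clone(),
                channel.clone(),
                payload.clone(),
            ],
        }
    }
}
```

The extra `pattern` element lets a client with several patterns tell which one caused the delivery.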

Combined Subscriptions

# Client subscribes to both exact and pattern
SUBSCRIBE alerts
PSUBSCRIBE alerts.*

# Publisher sends to exact match
PUBLISH alerts "Server down!"
# => 1 (client receives once via exact subscription)

# Publisher sends to pattern match
PUBLISH alerts.critical "Database offline!"
# => 1 (client receives once via pattern subscription)
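If a published channel matches both a client’s exact subscription and one of its patterns, the client receives the message twice: once as a message and once as a pmessage. In the example above, alerts.* does not match alerts, so each publish is delivered once.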
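
To make the delivery count concrete, the sketch below counts how many copies one client would receive for a given publish. It is an illustration only: `deliveries` is a hypothetical helper, and `glob_match` here is a small recursive matcher written for this example, not the one in kora-pubsub/src/glob.rs.

```rust
/// Minimal glob for this example: `*` matches zero or more bytes, `?` exactly one.
/// Recursive for brevity; it can be slow on patterns with many `*`.
fn glob_match(pattern: &[u8], text: &[u8]) -> bool {
    match (pattern.first().copied(), text.first().copied()) {
        (None, None) => true,
        (Some(b'*'), _) => {
            // Either the star matches nothing, or it absorbs one byte of text.
            glob_match(&pattern[1..], text)
                || (!text.is_empty() && glob_match(pattern, &text[1..]))
        }
        (Some(b'?'), Some(_)) => glob_match(&pattern[1..], &text[1..]),
        (Some(p), Some(t)) if p == t => glob_match(&pattern[1..], &text[1..]),
        _ => false,
    }
}

/// Counts the copies of a publish to `channel` that one client receives,
/// given its exact subscriptions and its patterns.
pub fn deliveries(exact: &[&str], patterns: &[&str], channel: &str) -> usize {
    let exact_hits = exact.iter().filter(|c| **c == channel).count();
    let pattern_hits = patterns
        .iter()
        .filter(|p| glob_match(p.as_bytes(), channel.as_bytes()))
        .count();
    exact_hits + pattern_hits
}
```

With `SUBSCRIBE alerts` and `PSUBSCRIBE alerts.*`, both publishes in the example produce one delivery each. Swapping the pattern for `alert*` would make a publish to `alerts` count twice.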

Use Cases

Real-Time Notifications

Push updates to web/mobile clients.
SUBSCRIBE notifications:user:1000
# Client receives instant updates

Chat Rooms

Broadcast messages to room subscribers.
SUBSCRIBE chat:room:general
PUBLISH chat:room:general "Hello everyone!"

Cache Invalidation

Notify all app servers when cache keys change.
PSUBSCRIBE cache:invalidate:*
PUBLISH cache:invalidate:user:1000 ""

Microservice Events

Broadcast domain events across services.
PSUBSCRIBE events:order.*
PUBLISH events:order.created "{...json...}"

Pattern Matching

From kora-pubsub/src/glob.rs, the pattern matcher supports:

Wildcards

The * wildcard matches zero or more characters, including none.
Pattern: user.*
Matches:
  - user.alice
  - user.bob
  - user.1000
  - user.
Does not match:
  - user
  - useralice
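
The matching rules above can be reproduced with a short iterative matcher. This is a sketch under stated assumptions, not the contents of kora-pubsub/src/glob.rs: it handles only `*` and `?`, treats every other byte literally, and has no escape syntax.

```rust
/// Glob match over raw bytes: `*` matches zero or more bytes, `?` exactly one.
pub fn glob_match(pattern: &[u8], text: &[u8]) -> bool {
    let (mut p, mut t) = (0usize, 0usize);
    // Pattern index of the most recent `*` and the text index it currently covers up to.
    let mut backtrack: Option<(usize, usize)> = None;

    while t < text.len() {
        if p < pattern.len() && pattern[p] == b'*' {
            // Record the star and first try matching it against zero bytes.
            backtrack = Some((p, t));
            p += 1;
        } else if p < pattern.len() && (pattern[p] == b'?' || pattern[p] == text[t]) {
            p += 1;
            t += 1;
        } else if let Some((star_p, star_t)) = backtrack {
            // Mismatch: let the last star absorb one more byte and retry from there.
            p = star_p + 1;
            t = star_t + 1;
            backtrack = Some((star_p, star_t + 1));
        } else {
            return false;
        }
    }
    // Text is consumed; any pattern bytes left over must all be `*`.
    pattern[p..].iter().all(|&b| b == b'*')
}
```

Because only the most recent `*` is revisited on a mismatch, the matcher runs in at most O(pattern length × text length) time and uses no recursion.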

Pattern Examples

# Match all user channels
PSUBSCRIBE user.*
# Matches: user.alice, user.1000, user.login

# Match specific format
PSUBSCRIBE session.????-????-????
# Matches: session.abcd-1234-xyz9

# Match nested namespaces
PSUBSCRIBE app:*:events
# Matches: app:user:events, app:order:events

# Combine wildcards
PSUBSCRIBE log.*.*
# Matches: log.info.app, log.error.db, log.warn.cache

Implementation Details

Subscriber Interface

From kora-pubsub/src/broker.rs:26-32:
pub trait MessageSink: Send + Sync {
    /// Attempt to deliver msg to the subscriber.
    /// Returns true if accepted, false if subscriber is dead.
    fn send(&self, msg: PubSubMessage) -> bool;
}
Clients implement MessageSink to receive messages. A subscriber whose send returns false is treated as dead and is removed lazily, on a later subscribe or unsubscribe that already holds the shard’s write lock.
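
One way to implement the trait is to forward messages into an in-process queue and report the subscriber as dead once the receiving side is gone. The sketch below assumes this design: `ChannelSink` is hypothetical, and `PubSubMessage` is a simplified stand-in because the real fields are not shown in the source.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Simplified stand-in for the broker's message type (fields are assumptions).
#[derive(Debug, Clone, PartialEq)]
pub struct PubSubMessage {
    pub channel: Vec<u8>,
    pub payload: Vec<u8>,
}

/// The trait as quoted above.
pub trait MessageSink: Send + Sync {
    fn send(&self, msg: PubSubMessage) -> bool;
}

/// Hypothetical sink that pushes messages into a standard-library channel.
pub struct ChannelSink {
    // The Mutex keeps the sink `Sync` regardless of compiler version.
    tx: Mutex<Sender<PubSubMessage>>,
}

impl ChannelSink {
    /// Returns the sink plus the receiving end a connection task would drain.
    pub fn new() -> (Self, Receiver<PubSubMessage>) {
        let (tx, rx) = channel();
        (ChannelSink { tx: Mutex::new(tx) }, rx)
    }
}

impl MessageSink for ChannelSink {
    fn send(&self, msg: PubSubMessage) -> bool {
        // `Sender::send` fails only when the receiver has been dropped,
        // which this sketch treats as a dead subscriber.
        match self.tx.lock() {
            Ok(tx) => tx.send(msg).is_ok(),
            Err(_) => false, // poisoned lock: report the sink as dead
        }
    }
}
```

Dropping the `Receiver` (for example when the connection task exits) is enough to make every later `send` return false.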

Dead Subscriber Cleanup

From kora-pubsub/src/broker.rs:150-180:
pub fn publish(&self, channel: &[u8], message: &[u8]) -> usize {
    let idx = self.shard_index(channel);
    let shard = self.shards[idx].read();
    
    let mut live_count = 0;
    
    // Exact subscribers
    if let Some(subs) = shard.channels.get(channel) {
        for sub in subs {
            if sub.tx.send(PubSubMessage::Message { ... }) {
                live_count += 1;
            }
            // Dead subscribers cleaned up lazily
        }
    }
    
    // Pattern subscribers
    for psub in &shard.patterns {
        if glob_match(&psub.pattern, channel) {
            if psub.tx.send(PubSubMessage::PMessage { ... }) {
                live_count += 1;
            }
        }
    }
    
    live_count
}
Lazy cleanup avoids write locks on the publish path:
  • Publish takes only a read lock (parallel publishes to different shards)
  • Dead subscribers accumulate temporarily
  • Cleanup happens on next subscribe/unsubscribe (requires write lock anyway)
Trade-off: Small memory overhead for dead connections until next write operation.
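
The source does not show how dead subscribers are remembered between a failed send and the later cleanup. One possible mechanism, sketched here purely as an assumption, is an atomic flag that publishers set under the read lock and that subscribe prunes under the write lock. All names below are hypothetical, and a `connected` flag simulates the result of `send`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;

/// Hypothetical subscriber record.
pub struct Subscriber {
    pub conn_id: u64,
    /// Simulates the connection state; a real sink would report it from `send`.
    pub connected: AtomicBool,
    /// Set by publishers after a failed send; read during cleanup.
    pub dead: AtomicBool,
}

/// Hypothetical single-channel shard.
pub struct Shard {
    subscribers: RwLock<Vec<Subscriber>>,
}

impl Shard {
    pub fn new() -> Self {
        Shard { subscribers: RwLock::new(Vec::new()) }
    }

    /// Publish path: read lock only. Failed sends are flagged, not removed.
    pub fn publish(&self) -> usize {
        let subs = self.subscribers.read().unwrap();
        let mut live = 0;
        for sub in subs.iter() {
            if sub.connected.load(Ordering::SeqCst) {
                live += 1;
            } else {
                sub.dead.store(true, Ordering::SeqCst);
            }
        }
        live
    }

    /// Subscribe path: the write lock is needed anyway, so prune flagged entries here.
    pub fn subscribe(&self, conn_id: u64) {
        let mut subs = self.subscribers.write().unwrap();
        subs.retain(|s| !s.dead.load(Ordering::SeqCst));
        subs.push(Subscriber {
            conn_id,
            connected: AtomicBool::new(true),
            dead: AtomicBool::new(false),
        });
    }

    /// Simulates a client going away.
    pub fn disconnect(&self, conn_id: u64) {
        let subs = self.subscribers.read().unwrap();
        for sub in subs.iter().filter(|s| s.conn_id == conn_id) {
            sub.connected.store(false, Ordering::SeqCst);
        }
    }

    /// Number of stored entries, including dead ones not yet pruned.
    pub fn stored(&self) -> usize {
        self.subscribers.read().unwrap().len()
    }
}
```

After a disconnect, the dead entry stays in the list through any number of publishes and disappears on the next subscribe, which is the memory trade-off described above.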

Sharding Strategy

Exact subscriptions: Stored only in the channel’s target shard.
pub fn subscribe(&self, channel: &[u8], conn_id: u64, tx: Arc<dyn MessageSink>) {
    let idx = self.shard_index(channel);  // Hash to shard
    let mut shard = self.shards[idx].write();
    shard.channels.entry(channel.to_vec()).or_default().push(Subscriber { ... });
}
Pattern subscriptions: Replicated to all shards.
pub fn psubscribe(&self, pattern: &[u8], conn_id: u64, tx: Arc<dyn MessageSink>) {
    for shard_lock in &self.shards {
        let mut shard = shard_lock.write();
        shard.patterns.push(PatternSubscriber { ... });
    }
}
Pattern subscriptions have O(P) overhead on every publish, where P = number of pattern subscriptions. Use sparingly on high-volume channels.
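
The cost of replication can be seen in a stripped-down model that stores only patterns. This is a sketch with hypothetical names (`Broker`, `patterns_scanned`, `stored_copies`), using the standard library's `RwLock` rather than whichever lock type the broker uses.

```rust
use std::sync::RwLock;

#[derive(Default)]
struct Shard {
    patterns: Vec<Vec<u8>>,
}

/// Hypothetical broker model that tracks pattern subscriptions only.
pub struct Broker {
    shards: Vec<RwLock<Shard>>,
}

impl Broker {
    pub fn new(shard_count: usize) -> Self {
        let shards = (0..shard_count)
            .map(|_| RwLock::new(Shard::default()))
            .collect();
        Broker { shards }
    }

    /// Stores one copy of the pattern in every shard, taking each write lock in turn.
    pub fn psubscribe(&self, pattern: &[u8]) {
        for shard in &self.shards {
            shard.write().unwrap().patterns.push(pattern.to_vec());
        }
    }

    /// Number of patterns a publish landing on `shard_idx` has to test.
    pub fn patterns_scanned(&self, shard_idx: usize) -> usize {
        self.shards[shard_idx].read().unwrap().patterns.len()
    }

    /// Total stored copies across the broker: P patterns * S shards.
    pub fn stored_copies(&self) -> usize {
        self.shards
            .iter()
            .map(|s| s.read().unwrap().patterns.len())
            .sum()
    }
}
```

Every publish scans all P patterns no matter which shard it lands on, and memory for patterns grows with P * S, so the guidance to keep pattern counts low applies to both time and space.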

Performance Tips

Exact subscriptions are an O(1) hash lookup; pattern subscriptions require an O(P) scan:
# Fast: hash table lookup
SUBSCRIBE notifications:user:1000

# Slower: linear scan of all patterns
PSUBSCRIBE notifications:*
More specific patterns reduce false matches:
# Good: narrow scope
PSUBSCRIBE user.1000.*

# Bad: matches everything
PSUBSCRIBE *
Use pipelining to reduce round-trips:
(
  echo "PUBLISH chan1 msg1"
  echo "PUBLISH chan2 msg2"
  echo "PUBLISH chan3 msg3"
) | redis-cli --pipe
Check the PUBLISH return value to spot channels that have no live subscribers:
import logging

count = pubsub.publish(channel, message)  # number of subscribers that received it
if count == 0:
    logging.warning("No subscribers for %s", channel)

Pub/Sub vs CDC

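| Feature    | Pub/Sub                 | CDC                |
|------------|-------------------------|--------------------|
| Delivery   | Push                    | Pull               |
| Ordering   | Not guaranteed          | Monotonic sequence |
| History    | No (live only)          | Yes (ring buffer)  |
| Durability | No                      | Configurable       |
| Use case   | Real-time notifications | Change streaming   |
| Filtering  | Pattern matching        | Key patterns       |
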
Use Pub/Sub for ephemeral, real-time messaging (chat, notifications).
Use CDC for reliable change streaming (replication, audit logs).

Next Steps

  • Change Data Capture: Durable change streaming with cursors
  • Client Libraries: Connect with Redis clients
  • Benchmarks: See Pub/Sub performance metrics
  • API Reference: Complete SUBSCRIBE/PUBLISH reference
