OmniView uses Oracle AQ subscribers to enable multiple independent consumers to receive trace messages. Each OmniView instance registers as a unique subscriber, allowing distributed monitoring without message loss.

What is a Subscriber?

In Oracle Advanced Queuing, a subscriber is a named consumer registered to receive messages from a queue. When a message is enqueued, Oracle automatically creates copies for each subscriber.

Key Properties

domain/subscriber.go
type Subscriber struct {
    Name      string  // Unique identifier (e.g., "SUB_A1B2C3D4_E5F6_...")
    BatchSize int     // Max messages to retrieve per dequeue
    WaitTime  int     // Seconds to block waiting for messages
}
  • Name: Unique identifier for this consumer
  • BatchSize: Performance tuning for bulk operations (default: 1000)
  • WaitTime: How long to block on empty queue (default: 5 seconds)
Each subscriber maintains independent message state. If one subscriber crashes, others continue receiving messages without interruption.

Subscriber Registration Flow

1. Check for Existing Subscriber

On startup, OmniView checks if a subscriber already exists in local storage:
subscribers/subscriber_service.go
func (ss *SubscriberService) RegisterSubscriber() (domain.Subscriber, error) {
    subscriber, err := ss.GetSubscriber()
    if err != nil {
        if !errors.Is(err, domain.ErrSubscriberNotFound) {
            return domain.Subscriber{}, err
        }
        // Not found - create new subscriber
        newSubscriber, err := ss.NewSubscriber()
        if err != nil {
            return domain.Subscriber{}, err
        }
        subscriber = &newSubscriber
    }
    // Register in Oracle
    if err := ss.db.RegisterNewSubscriber(*subscriber); err != nil {
        return domain.Subscriber{}, err
    }
    return *subscriber, nil
}
First Run: Generates a new subscriber and stores it in BoltDB
Subsequent Runs: Retrieves existing subscriber from BoltDB

2. Generate Unique Subscriber Name

New subscribers get UUID-based names:
subscribers/subscriber_service.go
func generateSubscriberName() string {
    // UUID V4 Generation
    uuidWithHyphen := uuid.New()
    // Format: SUB_A1B2C3D4_E5F6_7890_ABCD_EF1234567890
    subscriberName := "SUB_" + strings.ToUpper(strings.ReplaceAll(uuidWithHyphen.String(), "-", "_"))
    return subscriberName
}
Example output: SUB_F47AC10B_58CC_4372_A567_0E02B2C3D479
Hyphens are replaced with underscores to comply with Oracle identifier naming rules.
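The naming scheme above can be tried without a UUID dependency. The following is a minimal sketch, assuming a version 4 UUID built directly from crypto/rand; `newSubscriberName` and `isValidSubscriberName` are hypothetical helpers for illustration and are not part of OmniView.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"regexp"
	"strings"
)

// subscriberNamePattern matches "SUB_" followed by an upper-case UUID whose
// hyphens were replaced by underscores (8_4_4_4_12 hex digits).
var subscriberNamePattern = regexp.MustCompile(
	`^SUB_[0-9A-F]{8}_[0-9A-F]{4}_[0-9A-F]{4}_[0-9A-F]{4}_[0-9A-F]{12}$`)

// newSubscriberName builds a random (version 4) UUID from crypto/rand and
// formats it like the names shown above. Hypothetical helper.
func newSubscriberName() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version field to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant bits
	h := strings.ToUpper(hex.EncodeToString(b[:]))
	return fmt.Sprintf("SUB_%s_%s_%s_%s_%s",
		h[0:8], h[8:12], h[12:16], h[16:20], h[20:32]), nil
}

// isValidSubscriberName reports whether name follows the SUB_<UUID> format.
func isValidSubscriberName(name string) bool {
	return subscriberNamePattern.MatchString(name)
}

func main() {
	name, err := newSubscriberName()
	if err != nil {
		panic(err)
	}
	fmt.Println(name, len(name), isValidSubscriberName(name))
}
```

A name in this format is 40 characters long, so it is worth confirming that the target Oracle version accepts subscriber names of that length.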

3. Register with Oracle AQ

The adapter calls the PL/SQL API to register the subscriber:
oracle/subscriptions.go
func (oa *OracleAdapter) RegisterNewSubscriber(subscriber domain.Subscriber) error {
    exists, err := subscriberExists(oa, subscriber)
    if err != nil {
        return fmt.Errorf("failed to check subscriber existence: %w", err)
    }
    if !exists {
        // Call PL/SQL procedure
        err := oa.ExecuteWithParams(
            "BEGIN OMNI_TRACER_API.Register_Subscriber(:subscriberName); END;",
            map[string]interface{}{"subscriberName": subscriber.Name},
        )
        if err != nil {
            return fmt.Errorf("failed to register subscriber: %w", err)
        }
    }
    return nil
}

4. Verify Subscriber Exists

Before registration, the system checks the ALL_QUEUE_SUBSCRIBERS view:
oracle/subscriptions.go
func subscriberExists(oa *OracleAdapter, subscriber domain.Subscriber) (bool, error) {
    query := `SELECT COUNT(1)
              FROM ALL_QUEUE_SUBSCRIBERS
              WHERE QUEUE_NAME = :queueName
              AND CONSUMER_NAME = :subscriberName
              AND OWNER = :queueOwner`
    results, err := oa.FetchWithParams(query, map[string]interface{}{
        "queueName":      domain.QueueName,
        "subscriberName": subscriber.Name,
        "queueOwner":     oa.config.Username,
    })
    // ... parse count result
    return count > 0, nil
}
This prevents duplicate registrations and handles restarts gracefully.

Subscriber Lifecycle

┌──────────────────────────────────────────────────────────────┐
│                    Application Startup                        │
└───────────────────────────┬──────────────────────────────────┘
                            │
                            ▼
                ┌───────────────────────┐
                │ Check BoltDB for      │
                │ existing subscriber   │
                └───────┬───────────────┘
                        │
           ┌────────────┴───────────┐
           │                        │
           ▼                        ▼
    ┌─────────────┐         ┌─────────────────┐
    │   Found     │         │   Not Found     │
    └──────┬──────┘         └────────┬────────┘
           │                         │
           │                         ▼
           │              ┌──────────────────────┐
           │              │ Generate UUID-based  │
           │              │ subscriber name      │
           │              └──────────┬───────────┘
           │                         │
           │                         ▼
           │              ┌──────────────────────┐
           │              │ Store in BoltDB      │
           │              └──────────┬───────────┘
           │                         │
           └─────────┬───────────────┘
                     │
                     ▼
          ┌─────────────────────────┐
          │ Check if subscriber     │
          │ exists in Oracle        │
          └──────────┬──────────────┘
                     │
        ┌────────────┴──────────────┐
        │                           │
        ▼                           ▼
 ┌────────────┐            ┌───────────────────┐
 │   Exists   │            │   Does Not Exist  │
 └─────┬──────┘            └─────────┬─────────┘
       │                             │
       │                             ▼
       │                  ┌────────────────────────────┐
       │                  │ Call OMNI_TRACER_API       │
       │                  │ .Register_Subscriber()     │
       │                  └──────────┬─────────────────┘
       │                             │
       └──────────┬──────────────────┘
                  │
                  ▼
    ┌──────────────────────────────┐
    │ Subscriber ready for         │
    │ message consumption          │
    └──────────────────────────────┘
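The lifecycle in the diagram can be exercised end to end without BoltDB or Oracle. This is a rough sketch under stated assumptions: `memoryStore` and `fakeOracle` are hypothetical in-memory stand-ins for the local store and the Oracle adapter, and `register` mirrors the flow of RegisterSubscriber rather than reproducing OmniView's code.

```go
package main

import (
	"errors"
	"fmt"
)

type Subscriber struct {
	Name      string
	BatchSize int
	WaitTime  int
}

var ErrSubscriberNotFound = errors.New("subscriber not found")

// memoryStore stands in for the BoltDB-backed store (hypothetical).
type memoryStore struct{ current *Subscriber }

func (m *memoryStore) Get() (Subscriber, error) {
	if m.current == nil {
		return Subscriber{}, ErrSubscriberNotFound
	}
	return *m.current, nil
}

func (m *memoryStore) Set(s Subscriber) error {
	m.current = &s
	return nil
}

// fakeOracle stands in for the Oracle adapter and counts real registrations.
type fakeOracle struct {
	registered map[string]bool
	calls      int
}

func (f *fakeOracle) Register(s Subscriber) error {
	if f.registered[s.Name] {
		return nil // already registered: nothing to do
	}
	f.registered[s.Name] = true
	f.calls++
	return nil
}

// register follows the diagram: load or create locally, then ensure the
// subscriber exists on the database side.
func register(store *memoryStore, db *fakeOracle, newName func() string) (Subscriber, error) {
	sub, err := store.Get()
	if err != nil {
		if !errors.Is(err, ErrSubscriberNotFound) {
			return Subscriber{}, err
		}
		sub = Subscriber{Name: newName(), BatchSize: 1000, WaitTime: 5}
		if err := store.Set(sub); err != nil {
			return Subscriber{}, err
		}
	}
	if err := db.Register(sub); err != nil {
		return Subscriber{}, err
	}
	return sub, nil
}

func main() {
	store := &memoryStore{}
	db := &fakeOracle{registered: map[string]bool{}}
	name := func() string { return "SUB_F47AC10B_58CC_4372_A567_0E02B2C3D479" }

	first, _ := register(store, db, name)
	second, _ := register(store, db, name) // simulates a restart
	fmt.Println(first.Name == second.Name, db.calls) // prints "true 1"
}
```

Running the flow twice against the same store models a restart: the second call finds the stored subscriber and performs no additional registration.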

Multi-Subscriber Support

Multiple OmniView instances can monitor the same database simultaneously:

Scenario: Three Monitoring Instances

Oracle Database (PROD_DB)
└── OMNI_TRACER_QUEUE
    ├── SUB_A1B2C3D4... (Instance 1 - Dev Team)
    ├── SUB_E5F6A7B8... (Instance 2 - Ops Team)
    └── SUB_C9D0E1F2... (Instance 3 - Support Team)
When PL/SQL enqueues a message:
  1. Oracle creates 3 independent copies in AQ$OMNI_TRACER_QUEUE
  2. Each subscriber sees the message as READY
  3. Each instance dequeues its own copy independently
  4. No coordination required between instances

Message Independence

// Instance 1 dequeues
messages1, _, count1, _ := db.BulkDequeueTracerMessages(subscriber1)
// Returns: 5 messages

// Instance 2 dequeues (same time period)
messages2, _, count2, _ := db.BulkDequeueTracerMessages(subscriber2)
// Returns: 5 messages (same content, different queue entries)
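Each subscriber’s queue depth is independent. If Subscriber A has processed 100 messages and Subscriber B has not dequeued yet, all 100 are still waiting for Subscriber B (subject to retention policy). A subscriber only receives messages enqueued after it was registered, so a newly registered subscriber does not see older messages.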
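Per-subscriber independence can be illustrated with a toy in-memory model. This sketch is not how Oracle AQ stores messages; `fanOutQueue` and its methods are hypothetical and only mimic the observable behavior for subscribers that are already registered when messages are enqueued.

```go
package main

import "fmt"

// fanOutQueue is a toy model of a multi-consumer queue: every registered
// subscriber has its own list of messages that it has not dequeued yet.
type fanOutQueue struct {
	pending map[string][]string
}

func newFanOutQueue(subscribers ...string) *fanOutQueue {
	q := &fanOutQueue{pending: make(map[string][]string)}
	for _, s := range subscribers {
		q.pending[s] = nil
	}
	return q
}

// enqueue makes the message visible to every registered subscriber.
func (q *fanOutQueue) enqueue(msg string) {
	for name := range q.pending {
		q.pending[name] = append(q.pending[name], msg)
	}
}

// dequeue removes and returns up to batchSize messages for one subscriber.
// It never touches what other subscribers still have pending.
func (q *fanOutQueue) dequeue(subscriber string, batchSize int) []string {
	msgs := q.pending[subscriber]
	if len(msgs) > batchSize {
		msgs = msgs[:batchSize]
	}
	q.pending[subscriber] = q.pending[subscriber][len(msgs):]
	return msgs
}

// depth returns how many messages are still waiting for the subscriber.
func (q *fanOutQueue) depth(subscriber string) int {
	return len(q.pending[subscriber])
}

func main() {
	q := newFanOutQueue("SUB_A", "SUB_B")
	for i := 1; i <= 5; i++ {
		q.enqueue(fmt.Sprintf("trace-%d", i))
	}
	fmt.Println(len(q.dequeue("SUB_A", 1000))) // 5
	fmt.Println(q.depth("SUB_A"), q.depth("SUB_B")) // 0 5
}
```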

Subscriber Configuration

Default Settings

subscribers/subscriber_service.go
func (ss *SubscriberService) NewSubscriber() (domain.Subscriber, error) {
    subscriberName := generateSubscriberName()
    subscriber := domain.Subscriber{
        Name:      subscriberName,
        BatchSize: 1000,  // Retrieve up to 1000 messages per call
        WaitTime:  5,     // Block for 5 seconds on empty queue
    }
    if err := ss.SetSubscriber(subscriber); err != nil {
        return domain.Subscriber{}, err
    }
    return subscriber, nil
}

Tuning Batch Size

BatchSize   Use Case
100         Low-volume queues, minimal latency
1000        Default - balanced performance
5000        High-volume queues, bulk processing

Tuning Wait Time

WaitTime   Behavior
0          Non-blocking: return immediately if empty
5          Default - good balance between CPU and responsiveness
60         Low-priority monitoring, reduce Oracle load
-1         Block indefinitely (not recommended)
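How BatchSize and WaitTime interact can be modeled with a channel. The sketch below is an assumption-laden toy, not Oracle's dequeue implementation: `dequeueBatch` and `waitDuration` are hypothetical, and the model waits only for the first message before draining whatever else is immediately available.

```go
package main

import (
	"fmt"
	"time"
)

// waitDuration converts a WaitTime in seconds to a time.Duration.
// A negative value is kept negative and means "block indefinitely".
func waitDuration(waitTime int) time.Duration {
	return time.Duration(waitTime) * time.Second
}

// dequeueBatch waits up to wait for a first message (0 = do not wait,
// negative = wait forever), then drains up to batchSize messages that are
// already available without blocking again.
func dequeueBatch(queue <-chan string, batchSize int, wait time.Duration) []string {
	var batch []string
	if batchSize <= 0 {
		return batch
	}
	switch {
	case wait < 0:
		batch = append(batch, <-queue)
	case wait == 0:
		select {
		case m := <-queue:
			batch = append(batch, m)
		default:
			return batch
		}
	default:
		timer := time.NewTimer(wait)
		defer timer.Stop()
		select {
		case m := <-queue:
			batch = append(batch, m)
		case <-timer.C:
			return batch
		}
	}
	for len(batch) < batchSize {
		select {
		case m := <-queue:
			batch = append(batch, m)
		default:
			return batch
		}
	}
	return batch
}

func main() {
	queue := make(chan string, 10)
	for i := 1; i <= 3; i++ {
		queue <- fmt.Sprintf("msg-%d", i)
	}
	fmt.Println(dequeueBatch(queue, 2, waitDuration(0)))          // first two messages
	fmt.Println(dequeueBatch(queue, 2, waitDuration(0)))          // the remaining one
	fmt.Println(len(dequeueBatch(queue, 2, 50*time.Millisecond))) // 0 after the timeout
}
```

In this model a larger BatchSize reduces the number of calls needed to drain a backlog, while WaitTime only matters when the queue is empty.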

Message Routing Logic

How Oracle determines which messages a subscriber receives:

1. Enqueue Time

-- PL/SQL application code
BEGIN
    OMNI_TRACER_API.Log_Info('ORDER_SVC', 'Processing order 12345');
END;
Oracle automatically:
  • Inserts message into AQ$OMNI_TRACER_QUEUE
  • Creates rows for all current subscribers
  • Sets MSG_STATE = 'READY' for each subscriber

2. Dequeue Operation

messages, ids, count, err := db.BulkDequeueTracerMessages(subscriber)
Oracle:
  • Finds READY messages for this specific subscriber
  • Returns up to BatchSize messages
  • Marks retrieved messages as PROCESSED
  • Removes processed messages after commit

3. Queue Depth Check

oracle/queue.go
func (oa *OracleAdapter) CheckQueueDepth(subscriberID string, queueTableName string) (int, error) {
    query := fmt.Sprintf(`SELECT COUNT(*) 
              FROM %s
              WHERE QUEUE = :queueName
              AND CONSUMER_NAME = :subscriberID
              AND MSG_STATE = 'READY'`, queueTableName)
    // Execute query and return count
}
Returns how many unprocessed messages are waiting for this subscriber.
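Because the queue table name is interpolated with fmt.Sprintf rather than bound as a parameter, it may be worth validating it first. The following sketch assumes the name is an unquoted Oracle identifier, optionally schema-qualified; `queueDepthQuery` and `oracleIdentifier` are hypothetical and not part of OmniView.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// oracleIdentifier matches an unquoted Oracle identifier: it starts with a
// letter and continues with letters, digits, "_", "$" or "#".
var oracleIdentifier = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9_$#]{0,127}$`)

// queueDepthQuery builds the depth query only if queueTableName is a plain
// identifier or a SCHEMA.NAME pair; anything else is rejected.
func queueDepthQuery(queueTableName string) (string, error) {
	parts := strings.Split(queueTableName, ".")
	if len(parts) > 2 {
		return "", fmt.Errorf("invalid queue table name %q", queueTableName)
	}
	for _, p := range parts {
		if !oracleIdentifier.MatchString(p) {
			return "", fmt.Errorf("invalid queue table name %q", queueTableName)
		}
	}
	query := fmt.Sprintf(`SELECT COUNT(*)
              FROM %s
              WHERE QUEUE = :queueName
              AND CONSUMER_NAME = :subscriberID
              AND MSG_STATE = 'READY'`, queueTableName)
	return query, nil
}

func main() {
	if _, err := queueDepthQuery("AQ$OMNI_TRACER_QUEUE"); err == nil {
		fmt.Println("accepted: AQ$OMNI_TRACER_QUEUE")
	}
	if _, err := queueDepthQuery("X; DROP TABLE T"); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

The bind variables for the queue name and subscriber ID are unaffected; only the interpolated table name needs this kind of check.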

Error Handling

Subscriber Already Exists

oracle/subscriptions.go
func (oa *OracleAdapter) RegisterNewSubscriber(subscriber domain.Subscriber) error {
    exists, err := subscriberExists(oa, subscriber)
    if err != nil {
        return fmt.Errorf("failed to check subscriber existence: %w", err)
    }
    if exists {
        return nil  // Idempotent - not an error
    }
    // Register new subscriber
}
Reregistering an existing subscriber is a no-op.

Subscriber Not Found in BoltDB

subscribers/subscriber_service.go
subscriber, err := ss.GetSubscriber()
if errors.Is(err, domain.ErrSubscriberNotFound) {
    // Expected on first run - create new subscriber
    newSubscriber, err := ss.NewSubscriber()
    if err != nil {
        return domain.Subscriber{}, err
    }
    subscriber = &newSubscriber
}
Handled gracefully by generating a new subscriber.
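The errors.Is check only recognizes the sentinel if intermediate layers wrap it with %w. A small sketch of that behavior, using hypothetical helpers (`lookup`, `flatten`, `needsNewSubscriber`) and a local sentinel in place of domain.ErrSubscriberNotFound:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSubscriberNotFound stands in for domain.ErrSubscriberNotFound.
var ErrSubscriberNotFound = errors.New("subscriber not found")

// lookup returns the stored name, or the sentinel wrapped with %w so that
// callers can still detect it with errors.Is.
func lookup(stored map[string]string) (string, error) {
	name, ok := stored["current"]
	if !ok {
		return "", fmt.Errorf("lookup subscriber: %w", ErrSubscriberNotFound)
	}
	return name, nil
}

// flatten formats the error with %v, which keeps the text but drops the
// link to the original error value.
func flatten(err error) error {
	return fmt.Errorf("lookup subscriber: %v", err)
}

// needsNewSubscriber reports whether err means "first run, create one".
func needsNewSubscriber(err error) bool {
	return errors.Is(err, ErrSubscriberNotFound)
}

func main() {
	_, err := lookup(map[string]string{})
	fmt.Println(needsNewSubscriber(err))          // true
	fmt.Println(needsNewSubscriber(flatten(err))) // false
}
```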

Storage Persistence

BoltDB Schema

omniview.bolt
└── subscribers/
    └── current
        ├── Name: "SUB_F47AC10B_58CC_4372_A567_0E02B2C3D479"
        ├── BatchSize: 1000
        └── WaitTime: 5
The subscriber is persisted locally so:
  • Restarts use the same subscriber (preserves queue position)
  • Multiple runs don’t create duplicate subscribers
  • Subscriber name remains consistent
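The schema above does not say how the record is encoded on disk. As an illustration only, the sketch below assumes JSON stored under the key `current` and uses a plain map as a stand-in for the BoltDB bucket; the `bucket` type, the JSON field names, and both helpers are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Subscriber struct {
	Name      string `json:"name"`
	BatchSize int    `json:"batch_size"`
	WaitTime  int    `json:"wait_time"`
}

// currentKey is the key shown in the schema above.
const currentKey = "current"

// bucket is an in-memory stand-in for the "subscribers" bucket.
type bucket map[string][]byte

// saveSubscriber encodes the subscriber and stores it under currentKey.
func saveSubscriber(b bucket, s Subscriber) error {
	data, err := json.Marshal(s)
	if err != nil {
		return err
	}
	b[currentKey] = data
	return nil
}

// loadSubscriber returns the stored subscriber; the bool is false when
// nothing has been stored yet (the first-run case).
func loadSubscriber(b bucket) (Subscriber, bool, error) {
	data, ok := b[currentKey]
	if !ok {
		return Subscriber{}, false, nil
	}
	var s Subscriber
	if err := json.Unmarshal(data, &s); err != nil {
		return Subscriber{}, false, err
	}
	return s, true, nil
}

func main() {
	b := bucket{}
	_, found, _ := loadSubscriber(b)
	fmt.Println(found) // false: first run

	_ = saveSubscriber(b, Subscriber{
		Name: "SUB_F47AC10B_58CC_4372_A567_0E02B2C3D479", BatchSize: 1000, WaitTime: 5,
	})
	s, found, _ := loadSubscriber(b)
	fmt.Println(found, s.Name, s.BatchSize, s.WaitTime)
}
```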

Oracle Metadata

-- Query registered subscribers
SELECT QUEUE_NAME, CONSUMER_NAME, ADDRESS
FROM ALL_QUEUE_SUBSCRIBERS
WHERE QUEUE_NAME = 'OMNI_TRACER_QUEUE';

-- Result:
-- QUEUE_NAME          CONSUMER_NAME                             ADDRESS
-- OMNI_TRACER_QUEUE   SUB_F47AC10B_58CC_4372_A567_0E02B2C3D479  NULL
-- OMNI_TRACER_QUEUE   SUB_A1B2C3D4_E5F6_7890_ABCD_EF1234567890  NULL

Best Practices

Reuse the same subscriber across restarts. OmniView does this automatically by loading the subscriber stored in BoltDB, which ensures:
  • You don’t miss messages sent while OmniView was stopped
  • Queue depth remains consistent
  • No orphaned subscribers accumulate
Create one subscriber per OmniView instance. Each instance automatically generates and persists its own subscriber on first run. Multiple subscribers are useful for:
  • Different teams monitoring the same database
  • Separate dev/test/prod environments
  • Redundant monitoring instances
If the locally stored subscriber is lost (for example, the BoltDB file is deleted), OmniView will generate a new subscriber with a different name. This means:
  • Old subscriber remains in Oracle but becomes inactive
  • New subscriber starts fresh (may miss recent messages)
  • Old subscriber can be manually removed from Oracle if needed
Subscriber names are currently auto-generated from a UUID. Manual naming would require modifying generateSubscriberName() in subscriber_service.go. The UUID approach ensures global uniqueness and avoids naming conflicts.

Next Steps

Message Queuing

Deep dive into Oracle AQ and dequeue operations

Architecture

Understand how subscribers fit into the overall design
