OmniView leverages Oracle Advanced Queuing (AQ) for reliable, persistent message delivery from PL/SQL processes to the monitoring client. This page explains the queue architecture and how messages flow through the system.

Oracle Advanced Queuing Basics

Oracle AQ is an enterprise messaging system built into the Oracle database that provides:
  • Reliable delivery: Messages persist in database tables
  • Transactional semantics: Enqueue/dequeue operations are ACID-compliant
  • Multi-subscriber support: Multiple consumers can receive the same message
  • Blocking operations: Consumers can wait for messages without polling
Unlike external message brokers (Kafka, RabbitMQ), Oracle AQ keeps messages in database tables, eliminating the need for separate infrastructure.

OmniView Queue Structure

The system uses three core Oracle objects:
domain/queue.go
const (
    QueueName        = "OMNI_TRACER_QUEUE"           // The queue itself
    QueueTableName   = "AQ$OMNI_TRACER_QUEUE"        // Underlying storage table
    QueuePayloadType = "OMNI_TRACER_PAYLOAD_TYPE"    // Message structure type
)

Queue Payload Structure

Messages conform to an Oracle object type that carries a JSON payload in a single CLOB attribute:
CREATE TYPE OMNI_TRACER_PAYLOAD_TYPE AS OBJECT (
    JSON_DATA CLOB
);
After dequeue, messages are deserialized into:
domain/queue.go
type QueueMessage struct {
    MessageID   string `json:"MESSAGE_ID"`
    ProcessName string `json:"PROCESS_NAME"`
    LogLevel    string `json:"LOG_LEVEL"`
    Payload     string `json:"PAYLOAD"`
    Timestamp   string `json:"TIMESTAMP"`
}
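As a minimal sketch of this deserialization step, the program below parses one JSON document into the struct above. The sample values, including the timestamp format, are invented for illustration, and `decodeMessage` is a hypothetical helper name rather than a function from the OmniView codebase.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueueMessage mirrors the struct shown in domain/queue.go.
type QueueMessage struct {
	MessageID   string `json:"MESSAGE_ID"`
	ProcessName string `json:"PROCESS_NAME"`
	LogLevel    string `json:"LOG_LEVEL"`
	Payload     string `json:"PAYLOAD"`
	Timestamp   string `json:"TIMESTAMP"`
}

// decodeMessage (hypothetical helper) parses the JSON text carried in JSON_DATA.
func decodeMessage(raw string) (QueueMessage, error) {
	var msg QueueMessage
	err := json.Unmarshal([]byte(raw), &msg)
	return msg, err
}

func main() {
	// Sample payload; all values are assumptions made for this example.
	raw := `{"MESSAGE_ID":"42","PROCESS_NAME":"MY_BATCH_JOB","LOG_LEVEL":"INFO",` +
		`"PAYLOAD":"Processing 1000 records","TIMESTAMP":"2024-01-01T00:00:00Z"}`

	msg, err := decodeMessage(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("[%s] %s: %s\n", msg.LogLevel, msg.ProcessName, msg.Payload)
}
```

The struct tags do the mapping: each upper-case JSON key is matched to the corresponding Go field, and keys missing from the document leave the field as an empty string.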

Enqueue: Sending Messages to the Queue

From PL/SQL Application Code

Applications use the OMNI_TRACER_API package to send trace messages:
-- Example: Log an informational message from PL/SQL
BEGIN
    OMNI_TRACER_API.Log_Info(
        p_process_name => 'MY_BATCH_JOB',
        p_payload      => 'Processing 1000 records'
    );
END;
The API package internally enqueues a message to OMNI_TRACER_QUEUE with:
  • Auto-generated message ID
  • Current timestamp
  • Specified log level (INFO, DEBUG, ERROR, etc.)
  • JSON-formatted payload

Multi-Subscriber Broadcasting

When a message is enqueued, Oracle AQ automatically:
  1. Stores the message in the AQ$OMNI_TRACER_QUEUE table
  2. Creates separate queue entries for each registered subscriber
  3. Marks messages as READY for consumption
Each subscriber consumes messages independently. If Subscriber A dequeues a message, that message remains available to Subscriber B until B dequeues it.
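The broadcast behavior can be illustrated with a toy in-memory model. This is not how Oracle AQ stores messages internally; it only mirrors the behavior visible to consumers, and every name in it (`broadcastQueue`, `enqueue`, `dequeue`) is hypothetical.

```go
package main

import "fmt"

// broadcastQueue is a toy model: each registered subscriber has its own
// list of messages that it has not dequeued yet.
type broadcastQueue struct {
	pending map[string][]string // subscriber name -> pending messages
}

func newBroadcastQueue(subscribers ...string) *broadcastQueue {
	q := &broadcastQueue{pending: make(map[string][]string)}
	for _, s := range subscribers {
		q.pending[s] = nil
	}
	return q
}

// enqueue makes the message available to every registered subscriber.
func (q *broadcastQueue) enqueue(msg string) {
	for name := range q.pending {
		q.pending[name] = append(q.pending[name], msg)
	}
}

// dequeue removes and returns the oldest pending message for one subscriber.
// Other subscribers are unaffected.
func (q *broadcastQueue) dequeue(subscriber string) (string, bool) {
	msgs := q.pending[subscriber]
	if len(msgs) == 0 {
		return "", false
	}
	q.pending[subscriber] = msgs[1:]
	return msgs[0], true
}

func main() {
	q := newBroadcastQueue("SUB_A", "SUB_B")
	q.enqueue("Processing 1000 records")

	a, _ := q.dequeue("SUB_A")
	fmt.Println("SUB_A got:", a)

	// SUB_B still sees the message even though SUB_A consumed it.
	b, ok := q.dequeue("SUB_B")
	fmt.Println("SUB_B got:", b, ok)
}
```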

Dequeue: Receiving Messages

Blocking Consumer Pattern

OmniView uses a blocking consumer loop that waits for messages without busy polling:
tracer/tracer_service.go
func (ts *TracerService) blockingConsumerLoop(ctx context.Context, subscriber *domain.Subscriber) {
    const errorDelay = 5 * time.Second
    for {
        // Check for cancellation
        select {
        case <-ctx.Done():
            fmt.Println("Event Listener stopping for subscriber:", subscriber.Name)
            return
        default:
            // Continue to blocking wait
        }

        // Blocking wait — Oracle holds this call until messages arrive or wait time expires
        err := ts.processBatch(subscriber)
        if err != nil {
            log.Printf("failed to dequeue messages for subscriber %s: %v", subscriber.Name, err)
            select {
            case <-time.After(errorDelay):
                continue
            case <-ctx.Done():
                return
            }
        }
    }
}
Key characteristics:
  • The dequeue operation blocks until messages arrive or the wait timeout expires
  • No CPU-intensive polling loop
  • Immediate message delivery when available
  • Configurable wait time per subscriber
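To experiment with this loop shape without a database, the sketch below replaces `processBatch` with a plain function value. `consumerLoop` and `runUntil` are hypothetical names; the structure (check for cancellation, make one blocking call, back off after an error) follows the loop shown above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// consumerLoop calls dequeue repeatedly until ctx is cancelled. After a
// failed call it pauses for errorDelay unless ctx is cancelled first.
// It returns the number of dequeue calls made.
func consumerLoop(ctx context.Context, dequeue func() error, errorDelay time.Duration) int {
	calls := 0
	for {
		if ctx.Err() != nil {
			return calls
		}
		calls++
		if err := dequeue(); err != nil {
			select {
			case <-time.After(errorDelay):
				// Back-off elapsed; try again.
			case <-ctx.Done():
				return calls
			}
		}
	}
}

// runUntil drives consumerLoop with a fake dequeue that cancels the context
// during its n-th call (n must be at least 1). When fail is true, every
// call reports an error so the back-off path is exercised.
func runUntil(n int, fail bool) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	seen := 0
	dequeue := func() error {
		seen++
		if seen == n {
			cancel()
		}
		if fail {
			return errors.New("simulated dequeue failure")
		}
		return nil
	}
	return consumerLoop(ctx, dequeue, time.Millisecond)
}

func main() {
	fmt.Println("calls (success path):", runUntil(3, false))
	fmt.Println("calls (error path):  ", runUntil(2, true))
}
```

Because cancellation is checked both before each call and during the back-off, the loop stops promptly in either path instead of sleeping through the full error delay.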

Bulk Dequeue Process

Messages are retrieved in batches for efficiency:
tracer/tracer_service.go
func (ts *TracerService) processBatch(subscriber *domain.Subscriber) error {
    // Lock to prevent concurrent dequeues
    ts.processMu.Lock()
    defer ts.processMu.Unlock()

    messages, msgIDs, count, err := ts.db.BulkDequeueTracerMessages(*subscriber)
    if err != nil {
        return err
    }

    if count == 0 {
        return nil // No messages available
    }

    for i := 0; i < count; i++ {
        var msg domain.QueueMessage
        if err := json.Unmarshal([]byte(messages[i]), &msg); err != nil {
            log.Printf("failed to unmarshal message ID %s: %v", msgIDs[i], err)
            continue
        }
        ts.handleTracerMessage(msg)
    }
    return nil
}

Low-Level Dequeue Implementation (C/CGO)

For performance, bulk dequeue uses CGO to call ODPI-C directly:
dequeue_ops.c
int DequeueManyAndExtract(
    dpiConn* conn, 
    dpiContext* context, 
    const char* subscriberName, 
    uint32_t batchSize, 
    int32_t waitTime,      // -1 = wait forever, 0 = no wait, >0 = seconds
    TraceMessage** outMessages, 
    TraceId** outIds, 
    uint32_t* actualCount
) {
    // 1. Execute PL/SQL procedure with wait time
    const char* sql = "BEGIN OMNI_TRACER_API.Dequeue_Array_Events(:1, :2, :3, :4, :5, :6); END;";
    
    // 2. Bind parameters:
    //    :1 = subscriber name
    //    :2 = batch size
    //    :3 = wait time (seconds)
    //    :4 = output payload collection
    //    :5 = output raw ID collection
    //    :6 = output count
    
    // 3. Extract LOB content from each message
    // 4. Return array of messages and IDs
}

Wait Time Behavior

oracle/queue.go
messages, msgIDs, count, err := oa.BulkDequeueTracerMessages(domain.Subscriber{
    Name:      "SUB_ABC123",
    BatchSize: 1000,          // Max messages per call
    WaitTime:  5,             // Block for 5 seconds if queue empty
})
WaitTime    Behavior
-1          Wait indefinitely until messages arrive
0           Return immediately (non-blocking check)
> 0         Block for N seconds, then return empty if no messages
OmniView uses WaitTime: 5 by default, balancing responsiveness with resource efficiency.
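The three cases can be summarized in a small helper. `describeWait` is a hypothetical function written for this page, not part of OmniView; values below -1 are not covered by the table, so the sketch simply reports them as unsupported.

```go
package main

import "fmt"

// describeWait maps a WaitTime value (in seconds) to the documented behavior.
func describeWait(waitTime int) string {
	switch {
	case waitTime == -1:
		return "wait indefinitely"
	case waitTime == 0:
		return "return immediately"
	case waitTime > 0:
		return fmt.Sprintf("block up to %d seconds", waitTime)
	default:
		// Assumption: anything below -1 is treated as a configuration error.
		return "unsupported value"
	}
}

func main() {
	for _, w := range []int{-1, 0, 5} {
		fmt.Printf("WaitTime %2d: %s\n", w, describeWait(w))
	}
}
```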

Message Flow Diagram

┌─────────────────────────────────────────────────────────────────┐
│                      Oracle Database                             │
│                                                                  │
│  ┌──────────────────┐                                           │
│  │  PL/SQL Process  │                                           │
│  │  (Application)   │                                           │
│  └────────┬─────────┘                                           │
│           │                                                      │
│           │ OMNI_TRACER_API.Log_Info()                          │
│           ▼                                                      │
│  ┌───────────────────────────────────────┐                     │
│  │      OMNI_TRACER_QUEUE                │                     │
│  │  (Oracle Advanced Queue)               │                     │
│  └───────────┬───────────────────────────┘                     │
│              │                                                   │
│              │ Broadcast to all subscribers                     │
│              ▼                                                   │
│  ┌─────────────────────────────────────────────────────────┐  │
│  │     AQ$OMNI_TRACER_QUEUE (Queue Table)                   │  │
│  ├──────────────────┬──────────────────┬───────────────────┤  │
│  │  SUB_A1B2...     │  SUB_C3D4...     │  SUB_E5F6...      │  │
│  │  READY: 15 msgs  │  READY: 15 msgs  │  READY: 15 msgs   │  │
│  └──────────┬───────┴──────────┬───────┴──────────┬────────┘  │
│             │                  │                  │            │
└─────────────┼──────────────────┼──────────────────┼────────────┘
              │                  │                  │
              │ Blocking         │ Blocking         │ Blocking
              │ Dequeue          │ Dequeue          │ Dequeue
              ▼                  ▼                  ▼
      ┌───────────────┐  ┌───────────────┐  ┌───────────────┐
      │  OmniView     │  │  OmniView     │  │  OmniView     │
      │  Instance 1   │  │  Instance 2   │  │  Instance 3   │
      └───────────────┘  └───────────────┘  └───────────────┘

Error Handling

Timeout Handling

When no messages arrive within the wait period:
dequeue_ops.c
if (errorInfo.code == 25228) {  // ORA-25228: Timeout or end of fetch
    *actualCount = 0;
    result = 0;  // Success with zero messages
    goto cleanup;
}
The Go code treats this as a normal empty result, not an error.
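In OmniView this translation happens in the C layer, as shown above. For code that receives Oracle errors in Go instead, the same idea might look like the sketch below. The `oraError` type and `normalizeDequeue` function are hypothetical and do not come from the OmniView codebase or from any Oracle driver.

```go
package main

import (
	"errors"
	"fmt"
)

// ORA-25228: timeout or end-of-fetch during message dequeue.
const oraDequeueTimeout = 25228

// oraError is a hypothetical error type that carries an Oracle error code.
type oraError struct {
	Code int
	Msg  string
}

func (e *oraError) Error() string {
	return fmt.Sprintf("ORA-%05d: %s", e.Code, e.Msg)
}

// normalizeDequeue converts a dequeue timeout into an empty, successful
// result and passes every other outcome through unchanged.
func normalizeDequeue(count int, err error) (int, error) {
	var oe *oraError
	if errors.As(err, &oe) && oe.Code == oraDequeueTimeout {
		return 0, nil
	}
	return count, err
}

func main() {
	timeout := fmt.Errorf("dequeue: %w", &oraError{Code: oraDequeueTimeout, Msg: "timeout"})
	n, err := normalizeDequeue(0, timeout)
	fmt.Println(n, err) // wrapped timeout becomes (0, nil)

	other := &oraError{Code: 942, Msg: "table or view does not exist"}
	_, err = normalizeDequeue(0, other)
	fmt.Println(err) // other errors are still reported
}
```

Using `errors.As` rather than a direct type assertion keeps the check working when the error has been wrapped with additional context.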

Message Deserialization Failures

tracer/tracer_service.go
for i := 0; i < count; i++ {
    var msg domain.QueueMessage
    if err := json.Unmarshal([]byte(messages[i]), &msg); err != nil {
        log.Printf("failed to unmarshal message ID %s: %v", msgIDs[i], err)
        continue  // Skip invalid message, process rest
    }
    ts.handleTracerMessage(msg)
}
Invalid messages are logged but don’t stop batch processing.
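The skip-and-continue behavior can be checked in isolation with a small sketch. `decodeBatch` is a hypothetical helper that returns the messages it could parse together with a count of the ones it skipped; the struct is trimmed to two fields to keep the example short.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueueMessage is a trimmed copy of the struct from domain/queue.go.
type QueueMessage struct {
	ProcessName string `json:"PROCESS_NAME"`
	Payload     string `json:"PAYLOAD"`
}

// decodeBatch parses each raw payload, keeps the valid ones in order,
// and counts the payloads that failed to parse.
func decodeBatch(raw []string) (valid []QueueMessage, skipped int) {
	for _, r := range raw {
		var msg QueueMessage
		if err := json.Unmarshal([]byte(r), &msg); err != nil {
			skipped++
			continue // one bad payload must not abort the batch
		}
		valid = append(valid, msg)
	}
	return valid, skipped
}

func main() {
	batch := []string{
		`{"PROCESS_NAME":"ORDER_PROCESSOR","PAYLOAD":"Starting order batch"}`,
		`{this is not valid JSON`,
		`{"PROCESS_NAME":"INVENTORY_CHECK","PAYLOAD":"Out of stock"}`,
	}
	valid, skipped := decodeBatch(batch)
	fmt.Printf("processed %d, skipped %d\n", len(valid), skipped)
}
```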

Performance Considerations

Batch Size Tuning

subscribers/subscriber_service.go
subscriber := domain.Subscriber{
    Name:      subscriberName,
    BatchSize: 1000,  // Retrieve up to 1000 messages per call
    WaitTime:  5,
}
Larger batch sizes reduce round-trips but increase memory usage.
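A rough way to reason about this trade-off is to estimate both sides. The helpers below are hypothetical, and the backlog size and average message size are made-up numbers rather than measurements from OmniView.

```go
package main

import "fmt"

// roundTrips returns how many dequeue calls are needed to drain `pending`
// messages when each call returns at most batchSize messages.
func roundTrips(pending, batchSize int) int {
	if pending <= 0 || batchSize <= 0 {
		return 0
	}
	return (pending + batchSize - 1) / batchSize // ceiling division
}

// peakBatchBytes estimates the memory held by one full batch.
func peakBatchBytes(batchSize, avgMessageBytes int) int {
	return batchSize * avgMessageBytes
}

func main() {
	const pending = 10000 // assumed backlog
	const avgBytes = 512  // assumed average payload size

	for _, size := range []int{100, 1000} {
		fmt.Printf("batch %4d: %3d round trips, about %d bytes per batch\n",
			size, roundTrips(pending, size), peakBatchBytes(size, avgBytes))
	}
}
```

With these assumed numbers, moving from a batch size of 100 to 1000 cuts the round trips from 100 to 10 while the memory held per batch grows tenfold.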

Concurrency Control

tracer/tracer_service.go
type TracerService struct {
    db        ports.DatabaseRepository
    bolt      ports.ConfigRepository
    processMu sync.Mutex  // Prevents concurrent dequeue operations
}
The mutex ensures only one dequeue happens at a time per service instance.
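The effect of the mutex can be demonstrated with a stripped-down stand-in for the service. The `service` type and its counters are hypothetical; the counters exist only to record how many goroutines were ever inside the critical section at once.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a minimal stand-in for TracerService.
type service struct {
	processMu sync.Mutex
	active    int // calls currently inside the critical section
	maxActive int // highest value active has reached
}

// processBatch simulates one dequeue while holding the mutex.
func (s *service) processBatch() {
	s.processMu.Lock()
	defer s.processMu.Unlock()

	s.active++
	if s.active > s.maxActive {
		s.maxActive = s.active
	}
	// A real implementation would call the bulk dequeue here.
	s.active--
}

// runConcurrent starts n goroutines that all call processBatch and returns
// the highest concurrency observed inside the critical section.
func runConcurrent(n int) int {
	var s service
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.processBatch()
		}()
	}
	wg.Wait()
	return s.maxActive
}

func main() {
	fmt.Println("max concurrent dequeues:", runConcurrent(50))
}
```

One consequence of this design is that a dequeue blocked for its full wait time holds the mutex for that long, so other callers on the same service instance wait behind it.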

How OMNI_TRACER_API Works

The PL/SQL package provides a high-level API:
-- Typical usage in application code
BEGIN
    -- Info level
    OMNI_TRACER_API.Log_Info('ORDER_PROCESSOR', 'Starting order batch');
    
    -- Debug level
    OMNI_TRACER_API.Log_Debug('PAYMENT_SERVICE', 'Validating card: ' || v_card_number);
    
    -- Error level
    OMNI_TRACER_API.Log_Error('INVENTORY_CHECK', 'Out of stock: SKU=' || v_sku);
END;
Each call:
  1. Constructs an OMNI_TRACER_PAYLOAD_TYPE object
  2. Populates JSON with message details
  3. Enqueues to OMNI_TRACER_QUEUE
  4. Returns immediately (non-blocking for caller)
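The package itself is PL/SQL, so the sketch below is only a Go illustration of what the JSON produced in step 2 might look like, using the key names from the QueueMessage struct tags. `buildPayload` is a hypothetical helper, and the message ID and timestamp formats are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueueMessage mirrors the struct shown in domain/queue.go.
type QueueMessage struct {
	MessageID   string `json:"MESSAGE_ID"`
	ProcessName string `json:"PROCESS_NAME"`
	LogLevel    string `json:"LOG_LEVEL"`
	Payload     string `json:"PAYLOAD"`
	Timestamp   string `json:"TIMESTAMP"`
}

// buildPayload returns JSON text of the kind that would be stored in the
// JSON_DATA attribute of OMNI_TRACER_PAYLOAD_TYPE.
func buildPayload(id, process, level, payload, timestamp string) (string, error) {
	b, err := json.Marshal(QueueMessage{
		MessageID:   id,
		ProcessName: process,
		LogLevel:    level,
		Payload:     payload,
		Timestamp:   timestamp,
	})
	return string(b), err
}

func main() {
	// The ID and timestamp values are placeholders chosen for the example.
	doc, err := buildPayload("1", "ORDER_PROCESSOR", "INFO",
		"Starting order batch", "2024-01-01T00:00:00Z")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(doc)
}
```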

Next Steps

Subscriber Model

Learn how OmniView registers and manages subscribers

Architecture

Understand the overall system design
