Overview

OmniView uses Oracle Advanced Queuing (AQ) subscriber mechanisms to receive trace messages from the database. Each OmniView instance registers as a unique subscriber to ensure reliable message delivery.

Register_Subscriber

Registers a new subscriber to the OMNI_TRACER_QUEUE, allowing it to receive trace messages.

Procedure Signature

Omni_Tracer.sql:177-200
PROCEDURE Register_Subscriber(subscriber_name_ IN VARCHAR2) 
IS
    PRAGMA AUTONOMOUS_TRANSACTION;
    sub_ SYS.AQ$_AGENT;
BEGIN
    IF subscriber_name_ IS NULL THEN
        RAISE_APPLICATION_ERROR(-20001, 'Subscriber name cannot be NULL or empty');
    END IF;

    sub_ := SYS.AQ$_AGENT(subscriber_name_, NULL, NULL);
    DBMS_AQADM.ADD_SUBSCRIBER (
        queue_name      => TRACER_QUEUE_NAME,
        subscriber      => sub_
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN
    IF SQLCODE = -24034 THEN -- Subscriber already exists
        COMMIT; -- Must commit autonomous transaction even on expected exception
    ELSE
        ROLLBACK;
        RAISE;
    END IF;
END Register_Subscriber;

Parameters

subscriber_name_
VARCHAR2
required
Unique name for the subscriber. OmniView generates names in the format SUB_<UUID> (e.g., SUB_A3F2E8C1_9D4B_4F7A_B2E6_1C8D5A9F3E7B).
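To make the SUB_<UUID> format concrete, here is a minimal Go sketch that recognizes generated names. The helper isGeneratedSubscriberName is hypothetical and not part of OmniView; Register_Subscriber itself accepts any non-NULL name, so a check like this is only useful for telling generated names apart from hand-picked ones.

```go
package main

import (
	"fmt"
	"regexp"
)

// subscriberNamePattern matches "SUB_" followed by an upper-case UUID whose
// hyphens were replaced by underscores (8-4-4-4-12 hexadecimal digits).
var subscriberNamePattern = regexp.MustCompile(
	`^SUB_[0-9A-F]{8}_[0-9A-F]{4}_[0-9A-F]{4}_[0-9A-F]{4}_[0-9A-F]{12}$`)

// isGeneratedSubscriberName is a hypothetical helper that reports whether
// name has the SUB_<UUID> shape described above.
func isGeneratedSubscriberName(name string) bool {
	return subscriberNamePattern.MatchString(name)
}

func main() {
	fmt.Println(isGeneratedSubscriberName("SUB_A3F2E8C1_9D4B_4F7A_B2E6_1C8D5A9F3E7B")) // true
	fmt.Println(isGeneratedSubscriberName("MY_SUBSCRIBER_NAME"))                       // false
}
```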

Usage from PL/SQL

BEGIN
    OMNI_TRACER_API.Register_Subscriber('MY_SUBSCRIBER_NAME');
END;
/

Error Handling

ORA-20001 is raised when subscriber_name_ is NULL or empty.
Omni_Tracer.sql:182-184
IF subscriber_name_ IS NULL THEN
    RAISE_APPLICATION_ERROR(-20001, 'Subscriber name cannot be NULL or empty');
END IF;
If the subscriber is already registered (ORA-24034), the procedure succeeds silently, which makes registration idempotent.
Omni_Tracer.sql:194-195
IF SQLCODE = -24034 THEN -- Subscriber already exists
    COMMIT;
For any other error, the procedure rolls back the autonomous transaction and re-raises the exception.
The procedure uses PRAGMA AUTONOMOUS_TRANSACTION to ensure subscriber registration is committed independently of the calling transaction.

Dequeue_Array_Events

Bulk dequeues messages from the queue for a specific subscriber. This is the primary method OmniView uses to retrieve trace messages.

Procedure Signature

Omni_Tracer.sql:273-320
PROCEDURE Dequeue_Array_Events(
    subscriber_name_ IN  VARCHAR2,
    batch_size_      IN  INTEGER,
    wait_time_       IN  NUMBER DEFAULT DBMS_AQ.NO_WAIT,
    messages_        OUT OMNI_TRACER_PAYLOAD_ARRAY,
    message_ids_     OUT OMNI_TRACER_RAW_ARRAY,
    msg_count_       OUT INTEGER)
IS
    dequeue_options_     DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_props_array_ DBMS_AQ.MESSAGE_PROPERTIES_ARRAY_T;
    payload_array_       OMNI_TRACER_PAYLOAD_ARRAY;
    msg_id_array_        DBMS_AQ.MSGID_ARRAY_T;
BEGIN
    messages_ := OMNI_TRACER_PAYLOAD_ARRAY();
    message_ids_ := OMNI_TRACER_RAW_ARRAY();
    msg_count_ := 0;

    dequeue_options_.consumer_name := subscriber_name_;
    dequeue_options_.wait          := wait_time_;
    dequeue_options_.navigation    := DBMS_AQ.FIRST_MESSAGE;
    dequeue_options_.visibility    := DBMS_AQ.IMMEDIATE;

    msg_count_ := DBMS_AQ.DEQUEUE_ARRAY(
        queue_name                => TRACER_QUEUE_NAME,
        dequeue_options           => dequeue_options_,
        array_size                => batch_size_,
        message_properties_array  => message_props_array_,
        payload_array             => payload_array_,
        msgid_array               => msg_id_array_
    );
    
    messages_ := payload_array_;

    FOR i_ IN 1 .. msg_count_ LOOP
        message_ids_.EXTEND;
        message_ids_(i_) := msg_id_array_(i_);
    END LOOP;
EXCEPTION
    WHEN OTHERS THEN
        IF SQLCODE = -25228 THEN
            -- No messages available
            messages_ := OMNI_TRACER_PAYLOAD_ARRAY();
            message_ids_ := OMNI_TRACER_RAW_ARRAY();
            msg_count_ := 0;
        ELSE
            RAISE;
        END IF;
END Dequeue_Array_Events;

Parameters

Input Parameters

subscriber_name_
VARCHAR2
required
Name of the subscriber to dequeue messages for. Must match a previously registered subscriber.
batch_size_
INTEGER
required
Maximum number of messages to dequeue in a single call. OmniView typically uses 1000.
wait_time_
NUMBER
default: DBMS_AQ.NO_WAIT
Time in seconds to wait for messages. Use:
  • DBMS_AQ.NO_WAIT (0): Return immediately if no messages
  • DBMS_AQ.FOREVER (-1): Wait indefinitely
  • Positive number: Wait the specified number of seconds
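The three cases above can be expressed as a small mapping from a Go duration to a wait_time_ value. This is only a sketch: waitSeconds and the two constants are hypothetical names, and the numeric values are the ones listed above for DBMS_AQ.NO_WAIT and DBMS_AQ.FOREVER.

```go
package main

import (
	"fmt"
	"time"
)

// Values listed above for DBMS_AQ.NO_WAIT and DBMS_AQ.FOREVER.
const (
	aqNoWait  = 0
	aqForever = -1
)

// waitSeconds is a hypothetical helper that maps a Go duration to a
// wait_time_ argument: a negative duration means wait forever, zero means
// do not wait, and a positive duration is rounded up to whole seconds.
func waitSeconds(d time.Duration) int {
	switch {
	case d < 0:
		return aqForever
	case d == 0:
		return aqNoWait
	default:
		return int((d + time.Second - 1) / time.Second)
	}
}

func main() {
	fmt.Println(waitSeconds(0))                       // 0
	fmt.Println(waitSeconds(5 * time.Second))         // 5
	fmt.Println(waitSeconds(1500 * time.Millisecond)) // 2
	fmt.Println(waitSeconds(-1))                      // -1
}
```

Rounding up rather than down keeps a sub-second duration from silently turning into NO_WAIT.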

Output Parameters

messages_
OMNI_TRACER_PAYLOAD_ARRAY
Array of message payload objects. Each contains a BLOB with JSON data.
message_ids_
OMNI_TRACER_RAW_ARRAY
Array of RAW(16) message IDs corresponding to the dequeued messages.
msg_count_
INTEGER
Actual number of messages dequeued (may be less than batch_size_).
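Because messages_ and message_ids_ are parallel arrays, a caller usually pairs entry i of one with entry i of the other and trusts msg_count_ for the length. The sketch below shows that pairing on the Go side; dequeuedMessage and pairMessages are hypothetical names, and the JSON payloads are made up for illustration.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// dequeuedMessage is a hypothetical type pairing one payload with its ID.
type dequeuedMessage struct {
	ID      string // message ID, hex-encoded
	Payload string // JSON text taken from the payload
}

// pairMessages zips the first count entries of the two parallel slices and
// rejects a count that does not fit either slice.
func pairMessages(payloads []string, ids [][]byte, count int) ([]dequeuedMessage, error) {
	if count < 0 || count > len(payloads) || count > len(ids) {
		return nil, fmt.Errorf("count %d does not fit %d payloads and %d ids",
			count, len(payloads), len(ids))
	}
	out := make([]dequeuedMessage, 0, count)
	for i := 0; i < count; i++ {
		out = append(out, dequeuedMessage{
			ID:      hex.EncodeToString(ids[i]),
			Payload: payloads[i],
		})
	}
	return out, nil
}

func main() {
	// Made-up sample data; real IDs are 16 bytes long.
	payloads := []string{`{"level":"INFO"}`, `{"level":"ERROR"}`}
	ids := [][]byte{{0x01, 0xAB}, {0x02, 0xCD}}

	msgs, err := pairMessages(payloads, ids, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range msgs {
		fmt.Println(m.ID, m.Payload)
	}
}
```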

Dequeue Options

The procedure configures the following dequeue options:
Omni_Tracer.sql:290-293
dequeue_options_.consumer_name := subscriber_name_;
dequeue_options_.wait          := wait_time_;
dequeue_options_.navigation    := DBMS_AQ.FIRST_MESSAGE;
dequeue_options_.visibility    := DBMS_AQ.IMMEDIATE;
consumer_name
VARCHAR2
Specifies which subscriber’s messages to dequeue
wait
NUMBER
Wait time for messages (configurable via parameter)
navigation
NUMBER
DBMS_AQ.FIRST_MESSAGE - Start from the first available message
visibility
NUMBER
DBMS_AQ.IMMEDIATE - The dequeue is not part of the caller's transaction; it commits on its own, so messages are removed from the queue immediately

Error Handling

Returns empty arrays with count 0 when no messages are available.
Omni_Tracer.sql:311-316
IF SQLCODE = -25228 THEN
    -- No messages available
    messages_ := OMNI_TRACER_PAYLOAD_ARRAY();
    message_ids_ := OMNI_TRACER_RAW_ARRAY();
    msg_count_ := 0;
Re-raises any other exceptions for the caller to handle.

Subscriber Registration Flow

OmniView follows this process to register and manage subscribers:
1. Generate subscriber name

OmniView generates a unique subscriber name using UUID v4.
subscriber_service.go:76-84
func generateSubscriberName() string {
    // UUID V4 Generation
    uuidWithHyphen := uuid.New()
    // Format the UUID as a named subscriber identifier
    // Replace - with _ to comply with Oracle naming conventions
    // Add a prefix for clarity : SUB_
    subscriberName := "SUB_" + strings.ToUpper(strings.ReplaceAll(uuidWithHyphen.String(), "-", "_"))
    return subscriberName
}
Example: SUB_A3F2E8C1_9D4B_4F7A_B2E6_1C8D5A9F3E7B
2. Store subscriber locally

The subscriber information is stored in OmniView’s BoltDB configuration.
subscriber_service.go:38-50
func (ss *SubscriberService) NewSubscriber() (domain.Subscriber, error) {
    subscriberName := generateSubscriberName()
    subscriber := domain.Subscriber{
        Name:      subscriberName,
        BatchSize: 1000,
        WaitTime:  5,
    }
    if err := ss.SetSubscriber(subscriber); err != nil {
        return domain.Subscriber{}, err
    }
    return subscriber, nil
}
3. Register in Oracle

OmniView calls the Register_Subscriber procedure from an anonymous PL/SQL block.
subscriptions.go:18-20
err := oa.ExecuteWithParams("BEGIN OMNI_TRACER_API.Register_Subscriber(:subscriberName); END;", map[string]interface{}{
    "subscriberName": subscriber.Name,
})
4. Verify registration

OmniView checks if the subscriber exists in ALL_QUEUE_SUBSCRIBERS.
subscriptions.go:31-35
query := `SELECT COUNT(1)
          FROM ALL_QUEUE_SUBSCRIBERS
          WHERE QUEUE_NAME = :queueName
          AND CONSUMER_NAME = :subscriberName
          AND OWNER = :queueOwner`
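The four steps above can be tied together as in the following sketch. It is not OmniView's actual orchestration code: the registrar interface, fakeRegistrar, and registerSubscriber are hypothetical, and the fake stands in for Oracle so the flow runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// registrar is a hypothetical abstraction over the two database calls:
// Register_Subscriber and the ALL_QUEUE_SUBSCRIBERS lookup.
type registrar interface {
	Register(name string) error
	Exists(name string) (bool, error)
}

// fakeRegistrar stands in for Oracle. Like Register_Subscriber, it rejects
// an empty name and treats a repeated registration as success.
type fakeRegistrar struct{ names map[string]bool }

func (f *fakeRegistrar) Register(name string) error {
	if name == "" {
		return errors.New("subscriber name cannot be NULL or empty")
	}
	f.names[name] = true
	return nil
}

func (f *fakeRegistrar) Exists(name string) (bool, error) {
	return f.names[name], nil
}

// registerSubscriber runs steps 1 to 4: generate, store, register, verify.
func registerSubscriber(generate func() string, store func(string) error, r registrar) (string, error) {
	name := generate()
	if err := store(name); err != nil {
		return "", fmt.Errorf("store subscriber: %w", err)
	}
	if err := r.Register(name); err != nil {
		return "", fmt.Errorf("register subscriber: %w", err)
	}
	ok, err := r.Exists(name)
	if err != nil {
		return "", fmt.Errorf("verify subscriber: %w", err)
	}
	if !ok {
		return "", fmt.Errorf("subscriber %s not found after registration", name)
	}
	return name, nil
}

func main() {
	r := &fakeRegistrar{names: map[string]bool{}}
	generate := func() string { return "SUB_A3F2E8C1_9D4B_4F7A_B2E6_1C8D5A9F3E7B" }
	store := func(string) error { return nil } // stand-in for the local store

	name, err := registerSubscriber(generate, store, r)
	fmt.Println(name, err)
}
```

Because registration is idempotent, running the flow again with the same name succeeds without any special handling.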

Go Integration

OmniView’s Go code interacts with the subscriber procedures through the Oracle adapter.

Subscriber Domain Object

subscriber.go:6-10
type Subscriber struct {
    Name      string
    BatchSize int
    WaitTime  int
}
Name
string
Unique subscriber name (e.g., SUB_A3F2E8C1_9D4B_4F7A_B2E6_1C8D5A9F3E7B)
BatchSize
int
Number of messages to dequeue per batch (default: 1000)
WaitTime
int
Seconds to wait for messages (default: 5)

Bulk Dequeue Implementation

OmniView uses C bindings to efficiently dequeue and process messages:
queue.go:45-96
func (oa *OracleAdapter) BulkDequeueTracerMessages(subscriber domain.Subscriber) ([]string, [][]byte, int, error) {
    if oa.Connection == nil {
        return nil, nil, 0, fmt.Errorf("database connection is not established")
    }

    if subscriber.BatchSize <= 0 {
        return nil, nil, 0, fmt.Errorf("batch size must be > 0")
    }

    var cMessages *C.TraceMessage
    var cIds *C.TraceId
    var cCount C.uint32_t

    cSubscriberName := C.CString(subscriber.Name)
    defer C.free(unsafe.Pointer(cSubscriberName))

    if C.DequeueManyAndExtract(oa.Connection, oa.Context, cSubscriberName, C.uint32_t(subscriber.BatchSize), C.int32_t(subscriber.WaitTime), &cMessages, &cIds, &cCount) != C.DPI_SUCCESS {
        var errInfo C.dpiErrorInfo
        C.dpiContext_getError(oa.Context, &errInfo)

        if errInfo.code == 25228 { // ORA-25228: timeout or end-of-fetch, i.e. no messages available
            return []string{}, [][]byte{}, 0, nil
        }

        return nil, nil, 0, fmt.Errorf("failed to dequeue messages: %s (code: %d)", C.GoString(errInfo.message), errInfo.code)
    }
    count := int(cCount)

    if count == 0 {
        return []string{}, [][]byte{}, 0, nil
    }

    defer C.FreeDequeueResults(cMessages, cIds, cCount)

    messages := make([]string, count)
    msgIds := make([][]byte, count)

    for i := 0; i < count; i++ {
        msg := (*C.TraceMessage)(unsafe.Pointer(uintptr(unsafe.Pointer(cMessages)) + uintptr(i)*unsafe.Sizeof(*cMessages)))
        id := (*C.TraceId)(unsafe.Pointer(uintptr(unsafe.Pointer(cIds)) + uintptr(i)*unsafe.Sizeof(*cIds)))

        if msg.data != nil && msg.length > 0 {
            messages[i] = C.GoStringN(msg.data, C.int(msg.length))
        }

        if id.data != nil && id.length > 0 {
            msgIds[i] = C.GoBytes(unsafe.Pointer(id.data), C.int(id.length))
        }
    }

    return messages, msgIds, count, nil
}
OmniView uses custom C bindings (DequeueManyAndExtract) for performance optimization when dequeuing large batches of messages.
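The loop in BulkDequeueTracerMessages indexes the C arrays with uintptr arithmetic. Since Go 1.17, unsafe.Slice offers an alternative that views a contiguous array through a pointer to its first element. The sketch below is pure Go so it runs without cgo: traceMessage is a Go stand-in for C.TraceMessage, and viewAsSlice and text are hypothetical helpers.

```go
package main

import (
	"fmt"
	"unsafe"
)

// traceMessage is a Go stand-in for the C.TraceMessage struct.
type traceMessage struct {
	data   *byte
	length int
}

// viewAsSlice turns a pointer to the first element of a contiguous array
// into a slice of count elements without copying.
func viewAsSlice(first *traceMessage, count int) []traceMessage {
	if first == nil || count <= 0 {
		return nil
	}
	return unsafe.Slice(first, count)
}

// text copies the bytes that m points at into a Go string.
func (m traceMessage) text() string {
	if m.data == nil || m.length <= 0 {
		return ""
	}
	return string(unsafe.Slice(m.data, m.length))
}

func main() {
	a, b := []byte("first"), []byte("second")
	// A Go-allocated array plays the role of the array returned from C.
	backing := [2]traceMessage{{&a[0], len(a)}, {&b[0], len(b)}}

	for i, m := range viewAsSlice(&backing[0], len(backing)) {
		fmt.Println(i, m.text())
	}
}
```

With a real C array the same call would take the C pointer and the returned count, and the resulting slice must not be used after the C memory is freed.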

Event Listener Loop

OmniView runs a blocking consumer loop for each subscriber:
tracer_service.go:46-70
func (ts *TracerService) blockingConsumerLoop(ctx context.Context, subscriber *domain.Subscriber) {
    const errorDelay = 5 * time.Second
    for {
        // Check if context is cancelled before blocking
        select {
        case <-ctx.Done():
            fmt.Println("Event Listener stopping for subscriber:", subscriber.Name)
            return
        default:
            // Continue to blocking wait
        }

        // Blocking wait — Oracle holds this call until messages arrive or wait time expires
        err := ts.processBatch(subscriber)
        if err != nil {
            log.Printf("failed to dequeue messages for subscriber %s: %v", subscriber.Name, err)
            select {
            case <-time.After(errorDelay):
                continue
            case <-ctx.Done():
                return
            }
        }
    }
}
The loop uses the subscriber’s configured WaitTime (default 5 seconds) to block until messages are available or the timeout expires.
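The shutdown behavior of this loop can be tried without a database. In the sketch below, consumeUntilCancelled mirrors the structure of blockingConsumerLoop but takes the batch function as a parameter and counts its calls; both it and runDemo are hypothetical names, and the failure on the second call is simulated.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// consumeUntilCancelled calls process repeatedly until ctx is cancelled.
// After an error it waits errorDelay (or until cancellation) before the
// next attempt. It returns the number of calls made.
func consumeUntilCancelled(ctx context.Context, process func() error, errorDelay time.Duration) int {
	calls := 0
	for {
		// Cancellation is only observed here, between two calls.
		select {
		case <-ctx.Done():
			return calls
		default:
		}

		calls++
		if err := process(); err != nil {
			select {
			case <-time.After(errorDelay):
			case <-ctx.Done():
				return calls
			}
		}
	}
}

// runDemo simulates three batches: success, failure, then a shutdown
// request that arrives while the third batch is being processed.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	n := 0
	process := func() error {
		n++
		switch n {
		case 2:
			return errors.New("simulated dequeue failure")
		case 3:
			cancel()
		}
		return nil
	}
	return consumeUntilCancelled(ctx, process, 10*time.Millisecond)
}

func main() {
	fmt.Println("calls before shutdown:", runDemo()) // calls before shutdown: 3
}
```

The batch in flight always finishes before the loop exits, so the worst-case shutdown latency is roughly one wait time.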

Queue Depth Monitoring

OmniView can check the number of pending messages for a subscriber:
queue.go:18-43
func (oa *OracleAdapter) CheckQueueDepth(subscriberID string, queueTableName string) (int, error) {
    query := fmt.Sprintf(`SELECT COUNT(*) 
              FROM %s
              WHERE QUEUE = :queueName
              AND CONSUMER_NAME = :subscriberID
              AND MSG_STATE = 'READY'`, queueTableName)

    results, err := oa.FetchWithParams(query, map[string]interface{}{
        "queueName":    domain.QueueName,
        "subscriberID": subscriberID,
    })
    if err != nil {
        return 0, err
    }
    if len(results) == 0 {
        return 0, fmt.Errorf("no results returned from queue depth query")
    }

    count, err := parseCountResult(results)
    if err != nil {
        return 0, fmt.Errorf("failed to parse count result: %v", err)
    }

    return count, nil
}
subscriberID
string
Subscriber name to check queue depth for
queueTableName
string
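Name of the queue table view to query. The QUEUE, CONSUMER_NAME, and MSG_STATE columns used in the query are those of the AQ$<queue_table> view that Oracle AQ typically generates for a queue table.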
Returns the count of messages in READY state for the specified subscriber.
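CheckQueueDepth interpolates queueTableName into the SQL text with fmt.Sprintf, because an object name cannot be passed as a bind variable. If that name can ever come from outside the application, it is worth validating before building the query. The sketch below is one way to do that; buildQueueDepthQuery and the example view name are hypothetical, and the pattern only accepts ASCII unquoted identifiers with an optional schema prefix.

```go
package main

import (
	"fmt"
	"regexp"
)

// unquotedIdentifier accepts an ASCII unquoted Oracle identifier (a letter
// followed by letters, digits, _, $ or #), optionally schema-qualified.
var unquotedIdentifier = regexp.MustCompile(
	`^[A-Za-z][A-Za-z0-9_$#]*(\.[A-Za-z][A-Za-z0-9_$#]*)?$`)

// buildQueueDepthQuery is a hypothetical helper that refuses to build the
// query unless viewName looks like a plain identifier.
func buildQueueDepthQuery(viewName string) (string, error) {
	if !unquotedIdentifier.MatchString(viewName) {
		return "", fmt.Errorf("invalid view name %q", viewName)
	}
	return fmt.Sprintf(`SELECT COUNT(*)
              FROM %s
              WHERE QUEUE = :queueName
              AND CONSUMER_NAME = :subscriberID
              AND MSG_STATE = 'READY'`, viewName), nil
}

func main() {
	// AQ$OMNI_TRACER_QUEUE_TABLE is a made-up view name for illustration.
	q, err := buildQueueDepthQuery("AQ$OMNI_TRACER_QUEUE_TABLE")
	fmt.Println(q, err)

	_, err = buildQueueDepthQuery("T WHERE 1=1 --")
	fmt.Println(err)
}
```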

Performance Considerations

Batch Size

The default batch size of 1000 messages balances memory usage and throughput:
  • Smaller batches (100-500): Lower memory usage, more database round trips
  • Larger batches (1000-5000): Better throughput, higher memory usage
Adjust BatchSize in the Subscriber configuration based on your message volume and available memory.
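The trade-off can be estimated with simple arithmetic: round trips fall as the batch grows, while the payload memory held per batch rises. The sketch below computes both for a made-up workload; roundTrips and peakPayloadBytes are hypothetical helpers, and the message count and average payload size are assumptions, not measurements.

```go
package main

import "fmt"

// roundTrips returns how many dequeue calls are needed to drain pending
// messages when each call returns at most batchSize messages.
func roundTrips(pending, batchSize int) int {
	if pending <= 0 || batchSize <= 0 {
		return 0
	}
	return (pending + batchSize - 1) / batchSize
}

// peakPayloadBytes is a rough upper bound on the payload memory held for
// one batch, assuming every message carries avgPayloadBytes of data.
func peakPayloadBytes(batchSize, avgPayloadBytes int) int {
	return batchSize * avgPayloadBytes
}

func main() {
	const pending, avgPayload = 10000, 2048 // made-up workload

	for _, batch := range []int{100, 500, 1000, 5000} {
		fmt.Printf("batch=%d round trips=%d peak payload=%d KiB\n",
			batch,
			roundTrips(pending, batch),
			peakPayloadBytes(batch, avgPayload)/1024)
	}
}
```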

Wait Time

The default wait time of 5 seconds prevents excessive polling:
  • Shorter wait (1-2s): More responsive, higher CPU usage
  • Longer wait (10-30s): Lower CPU usage, delayed message processing
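Using DBMS_AQ.FOREVER (-1) is not recommended: the consumer loop checks for cancellation only between dequeue calls, so a call that blocks indefinitely can make the application unresponsive to shutdown signals.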

Sharded Queue Benefits

The sharded queue architecture has the following benefits:
  • Multiple subscribers can dequeue in parallel without contention
  • Messages are distributed across 4 shards for load balancing
  • High-throughput scenarios benefit from parallel processing

Troubleshooting

If a subscriber receives no messages, possible causes are:
  • Subscriber not registered: Check ALL_QUEUE_SUBSCRIBERS
  • Queue not started: Verify queue status in USER_QUEUES
  • Wrong subscriber name: Ensure exact match including case
Solution:
-- Check if subscriber exists
SELECT * FROM ALL_QUEUE_SUBSCRIBERS 
WHERE QUEUE_NAME = 'OMNI_TRACER_QUEUE';

-- Check queue status
SELECT name, enqueue_enabled, dequeue_enabled 
FROM USER_QUEUES 
WHERE name = 'OMNI_TRACER_QUEUE';
A dequeue call that returns no messages is normal behavior when none are available within the wait time.
Solution: The Go code handles this gracefully and returns empty results.
Large batch sizes can consume significant memory, especially with large message payloads.
Solution: Reduce the BatchSize in the subscriber configuration.
If messages accumulate in the queue, the subscriber may not be processing them fast enough.
Solution:
  • Call CheckQueueDepth() to monitor queue depth
  • Increase subscriber BatchSize
  • Add more subscriber instances (different subscriber names)

OMNI_TRACER_API Package

Overview of the complete package

Trace_Message

Learn how to send trace messages
