OmniView leverages Oracle Advanced Queuing (AQ) for reliable, persistent message delivery from PL/SQL processes to the monitoring client. This page explains the queue architecture and how messages flow through the system.
Oracle Advanced Queuing Basics
Oracle AQ is an enterprise messaging system built into the Oracle database that provides:
Reliable delivery: Messages persist in database tables
Transactional semantics: Enqueue/dequeue operations are ACID-compliant
Multi-subscriber support: Multiple consumers can receive the same message
Blocking operations: Consumers can wait for messages without polling
Unlike external message brokers (Kafka, RabbitMQ), Oracle AQ keeps messages in database tables, eliminating the need for separate infrastructure.
OmniView Queue Structure
The system uses three core Oracle objects:
const (
	QueueName        = "OMNI_TRACER_QUEUE"        // The queue itself
	QueueTableName   = "AQ$OMNI_TRACER_QUEUE"     // Underlying storage table
	QueuePayloadType = "OMNI_TRACER_PAYLOAD_TYPE" // Message structure type
)
Queue Payload Structure
Messages conform to a structured Oracle object type with JSON payload:
CREATE TYPE OMNI_TRACER_PAYLOAD_TYPE AS OBJECT (
    JSON_DATA CLOB
);
After dequeue, messages are deserialized into:
type QueueMessage struct {
	MessageID   string `json:"MESSAGE_ID"`
	ProcessName string `json:"PROCESS_NAME"`
	LogLevel    string `json:"LOG_LEVEL"`
	Payload     string `json:"PAYLOAD"`
	Timestamp   string `json:"TIMESTAMP"`
}
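To make the mapping concrete, the following minimal sketch decodes one JSON document into the struct above. The key names come from the struct tags; the sample values, including the timestamp format, are invented for illustration, and `decodeMessage` is a hypothetical helper rather than part of OmniView.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueueMessage mirrors the struct shown above.
type QueueMessage struct {
	MessageID   string `json:"MESSAGE_ID"`
	ProcessName string `json:"PROCESS_NAME"`
	LogLevel    string `json:"LOG_LEVEL"`
	Payload     string `json:"PAYLOAD"`
	Timestamp   string `json:"TIMESTAMP"`
}

// decodeMessage parses the JSON text read from the JSON_DATA CLOB.
// Hypothetical helper used only for this example.
func decodeMessage(raw string) (QueueMessage, error) {
	var msg QueueMessage
	err := json.Unmarshal([]byte(raw), &msg)
	return msg, err
}

func main() {
	// Sample document: the values (and the timestamp format) are assumptions.
	raw := `{"MESSAGE_ID":"example-1","PROCESS_NAME":"MY_BATCH_JOB",` +
		`"LOG_LEVEL":"INFO","PAYLOAD":"Processing 1000 records",` +
		`"TIMESTAMP":"2024-01-01T00:00:00Z"}`

	msg, err := decodeMessage(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(msg.ProcessName, msg.LogLevel, msg.Payload)
}
```

Keys that are missing from the JSON leave the corresponding field as an empty string, so callers that depend on a field may want to check for that explicitly.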
Enqueue: Sending Messages to the Queue
From PL/SQL Application Code
Applications use the OMNI_TRACER_API package to send trace messages:
-- Example: Log an informational message from PL/SQL
BEGIN
    OMNI_TRACER_API.Log_Info(
        p_process_name => 'MY_BATCH_JOB',
        p_payload      => 'Processing 1000 records'
    );
END;
The API package internally enqueues a message to OMNI_TRACER_QUEUE with:
Auto-generated message ID
Current timestamp
Specified log level (INFO, DEBUG, ERROR, etc.)
JSON-formatted payload
Multi-Subscriber Broadcasting
When a message is enqueued, Oracle AQ automatically:
Stores the message in the AQ$OMNI_TRACER_QUEUE table
Creates separate queue entries for each registered subscriber
Marks messages as READY for consumption
Each subscriber consumes the message independently. If Subscriber A dequeues a message, that message remains available for Subscriber B to dequeue.
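The visible effect of broadcasting can be sketched with a small in-memory model. This is only an illustration of the behavior described above, not of how Oracle AQ stores messages internally; `broadcastQueue` and its methods are hypothetical names.

```go
package main

import "fmt"

// broadcastQueue is an in-memory stand-in for a multi-subscriber queue.
// It models only the observable behavior, not Oracle AQ's storage.
type broadcastQueue struct {
	pending map[string][]string // subscriber name -> messages still READY
}

func newBroadcastQueue(subscribers ...string) *broadcastQueue {
	q := &broadcastQueue{pending: make(map[string][]string)}
	for _, s := range subscribers {
		q.pending[s] = nil
	}
	return q
}

// enqueue makes the message available to every registered subscriber.
func (q *broadcastQueue) enqueue(msg string) {
	for s := range q.pending {
		q.pending[s] = append(q.pending[s], msg)
	}
}

// dequeue removes and returns the oldest message for one subscriber only.
func (q *broadcastQueue) dequeue(subscriber string) (string, bool) {
	msgs := q.pending[subscriber]
	if len(msgs) == 0 {
		return "", false
	}
	q.pending[subscriber] = msgs[1:]
	return msgs[0], true
}

func main() {
	q := newBroadcastQueue("SUB_A", "SUB_B")
	q.enqueue("Processing 1000 records")

	a, _ := q.dequeue("SUB_A")
	fmt.Println("SUB_A got:", a)

	// SUB_A's dequeue did not consume SUB_B's entry.
	b, ok := q.dequeue("SUB_B")
	fmt.Println("SUB_B got:", b, ok)
}
```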
Dequeue: Receiving Messages
Blocking Consumer Pattern
OmniView uses a blocking consumer loop that waits for messages without busy polling:
func (ts *TracerService) blockingConsumerLoop(ctx context.Context, subscriber *domain.Subscriber) {
	const errorDelay = 5 * time.Second

	for {
		// Check for cancellation
		select {
		case <-ctx.Done():
			fmt.Println("Event Listener stopping for subscriber:", subscriber.Name)
			return
		default:
			// Continue to blocking wait
		}

		// Blocking wait: Oracle holds this call until messages arrive or the wait time expires
		err := ts.processBatch(subscriber)
		if err != nil {
			log.Printf("failed to dequeue messages for subscriber %s: %v", subscriber.Name, err)
			select {
			case <-time.After(errorDelay):
				continue
			case <-ctx.Done():
				return
			}
		}
	}
}
Key characteristics:
The dequeue operation blocks until messages arrive or the wait timeout expires
No CPU-intensive polling loop
Immediate message delivery when available
Configurable wait time per subscriber
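The loop's shutdown and error-backoff behavior can be exercised without a database. The sketch below is a simplified variant that takes the dequeue step as a function argument; `consume`, `runDemo`, and the fake dequeue are hypothetical names used only for this example, and the 10 ms delay stands in for the 5-second `errorDelay`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// consume calls dequeue repeatedly until ctx is cancelled.
// dequeue is expected to block while the queue is empty; after an error
// the loop pauses for errorDelay (or until cancellation) before retrying.
func consume(ctx context.Context, dequeue func() error, errorDelay time.Duration) {
	for {
		if ctx.Err() != nil {
			return
		}
		if err := dequeue(); err != nil {
			select {
			case <-time.After(errorDelay):
			case <-ctx.Done():
				return
			}
		}
	}
}

// runDemo drives consume with a fake dequeue that fails twice, succeeds
// once, and then cancels the context. It returns the number of calls made.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	fake := func() error {
		calls++
		switch {
		case calls <= 2:
			return errors.New("simulated dequeue failure")
		case calls == 3:
			return nil
		default:
			cancel() // request shutdown; the loop exits on its next check
			return nil
		}
	}

	consume(ctx, fake, 10*time.Millisecond)
	return calls
}

func main() {
	fmt.Println("dequeue calls before shutdown:", runDemo())
}
```

Passing the dequeue step in as a function keeps the loop testable: the same control flow runs against a fake here and against the real blocking call in production.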
Bulk Dequeue Process
Messages are retrieved in batches for efficiency:
func (ts *TracerService) processBatch(subscriber *domain.Subscriber) error {
	// Lock to prevent concurrent dequeues
	ts.processMu.Lock()
	defer ts.processMu.Unlock()

	messages, msgIDs, count, err := ts.db.BulkDequeueTracerMessages(*subscriber)
	if err != nil {
		return err
	}
	if count == 0 {
		return nil // No messages available
	}

	for i := 0; i < count; i++ {
		var msg domain.QueueMessage
		if err := json.Unmarshal([]byte(messages[i]), &msg); err != nil {
			log.Printf("failed to unmarshal message ID %s: %v", msgIDs[i], err)
			continue
		}
		ts.handleTracerMessage(msg)
	}
	return nil
}
Low-Level Dequeue Implementation (C/CGO)
For performance, bulk dequeue uses CGO to call ODPI-C directly:
int DequeueManyAndExtract(
    dpiConn *conn,
    dpiContext *context,
    const char *subscriberName,
    uint32_t batchSize,
    int32_t waitTime,            // -1 = wait forever, 0 = no wait, >0 = seconds
    TraceMessage **outMessages,
    TraceId **outIds,
    uint32_t *actualCount
) {
    // 1. Execute PL/SQL procedure with wait time
    const char *sql = "BEGIN OMNI_TRACER_API.Dequeue_Array_Events(:1, :2, :3, :4, :5, :6); END;";

    // 2. Bind parameters:
    //    :1 = subscriber name
    //    :2 = batch size
    //    :3 = wait time (seconds)
    //    :4 = output payload collection
    //    :5 = output raw ID collection
    //    :6 = output count

    // 3. Extract LOB content from each message
    // 4. Return array of messages and IDs
}
Wait Time Behavior
messages, msgIDs, count, err := oa.BulkDequeueTracerMessages(domain.Subscriber{
	Name:      "SUB_ABC123",
	BatchSize: 1000, // Max messages per call
	WaitTime:  5,    // Block for 5 seconds if queue empty
})
WaitTime    Behavior
-1          Wait indefinitely until messages arrive
0           Return immediately (non-blocking check)
> 0         Block for N seconds, then return empty if no messages
OmniView uses WaitTime: 5 by default, balancing responsiveness with resource efficiency.
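The three modes can be mimicked against an in-memory channel to see how they differ. This is a minimal sketch of the semantics only: `dequeueWithWait` is a hypothetical function, and the real wait is performed by Oracle inside the dequeue call, not by Go channels.

```go
package main

import (
	"fmt"
	"time"
)

// dequeueWithWait mimics the three WaitTime modes on a channel.
// waitSeconds: -1 waits forever, 0 returns immediately,
// and a positive value waits up to that many seconds.
func dequeueWithWait(q <-chan string, waitSeconds int) (string, bool) {
	switch {
	case waitSeconds < 0:
		// Wait indefinitely until a message arrives.
		return <-q, true
	case waitSeconds == 0:
		// Non-blocking check.
		select {
		case m := <-q:
			return m, true
		default:
			return "", false
		}
	default:
		// Bounded wait: give up once the timer fires.
		timer := time.NewTimer(time.Duration(waitSeconds) * time.Second)
		defer timer.Stop()
		select {
		case m := <-q:
			return m, true
		case <-timer.C:
			return "", false
		}
	}
}

func main() {
	q := make(chan string, 1)

	_, ok := dequeueWithWait(q, 0)
	fmt.Println("empty queue, wait=0, got message:", ok)

	q <- "Processing 1000 records"
	m, ok := dequeueWithWait(q, 5)
	fmt.Println("wait=5:", m, ok)
}
```

With a bounded wait, a message that arrives during the wait is returned right away, so the wait time only limits how long an idle call stays open.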
Message Flow Diagram
┌─────────────────────────────────────────────────────────────────┐
│ Oracle Database │
│ │
│ ┌──────────────────┐ │
│ │ PL/SQL Process │ │
│ │ (Application) │ │
│ └────────┬─────────┘ │
│ │ │
│ │ OMNI_TRACER_API.Log_Info() │
│ ▼ │
│ ┌───────────────────────────────────────┐ │
│ │ OMNI_TRACER_QUEUE │ │
│ │ (Oracle Advanced Queue) │ │
│ └───────────┬───────────────────────────┘ │
│ │ │
│ │ Broadcast to all subscribers │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ AQ$OMNI_TRACER_QUEUE (Queue Table) │ │
│ ├──────────────────┬──────────────────┬───────────────────┤ │
│ │ SUB_A1B2... │ SUB_C3D4... │ SUB_E5F6... │ │
│ │ READY: 15 msgs │ READY: 15 msgs │ READY: 15 msgs │ │
│ └──────────┬───────┴──────────┬───────┴──────────┬────────┘ │
│ │ │ │ │
└─────────────┼──────────────────┼──────────────────┼────────────┘
│ │ │
│ Blocking │ Blocking │ Blocking
│ Dequeue │ Dequeue │ Dequeue
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ OmniView │ │ OmniView │ │ OmniView │
│ Instance 1 │ │ Instance 2 │ │ Instance 3 │
└───────────────┘ └───────────────┘ └───────────────┘
Error Handling
Timeout Handling
When no messages arrive within the wait period:
if (errorInfo.code == 25228) { // ORA-25228: Timeout or end of fetch
    *outCount = 0;
    result = 0; // Success with zero messages
    goto cleanup;
}
The Go code treats this as a normal empty result, not an error.
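If the same normalization were needed on the Go side, it might look like the sketch below. The `oraError` type, `isDequeueTimeout`, and `normalize` are hypothetical; OmniView's actual driver error type is not shown on this page, so treat the shape of the error as an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// oraError is a hypothetical error type carrying an Oracle error code.
type oraError struct {
	Code int
	Msg  string
}

func (e *oraError) Error() string { return fmt.Sprintf("ORA-%05d: %s", e.Code, e.Msg) }

// ORA-25228: timeout or end-of-fetch during message dequeue.
const oraDequeueTimeout = 25228

// isDequeueTimeout reports whether err (or anything it wraps) is ORA-25228.
func isDequeueTimeout(err error) bool {
	var oe *oraError
	return errors.As(err, &oe) && oe.Code == oraDequeueTimeout
}

// normalize converts a dequeue timeout into an empty, successful result
// and passes every other outcome through unchanged.
func normalize(count int, err error) (int, error) {
	if isDequeueTimeout(err) {
		return 0, nil
	}
	return count, err
}

func main() {
	timeout := fmt.Errorf("dequeue: %w", &oraError{Code: 25228, Msg: "timeout or end-of-fetch during message dequeue"})
	fmt.Println(normalize(0, timeout))

	other := &oraError{Code: 1017, Msg: "invalid username/password; logon denied"}
	fmt.Println(normalize(0, other))
}
```

Using `errors.As` rather than a direct type assertion means the check still works when the error has been wrapped with additional context.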
Message Deserialization Failures
for i := 0; i < count; i++ {
	var msg domain.QueueMessage
	if err := json.Unmarshal([]byte(messages[i]), &msg); err != nil {
		log.Printf("failed to unmarshal message ID %s: %v", msgIDs[i], err)
		continue // Skip invalid message, process rest
	}
	ts.handleTracerMessage(msg)
}
Invalid messages are logged but don’t stop batch processing.
Batch Size Tuning
subscribers/subscriber_service.go
subscriber := domain.Subscriber{
	Name:      subscriberName,
	BatchSize: 1000, // Retrieve up to 1000 messages per call
	WaitTime:  5,
}
Larger batch sizes reduce round-trips but increase memory usage.
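The trade-off can be estimated with simple arithmetic. The sketch below assumes a backlog of 50,000 messages and an average payload of 2 KiB; both figures are illustrative rather than measured, and `roundTrips` and `peakBufferBytes` are hypothetical helpers.

```go
package main

import "fmt"

// roundTrips returns how many dequeue calls are needed to drain backlog
// messages when each call returns at most batchSize messages.
func roundTrips(backlog, batchSize int) int {
	if backlog <= 0 || batchSize <= 0 {
		return 0
	}
	return (backlog + batchSize - 1) / batchSize // ceiling division
}

// peakBufferBytes estimates the memory held by one full batch, given an
// assumed average payload size. It ignores per-message overhead.
func peakBufferBytes(batchSize, avgPayloadBytes int) int {
	return batchSize * avgPayloadBytes
}

func main() {
	const backlog = 50000   // assumed number of waiting messages
	const avgPayload = 2048 // assumed average payload size in bytes

	for _, size := range []int{100, 1000, 10000} {
		fmt.Printf("batch=%d calls=%d buffer=%d bytes\n",
			size, roundTrips(backlog, size), peakBufferBytes(size, avgPayload))
	}
}
```

Under these assumptions, going from a batch size of 100 to 1000 cuts the number of calls from 500 to 50 while raising the per-batch buffer from about 200 KiB to about 2 MiB.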
Concurrency Control
type TracerService struct {
	db        ports.DatabaseRepository
	bolt      ports.ConfigRepository
	processMu sync.Mutex // Prevents concurrent dequeue operations
}
The mutex ensures only one dequeue happens at a time per service instance.
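The serialization guarantee can be checked with a stripped-down stand-in for the service. In this sketch, `service`, its counters, and `runConcurrent` are hypothetical; the real `processBatch` performs a database call where this one only updates counters.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a stripped-down stand-in for TracerService.
type service struct {
	processMu sync.Mutex
	active    int // dequeues currently in progress (guarded by processMu)
	maxActive int // highest value active ever reached
	batches   int // total batches processed
}

// processBatch simulates one dequeue while holding the mutex.
func (s *service) processBatch() {
	s.processMu.Lock()
	defer s.processMu.Unlock()

	s.active++
	if s.active > s.maxActive {
		s.maxActive = s.active
	}
	s.batches++
	s.active--
}

// runConcurrent calls processBatch from n goroutines and returns the
// number of batches processed and the peak concurrency observed.
func runConcurrent(n int) (batches, maxActive int) {
	var s service
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.processBatch()
		}()
	}
	wg.Wait()
	return s.batches, s.maxActive
}

func main() {
	batches, peak := runConcurrent(8)
	fmt.Println("batches:", batches, "peak concurrent dequeues:", peak)
}
```

Because every caller waits on the same mutex, the peak never exceeds one within a single service value; separate OmniView instances each hold their own mutex and are not serialized against each other.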
How OMNI_TRACER_API Works
The PL/SQL package provides a high-level API:
-- Typical usage in application code
BEGIN
    -- Info level
    OMNI_TRACER_API.Log_Info('ORDER_PROCESSOR', 'Starting order batch');

    -- Debug level
    OMNI_TRACER_API.Log_Debug('PAYMENT_SERVICE', 'Validating card: ' || v_card_number);

    -- Error level
    OMNI_TRACER_API.Log_Error('INVENTORY_CHECK', 'Out of stock: SKU=' || v_sku);
END;
Each call:
Constructs an OMNI_TRACER_PAYLOAD_TYPE object
Populates JSON with message details
Enqueues to OMNI_TRACER_QUEUE
Returns immediately (non-blocking for caller)
Next Steps
Subscriber Model: Learn how OmniView registers and manages subscribers
Architecture: Understand the overall system design