
Overview

The TracerService manages deployment of the OMNI_TRACER package and the event-listening system in OmniView. It handles:
  • Deploying and initializing the Oracle OMNI_TRACER package
  • Starting event listeners for subscribers
  • Processing batches of tracer messages from Oracle Advanced Queuing (AQ)
  • Handling tracer events in a concurrent, non-blocking manner
The service follows the hexagonal architecture pattern and integrates with the Oracle database adapter to receive real-time tracer events.

Service Structure

type TracerService struct {
    db        ports.DatabaseRepository
    bolt      ports.ConfigRepository
    processMu sync.Mutex
}
db
ports.DatabaseRepository
Database repository for Oracle operations including package deployment and message dequeuing
bolt
ports.ConfigRepository
Configuration repository for BoltDB operations and persistent storage
processMu
sync.Mutex
Mutex to prevent concurrent batch processing and ensure thread-safe dequeue operations

Constructor

NewTracerService

Creates a new instance of TracerService with injected dependencies.
func NewTracerService(
    db ports.DatabaseRepository,
    bolt ports.ConfigRepository,
) *TracerService
db
ports.DatabaseRepository
required
Database repository interface for Oracle database operations
bolt
ports.ConfigRepository
required
Configuration repository interface for BoltDB operations
*TracerService
*TracerService
Returns a pointer to the newly created TracerService instance
Example Usage:
// From cmd/omniview/main.go:61
tracerService := tracer.NewTracerService(dbAdapter, boltAdapter)

Methods

StartEventListener

Starts the event listener for a specific subscriber. This method launches two concurrent goroutines:
  1. Initial processing of any existing messages in the queue
  2. Blocking consumer loop that continuously listens for new messages
func (ts *TracerService) StartEventListener(
    ctx context.Context,
    subscriber *domain.Subscriber,
    schema string,
) error
ctx
context.Context
required
Context for managing the lifecycle of the event listener. Cancelling this context will stop the listener.
subscriber
*domain.Subscriber
required
Subscriber configuration including name, batch size, and wait time:
  • Name: Unique subscriber identifier (e.g., “SUB_ABC123”)
  • BatchSize: Number of messages to dequeue per batch (default: 1000)
  • WaitTime: Wait time in seconds for dequeue operations (default: 5)
schema
string
required
Oracle schema name where the tracer queue is located
error
error
Returns nil on successful start. The actual listener runs asynchronously in goroutines.
Example Usage:
// From cmd/omniview/main.go:93-95
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := tracerService.StartEventListener(ctx, &subscriber, appConfig.Username); err != nil {
    log.Fatalf("failed to start tracer event listener: %v", err)
}

DeployAndCheck

Ensures the OMNI_TRACER package is deployed to the Oracle database and properly initialized. This method:
  1. Checks if the package already exists
  2. Deploys the Omni_Tracer.sql package if not present
  3. Initializes the package using Omni_Initialize.ins if newly deployed
func (ts *TracerService) DeployAndCheck() error
error
error
Returns error if package deployment or initialization fails, nil on success
Example Usage:
// From cmd/omniview/main.go:72-74
if err := tracerService.DeployAndCheck(); err != nil {
    log.Fatalf("failed to deploy tracer package: %v", err)
}

blockingConsumerLoop

Internal method that runs a continuous loop listening for tracer messages. This method:
  • Blocks while waiting for messages from Oracle AQ
  • Processes batches as they arrive
  • Handles context cancellation for graceful shutdown
  • Recovers from errors by retrying after a fixed delay
func (ts *TracerService) blockingConsumerLoop(
    ctx context.Context,
    subscriber *domain.Subscriber,
)
ctx
context.Context
required
Context for cancellation signals
subscriber
*domain.Subscriber
required
Subscriber configuration for dequeue operations
This is an internal method called by StartEventListener. It runs indefinitely until the context is cancelled. On errors, it waits 5 seconds before retrying.

processBatch

Processes a batch of tracer messages from the Oracle queue. This method:
  • Acquires a lock to prevent concurrent processing
  • Dequeues messages in bulk from Oracle AQ
  • Unmarshals JSON message payloads
  • Passes each message to the message handler
func (ts *TracerService) processBatch(subscriber *domain.Subscriber) error
subscriber
*domain.Subscriber
required
Subscriber configuration including batch size for dequeue operations
error
error
Returns error if dequeue or processing fails, nil on success (including when no messages are available)
Message Format: Each message is unmarshaled into a domain.QueueMessage structure:
type QueueMessage struct {
    MessageID   string `json:"MESSAGE_ID"`
    ProcessName string `json:"PROCESS_NAME"`
    LogLevel    string `json:"LOG_LEVEL"`
    Payload     string `json:"PAYLOAD"`
    Timestamp   string `json:"TIMESTAMP"`
}
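The unmarshaling step can be illustrated with the struct above. The `decodeBatch` helper and the sample payload values are hypothetical; the sketch assumes each dequeued message arrives as a raw JSON byte slice and that a malformed message is skipped so the rest of the batch is still processed.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// QueueMessage mirrors the structure shown in the documentation.
type QueueMessage struct {
	MessageID   string `json:"MESSAGE_ID"`
	ProcessName string `json:"PROCESS_NAME"`
	LogLevel    string `json:"LOG_LEVEL"`
	Payload     string `json:"PAYLOAD"`
	Timestamp   string `json:"TIMESTAMP"`
}

// decodeBatch unmarshals each raw payload and skips the ones that fail,
// so a single malformed message does not block the rest of the batch.
func decodeBatch(raw [][]byte) (msgs []QueueMessage, skipped int) {
	for _, r := range raw {
		var m QueueMessage
		if err := json.Unmarshal(r, &m); err != nil {
			skipped++
			continue
		}
		msgs = append(msgs, m)
	}
	return msgs, skipped
}

func main() {
	// Sample values are invented for illustration only.
	raw := [][]byte{
		[]byte(`{"MESSAGE_ID":"1","PROCESS_NAME":"DEMO_PROC","LOG_LEVEL":"INFO","PAYLOAD":"hello","TIMESTAMP":"2024-01-01T00:00:00Z"}`),
		[]byte(`{not valid json`),
	}
	msgs, skipped := decodeBatch(raw)
	fmt.Println("decoded:", len(msgs), "skipped:", skipped)
}
```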

Integration with Oracle Adapter

The TracerService integrates with the Oracle adapter through the ports.DatabaseRepository interface, using these key methods:
  • PackageExists: Check if OMNI_TRACER_API package is deployed
  • DeployFile: Deploy SQL package files to the database
  • ExecuteStatement: Execute initialization scripts
  • BulkDequeueTracerMessages: Dequeue batches of messages from the Oracle AQ queue

Oracle AQ Configuration

The service works with these Oracle objects:
const (
    QueueName        = "OMNI_TRACER_QUEUE"
    QueueTableName   = "AQ$OMNI_TRACER_QUEUE"
    QueuePayloadType = "OMNI_TRACER_PAYLOAD_TYPE"
)

Architecture Pattern

TracerService follows the hexagonal architecture (ports and adapters) pattern:
  1. Core Service (internal/service/tracer): Contains business logic
  2. Ports (internal/core/ports): Define repository interfaces
  3. Adapters (internal/adapter/storage/oracle): Implement the ports for Oracle database
This design allows for:
  • Dependency inversion (service depends on interfaces, not concrete implementations)
  • Testability (can mock repository interfaces)
  • Flexibility (can swap Oracle for other databases)
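The benefit of depending on an interface can be shown with a small sketch. Every name here (`messageSource`, `batchCounter`, `memorySource`) is hypothetical and much narrower than the real ports.DatabaseRepository; the point is only that the core logic runs unchanged against an in-memory adapter.

```go
package main

import "fmt"

// messageSource is a hypothetical port: the core logic sees only this interface.
type messageSource interface {
	Dequeue(batchSize int) ([]string, error)
}

// batchCounter is core logic that depends on the port, not on Oracle.
type batchCounter struct {
	src messageSource
}

// countNext dequeues one batch and reports how many messages it held.
func (b *batchCounter) countNext(batchSize int) (int, error) {
	msgs, err := b.src.Dequeue(batchSize)
	if err != nil {
		return 0, err
	}
	return len(msgs), nil
}

// memorySource is an in-memory adapter, useful in tests in place of a database.
type memorySource struct {
	queue []string
}

func (m *memorySource) Dequeue(n int) ([]string, error) {
	if n > len(m.queue) {
		n = len(m.queue)
	}
	out := m.queue[:n]
	m.queue = m.queue[n:]
	return out, nil
}

func main() {
	counter := &batchCounter{src: &memorySource{queue: []string{"a", "b", "c"}}}
	for i := 0; i < 3; i++ {
		n, err := counter.countNext(2)
		fmt.Println(n, err)
	}
}
```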

Error Handling

  • Package Deployment Errors: Returned immediately to caller, causing application startup failure
  • Dequeue Errors: Logged and retried after 5-second delay
  • Unmarshal Errors: Logged but processing continues with next message
  • Context Cancellation: Gracefully exits the consumer loop

Thread Safety

The processMu mutex ensures that only one batch processing operation occurs at a time, preventing:
  • Race conditions in dequeue operations
  • Duplicate message processing
  • Concurrent access to shared resources
