Overview
A Queue is a First-In-First-Out (FIFO) data structure where elements are added at the rear and removed from the front. The UCX DSA package provides three queue implementations:
Queue : Fixed-capacity static queue
DynamicQueue : Automatically resizing queue
CircularQueue : Wrap-around circular buffer queue
Queue (Static)
A static queue with a fixed capacity. Uses circular indexing internally for efficient space utilization.
Creating a Queue
Empty Queue
With Initial Contents
Custom Capacity
From List
from dsa.queue import Queue
# Create queue with default capacity of 10
queue = Queue()
print (queue.capacity()) # 10
print ( len (queue)) # 0
Key Methods
enqueue(element)
Add an element to the rear of the queue.
queue = Queue( capacity = 5 )
queue.enqueue( 10 )
queue.enqueue( 20 )
queue.enqueue( 30 )
print (queue) # [10, 20, 30] Count: 3 Capacity: 5
# Raises Exception if capacity is exceeded
try :
for i in range ( 10 ):
queue.enqueue(i)
except Exception as e:
print (e) # Capacity Reached
dequeue()
Remove and return the element from the front of the queue.
queue = Queue.from_list([ 10 , 20 , 30 ])
front = queue.dequeue()
print (front) # 10
print (queue) # [20, 30] Count: 2 Capacity: 10
# Dequeue remaining elements
print (queue.dequeue()) # 20
print (queue.dequeue()) # 30
# Raises Exception if queue is empty
try :
queue.dequeue()
except Exception as e:
print (e) # Empty Queue
peek()
Return the front element without removing it.
queue = Queue.from_list([ 10 , 20 , 30 ])
print (queue.peek()) # 10
print ( len (queue)) # 3 (element still in queue)
# Raises Exception if queue is empty
empty_queue = Queue()
try :
empty_queue.peek()
except Exception as e:
print (e) # Empty Queue
Utility Methods
queue = Queue.from_list([ 1 , 2 , 3 , 4 , 5 ])
# Check if empty
print (queue.is_empty()) # False
# Get number of elements
print ( len (queue)) # 5
# Get capacity
print (queue.capacity()) # 10
# Convert to Python list (in order)
print (queue.to_ordered_list()) # [1, 2, 3, 4, 5]
# View raw internal array
print (queue.raw_view()) # Shows internal representation
Circular Indexing
The Queue uses circular indexing internally for efficient space reuse:
queue = Queue( capacity = 5 )
# Fill queue
for i in range ( 5 ):
queue.enqueue(i)
# Dequeue some elements
queue.dequeue() # 0
queue.dequeue() # 1
# Enqueue new elements (reuses freed space)
queue.enqueue( 5 )
queue.enqueue( 6 )
print (queue.to_ordered_list()) # [2, 3, 4, 5, 6]
DynamicQueue
A dynamic queue that automatically grows when full. Note that shrinking is not implemented.
Creating a DynamicQueue
from dsa.queue import DynamicQueue
# Create with default capacity
queue = DynamicQueue()
# Create with initial contents
queue = DynamicQueue( contents = [ 1 , 2 , 3 ])
# Create from list
queue = DynamicQueue.from_list([ 10 , 20 , 30 ])
Automatic Growth
The DynamicQueue doubles in capacity when full:
queue = DynamicQueue( capacity = 2 )
print ( f "Initial capacity: { queue.capacity() } " ) # 2
# Add elements - capacity doubles when full
queue.enqueue( 1 )
queue.enqueue( 2 )
print ( f "After 2 elements: { queue.capacity() } " ) # 2
queue.enqueue( 3 ) # Triggers growth
print ( f "After 3 elements: { queue.capacity() } " ) # 4
queue.enqueue( 4 )
queue.enqueue( 5 ) # Triggers growth again
print ( f "After 5 elements: { queue.capacity() } " ) # 8
Growth Strategy : Capacity doubles when front + count >= capacity. Elements are reorganized during growth to maintain queue order.
Key Methods
DynamicQueue supports the same methods as Queue:
queue = DynamicQueue()
# Enqueue without capacity concerns
for i in range ( 100 ):
queue.enqueue(i)
print ( len (queue)) # 100
# Dequeue elements
while not queue.is_empty():
value = queue.dequeue()
# Peek at front
queue.enqueue( 42 )
print (queue.peek()) # 42
Time Complexity :
enqueue(): O(1) amortized
dequeue(): O(1)
peek(): O(1)
CircularQueue
A circular queue that wraps around when capacity is reached, overwriting the oldest elements. Built on top of CircularArray.
Creating a CircularQueue
from dsa.queue import CircularQueue
# Create with default capacity
queue = CircularQueue( capacity = 5 )
# Create with initial contents
queue = CircularQueue( contents = [ 1 , 2 , 3 ], capacity = 5 )
Circular Behavior
When the queue is full, new elements overwrite the oldest ones:
queue = CircularQueue( capacity = 3 )
# Add elements
queue.enqueue( 1 )
queue.enqueue( 2 )
queue.enqueue( 3 )
print (queue.to_list()) # [1, 2, 3]
# Adding more wraps around and overwrites
queue.enqueue( 4 ) # Overwrites 1
print (queue.to_list()) # [2, 3, 4]
queue.enqueue( 5 ) # Overwrites 2
print (queue.to_list()) # [3, 4, 5]
CircularQueue is perfect for fixed-size buffers like:
Event logs with size limits
Recent request tracking
Streaming data windows
Ring buffers for producer-consumer
Key Methods
enqueue(element)
Enqueue element, wrapping around if necessary.
queue = CircularQueue( capacity = 3 )
queue.enqueue( 1 )
queue.enqueue( 2 )
queue.enqueue( 3 )
queue.enqueue( 4 ) # Wraps around, overwrites 1
print (queue.to_list()) # [2, 3, 4]
dequeue()
Dequeue element from front.
queue = CircularQueue.from_list([ 1 , 2 , 3 ])
front = queue.dequeue()
print (front) # 1
print (queue.to_list()) # [2, 3]
# Raises Exception if empty
try :
empty_queue = CircularQueue()
empty_queue.dequeue()
except Exception as e:
print (e) # Empty Queue
peek()
View front element without removing.
queue = CircularQueue.from_list([ 10 , 20 , 30 ])
print (queue.peek()) # 10
print ( len (queue)) # 3
Inherited Methods
CircularQueue inherits additional methods from CircularArray:
queue = CircularQueue( capacity = 5 )
queue.enqueue( 1 )
queue.enqueue( 2 )
queue.enqueue( 3 )
# Index access (from CircularArray)
print (queue[ 0 ]) # 1
print (queue[ 1 ]) # 2
# Set by index
queue[ 1 ] = 99
print (queue.to_list()) # [1, 99, 3]
# Capacity management
print (queue.capacity()) # 5
print (queue.is_empty()) # False
Use Cases
Task Scheduling
Breadth-First Search
Request Buffer
Print Queue
Process tasks in order of arrival: from dsa.queue import DynamicQueue
class TaskScheduler :
def __init__ ( self ):
self .tasks = DynamicQueue()
def add_task ( self , task ):
self .tasks.enqueue(task)
print ( f "Added task: { task } " )
def process_next ( self ):
if not self .tasks.is_empty():
task = self .tasks.dequeue()
print ( f "Processing: { task } " )
return task
return None
def pending_count ( self ):
return len ( self .tasks)
scheduler = TaskScheduler()
scheduler.add_task( "Email report" )
scheduler.add_task( "Update database" )
scheduler.add_task( "Send notifications" )
while scheduler.pending_count() > 0 :
scheduler.process_next()
Implement BFS using a queue: from dsa.queue import Queue
def bfs ( graph , start ):
"""Breadth-first search traversal."""
visited = set ()
queue = Queue()
queue.enqueue(start)
visited.add(start)
result = []
while not queue.is_empty():
node = queue.dequeue()
result.append(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
visited.add(neighbor)
queue.enqueue(neighbor)
return result
graph = {
'A' : [ 'B' , 'C' ],
'B' : [ 'D' , 'E' ],
'C' : [ 'F' ],
'D' : [],
'E' : [ 'F' ],
'F' : []
}
print (bfs(graph, 'A' )) # ['A', 'B', 'C', 'D', 'E', 'F']
Buffer incoming requests with size limit: from dsa.queue import CircularQueue
class RequestBuffer :
def __init__ ( self , max_size = 100 ):
self .buffer = CircularQueue( capacity = max_size)
def add_request ( self , request ):
# Oldest request automatically dropped if full
self .buffer.enqueue(request)
def get_recent_requests ( self ):
return self .buffer.to_list()
def count ( self ):
return len ( self .buffer)
buffer = RequestBuffer( max_size = 3 )
buffer.add_request( "GET /api/users" )
buffer.add_request( "POST /api/login" )
buffer.add_request( "GET /api/products" )
buffer.add_request( "DELETE /api/users/1" ) # Drops oldest
print (buffer.get_recent_requests())
# ['POST /api/login', 'GET /api/products', 'DELETE /api/users/1']
Simulate a printer queue: from dsa.queue import Queue
class PrinterQueue :
def __init__ ( self ):
self .jobs = Queue( capacity = 10 )
def add_job ( self , document ):
if len ( self .jobs) < self .jobs.capacity():
self .jobs.enqueue(document)
print ( f "Job added: { document } " )
return True
print ( "Queue full!" )
return False
def print_next ( self ):
if not self .jobs.is_empty():
doc = self .jobs.dequeue()
print ( f "Printing: { doc } " )
return doc
print ( "No jobs in queue" )
return None
def peek_next ( self ):
if not self .jobs.is_empty():
return self .jobs.peek()
return None
printer = PrinterQueue()
printer.add_job( "document1.pdf" )
printer.add_job( "report.docx" )
printer.add_job( "slides.pptx" )
print ( f "Next up: { printer.peek_next() } " )
printer.print_next()
printer.print_next()
Comparison
Feature Queue DynamicQueue CircularQueue Capacity Fixed Dynamic (grows) Fixed Overflow Exception Auto-grows Overwrites oldest Memory Predictable Variable Predictable Use Case Known max size Unknown size Fixed buffer Shrinking No No No Time Complexity O(1) O(1) amortized O(1)
When to use Queue : Use when you know the maximum size and want predictable memory usage.When to use DynamicQueue : Use when size varies significantly and you need automatic growth.When to use CircularQueue : Use for fixed-size buffers where oldest data can be discarded.
Equality Comparison
All queue types support equality comparison based on contents:
from dsa.queue import Queue, DynamicQueue, CircularQueue
q1 = Queue.from_list([ 1 , 2 , 3 ])
q2 = DynamicQueue.from_list([ 1 , 2 , 3 ])
q3 = CircularQueue( contents = [ 1 , 2 , 3 ])
print (q1 == q2) # True (same contents)
print (q1 == q3) # True (same contents)
Best Practices
Check Before Dequeue Always check if the queue is empty before dequeuing: if not queue.is_empty():
value = queue.dequeue()
Use Peek for Inspection Use peek() when you need to inspect without removing: if not queue.is_empty():
next_item = queue.peek()
Choose Right Type
Fixed size known: Queue
Size varies: DynamicQueue
Fixed buffer: CircularQueue
Handle Exceptions Wrap enqueue/dequeue in try-except: try :
queue.enqueue(value)
except Exception :
# Handle capacity error