The system processes video streams in real time, capturing frames from cameras or video files and running inference on each frame.
Video Stream Setup
The TrashSystem class in examples/video_stream.py:12 demonstrates complete video processing:
```python
import cv2
from trash_classificator.processor import TrashClassificator

class TrashSystem:
    def __init__(self, video_source):
        self.cap = cv2.VideoCapture(video_source)
        self.cap.set(3, 1280)  # Width
        self.cap.set(4, 720)   # Height
        self.trash_classificator_system = TrashClassificator()
```
Video Sources
Webcam

Use a camera index (typically 0 for the default webcam):

```python
# Default webcam
system = TrashSystem(0)

# Secondary camera
system = TrashSystem(1)
```

Video File

Provide a file path to process recorded video:

```python
system = TrashSystem('trash_video.mp4')
system = TrashSystem('/path/to/video.avi')
```

IP Camera

Use an RTSP or HTTP stream URL:

```python
# RTSP stream
system = TrashSystem('rtsp://192.168.1.100:554/stream')

# HTTP stream
system = TrashSystem('http://camera.local/video')
```
Frame Capture and Processing
The main processing loop in examples/video_stream.py:19 handles frame capture:
Open Video Stream

Initialize the video capture and check that frames are being read successfully:

```python
def run(self):
    while self.cap.isOpened():
        success, frame = self.cap.read()
        if not success:
            break
```

Process Frame

Pass each frame through the classification pipeline:

```python
image, process_log = self.trash_classificator_system.frame_processing(frame)
log.info(f"process information: {process_log}")
```

This returns the annotated image and a status message.

Display Output

Show the processed frame in a window:

```python
cv2.imshow("Frame", image)
if cv2.waitKey(1) & 0xFF == ord('q'):
    break
```

Press `q` to exit the stream.

Cleanup Resources

Release the camera and close all OpenCV windows:

```python
self.cap.release()
cv2.destroyAllWindows()
```
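The release and destroy calls above run only if the loop exits normally; an exception mid-loop would leak the capture handle. One way to guarantee cleanup is a small context manager (a sketch; `managed_capture` is a hypothetical helper, not part of the example code):

```python
from contextlib import contextmanager

@contextmanager
def managed_capture(cap):
    """Yield a capture object and guarantee release() on exit, even on error."""
    try:
        yield cap
    finally:
        cap.release()
```

With `with managed_capture(cv2.VideoCapture(0)) as cap:` the camera is released even if `frame_processing` raises.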
Camera Configuration
The system configures camera resolution using OpenCV properties:
```python
self.cap.set(3, 1280)  # CAP_PROP_FRAME_WIDTH
self.cap.set(4, 720)   # CAP_PROP_FRAME_HEIGHT
```
Common Properties
| Property | Code | Description | Example |
|---|---|---|---|
| Width | 3 | Frame width in pixels | `cap.set(3, 1920)` |
| Height | 4 | Frame height in pixels | `cap.set(4, 1080)` |
| FPS | 5 | Frames per second | `cap.set(5, 30)` |
| Brightness | 10 | Camera brightness | `cap.set(10, 128)` |
| Contrast | 11 | Camera contrast | `cap.set(11, 32)` |
| Exposure | 15 | Exposure value | `cap.set(15, -5)` |
Resolution Support: Not all cameras support all resolutions. If the requested resolution is not available, OpenCV will use the closest supported resolution.
Real-time Inference Optimization
Device Selection
The system automatically selects the optimal compute device in segmentation/device_manager.py:6:
```python
import torch

class DeviceManager:
    @staticmethod
    def get_device() -> torch.device:
        if torch.backends.mps.is_available():
            device = torch.device("mps")   # Apple Silicon
        elif torch.cuda.is_available():
            device = torch.device("cuda")  # NVIDIA GPU
        else:
            device = torch.device("cpu")   # CPU fallback
        return device
```
Check Device

```python
import torch
from trash_classificator.segmentation.device_manager import DeviceManager

device = DeviceManager.get_device()
print(f"Using device: {device}")
```

Device Logging

Device logging is automatic; at startup the log reports the selected device:

```
Model is using device: NVIDIA GeForce RTX 3080
```
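The selection order above (MPS, then CUDA, then CPU) is a simple priority fallback. A torch-free sketch of the same policy, convenient for unit-testing it (`pick_device` and its flag arguments are illustrative, not part of the project):

```python
def pick_device(mps_available: bool, cuda_available: bool) -> str:
    """Mirror DeviceManager's priority: Apple Silicon first, then NVIDIA, then CPU."""
    if mps_available:
        return "mps"
    if cuda_available:
        return "cuda"
    return "cpu"
```

For example, `pick_device(False, True)` returns `"cuda"`.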
Model Loading
The model loads once during initialization in segmentation/model_loader.py:6:
```python
from ultralytics import YOLO

class ModelLoader:
    def __init__(self, device: torch.device):
        # trash_model_path is defined elsewhere in the module
        self.model = YOLO(trash_model_path).to(device)

    def get_model(self) -> YOLO:
        return self.model
```
Model Persistence: The model is loaded only once when TrashClassificator initializes. This avoids repeated loading overhead during video processing.
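The same load-once behavior can also be expressed with `functools.lru_cache`; a sketch with a stand-in loader (the real `ModelLoader` instead holds the model as an instance attribute):

```python
from functools import lru_cache

load_count = 0

@lru_cache(maxsize=1)
def load_model():
    """Stand-in for the expensive YOLO load; the body runs at most once."""
    global load_count
    load_count += 1
    return object()  # placeholder for the loaded model

m1 = load_model()
m2 = load_model()
# Both calls return the same cached object; the loader body ran once.
```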
Inference Parameters
Optimized parameters for real-time processing:
```python
results = model.track(
    image,
    conf=0.55,      # Balanced confidence threshold
    verbose=False,  # Disable logging overhead
    persist=True,   # Track objects across frames
    imgsz=640,      # Optimized input size
    stream=True,    # Generator mode for efficiency
)
```
Display and Output Options
Window Display
The default implementation shows a live window:
```python
cv2.imshow("Frame", image)
if cv2.waitKey(1) & 0xFF == ord('q'):
    break
```
Full Screen

Display in full-screen mode:

```python
cv2.namedWindow("Frame", cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty("Frame", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
cv2.imshow("Frame", image)
```

Resizable Window

Allow window resizing:

```python
cv2.namedWindow("Frame", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Frame", 800, 600)
cv2.imshow("Frame", image)
```

Multiple Windows

Show the original and processed frames:

```python
cv2.imshow("Original", frame)
cv2.imshow("Processed", image)
```
Video Recording
Save processed video to file:
```python
class TrashSystem:
    def __init__(self, video_source, output_path=None):
        self.cap = cv2.VideoCapture(video_source)
        self.cap.set(3, 1280)
        self.cap.set(4, 720)
        self.trash_classificator_system = TrashClassificator()

        # Optional video writer
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            fps = self.cap.get(cv2.CAP_PROP_FPS) or 30.0
            self.writer = cv2.VideoWriter(output_path, fourcc, fps, (1280, 720))
        else:
            self.writer = None

    def run(self):
        while self.cap.isOpened():
            success, frame = self.cap.read()
            if not success:
                break

            image, process_log = self.trash_classificator_system.frame_processing(frame)
            log.info(f"process information: {process_log}")

            # Write to output file
            if self.writer:
                self.writer.write(image)

            cv2.imshow("Frame", image)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        self.cap.release()
        if self.writer:
            self.writer.release()
        cv2.destroyAllWindows()
```
Headless Processing
Run without GUI display for server deployments:
```python
def run_headless(self, output_path):
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, 30.0, (1280, 720))

    while self.cap.isOpened():
        success, frame = self.cap.read()
        if not success:
            break

        image, status = self.trash_classificator_system.frame_processing(frame)
        writer.write(image)

        # Log progress without display
        log.info(f"Frame processed: {status}")

    self.cap.release()
    writer.release()
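Without a display, per-frame logging can flood the log on long videos. Throttling to every Nth frame keeps headless logs readable; a sketch (`should_log` is a hypothetical helper):

```python
def should_log(frame_index: int, every: int = 30) -> bool:
    """Log the first frame and then one frame out of every `every`."""
    return frame_index % every == 0
```

Inside the loop, replace the unconditional `log.info(...)` with `if should_log(i): log.info(...)`, incrementing `i` per frame.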
Logging Configuration
The example configures Python logging in examples/video_stream.py:4:
```python
import logging as log

log.basicConfig(level=log.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
```

Output format:

```
2026-03-07 14:32:15,123 - INFO - Model is using device: NVIDIA GeForce RTX 3080
2026-03-07 14:32:16,234 - INFO - process information: Trash detected
2026-03-07 14:32:16,345 - INFO - process information: Trash detected
```
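For long-running deployments, the same format can be sent to both the console and a file using standard logging handlers. A sketch (the `trash_system.log` filename is an arbitrary choice; `force=True` requires Python 3.8+):

```python
import logging as log

log.basicConfig(
    level=log.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        log.StreamHandler(),                  # console, as before
        log.FileHandler("trash_system.log"),  # persistent copy on disk
    ],
    force=True,  # replace any handlers configured earlier
)
```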
Measuring FPS
Add frame rate measurement:
```python
import time

def run_with_fps(self):
    fps_start_time = time.time()
    fps_counter = 0
    fps = 0.0  # last measured value; avoids a NameError before the first update

    while self.cap.isOpened():
        success, frame = self.cap.read()
        if not success:
            break

        image, process_log = self.trash_classificator_system.frame_processing(frame)

        # Update the FPS estimate roughly once per second
        fps_counter += 1
        elapsed = time.time() - fps_start_time
        if elapsed > 1.0:
            fps = fps_counter / elapsed
            log.info(f"FPS: {fps:.2f}")
            fps_counter = 0
            fps_start_time = time.time()

        # Display with FPS overlay
        cv2.putText(image, f"FPS: {fps:.2f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.imshow("Frame", image)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    self.cap.release()
    cv2.destroyAllWindows()
```
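The one-second window above resets the counter each second, so the displayed value jumps. A smoother alternative keeps a rolling window of recent frame timestamps (a sketch; `RollingFPS` is illustrative, not part of the example code):

```python
import time
from collections import deque

class RollingFPS:
    """FPS estimated over the timestamps of the last `window` frames."""
    def __init__(self, window=30):
        self.times = deque(maxlen=window)

    def tick(self, now=None):
        """Record a frame timestamp and return the current FPS estimate."""
        self.times.append(time.time() if now is None else now)
        if len(self.times) < 2:
            return 0.0
        span = self.times[-1] - self.times[0]
        return (len(self.times) - 1) / span if span > 0 else 0.0
```

Call `fps = meter.tick()` once per frame and overlay the value exactly as in the loop above.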
Complete Example
Basic Usage

```python
from examples.video_stream import TrashSystem

if __name__ == "__main__":
    # Use default webcam
    system = TrashSystem(0)
    system.run()
```
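To switch between webcam indices and file or stream paths from the command line, a small helper can coerce numeric arguments to camera indices (a hypothetical wrapper, not part of examples/video_stream.py):

```python
def parse_source(arg: str):
    """Interpret a CLI argument: digits become a camera index, anything else a path/URL."""
    return int(arg) if arg.isdigit() else arg
```

Usage: `system = TrashSystem(parse_source(sys.argv[1]))`, so `python video_stream.py 0` opens the webcam and `python video_stream.py trash_video.mp4` opens a file.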
Next Steps
- Classification Pipeline: understand the frame processing pipeline
- Drawing Detections: customize visualization of detected objects