Skip to main content

Overview

This guide shows how to integrate the TrashClassificator into your own applications. Learn to use custom video sources, process results programmatically, save detection data, and create advanced integration patterns.

Basic Integration Pattern

Minimal Integration

The simplest way to integrate the classifier:
basic_integration.py
import cv2
from trash_classificator.processor import TrashClassificator

# One classifier instance is enough; it can be reused across images.
classifier = TrashClassificator()

# Classify a single image file
source_img = cv2.imread('trash_image.jpg')
annotated, results = classifier.frame_processing(source_img)

# Report what was found
print(f"Detected {results['detected_objects']} objects")
for det in results['classifications']:
    print(f"Class: {det['class']}, Confidence: {det['confidence']}")

# Persist the annotated copy
cv2.imwrite('output.jpg', annotated)

Custom Video Sources

Multiple Camera Sources

Process multiple camera feeds simultaneously:
multi_camera.py
import cv2
import threading
from trash_classificator.processor import TrashClassificator

class MultiCameraSystem:
    """Run trash detection on several camera feeds at once, one thread per feed.

    The most recent results for each feed are kept in ``self.results``
    under the key ``camera_<index>``.
    """

    def __init__(self, camera_ids):
        self.cameras = [cv2.VideoCapture(cam_id) for cam_id in camera_ids]
        self.classifiers = [TrashClassificator() for _ in camera_ids]
        self.results = {}

    def process_camera(self, camera_index):
        """Process single camera in separate thread"""
        capture = self.cameras[camera_index]
        detector = self.classifiers[camera_index]

        while capture.isOpened():
            ok, frame = capture.read()
            if not ok:
                break

            annotated, detections = detector.frame_processing(frame)
            self.results[f'camera_{camera_index}'] = detections

            cv2.imshow(f'Camera {camera_index}', annotated)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    def run(self):
        """Start processing all cameras"""
        workers = [
            threading.Thread(target=self.process_camera, args=(idx,))
            for idx in range(len(self.cameras))
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # Release every capture handle and close the display windows.
        for capture in self.cameras:
            capture.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    # Demo: run detection on the first two attached cameras (indices 0 and 1).
    camera_ids = [0, 1]
    MultiCameraSystem(camera_ids).run()

IP Camera / RTSP Streams

ip_camera.py
import cv2
from trash_classificator.processor import TrashClassificator

class IPCameraProcessor:
    """Stream an RTSP/IP camera through the trash classifier.

    Reconnects automatically when a frame read fails (common with flaky
    network streams).
    """

    def __init__(self, rtsp_url, username=None, password=None):
        # Construct authenticated URL if credentials provided
        if username and password:
            # Format: rtsp://username:password@ip:port/path
            url_parts = rtsp_url.split('//')
            rtsp_url = f"{url_parts[0]}//{username}:{password}@{url_parts[1]}"

        # Keep the (possibly authenticated) URL so run() can reconnect.
        # The original referenced the constructor-local name `rtsp_url`
        # from run(), which raised NameError on the reconnect path.
        self.rtsp_url = rtsp_url
        self.cap = cv2.VideoCapture(rtsp_url)
        self.classifier = TrashClassificator()

        # Set buffer size to reduce latency
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    def run(self):
        """Read, classify and display frames until 'q' is pressed."""
        while self.cap.isOpened():
            success, frame = self.cap.read()
            if not success:
                print("Reconnecting...")
                self.cap.release()
                # Bug fix: use the stored URL, not the undefined local name.
                self.cap = cv2.VideoCapture(self.rtsp_url)
                continue

            image, results = self.classifier.frame_processing(frame)
            cv2.imshow('IP Camera', image)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        self.cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    # Example: authenticated RTSP stream from a LAN camera.
    IPCameraProcessor(
        rtsp_url='rtsp://192.168.1.100:554/stream1',
        username='admin',
        password='password'
    ).run()

Image Directory Batch Processing

batch_processing.py
import cv2
import os
from pathlib import Path
from trash_classificator.processor import TrashClassificator

class BatchImageProcessor:
    """Run the classifier over every image in a directory.

    Annotated copies are written to ``output_dir`` with an
    ``annotated_`` filename prefix.
    """

    def __init__(self, input_dir, output_dir):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        # parents=True so a nested output path does not raise FileNotFoundError.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.classifier = TrashClassificator()

    def process_directory(self):
        """Process all images in directory.

        Returns:
            list[dict]: one entry per successfully processed image with
            keys 'filename' and 'results'.
        """
        image_extensions = ['.jpg', '.jpeg', '.png', '.bmp']
        image_files = []

        for ext in image_extensions:
            image_files.extend(self.input_dir.glob(f'*{ext}'))

        results_summary = []

        for img_path in image_files:
            print(f"Processing {img_path.name}...")

            # Read and process image
            image = cv2.imread(str(img_path))
            # cv2.imread returns None for unreadable/corrupt files;
            # skip them instead of crashing the whole batch.
            if image is None:
                print(f"Skipping unreadable file: {img_path.name}")
                continue

            annotated, results = self.classifier.frame_processing(image)

            # Save annotated image
            output_path = self.output_dir / f"annotated_{img_path.name}"
            cv2.imwrite(str(output_path), annotated)

            # Store results
            results_summary.append({
                'filename': img_path.name,
                'results': results
            })

        return results_summary

if __name__ == "__main__":
    batch = BatchImageProcessor(
        input_dir='./images/trash',
        output_dir='./images/output'
    )
    summary = batch.process_directory()

    # Aggregate and report the batch totals.
    total_objects = sum(entry['results']['detected_objects'] for entry in summary)
    print(f"\nProcessed {len(summary)} images")
    print(f"Total objects detected: {total_objects}")

Processing Results Programmatically

Extracting Detection Data

result_extraction.py
import cv2
from trash_classificator.processor import TrashClassificator

class ResultExtractor:
    """Turn raw classifier output into structured per-detection records."""

    def __init__(self):
        self.classifier = TrashClassificator()

    def extract_detections(self, image):
        """Extract structured detection data"""
        annotated, results = self.classifier.frame_processing(image)

        return [
            {
                'class': item['class'],
                'confidence': item['confidence'],
                'bbox': item.get('bbox', []),  # [x1, y1, x2, y2]
                'center': self._calculate_center(item.get('bbox', [])),
                'area': self._calculate_area(item.get('bbox', [])),
            }
            for item in results.get('classifications', [])
        ]

    def _calculate_center(self, bbox):
        """Calculate bounding box center"""
        if len(bbox) != 4:
            return None
        x1, y1, x2, y2 = bbox
        return ((x1 + x2) // 2, (y1 + y2) // 2)

    def _calculate_area(self, bbox):
        """Calculate bounding box area"""
        if len(bbox) != 4:
            return 0
        x1, y1, x2, y2 = bbox
        return (x2 - x1) * (y2 - y1)

if __name__ == "__main__":
    extractor = ResultExtractor()
    detections = extractor.extract_detections(cv2.imread('trash.jpg'))

    for det in detections:
        print(f"Found {det['class']} at {det['center']} "
              f"with {det['confidence']:.2%} confidence")

Real-Time Analytics

analytics.py
import cv2
import time
from collections import defaultdict, deque
from trash_classificator.processor import TrashClassificator

class TrashAnalytics:
    """Live detection analytics over a video source.

    Tracks cumulative per-class counts, a sliding window of per-frame
    detection counts, and overall throughput (FPS), overlaying the
    headline numbers on each displayed frame.
    """

    def __init__(self, video_source, window_size=30):
        self.cap = cv2.VideoCapture(video_source)
        self.classifier = TrashClassificator()

        # Analytics state
        self.class_counts = defaultdict(int)
        self.detection_history = deque(maxlen=window_size)
        self.start_time = time.time()
        self.frame_count = 0

    def update_analytics(self, results):
        """Update analytics with new results"""
        self.frame_count += 1
        self.detection_history.append(results['detected_objects'])
        for entry in results.get('classifications', []):
            self.class_counts[entry['class']] += 1

    def get_statistics(self):
        """Get current statistics"""
        elapsed = time.time() - self.start_time
        history = self.detection_history
        counts = self.class_counts
        return {
            'fps': self.frame_count / elapsed if elapsed > 0 else 0,
            'total_frames': self.frame_count,
            'avg_detections': sum(history) / len(history) if history else 0,
            'class_distribution': dict(counts),
            'most_common_class': max(counts, key=counts.get) if counts else None,
        }

    def run(self):
        """Process the stream until it ends or 'q' is pressed."""
        while self.cap.isOpened():
            ok, frame = self.cap.read()
            if not ok:
                break

            image, results = self.classifier.frame_processing(frame)
            self.update_analytics(results)

            # Overlay the headline statistics on the frame.
            stats = self.get_statistics()
            for row, text in enumerate([
                f"FPS: {stats['fps']:.1f}",
                f"Avg Detections: {stats['avg_detections']:.1f}",
            ]):
                cv2.putText(image, text, (10, 30 + 40 * row),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            cv2.imshow('Analytics', image)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        # Report the session totals before releasing resources.
        final_stats = self.get_statistics()
        print("\n=== Final Statistics ===")
        print(f"Total Frames: {final_stats['total_frames']}")
        print(f"Average FPS: {final_stats['fps']:.2f}")
        print(f"Average Detections: {final_stats['avg_detections']:.2f}")
        print(f"\nClass Distribution:")
        for cls, count in final_stats['class_distribution'].items():
            print(f"  {cls}: {count}")

        self.cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    # Analyse the default webcam (device index 0).
    TrashAnalytics(0).run()

Saving Detection Results

JSON Export

json_export.py
import cv2
import json
import datetime
from pathlib import Path
from trash_classificator.processor import TrashClassificator

class ResultExporter:
    """Process images and persist both a JSON result file and an
    annotated image copy per input."""

    def __init__(self, output_dir='results'):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.classifier = TrashClassificator()

    def process_and_export(self, image_path):
        """Process image and export results to JSON.

        Args:
            image_path: path of the image to classify.

        Returns:
            (json_path, img_path): paths of the written JSON file and
            annotated image.

        Raises:
            FileNotFoundError: if the image cannot be read.
        """
        image = cv2.imread(str(image_path))
        # cv2.imread silently returns None for missing/unreadable files;
        # fail loudly here instead of crashing inside the classifier.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")

        annotated, results = self.classifier.frame_processing(image)

        # Prepare export data
        export_data = {
            'timestamp': datetime.datetime.now().isoformat(),
            'image_path': str(image_path),
            'image_size': {
                'width': image.shape[1],
                'height': image.shape[0]
            },
            'detections': results
        }

        # Save JSON
        json_path = self.output_dir / f"{Path(image_path).stem}_results.json"
        with open(json_path, 'w') as f:
            json.dump(export_data, f, indent=2)

        # Save annotated image
        img_path = self.output_dir / f"{Path(image_path).stem}_annotated.jpg"
        cv2.imwrite(str(img_path), annotated)

        return json_path, img_path

if __name__ == "__main__":
    exporter = ResultExporter()
    json_file, image_file = exporter.process_and_export('trash.jpg')
    print(f"Results saved to {json_file}")
    print(f"Annotated image saved to {image_file}")

CSV Logging

csv_logging.py
import cv2
import csv
import datetime
from pathlib import Path
from trash_classificator.processor import TrashClassificator

class CSVLogger:
    """Append classifier detections to a CSV file, one row per detection."""

    def __init__(self, csv_path='detections.csv'):
        self.csv_path = Path(csv_path)
        self.classifier = TrashClassificator()

        # Write the header row only when the file is first created.
        if not self.csv_path.exists():
            with open(self.csv_path, 'w', newline='') as f:
                csv.writer(f).writerow(
                    ['timestamp', 'class', 'confidence', 'x1', 'y1', 'x2', 'y2', 'source'])

    def log_detections(self, image, source='unknown'):
        """Log detections to CSV file"""
        annotated, results = self.classifier.frame_processing(image)

        stamp = datetime.datetime.now().isoformat()
        with open(self.csv_path, 'a', newline='') as f:
            writer = csv.writer(f)
            for hit in results.get('classifications', []):
                box = hit.get('bbox', [0, 0, 0, 0])
                writer.writerow([stamp, hit['class'], hit['confidence'], *box, source])

        return annotated, results

if __name__ == "__main__":
    logger = CSVLogger()
    cap = cv2.VideoCapture(0)

    # Log every webcam frame's detections until 'q' is pressed.
    while cap.isOpened():
        grabbed, frame = cap.read()
        if not grabbed:
            break

        annotated, _ = logger.log_detections(frame, source='webcam')
        cv2.imshow('Logging', annotated)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

Advanced Integration Patterns

Combined System: Video + Serial + Analytics

advanced_integration.py
import cv2
import json
import time
from collections import defaultdict
from trash_classificator.processor import TrashClassificator
from serial_com import CommunicationManager

class AdvancedTrashSystem:
    """Combined pipeline: video capture + classification + optional serial
    reporting + on-screen per-class statistics.

    Args:
        video_source: camera index or video path for cv2.VideoCapture.
        serial_port: optional serial device path; when given, detections
            are forwarded through CommunicationManager.
    """

    def __init__(self, video_source, serial_port=None):
        # Video processing
        self.cap = cv2.VideoCapture(video_source)
        # Use named capture properties instead of the magic indices 3/4.
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        self.classifier = TrashClassificator()

        # Serial communication (optional)
        self.serial = None
        if serial_port:
            self.serial = CommunicationManager(port=serial_port)
            self.serial.connect()

        # Analytics
        self.stats = defaultdict(int)  # cumulative count per class name
        self.last_detection_time = {}  # class name -> last report time
        self.detection_cooldown = 2.0  # seconds

    def should_report_detection(self, class_name):
        """Avoid reporting duplicate detections too quickly.

        Returns True (and refreshes the timestamp) when `class_name` was
        not reported within the last `detection_cooldown` seconds.
        """
        current_time = time.time()
        last_time = self.last_detection_time.get(class_name, 0)

        if current_time - last_time > self.detection_cooldown:
            self.last_detection_time[class_name] = current_time
            return True
        return False

    def process_frame(self, frame):
        """Process single frame: classify, count, and optionally report."""
        annotated, results = self.classifier.frame_processing(frame)

        # Update statistics
        for classification in results.get('classifications', []):
            class_name = classification['class']
            self.stats[class_name] += 1

            # Send via serial if available and not a cooldown duplicate
            if self.serial and self.should_report_detection(class_name):
                self.serial.send_message(
                    message_type='classification',
                    data=classification
                )

        return annotated, results

    def draw_stats(self, image):
        """Draw statistics overlay on image"""
        y_offset = 30
        for class_name, count in self.stats.items():
            text = f"{class_name}: {count}"
            cv2.putText(image, text, (10, y_offset),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            y_offset += 30
        return image

    def run(self):
        """Main processing loop; always releases capture/serial on exit."""
        try:
            while self.cap.isOpened():
                success, frame = self.cap.read()
                if not success:
                    break

                # Process frame
                annotated, results = self.process_frame(frame)

                # Draw statistics
                annotated = self.draw_stats(annotated)

                # Display
                cv2.imshow('Advanced Trash System', annotated)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        finally:
            # Cleanup
            self.cap.release()
            cv2.destroyAllWindows()
            if self.serial:
                self.serial.close()

            # Print final statistics
            print("\n=== Session Statistics ===")
            for class_name, count in self.stats.items():
                print(f"{class_name}: {count}")

if __name__ == "__main__":
    AdvancedTrashSystem(
        video_source=0,
        serial_port='/dev/ttyUSB0'  # Optional
    ).run()

Best Practices

Resource Management: Always release video captures and close connections in a finally block or context manager to prevent resource leaks.
Performance Optimization: For real-time applications, consider:
  • Processing every Nth frame instead of all frames
  • Reducing input resolution
  • Using GPU acceleration if available
  • Running detection in a separate thread
Error Handling: Implement robust error handling for production systems. Wrap the per-frame call inside your capture loop so a single bad frame does not stop the stream:
while cap.isOpened():
    success, frame = cap.read()
    if not success:
        break
    try:
        image, results = classifier.frame_processing(frame)
    except Exception as e:
        log.error(f"Processing failed: {e}")
        continue  # Skip this frame

Next Steps

Video Stream

Basic video stream processing example

Serial Communication

Send results to external devices

API Reference

Detailed API documentation

Configuration

Configure classifier settings

Build docs developers (and LLMs) love