Skip to main content

System Architecture

The robotic arm system follows a modular, event-driven architecture with clear separation of concerns. The system is built using Python and communicates with the VEX robotic arm hardware through serial communication.

Architecture Diagram

Core Modules

Robot Controller (main.py:8-276)

The central control system that orchestrates all operations:
class Robot:
    def __init__(self):
        self.serial_manager = CommunicationManager(
            port='/dev/ttyACM1', 
            baudrate=115200
        )
        
        # Register scan data callback
        self.scan_results = []
        self.serial_manager.register_callback(
            'scan_service', 
            self._scan_callback
        )
        
        # Define placement zones for objects
        self.placement_zones = {
            'apple': {'angle': 90, 'distance': 200},
            'orange': {'angle': 180, 'distance': 200},
            'bottle': {'angle': 45, 'distance': 200},
            'default': {'angle': 270, 'distance': 200},
        }

Communication Module

CommunicationManager (communication/serial_manager.py:16-236) handles:
  • Serial communication with VEX Brain
  • Camera integration
  • Object detection coordination
  • Event-driven message processing

Perception Module

Composed of:
  • CameraManager (perception/vision/camera/main.py:5-29) - Image capture
  • ImageProcessor (perception/vision/image_processing.py:8-95) - Detection pipeline
  • DetectionModel (perception/vision/detection/main.py:15-22) - YOLO11 inference

Mapping Module

OccupancyGrid (mapping/occupancy_grid.py:5-111) provides:
  • Probabilistic occupancy mapping
  • Bayesian sensor fusion
  • World-to-grid coordinate transformations

Data Flow

Message Protocol

All communication uses JSON messages over serial:
message = {
    'type': 'scan_service',  # Message type
    'data': {'speed': 20}     # Payload
}

Threading Model

The system uses multiple threads for concurrent operations:

Communication Thread (serial_manager.py:68-69)

self._stop_event.clear()
self._read_thread = Thread(target=self._read_loop, daemon=True)
self._read_thread.start()
The _read_loop continuously monitors the serial port for incoming messages from the VEX Brain.

Detection Thread (serial_manager.py:153)

Thread(target=self._handle_object_detection, args=(data,)).start()
Object detection runs in separate threads to avoid blocking serial communication.

Thread Synchronization

Events coordinate between threads:
class CommunicationManager:
    def __init__(self, ...):
        # Thread synchronization events
        self._stop_event = Event()
        self.scan_complete_event = Event()
        self.movement_event = Event()
        self.angles_event = Event()
The daemon thread ensures clean shutdown when the main program exits.

State Management

The system maintains multiple state dictionaries:
# Movement tracking per joint
self.movement_status: Dict[str, Dict[str, Any]] = {}

# Current joint angles
self.current_angles: Dict[str, float] = {}

# Safety system state
self.safety_status: Dict[str, Any] = {}

# Scan results registry
self.scan_data = None

Module Dependencies

Design Patterns

Observer Pattern (Callbacks)

The callback system allows decoupled event handling:
# Register callback
self.serial_manager.register_callback('scan_service', self._scan_callback)

# Trigger callback when message received
if self.callbacks.get('scan_service'):
    self.callbacks['scan_service'](data)

Event-Driven Architecture

Events signal completion of asynchronous operations:
# Wait for scan completion
if self.serial_manager.scan_complete_event.wait(timeout=60):
    self.serial_manager.scan_complete_event.clear()
    self.process_scan_results()

State Machine Pattern

Movement execution follows a state machine:
self.movement_status[joint] = {'state': 'pending'}
# ... send command ...
if self.movement_status.get(joint, {}).get('state') == 'completed':
    return True
The architecture prioritizes safety with timeout handling and error recovery mechanisms throughout the system.

Error Handling

Each module implements robust error handling:
def handle_movement_failure(self):
    """Handles faults in the motion sequence"""
    log.error('executing security protocol')
    self.serial_manager.send_message('safety_service', {})
    
    start_time = time.time()
    while (time.time() - start_time) < 15:
        if self.serial_manager.safety_status.get('state') == 'completed':
            log.info("system safety")
            return
        time.sleep(0.5)
        
    log.error("error in safety service")
    self.serial_manager.close()
    exit(1)
If safety protocols fail, the system will automatically close the serial connection and exit to prevent unsafe operations.

Next Steps

Communication

Learn about serial protocol and message handling

Perception

Explore vision and object detection systems

Control

Understand movement execution and sequencing

Mapping

Dive into occupancy grid mapping

Build docs developers (and LLMs) love