Skip to main content

Overview

The Communication Module (communication/serial_manager.py) provides the interface between the Python control system and the VEX Brain hardware. It manages serial communication, coordinates perception subsystems, and handles asynchronous message processing.

CommunicationManager Class

Initialization (serial_manager.py:16-50)

class CommunicationManager:
    def __init__(self, port: str='/dev/ttyACM1', 
                 baudrate: int = 115200, 
                 camera_index: int = 0):
        """
        :param port: serial port
        :param baudrate: baudrate
        :param camera_index: camera index
        """
        self.port = port
        self.baudrate = baudrate
        self.message_end = b'\n'
        
        self.serial_port: Optional[serial.Serial] = None
        self.is_connected = False
        
        # Threads and events
        self._read_thread = None
        self._stop_event = Event()
        self.scan_complete_event = Event()
        self.movement_event = Event()
        self.angles_event = Event()
        
        # Buffer for incoming data
        self.buffer = bytearray()
        
        # Callback registry
        self.callbacks = {}
        
        # State tracking
        self.movement_status: Dict[str, Dict[str, Any]] = {}
        self.current_angles: Dict[str, float] = {}
        self.safety_status: Dict[str, Any] = {}
        self.scan_data = None
        
        # Perception integration
        self.camera = CameraManager(camera_index=camera_index)
        self.object_detect_model = ImageProcessor(
            confidence_threshold=0.45
        )

Serial Communication

Connection Management

Establishing Connection (serial_manager.py:52-74)

def connect(self) -> bool:
    """Serial connection"""
    if self.is_connected:
        return True
    
    try:
        self.serial_port = serial.Serial(
            port=self.port, 
            baudrate=self.baudrate,
            timeout=10,
            write_timeout=10
        )
        self.is_connected = True
        
        # Start read loop in daemon thread
        self._stop_event.clear()
        self._read_thread = Thread(
            target=self._read_loop, 
            daemon=True
        )
        self._read_thread.start()
        return True
        
    except Exception as e:
        log.error(f'Error connecting to serial port: {str(e)}')
        return False
The connection automatically starts a daemon thread to continuously monitor incoming messages.

Closing Connection (serial_manager.py:76-85)

def close(self):
    """Close serial connection"""
    self._stop_event.set()
    if self._read_thread:
        self._read_thread.join(timeout=1.0)
        
    if self.serial_port and self.serial_port.is_open:
        self.serial_port.close()
        self.is_connected = False
        log.info('Serial connection closed')

Message Protocol

JSON Message Format

All messages use a standardized JSON structure:
{
  "type": "scan_service",
  "data": {
    "speed": 20
  }
}
Messages are terminated with a newline character (\n).

Sending Messages (serial_manager.py:87-107)

def send_message(self, message_type: str, data: dict) -> bool:
    """
    Send JSON message to robot
    :param message_type: 'check_service', 'safety_service', etc.
    :param data: message data
    """
    if not self.is_connected or not self.serial_port:
        log.error("Error: serial port not initialized")
        return False
    
    try:
        message = {
            'type': message_type,
            'data': data,
        }
        encoded_message = json.dumps(message).encode() + self.message_end
        self.serial_port.write(encoded_message)
        return True
    except Exception as e:
        print(f"Error sending message: {e}")
        return False

Message Types

TypeDirectionPurpose
check_serviceBidirectionalSystem health check
safety_serviceBidirectionalEmergency stop and home position
scan_serviceBidirectionalEnvironment scanning with detection
pick_serviceTo VEX → From VEXPick object sequence
place_serviceTo VEX → From VEXPlace object sequence
current_anglesFrom VEXJoint angle telemetry

Threading Architecture

Read Loop Thread (serial_manager.py:112-130)

def _read_loop(self):
    """Thread loop for reading messages from VEX"""
    while not self._stop_event.is_set():
        if self.serial_port and self.serial_port.in_waiting:
            try:
                char = self.serial_port.read()
                if char == self.message_end:
                    message = self.buffer.decode()
                    self.buffer = bytearray()
                    try:
                        data = json.loads(message)
                        self._process_message(data)
                    except json.JSONDecodeError:
                        log.error(f'Error decoding message: {message}')
                else:
                    self.buffer.extend(char)
            except Exception as e:
                log.error(f'Error reading serial port: {e}')
        time.sleep(0.01)
The read loop uses a character-by-character approach to handle incomplete messages and ensure proper message boundary detection.

Message Processing Flow

Message Processing (serial_manager.py:132-172)

def _process_message(self, message: dict):
    """Process message from VEX"""
    try:
        msg_type = message.get('type', '').lower()
        data = message.get('data', {})

        if msg_type == 'check_service':
            state = data.get('state')
            log.info(f'{msg_type} status:\nstate: {state}')

        elif msg_type == 'safety_service':
            self.safety_status = message.get('data', {})
            state = data.get('state')
            time_taken = data.get('time')
            log.info(f"{msg_type} status: \nstate: {state}, \ntime: {time_taken}s")
            if state == 'error':
                log.error(f"Safety Service Error: {data.get('error_msg', 'Unknown error')}")

        elif msg_type == 'scan_service':
            state = data.get('state')
            if state == 'detected':
                # Run detection in separate thread
                Thread(
                    target=self._handle_object_detection, 
                    args=(data,)
                ).start()
                log.info(f"Scan Data - Object Detected: "
                        f"Angle:    {data['angle']}° "
                        f"Distance: {data['distance']}mm")

            elif state == 'complete':
                self.scan_complete_event.set()
                log.info("Scan completed!")
        
        elif msg_type in ('pick_service', 'place_service'):
            joint = data.get('joint')
            self.movement_status[joint] = data
            self.movement_event.set()
            
        elif msg_type == 'current_angles':
            self.current_angles = message.get('data', {})
            self.angles_event.set()
            
    except Exception as e:
        log.error(f'Error processing message: {e}')

Callback System

Registration (serial_manager.py:109-110)

def register_callback(self, message_type: str, callback: callable):
    self.callbacks[message_type] = callback

Usage Example (from main.py:14)

class Robot:
    def __init__(self):
        self.serial_manager = CommunicationManager(
            port='/dev/ttyACM1', 
            baudrate=115200
        )
        
        # Register callback for scan results
        self.scan_results = []
        self.serial_manager.register_callback(
            'scan_service', 
            self._scan_callback
        )
Callbacks enable loose coupling between the communication layer and application logic.

Object Detection Integration

Detection Handler (serial_manager.py:174-202)

def _handle_object_detection(self, data: dict):
    """Object detection in real time"""
    try:
        # 1. Capture image
        img_path = self.camera.capture_image()
        if not img_path:
            log.error("Camera could not capture image")
            return
        
        # 2. YOLO detection
        image, yolo_result = self.object_detect_model.read_image_path(
            img_path, 
            draw_results=True, 
            save_drawn_img=True
        )
        if yolo_result is None:
            log.info("No detections.")
            return
        
        # 3. Update data
        data.update({
            'class': yolo_result['class'],
            'confidence': yolo_result['confidence'],
            'timestamp': time.time(),
            'image_path': img_path
        })
        
        # 4. Notify the central system
        if self.callbacks.get('scan_service'):
            self.callbacks['scan_service'](data)
            
    except Exception as e:
        log.error(f"Error in object detection: {str(e)}")

Detection Pipeline

Synchronization Primitives

Event-Based Waiting

The module uses threading events for synchronization:

Movement Confirmation (serial_manager.py:210-224)

def wait_for_confirmation(self, joint: str, timeout: float = 30.0) -> bool:
    """Wait for movement confirmed"""
    start_time = time.time()
    self.movement_event.clear()
    
    while (time.time() - start_time) < timeout:
        if self.movement_status.get(joint, {}).get('state') == 'completed':
            return True
        if self.movement_status.get(joint, {}).get('state') == 'error':
            log.error(f'Error in movement')
            return False
        self.movement_event.wait(0.1)
    
    log.warning(f"Timeout waiting for movement of: {joint}")
    return False

Angle Response (serial_manager.py:226-235)

def wait_for_angles_response(self, timeout: float = 5.0) -> bool:
    self.angles_event.clear()
    start_time = time.time()
    
    while (time.time() - start_time) < timeout:
        if self.current_angles:
            return True
        self.angles_event.wait(0.1)
        
    return False
All wait operations include timeout handling to prevent infinite blocking if the VEX Brain becomes unresponsive.

State Tracking

Movement Status

# Per-joint movement tracking
self.movement_status: Dict[str, Dict[str, Any]] = {
    'base': {'state': 'completed'},
    'arm': {'state': 'pending'},
    'gripper': {'state': 'error'}
}

Safety Status

self.safety_status: Dict[str, Any] = {
    'state': 'completed',
    'time': 2.5
}

Current Angles

self.current_angles: Dict[str, float] = {
    'base': 90.0,
    'shoulder': 45.0,
    'elbow': 30.0
}

Best Practices

Thread Safety

The module uses events and atomic operations to ensure thread-safe communication between the read loop, detection threads, and the main application.

Error Handling

try:
    # Attempt operation
    self.serial_port.write(encoded_message)
    return True
except Exception as e:
    log.error(f"Error: {e}")
    return False

Resource Cleanup

def __del__(self):
    """Ensure cleanup on deletion"""
    self.close()

Usage Example

Complete example from main.py:
class Robot:
    def __init__(self):
        # Initialize communication
        self.serial_manager = CommunicationManager(
            port='/dev/ttyACM1', 
            baudrate=115200
        )
        
    def handle_scan_command(self):
        # Register callback
        self.serial_manager.register_callback(
            'scan_service', 
            self._scan_callback
        )
        
        # Send scan command
        self.serial_manager.send_message(
            'scan_service', 
            {'speed': 20}
        )
        
        # Wait for completion
        if self.serial_manager.scan_complete_event.wait(timeout=60):
            self.serial_manager.scan_complete_event.clear()
            self.process_scan_results()

Next Steps

Perception

Learn how images are captured and processed

Control

Understand movement execution and sequencing

Build docs developers (and LLMs) love