Skip to main content

Overview

The CommunicationManager class handles bidirectional serial communication between the Raspberry Pi host and the VEX IQ2 controller using a JSON-based message protocol over UART. Raspberry Pi Implementation: examples/serial_com.py:11
VEX Controller Implementation: arm_controller/src/main.py:31

CommunicationManager Class

Raspberry Pi Implementation

serial_com.py
class CommunicationManager:
    def __init__(self, port: str = 'COM7', baudrate: int = 115200):
        self.port = port
        self.baudrate = baudrate
        self.message_end = b'\n'
        
        self.serial_port = None
        self.is_connected = False
        self._read_thread = None
        
        self.buffer = bytearray()
        self._stop_event = Event()

VEX Controller Implementation

main.py
class CommunicationManager:
    def __init__(self):
        self.serial_port = None
        self.buffer = bytearray()
        self.message_end = b'\n'
The VEX implementation uses a simplified synchronous design, while the Raspberry Pi uses threading for non-blocking communication.

Connection Setup

Raspberry Pi Connection

serial_com.py
def connect(self) -> bool:
    if self.is_connected:
        return True
    
    try:
        self.serial_port = serial.Serial(
            port=self.port,
            baudrate=self.baudrate,
            timeout=10,
            write_timeout=10
        )
        self.is_connected = True
        
        # Start read loop in background thread
        self._stop_event.clear()
        self._read_thread = Thread(target=self._read_loop)
        self._read_thread.daemon = True
        self._read_thread.start()
        return True
        
    except Exception as e:
        log.error(f'Error connecting to serial port: {str(e)}')
        return False
Connection Parameters:
  • Port: COM7 (Windows) or /dev/ttyUSB0 (Linux)
  • Baudrate: 115200
  • Timeout: 10 seconds
  • Write Timeout: 10 seconds
1

Initialize Serial Port

Open serial connection with specified parameters
2

Start Read Thread

Launch background thread for continuous message reception
3

Verify Connection

Return connection status for error handling

VEX Controller Connection

main.py
def initialize(self):
    try:
        self.serial_port = open('/dev/serial1', 'rb+')
    except:
        raise Exception('serial port not available')
VEX controller uses fixed serial port /dev/serial1. Ensure this port is enabled in VEX brain settings.

JSON Message Protocol

Message Format

All messages follow this standard JSON structure:
{
    "type": "message_type",
    "data": {
        // Message-specific data
    }
}
Message Terminator: Newline character (\n)

Message Types

Check Service

Request (Raspberry Pi → VEX):
{
    "type": "check_service",
    "data": {}
}
Response (VEX → Raspberry Pi):
// Success
{
    "type": "check_service",
    "data": {
        "state": "approved"
    }
}

// Failure
{
    "type": "check_service",
    "data": {
        "error": "Sensors or motors not installed"
    }
}

Safety Service

Request (Raspberry Pi → VEX):
{
    "type": "safety_service",
    "data": {}
}
Response (VEX → Raspberry Pi):
{
    "type": "safety_service",
    "data": {
        "state": "approved"
    }
}

Scan Service

Request (Raspberry Pi → VEX):
{
    "type": "scan_service",
    "data": {}
}
Response (VEX → Raspberry Pi):
{
    "type": "scan_service",
    "data": {
        "state": "complete",
        "objects": [
            {
                "center_angle": 45.5,
                "width": 12.3,
                "distance": 150,
                "max_size": 85
            },
            {
                "center_angle": 180.0,
                "width": 8.7,
                "distance": 200,
                "max_size": 65
            }
        ]
    }
}
Object Data Fields:
  • center_angle: Angular position (0-360°)
  • width: Angular width of object (degrees)
  • distance: Distance from sensor (millimeters)
  • max_size: Maximum detected size value (raw sensor units)

Send Operations

Raspberry Pi Send

serial_com.py
def send_message(self, message_type: str, data: dict) -> bool:
    if not self.is_connected or not self.serial_port:
        log.error("Error: Puerto serial no inicializado")
        return False
    
    try:
        message = {
            'type': message_type,
            'data': data,
        }
        encoded_message = json.dumps(message).encode() + self.message_end
        log.info(f'send message: {encoded_message}')
        self.serial_port.write(encoded_message)
        return True
    except Exception as e:
        print(f"Error enviando mensaje: {e}")
        return False
Usage Example:
serial_manager = CommunicationManager(port='/dev/ttyUSB0')
if serial_manager.connect():
    # Send check service request
    serial_manager.send_message('check_service', {})
    
    # Send scan service request
    serial_manager.send_message('scan_service', {})

VEX Send

main.py
def send_message(self, message_type: str, data: dict) -> bool:
    if not self.serial_port:
        return False
    
    message = {
        'type': message_type,
        'data': data,
    }
    encoded_message = json.dumps(message).encode() + self.message_end
    self.serial_port.write(encoded_message)
    return True
Usage in RoboticServices:
main.py
# Send check service response
data = {'state': 'approved'}
self.communication.send_message('check_service', data)

# Send scan results
data = {'state': 'complete', 'objects': objects_map}
self.communication.send_message('scan_service', data)

Receive Operations

Raspberry Pi Receive (Threaded)

serial_com.py
def _read_loop(self):
    """Thread loop for read messages from VEX"""
    while not self._stop_event.is_set():
        if self.serial_port and self.serial_port.in_waiting:
            try:
                char = self.serial_port.read()
                if char == self.message_end:
                    message = self.buffer.decode()
                    self.buffer = bytearray()
                    try:
                        data = json.loads(message)
                        self._process_message(data)
                    except json.JSONDecodeError:
                        log.error(f'error message decode: {message}')
                else:
                    self.buffer.extend(char)
            except Exception as e:
                log.error(f'error read serial port: {e}')
        time.sleep(0.01)
The read loop runs in a daemon thread with 10ms polling interval to balance responsiveness and CPU usage.

VEX Receive (Synchronous)

main.py
def read_message(self):
    char = self.serial_port.read(1)
    if char == self.message_end:
        message = self.buffer.decode()
        self.buffer = bytearray()
        try:
            return json.loads(message)
        except json.JSONDecodeError:
            return None
    else:
        self.buffer.extend(char)
    return None
Usage in RoboticServices Main Loop:
main.py
while True:
    message = self.communication.read_message()
    if message:
        self.sensor_module.print_screen("rx: {}".format(message), 1, 15)
        self.process_raspberry_message(message)

Message Processing

Raspberry Pi Message Processing

serial_com.py
def _process_message(self, message: dict):
    try:
        msg_type = message.get('type', '').lower()
        data = message.get('data', {})
        
        if msg_type == 'check_service':
            state = data.get('state')
            log.info(f'{msg_type} status:\nstate: {state}')
        
        elif msg_type == 'safety_service':
            state = data.get('state')
            time_taken = data.get('time')
            log.info(f"{msg_type} status: \nstate: {state}, \ntime: {time_taken}s")
            if state == 'error':
                log.error(f"Safety Service Error: {data.get('error_msg', 'Unknown error')}")
        
        elif msg_type == 'scan_service':
            state = data.get('state')
            if state == 'complete':
                objects = data.get('objects', [])
                log.info(f"Scan Complete - Objects detected: {len(objects)}")
                for obj in objects:
                    log.info(f"Object at {obj['center_angle']}°, "
                            f"width: {obj['width']}mm, "
                            f"distance: {obj['distance']}mm, "
                            f"size: {obj['max_size']}mm")
    except Exception as e:
        log.error(f'error process message: {e}')

VEX Message Processing

main.py
def process_raspberry_message(self, message:dict):
    if not message or 'type' not in message:
        return
        
    msg_type = message['type'].lower()
    msg_data = message.get('data', {})
        
    if msg_type == 'check_service':
        self.check_mode_active = True
        
    elif msg_type == 'safety_service':
        self.reset_safety_sequence()
        self.safety_mode_active = True
        
    elif msg_type == 'scan_service':
        self.reset_scan_sequence()
        self.scan_mode_active = True

Connection Teardown

Raspberry Pi Close

serial_com.py
def close(self):
    self._stop_event.set()
    if self._read_thread:
        self._read_thread.join(timeout=1.0)
    if self.serial_port and self.serial_port.is_open:
        self.serial_port.close()
        self.is_connected = False
        log.info('Serial connection closed')
1

Signal Thread Stop

Set stop event to terminate read loop
2

Join Thread

Wait up to 1 second for thread to finish
3

Close Port

Close serial connection and update status

VEX Close

main.py
def close(self):
    if self.serial_port:
        self.serial_port.close()

Complete Usage Example

Raspberry Pi Client

import time
import logging as log
from serial_com import CommunicationManager

log.basicConfig(level=log.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

serial_manager = CommunicationManager(port='/dev/ttyUSB0', baudrate=115200)

try:
    if serial_manager.connect():
        log.info("Connected to VEX Brain")
        
        # Send check service request
        serial_manager.send_message('check_service', {})
        time.sleep(2)
        
        # Send safety service request
        serial_manager.send_message('safety_service', {})
        time.sleep(5)
        
        # Send scan service request
        serial_manager.send_message('scan_service', {})
        
        # Keep program running to receive messages
        while True:
            time.sleep(1)
            
except KeyboardInterrupt:
    log.info("Program terminated by user")
finally:
    serial_manager.close()

VEX Controller Main Loop

main.py
if __name__ == "__main__":
    arm_process = RoboticServices()
    arm_process.run()  # Initializes communication and starts main loop

Error Handling

try:
    serial_manager.send_message('check_service', {})
except Exception as e:
    log.error(f'Error sending message: {e}')

Best Practices

Connection Validation

Always check connection status before sending messages

Message Validation

Verify ‘type’ field exists before processing messages

JSON Error Handling

Catch JSONDecodeError for malformed messages

Graceful Shutdown

Always call close() to properly terminate threads and connections

Troubleshooting

IssueCauseSolution
Connection refusedIncorrect portVerify port with ls /dev/tty* on Linux
Timeout errorsWrong baudrateEnsure both sides use 115200
Malformed messagesBuffer corruptionCheck message terminator is \n
No messages receivedThread not startedVerify _read_thread.start() was called
Ensure serial port has correct permissions: sudo chmod 666 /dev/ttyUSB0 on Linux

Arm Controller

Service message handling in RoboticServices

Safety System

Safety service message flow

Overview

System architecture

Build docs developers (and LLMs) love