Skip to main content

Overview

The VEX CommunicationManager class handles serial communication on the VEX brain side, receiving commands from the Raspberry Pi and sending status updates. Source: arm_system/vex_brain/src/main.py:26

Class Definition

CommunicationManager

class CommunicationManager:
    def __init__(self)
Initializes the VEX communication manager with buffer and message delimiter.

Attributes

serial_port
file
Serial port file handle (/dev/serial1)
buffer
bytearray
Byte buffer for incoming message assembly
message_end
bytes
Message delimiter (newline character: b’\n’)

Methods

initialize

def initialize(self)
Opens the serial port for communication. Port: /dev/serial1 (read/write binary mode) Raises:
  • Exception - If serial port cannot be opened
Source: arm_system/vex_brain/src/main.py:32

Example

comm = CommunicationManager()
try:
    comm.initialize()
    print("Serial port opened successfully")
except Exception as e:
    print(f"Failed to open serial port: {e}")

read_message

def read_message(self) -> dict
Reads and parses incoming JSON messages from serial port. Process:
  1. Reads one byte at a time
  2. Accumulates bytes in buffer until newline delimiter
  3. Decodes buffer as UTF-8 string
  4. Parses JSON message
  5. Returns parsed dictionary
message
dict
Parsed JSON message as dictionary, or None if:
  • No complete message available
  • JSON parsing fails
Message Format:
{
  "type": "message_type",
  "data": {}
}
Source: arm_system/vex_brain/src/main.py:38

Example

while True:
    msg = comm.read_message()
    if msg:
        print(f"Received: {msg['type']}")
        print(f"Data: {msg['data']}")

send_message

def send_message(self, msg_type: str, data: dict) -> bool
Sends JSON-encoded message over serial connection.
msg_type
str
required
Message type identifier:
  • check_service - System check response
  • safety_service - Safety protocol status
  • scan_service - Scan progress/completion
  • pick_service - Pick movement status
  • place_service - Place movement status
  • current_angles - Joint angle data
  • error - Error notification
data
dict
required
Message payload data
success
bool
Always returns True after sending
Message Encoding:
  1. Creates dictionary with ‘type’ and ‘data’ keys
  2. Serializes to JSON
  3. Encodes as UTF-8 bytes
  4. Appends newline delimiter
  5. Writes to serial port
Source: arm_system/vex_brain/src/main.py:51

Example

# Send scan detection
comm.send_message('scan_service', {
    'state': 'detected',
    'angle': 90.0,
    'distance': 150,
    'size': 1000
})

# Send movement completion
comm.send_message('pick_service', {
    'joint': 'base',
    'state': 'completed',
    'target_angle': 90,
    'actual_angle': 89.5,
    'accuracy': 0.5
})

# Send error
comm.send_message('error', {
    'msg': 'Sensor not installed'
})

Message Types

Incoming Messages (Raspberry Pi → VEX)

TypeData FieldsDescription
check_service{}Request system check
safety_service{}Request safety protocol
scan_servicespeed: intStart environment scan
pick_servicejoint, angle, distance, action, speedExecute pick movement
place_servicejoint, angle, distance, action, speedExecute place movement

Outgoing Messages (VEX → Raspberry Pi)

TypeData FieldsDescription
check_servicestate: strSystem check result
safety_servicestate: strSafety protocol status
scan_servicestate, angle, distance, size, objectsScan progress/completion
pick_servicejoint, state, target_angle, actual_anglePick movement status
place_servicejoint, statePlace movement status
current_angles{joint: angle, ...}Current joint angles
errormsg: strError message

Integration with RoboticServices

class RoboticServices:
    def __init__(self):
        self.comms = CommunicationManager()
        # ... other components
    
    def run(self):
        self.comms.initialize()
        
        while True:
            msg = self.comms.read_message()
            if msg:
                self.process_message(msg)
                # Execute services based on message

Complete Example

from vex import *
import json

class CommunicationManager:
    def __init__(self):
        self.serial_port = None
        self.buffer = bytearray()
        self.message_end = b'\n'
    
    def initialize(self):
        self.serial_port = open('/dev/serial1', 'rb+')
    
    def read_message(self):
        char = self.serial_port.read(1)
        if char == self.message_end:
            message = self.buffer.decode()
            self.buffer = bytearray()
            try:
                return json.loads(message)
            except json.JSONDecodeError:
                return None
        else:
            self.buffer.extend(char)
        return None
    
    def send_message(self, msg_type: str, data: dict):
        message = {
            'type': msg_type,
            'data': data,
        }
        encoded_message = json.dumps(message).encode() + self.message_end
        self.serial_port.write(encoded_message)
        return True

# Usage
comm = CommunicationManager()
comm.initialize()

# Main loop
while True:
    msg = comm.read_message()
    if msg:
        msg_type = msg['type']
        data = msg['data']
        
        if msg_type == 'scan_service':
            # Execute scan
            comm.send_message('scan_service', {
                'state': 'complete',
                'objects': []
            })

Error Handling

  • Initialization Failure: Raises exception if serial port cannot be opened
  • JSON Decode Error: Returns None on malformed JSON
  • Buffer Management: Automatically resets buffer after message parsing

Build docs developers (and LLMs) love