
Overview

The CommunicationManager class provides bidirectional serial communication between the Raspberry Pi controller and the VEX Brain. It implements a JSON-based messaging protocol with threaded message reading and automatic message parsing.
This is the Raspberry Pi implementation. The VEX Brain uses a simplified version without threading.
Source: ~/workspace/source/examples/serial_com.py:11

Constructor

__init__(port: str = 'COM7', baudrate: int = 115200)

Initializes the communication manager with serial port configuration.
comm = CommunicationManager(port='/dev/ttyUSB0', baudrate=115200)
port
str
default:"COM7"
Serial port identifier. Common values:
  • Windows: 'COM3', 'COM7', etc.
  • Linux: '/dev/ttyUSB0', '/dev/ttyACM0'
  • macOS: '/dev/cu.usbserial-*'
baudrate
int
default:"115200"
Communication speed in bits per second. Must match VEX Brain configuration. Standard values: 9600, 19200, 38400, 57600, 115200
Internal Attributes:
  • message_end: Line terminator (b'\n')
  • serial_port: PySerial Serial object
  • is_connected: Connection state flag
  • _read_thread: Background thread for receiving messages
  • buffer: Byte buffer for incomplete messages
  • _stop_event: Threading event for graceful shutdown
Source: serial_com.py:12

Connection Methods

connect()

Establishes serial connection and starts background message reading thread.
if comm.connect():
    print("Connected to VEX Brain")
else:
    print("Connection failed")
return
bool
Returns True if connection succeeds, False on error
Process:
  1. Checks if already connected (idempotent)
  2. Opens serial port with PySerial
  3. Configures timeouts (10 seconds for read/write)
  4. Starts daemon thread for continuous message reading
  5. Sets is_connected flag
Thread Configuration:
  • Daemon thread (exits when main program exits)
  • Runs _read_loop() method continuously
  • Automatically processes incoming messages
Error Handling:
  • Logs connection errors
  • Returns False without raising exceptions
  • Connection state remains False
Source: serial_com.py:24
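The connection steps above can be sketched roughly as follows. This is a minimal illustration, not the code in serial_com.py: the class name ConnectSketch, the opener parameter, and the open_pyserial helper are hypothetical and exist only so the flow can run without hardware. The only PySerial call used is serial.Serial(port, baudrate, timeout=..., write_timeout=...).

```python
import logging
import threading

log = logging.getLogger(__name__)


def open_pyserial(port, baudrate):
    """Hypothetical helper: open the port with 10 second read/write timeouts."""
    # Imported lazily so the sketch can be loaded without PySerial installed.
    import serial
    return serial.Serial(port, baudrate, timeout=10, write_timeout=10)


class ConnectSketch:
    """Illustrative stand-in for the connect() flow described above."""

    def __init__(self, port, baudrate, opener=open_pyserial):
        self.port = port
        self.baudrate = baudrate
        self._opener = opener  # assumption: injectable so tests can fake the port
        self.serial_port = None
        self.is_connected = False
        self._stop_event = threading.Event()
        self._read_thread = None

    def _read_loop(self):
        # Placeholder for the real read loop: idle until asked to stop.
        self._stop_event.wait()

    def connect(self):
        # Step 1: idempotent, a second call on a live connection does nothing.
        if self.is_connected:
            return True
        try:
            # Steps 2-3: open the port (timeouts are set by the opener).
            self.serial_port = self._opener(self.port, self.baudrate)
        except Exception as exc:
            # Log and report failure instead of raising.
            log.error("Connection failed: %s", exc)
            return False
        # Step 4: daemon thread, so it exits together with the main program.
        self._stop_event.clear()
        self._read_thread = threading.Thread(target=self._read_loop, daemon=True)
        self._read_thread.start()
        # Step 5: record the connection state.
        self.is_connected = True
        return True
```

Injecting the opener is a design choice of this sketch only; it lets the failure path (returning False and leaving is_connected unset) be exercised without a physical serial device.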

Messaging Methods

send_message(message_type: str, data: dict)

Sends a JSON-formatted message to the VEX Brain.
success = comm.send_message('scan_service', {})
message_type
str
required
Message type identifier. Valid types:
  • 'check_service': Hardware verification request
  • 'safety_service': Safety sequence initiation
  • 'scan_service': Environmental scan request
data
dict
required
Message payload dictionary. Can be empty {} or contain command-specific data.
return
bool
Returns True if message sent successfully, False on error
Message Format:
{
  "type": "scan_service",
  "data": {}
}
The message is encoded as JSON, converted to bytes, and terminated with newline (\n).
Example Messages:
comm.send_message('check_service', {})
# Sends: {"type": "check_service", "data": {}}\n
Source: serial_com.py:49
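The encoding step described above (JSON, then bytes, then a newline terminator) can be reproduced in isolation. The function name encode_message and the MESSAGE_END constant are hypothetical; the actual send_message() additionally writes the result to the serial port.

```python
import json

MESSAGE_END = b"\n"  # line terminator, matching the message_end attribute


def encode_message(message_type: str, data: dict) -> bytes:
    """Build the wire bytes for one message: JSON + newline."""
    payload = {"type": message_type, "data": data}
    # json.dumps emits a single line by default; newlines inside string
    # values are escaped, so the terminator stays unambiguous.
    return json.dumps(payload).encode("utf-8") + MESSAGE_END


print(encode_message("check_service", {}))
```

Because JSON escapes newline characters inside strings, a payload value containing a line break still produces exactly one terminating newline on the wire.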

Thread Management

_read_loop() (Internal)

Background thread method that continuously reads and processes incoming serial data.
# Called automatically by connect()
# Do not call directly
Process:
  1. Checks for available data in serial buffer
  2. Reads one byte at a time
  3. Accumulates bytes in buffer until newline
  4. Decodes complete message as JSON
  5. Calls _process_message() for handling
  6. Sleeps 10ms between iterations
Error Handling:
  • Catches JSON decode errors
  • Logs malformed messages
  • Continues operation on errors
  • Stops when _stop_event is set
Source: serial_com.py:72
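A rough sketch of the framing logic above, written as a free function so it can be run without hardware. FakePort is a hypothetical stand-in that mimics the two PySerial members used here (the in_waiting property and read()); the function name read_loop and its on_message callback are also assumptions. Unlike the description, this version drains all waiting bytes before each 10 ms sleep, which is one possible reading of the loop.

```python
import json
import logging
import threading
import time

log = logging.getLogger(__name__)


class FakePort:
    """Hypothetical in-memory port exposing in_waiting and read()."""

    def __init__(self, data: bytes):
        self._data = bytearray(data)

    @property
    def in_waiting(self) -> int:
        return len(self._data)

    def read(self, size: int = 1) -> bytes:
        chunk = bytes(self._data[:size])
        del self._data[:size]
        return chunk


def read_loop(port, stop_event, on_message, message_end=b"\n"):
    """Accumulate bytes until a newline, then parse the line as JSON."""
    buffer = bytearray()
    while not stop_event.is_set():
        while port.in_waiting:
            char = port.read(1)
            if char == message_end:
                raw = bytes(buffer)
                buffer.clear()
                try:
                    on_message(json.loads(raw.decode("utf-8")))
                except (UnicodeDecodeError, json.JSONDecodeError):
                    # Malformed line: log it and keep the loop alive.
                    log.warning("Dropping malformed message: %r", raw)
            else:
                buffer.extend(char)
        time.sleep(0.01)  # 10 ms pause between polls
```

A short way to try it is to run read_loop in a daemon thread with list.append as the callback, then set the stop event once the fake data has been consumed.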

_process_message(message: dict) (Internal)

Parses and handles incoming messages from VEX Brain.
# Called automatically by _read_loop()
# Do not call directly
Supported Response Types:
Source: serial_com.py:92

Connection Cleanup

close()

Gracefully closes the serial connection and stops background threads.
comm.close()
Process:
  1. Sets stop event to terminate read thread
  2. Waits up to 1 second for thread to finish
  3. Closes serial port
  4. Updates connection state
  5. Logs closure confirmation
Usage:
  • Call in finally block for guaranteed cleanup
  • Safe to call multiple times
  • Prevents resource leaks
Source: serial_com.py:127
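The shutdown sequence above might look like the following. This is an assumption-laden sketch rather than the real close(): CloseSketch and FakePort are hypothetical names, and the read thread here simply waits on the stop event in place of the real read loop.

```python
import threading


class FakePort:
    """Hypothetical port that only records whether close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class CloseSketch:
    """Illustrative stand-in for the close() flow described above."""

    def __init__(self, serial_port):
        self.serial_port = serial_port
        self.is_connected = True
        self._stop_event = threading.Event()
        # Stand-in read thread: blocks until the stop event is set.
        self._read_thread = threading.Thread(
            target=self._stop_event.wait, daemon=True
        )
        self._read_thread.start()

    def close(self):
        # 1. Ask the read thread to stop.
        self._stop_event.set()
        # 2. Give it up to one second to finish.
        if self._read_thread is not None and self._read_thread.is_alive():
            self._read_thread.join(timeout=1)
        # 3. Close the port once; later calls find None and skip this step.
        if self.serial_port is not None:
            self.serial_port.close()
            self.serial_port = None
        # 4. Update the connection state.
        self.is_connected = False
```

Clearing serial_port after closing is what makes repeated calls harmless in this sketch.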

Message Protocol Specification

Message Structure

All messages follow a consistent JSON structure:
{
  "type": "<message_type>",
  "data": {<payload>}
}
Encoding:
  • JSON format (UTF-8)
  • Newline terminated (\n)
  • No additional framing or checksums
Size Limits:
  • Practical limit: ~1KB per message
  • VEX Brain buffer constraints
  • Large object arrays may be split
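Since there is no framing beyond the newline and the practical limit is around 1KB, a sender may want to check the encoded size before writing. The helper below is a hypothetical sketch: the name encode_checked and the exact 1024-byte threshold are assumptions derived from the "~1KB" figure, not values taken from serial_com.py.

```python
import json

MAX_MESSAGE_BYTES = 1024  # assumption: "~1KB" interpreted as 1024 bytes


def encode_checked(message_type: str, data: dict,
                   limit: int = MAX_MESSAGE_BYTES) -> bytes:
    """Encode one message and refuse it if it exceeds the size limit."""
    encoded = json.dumps({"type": message_type, "data": data}).encode("utf-8")
    encoded += b"\n"
    if len(encoded) > limit:
        raise ValueError(
            f"message is {len(encoded)} bytes, limit is {limit} bytes"
        )
    return encoded
```

How oversized payloads should be split is not specified here, so this sketch only detects the condition and leaves the splitting strategy to the caller.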

Command Messages (Raspberry Pi → VEX)

check_service
object
Request hardware verification
{"type": "check_service", "data": {}}
safety_service
object
Initiate safety check sequence
{"type": "safety_service", "data": {}}
scan_service
object
Start environmental scan
{"type": "scan_service", "data": {}}

Response Messages (VEX → Raspberry Pi)

All responses include state in the data field:
  • "approved": Operation completed successfully
  • "complete": Multi-step operation finished
  • "in_progress": Operation ongoing
  • "error": Operation failed
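One way a receiver could act on these states is shown below. The key name "state" inside data is an assumption (the text only says that state is included in the data field), and classify_response with its return labels is a hypothetical helper, not part of CommunicationManager.

```python
KNOWN_STATES = {"approved", "complete", "in_progress", "error"}


def classify_response(message: dict) -> str:
    """Map a response message to a coarse outcome label."""
    data = message.get("data")
    # Assumption: the state is stored under data["state"].
    state = data.get("state") if isinstance(data, dict) else None
    if state not in KNOWN_STATES:
        return "unknown"
    if state == "error":
        return "failed"
    if state == "in_progress":
        return "pending"
    # "approved" and "complete" both mean the operation finished.
    return "done"


print(classify_response({"type": "scan_service", "data": {"state": "complete"}}))
```

Treating unrecognized or missing states as "unknown" keeps the receiver from misreading a malformed response as success.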

Complete Usage Example

import time
import logging as log
from serial_com import CommunicationManager

log.basicConfig(level=log.INFO)

# Initialize communication manager
comm = CommunicationManager(port='/dev/ttyUSB0', baudrate=115200)

try:
    # Establish connection
    if comm.connect():
        log.info("Connected to VEX Brain")
        
        # Check hardware
        comm.send_message('check_service', {})
        time.sleep(1)
        
        # Run safety checks
        comm.send_message('safety_service', {})
        time.sleep(5)
        
        # Perform scan
        comm.send_message('scan_service', {})
        
        # Keep program running to receive responses
        while True:
            time.sleep(1)
            
    else:
        log.error("Failed to connect")
        
except KeyboardInterrupt:
    log.info("Program terminated by user")
    
finally:
    # Always clean up
    comm.close()

VEX Brain Implementation

The VEX Brain uses a simplified synchronous version:
# VEX Brain implementation (from main.py)
import json

class CommunicationManager:
    def __init__(self):
        self.serial_port = None
        self.buffer = bytearray()
        self.message_end = b'\n'

    def initialize(self):
        # Open the serial device directly for binary read/write
        self.serial_port = open('/dev/serial1', 'rb+')

    def read_message(self):
        # Read one byte per call; a message is returned only once
        # the terminating newline has arrived
        char = self.serial_port.read(1)
        if not char:
            # Nothing was read on this call
            return None
        if char == self.message_end:
            message = self.buffer.decode()
            self.buffer = bytearray()
            return json.loads(message)
        self.buffer.extend(char)
        return None

    def send_message(self, message_type: str, data: dict):
        # Same wire format as the Raspberry Pi side: JSON plus newline
        message = {'type': message_type, 'data': data}
        encoded = json.dumps(message).encode() + self.message_end
        self.serial_port.write(encoded)
Key Differences:
  • No threading (VEX OS limitations)
  • Synchronous read_message() called in main loop
  • Opens /dev/serial1 directly
  • No PySerial dependency
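The synchronous pattern can be tried on a desktop machine by polling a byte stream one character at a time. In this sketch PollingReader and poll are hypothetical names, and io.BytesIO stands in for /dev/serial1; on the VEX Brain the main loop would call read_message() on the real port instead.

```python
import io
import json


class PollingReader:
    """Hypothetical reader mirroring the one-byte-per-call read_message()."""

    def __init__(self, port):
        self.port = port
        self.buffer = bytearray()
        self.message_end = b"\n"

    def read_message(self):
        char = self.port.read(1)
        if not char:
            return None  # no data available on this poll
        if char == self.message_end:
            message = self.buffer.decode()
            self.buffer = bytearray()
            return json.loads(message)
        self.buffer.extend(char)
        return None


def poll(reader, max_polls):
    """Stand-in for the main loop: poll a fixed number of times."""
    messages = []
    for _ in range(max_polls):
        message = reader.read_message()
        if message is not None:
            messages.append(message)
    return messages


port = io.BytesIO(b'{"type": "check_service", "data": {}}\n')
print(poll(PollingReader(port), max_polls=64))
```

Each call consumes at most one byte, so a message only appears after enough polls to reach its newline; this is why read_message() needs to sit inside the main loop rather than be called once.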
