Overview
The CommunicationManager class handles bidirectional serial communication between the Raspberry Pi host and the VEX IQ2 controller using a JSON-based message protocol over UART.
Raspberry Pi Implementation : examples/serial_com.py:11
VEX Controller Implementation : arm_controller/src/main.py:31
CommunicationManager Class
Raspberry Pi Implementation
class CommunicationManager :
def __init__ ( self , port : str = 'COM7' , baudrate : int = 115200 ):
self .port = port
self .baudrate = baudrate
self .message_end = b ' \n '
self .serial_port = None
self .is_connected = False
self ._read_thread = None
self .buffer = bytearray ()
self ._stop_event = Event()
VEX Controller Implementation
class CommunicationManager :
def __init__ ( self ):
self .serial_port = None
self .buffer = bytearray ()
self .message_end = b ' \n '
The VEX implementation uses a simplified synchronous design, while the Raspberry Pi uses threading for non-blocking communication.
Connection Setup
Raspberry Pi Connection
def connect ( self ) -> bool :
if self .is_connected:
return True
try :
self .serial_port = serial.Serial(
port = self .port,
baudrate = self .baudrate,
timeout = 10 ,
write_timeout = 10
)
self .is_connected = True
# Start read loop in background thread
self ._stop_event.clear()
self ._read_thread = Thread( target = self ._read_loop)
self ._read_thread.daemon = True
self ._read_thread.start()
return True
except Exception as e:
log.error( f 'Error connecting to serial port: { str (e) } ' )
return False
Connection Parameters :
Port : COM7 (Windows) or /dev/ttyUSB0 (Linux)
Baudrate : 115200
Timeout : 10 seconds
Write Timeout : 10 seconds
Initialize Serial Port
Open serial connection with specified parameters
Start Read Thread
Launch background thread for continuous message reception
Verify Connection
Return connection status for error handling
VEX Controller Connection
def initialize ( self ):
try :
self .serial_port = open ( '/dev/serial1' , 'rb+' )
except :
raise Exception ( 'serial port not available' )
VEX controller uses fixed serial port /dev/serial1. Ensure this port is enabled in VEX brain settings.
JSON Message Protocol
All messages follow this standard JSON structure:
{
"type" : "message_type" ,
"data" : {
// Message-specific data
}
}
Message Terminator : Newline character (\n)
Message Types
Check Service
Request (Raspberry Pi → VEX):
{
"type" : "check_service" ,
"data" : {}
}
Response (VEX → Raspberry Pi):
// Success
{
"type" : "check_service" ,
"data" : {
"state" : "approved"
}
}
// Failure
{
"type" : "check_service" ,
"data" : {
"error" : "Sensors or motors not installed"
}
}
Safety Service
Request (Raspberry Pi → VEX):
{
"type" : "safety_service" ,
"data" : {}
}
Response (VEX → Raspberry Pi):
{
"type" : "safety_service" ,
"data" : {
"state" : "approved"
}
}
Scan Service
Request (Raspberry Pi → VEX):
{
"type" : "scan_service" ,
"data" : {}
}
Response (VEX → Raspberry Pi):
{
"type" : "scan_service" ,
"data" : {
"state" : "complete" ,
"objects" : [
{
"center_angle" : 45.5 ,
"width" : 12.3 ,
"distance" : 150 ,
"max_size" : 85
},
{
"center_angle" : 180.0 ,
"width" : 8.7 ,
"distance" : 200 ,
"max_size" : 65
}
]
}
}
Object Data Fields :
center_angle: Angular position (0-360°)
width: Angular width of object (degrees)
distance: Distance from sensor (millimeters)
max_size: Maximum detected size value (raw sensor units)
Send Operations
Raspberry Pi Send
def send_message ( self , message_type : str , data : dict ) -> bool :
if not self .is_connected or not self .serial_port:
log.error( "Error: Puerto serial no inicializado" )
return False
try :
message = {
'type' : message_type,
'data' : data,
}
encoded_message = json.dumps(message).encode() + self .message_end
log.info( f 'send message: { encoded_message } ' )
self .serial_port.write(encoded_message)
return True
except Exception as e:
print ( f "Error enviando mensaje: { e } " )
return False
Usage Example :
serial_manager = CommunicationManager( port = '/dev/ttyUSB0' )
if serial_manager.connect():
# Send check service request
serial_manager.send_message( 'check_service' , {})
# Send scan service request
serial_manager.send_message( 'scan_service' , {})
VEX Send
def send_message ( self , message_type : str , data : dict ) -> bool :
if not self .serial_port:
return False
message = {
'type' : message_type,
'data' : data,
}
encoded_message = json.dumps(message).encode() + self .message_end
self .serial_port.write(encoded_message)
return True
Usage in RoboticServices :
# Send check service response
data = { 'state' : 'approved' }
self .communication.send_message( 'check_service' , data)
# Send scan results
data = { 'state' : 'complete' , 'objects' : objects_map}
self .communication.send_message( 'scan_service' , data)
Receive Operations
Raspberry Pi Receive (Threaded)
def _read_loop ( self ):
"""Thread loop for read messages from VEX"""
while not self ._stop_event.is_set():
if self .serial_port and self .serial_port.in_waiting:
try :
char = self .serial_port.read()
if char == self .message_end:
message = self .buffer.decode()
self .buffer = bytearray ()
try :
data = json.loads(message)
self ._process_message(data)
except json.JSONDecodeError:
log.error( f 'error message decode: { message } ' )
else :
self .buffer.extend(char)
except Exception as e:
log.error( f 'error read serial port: { e } ' )
time.sleep( 0.01 )
The read loop runs in a daemon thread with 10ms polling interval to balance responsiveness and CPU usage.
VEX Receive (Synchronous)
def read_message ( self ):
char = self .serial_port.read( 1 )
if char == self .message_end:
message = self .buffer.decode()
self .buffer = bytearray ()
try :
return json.loads(message)
except json.JSONDecodeError:
return None
else :
self .buffer.extend(char)
return None
Usage in RoboticServices Main Loop :
while True :
message = self .communication.read_message()
if message:
self .sensor_module.print_screen( "rx: {} " .format(message), 1 , 15 )
self .process_raspberry_message(message)
Message Processing
Raspberry Pi Message Processing
def _process_message ( self , message : dict ):
try :
msg_type = message.get( 'type' , '' ).lower()
data = message.get( 'data' , {})
if msg_type == 'check_service' :
state = data.get( 'state' )
log.info( f ' { msg_type } status: \n state: { state } ' )
elif msg_type == 'safety_service' :
state = data.get( 'state' )
time_taken = data.get( 'time' )
log.info( f " { msg_type } status: \n state: { state } , \n time: { time_taken } s" )
if state == 'error' :
log.error( f "Safety Service Error: { data.get( 'error_msg' , 'Unknown error' ) } " )
elif msg_type == 'scan_service' :
state = data.get( 'state' )
if state == 'complete' :
objects = data.get( 'objects' , [])
log.info( f "Scan Complete - Objects detected: { len (objects) } " )
for obj in objects:
log.info( f "Object at { obj[ 'center_angle' ] } °, "
f "width: { obj[ 'width' ] } mm, "
f "distance: { obj[ 'distance' ] } mm, "
f "size: { obj[ 'max_size' ] } mm" )
except Exception as e:
log.error( f 'error process message: { e } ' )
VEX Message Processing
def process_raspberry_message ( self , message : dict ):
if not message or 'type' not in message:
return
msg_type = message[ 'type' ].lower()
msg_data = message.get( 'data' , {})
if msg_type == 'check_service' :
self .check_mode_active = True
elif msg_type == 'safety_service' :
self .reset_safety_sequence()
self .safety_mode_active = True
elif msg_type == 'scan_service' :
self .reset_scan_sequence()
self .scan_mode_active = True
Connection Teardown
Raspberry Pi Close
def close ( self ):
self ._stop_event.set()
if self ._read_thread:
self ._read_thread.join( timeout = 1.0 )
if self .serial_port and self .serial_port.is_open:
self .serial_port.close()
self .is_connected = False
log.info( 'Serial connection closed' )
Signal Thread Stop
Set stop event to terminate read loop
Join Thread
Wait up to 1 second for thread to finish
Close Port
Close serial connection and update status
VEX Close
def close ( self ):
if self .serial_port:
self .serial_port.close()
Complete Usage Example
Raspberry Pi Client
import time
import logging as log
from serial_com import CommunicationManager
log.basicConfig( level = log. INFO , format = " %(asctime)s - %(levelname)s - %(message)s " )
serial_manager = CommunicationManager( port = '/dev/ttyUSB0' , baudrate = 115200 )
try :
if serial_manager.connect():
log.info( "Connected to VEX Brain" )
# Send check service request
serial_manager.send_message( 'check_service' , {})
time.sleep( 2 )
# Send safety service request
serial_manager.send_message( 'safety_service' , {})
time.sleep( 5 )
# Send scan service request
serial_manager.send_message( 'scan_service' , {})
# Keep program running to receive messages
while True :
time.sleep( 1 )
except KeyboardInterrupt :
log.info( "Program terminated by user" )
finally :
serial_manager.close()
VEX Controller Main Loop
if __name__ == "__main__" :
arm_process = RoboticServices()
arm_process.run() # Initializes communication and starts main loop
Error Handling
Raspberry Pi
VEX Controller
try :
serial_manager.send_message( 'check_service' , {})
except Exception as e:
log.error( f 'Error sending message: { e } ' )
Best Practices
Connection Validation Always check connection status before sending messages
Message Validation Verify ‘type’ field exists before processing messages
JSON Error Handling Catch JSONDecodeError for malformed messages
Graceful Shutdown Always call close() to properly terminate threads and connections
Troubleshooting
Issue Cause Solution Connection refused Incorrect port Verify port with ls /dev/tty* on Linux Timeout errors Wrong baudrate Ensure both sides use 115200 Malformed messages Buffer corruption Check message terminator is \n No messages received Thread not started Verify _read_thread.start() was called
Ensure serial port has correct permissions: sudo chmod 666 /dev/ttyUSB0 on Linux
Arm Controller Service message handling in RoboticServices
Safety System Safety service message flow
Overview System architecture