Overview
The CommunicationManager class handles all serial communication between the Raspberry Pi and the VEX brain using a JSON-based message protocol over UART.
Messages are sent and received as JSON objects, making the protocol extensible and easy to debug.
Message Structure
All messages follow this JSON structure:
{
"type" : "message_type" ,
"data" : {
// Message-specific payload
}
}
Messages are terminated with a newline character (\n) for frame delimitation.
Setup
Initialize the Communication Manager
from arm_system.communication.serial_manager import CommunicationManager
# Basic initialization
comm = CommunicationManager(
port = '/dev/ttyACM1' ,
baudrate = 115200 ,
camera_index = 0
)
Parameters:
port: Serial port device path (default: /dev/ttyACM1)
baudrate: Communication speed (default: 115200)
camera_index: Camera device index (default: 0)
Establish Connection
if comm.connect():
print ( "Connected successfully!" )
else :
print ( "Connection failed" )
The connect() method:
Opens the serial port with timeout settings
Starts a background thread for reading messages
Returns True on success, False on failure
Sending Messages
Basic Send
Use the send_message() method to send commands:
# Send a message
success = comm.send_message( 'check_service' , {})
if success:
print ( "Message sent successfully" )
Message Types and Examples
Check Service
Safety Service
Scan Service
Pick Service
Place Service
Get Angles
# Verify robot system status
comm.send_message( 'check_service' , {})
# Expected response:
# {
# "type": "check_service",
# "data": {
# "state": "ready"
# }
# }
Receiving Messages
Message Reading Thread
The communication manager automatically reads incoming messages in a background thread:
# From serial_manager.py:112-130
def _read_loop ( self ):
"""Thread loop for read messages from VEX"""
while not self ._stop_event.is_set():
if self .serial_port and self .serial_port.in_waiting:
try :
char = self .serial_port.read()
if char == self .message_end: # '\n'
message = self .buffer.decode()
self .buffer = bytearray ()
try :
data = json.loads(message)
self ._process_message(data)
except json.JSONDecodeError:
log.error( f 'error message decode: { message } ' )
else :
self .buffer.extend(char)
except Exception as e:
log.error( f 'error read serial port: { e } ' )
time.sleep( 0.01 )
Callback Registration
Define Your Callback Function
def my_scan_callback ( data : dict ):
"""Handle scan data"""
angle = data.get( 'angle' , 0 )
distance = data.get( 'distance' , 0 )
obj_class = data.get( 'class' , 'unknown' )
print ( f "Object detected at { angle } ° and { distance } mm: { obj_class } " )
Register the Callback
# Register callback for scan service messages
comm.register_callback( 'scan_service' , my_scan_callback)
The callback will be invoked automatically when messages of type scan_service are received.
Unregister When Done
# Remove callback by setting to None
comm.register_callback( 'scan_service' , None )
Real-World Callback Example
From the Robot class implementation:
class Robot :
def __init__ ( self ):
self .serial_manager = CommunicationManager( port = '/dev/ttyACM1' , baudrate = 115200 )
# Register callback for scan results
self .scan_results = []
self .serial_manager.register_callback( 'scan_service' , self ._scan_callback)
def _scan_callback ( self , data ):
"""Process incoming scan data"""
if data.get( 'class' ):
self ._update_object_registry(data)
def _update_object_registry ( self , data : dict ):
"""Add detected object to registry"""
self .scan_results.append({
'position' : {
'angle' : data.get( 'angle' , 0 ),
'distance' : data.get( 'distance' , 0 )
},
'detection' :{
'class' : data.get( 'class' , 'default' ),
'confidence' : data.get( 'confidence' , 0.0 ),
'image' : data.get( 'image_path' , '' )
},
'placement_zone' : self ._get_placement_zones(data.get( 'class' , 'default' ))
})
Synchronous Waiting
Wait for Movement Confirmation
# Send movement command
comm.send_message( 'pick_service' , {
'joint' : 'base' ,
'angle' : 90 ,
'speed' : 30
})
# Wait for confirmation (up to 30 seconds)
if comm.wait_for_confirmation( 'base' , timeout = 30.0 ):
print ( "Movement completed successfully" )
else :
print ( "Movement failed or timed out" )
Implementation details:
# From serial_manager.py:210-224
def wait_for_confirmation ( self , joint : str , timeout : float = 30.0 ) -> bool :
"""Wait for movement confirmed"""
start_time = time.time()
self .movement_event.clear()
while (time.time() - start_time) < timeout:
if self .movement_status.get(joint, {}).get( 'state' ) == 'completed' :
return True
if self .movement_status.get(joint, {}).get( 'state' ) == 'error' :
log.error( f 'error in movement' )
return False
self .movement_event.wait( 0.1 )
log.warning( f "timeout, waiting movement of: { joint } " )
return False
Wait for Scan Completion
# Start scan
comm.send_message( 'scan_service' , { 'speed' : 20 })
# Wait for scan to complete (up to 60 seconds)
if comm.scan_complete_event.wait( timeout = 60 ):
comm.scan_complete_event.clear()
print ( "Scan completed!" )
else :
print ( "Scan timeout" )
Wait for Angle Response
# Request current angles
comm.send_message( 'get_angles' , {})
# Wait for response
if comm.wait_for_angles_response( timeout = 5.0 ):
angles = comm.current_angles
print ( f "Base: { angles[ 'base' ] } °" )
print ( f "Arm: { angles[ 'arm' ] } °" )
print ( f "Gripper: { angles[ 'gripper' ] } °" )
else :
print ( "Timeout waiting for angles" )
Advanced Usage
Custom Message Processing
The _process_message() method handles all incoming messages:
# From serial_manager.py:132-172
def _process_message ( self , message : dict ):
"""Process message from VEX"""
try :
msg_type = message.get( 'type' , '' ).lower()
data = message.get( 'data' , {})
if msg_type == 'check_service' :
state = data.get( 'state' )
log.info( f ' { msg_type } status: \n state: { state } ' )
elif msg_type == 'safety_service' :
self .safety_status = message.get( 'data' , {})
state = data.get( 'state' )
time_taken = data.get( 'time' )
log.info( f " { msg_type } status: \n state: { state } , \n time: { time_taken } s" )
elif msg_type == 'scan_service' :
state = data.get( 'state' )
if state == 'detected' :
# Trigger async object detection
Thread( target = self ._handle_object_detection, args = (data,)).start()
elif state == 'complete' :
self .scan_complete_event.set()
elif msg_type in ( 'pick_service' , 'place_service' ):
joint = data.get( 'joint' )
self .movement_status[joint] = data
self .movement_event.set()
elif msg_type == 'current_angles' :
self .current_angles = message.get( 'data' , {})
self .angles_event.set()
except Exception as e:
log.error( f 'error process message: { e } ' )
Object Detection Integration
The communication manager automatically triggers object detection when objects are detected during scanning:
# From serial_manager.py:174-202
def _handle_object_detection ( self , data : dict ):
"""Object detect in real time"""
try :
# 1. Capture image from camera
img_path = self .camera.capture_image()
if not img_path:
log.error( "camera could not be captured" )
return
# 2. Run YOLO detection
image, yolo_result = self .object_detect_model.read_image_path(
img_path,
draw_results = True ,
save_drawn_img = True
)
if yolo_result is None :
log.info( "no detections." )
return
# 3. Update data with detection results
data.update({
'class' : yolo_result[ 'class' ],
'confidence' : yolo_result[ 'confidence' ],
'timestamp' : time.time(),
'image_path' : img_path
})
# 4. Notify via callback
if self .callbacks.get( 'scan_service' ):
self .callbacks[ 'scan_service' ](data)
except Exception as e:
log.error( f "error in object detection: { str (e) } " )
Complete Example
Here’s a complete example demonstrating sending messages and handling responses:
import time
from arm_system.communication.serial_manager import CommunicationManager
def scan_callback ( data : dict ):
"""Handle scan results"""
if data.get( 'class' ):
print ( f "Detected: { data[ 'class' ] } at { data[ 'angle' ] } ° with confidence { data[ 'confidence' ] :.2f} " )
# Initialize
comm = CommunicationManager( port = '/dev/ttyACM1' , baudrate = 115200 )
if not comm.connect():
print ( "Failed to connect" )
exit ( 1 )
print ( "Connected!" )
# Check system
comm.send_message( 'check_service' , {})
time.sleep( 1 )
# Register callback
comm.register_callback( 'scan_service' , scan_callback)
# Start scan
print ( "Starting scan..." )
comm.send_message( 'scan_service' , { 'speed' : 20 })
# Wait for completion
if comm.scan_complete_event.wait( timeout = 60 ):
comm.scan_complete_event.clear()
print ( "Scan complete!" )
else :
print ( "Scan timeout" )
# Cleanup
comm.register_callback( 'scan_service' , None )
comm.close()
Troubleshooting
Error: error message decode: <malformed_json>Solutions:
Check VEX brain firmware is sending valid JSON
Verify message termination with \n
Ensure baudrate matches on both sides (115200)
Check for buffer overflow issues
Issue: Callback function never gets calledSolutions:
Verify callback is registered before sending message
Check message type matches exactly (case-sensitive)
Ensure read thread is running (check is_connected)
Add logging to _process_message() to debug
Timeout Waiting for Response
Issue: wait_for_confirmation() always times outSolutions:
Verify VEX brain is responding
Check that response message type matches
Increase timeout value
Check movement_status dictionary is being updated
Test with simpler commands first (check_service)
Next Steps
Vision & Inference Learn how object detection integrates with serial communication
Pick and Place Build complete motion sequences with serial commands
Communication API Full API reference for CommunicationManager
Message Protocol Understand the complete message protocol