Overview
The Communication Module (communication/serial_manager.py) provides the interface between the Python control system and the VEX Brain hardware. It manages serial communication, coordinates perception subsystems, and handles asynchronous message processing.
CommunicationManager Class
Initialization (serial_manager.py:16-50)
class CommunicationManager :
def __init__ ( self , port : str = '/dev/ttyACM1' ,
baudrate : int = 115200 ,
camera_index : int = 0 ):
"""
:param port: serial port
:param baudrate: baudrate
:param camera_index: camera index
"""
self .port = port
self .baudrate = baudrate
self .message_end = b ' \n '
self .serial_port: Optional[serial.Serial] = None
self .is_connected = False
# Threads and events
self ._read_thread = None
self ._stop_event = Event()
self .scan_complete_event = Event()
self .movement_event = Event()
self .angles_event = Event()
# Buffer for incoming data
self .buffer = bytearray ()
# Callback registry
self .callbacks = {}
# State tracking
self .movement_status: Dict[ str , Dict[ str , Any]] = {}
self .current_angles: Dict[ str , float ] = {}
self .safety_status: Dict[ str , Any] = {}
self .scan_data = None
# Perception integration
self .camera = CameraManager( camera_index = camera_index)
self .object_detect_model = ImageProcessor(
confidence_threshold = 0.45
)
Serial Communication
Connection Management
Establishing Connection (serial_manager.py:52-74)
def connect ( self ) -> bool :
"""Serial connection"""
if self .is_connected:
return True
try :
self .serial_port = serial.Serial(
port = self .port,
baudrate = self .baudrate,
timeout = 10 ,
write_timeout = 10
)
self .is_connected = True
# Start read loop in daemon thread
self ._stop_event.clear()
self ._read_thread = Thread(
target = self ._read_loop,
daemon = True
)
self ._read_thread.start()
return True
except Exception as e:
log.error( f 'Error connecting to serial port: { str (e) } ' )
return False
The connection automatically starts a daemon thread to continuously monitor incoming messages.
Closing Connection (serial_manager.py:76-85)
def close ( self ):
"""Close serial connection"""
self ._stop_event.set()
if self ._read_thread:
self ._read_thread.join( timeout = 1.0 )
if self .serial_port and self .serial_port.is_open:
self .serial_port.close()
self .is_connected = False
log.info( 'Serial connection closed' )
Message Protocol
All messages use a standardized JSON structure:
{
"type" : "scan_service" ,
"data" : {
"speed" : 20
}
}
Messages are terminated with a newline character (\n).
Sending Messages (serial_manager.py:87-107)
def send_message ( self , message_type : str , data : dict ) -> bool :
"""
Send JSON message to robot
:param message_type: 'check_service', 'safety_service', etc.
:param data: message data
"""
if not self .is_connected or not self .serial_port:
log.error( "Error: serial port not initialized" )
return False
try :
message = {
'type' : message_type,
'data' : data,
}
encoded_message = json.dumps(message).encode() + self .message_end
self .serial_port.write(encoded_message)
return True
except Exception as e:
print ( f "Error sending message: { e } " )
return False
Message Types
Type Direction Purpose check_serviceBidirectional System health check safety_serviceBidirectional Emergency stop and home position scan_serviceBidirectional Environment scanning with detection pick_serviceTo VEX → From VEX Pick object sequence place_serviceTo VEX → From VEX Place object sequence current_anglesFrom VEX Joint angle telemetry
Threading Architecture
Read Loop Thread (serial_manager.py:112-130)
def _read_loop ( self ):
"""Thread loop for reading messages from VEX"""
while not self ._stop_event.is_set():
if self .serial_port and self .serial_port.in_waiting:
try :
char = self .serial_port.read()
if char == self .message_end:
message = self .buffer.decode()
self .buffer = bytearray ()
try :
data = json.loads(message)
self ._process_message(data)
except json.JSONDecodeError:
log.error( f 'Error decoding message: { message } ' )
else :
self .buffer.extend(char)
except Exception as e:
log.error( f 'Error reading serial port: { e } ' )
time.sleep( 0.01 )
The read loop uses a character-by-character approach to handle incomplete messages and ensure proper message boundary detection.
Message Processing Flow
Message Processing (serial_manager.py:132-172)
def _process_message ( self , message : dict ):
"""Process message from VEX"""
try :
msg_type = message.get( 'type' , '' ).lower()
data = message.get( 'data' , {})
if msg_type == 'check_service' :
state = data.get( 'state' )
log.info( f ' { msg_type } status: \n state: { state } ' )
elif msg_type == 'safety_service' :
self .safety_status = message.get( 'data' , {})
state = data.get( 'state' )
time_taken = data.get( 'time' )
log.info( f " { msg_type } status: \n state: { state } , \n time: { time_taken } s" )
if state == 'error' :
log.error( f "Safety Service Error: { data.get( 'error_msg' , 'Unknown error' ) } " )
elif msg_type == 'scan_service' :
state = data.get( 'state' )
if state == 'detected' :
# Run detection in separate thread
Thread(
target = self ._handle_object_detection,
args = (data,)
).start()
log.info( f "Scan Data - Object Detected: "
f "Angle: { data[ 'angle' ] } ° "
f "Distance: { data[ 'distance' ] } mm" )
elif state == 'complete' :
self .scan_complete_event.set()
log.info( "Scan completed!" )
elif msg_type in ( 'pick_service' , 'place_service' ):
joint = data.get( 'joint' )
self .movement_status[joint] = data
self .movement_event.set()
elif msg_type == 'current_angles' :
self .current_angles = message.get( 'data' , {})
self .angles_event.set()
except Exception as e:
log.error( f 'Error processing message: { e } ' )
Callback System
Registration (serial_manager.py:109-110)
def register_callback ( self , message_type : str , callback : callable ):
self .callbacks[message_type] = callback
Usage Example (from main.py:14)
class Robot :
def __init__ ( self ):
self .serial_manager = CommunicationManager(
port = '/dev/ttyACM1' ,
baudrate = 115200
)
# Register callback for scan results
self .scan_results = []
self .serial_manager.register_callback(
'scan_service' ,
self ._scan_callback
)
Callbacks enable loose coupling between the communication layer and application logic.
Object Detection Integration
Detection Handler (serial_manager.py:174-202)
def _handle_object_detection ( self , data : dict ):
"""Object detection in real time"""
try :
# 1. Capture image
img_path = self .camera.capture_image()
if not img_path:
log.error( "Camera could not capture image" )
return
# 2. YOLO detection
image, yolo_result = self .object_detect_model.read_image_path(
img_path,
draw_results = True ,
save_drawn_img = True
)
if yolo_result is None :
log.info( "No detections." )
return
# 3. Update data
data.update({
'class' : yolo_result[ 'class' ],
'confidence' : yolo_result[ 'confidence' ],
'timestamp' : time.time(),
'image_path' : img_path
})
# 4. Notify the central system
if self .callbacks.get( 'scan_service' ):
self .callbacks[ 'scan_service' ](data)
except Exception as e:
log.error( f "Error in object detection: { str (e) } " )
Detection Pipeline
Synchronization Primitives
Event-Based Waiting
The module uses threading events for synchronization:
Movement Confirmation (serial_manager.py:210-224)
def wait_for_confirmation ( self , joint : str , timeout : float = 30.0 ) -> bool :
"""Wait for movement confirmed"""
start_time = time.time()
self .movement_event.clear()
while (time.time() - start_time) < timeout:
if self .movement_status.get(joint, {}).get( 'state' ) == 'completed' :
return True
if self .movement_status.get(joint, {}).get( 'state' ) == 'error' :
log.error( f 'Error in movement' )
return False
self .movement_event.wait( 0.1 )
log.warning( f "Timeout waiting for movement of: { joint } " )
return False
Angle Response (serial_manager.py:226-235)
def wait_for_angles_response ( self , timeout : float = 5.0 ) -> bool :
self .angles_event.clear()
start_time = time.time()
while (time.time() - start_time) < timeout:
if self .current_angles:
return True
self .angles_event.wait( 0.1 )
return False
All wait operations include timeout handling to prevent infinite blocking if the VEX Brain becomes unresponsive.
State Tracking
Movement Status
# Per-joint movement tracking
self .movement_status: Dict[ str , Dict[ str , Any]] = {
'base' : { 'state' : 'completed' },
'arm' : { 'state' : 'pending' },
'gripper' : { 'state' : 'error' }
}
Safety Status
self .safety_status: Dict[ str , Any] = {
'state' : 'completed' ,
'time' : 2.5
}
Current Angles
self .current_angles: Dict[ str , float ] = {
'base' : 90.0 ,
'shoulder' : 45.0 ,
'elbow' : 30.0
}
Best Practices
Thread Safety
The module uses events and atomic operations to ensure thread-safe communication between the read loop, detection threads, and the main application.
Error Handling
try :
# Attempt operation
self .serial_port.write(encoded_message)
return True
except Exception as e:
log.error( f "Error: { e } " )
return False
Resource Cleanup
def __del__ ( self ):
"""Ensure cleanup on deletion"""
self .close()
Usage Example
Complete example from main.py:
class Robot :
def __init__ ( self ):
# Initialize communication
self .serial_manager = CommunicationManager(
port = '/dev/ttyACM1' ,
baudrate = 115200
)
def handle_scan_command ( self ):
# Register callback
self .serial_manager.register_callback(
'scan_service' ,
self ._scan_callback
)
# Send scan command
self .serial_manager.send_message(
'scan_service' ,
{ 'speed' : 20 }
)
# Wait for completion
if self .serial_manager.scan_complete_event.wait( timeout = 60 ):
self .serial_manager.scan_complete_event.clear()
self .process_scan_results()
Next Steps
Perception Learn how images are captured and processed
Control Understand movement execution and sequencing