Skip to main content

Overview

The CommunicationManager class handles all serial communication between the Raspberry Pi and the VEX brain using a JSON-based message protocol over UART.
Messages are sent and received as JSON objects, making the protocol extensible and easy to debug.

Message Structure

All messages follow this JSON structure:
{
  "type": "message_type",
  "data": {
    // Message-specific payload
  }
}
Messages are terminated with a newline character (\n) for frame delimitation.

Setup

1

Initialize the Communication Manager

from arm_system.communication.serial_manager import CommunicationManager

# Basic initialization
comm = CommunicationManager(
    port='/dev/ttyACM1',
    baudrate=115200,
    camera_index=0
)
Parameters:
  • port: Serial port device path (default: /dev/ttyACM1)
  • baudrate: Communication speed (default: 115200)
  • camera_index: Camera device index (default: 0)
2

Establish Connection

if comm.connect():
    print("Connected successfully!")
else:
    print("Connection failed")
The connect() method:
  1. Opens the serial port with timeout settings
  2. Starts a background thread for reading messages
  3. Returns True on success, False on failure

Sending Messages

Basic Send

Use the send_message() method to send commands:
# Send a message
success = comm.send_message('check_service', {})

if success:
    print("Message sent successfully")

Message Types and Examples

# Verify robot system status
comm.send_message('check_service', {})

# Expected response:
# {
#   "type": "check_service",
#   "data": {
#     "state": "ready"
#   }
# }

Receiving Messages

Message Reading Thread

The communication manager automatically reads incoming messages in a background thread:
# From serial_manager.py:112-130
def _read_loop(self):
    """Thread loop for read messages from VEX"""
    while not self._stop_event.is_set():
        if self.serial_port and self.serial_port.in_waiting:
            try:
                char = self.serial_port.read()
                if char == self.message_end:  # '\n'
                    message = self.buffer.decode()
                    self.buffer = bytearray()
                    try:
                        data = json.loads(message)
                        self._process_message(data)
                    except json.JSONDecodeError:
                        log.error(f'error message decode: {message}')
                else:
                    self.buffer.extend(char)
            except Exception as e:
                log.error(f'error read serial port: {e}')
        time.sleep(0.01)

Callback Registration

1

Define Your Callback Function

def my_scan_callback(data: dict):
    """Handle scan data"""
    angle = data.get('angle', 0)
    distance = data.get('distance', 0)
    obj_class = data.get('class', 'unknown')
    
    print(f"Object detected at {angle}° and {distance}mm: {obj_class}")
2

Register the Callback

# Register callback for scan service messages
comm.register_callback('scan_service', my_scan_callback)
The callback will be invoked automatically when messages of type scan_service are received.
3

Unregister When Done

# Remove callback by setting to None
comm.register_callback('scan_service', None)

Real-World Callback Example

From the Robot class implementation:
class Robot:
    def __init__(self):
        self.serial_manager = CommunicationManager(port='/dev/ttyACM1', baudrate=115200)
        
        # Register callback for scan results
        self.scan_results = []
        self.serial_manager.register_callback('scan_service', self._scan_callback)
    
    def _scan_callback(self, data):
        """Process incoming scan data"""
        if data.get('class'):
            self._update_object_registry(data)
    
    def _update_object_registry(self, data: dict):
        """Add detected object to registry"""
        self.scan_results.append({
            'position': {
                'angle': data.get('angle', 0),
                'distance': data.get('distance', 0)
            },
            'detection':{
                'class': data.get('class', 'default'),
                'confidence': data.get('confidence', 0.0),
                'image': data.get('image_path', '')
            },
            'placement_zone': self._get_placement_zones(data.get('class', 'default'))
        })

Synchronous Waiting

Wait for Movement Confirmation

# Send movement command
comm.send_message('pick_service', {
    'joint': 'base',
    'angle': 90,
    'speed': 30
})

# Wait for confirmation (up to 30 seconds)
if comm.wait_for_confirmation('base', timeout=30.0):
    print("Movement completed successfully")
else:
    print("Movement failed or timed out")
Implementation details:
# From serial_manager.py:210-224
def wait_for_confirmation(self, joint: str, timeout: float = 30.0) -> bool:
    """Wait for movement confirmed"""
    start_time = time.time()
    self.movement_event.clear()
    
    while (time.time() - start_time) < timeout:
        if self.movement_status.get(joint, {}).get('state') == 'completed':
            return True
        if self.movement_status.get(joint, {}).get('state') == 'error':
            log.error(f'error in movement')
            return False
        self.movement_event.wait(0.1)
    
    log.warning(f"timeout, waiting movement of: {joint}")
    return False

Wait for Scan Completion

# Start scan
comm.send_message('scan_service', {'speed': 20})

# Wait for scan to complete (up to 60 seconds)
if comm.scan_complete_event.wait(timeout=60):
    comm.scan_complete_event.clear()
    print("Scan completed!")
else:
    print("Scan timeout")

Wait for Angle Response

# Request current angles
comm.send_message('get_angles', {})

# Wait for response
if comm.wait_for_angles_response(timeout=5.0):
    angles = comm.current_angles
    print(f"Base: {angles['base']}°")
    print(f"Arm: {angles['arm']}°")
    print(f"Gripper: {angles['gripper']}°")
else:
    print("Timeout waiting for angles")

Advanced Usage

Custom Message Processing

The _process_message() method handles all incoming messages:
# From serial_manager.py:132-172
def _process_message(self, message: dict):
    """Process message from VEX"""
    try:
        msg_type = message.get('type', '').lower()
        data = message.get('data', {})

        if msg_type == 'check_service':
            state = data.get('state')
            log.info(f'{msg_type} status:\nstate: {state}')

        elif msg_type == 'safety_service':
            self.safety_status = message.get('data', {})
            state = data.get('state')
            time_taken = data.get('time')
            log.info(f"{msg_type} status: \nstate: {state}, \ntime: {time_taken}s")

        elif msg_type == 'scan_service':
            state = data.get('state')
            if state == 'detected':
                # Trigger async object detection
                Thread(target=self._handle_object_detection, args=(data,)).start()
                
            elif state == 'complete':
                self.scan_complete_event.set()
        
        elif msg_type in ('pick_service', 'place_service'):
            joint = data.get('joint')
            self.movement_status[joint] = data
            self.movement_event.set()
            
        elif msg_type == 'current_angles':
            self.current_angles = message.get('data', {})
            self.angles_event.set()
            
    except Exception as e:
        log.error(f'error process message: {e}')

Object Detection Integration

The communication manager automatically triggers object detection when objects are detected during scanning:
# From serial_manager.py:174-202
def _handle_object_detection(self, data: dict):
    """Object detect in real time"""
    try:
        # 1. Capture image from camera
        img_path = self.camera.capture_image()
        if not img_path:
            log.error("camera could not be captured")
            return
        
        # 2. Run YOLO detection
        image, yolo_result = self.object_detect_model.read_image_path(
            img_path, 
            draw_results=True, 
            save_drawn_img=True
        )
        if yolo_result is None:
            log.info("no detections.")
            return
        
        # 3. Update data with detection results
        data.update({
            'class': yolo_result['class'],
            'confidence': yolo_result['confidence'],
            'timestamp': time.time(),
            'image_path': img_path
        })
        
        # 4. Notify via callback
        if self.callbacks.get('scan_service'):
            self.callbacks['scan_service'](data)
            
    except Exception as e:
        log.error(f"error in object detection: {str(e)}")

Complete Example

Here’s a complete example demonstrating sending messages and handling responses:
example.py
import time
from arm_system.communication.serial_manager import CommunicationManager

def scan_callback(data: dict):
    """Handle scan results"""
    if data.get('class'):
        print(f"Detected: {data['class']} at {data['angle']}° with confidence {data['confidence']:.2f}")

# Initialize
comm = CommunicationManager(port='/dev/ttyACM1', baudrate=115200)

if not comm.connect():
    print("Failed to connect")
    exit(1)

print("Connected!")

# Check system
comm.send_message('check_service', {})
time.sleep(1)

# Register callback
comm.register_callback('scan_service', scan_callback)

# Start scan
print("Starting scan...")
comm.send_message('scan_service', {'speed': 20})

# Wait for completion
if comm.scan_complete_event.wait(timeout=60):
    comm.scan_complete_event.clear()
    print("Scan complete!")
else:
    print("Scan timeout")

# Cleanup
comm.register_callback('scan_service', None)
comm.close()

Troubleshooting

Error: error message decode: <malformed_json>Solutions:
  • Check VEX brain firmware is sending valid JSON
  • Verify message termination with \n
  • Ensure baudrate matches on both sides (115200)
  • Check for buffer overflow issues
Issue: Callback function never gets calledSolutions:
  • Verify callback is registered before sending message
  • Check message type matches exactly (case-sensitive)
  • Ensure read thread is running (check is_connected)
  • Add logging to _process_message() to debug
Issue: wait_for_confirmation() always times outSolutions:
  • Verify VEX brain is responding
  • Check that response message type matches
  • Increase timeout value
  • Check movement_status dictionary is being updated
  • Test with simpler commands first (check_service)

Next Steps

Vision & Inference

Learn how object detection integrates with serial communication

Pick and Place

Build complete motion sequences with serial commands

Communication API

Full API reference for CommunicationManager

Message Protocol

Understand the complete message protocol

Build docs developers (and LLMs) love