Skip to main content

Overview

The CommunicationManager class handles all serial communication between the Raspberry Pi and VEX brain, including message passing, callback registration, and real-time object detection coordination. Source: arm_system/communication/serial_manager.py:16

Class Definition

CommunicationManager

class CommunicationManager:
    def __init__(self, port: str='/dev/ttyACM1', baudrate: int = 115200, camera_index: int = 0)
Initializes the communication manager with serial port configuration and camera setup.
port
str
default:"/dev/ttyACM1"
Serial port device path for VEX brain connection
baudrate
int
default:"115200"
Serial communication baud rate
camera_index
int
default:"0"
Camera device index for image capture

Attributes

serial_port
serial.Serial
PySerial connection object
is_connected
bool
Connection status flag
scan_complete_event
threading.Event
Event signaled when scanning is complete
movement_event
threading.Event
Event signaled when movement commands are received
angles_event
threading.Event
Event signaled when angle data is received
callbacks
dict
Dictionary mapping message types to callback functions
movement_status
dict
Dictionary tracking status of each joint movement
current_angles
dict
Dictionary storing current joint angles
camera
CameraManager
Camera manager instance for image capture
object_detect_model
ImageProcessor
Image processor with YOLO detection model (confidence threshold: 0.45)

Methods

connect

def connect(self) -> bool
Establishes serial connection to VEX brain and starts background read thread.
success
bool
True if connection successful, False otherwise
Configuration:
  • Timeout: 10 seconds
  • Write timeout: 10 seconds
  • Background thread: Daemon mode
Source: arm_system/communication/serial_manager.py:52

close

def close(self)
Closes serial connection and stops background read thread. Process:
  1. Sets stop event
  2. Joins read thread (1.0s timeout)
  3. Closes serial port
  4. Updates connection status
Source: arm_system/communication/serial_manager.py:76

send_message

def send_message(self, message_type: str, data: dict) -> bool
Sends JSON-encoded message to VEX brain over serial connection.
message_type
str
required
Message type identifier:
  • check_service - Verify sensors and motors
  • safety_service - Execute safety protocols
  • scan_service - Start environment scan
  • pick_service - Execute pick movement
  • place_service - Execute place movement
  • get_angles - Request current joint angles
data
dict
required
Message payload data
success
bool
True if message sent successfully, False otherwise
Message Format:
{
  "type": "message_type",
  "data": {}
}
Source: arm_system/communication/serial_manager.py:87

Example

comm = CommunicationManager()
comm.connect()
comm.send_message('scan_service', {'speed': 20})

register_callback

def register_callback(self, message_type: str, callback: callable)
Registers a callback function for specific message types.
message_type
str
required
Message type to listen for (e.g., ‘scan_service’)
callback
callable
required
Function to call when message is received. Set to None to unregister.
Callback Signature:
def callback(data: dict):
    # data contains message payload
    pass
Source: arm_system/communication/serial_manager.py:109

Example

def scan_callback(data):
    print(f"Object detected at {data['angle']}°")

comm.register_callback('scan_service', scan_callback)

wait_for_confirmation

def wait_for_confirmation(self, joint: str, timeout: float = 30.0) -> bool
Waits for movement confirmation from VEX brain.
joint
str
required
Joint name to wait for: ‘base’, ‘arm’, or ‘gripper’
timeout
float
default:"30.0"
Maximum wait time in seconds
success
bool
True if movement completed successfully, False if error or timeout
Source: arm_system/communication/serial_manager.py:210

wait_for_angles_response

def wait_for_angles_response(self, timeout: float = 5.0) -> bool
Waits for angle data response from VEX brain.
timeout
float
default:"5.0"
Maximum wait time in seconds
success
bool
True if angles received, False if timeout
Source: arm_system/communication/serial_manager.py:226

Threading & Events

The CommunicationManager uses threading for non-blocking communication:

Background Read Thread

Continuously reads from serial port and processes incoming messages. Source: arm_system/communication/serial_manager.py:112

Event System

  • scan_complete_event: Set when scan finishes
  • movement_event: Set when movement status updates
  • angles_event: Set when angle data received

Message Processing

Incoming messages are automatically processed based on type:

Supported Message Types

TypeDescriptionEvents Triggered
check_serviceSystem check statusNone
safety_serviceSafety protocol statusUpdates safety_status
scan_serviceScan progress/detectionscan_complete_event, launches detection thread
pick_servicePick movement statusmovement_event
place_servicePlace movement statusmovement_event
current_anglesJoint angle dataangles_event
Source: arm_system/communication/serial_manager.py:132

Object Detection Integration

When a scan detection occurs, the CommunicationManager:
  1. Captures image via CameraManager
  2. Runs YOLO inference via ImageProcessor
  3. Updates detection data with class, confidence, timestamp, and image path
  4. Invokes registered callback with complete data
Source: arm_system/communication/serial_manager.py:174

Example Usage

from arm_system.communication.serial_manager import CommunicationManager

# Initialize
comm = CommunicationManager(port='/dev/ttyACM1', baudrate=115200)

# Connect
if comm.connect():
    # Register callback
    def on_detection(data):
        print(f"Detected {data['class']} at {data['angle']}°")
    
    comm.register_callback('scan_service', on_detection)
    
    # Send scan command
    comm.send_message('scan_service', {'speed': 20})
    
    # Wait for completion
    if comm.scan_complete_event.wait(timeout=60):
        print("Scan complete!")
    
    # Cleanup
    comm.close()

Build docs developers (and LLMs) love