Skip to main content

Overview

The Control Module orchestrates robotic arm movements through the Robot class in main.py. It manages scanning, object selection, and executing complex pick-and-place sequences with built-in safety mechanisms.

Robot Class

Initialization (main.py:8-22)

class Robot:
    def __init__(self):
        self.serial_manager = CommunicationManager(
            port='/dev/ttyACM1', 
            baudrate=115200
        )
        
        # Register scan data callback
        self.scan_results = []
        self.serial_manager.register_callback(
            'scan_service', 
            self._scan_callback
        )
        
        # Define placement zones
        self.placement_zones = {
            'apple': {'angle': 90, 'distance': 200},
            'orange': {'angle': 180, 'distance': 200},
            'bottle': {'angle': 45, 'distance': 200},
            'default': {'angle': 270, 'distance': 200},
        }

Placement Zone Configuration

Each object class has a designated placement zone:
ObjectAngleDistance (mm)
Apple90°200
Orange180°200
Bottle45°200
Default270°200
Angles are measured from the base reference position (0°), and distances are in millimeters from the robot base.

Scanning Operations

Scan Command Handler (main.py:58-74)

def handle_scan_command(self):
    """Scan command"""
    self.scan_results = []
    self.serial_manager.register_callback(
        'scan_service', 
        self._scan_callback
    )
    
    self.serial_manager.send_message(
        'scan_service', 
        {'speed': 20}
    )
    
    log.info("Scanning in progress...")
    
    if self.serial_manager.scan_complete_event.wait(timeout=60):
        self.serial_manager.scan_complete_event.clear()
        self.process_scan_results()
    else:
        log.error("Scanning timeout")
    
    self.serial_manager.register_callback('scan_service', None)

Scan Callback (main.py:76-78)

def _scan_callback(self, data):
    if data.get('class'):
        self._update_object_registry(data)

Object Registry Update (main.py:80-96)

def _update_object_registry(self, data: dict):
    """Update object registry"""
    try:
        self.scan_results.append({
            'position': {
                'angle': data.get('angle', 0),
                'distance': data.get('distance', 0)
            },
            'detection': {
                'class': data.get('class', 'default'),
                'confidence': data.get('confidence', 0.0),
                'image': data.get('image_path', '')
            },
            'placement_zone': self._get_placement_zones(
                data.get('class', 'default')
            )
        })
    except Exception as e:
        log.error(f"Error updating registry: {str(e)}")

Scan Results Processing (main.py:102-129)

def process_scan_results(self):
    """Process scan data"""
    if not self.scan_results:
        log.warning("Scanning completed without object detection")
        return
        
    log.info(f"\n=== Objects scanned: ({len(self.scan_results)}) ===")
    processed_list = []
    for i, obj in enumerate(self.scan_results, start=1):
        angle = obj['position']['angle']
        distance = obj['position']['distance']
        obj_class = obj['detection']['class']
        confidence = obj['detection']['confidence']
        zone = obj['placement_zone']

        item = {
            'index': i,
            'center_angle': angle,
            'distance': distance,
            'class': obj_class,
            'confidence': confidence,
            'placement_zone': zone
        }
        processed_list.append(item)

        log.info(
            f"Obj {i} -> angle: {angle}°, "
            f"distance: {distance}mm, "
            f"class: {obj_class}, "
            f"conf: {confidence:.2f}"
        )

    self.scan_results = processed_list

Pick and Place Operations

Complete Sequence Flow

Pick and Place Handler (main.py:132-149)

def handle_pick_place_command(self):
    """Pick & place command"""
    if not self.scan_results:
        log.warning("1. First scan the environment (option 'n')")
        return

    selected_object = self.select_object_interactively()
    if not selected_object:
        return

    log.info(f"\nInit pick & place to object: {selected_object['index']}:")
    log.info(f"Angle: {selected_object['center_angle']}°")
    log.info(f"Distance: {selected_object['distance']} mm")
    
    if self.execute_pick_sequence(selected_object):
        log.info(f"Pick completed!")
        if self.execute_place_sequence(selected_object):
            log.info(f"Pick and place completed!")

Object Selection (main.py:151-169)

def select_object_interactively(self):
    """Interface for object selection"""
    print("\n=== OBJECTS DETECTED LIST ===")
    for o in self.scan_results:
        i = o['index']
        print(
            f"[{i}] angle={o['center_angle']}° "
            f"dist={o['distance']}mm "
            f"class={o['class']} "
            f"conf={o['confidence']:.2f}"
        )
    print("[0] Cancel")
    
    try:
        selection = int(input("\nSelect the object to pick: "))
        if selection == 0:
            print("Operation canceled")
            return {}
        
        return next(
            (x for x in self.scan_results if x['index'] == selection), 
            {}
        )
    except ValueError:
        print("Invalid input")
        return {}

Movement Sequences

Pick Sequence (main.py:171-183)

def execute_pick_sequence(self, target_object: dict) -> bool:
    try:
        plan = [
            {
                'joint': 'base', 
                'angle': target_object['center_angle'], 
                'speed': 30
            },
            {
                'joint': 'arm', 
                'distance': target_object['distance'], 
                'action': 'pick'
            },
            {'joint': 'gripper', 'action': 'close'},
            {
                'joint': 'arm', 
                'distance': target_object['distance'], 
                'action': 'up'
            },
        ]
        self.execute_movement('pick_service', plan)
        return True
    except Exception as e:
        log.error(f"Error in pick sequence: {e}")
        return False

Pick Sequence Steps:

  1. Base rotation: Align with object angle
  2. Arm extension: Reach object distance
  3. Gripper close: Grasp object
  4. Arm retraction: Lift object up

Place Sequence (main.py:185-200)

def execute_place_sequence(self, target_object: dict):
    """Execute object placement"""
    try:
        zone_params = target_object['placement_zone']
        movement_plan = [
            {
                'joint': 'base', 
                'angle': zone_params['angle'], 
                'speed': 30
            },
            {
                'joint': 'arm', 
                'distance': zone_params['distance'], 
                'action': 'place'
            },
            {'joint': 'gripper', 'action': 'open'},
            {
                'joint': 'arm', 
                'distance': target_object['distance'], 
                'action': 'up'
            },
            {'joint': 'base', 'angle': 0, 'speed': 60},  # Return to base
        ]
        self.execute_movement('place_service', movement_plan)
        return True
    except Exception as e:
        log.error(f"Error in place sequence: {e}")
        return False

Place Sequence Steps:

  1. Base rotation: Turn to placement zone angle
  2. Arm extension: Reach placement distance
  3. Gripper open: Release object
  4. Arm retraction: Lift up
  5. Base return: Return to 0° home position

Movement Visualizations

Movement Execution

Execute Movement (main.py:217-242)

def execute_movement(self, message_type: str, movement_sequence: list):
    """Send commands to arm"""
    log.info("\nExecution movements:")
    
    for move in movement_sequence:
        try:
            joint = move['joint']
            log.info(f"Movement: {joint}")
            
            # Reset state
            self.serial_manager.movement_status[joint] = {
                'state': 'pending'
            }
            
            # Send command
            self.serial_manager.send_message(message_type, move)
            
            # Wait for confirmation
            if not self.serial_manager.wait_for_confirmation(joint):
                raise Exception(f"Movement failed in: {joint}")
            
            # Log success
            log.info(f"-> Movement {joint} completed!")
                
        except Exception as e:
            log.error(f'Error in movement: {str(e)}')
            self.handle_movement_failure()
            raise
Each movement waits for confirmation from the VEX Brain before proceeding to the next step, ensuring sequential execution.

Safety Protocols

Movement Failure Handler (main.py:244-258)

def handle_movement_failure(self):
    """Handles faults in the motion sequence"""
    log.error('Executing security protocol')
    self.serial_manager.send_message('safety_service', {})
    
    start_time = time.time()
    while (time.time() - start_time) < 15:
        if self.serial_manager.safety_status.get('state') == 'completed':
            log.info("System safety")
            return
        time.sleep(0.5)
        
    log.error("Error in safety service")
    self.serial_manager.close()
    exit(1)

Safety Flow

If the safety protocol times out after 15 seconds, the system performs an emergency shutdown and exits to prevent unsafe operation.

Angle Retrieval

Get Current Angles (main.py:203-215)

def get_current_angles(self) -> dict:
    """Get current angles"""
    try:
        self.serial_manager.send_message('get_angles', {})
        
        if self.serial_manager.wait_for_angles_response(timeout=5):
            return self.serial_manager.current_angles
        else:
            raise Exception('Timeout waiting for angles')
        
    except Exception as e:
        log.error(f"Error getting angles: {e}")
        return None

Main Execution Loop

def main_menu_loop(self):
    running = True
    while running:
        print("\n=== Main menu ===")
        print(" [c] check service")
        print(" [s] safety service")
        print(" [n] scan service")
        print(" [p] pick & place service")
        print(" [q] exit")
        
        user_input = input("Input command: ").strip().lower()

        if user_input == 'c':
            self.serial_manager.send_message('check_service', {})
            
        elif user_input == 's':
            self.serial_manager.send_message('safety_service', {})
            
        elif user_input == 'n':
            self.handle_scan_command()
            
        elif user_input == 'p':
            self.handle_pick_place_command()
            
        elif user_input == 'q':
            running = False
            
        else:
            print("Command unrecognized")
        
        time.sleep(0.5)

Application Entry Point (main.py:261-276)

def run(self):
    try:
        if self.serial_manager.connect():
            log.info("Connected to VEX brain")
            self.main_menu_loop()
            
    except KeyboardInterrupt:
        log.info("Program interrupted by user.")
    finally:
        log.info("Closing serial connection.")
        self.serial_manager.close()


if __name__ == '__main__':
    robot = Robot()
    robot.run()

Data Structures

Object Registry Entry

{
    'index': 1,
    'center_angle': 45,
    'distance': 150,
    'class': 'apple',
    'confidence': 0.87,
    'placement_zone': {
        'angle': 90,
        'distance': 200
    }
}

Movement Command

{
    'joint': 'base',      # or 'arm', 'gripper'
    'angle': 90,          # for rotation joints
    'distance': 200,      # for linear joints
    'action': 'pick',     # or 'place', 'up', 'close', 'open'
    'speed': 30           # movement speed
}

Error Handling Strategies

Timeout Protection

if self.serial_manager.scan_complete_event.wait(timeout=60):
    # Process results
else:
    log.error("Scanning timeout")

Exception Recovery

try:
    self.execute_movement('pick_service', plan)
    return True
except Exception as e:
    log.error(f"Error in pick sequence: {e}")
    return False

State Validation

if not self.scan_results:
    log.warning("1. First scan the environment (option 'n')")
    return

Best Practices

Sequential Movement Execution

Movements are executed sequentially with confirmation at each step. This ensures the robot completes one action before starting the next.

State Reset

self.serial_manager.movement_status[joint] = {'state': 'pending'}
Always reset state before sending a new command.

Resource Cleanup

finally:
    log.info("Closing serial connection.")
    self.serial_manager.close()
Ensure proper cleanup even if errors occur.

Complete Example

Full pick-and-place workflow:
# 1. Initialize robot
robot = Robot()

# 2. Connect to VEX
robot.serial_manager.connect()

# 3. Scan environment
robot.handle_scan_command()
# -> Robot rotates and detects objects
# -> Camera captures images
# -> YOLO identifies objects
# -> Registry updated

# 4. Select object
selected = robot.select_object_interactively()
# User selects object #1

# 5. Execute pick sequence
robot.execute_pick_sequence(selected)
# -> Rotate base to object
# -> Extend arm
# -> Close gripper
# -> Lift up

# 6. Execute place sequence
robot.execute_place_sequence(selected)
# -> Rotate to placement zone
# -> Extend arm
# -> Open gripper
# -> Retract
# -> Return to home

# 7. Cleanup
robot.serial_manager.close()

Next Steps

Communication

Understand how commands are sent to the VEX Brain

Perception

Learn how objects are detected during scanning

Build docs developers (and LLMs) love