Overview
The Control Module orchestrates robotic arm movements through the Robot class in main.py. It manages scanning, object selection, and executing complex pick-and-place sequences with built-in safety mechanisms.
Robot Class
Initialization (main.py:8-22)
class Robot :
def __init__ ( self ):
self .serial_manager = CommunicationManager(
port = '/dev/ttyACM1' ,
baudrate = 115200
)
# Register scan data callback
self .scan_results = []
self .serial_manager.register_callback(
'scan_service' ,
self ._scan_callback
)
# Define placement zones
self .placement_zones = {
'apple' : { 'angle' : 90 , 'distance' : 200 },
'orange' : { 'angle' : 180 , 'distance' : 200 },
'bottle' : { 'angle' : 45 , 'distance' : 200 },
'default' : { 'angle' : 270 , 'distance' : 200 },
}
Placement Zone Configuration
Each object class has a designated placement zone:
Object Angle Distance (mm) Apple 90° 200 Orange 180° 200 Bottle 45° 200 Default 270° 200
Angles are measured from the base reference position (0°), and distances are in millimeters from the robot base.
Scanning Operations
Scan Command Handler (main.py:58-74)
def handle_scan_command ( self ):
"""Scan command"""
self .scan_results = []
self .serial_manager.register_callback(
'scan_service' ,
self ._scan_callback
)
self .serial_manager.send_message(
'scan_service' ,
{ 'speed' : 20 }
)
log.info( "Scanning in progress..." )
if self .serial_manager.scan_complete_event.wait( timeout = 60 ):
self .serial_manager.scan_complete_event.clear()
self .process_scan_results()
else :
log.error( "Scanning timeout" )
self .serial_manager.register_callback( 'scan_service' , None )
Scan Callback (main.py:76-78)
def _scan_callback ( self , data ):
if data.get( 'class' ):
self ._update_object_registry(data)
Object Registry Update (main.py:80-96)
def _update_object_registry ( self , data : dict ):
"""Update object registry"""
try :
self .scan_results.append({
'position' : {
'angle' : data.get( 'angle' , 0 ),
'distance' : data.get( 'distance' , 0 )
},
'detection' : {
'class' : data.get( 'class' , 'default' ),
'confidence' : data.get( 'confidence' , 0.0 ),
'image' : data.get( 'image_path' , '' )
},
'placement_zone' : self ._get_placement_zones(
data.get( 'class' , 'default' )
)
})
except Exception as e:
log.error( f "Error updating registry: { str (e) } " )
Scan Results Processing (main.py:102-129)
def process_scan_results ( self ):
"""Process scan data"""
if not self .scan_results:
log.warning( "Scanning completed without object detection" )
return
log.info( f " \n === Objects scanned: ( { len ( self .scan_results) } ) ===" )
processed_list = []
for i, obj in enumerate ( self .scan_results, start = 1 ):
angle = obj[ 'position' ][ 'angle' ]
distance = obj[ 'position' ][ 'distance' ]
obj_class = obj[ 'detection' ][ 'class' ]
confidence = obj[ 'detection' ][ 'confidence' ]
zone = obj[ 'placement_zone' ]
item = {
'index' : i,
'center_angle' : angle,
'distance' : distance,
'class' : obj_class,
'confidence' : confidence,
'placement_zone' : zone
}
processed_list.append(item)
log.info(
f "Obj { i } -> angle: { angle } °, "
f "distance: { distance } mm, "
f "class: { obj_class } , "
f "conf: { confidence :.2f} "
)
self .scan_results = processed_list
Pick and Place Operations
Complete Sequence Flow
Pick and Place Handler (main.py:132-149)
def handle_pick_place_command ( self ):
"""Pick & place command"""
if not self .scan_results:
log.warning( "1. First scan the environment (option 'n')" )
return
selected_object = self .select_object_interactively()
if not selected_object:
return
log.info( f " \n Init pick & place to object: { selected_object[ 'index' ] } :" )
log.info( f "Angle: { selected_object[ 'center_angle' ] } °" )
log.info( f "Distance: { selected_object[ 'distance' ] } mm" )
if self .execute_pick_sequence(selected_object):
log.info( f "Pick completed!" )
if self .execute_place_sequence(selected_object):
log.info( f "Pick and place completed!" )
Object Selection (main.py:151-169)
def select_object_interactively ( self ):
"""Interface for object selection"""
print ( " \n === OBJECTS DETECTED LIST ===" )
for o in self .scan_results:
i = o[ 'index' ]
print (
f "[ { i } ] angle= { o[ 'center_angle' ] } ° "
f "dist= { o[ 'distance' ] } mm "
f "class= { o[ 'class' ] } "
f "conf= { o[ 'confidence' ] :.2f} "
)
print ( "[0] Cancel" )
try :
selection = int ( input ( " \n Select the object to pick: " ))
if selection == 0 :
print ( "Operation canceled" )
return {}
return next (
(x for x in self .scan_results if x[ 'index' ] == selection),
{}
)
except ValueError :
print ( "Invalid input" )
return {}
Movement Sequences
Pick Sequence (main.py:171-183)
def execute_pick_sequence ( self , target_object : dict ) -> bool :
try :
plan = [
{
'joint' : 'base' ,
'angle' : target_object[ 'center_angle' ],
'speed' : 30
},
{
'joint' : 'arm' ,
'distance' : target_object[ 'distance' ],
'action' : 'pick'
},
{ 'joint' : 'gripper' , 'action' : 'close' },
{
'joint' : 'arm' ,
'distance' : target_object[ 'distance' ],
'action' : 'up'
},
]
self .execute_movement( 'pick_service' , plan)
return True
except Exception as e:
log.error( f "Error in pick sequence: { e } " )
return False
Pick Sequence Steps:
Base rotation : Align with object angle
Arm extension : Reach object distance
Gripper close : Grasp object
Arm retraction : Lift object up
Place Sequence (main.py:185-200)
def execute_place_sequence ( self , target_object : dict ):
"""Execute object placement"""
try :
zone_params = target_object[ 'placement_zone' ]
movement_plan = [
{
'joint' : 'base' ,
'angle' : zone_params[ 'angle' ],
'speed' : 30
},
{
'joint' : 'arm' ,
'distance' : zone_params[ 'distance' ],
'action' : 'place'
},
{ 'joint' : 'gripper' , 'action' : 'open' },
{
'joint' : 'arm' ,
'distance' : target_object[ 'distance' ],
'action' : 'up'
},
{ 'joint' : 'base' , 'angle' : 0 , 'speed' : 60 }, # Return to base
]
self .execute_movement( 'place_service' , movement_plan)
return True
except Exception as e:
log.error( f "Error in place sequence: { e } " )
return False
Place Sequence Steps:
Base rotation : Turn to placement zone angle
Arm extension : Reach placement distance
Gripper open : Release object
Arm retraction : Lift up
Base return : Return to 0° home position
Movement Visualizations
Movement Execution
Execute Movement (main.py:217-242)
def execute_movement ( self , message_type : str , movement_sequence : list ):
"""Send commands to arm"""
log.info( " \n Execution movements:" )
for move in movement_sequence:
try :
joint = move[ 'joint' ]
log.info( f "Movement: { joint } " )
# Reset state
self .serial_manager.movement_status[joint] = {
'state' : 'pending'
}
# Send command
self .serial_manager.send_message(message_type, move)
# Wait for confirmation
if not self .serial_manager.wait_for_confirmation(joint):
raise Exception ( f "Movement failed in: { joint } " )
# Log success
log.info( f "-> Movement { joint } completed!" )
except Exception as e:
log.error( f 'Error in movement: { str (e) } ' )
self .handle_movement_failure()
raise
Each movement waits for confirmation from the VEX Brain before proceeding to the next step, ensuring sequential execution.
Safety Protocols
Movement Failure Handler (main.py:244-258)
def handle_movement_failure ( self ):
"""Handles faults in the motion sequence"""
log.error( 'Executing security protocol' )
self .serial_manager.send_message( 'safety_service' , {})
start_time = time.time()
while (time.time() - start_time) < 15 :
if self .serial_manager.safety_status.get( 'state' ) == 'completed' :
log.info( "System safety" )
return
time.sleep( 0.5 )
log.error( "Error in safety service" )
self .serial_manager.close()
exit ( 1 )
Safety Flow
If the safety protocol times out after 15 seconds, the system performs an emergency shutdown and exits to prevent unsafe operation.
Angle Retrieval
Get Current Angles (main.py:203-215)
def get_current_angles ( self ) -> dict :
"""Get current angles"""
try :
self .serial_manager.send_message( 'get_angles' , {})
if self .serial_manager.wait_for_angles_response( timeout = 5 ):
return self .serial_manager.current_angles
else :
raise Exception ( 'Timeout waiting for angles' )
except Exception as e:
log.error( f "Error getting angles: { e } " )
return None
Main Execution Loop
Main Menu (main.py:25-55)
def main_menu_loop ( self ):
running = True
while running:
print ( " \n === Main menu ===" )
print ( " [c] check service" )
print ( " [s] safety service" )
print ( " [n] scan service" )
print ( " [p] pick & place service" )
print ( " [q] exit" )
user_input = input ( "Input command: " ).strip().lower()
if user_input == 'c' :
self .serial_manager.send_message( 'check_service' , {})
elif user_input == 's' :
self .serial_manager.send_message( 'safety_service' , {})
elif user_input == 'n' :
self .handle_scan_command()
elif user_input == 'p' :
self .handle_pick_place_command()
elif user_input == 'q' :
running = False
else :
print ( "Command unrecognized" )
time.sleep( 0.5 )
Application Entry Point (main.py:261-276)
def run ( self ):
try :
if self .serial_manager.connect():
log.info( "Connected to VEX brain" )
self .main_menu_loop()
except KeyboardInterrupt :
log.info( "Program interrupted by user." )
finally :
log.info( "Closing serial connection." )
self .serial_manager.close()
if __name__ == '__main__' :
robot = Robot()
robot.run()
Data Structures
Object Registry Entry
{
'index' : 1 ,
'center_angle' : 45 ,
'distance' : 150 ,
'class' : 'apple' ,
'confidence' : 0.87 ,
'placement_zone' : {
'angle' : 90 ,
'distance' : 200
}
}
Movement Command
{
'joint' : 'base' , # or 'arm', 'gripper'
'angle' : 90 , # for rotation joints
'distance' : 200 , # for linear joints
'action' : 'pick' , # or 'place', 'up', 'close', 'open'
'speed' : 30 # movement speed
}
Error Handling Strategies
Timeout Protection
if self .serial_manager.scan_complete_event.wait( timeout = 60 ):
# Process results
else :
log.error( "Scanning timeout" )
Exception Recovery
try :
self .execute_movement( 'pick_service' , plan)
return True
except Exception as e:
log.error( f "Error in pick sequence: { e } " )
return False
State Validation
if not self .scan_results:
log.warning( "1. First scan the environment (option 'n')" )
return
Best Practices
Sequential Movement Execution
Movements are executed sequentially with confirmation at each step. This ensures the robot completes one action before starting the next.
State Reset
self .serial_manager.movement_status[joint] = { 'state' : 'pending' }
Always reset state before sending a new command.
Resource Cleanup
finally :
log.info( "Closing serial connection." )
self .serial_manager.close()
Ensure proper cleanup even if errors occur.
Complete Example
Full pick-and-place workflow:
# 1. Initialize robot
robot = Robot()
# 2. Connect to VEX
robot.serial_manager.connect()
# 3. Scan environment
robot.handle_scan_command()
# -> Robot rotates and detects objects
# -> Camera captures images
# -> YOLO identifies objects
# -> Registry updated
# 4. Select object
selected = robot.select_object_interactively()
# User selects object #1
# 5. Execute pick sequence
robot.execute_pick_sequence(selected)
# -> Rotate base to object
# -> Extend arm
# -> Close gripper
# -> Lift up
# 6. Execute place sequence
robot.execute_place_sequence(selected)
# -> Rotate to placement zone
# -> Extend arm
# -> Open gripper
# -> Retract
# -> Return to home
# 7. Cleanup
robot.serial_manager.close()
Next Steps
Communication Understand how commands are sent to the VEX Brain
Perception Learn how objects are detected during scanning