Now that you understand basic serial communication, it’s time to add structure. Raw byte streams are hard to work with — JSON provides a human-readable, extensible format for robot commands and status messages.
if serial_manager.connect(): log.info("Connected to VEX Brain") # Send test message message_type = 'test_service' data = {'state': 'successfully'} serial_manager.writing_data(message_type, data)
Why append newline?The receiving side accumulates bytes until it sees \n, then knows the message is complete. This is the framing protocol from the previous lesson.
Recall from the serial protocol lesson that we accumulate bytes in a buffer. Once we have a complete message (delimiter found), we decode it:Fromjson_data.py:50-68:
def _read_loop(self): """Thread loop for read messages from VEX""" while not self._stop_event.is_set(): if self.com and self.com.in_waiting: try: char = self.com.read() # Check for complete message if char == self.message_end: message = self.buffer.decode() # bytes → string self.buffer = bytearray() # Reset try: data = json.loads(message) # string → dict self._process_message(data) # Handle it except json.JSONDecodeError: log.error(f'error message decode: {message}') else: self.buffer.extend(char) except Exception as e: log.error(f'error read serial port: {e}') time.sleep(0.01)
def _process_message(self, message: dict): """process message from VEX""" try: msg_type = message.get('type', '').lower() data = message.get('data', {}) if msg_type == 'test_service': state = data.get('state') log.info(f'{msg_type} status:\nstate: {state}') # Add more message type handlers here # elif msg_type == 'status': # ... except Exception as e: log.error(f'error process message: {e}')
Pattern:
Extract type field (case-insensitive)
Extract data payload
Route based on type using if/elif
Use .get() with defaults for safe field access
Safe Field Accessmessage.get('type', '') returns empty string if ‘type’ is missing, preventing KeyError crashes. Always use .get() with defaults when parsing untrusted input.
MESSAGE_SCHEMAS = { 'move_arm': ['target_x', 'target_y', 'speed'], 'detection': ['class', 'confidence', 'bbox'], 'status': ['state', 'position'],}def validate_message(msg_type: str, data: dict) -> bool: if msg_type not in MESSAGE_SCHEMAS: log.error(f'Unknown message type: {msg_type}') return False required_fields = MESSAGE_SCHEMAS[msg_type] for field in required_fields: if field not in data: log.error(f'Missing field {field} in {msg_type}') return False return True
Usage:
def _process_message(self, message: dict): msg_type = message.get('type', '') data = message.get('data', {}) if not validate_message(msg_type, data): return # Skip invalid messages # Process valid message if msg_type == 'move_arm': # ...
def validate_detection_message(data: dict) -> bool: # Check required fields exist if not all(k in data for k in ['class', 'confidence', 'bbox']): return False # Check types if not isinstance(data['class'], str): return False if not isinstance(data['confidence'], (int, float)): return False if not isinstance(data['bbox'], list) or len(data['bbox']) != 4: return False # Check ranges if not 0.0 <= data['confidence'] <= 1.0: return False return True
Fail GracefullyInvalid messages should be logged and skipped, not crash the system. Robot operation must continue even if one message is malformed.
try: data = json.loads(message)except json.JSONDecodeError as e: log.error(f'JSON parse error at position {e.pos}: {e.msg}') log.error(f'Invalid message: {message}') return # Continue with next message
msg_type = message.get('type')if not msg_type: log.error(f'Message missing type field: {message}') returndata = message.get('data')if data is None: log.error(f'Message missing data field: {message}') return
You now have structured communication working! The next lesson brings it all together with a complete Raspberry Pi implementation including connection management and real-world usage.
Raspberry Pi Integration
Complete implementation guide for Raspberry Pi serial communication
Course Repository: Full code in course/comm_class/
raspberry_comm/json_data.py: Complete Raspberry Pi implementation (117 lines)