Skip to main content

JSON Message Format

Now that you understand basic serial communication, it’s time to add structure. Raw byte streams are hard to work with — JSON provides a human-readable, extensible format for robot commands and status messages.

Learning Objectives

By the end of this lesson, you will be able to:
  • Design JSON message schemas for robot control
  • Encode and decode messages reliably
  • Implement type-safe message handling
  • Create bidirectional command-response patterns
  • Handle message validation and errors
This lesson uses the complete implementation from course/comm_class/raspberry_comm/json_data.py

Why JSON?

Advantages

Human-Readable: Easy to debug and log
{"type": "move_arm", "data": {"x": 100, "y": 50}}
vs. binary:
\x01\x64\x00\x32
Flexible: Add new fields without breaking existing code
{"type": "detect", "data": {"class": "apple", "confidence": 0.95, "new_field": 42}}
Language-Agnostic: Python and C++ both have JSON libraries Structured: Type checking and validation

Disadvantages

  • Larger message size (more bytes than binary)
  • Slower parsing (negligible for robot control)
  • Requires proper escaping of special characters
For robot control at 10-100 Hz update rates, JSON overhead is negligible. The benefits far outweigh the costs.

Message Schema Design

Basic Structure

Every message follows this pattern:
{
  "type": "<message_type>",
  "data": {
    // Type-specific fields
  }
}
Fields:
  • type: String identifying the message purpose
  • data: Dictionary containing message payload

Example Message Types

Vision Detection:
{
  "type": "detection",
  "data": {
    "class": "apple",
    "confidence": 0.87,
    "bbox": [120, 80, 200, 160],
    "timestamp": 1678901234.56
  }
}
Robot Status:
{
  "type": "status",
  "data": {
    "state": "idle",
    "position": [45, 90, 120],
    "battery": 87
  }
}
Command:
{
  "type": "move_to",
  "data": {
    "x": 100,
    "y": 150,
    "z": 50,
    "speed": 50
  }
}
Define your message types early in development. Document each type’s data fields as part of your API contract between Raspberry Pi and VEX Brain.

Implementation: Sending Messages

Encoding and Transmission

From json_data.py:41-48:
def writing_data(self, message_type: str, data: dict):
    # 1. Structure the message
    message = {
        'type': message_type,
        'data': data,
    }
    
    # 2. Encode to JSON string, then bytes
    encoded_message = json.dumps(message).encode() + self.message_end
    
    # 3. Log for debugging
    log.info(f'send message: {encoded_message}')
    
    # 4. Transmit over serial
    self.com.write(encoded_message)
Step-by-Step:
  1. Structure: Create Python dict with type and data
  2. Serialize: json.dumps() converts dict → string
  3. Encode: .encode() converts string → bytes
  4. Terminate: Append newline delimiter (\n)
  5. Send: Write to serial port

Usage Example

From json_data.py:100-103:
if serial_manager.connect():
    log.info("Connected to VEX Brain")

    # Send test message
    message_type = 'test_service'
    data = {'state': 'successfully'}
    serial_manager.writing_data(message_type, data)
Transmitted Bytes:
{"type": "test_service", "data": {"state": "successfully"}}\n
Why append newline?The receiving side accumulates bytes until it sees \n, then knows the message is complete. This is the framing protocol from the previous lesson.

Implementation: Receiving Messages

Decoding and Processing

Recall from the serial protocol lesson that we accumulate bytes in a buffer. Once we have a complete message (delimiter found), we decode it: From json_data.py:50-68:
def _read_loop(self):
    """Thread loop for read messages from VEX"""
    while not self._stop_event.is_set():
        if self.com and self.com.in_waiting:
            try:
                char = self.com.read()
                
                # Check for complete message
                if char == self.message_end:
                    message = self.buffer.decode()  # bytes → string
                    self.buffer = bytearray()  # Reset
                    
                    try:
                        data = json.loads(message)  # string → dict
                        self._process_message(data)  # Handle it
                    except json.JSONDecodeError:
                        log.error(f'error message decode: {message}')
                else:
                    self.buffer.extend(char)
                    
            except Exception as e:
                log.error(f'error read serial port: {e}')
        time.sleep(0.01)
Decoding Steps:
  1. Receive bytes: Accumulate until \n
  2. Decode: .decode() converts bytes → string
  3. Parse: json.loads() converts string → dict
  4. Validate: Check for decoding errors
  5. Process: Route to appropriate handler

Message Processing

From json_data.py:70-81:
def _process_message(self, message: dict):
    """process message from VEX"""
    try:
        msg_type = message.get('type', '').lower()
        data = message.get('data', {})

        if msg_type == 'test_service':
            state = data.get('state')
            log.info(f'{msg_type} status:\nstate: {state}')
        
        # Add more message type handlers here
        # elif msg_type == 'status':
        #     ...

    except Exception as e:
        log.error(f'error process message: {e}')
Pattern:
  • Extract type field (case-insensitive)
  • Extract data payload
  • Route based on type using if/elif
  • Use .get() with defaults for safe field access
Safe Field Accessmessage.get('type', '') returns empty string if ‘type’ is missing, preventing KeyError crashes. Always use .get() with defaults when parsing untrusted input.

VEX Brain Implementation

Receiving and Responding

From vex_brain_comm/src/main.py:39-74:
import json
from vex import *

brain = Brain()

def json_data():
    buffer = bytearray()
    try:
        s = open('/dev/serial1', 'rb+')  # Read/write
    except:
        raise Exception('serial port error')
    
    while True:
        char = s.read(1)
        
        if char == b'\n':  # Complete message
            message = buffer.decode()
            buffer = bytearray()
            
            try:
                # Parse JSON
                msg = json.loads(message)
                msg_type = msg['type'].lower()
                data = msg.get('data', {})
            
                # Handle specific message type
                if msg_type == 'test_service' and data.get('state') == 'successfully':
                    # Prepare response
                    write_data = {'state': 'successfully'}
                    response = {
                        'type': msg_type,
                        'data': write_data,
                    }
                    
                    # Send response
                    encoded_message = json.dumps(response).encode() + b'\n'
                    s.write(encoded_message)
                    
                    # Display on screen
                    brain.screen.print_at("serial com:", x=1, y=15)
                    brain.screen.print_at("¡successfully!", x=1, y=35)
                    
            except:
                raise Exception('json error')
        else:
            buffer.extend(char)
Key Points:
  1. Same Pattern: Buffer → delimiter → decode → parse → process
  2. Bidirectional: VEX can send responses back
  3. Type Matching: Check msg_type to route correctly
  4. User Feedback: Display status on VEX brain screen
VEX Brain’s Python environment has json module built-in, making JSON parsing straightforward even on embedded hardware.

Message Type Design Patterns

1. Command Pattern

Raspberry Pi → VEX Brain
# Raspberry Pi sends command
serial_manager.writing_data('move_arm', {
    'target_x': 120,
    'target_y': 80,
    'speed': 50
})
# VEX Brain receives and executes
if msg_type == 'move_arm':
    x = data['target_x']
    y = data['target_y']
    speed = data['speed']
    robot_arm.move_to(x, y, speed)

2. Status Query Pattern

Raspberry Pi → VEX Brain
serial_manager.writing_data('get_status', {})
VEX Brain → Raspberry Pi
if msg_type == 'get_status':
    response_data = {
        'state': 'idle',
        'position': get_current_position(),
        'battery': brain.battery.percent()
    }
    send_message('status', response_data)

3. Event Notification Pattern

VEX Brain → Raspberry Pi (unsolicited)
# When arm reaches target
send_message('movement_complete', {
    'success': True,
    'final_position': [45, 90, 120]
})

4. Detection Result Pattern

Raspberry Pi → VEX Brain
# After vision inference
if best_detection['confidence'] > 0.5:
    serial_manager.writing_data('detection', {
        'class': best_detection['class'],
        'confidence': float(best_detection['confidence']),
        'bbox': best_detection['box'].tolist(),
        'center_x': int((box[0] + box[2]) / 2),
        'center_y': int((box[1] + box[3]) / 2)
    })
Type ConversionBefore JSON encoding, convert NumPy types:
  • np.float32float()
  • np.ndarray.tolist()
  • np.int64int()
Otherwise, json.dumps() will raise a TypeError.

Message Validation

Schema Validation

Define expected fields for each message type:
MESSAGE_SCHEMAS = {
    'move_arm': ['target_x', 'target_y', 'speed'],
    'detection': ['class', 'confidence', 'bbox'],
    'status': ['state', 'position'],
}

def validate_message(msg_type: str, data: dict) -> bool:
    if msg_type not in MESSAGE_SCHEMAS:
        log.error(f'Unknown message type: {msg_type}')
        return False
    
    required_fields = MESSAGE_SCHEMAS[msg_type]
    for field in required_fields:
        if field not in data:
            log.error(f'Missing field {field} in {msg_type}')
            return False
    
    return True
Usage:
def _process_message(self, message: dict):
    msg_type = message.get('type', '')
    data = message.get('data', {})
    
    if not validate_message(msg_type, data):
        return  # Skip invalid messages
    
    # Process valid message
    if msg_type == 'move_arm':
        # ...

Type Checking

Validate data types:
def validate_detection_message(data: dict) -> bool:
    # Check required fields exist
    if not all(k in data for k in ['class', 'confidence', 'bbox']):
        return False
    
    # Check types
    if not isinstance(data['class'], str):
        return False
    if not isinstance(data['confidence'], (int, float)):
        return False
    if not isinstance(data['bbox'], list) or len(data['bbox']) != 4:
        return False
    
    # Check ranges
    if not 0.0 <= data['confidence'] <= 1.0:
        return False
    
    return True
Fail GracefullyInvalid messages should be logged and skipped, not crash the system. Robot operation must continue even if one message is malformed.

Error Handling

JSON Parsing Errors

try:
    data = json.loads(message)
except json.JSONDecodeError as e:
    log.error(f'JSON parse error at position {e.pos}: {e.msg}')
    log.error(f'Invalid message: {message}')
    return  # Continue with next message

Missing Fields

msg_type = message.get('type')
if not msg_type:
    log.error(f'Message missing type field: {message}')
    return

data = message.get('data')
if data is None:
    log.error(f'Message missing data field: {message}')
    return

Encoding Errors

try:
    encoded = json.dumps(message).encode()
except TypeError as e:
    log.error(f'Cannot JSON encode message: {e}')
    log.error(f'Message content: {message}')
    return

Full Communication Flow

Let’s trace a complete round-trip:

Step 1: Raspberry Pi Detects Object

# Vision processing
best_detection = {
    'class': 'apple',
    'confidence': 0.89,
    'box': np.array([100, 80, 180, 160])
}

# Send to VEX
serial_manager.writing_data('detection', {
    'class': best_detection['class'],
    'confidence': float(best_detection['confidence']),
    'center_x': 140,
    'center_y': 120
})

Step 2: Transmitted Bytes

{"type": "detection", "data": {"class": "apple", "confidence": 0.89, "center_x": 140, "center_y": 120}}\n

Step 3: VEX Receives and Processes

# Buffer accumulates until \n
message = buffer.decode()  # String from bytes
msg = json.loads(message)  # Dict from string

if msg['type'] == 'detection':
    obj_class = msg['data']['class']
    center_x = msg['data']['center_x']
    center_y = msg['data']['center_y']
    
    # Move arm to object
    robot_arm.move_to(center_x, center_y)

Step 4: VEX Sends Confirmation

# After movement complete
response = {
    'type': 'movement_complete',
    'data': {'success': True, 'object': obj_class}
}
encoded = json.dumps(response).encode() + b'\n'
s.write(encoded)

Step 5: Raspberry Pi Receives Confirmation

# In _process_message
if msg_type == 'movement_complete':
    success = data.get('success')
    obj = data.get('object')
    log.info(f'Arm successfully picked up {obj}')
Add timestamps to messages for latency measurement:
import time

message = {
    'type': 'detection',
    'timestamp': time.time(),
    'data': {...}
}
Receiving side calculates: latency = time.time() - message['timestamp']

Practice Exercise

Build a Ping-Pong System

Requirements:
  1. Raspberry Pi sends:
    {"type": "ping", "data": {"id": 1, "timestamp": 1234.56}}
    
  2. VEX Brain responds:
    {"type": "pong", "data": {"id": 1, "timestamp": 1234.56}}
    
  3. Raspberry Pi measures round-trip time
  4. Send 100 pings, calculate:
    • Average latency
    • Min/max latency
    • Packet loss rate
Success Criteria:
  • Average latency < 50ms
  • Zero packet loss
  • Messages with matching IDs

Extension: Command Queue

Implement a command queue on VEX Brain:
  • Receive multiple movement commands
  • Queue them
  • Execute sequentially
  • Send status after each completion

Summary

You’ve learned:
  • ✓ JSON message schema design (type + data structure)
  • ✓ Encoding Python dicts to JSON strings to bytes
  • ✓ Decoding bytes to strings to Python dicts
  • ✓ Type-based message routing and handling
  • ✓ Message validation and error handling
  • ✓ Bidirectional command-response patterns
  • ✓ Complete round-trip communication flow

Next Steps

You now have structured communication working! The next lesson brings it all together with a complete Raspberry Pi implementation including connection management and real-world usage.

Raspberry Pi Integration

Complete implementation guide for Raspberry Pi serial communication
Course Repository: Full code in course/comm_class/
  • raspberry_comm/json_data.py: Complete Raspberry Pi implementation (117 lines)
  • vex_brain_comm/src/main.py: VEX Brain handlers (74 lines)

Build docs developers (and LLMs) love