Skip to main content

Raspberry Pi Integration

This lesson brings together everything from the previous sections into a complete, production-ready serial communication system for Raspberry Pi. You’ll understand the full architecture and be able to integrate it with your vision processing.

Learning Objectives

By the end of this lesson, you will be able to:
  • Set up Raspberry Pi serial hardware and permissions
  • Implement a complete SerialCommunication class
  • Manage threaded reading for non-blocking operation
  • Integrate serial communication with vision processing
  • Debug connection issues on Raspberry Pi
  • Deploy reliable robot control systems
This lesson analyzes the complete implementation from course/comm_class/raspberry_comm/json_data.py

Hardware Setup

Raspberry Pi Serial Ports

Raspberry Pi has multiple serial interfaces: GPIO UART (/dev/ttyAMA0 or /dev/serial0):
  • Pins 8 (TX) and 10 (RX) on GPIO header
  • Direct connection, no USB
  • Requires disabling serial console
USB Serial (/dev/ttyUSB0 or /dev/ttyACM0):
  • USB to serial adapter
  • Easier setup, no GPIO config needed
  • More flexible for prototyping
For development, USB serial adapters are recommended. For production, GPIO UART is more reliable with fewer cables.

Permissions Setup

By default, serial ports require root access. Add your user to the dialout group:
sudo usermod -a -G dialout $USER
# Log out and back in for changes to take effect

# Verify access
ls -l /dev/ttyUSB0
# Should show: crw-rw---- 1 root dialout ...

Testing Connection

Verify hardware before writing code:
# Install serial tools
sudo apt-get install minicom

# Open serial terminal
minicom -D /dev/ttyUSB0 -b 115200

# Type characters - should see them if loopback connected
# Ctrl-A X to exit

Complete SerialCommunication Class

Let’s build the full class step by step.

Class Structure

From json_data.py:10-20:
import json
import time
import serial
import logging as log
from threading import Thread, Event

log.basicConfig(level=log.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

class SerialCommunication:
    def __init__(self):
        # Message framing
        self.message_end = b'\n'
        
        # Connection state
        self.com = None
        self.is_connected = False
        
        # Threading
        self._read_thread = None
        self._stop_event = Event()
        
        # Message buffer
        self.buffer = bytearray()
Components:
  • Message framing: Newline delimiter for complete messages
  • Connection state: Track whether serial port is open
  • Threading: Background thread for non-blocking reads
  • Buffer: Accumulate bytes until complete message
Use Event() for thread control instead of boolean flags. Events are thread-safe and provide wait/timeout functionality.

Connection Management

From json_data.py:21-39:
def connect(self) -> bool:
    """Establish serial connection and start read thread"""
    if self.is_connected:
        return True  # Already connected

    try:
        # Open serial port
        self.com = serial.Serial(
            port="COM7",      # Change to /dev/ttyUSB0 on Raspberry Pi
            baudrate=115200,
            write_timeout=10
        )
        self.is_connected = True

        # Start background reading thread
        self._stop_event.clear()
        self._read_thread = Thread(target=self._read_loop)
        self._read_thread.daemon = True  # Exit when main exits
        self._read_thread.start()
        
        log.info("Serial connection established")
        return True

    except Exception as e:
        log.error(f'Error connecting to serial port: {str(e)}')
        return False
Key Features:
  1. Idempotent: Safe to call multiple times (checks is_connected)
  2. Port Configuration: Adjust port parameter for your platform
  3. Write Timeout: Prevents indefinite blocking on write()
  4. Thread Lifecycle: Create and start thread atomically
  5. Daemon Thread: Won’t prevent program exit
  6. Error Handling: Returns bool to indicate success
Platform-Specific Ports:
  • Windows: "COM7", "COM8", etc.
  • Linux USB: "/dev/ttyUSB0", "/dev/ttyACM0"
  • Raspberry Pi GPIO: "/dev/ttyAMA0" or "/dev/serial0"
Use environment variables or config files for production:
import os
port = os.getenv('SERIAL_PORT', '/dev/ttyUSB0')

Sending Messages

From json_data.py:41-48:
def writing_data(self, message_type: str, data: dict):
    """Send JSON message over serial"""
    # Structure message
    message = {
        'type': message_type,
        'data': data,
    }
    
    # Encode: dict → JSON string → bytes, append delimiter
    encoded_message = json.dumps(message).encode() + self.message_end
    
    log.info(f'send message: {encoded_message}')
    
    # Transmit
    self.com.write(encoded_message)
Process:
  1. Create structured dict
  2. Serialize to JSON string
  3. Encode to bytes
  4. Append newline delimiter
  5. Write to serial port
Usage Example:
serial_comm = SerialCommunication()
serial_comm.connect()

# Send detection result
serial_comm.writing_data('detection', {
    'class': 'apple',
    'confidence': 0.92,
    'center_x': 320,
    'center_y': 240
})

Receiving Messages (Threaded)

From json_data.py:50-68:
def _read_loop(self):
    """Thread loop for reading messages from VEX"""
    while not self._stop_event.is_set():
        if self.com and self.com.in_waiting:  # Bytes available?
            try:
                char = self.com.read()  # Read one byte
                
                if char == self.message_end:  # Complete message!
                    # Decode buffer
                    message = self.buffer.decode()
                    self.buffer = bytearray()  # Reset
                    
                    try:
                        # Parse JSON
                        data = json.loads(message)
                        self._process_message(data)
                    except json.JSONDecodeError:
                        log.error(f'error message decode: {message}')
                else:
                    # Accumulate byte
                    self.buffer.extend(char)
                    
            except Exception as e:
                log.error(f'error read serial port: {e}')
        
        time.sleep(0.01)  # Small delay to avoid busy-waiting
Threading Pattern: Why Threading?
  • Non-Blocking: Main thread can process vision while waiting for messages
  • Responsive: Immediate message handling when data arrives
  • Clean Separation: Communication logic isolated from application logic
time.sleep(0.01) gives CPU time to other threads and prevents busy-waiting (100% CPU usage). 10ms is fast enough for robot control.

Message Processing

From json_data.py:70-81:
def _process_message(self, message: dict):
    """Process received message from VEX"""
    try:
        msg_type = message.get('type', '').lower()
        data = message.get('data', {})

        if msg_type == 'test_service':
            state = data.get('state')
            log.info(f'{msg_type} status:\nstate: {state}')
        
        # Add handlers for other message types
        elif msg_type == 'status':
            # Handle robot status updates
            pass
        
        elif msg_type == 'movement_complete':
            # Handle movement completion
            pass

    except Exception as e:
        log.error(f'error process message: {e}')
Extending Message Handlers:
def _process_message(self, message: dict):
    try:
        msg_type = message.get('type', '').lower()
        data = message.get('data', {})
        
        # Route to specific handlers
        handlers = {
            'test_service': self._handle_test_service,
            'status': self._handle_status,
            'movement_complete': self._handle_movement_complete,
            'error': self._handle_error,
        }
        
        handler = handlers.get(msg_type)
        if handler:
            handler(data)
        else:
            log.warning(f'Unknown message type: {msg_type}')
    
    except Exception as e:
        log.error(f'Error processing message: {e}')

def _handle_test_service(self, data: dict):
    state = data.get('state')
    log.info(f'Test service status: {state}')

def _handle_status(self, data: dict):
    state = data.get('state')
    position = data.get('position')
    log.info(f'Robot state: {state}, position: {position}')

def _handle_movement_complete(self, data: dict):
    success = data.get('success')
    if success:
        log.info('Movement completed successfully')
    else:
        log.error('Movement failed')
Use a handler dictionary pattern for cleaner code as you add more message types. This scales better than long if/elif chains.

Graceful Shutdown

From json_data.py:83-91:
def close(self):
    """Close serial connection and stop threads"""
    # Signal thread to stop
    self._stop_event.set()
    
    # Wait for thread to finish (with timeout)
    if self._read_thread:
        self._read_thread.join(timeout=1.0)
    
    # Close serial port
    if self.com and self.com.is_open:
        self.com.close()
        self.is_connected = False
        log.info('Serial connection closed')
Shutdown Sequence:
  1. Set stop event (thread checks this in loop)
  2. Wait for thread to exit (join with timeout)
  3. Close serial port
  4. Update connection state
Usage with Context Manager:
class SerialCommunication:
    def __enter__(self):
        self.connect()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

# Usage
with SerialCommunication() as serial_comm:
    serial_comm.writing_data('test', {'data': 'value'})
    # Automatically closed on exit

Integration with Vision Processing

Architecture

import cv2
from serial_communication import SerialCommunication
from vision_model import DetectionModel

class RobotVisionSystem:
    def __init__(self):
        self.serial = SerialCommunication()
        self.detector = DetectionModel()
        self.camera = cv2.VideoCapture(0)
    
    def start(self):
        # Connect to robot
        if not self.serial.connect():
            print("Failed to connect to robot")
            return
        
        print("Vision system started")
        
        try:
            while True:
                # Capture frame
                ret, frame = self.camera.read()
                if not ret:
                    break
                
                # Run detection
                results, class_names = self.detector.inference(frame)
                
                # Process results
                for result in results:
                    boxes = result.boxes
                    if boxes.shape[0] == 0:
                        continue
                    
                    confidence = float(boxes.conf[0])
                    if confidence > 0.5:
                        class_id = int(boxes.cls[0])
                        class_name = class_names[class_id]
                        box = boxes.xyxy[0].cpu().numpy()
                        
                        center_x = int((box[0] + box[2]) / 2)
                        center_y = int((box[1] + box[3]) / 2)
                        
                        # Send to robot
                        self.serial.writing_data('detection', {
                            'class': class_name,
                            'confidence': confidence,
                            'center_x': center_x,
                            'center_y': center_y
                        })
                
                # Display (optional)
                cv2.imshow('Vision', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        
        finally:
            self.cleanup()
    
    def cleanup(self):
        self.camera.release()
        cv2.destroyAllWindows()
        self.serial.close()

if __name__ == "__main__":
    system = RobotVisionSystem()
    system.start()
Key Integration Points:
  1. Vision runs in main thread (captures and processes frames)
  2. Serial reading runs in background thread (handles robot responses)
  3. Detection results sent to robot via writing_data()
  4. Both systems cleaned up gracefully on exit

Complete Usage Example

From json_data.py:94-113:
if __name__ == "__main__":
    serial_manager = SerialCommunication()
    
    try:
        # Establish connection
        if serial_manager.connect():
            log.info("Connected to VEX Brain")

            # Send test message
            message_type = 'test_service'
            data = {'state': 'successfully'}
            serial_manager.writing_data(message_type, data)

            # Keep running to receive responses
            while True:
                time.sleep(1)

    except KeyboardInterrupt:
        log.info("End program")
    finally:
        serial_manager.close()
Flow:
  1. Create SerialCommunication instance
  2. Connect (opens port, starts thread)
  3. Send messages as needed
  4. Main loop keeps program alive
  5. Ctrl+C triggers cleanup
  6. finally ensures port closes

Debugging and Troubleshooting

Connection Issues

Problem: Serial exception: Could not open port
# Check available ports
import serial.tools.list_ports

ports = serial.tools.list_ports.comports()
for port in ports:
    print(f'{port.device}: {port.description}')

# Output:
# /dev/ttyUSB0: USB Serial
# /dev/ttyAMA0: Raspberry Pi UART
Problem: Permission denied
sudo usermod -a -G dialout $USER
# Logout and login
groups  # Verify dialout is listed
Problem: Port in use
# Find process using port
sudo lsof /dev/ttyUSB0

# Kill process
sudo kill <PID>

Communication Issues

Problem: No data received
# Add debug output
def _read_loop(self):
    while not self._stop_event.is_set():
        waiting = self.com.in_waiting
        if waiting > 0:
            log.info(f'{waiting} bytes available')  # Debug
            # ... rest of code
Problem: Garbled data
  • Check baud rate matches on both sides (115200)
  • Verify TX/RX not swapped
  • Check ground connection
Problem: Partial messages
# Log buffer contents
if char == self.message_end:
    log.info(f'Complete message: {self.buffer}')  # See what arrived
    message = self.buffer.decode()

Performance Monitoring

import time

class SerialCommunication:
    def __init__(self):
        # ... existing init ...
        self.message_count = 0
        self.start_time = time.time()
    
    def _process_message(self, message: dict):
        self.message_count += 1
        
        # Log throughput every 100 messages
        if self.message_count % 100 == 0:
            elapsed = time.time() - self.start_time
            rate = self.message_count / elapsed
            log.info(f'Message rate: {rate:.1f} msg/sec')
        
        # ... process message ...
Expect 50-200 messages/second depending on message size and baud rate. If rate is lower, check for bottlenecks in _process_message().

Best Practices

1. Configuration Management

import os
from dataclasses import dataclass

@dataclass
class SerialConfig:
    port: str = os.getenv('SERIAL_PORT', '/dev/ttyUSB0')
    baudrate: int = 115200
    timeout: float = 1.0
    write_timeout: float = 10.0

class SerialCommunication:
    def __init__(self, config: SerialConfig = None):
        self.config = config or SerialConfig()
        # Use self.config.port, etc.

2. Logging Levels

# Development: verbose
log.basicConfig(level=log.DEBUG)

# Production: errors only
log.basicConfig(level=log.ERROR)

# Use appropriate levels
log.debug(f'Raw bytes: {char}')  # Detailed debugging
log.info(f'Connected to {port}')  # Important events
log.warning(f'Retrying connection')  # Potential issues
log.error(f'Failed to parse: {msg}')  # Errors

3. Reconnection Logic

def connect_with_retry(self, max_attempts=3, delay=2.0):
    for attempt in range(max_attempts):
        if self.connect():
            return True
        log.warning(f'Connection attempt {attempt+1}/{max_attempts} failed')
        time.sleep(delay)
    return False

4. Message Rate Limiting

class SerialCommunication:
    def __init__(self):
        # ... existing init ...
        self.last_send_time = 0
        self.min_send_interval = 0.05  # 50ms = 20 Hz max
    
    def writing_data(self, message_type: str, data: dict):
        # Rate limit
        now = time.time()
        elapsed = now - self.last_send_time
        if elapsed < self.min_send_interval:
            time.sleep(self.min_send_interval - elapsed)
        
        # Send message
        # ...
        self.last_send_time = time.time()

Practice Exercise

Build a Complete Echo Test System

Requirements:
  1. Raspberry Pi sends 100 numbered messages
  2. VEX Brain echoes each message back
  3. Raspberry Pi verifies responses
  4. Calculate statistics:
    • Total time
    • Messages per second
    • Average round-trip latency
    • Success rate
Starter Code:
class EchoTest:
    def __init__(self):
        self.serial = SerialCommunication()
        self.pending = {}  # id -> send_time
        self.latencies = []
    
    def run_test(self, num_messages=100):
        self.serial.connect()
        
        for i in range(num_messages):
            send_time = time.time()
            self.pending[i] = send_time
            
            self.serial.writing_data('ping', {
                'id': i,
                'timestamp': send_time
            })
            
            time.sleep(0.1)  # 10 Hz
        
        # Wait for responses
        time.sleep(2.0)
        
        # Calculate stats
        print(f'Sent: {num_messages}')
        print(f'Received: {len(self.latencies)}')
        if self.latencies:
            print(f'Avg latency: {sum(self.latencies)/len(self.latencies)*1000:.1f}ms')
    
    def _handle_pong(self, data: dict):
        msg_id = data['id']
        if msg_id in self.pending:
            send_time = self.pending.pop(msg_id)
            latency = time.time() - send_time
            self.latencies.append(latency)
Success Criteria:
  • 100% message delivery
  • Average latency < 100ms
  • No crashes or hangs

Summary

You’ve learned:
  • ✓ Raspberry Pi serial port configuration and permissions
  • ✓ Complete SerialCommunication class implementation
  • ✓ Threaded reading for non-blocking operation
  • ✓ Integration with computer vision processing
  • ✓ Debugging techniques for connection and communication issues
  • ✓ Best practices for production systems

Next Steps

With robust communication in place, you’re ready to add intelligence! The next module covers computer vision: training YOLO models to detect objects.

Model Training

Train custom YOLO models for object detection

Communication Overview

Review serial protocol fundamentals
Complete Implementation: course/comm_class/raspberry_comm/json_data.py (117 lines)This production-ready code handles:
  • Connection management
  • Threaded reading
  • JSON message encoding/decoding
  • Error handling
  • Graceful shutdown

Build docs developers (and LLMs) love