Raspberry Pi Integration
This lesson brings together everything from the previous sections into a complete, production-ready serial communication system for Raspberry Pi. You’ll understand the full architecture and be able to integrate it with your vision processing.
Learning Objectives
By the end of this lesson, you will be able to:
Set up Raspberry Pi serial hardware and permissions
Implement a complete SerialCommunication class
Manage threaded reading for non-blocking operation
Integrate serial communication with vision processing
Debug connection issues on Raspberry Pi
Deploy reliable robot control systems
This lesson analyzes the complete implementation from course/comm_class/raspberry_comm/json_data.py
Hardware Setup
Raspberry Pi Serial Ports
Raspberry Pi has multiple serial interfaces:
GPIO UART (/dev/ttyAMA0 or /dev/serial0):
Pins 8 (TX) and 10 (RX) on GPIO header
Direct connection, no USB
Requires disabling serial console
USB Serial (/dev/ttyUSB0 or /dev/ttyACM0):
USB to serial adapter
Easier setup, no GPIO config needed
More flexible for prototyping
For development, USB serial adapters are recommended. For production, GPIO UART is more reliable with fewer cables.
Permissions Setup
By default, serial ports require root access. Add your user to the dialout group:
sudo usermod -a -G dialout $USER
# Log out and back in for changes to take effect
# Verify access
ls -l /dev/ttyUSB0
# Should show: crw-rw---- 1 root dialout ...
Testing Connection
Verify hardware before writing code:
# Install serial tools
sudo apt-get install minicom
# Open serial terminal
minicom -D /dev/ttyUSB0 -b 115200
# Type characters - should see them if loopback connected
# Ctrl-A X to exit
Complete SerialCommunication Class
Let’s build the full class step by step.
Class Structure
From json_data.py:10-20:
import json
import time
import serial
import logging as log
from threading import Thread, Event
log.basicConfig( level = log. INFO , format = " %(asctime)s - %(levelname)s - %(message)s " )
class SerialCommunication :
def __init__ ( self ):
# Message framing
self .message_end = b ' \n '
# Connection state
self .com = None
self .is_connected = False
# Threading
self ._read_thread = None
self ._stop_event = Event()
# Message buffer
self .buffer = bytearray ()
Components :
Message framing : Newline delimiter for complete messages
Connection state : Track whether serial port is open
Threading : Background thread for non-blocking reads
Buffer : Accumulate bytes until complete message
Use Event() for thread control instead of boolean flags. Events are thread-safe and provide wait/timeout functionality.
Connection Management
From json_data.py:21-39:
def connect ( self ) -> bool :
"""Establish serial connection and start read thread"""
if self .is_connected:
return True # Already connected
try :
# Open serial port
self .com = serial.Serial(
port = "COM7" , # Change to /dev/ttyUSB0 on Raspberry Pi
baudrate = 115200 ,
write_timeout = 10
)
self .is_connected = True
# Start background reading thread
self ._stop_event.clear()
self ._read_thread = Thread( target = self ._read_loop)
self ._read_thread.daemon = True # Exit when main exits
self ._read_thread.start()
log.info( "Serial connection established" )
return True
except Exception as e:
log.error( f 'Error connecting to serial port: { str (e) } ' )
return False
Key Features :
Idempotent : Safe to call multiple times (checks is_connected)
Port Configuration : Adjust port parameter for your platform
Write Timeout : Prevents indefinite blocking on write()
Thread Lifecycle : Create and start thread atomically
Daemon Thread : Won’t prevent program exit
Error Handling : Returns bool to indicate success
Platform-Specific Ports :
Windows: "COM7", "COM8", etc.
Linux USB: "/dev/ttyUSB0", "/dev/ttyACM0"
Raspberry Pi GPIO: "/dev/ttyAMA0" or "/dev/serial0"
Use environment variables or config files for production: import os
port = os.getenv( 'SERIAL_PORT' , '/dev/ttyUSB0' )
Sending Messages
From json_data.py:41-48:
def writing_data ( self , message_type : str , data : dict ):
"""Send JSON message over serial"""
# Structure message
message = {
'type' : message_type,
'data' : data,
}
# Encode: dict → JSON string → bytes, append delimiter
encoded_message = json.dumps(message).encode() + self .message_end
log.info( f 'send message: { encoded_message } ' )
# Transmit
self .com.write(encoded_message)
Process :
Create structured dict
Serialize to JSON string
Encode to bytes
Append newline delimiter
Write to serial port
Usage Example :
serial_comm = SerialCommunication()
serial_comm.connect()
# Send detection result
serial_comm.writing_data( 'detection' , {
'class' : 'apple' ,
'confidence' : 0.92 ,
'center_x' : 320 ,
'center_y' : 240
})
Receiving Messages (Threaded)
From json_data.py:50-68:
def _read_loop ( self ):
"""Thread loop for reading messages from VEX"""
while not self ._stop_event.is_set():
if self .com and self .com.in_waiting: # Bytes available?
try :
char = self .com.read() # Read one byte
if char == self .message_end: # Complete message!
# Decode buffer
message = self .buffer.decode()
self .buffer = bytearray () # Reset
try :
# Parse JSON
data = json.loads(message)
self ._process_message(data)
except json.JSONDecodeError:
log.error( f 'error message decode: { message } ' )
else :
# Accumulate byte
self .buffer.extend(char)
except Exception as e:
log.error( f 'error read serial port: { e } ' )
time.sleep( 0.01 ) # Small delay to avoid busy-waiting
Threading Pattern :
Why Threading?
Non-Blocking : Main thread can process vision while waiting for messages
Responsive : Immediate message handling when data arrives
Clean Separation : Communication logic isolated from application logic
time.sleep(0.01) gives CPU time to other threads and prevents busy-waiting (100% CPU usage). 10ms is fast enough for robot control.
Message Processing
From json_data.py:70-81:
def _process_message ( self , message : dict ):
"""Process received message from VEX"""
try :
msg_type = message.get( 'type' , '' ).lower()
data = message.get( 'data' , {})
if msg_type == 'test_service' :
state = data.get( 'state' )
log.info( f ' { msg_type } status: \n state: { state } ' )
# Add handlers for other message types
elif msg_type == 'status' :
# Handle robot status updates
pass
elif msg_type == 'movement_complete' :
# Handle movement completion
pass
except Exception as e:
log.error( f 'error process message: { e } ' )
Extending Message Handlers :
def _process_message ( self , message : dict ):
try :
msg_type = message.get( 'type' , '' ).lower()
data = message.get( 'data' , {})
# Route to specific handlers
handlers = {
'test_service' : self ._handle_test_service,
'status' : self ._handle_status,
'movement_complete' : self ._handle_movement_complete,
'error' : self ._handle_error,
}
handler = handlers.get(msg_type)
if handler:
handler(data)
else :
log.warning( f 'Unknown message type: { msg_type } ' )
except Exception as e:
log.error( f 'Error processing message: { e } ' )
def _handle_test_service ( self , data : dict ):
state = data.get( 'state' )
log.info( f 'Test service status: { state } ' )
def _handle_status ( self , data : dict ):
state = data.get( 'state' )
position = data.get( 'position' )
log.info( f 'Robot state: { state } , position: { position } ' )
def _handle_movement_complete ( self , data : dict ):
success = data.get( 'success' )
if success:
log.info( 'Movement completed successfully' )
else :
log.error( 'Movement failed' )
Use a handler dictionary pattern for cleaner code as you add more message types. This scales better than long if/elif chains.
Graceful Shutdown
From json_data.py:83-91:
def close ( self ):
"""Close serial connection and stop threads"""
# Signal thread to stop
self ._stop_event.set()
# Wait for thread to finish (with timeout)
if self ._read_thread:
self ._read_thread.join( timeout = 1.0 )
# Close serial port
if self .com and self .com.is_open:
self .com.close()
self .is_connected = False
log.info( 'Serial connection closed' )
Shutdown Sequence :
Set stop event (thread checks this in loop)
Wait for thread to exit (join with timeout)
Close serial port
Update connection state
Usage with Context Manager :
class SerialCommunication :
def __enter__ ( self ):
self .connect()
return self
def __exit__ ( self , exc_type , exc_val , exc_tb ):
self .close()
# Usage
with SerialCommunication() as serial_comm:
serial_comm.writing_data( 'test' , { 'data' : 'value' })
# Automatically closed on exit
Integration with Vision Processing
Architecture
import cv2
from serial_communication import SerialCommunication
from vision_model import DetectionModel
class RobotVisionSystem :
def __init__ ( self ):
self .serial = SerialCommunication()
self .detector = DetectionModel()
self .camera = cv2.VideoCapture( 0 )
def start ( self ):
# Connect to robot
if not self .serial.connect():
print ( "Failed to connect to robot" )
return
print ( "Vision system started" )
try :
while True :
# Capture frame
ret, frame = self .camera.read()
if not ret:
break
# Run detection
results, class_names = self .detector.inference(frame)
# Process results
for result in results:
boxes = result.boxes
if boxes.shape[ 0 ] == 0 :
continue
confidence = float (boxes.conf[ 0 ])
if confidence > 0.5 :
class_id = int (boxes.cls[ 0 ])
class_name = class_names[class_id]
box = boxes.xyxy[ 0 ].cpu().numpy()
center_x = int ((box[ 0 ] + box[ 2 ]) / 2 )
center_y = int ((box[ 1 ] + box[ 3 ]) / 2 )
# Send to robot
self .serial.writing_data( 'detection' , {
'class' : class_name,
'confidence' : confidence,
'center_x' : center_x,
'center_y' : center_y
})
# Display (optional)
cv2.imshow( 'Vision' , frame)
if cv2.waitKey( 1 ) & 0x FF == ord ( 'q' ):
break
finally :
self .cleanup()
def cleanup ( self ):
self .camera.release()
cv2.destroyAllWindows()
self .serial.close()
if __name__ == "__main__" :
system = RobotVisionSystem()
system.start()
Key Integration Points :
Vision runs in main thread (captures and processes frames)
Serial reading runs in background thread (handles robot responses)
Detection results sent to robot via writing_data()
Both systems cleaned up gracefully on exit
Complete Usage Example
From json_data.py:94-113:
if __name__ == "__main__" :
serial_manager = SerialCommunication()
try :
# Establish connection
if serial_manager.connect():
log.info( "Connected to VEX Brain" )
# Send test message
message_type = 'test_service'
data = { 'state' : 'successfully' }
serial_manager.writing_data(message_type, data)
# Keep running to receive responses
while True :
time.sleep( 1 )
except KeyboardInterrupt :
log.info( "End program" )
finally :
serial_manager.close()
Flow :
Create SerialCommunication instance
Connect (opens port, starts thread)
Send messages as needed
Main loop keeps program alive
Ctrl+C triggers cleanup
finally ensures port closes
Debugging and Troubleshooting
Connection Issues
Problem : Serial exception: Could not open port
# Check available ports
import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
for port in ports:
print ( f ' { port.device } : { port.description } ' )
# Output:
# /dev/ttyUSB0: USB Serial
# /dev/ttyAMA0: Raspberry Pi UART
Problem : Permission denied
sudo usermod -a -G dialout $USER
# Logout and login
groups # Verify dialout is listed
Problem : Port in use
# Find process using port
sudo lsof /dev/ttyUSB0
# Kill process
sudo kill < PI D >
Communication Issues
Problem : No data received
# Add debug output
def _read_loop ( self ):
while not self ._stop_event.is_set():
waiting = self .com.in_waiting
if waiting > 0 :
log.info( f ' { waiting } bytes available' ) # Debug
# ... rest of code
Problem : Garbled data
Check baud rate matches on both sides (115200)
Verify TX/RX not swapped
Check ground connection
Problem : Partial messages
# Log buffer contents
if char == self .message_end:
log.info( f 'Complete message: { self .buffer } ' ) # See what arrived
message = self .buffer.decode()
import time
class SerialCommunication :
def __init__ ( self ):
# ... existing init ...
self .message_count = 0
self .start_time = time.time()
def _process_message ( self , message : dict ):
self .message_count += 1
# Log throughput every 100 messages
if self .message_count % 100 == 0 :
elapsed = time.time() - self .start_time
rate = self .message_count / elapsed
log.info( f 'Message rate: { rate :.1f} msg/sec' )
# ... process message ...
Expect 50-200 messages/second depending on message size and baud rate. If rate is lower, check for bottlenecks in _process_message().
Best Practices
1. Configuration Management
import os
from dataclasses import dataclass
@dataclass
class SerialConfig :
port: str = os.getenv( 'SERIAL_PORT' , '/dev/ttyUSB0' )
baudrate: int = 115200
timeout: float = 1.0
write_timeout: float = 10.0
class SerialCommunication :
def __init__ ( self , config : SerialConfig = None ):
self .config = config or SerialConfig()
# Use self.config.port, etc.
2. Logging Levels
# Development: verbose
log.basicConfig( level = log. DEBUG )
# Production: errors only
log.basicConfig( level = log. ERROR )
# Use appropriate levels
log.debug( f 'Raw bytes: { char } ' ) # Detailed debugging
log.info( f 'Connected to { port } ' ) # Important events
log.warning( f 'Retrying connection' ) # Potential issues
log.error( f 'Failed to parse: { msg } ' ) # Errors
3. Reconnection Logic
def connect_with_retry ( self , max_attempts = 3 , delay = 2.0 ):
for attempt in range (max_attempts):
if self .connect():
return True
log.warning( f 'Connection attempt { attempt + 1 } / { max_attempts } failed' )
time.sleep(delay)
return False
4. Message Rate Limiting
class SerialCommunication :
def __init__ ( self ):
# ... existing init ...
self .last_send_time = 0
self .min_send_interval = 0.05 # 50ms = 20 Hz max
def writing_data ( self , message_type : str , data : dict ):
# Rate limit
now = time.time()
elapsed = now - self .last_send_time
if elapsed < self .min_send_interval:
time.sleep( self .min_send_interval - elapsed)
# Send message
# ...
self .last_send_time = time.time()
Practice Exercise
Build a Complete Echo Test System
Requirements :
Raspberry Pi sends 100 numbered messages
VEX Brain echoes each message back
Raspberry Pi verifies responses
Calculate statistics:
Total time
Messages per second
Average round-trip latency
Success rate
Starter Code :
class EchoTest :
def __init__ ( self ):
self .serial = SerialCommunication()
self .pending = {} # id -> send_time
self .latencies = []
def run_test ( self , num_messages = 100 ):
self .serial.connect()
for i in range (num_messages):
send_time = time.time()
self .pending[i] = send_time
self .serial.writing_data( 'ping' , {
'id' : i,
'timestamp' : send_time
})
time.sleep( 0.1 ) # 10 Hz
# Wait for responses
time.sleep( 2.0 )
# Calculate stats
print ( f 'Sent: { num_messages } ' )
print ( f 'Received: { len ( self .latencies) } ' )
if self .latencies:
print ( f 'Avg latency: { sum ( self .latencies) / len ( self .latencies) * 1000 :.1f} ms' )
def _handle_pong ( self , data : dict ):
msg_id = data[ 'id' ]
if msg_id in self .pending:
send_time = self .pending.pop(msg_id)
latency = time.time() - send_time
self .latencies.append(latency)
Success Criteria :
100% message delivery
Average latency < 100ms
No crashes or hangs
Summary
You’ve learned:
✓ Raspberry Pi serial port configuration and permissions
✓ Complete SerialCommunication class implementation
✓ Threaded reading for non-blocking operation
✓ Integration with computer vision processing
✓ Debugging techniques for connection and communication issues
✓ Best practices for production systems
Next Steps
With robust communication in place, you’re ready to add intelligence! The next module covers computer vision: training YOLO models to detect objects.
Model Training Train custom YOLO models for object detection
Communication Overview Review serial protocol fundamentals
Complete Implementation : course/comm_class/raspberry_comm/json_data.py (117 lines)This production-ready code handles:
Connection management
Threaded reading
JSON message encoding/decoding
Error handling
Graceful shutdown