Overview
udp_protocol.py defines binary packet formats for low-latency communication between the training system and CL1 neural interface. It uses struct-packed binary data for efficient serialization.
Key Features:
- Fixed-size binary packets for predictable performance
- Little-endian encoding for consistency
- Microsecond timestamps for latency measurement
- Numpy integration for ML pipeline compatibility
Location: source/udp_protocol.py
Constants
Packet Sizes
Number of neural channel groups used in the systemCorresponds to: encoding, move_forward, move_backward, move_left, move_right, turn_left, turn_right, attack
Size of stimulation command packets in bytesStructure: 8 bytes timestamp + 32 bytes frequencies + 32 bytes amplitudes
Size of spike data packets in bytesStructure: 8 bytes timestamp + 32 bytes spike counts
MAX_CHANNELS_PER_FEEDBACK
Maximum number of channels that can be specified in a feedback command
Feedback Types
Feedback type code for interrupt commands (stop ongoing stimulation)
Feedback type code for event-based feedback (kills, damage, pickups)
Feedback type code for reward-based feedback (positive/negative reinforcement)
Stimulation Command Packet
Direction: Training System → CL1 Device
Size: 72 bytes
Purpose: Send neural stimulation parameters
┌─────────────────────────────────────────────────────────────────┐
│ Timestamp (8 bytes, uint64) │
├─────────────────────────────────────────────────────────────────┤
│ Frequencies (32 bytes, 8 × float32) │
│ - freq[0]: encoding channels (Hz) │
│ - freq[1]: move_forward channels (Hz) │
│ - freq[2]: move_backward channels (Hz) │
│ - freq[3]: move_left channels (Hz) │
│ - freq[4]: move_right channels (Hz) │
│ - freq[5]: turn_left channels (Hz) │
│ - freq[6]: turn_right channels (Hz) │
│ - freq[7]: attack channels (Hz) │
├─────────────────────────────────────────────────────────────────┤
│ Amplitudes (32 bytes, 8 × float32) │
│ - amp[0]: encoding channels (μA) │
│ - amp[1]: move_forward channels (μA) │
│ - amp[2]: move_backward channels (μA) │
│ - amp[3]: move_left channels (μA) │
│ - amp[4]: move_right channels (μA) │
│ - amp[5]: turn_left channels (μA) │
│ - amp[6]: turn_right channels (μA) │
│ - amp[7]: attack channels (μA) │
└─────────────────────────────────────────────────────────────────┘
Struct Format: <Q + ffffffff + ffffffff (little-endian)
Spike Data Packet
Direction: CL1 Device → Training System
Size: 40 bytes
Purpose: Send spike counts from neural hardware
┌─────────────────────────────────────────────────────────────────┐
│ Timestamp (8 bytes, uint64) │
├─────────────────────────────────────────────────────────────────┤
│ Spike Counts (32 bytes, 8 × float32) │
│ - spikes[0]: encoding channels spike count │
│ - spikes[1]: move_forward channels spike count │
│ - spikes[2]: move_backward channels spike count │
│ - spikes[3]: move_left channels spike count │
│ - spikes[4]: move_right channels spike count │
│ - spikes[5]: turn_left channels spike count │
│ - spikes[6]: turn_right channels spike count │
│ - spikes[7]: attack channels spike count │
└─────────────────────────────────────────────────────────────────┘
Struct Format: <Q + ffffffff (little-endian)
Direction: Training System → CL1 Device
Size: 120 bytes
Purpose: Send reward/event-based stimulation commands
┌─────────────────────────────────────────────────────────────────┐
│ Timestamp (8 bytes, uint64) │
├─────────────────────────────────────────────────────────────────┤
│ Type (1 byte, uint8) │
│ 0 = interrupt, 1 = event, 2 = reward │
├─────────────────────────────────────────────────────────────────┤
│ Num Channels (1 byte, uint8) │
├─────────────────────────────────────────────────────────────────┤
│ Channels (64 bytes, 64 × uint8) │
│ Channel numbers (0-63), padded with 0xFF for unused slots │
├─────────────────────────────────────────────────────────────────┤
│ Frequency (4 bytes, uint32) - Hz │
├─────────────────────────────────────────────────────────────────┤
│ Amplitude (4 bytes, float32) - μA │
├─────────────────────────────────────────────────────────────────┤
│ Pulses (4 bytes, uint32) - Number of pulses/bursts │
├─────────────────────────────────────────────────────────────────┤
│ Unpredictable (1 byte, uint8) - Boolean flag │
├─────────────────────────────────────────────────────────────────┤
│ Event Name (32 bytes, string) - Null-padded ASCII │
├─────────────────────────────────────────────────────────────────┤
│ Padding (1 byte) │
└─────────────────────────────────────────────────────────────────┘
Struct Format: <QBB64BIfIB32sx (little-endian)
Direction: Training System → CL1 Device
Size: Variable (JSON)
Purpose: Send training events for logging
┌─────────────────────────────────────────────────────────────────┐
│ Timestamp (8 bytes, uint64) │
├─────────────────────────────────────────────────────────────────┤
│ JSON Length (4 bytes, uint32) │
├─────────────────────────────────────────────────────────────────┤
│ JSON Payload (variable length, UTF-8) │
│ { │
│ "timestamp": <uint64>, │
│ "event_type": "episode_end" | "training_complete", │
│ "data": { ... } │
│ } │
└─────────────────────────────────────────────────────────────────┘
Functions
Stimulation Command Functions
pack_stimulation_command(frequencies, amplitudes)
Pack stimulation parameters into a binary UDP packet.
Parameters:
frequencies (np.ndarray): Shape (8,), frequency values in Hz
amplitudes (np.ndarray): Shape (8,), amplitude values in μA
Returns:
bytes: 72-byte binary packet ready to send via UDP
Raises:
ValueError: If arrays have incorrect shape
Example:
import numpy as np
import udp_protocol
frequencies = np.array([10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 12.0], dtype=np.float32)
amplitudes = np.array([1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.1, 2.2], dtype=np.float32)
packet = udp_protocol.pack_stimulation_command(frequencies, amplitudes)
stim_socket.sendto(packet, (cl1_host, cl1_stim_port))
unpack_stimulation_command(packet)
Unpack a stimulation command packet.
Parameters:
packet (bytes): 72-byte binary packet from UDP
Returns:
tuple: (timestamp, frequencies, amplitudes)
timestamp (int): Microseconds since epoch
frequencies (np.ndarray): Shape (8,), Hz values
amplitudes (np.ndarray): Shape (8,), μA values
Raises:
ValueError: If packet has incorrect size
Example:
packet, addr = stim_socket.recvfrom(udp_protocol.STIM_PACKET_SIZE)
timestamp, frequencies, amplitudes = udp_protocol.unpack_stimulation_command(packet)
print(f"Received at: {timestamp}")
print(f"Frequencies: {frequencies}")
print(f"Amplitudes: {amplitudes}")
Spike Data Functions
pack_spike_data(spike_counts)
Pack spike count data into a binary UDP packet.
Parameters:
spike_counts (np.ndarray): Shape (8,), spike counts per channel group
Returns:
bytes: 40-byte binary packet ready to send via UDP
Raises:
ValueError: If array has incorrect shape
Example:
spike_counts = np.array([0, 2, 5, 1, 3, 0, 4, 2], dtype=np.float32)
packet = udp_protocol.pack_spike_data(spike_counts)
spike_socket.sendto(packet, (training_host, spike_port))
unpack_spike_data(packet)
Unpack a spike data packet.
Parameters:
packet (bytes): 40-byte binary packet from UDP
Returns:
tuple: (timestamp, spike_counts)
timestamp (int): Microseconds since epoch
spike_counts (np.ndarray): Shape (8,), spike counts
Raises:
ValueError: If packet has incorrect size
Example:
packet, addr = spike_socket.recvfrom(udp_protocol.SPIKE_PACKET_SIZE)
timestamp, spike_counts = udp_protocol.unpack_spike_data(packet)
print(f"Spikes: {spike_counts}")
latency = udp_protocol.get_latency_ms(timestamp)
print(f"Latency: {latency:.2f} ms")
Feedback Command Functions
pack_feedback_command(feedback_type, channels, frequency, amplitude, pulses, unpredictable=False, event_name="")
Pack feedback stimulation command into a binary UDP packet.
Parameters:
feedback_type (str): Type of feedback - "interrupt", "event", or "reward"
channels (List[int]): List of channel numbers to stimulate (0-63)
frequency (int): Stimulation frequency in Hz
amplitude (float): Stimulation amplitude in μA
pulses (int): Number of pulses/bursts
unpredictable (bool): Whether this is unpredictable stimulation (default: False)
event_name (str): Name of the event for logging (default: "")
Returns:
bytes: 120-byte binary packet ready to send via UDP
Raises:
ValueError: If parameters are invalid (too many channels, invalid type, etc.)
Example:
# Send event-based feedback for enemy kill
packet = udp_protocol.pack_feedback_command(
feedback_type="event",
channels=[35, 36, 38],
frequency=20,
amplitude=2.5,
pulses=40,
unpredictable=False,
event_name="enemy_kill"
)
feedback_socket.sendto(packet, (cl1_host, cl1_feedback_port))
# Send interrupt command
packet = udp_protocol.pack_feedback_command(
feedback_type="interrupt",
channels=[19, 20, 22, 23, 24, 26],
frequency=0,
amplitude=0.0,
pulses=0
)
feedback_socket.sendto(packet, (cl1_host, cl1_feedback_port))
unpack_feedback_command(packet)
Unpack a feedback command packet.
Parameters:
packet (bytes): 120-byte binary packet from UDP
Returns:
tuple: (timestamp, feedback_type, channels, frequency, amplitude, pulses, unpredictable, event_name)
timestamp (int): Microseconds since epoch
feedback_type (str): "interrupt", "event", or "reward"
channels (List[int]): Channel numbers (0xFF padding removed)
frequency (int): Hz
amplitude (float): μA
pulses (int): Number of pulses
unpredictable (bool): Unpredictable flag
event_name (str): Event name (null padding removed)
Raises:
ValueError: If packet has incorrect size
Example:
packet, addr = feedback_socket.recvfrom(udp_protocol.FEEDBACK_PACKET_SIZE)
timestamp, feedback_type, channels, freq, amp, pulses, unpred, name = \
udp_protocol.unpack_feedback_command(packet)
if feedback_type == "interrupt":
neurons.interrupt(cl.ChannelSet(*channels))
elif feedback_type == "event":
print(f"Event feedback: {name} on channels {channels}")
Pack event metadata into a UDP packet.
Parameters:
event_type (str): Type of event - "episode_end", "checkpoint", "training_complete", etc.
data (dict): Dictionary of event data
Returns:
bytes: Variable-length binary packet with JSON payload
Example:
packet = udp_protocol.pack_event_metadata(
event_type="episode_end",
data={
"episode": 1234,
"total_reward": 450.5,
"episode_length": 512,
"kills": 3
}
)
event_socket.sendto(packet, (cl1_host, cl1_event_port))
Unpack event metadata packet.
Parameters:
packet (bytes): Binary packet from UDP
Returns:
tuple: (timestamp, event_type, data)
timestamp (int): Microseconds since epoch
event_type (str): Event type
data (dict): Event data
Raises:
ValueError: If packet is too small or JSON is invalid
Example:
packet, addr = event_socket.recvfrom(4096)
timestamp, event_type, data = udp_protocol.unpack_event_metadata(packet)
if event_type == "episode_end":
print(f"Episode {data['episode']} completed: reward={data['total_reward']}")
elif event_type == "training_complete":
print(f"Training finished: {data['total_episodes']} episodes")
recording.stop()
Utility Functions
get_latency_ms(packet_timestamp)
Calculate network latency from packet timestamp to now.
Parameters:
packet_timestamp (int): Timestamp from packet (microseconds since epoch)
Returns:
float: Latency in milliseconds
Example:
timestamp, frequencies, amplitudes = udp_protocol.unpack_stimulation_command(packet)
latency = udp_protocol.get_latency_ms(timestamp)
if latency > 10.0:
print(f"WARNING: High latency detected: {latency:.2f} ms")
Usage Examples
Training System - Send Stimulation
import socket
import numpy as np
import udp_protocol
# Create UDP socket
stim_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Prepare stimulation parameters
frequencies = np.array([10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 12.0], dtype=np.float32)
amplitudes = np.array([1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.1, 2.2], dtype=np.float32)
# Pack and send
packet = udp_protocol.pack_stimulation_command(frequencies, amplitudes)
stim_socket.sendto(packet, ('192.168.1.100', 12345))
CL1 Device - Receive Stimulation
import socket
import udp_protocol
import cl
# Create and bind socket
stim_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stim_socket.bind(('0.0.0.0', 12345))
stim_socket.setblocking(False)
with cl.open() as neurons:
for tick in neurons.loop(ticks_per_second=10):
try:
packet, addr = stim_socket.recvfrom(udp_protocol.STIM_PACKET_SIZE)
timestamp, frequencies, amplitudes = udp_protocol.unpack_stimulation_command(packet)
# Apply stimulation to hardware
apply_stimulation(neurons, frequencies, amplitudes)
except BlockingIOError:
pass # No packet available
CL1 Device - Send Spikes
import socket
import numpy as np
import udp_protocol
# Create UDP socket
spike_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Collect spikes
spike_counts = collect_spikes(tick) # np.ndarray of shape (8,)
# Pack and send
packet = udp_protocol.pack_spike_data(spike_counts)
spike_socket.sendto(packet, ('192.168.1.50', 12346))
Training System - Receive Spikes
import socket
import udp_protocol
# Create and bind socket
spike_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
spike_socket.bind(('0.0.0.0', 12346))
spike_socket.settimeout(0.1)
try:
packet, addr = spike_socket.recvfrom(udp_protocol.SPIKE_PACKET_SIZE)
timestamp, spike_counts = udp_protocol.unpack_spike_data(packet)
# Convert to PyTorch tensor for model
spike_tensor = torch.from_numpy(spike_counts).float()
# Measure latency
latency = udp_protocol.get_latency_ms(timestamp)
print(f"Received spikes with {latency:.2f} ms latency")
except socket.timeout:
print("No spike data received")
spike_counts = np.zeros(8, dtype=np.float32)
Send Event-Based Feedback
import socket
import udp_protocol
feedback_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Enemy kill feedback
packet = udp_protocol.pack_feedback_command(
feedback_type="event",
channels=[35, 36, 38],
frequency=20,
amplitude=2.5,
pulses=40,
event_name="enemy_kill"
)
feedback_socket.sendto(packet, ('192.168.1.100', 12348))
# Took damage feedback
packet = udp_protocol.pack_feedback_command(
feedback_type="event",
channels=[44, 47, 48],
frequency=90,
amplitude=2.2,
pulses=50,
unpredictable=True,
event_name="took_damage"
)
feedback_socket.sendto(packet, ('192.168.1.100', 12348))
Packet Size vs Latency
| Packet Type | Size | Latency Impact |
|---|
| Stimulation | 72 bytes | ~0.01 ms @ 1Gbps |
| Spike Data | 40 bytes | ~0.005 ms @ 1Gbps |
| Feedback | 120 bytes | ~0.015 ms @ 1Gbps |
| Event (avg) | ~200 bytes | ~0.025 ms @ 1Gbps |
Binary vs JSON Trade-offs
Binary (Stimulation/Spike/Feedback):
- ✅ Fixed size (predictable)
- ✅ Fast serialization (~1-5 μs)
- ✅ Low network overhead
- ❌ Less human-readable
- ❌ Harder to debug
JSON (Event Metadata):
- ✅ Human-readable
- ✅ Flexible schema
- ✅ Easy debugging
- ❌ Variable size
- ❌ Slower serialization (~50-100 μs)
- ❌ Higher network overhead
Optimization Tips
- Reuse numpy arrays instead of creating new ones every packet
- Cache struct.pack format strings (already done in module)
- Use non-blocking sockets to prevent loop stalling
- Batch event metadata when possible (send multiple episodes in one packet)
- Monitor latency with
get_latency_ms() to detect network issues
Testing
The module includes a test suite when run directly:
Output:
Testing UDP protocol...
Stimulation packet size: 72 bytes
Unpacked timestamp: 1234567890123456
Frequencies match: True
Amplitudes match: True
Spike packet size: 40 bytes
Unpacked timestamp: 1234567890123457
Spike counts match: True
Packet latency: 0.003 ms
All protocol tests passed!
See Also