Overview
Transmitters are responsible for delivering messages to their final destination. The current implementation supports HTTP webhooks with configurable headers, TLS settings, and retry handling.
Transmitter Interface
All transmitters implement this simple interface:
type Transmitter interface {
	Tx(io.Reader, transmitter.TransmitAttributes) error
}
Parameters:
io.Reader: The message body
transmitter.TransmitAttributes: Key-value metadata (converted to headers)
Return:
error: Standard error for non-retryable failures, or *TransmitRetryableError for retryable failures
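A caller can tell the two error kinds apart with `errors.As`. The following is a minimal sketch, not the receiver's actual code: `TransmitRetryableError` is redeclared locally with the fields documented under Retryable Errors, its `Error` and `Unwrap` methods are assumptions, and `retryDelaySeconds` and `sampleErrors` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// TransmitRetryableError is a local stand-in with the same fields as the
// transmitter package's type. The Error and Unwrap methods are assumptions.
type TransmitRetryableError struct {
	Err        error
	RetryAfter time.Duration
}

func (e *TransmitRetryableError) Error() string { return e.Err.Error() }
func (e *TransmitRetryableError) Unwrap() error { return e.Err }

// retryDelaySeconds (hypothetical helper) reports whether err is retryable
// and, if so, how many whole seconds to wait before the next attempt.
func retryDelaySeconds(err error) (int, bool) {
	var retryable *TransmitRetryableError
	if errors.As(err, &retryable) {
		return int(retryable.RetryAfter / time.Second), true
	}
	return 0, false
}

// sampleErrors returns one retryable and one non-retryable error.
func sampleErrors() (retryable, fatal error) {
	retryable = &TransmitRetryableError{
		Err:        errors.New("429 status code"),
		RetryAfter: 30 * time.Second,
	}
	fatal = errors.New("transmit failed")
	return retryable, fatal
}

func main() {
	retryable, fatal := sampleErrors()
	for _, err := range []error{retryable, fatal} {
		if secs, ok := retryDelaySeconds(err); ok {
			fmt.Printf("retry in %ds: %v\n", secs, err)
		} else {
			fmt.Printf("do not retry: %v\n", err)
		}
	}
}
```

Using `errors.As` rather than a direct type assertion keeps the check working even if a transmitter wraps the retryable error in additional context.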
Webhook Transmitter
Configuration
From transmitter/webhook/transmitter.go:38-44:
type TransmitterConfig struct {
	Endpoint              string        // Target webhook URL
	TLSInsecureSkipVerify bool          // Skip TLS verification (dev only)
	DefaultContentType    string        // Fallback Content-Type
	RequestTimeout        time.Duration // HTTP client timeout
}
HTTP Client Setup
The transmitter creates a configured HTTP client with TLS settings:
func NewTransmitter(c *TransmitterConfig) *Transmitter {
	var idleConnTimeout time.Duration
	if c.RequestTimeout > 0 {
		idleConnTimeout = c.RequestTimeout + (30 * time.Second)
	}
	return &Transmitter{
		endpoint: c.Endpoint,
		client: &http.Client{
			Timeout: c.RequestTimeout,
			Transport: &http.Transport{
				IdleConnTimeout: idleConnTimeout,
				TLSClientConfig: &tls.Config{
					InsecureSkipVerify: c.TLSInsecureSkipVerify,
					MinVersion:         tls.VersionTLS13,
				},
			},
		},
		defaultContentType: c.DefaultContentType,
	}
}
Security: TLSInsecureSkipVerify should only be used in development. Production deployments should always verify TLS certificates.
TLS Configuration
Carrier enforces strong TLS defaults:
TLSClientConfig: &tls.Config{
	InsecureSkipVerify: c.TLSInsecureSkipVerify,
	MinVersion:         tls.VersionTLS13, // Requires TLS 1.3+
}
Features:
- Minimum TLS 1.3 for all connections
- Configurable certificate verification
- Connection reuse via IdleConnTimeout
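The effect of `MinVersion: tls.VersionTLS13` can be observed against a local test server. This sketch is an illustration only: it uses the standard library's `net/http/httptest` server, and `negotiate` is a hypothetical helper that is not part of Carrier. The client trusts the test server's certificate through `RootCAs` instead of setting `InsecureSkipVerify`.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// negotiate starts a local TLS test server capped at maxServerVersion
// (0 means the Go default) and connects with a client that requires
// TLS 1.3. It returns the negotiated TLS version on success.
func negotiate(maxServerVersion uint16) (uint16, error) {
	srv := httptest.NewUnstartedServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK)
		}))
	srv.TLS = &tls.Config{MaxVersion: maxServerVersion}
	srv.StartTLS()
	defer srv.Close()

	// Trust only the test server's certificate.
	pool := x509.NewCertPool()
	pool.AddCert(srv.Certificate())

	client := &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				RootCAs:    pool,
				MinVersion: tls.VersionTLS13,
			},
		},
	}
	defer client.CloseIdleConnections()

	res, err := client.Get(srv.URL)
	if err != nil {
		return 0, err
	}
	defer res.Body.Close()
	return res.TLS.Version, nil
}

func main() {
	if v, err := negotiate(0); err == nil {
		fmt.Printf("default server: negotiated version 0x%04x\n", v)
	}
	if _, err := negotiate(tls.VersionTLS12); err != nil {
		fmt.Println("TLS 1.2-only server rejected:", err)
	}
}
```

A webhook endpoint that cannot speak TLS 1.3 fails during the handshake, which `Tx` reports as a failed request rather than as an HTTP status code.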
HTTP POST Implementation
Request Creation
From transmitter/webhook/transmitter.go:76-95:
func (t *Transmitter) newRequest(message io.Reader, attributes transmitter.TransmitAttributes) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, t.endpoint, message)
	if err != nil {
		return req, err
	}
	for k, v := range attributes {
		if k == HeaderContentType {
			// send Content-Type header unmodified
			req.Header.Add(k, v)
		} else {
			req.Header.Add(fmt.Sprintf("%s%s", HeaderPrefix, k), v)
		}
	}
	if req.Header.Get(HeaderContentType) == "" && t.defaultContentType != "" {
		// add the default Content-Type header
		req.Header.Add(HeaderContentType, t.defaultContentType)
	}
	return req, err
}
Request properties:
- Method: POST
- Body: Raw message from queue
- Headers: Generated from transmit attributes
Header Handling
Transmit attributes are converted to HTTP headers with the X-Carrier- prefix:
const HeaderPrefix = "X-Carrier-"
Example transformation:
TransmitAttributes{
	"Receive-Count":      "1",
	"First-Receive-Time": "1620000000000",
}
Becomes:
X-Carrier-Receive-Count: 1
X-Carrier-First-Receive-Time: 1620000000000
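On the receiving side, a webhook handler can reverse this transformation by stripping the prefix. The sketch below assumes nothing about Carrier beyond the `X-Carrier-` prefix; `carrierAttributes` and `sampleHeaders` are hypothetical helpers. Note that Go's `http.Header` canonicalizes header names, so an attribute key such as `receiveCount` would arrive as `X-Carrier-Receivecount`.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

const headerPrefix = "X-Carrier-"

// carrierAttributes (hypothetical helper) collects every header that starts
// with the X-Carrier- prefix and returns it keyed by the unprefixed name.
func carrierAttributes(h http.Header) map[string]string {
	attrs := make(map[string]string)
	for name, values := range h {
		if strings.HasPrefix(name, headerPrefix) && len(values) > 0 {
			attrs[strings.TrimPrefix(name, headerPrefix)] = values[0]
		}
	}
	return attrs
}

// sampleHeaders builds the headers from the example transformation.
// Header.Add canonicalizes the lower-case name on the second call.
func sampleHeaders() http.Header {
	h := http.Header{}
	h.Add("X-Carrier-Receive-Count", "1")
	h.Add("x-carrier-first-receive-time", "1620000000000")
	h.Add("Content-Type", "application/json")
	return h
}

func main() {
	fmt.Println(carrierAttributes(sampleHeaders()))
}
```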
Content-Type Handling
Content-Type receives special treatment:
if k == HeaderContentType {
	// send Content-Type header unmodified
	req.Header.Add(k, v)
} else {
	req.Header.Add(fmt.Sprintf("%s%s", HeaderPrefix, k), v)
}
Fallback behavior:
if req.Header.Get(HeaderContentType) == "" && t.defaultContentType != "" {
	req.Header.Add(HeaderContentType, t.defaultContentType)
}
If the SQS message has a Body.ContentType attribute, it’s used. Otherwise, the configured default (typically application/json) is applied.
Transmission Logic
From transmitter/webhook/transmitter.go:100-133:
func (t *Transmitter) Tx(message io.Reader, attributes transmitter.TransmitAttributes) error {
	req, err := t.newRequest(message, attributes)
	if err != nil {
		return fmt.Errorf("%w: failed to create request: %w", transmitter.ErrTransmitFailed, err)
	}
	res, err := t.client.Do(req)
	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}
	if err != nil {
		return fmt.Errorf("%w: failed to send request: %w", transmitter.ErrTransmitFailed, err)
	}
	switch res.StatusCode {
	case http.StatusOK:
		return nil
	case http.StatusTooManyRequests:
		// Handle 429 with Retry-After
		retryAfter := res.Header.Get(HeaderRetryAfter)
		if retryAfter != "" {
			seconds, err := strconv.Atoi(retryAfter)
			if err != nil {
				return fmt.Errorf("%w: %w: %w", transmitter.ErrTransmitFailed, ErrStatusCode429, err)
			}
			return transmitter.NewTransmitRetryableError(ErrStatusCode429, time.Duration(seconds*int(time.Second)))
		}
		return fmt.Errorf("%w: %w: %w", transmitter.ErrTransmitFailed, ErrStatusCode429, ErrNoRetryAfterHeader)
	default:
		return fmt.Errorf("%w: %w: %d", transmitter.ErrTransmitFailed, ErrNon200StatusCode, res.StatusCode)
	}
}
Status Code Handling
| Status Code | Behavior |
|---|---|
| 200 OK | Success - message deleted from queue |
| 429 Too Many Requests | Retryable when the Retry-After header holds an integer number of seconds - visibility timeout updated from that value. Non-retryable if the header is missing or not an integer |
| Other non-200 | Non-retryable - message stays in queue, error logged |
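HTTP allows `Retry-After` to carry either a number of seconds or an HTTP-date, while the `Tx` code above accepts only the integer form. If an endpoint sends dates, a parser along the following lines could handle both. This is a sketch of one possible extension, not Carrier's behavior; `parseRetryAfter`, `referenceTime`, and `errBadRetryAfter` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"strconv"
	"time"
)

var errBadRetryAfter = errors.New("invalid Retry-After value")

// parseRetryAfter (hypothetical helper) converts a Retry-After header value
// into a wait duration relative to now. It accepts delay-seconds ("120")
// and an HTTP-date ("Wed, 21 Oct 2015 07:29:00 GMT").
func parseRetryAfter(value string, now time.Time) (time.Duration, error) {
	if secs, err := strconv.Atoi(value); err == nil {
		if secs < 0 {
			return 0, errBadRetryAfter
		}
		return time.Duration(secs) * time.Second, nil
	}
	when, err := http.ParseTime(value)
	if err != nil {
		return 0, fmt.Errorf("%w: %q", errBadRetryAfter, value)
	}
	if when.Before(now) {
		return 0, nil // the date has already passed: retry immediately
	}
	return when.Sub(now), nil
}

// referenceTime is a fixed clock value so the example is deterministic.
func referenceTime() time.Time {
	return time.Date(2015, time.October, 21, 7, 28, 0, 0, time.UTC)
}

func main() {
	for _, v := range []string{"120", "Wed, 21 Oct 2015 07:29:00 GMT", "soon"} {
		d, err := parseRetryAfter(v, referenceTime())
		fmt.Println(v, d, err)
	}
}
```

Passing the current time as a parameter keeps the function testable without a real clock.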
Error Handling
Retryable Errors
From transmitter/error.go:14-20:
type TransmitRetryableError struct {
	Err        error
	RetryAfter time.Duration
}
Usage:
return transmitter.NewTransmitRetryableError(
	ErrStatusCode429,
	time.Duration(seconds*int(time.Second)),
)
When a retryable error is returned, the receiver updates the SQS message visibility timeout:
VisibilityTimeout: int32(err.RetryAfter.Seconds())
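SQS accepts visibility timeouts from 0 to 43,200 seconds (12 hours), and a webhook is free to send a larger `Retry-After` value. Whether the receiver clamps the value is not shown here, so the following is only a sketch of how such a guard could look; `visibilityTimeout` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// maxVisibilityTimeout is the largest value SQS accepts: 12 hours in seconds.
const maxVisibilityTimeout = 43200

// visibilityTimeout (hypothetical helper) converts a retry delay into whole
// seconds and clamps the result to the range SQS accepts.
func visibilityTimeout(retryAfter time.Duration) int32 {
	secs := int64(retryAfter / time.Second) // drops fractional seconds
	switch {
	case secs < 0:
		return 0
	case secs > maxVisibilityTimeout:
		return maxVisibilityTimeout
	default:
		return int32(secs)
	}
}

func main() {
	for _, d := range []time.Duration{30 * time.Second, 1500 * time.Millisecond, 24 * time.Hour} {
		fmt.Printf("%v -> %d\n", d, visibilityTimeout(d))
	}
}
```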
Non-Retryable Errors
Non-retryable errors wrap transmitter.ErrTransmitFailed:
return fmt.Errorf("%w: %w: %d",
	transmitter.ErrTransmitFailed,
	ErrNon200StatusCode,
	res.StatusCode,
)
Messages that fail with a non-retryable error are not deleted. They remain in the queue until they:
- Are successfully transmitted on a later delivery attempt
- Reach the maximum receive count (then moved to dead-letter queue)
- Expire based on message retention settings
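Because these errors are built with more than one `%w` verb (supported since Go 1.20), `errors.Is` matches each wrapped sentinel. The sketch below uses local stand-ins for the sentinels; their messages are assumptions, and `non200Error` and `classify` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package sentinels; the messages are assumptions.
var (
	ErrTransmitFailed   = errors.New("transmit failed")
	ErrNon200StatusCode = errors.New("non-200 status code")
)

// non200Error builds an error the same way the webhook transmitter does.
func non200Error(status int) error {
	return fmt.Errorf("%w: %w: %d", ErrTransmitFailed, ErrNon200StatusCode, status)
}

// classify (hypothetical helper) checks the most specific sentinel first.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNon200StatusCode):
		return "unexpected status"
	case errors.Is(err, ErrTransmitFailed):
		return "transmit failed"
	default:
		return "unknown"
	}
}

func main() {
	err := non200Error(503)
	fmt.Println(err)
	fmt.Println(classify(err))
}
```

Callers that only care whether transmission failed can test for `ErrTransmitFailed` alone; the more specific sentinel is available for logging or metrics.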
TransmitAttributes Type
From transmitter/attribute.go:3-7:
type TransmitAttributes map[string]string
A simple string map that carries metadata from the receiver to the transmitter. The webhook transmitter converts these to HTTP headers.
Configuration Reference
| Environment Variable | Description | Default |
|---|---|---|
| WEBHOOK_ENDPOINT | Target webhook URL | http://localhost:9000 |
| WEBHOOK_DEFAULT_CONTENT_TYPE | Default Content-Type header | application/json |
| WEBHOOK_REQUEST_TIMEOUT | HTTP request timeout | 60s |
| WEBHOOK_TLS_INSECURE_SKIP_VERIFY | Skip TLS verification | false |
| WEBHOOK_HEALTH_CHECK_ENDPOINT | Health check path | (none) |
| WEBHOOK_HEALTH_CHECK_INTERVAL | Health check frequency | 60s |
| WEBHOOK_HEALTH_CHECK_TIMEOUT | Health check timeout | 10s |
| WEBHOOK_OFFLINE_THRESHOLD_COUNT | Failed checks before offline | 5 |
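As an illustration of how the first four variables map onto `TransmitterConfig`, the sketch below reads them with the defaults from the table. It is not Carrier's actual loader: `Config` and `loadConfig` are hypothetical, and it assumes durations use Go's duration syntax (such as `60s`) and booleans use the forms accepted by `strconv.ParseBool`.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// Config is a local stand-in for the webhook TransmitterConfig.
type Config struct {
	Endpoint              string
	DefaultContentType    string
	RequestTimeout        time.Duration
	TLSInsecureSkipVerify bool
}

// loadConfig (hypothetical helper) starts from the documented defaults and
// overrides them with any variables that getenv returns as non-empty.
func loadConfig(getenv func(string) string) (Config, error) {
	cfg := Config{
		Endpoint:           "http://localhost:9000",
		DefaultContentType: "application/json",
		RequestTimeout:     60 * time.Second,
	}
	if v := getenv("WEBHOOK_ENDPOINT"); v != "" {
		cfg.Endpoint = v
	}
	if v := getenv("WEBHOOK_DEFAULT_CONTENT_TYPE"); v != "" {
		cfg.DefaultContentType = v
	}
	if v := getenv("WEBHOOK_REQUEST_TIMEOUT"); v != "" {
		d, err := time.ParseDuration(v)
		if err != nil {
			return Config{}, fmt.Errorf("WEBHOOK_REQUEST_TIMEOUT: %w", err)
		}
		cfg.RequestTimeout = d
	}
	if v := getenv("WEBHOOK_TLS_INSECURE_SKIP_VERIFY"); v != "" {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return Config{}, fmt.Errorf("WEBHOOK_TLS_INSECURE_SKIP_VERIFY: %w", err)
		}
		cfg.TLSInsecureSkipVerify = b
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, "invalid configuration:", err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Taking the lookup function as a parameter lets tests supply values without touching the process environment.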
Example Webhook Request
Given an SQS message with:
- Body: {"event": "user.created"}
- Attribute: Body.ContentType = application/json
- Receive count: 1
Carrier will send:
POST /webhook HTTP/1.1
Host: example.com
Content-Type: application/json
X-Carrier-Receive-Count: 1
X-Carrier-First-Receive-Time: 1620000000000

{"event": "user.created"}
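To check a webhook consumer against this wire format without running Carrier, the same request can be reproduced with the standard library. This sketch builds the request by hand and sends it to an `httptest` server; `captured` and `sendExample` are hypothetical names, and the code does not call Carrier's transmitter.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// captured holds the parts of the request that the test server observed.
type captured struct {
	Method, Path, ContentType, ReceiveCount, Body string
}

// sendExample posts the example message to a local test server and returns
// what the server received.
func sendExample() (captured, error) {
	seen := make(chan captured, 1)
	srv := httptest.NewServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			body, _ := io.ReadAll(r.Body)
			seen <- captured{
				Method:       r.Method,
				Path:         r.URL.Path,
				ContentType:  r.Header.Get("Content-Type"),
				ReceiveCount: r.Header.Get("X-Carrier-Receive-Count"),
				Body:         string(body),
			}
			w.WriteHeader(http.StatusOK)
		}))
	defer srv.Close()

	req, err := http.NewRequest(http.MethodPost, srv.URL+"/webhook",
		strings.NewReader(`{"event": "user.created"}`))
	if err != nil {
		return captured{}, err
	}
	req.Header.Add("Content-Type", "application/json")
	req.Header.Add("X-Carrier-Receive-Count", "1")
	req.Header.Add("X-Carrier-First-Receive-Time", "1620000000000")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return captured{}, err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return captured{}, fmt.Errorf("unexpected status %d", res.StatusCode)
	}
	return <-seen, nil
}

func main() {
	got, err := sendExample()
	fmt.Printf("%+v %v\n", got, err)
}
```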
Extending Transmitters
To create a new transmitter type (e.g., gRPC, SNS):
- Create a new package (e.g., transmitter/grpc)
- Implement the interface:
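func (t *GRPCTransmitter) Tx(message io.Reader, attributes transmitter.TransmitAttributes) error {
	// Read message body
	body, err := io.ReadAll(message)
	if err != nil {
		return fmt.Errorf("%w: failed to read message: %w", transmitter.ErrTransmitFailed, err)
	}
	// Convert attributes to gRPC metadata
	md := metadata.New(attributes)
	// Send via gRPC. The interface carries no context, so create one here.
	// t.client.Send is a placeholder for the generated client call.
	ctx := context.Background()
	return t.client.Send(ctx, body, md)
}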
- Update main.go to use the new transmitter
The Transmitter interface is intentionally simple to make it easy to add new destination types.
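For a dependency-free starting point, here is a minimal sketch of a transmitter that writes messages to an `io.Writer`, which can be handy for local debugging. The interface and `TransmitAttributes` are redeclared locally so the example compiles on its own; `WriterTransmitter` and `render` are hypothetical and not part of Carrier.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sort"
	"strings"
)

// Local stand-ins for the types in the transmitter package.
type TransmitAttributes map[string]string

type Transmitter interface {
	Tx(io.Reader, TransmitAttributes) error
}

// WriterTransmitter (hypothetical) writes each message to w.
type WriterTransmitter struct {
	w io.Writer
}

// Compile-time check that WriterTransmitter satisfies the interface.
var _ Transmitter = (*WriterTransmitter)(nil)

// Tx writes the attributes in sorted key order, then the message body.
func (t *WriterTransmitter) Tx(message io.Reader, attributes TransmitAttributes) error {
	keys := make([]string, 0, len(attributes))
	for k := range attributes {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		if _, err := fmt.Fprintf(t.w, "%s: %s\n", k, attributes[k]); err != nil {
			return err
		}
	}
	if _, err := io.Copy(t.w, message); err != nil {
		return err
	}
	_, err := io.WriteString(t.w, "\n")
	return err
}

// render runs one message through a WriterTransmitter backed by a buffer.
func render(body string, attrs TransmitAttributes) (string, error) {
	var buf bytes.Buffer
	t := &WriterTransmitter{w: &buf}
	err := t.Tx(strings.NewReader(body), attrs)
	return buf.String(), err
}

func main() {
	out, err := render(`{"event": "user.created"}`, TransmitAttributes{"Receive-Count": "1"})
	fmt.Print(out)
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

A real transmitter would likely wrap its failures in `transmitter.ErrTransmitFailed` or return a `*TransmitRetryableError`, so that the receiver can apply the handling described under Error Handling.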