
Overview

Receivers are responsible for reading messages from message queues and distributing them to workers for transmission. The current implementation supports Amazon SQS.

SQS Receiver

Long Polling Strategy

The SQS receiver uses long polling to efficiently receive messages without overwhelming the SQS API:
res, err := p.client.ReceiveMessage(p.ctx, &sqs.ReceiveMessageInput{
    QueueUrl:            &p.queueURL,
    MaxNumberOfMessages: p.batchSize,
    VisibilityTimeout:   p.visibilityTimeout,
    WaitTimeSeconds:     20,  // Long polling wait time
    AttributeNames: []types.QueueAttributeName{
        types.QueueAttributeName(SQSAttributeApproxomiteReceiveCount),
        types.QueueAttributeName(SQSAttributeApproxomiteFirstReceiveTimestamp),
    },
    MessageAttributeNames: []string{
        SQSMessageAttributeBodyContentType,
    },
})
Benefits of long polling:
  • Reduces API calls and costs
  • Minimizes empty receives
  • Waits up to 20 seconds (the SQS maximum) but returns as soon as a message arrives, so delivery stays near-real-time

Batch Processing

Messages are fetched and processed in batches for efficiency:
type ReceiverConfig struct {
    LogHandler        slog.Handler
    SQSClient         MessageReadWriter
    SQSQueueName      string
    VisibilityTimeout int
    BatchSize         int          // Number of messages per batch
    MaxWorkers        int          // Concurrent workers
    Transmitter       Transmitter
    Ctx               context.Context
}
Configuration:
  • SQS_BATCH_SIZE: Number of messages to fetch per API call (1-10)
  • SQS_RECEIVER_WORKERS: Number of concurrent workers per receiver

Message Attributes Handling

The receiver extracts SQS message attributes and converts them to transmit attributes:

SQS Attributes

From receiver/sqs/sqs.go:18-30:
const (
    SQSAttributeApproxomiteReceiveCount          = "ApproximateReceiveCount"
    SQSAttributeApproxomiteFirstReceiveTimestamp = "ApproximateFirstReceiveTimestamp"
    SQSMessageAttributeBodyContentType           = "Body.ContentType"
    TransmitAttributeReceiveCount                = "Receive-Count"
    TransmitAttributeFirstReceiveTime            = "First-Receive-Time"
    TransmitAttributeContentType                 = "Content-Type"
)

Attribute Conversion

From receiver/sqs/sqs.go:135-155:
func (h *handler) generateAttributes(m *message) transmitter.TransmitAttributes {
    attributes := make(transmitter.TransmitAttributes)
    for k, v := range m.Attributes {
        switch k {
        case SQSAttributeApproxomiteReceiveCount:
            attributes[TransmitAttributeReceiveCount] = v
        case SQSAttributeApproxomiteFirstReceiveTimestamp:
            attributes[TransmitAttributeFirstReceiveTime] = v
        }
    }
    for k, v := range m.MessageAttributes {
        switch k {
        case SQSMessageAttributeBodyContentType:
            if v.StringValue != nil {
                attributes[TransmitAttributeContentType] = *v.StringValue
            }
        }
    }
    return attributes
}
These attributes are passed to the transmitter as HTTP headers (with the X-Carrier- prefix).

Interfaces

MessageReader Interface

Defines the read operations for SQS:
type MessageReader interface {
    GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
    ReceiveMessage(context.Context, *sqs.ReceiveMessageInput, ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
}

MessageWriter Interface

Defines the write operations for SQS:
type MessageWriter interface {
    DeleteMessageBatch(context.Context, *sqs.DeleteMessageBatchInput, ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error)
    ChangeMessageVisibilityBatch(context.Context, *sqs.ChangeMessageVisibilityBatchInput, ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityBatchOutput, error)
}

MessageReadWriter Interface

Combines both interfaces:
type MessageReadWriter interface {
    MessageReader
    MessageWriter
}
Testing: These interfaces allow for easy mocking of the SQS client in unit tests.

Worker Pool Architecture

Each receiver maintains its own pool of workers. The pool size defaults to BatchSize and is overridden by MaxWorkers when that is non-zero:
workers := c.BatchSize
if c.MaxWorkers != 0 {
    workers = c.MaxWorkers
}
messages := make(chan *message, c.BatchSize)
results := make(chan *transmitResult, c.BatchSize)
ctx, cancel := context.WithCancel(context.Background())
p := pool.NewPool(&pool.PoolConfig{
    Size:       workers,
    BufferSize: workers,
    Ctx:        ctx,
})
for range workers {
    h := newHandler(&handlerConfig{
        Transmitter: c.Transmitter,
        Ctx:         ctx,
        Work:        messages,
        Results:     results,
    })
    p.Run(h.handleMessages)
}

Message Processing Flow

  1. Receive batch from SQS
  2. Distribute messages to worker channel
  3. Workers transmit messages concurrently
  4. Collect results from all workers
  5. Batch delete successful messages
  6. Batch update visibility for retryable errors

Result Handling

Success Path

From receiver/sqs/sqs.go:263-268:
if r.err == nil {
    deleteEntries = append(deleteEntries, types.DeleteMessageBatchRequestEntry{
        Id:            r.MessageID,
        ReceiptHandle: r.ReceiptHandle,
    })
}

Retryable Error Path

From receiver/sqs/sqs.go:252-260:
if r.err != nil {
    var err *transmitter.TransmitRetryableError
    if errors.As(r.err, &err) {
        // update visibility timeout on retryable errors
        retryEntries = append(retryEntries, types.ChangeMessageVisibilityBatchRequestEntry{
            Id:                r.MessageID,
            ReceiptHandle:     r.ReceiptHandle,
            VisibilityTimeout: int32(err.RetryAfter.Seconds()),
        })
    }
}

Non-Retryable Error Path

else { // continues the errors.As check from the retryable error path
    p.log.Error("failed to transmit message", "error", r.err)
}
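Non-retryable errors are logged, but the message is not deleted. It becomes visible again once its visibility timeout expires and is then either reprocessed or, if the queue has a redrive policy and the maximum receive count has been reached, moved to a dead-letter queue.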

Batch Operations

Batch Delete

From receiver/sqs/sqs.go:270-280:
if len(deleteEntries) > 0 {
    _, err := p.client.DeleteMessageBatch(p.ctx, &sqs.DeleteMessageBatchInput{
        QueueUrl: &p.queueURL,
        Entries:  deleteEntries,
    })
    if err != nil {
        p.log.Error("failed to delete messages", "error", err)
    } else {
        p.log.Debug("deleted messages", "count", len(deleteEntries))
    }
}

Batch Visibility Update

From receiver/sqs/sqs.go:282-293:
if len(retryEntries) > 0 {
    _, err := p.client.ChangeMessageVisibilityBatch(p.ctx, &sqs.ChangeMessageVisibilityBatchInput{
        QueueUrl: &p.queueURL,
        Entries:  retryEntries,
    })
    if err != nil {
        p.log.Error("failed to update message visibility", "error", err)
    } else {
        p.log.Debug("updated message visibility", "count", len(retryEntries))
    }
}
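If a batch delete or visibility update fails, the error is only logged, so the affected messages become visible again and may be delivered more than once. Ensure your webhook endpoints are idempotent.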

Event Loop

The main receive loop runs until shutdown:
func (p *Receiver) Rx() {
    p.log.Info("starting event loop", "batch_size", p.batchSize, "max_workers", p.maxWorkers)
    for {
        select {
        case <-p.ctx.Done():
            p.log.Info("stopping event loop")
            p.cancel()
            p.pool.Stop(true)
            return
        default:
            // poll for new messages on the queue
            res, err := p.client.ReceiveMessage(/* ... */)
            if err != nil {
                if !errors.Is(err, context.Canceled) {
                    p.log.Error("failed to receive messages", "error", err)
                }
                continue
            }
            p.log.Debug("received messages", "count", len(res.Messages))
            p.processMessages(res)
        }
    }
}

Configuration Reference

Environment Variable    Description                       Default
SQS_ENDPOINT            SQS service endpoint              Required
SQS_QUEUE_NAME          Name of the SQS queue             Required
SQS_RECEIVERS           Number of concurrent receivers    1
SQS_RECEIVER_WORKERS    Workers per receiver              1
SQS_BATCH_SIZE          Messages per batch                1
For high throughput, increase SQS_RECEIVERS and SQS_RECEIVER_WORKERS. Monitor your webhook endpoint’s capacity to handle concurrent requests.
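One way to read and validate a setting such as SQS_BATCH_SIZE is sketched below. It assumes the default of 1 from the table and the 1-10 range stated earlier. intSetting is a hypothetical helper, and the project's own configuration loading may differ.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// intSetting is a hypothetical helper. It returns def when raw is empty and
// rejects values that are not integers or fall outside lo..hi.
func intSetting(name, raw string, def, lo, hi int) (int, error) {
	if raw == "" {
		return def, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("%s: %w", name, err)
	}
	if n < lo || n > hi {
		return 0, fmt.Errorf("%s: %d is outside %d-%d", name, n, lo, hi)
	}
	return n, nil
}

func main() {
	// Default 1 and range 1-10 follow the values stated in this document.
	batchSize, err := intSetting("SQS_BATCH_SIZE", os.Getenv("SQS_BATCH_SIZE"), 1, 1, 10)
	if err != nil {
		fmt.Println("invalid configuration:", err)
		return
	}
	fmt.Println("batch size:", batchSize)
}
```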
