Message queues provide an asynchronous communications protocol — the sender and the receiver of the message do not need to interact with the queue at the same time. Messages placed onto the queue are stored until the recipient retrieves them.
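
As a purely conceptual sketch of that decoupling, the snippet below uses PHP's built-in SplQueue as an in-memory stand-in for a broker. The helper names sendMessage and receiveAll and the topic name demo.topic are hypothetical and are not part of any Oro API; a real transport persists messages so that the sender and the receiver can run in separate processes.

```php
<?php
// Conceptual sketch only: an in-memory SplQueue stands in for the broker.

$queue = new SplQueue();

// Sender: store the message and return immediately, without waiting for a receiver.
function sendMessage(SplQueue $queue, string $topic, array $body): void
{
    $queue->enqueue(['topic' => $topic, 'body' => $body]);
}

// Receiver: at some later point, retrieve stored messages in the order they were sent.
function receiveAll(SplQueue $queue): array
{
    $received = [];
    while (!$queue->isEmpty()) {
        $received[] = $queue->dequeue();
    }
    return $received;
}

sendMessage($queue, 'demo.topic', ['id' => 1]);
sendMessage($queue, 'demo.topic', ['id' => 2]);

// The messages wait in the queue until the receiver asks for them.
$received = receiveAll($queue);
```

The sender never calls the receiver directly; the only thing they share is the queue.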

When to use message queues

Use a message queue when:
  • A process can be executed asynchronously (does not block the user)
  • A process does not affect user experience in real time
  • Processes need to be executed in parallel for faster performance
  • You need a guarantee of processing
  • You need scalability

Transport options

The DBAL broker is implemented by OroMessageQueueBundle and is available in all Oro applications out of the box. Messages are stored in application database tables, so it is easy to set up and requires no additional infrastructure.
Configuration:
oro_message_queue:
  transport:
    default: 'dbal'
    dbal:
      connection: default                  # Doctrine DBAL connection name
      table: oro_message_queue             # Table where messages are stored
      pid_file_dir: /tmp/oro-message-queue # PID files for RedeliverOrphanMessagesExtension
      consumer_process_pattern: ':consume' # Used to detect live consumers
      polling_interval: 1000               # Consumer polling interval (milliseconds)
Limitations:
  • Uses a polling model (the consumer queries the database once per second by default). Lower polling_interval values increase the database load.
  • Fatal errors can leave messages locked in the DB. RedeliverOrphanMessagesExtension periodically redelivers such orphan messages.
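
To get a feel for the polling trade-off, the following back-of-the-envelope sketch estimates how many polls per second reach the database. It assumes each idle consumer issues exactly one query per poll, which is a simplification for illustration; the function name pollsPerSecond is hypothetical.

```php
<?php
// Rough estimate, assuming one query per poll per idle consumer
// (an illustrative assumption; real query counts may differ).

function pollsPerSecond(int $pollingIntervalMs, int $consumers = 1): float
{
    if ($pollingIntervalMs <= 0) {
        throw new InvalidArgumentException('polling_interval must be positive');
    }

    // 1000 ms per second divided by the interval gives polls per consumer.
    return $consumers * (1000 / $pollingIntervalMs);
}

$default = pollsPerSecond(1000);       // one consumer at the default interval
$aggressive = pollsPerSecond(100, 4);  // four consumers polling every 100 ms
```

Under these assumptions, a single consumer at the default interval polls once per second, while four consumers at 100 ms poll 40 times per second in total.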

Message topics

Every message sent to the queue must declare a topic — a message type identifier. Topics define the message body schema, default priority, and description.

Declaring a topic

Create a class implementing Oro\Component\MessageQueue\Topic\TopicInterface (or extending AbstractTopic) and register it as a service with the oro_message_queue.topic tag:
// Oro/Bundle/EmailBundle/Async/Topic/SendAutoResponseTopic.php

namespace Oro\Bundle\EmailBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Symfony\Component\OptionsResolver\OptionsResolver;

class SendAutoResponseTopic extends AbstractTopic
{
    public static function getName(): string
    {
        return 'oro.email.send_auto_response';
    }

    public static function getDescription(): string
    {
        return 'Send auto response for single email';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
        $resolver
            ->setRequired(['jobId', 'id'])
            ->addAllowedTypes('jobId', 'int')
            ->addAllowedTypes('id', 'int');
    }
}
Register it in Resources/config/mq_topics.yml:
services:
    _defaults:
        tags:
            - { name: oro_message_queue.topic }

    Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponseTopic: ~
If autoconfigure is enabled in the service container, you do not need to explicitly tag the service.

Message body validation

Before a message is pushed to the queue, its body is validated against the schema defined in configureMessageBody(). Invalid messages are not queued and an error is logged instead. During consumption, the MessageBodyResolverExtension validates the body again before it reaches the processor. Messages with invalid bodies are rejected.
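
The kind of check involved can be illustrated with a small stand-in written in plain PHP. This is not the OptionsResolver API and not Oro's validation code; validateBody is a hypothetical helper that only mimics the "required keys with allowed types" rules from the SendAutoResponseTopic example, and also flags keys the schema does not define.

```php
<?php
// Stand-in sketch: a hand-written check, NOT the OptionsResolver API.
// Requires PHP 8.0+ for get_debug_type().

function validateBody(array $body, array $requiredTypes): array
{
    $errors = [];

    // Every key in the schema is required and must have the declared type.
    foreach ($requiredTypes as $key => $type) {
        if (!array_key_exists($key, $body)) {
            $errors[] = "missing required option \"$key\"";
        } elseif (get_debug_type($body[$key]) !== $type) {
            $errors[] = "option \"$key\" must be of type $type";
        }
    }

    // Keys that the schema does not define are reported as well.
    foreach (array_diff_key($body, $requiredTypes) as $key => $_) {
        $errors[] = "undefined option \"$key\"";
    }

    return $errors;
}

$schema = ['jobId' => 'int', 'id' => 'int'];

$ok = validateBody(['jobId' => 7, 'id' => 42], $schema);  // valid body
$bad = validateBody(['jobId' => '7'], $schema);           // wrong type, missing "id"
```

A body with an empty error list would be queued; any other body would be rejected and the errors logged.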

Topics in this section

Message Queue Topics

Declare, register, and validate message topics.

Message Queue Jobs

Create uniquely identified jobs to prevent duplicate processing.

Buffering Messages

Buffer messages in memory and flush them at the end of a request.

Delayed Messages

Schedule messages to be processed after a configurable delay.

Filtering Messages

Skip or requeue messages based on configurable filtering logic.

Consumer Configuration

Configure consumer processes, concurrency, and heartbeat settings.

Supervisord

Manage consumer processes with Supervisord in production.

Security Context

Propagate user and organization security context into message processors.

Testing

Write unit and functional tests for message producers and consumers.

RabbitMQ

Configure and administer RabbitMQ for Enterprise Edition installations.
