Getting started with Magento 2 message queue

The Magento 2 message queue framework, introduced in the new version of the platform, lets modules publish messages to queues and process them asynchronously.
The goal of this article is to show a practical example of how to publish (send) a message to the queue and how to consume (receive and process) it.

Magento supports MySQL-based and RabbitMQ-based message queues. In the example described in this article, we are going to use RabbitMQ.
Let’s start by installing the RabbitMQ server. On Ubuntu-like systems, this takes a single command:

sudo apt install -y rabbitmq-server

Additionally, we enable the rabbitmq_management plugin of RabbitMQ to get a web interface for monitoring queues:

rabbitmq-plugins enable rabbitmq_management

Navigate to http://127.0.0.1:15672/ and enter the default login and password: guest/guest. By default, RabbitMQ accepts the guest user only from localhost, so if you use a remote server or a Docker container, you can create an SSH tunnel to access this page from your local machine:

ssh -L 15672:localhost:15672 user@remote.host

We have completed the server setup. Let’s begin the coding part. Create a module; in this sample, we name it Atwix_Queue. Our goal is to send a message to the queue when a product is deleted, then listen to this queue, receive (consume) the message, and log it to a file.
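
The article does not show the module skeleton, so here is a minimal sketch of etc/module.xml for Atwix_Queue. The sequence entry for Magento_Catalog is an assumption, added because the module works with catalog products. The module also needs the usual registration.php in its root directory that calls ComponentRegistrar::register(ComponentRegistrar::MODULE, 'Atwix_Queue', __DIR__).

```xml
<?xml version="1.0"?>
<!-- etc/module.xml: declares the Atwix_Queue module (illustrative sketch) -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
    <module name="Atwix_Queue">
        <sequence>
            <!-- Assumption: load after the catalog module, whose classes we use -->
            <module name="Magento_Catalog"/>
        </sequence>
    </module>
</config>
```

All XML configuration files discussed in this article are placed in the same etc directory of the module.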

First, we define the exchange, topic, queue, publisher, and consumer. As you may guess, this configuration is done in XML files (the descriptions are taken from the official Magento message queues guide):

  • communication.xml – Defines aspects of the message queue system that all communication types have in common.
  • queue_consumer.xml – Defines the relationship between an existing queue and its consumer.
  • queue_topology.xml – Defines the message routing rules and declares queues and exchanges.
  • queue_publisher.xml – Defines the exchange where a topic is published.

Let’s start with communication.xml and define the topic atwix.product.delete:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="atwix.product.delete" request="Magento\Catalog\Api\Data\ProductInterface"/>
</config>

We also specify the data type of the topic in the request attribute: Magento\Catalog\Api\Data\ProductInterface.

Then, in queue_topology.xml, define the atwix.product exchange and add the AtwixProductDelete binding, which routes the atwix.product.delete topic to the atwix_product_delete queue:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="atwix.product" type="topic" connection="amqp">
        <binding id="AtwixProductDelete" topic="atwix.product.delete" destinationType="queue" destination="atwix_product_delete"/>
    </exchange>
</config>

Note the amqp connection in the file above: it tells Magento to use RabbitMQ rather than the MySQL-based queue.
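
For the amqp connection to work, Magento has to know how to reach the RabbitMQ server. This is configured under the queue key in app/etc/env.php. The fragment below is a minimal sketch, assuming RabbitMQ runs on the same machine with the default port and the default guest credentials; adjust the values for your environment.

```php
<?php
// app/etc/env.php (fragment): merge the 'queue' key into the existing array.
return [
    // ... existing configuration ...
    'queue' => [
        'amqp' => [
            'host' => '127.0.0.1',  // assumption: RabbitMQ runs locally
            'port' => '5672',       // default AMQP port
            'user' => 'guest',      // default RabbitMQ credentials
            'password' => 'guest',
            'virtualhost' => '/',
        ],
    ],
];
```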

queue_publisher.xml is simple: it only maps the topic to the exchange, and no publisher classes are specified here:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="atwix.product.delete">
        <connection name="amqp" exchange="atwix.product" />
    </publisher>
</config>

In queue_consumer.xml we define the consumer and specify the class and method that process the message, i.e. the handler:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="AtwixProductDelete" queue="atwix_product_delete" connection="amqp" handler="Atwix\Queue\Model\Product\DeleteConsumer::processMessage"/>
</config>

We are done with the configuration. If everything is defined correctly, then after running bin/magento setup:upgrade the exchange and the queue should be created on the RabbitMQ server:

[Screenshot: RabbitMQ]

Let’s continue by creating the publisher class, \Atwix\Queue\Model\Product\DeletePublisher. Here we specify the topic name as a constant and declare a method that publishes a message to the queue.

namespace Atwix\Queue\Model\Product;

class DeletePublisher
{
    /**
     * Topic name, as declared in communication.xml
     */
    const TOPIC_NAME = 'atwix.product.delete';

    /**
     * @var \Magento\Framework\MessageQueue\PublisherInterface
     */
    private $publisher;

    /**
     * @param \Magento\Framework\MessageQueue\PublisherInterface $publisher
     */
    public function __construct(\Magento\Framework\MessageQueue\PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    /**
     * Publish the deleted product to the atwix.product.delete topic
     *
     * @param \Magento\Catalog\Api\Data\ProductInterface $product
     * @return void
     */
    public function execute(\Magento\Catalog\Api\Data\ProductInterface $product)
    {
        $this->publisher->publish(self::TOPIC_NAME, $product);
    }
}

The product object is automatically encoded to JSON.
Next, create a plugin that tracks the product delete event and calls the execute method of the publisher:

use Atwix\Queue\Model\Product\DeletePublisher;
use Magento\Catalog\Model\ResourceModel\Product as ProductResource;

class ProductDeletePlugin
{
    /**
     * @var DeletePublisher
     */
    private $productDeletePublisher;

    /**
     * @param DeletePublisher $productDeletePublisher
     */
    public function __construct(DeletePublisher $productDeletePublisher)
    {
        $this->productDeletePublisher = $productDeletePublisher;
    }

    /**
     * Publish a message after the product has been deleted
     *
     * @param ProductResource $subject
     * @param ProductResource $result
     * @param \Magento\Catalog\Api\Data\ProductInterface $product
     * @return ProductResource
     * @SuppressWarnings(PHPMD.UnusedFormalParameter)
     */
    public function afterDelete(
        ProductResource $subject,
        ProductResource $result,
        \Magento\Catalog\Api\Data\ProductInterface $product
    ) {
        $this->productDeletePublisher->execute($product);
        return $result;
    }
}
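
A plugin only takes effect once it is declared in the module’s etc/di.xml. The article does not show this file, so the following is a sketch under assumptions: the plugin name atwix_queue_product_delete and the class namespace Atwix\Queue\Plugin are hypothetical and should match wherever you placed ProductDeletePlugin.

```xml
<?xml version="1.0"?>
<!-- etc/di.xml: attaches the plugin to the product resource model (illustrative sketch) -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <type name="Magento\Catalog\Model\ResourceModel\Product">
        <!-- Hypothetical plugin name and class namespace -->
        <plugin name="atwix_queue_product_delete" type="Atwix\Queue\Plugin\ProductDeletePlugin"/>
    </type>
</config>
```

The type name is the class aliased as ProductResource in the plugin, so afterDelete runs after every call to the delete method of the product resource model.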

The publisher part should now be complete. Delete a product and check whether the message has been sent to the queue:

[Screenshot: RabbitMQ atwix_product_delete]

In the screenshot above, you can see that there is one message in the queue. Let’s try to consume it.
Create the handler that we specified in queue_consumer.xml:

namespace Atwix\Queue\Model\Product;

class DeleteConsumer
{
    /**
     * @var \Zend\Log\Logger
     */
    private $logger;

    /**
     * @var string
     */
    private $logFileName = 'product-delete-consumer.log';

    /**
     * @var \Magento\Framework\App\Filesystem\DirectoryList
     */
    private $directoryList;

    /**
     * DeleteConsumer constructor.
     * @param \Magento\Framework\App\Filesystem\DirectoryList $directoryList
     * @throws \Magento\Framework\Exception\FileSystemException
     */
    public function __construct(
        \Magento\Framework\App\Filesystem\DirectoryList $directoryList
    ) {
        $this->directoryList = $directoryList;
        $logDir = $directoryList->getPath('log');
        $writer = new \Zend\Log\Writer\Stream($logDir . DIRECTORY_SEPARATOR . $this->logFileName);
        $logger = new \Zend\Log\Logger();
        $logger->addWriter($writer);
        $this->logger = $logger;
    }

    /**
     * @param \Magento\Catalog\Api\Data\ProductInterface $product
     * @throws \Magento\Framework\Exception\LocalizedException
     * @return void
     */
    public function processMessage(\Magento\Catalog\Api\Data\ProductInterface $product)
    {
        $this->logger->info($product->getId() . ' ' . $product->getSku());
    }
}

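As the code above shows, the processMessage method expects an instance of \Magento\Catalog\Api\Data\ProductInterface as its parameter. The example logs through \Zend\Log; on Magento releases that ship Laminas instead of Zend Framework, the equivalent classes are \Laminas\Log\Logger and \Laminas\Log\Writer\Stream.
The last step is to start our consumer with the command: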

bin/magento queue:consumers:start AtwixProductDelete

As a result, the RabbitMQ management web interface should show that the message has been consumed, and as proof of that, product-delete-consumer.log should contain a record with the product ID and SKU.
You can find the complete module code on GitHub.