Flag of Ukraine
SymfonyCasts stands united with the people of Ukraine

AMQP Priority Exchange

Keep on Learning!

If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.

Start your All-Access Pass
Buy just this tutorial for $12.00

With a Subscription, click any sentence in the script to jump to that part of the video!

Login Subscribe

The idea behind our async and async_priority_high transports was that we can send some messages to async_priority_high and others to async, with the goal that those messages would end up in different "buckets"... or, more technically, in different "queues". Then we can instruct our worker to first read all messages from whatever queue async_priority_high is bound to before reading messages from whatever queue the async transport is bound to.

The queue_name Option in Doctrine

This did work before with Doctrine, thanks to this queue_name: high option. The default value for this option is... default. As a reminder, I'll quickly log into my database:

mysql -u root messenger_tutorial

And see what that table looked like:

DESCRIBE messenger_messages;

Yep, the queue_name column was the key to making this work. Messages that were sent to async_priority_high had a queue_name set to high, and those sent to the async transport had a value of default. So even though we only had one database table, it functioned like two queues: when we consumed the async_priority_high transport, it queried for all messages WHERE queue_name="high".

The problem is that this queue_name option is specific to the doctrine transport, and it has absolutely no effect when using AMQP.

Routing Messages to... a Queue?

But... on a high-level... our goal is the same: we need two queues. We need the async_priority_high transport to send messages to one queue and the async transport to send messages to a different queue.

But with AMQP... you don't send a message directly to a queue... you send it to an exchange... and then it's the exchange's responsibility to look at its internal rules and figure out which queue, or queues, that message should actually go to.

This means that to get a message to a queue, we need to tweak things on the exchange level. And there are two different ways to do this. First, we could continue to have a single exchange and then add some internal rules - called bindings - to teach the exchange that some messages should go to one queue and other messages should go to another queue. I'm going to show you how to do this a bit later.

The second option isn't quite as cool, but it's a bit simpler. By default, when Messenger creates an exchange, it creates it as a fanout type. That means that when a message is sent to this exchange, it's routed to every queue that's bound to it. So if we added a second binding to a second queue - maybe messages_high_priority - then every message that's sent to this exchange would be routed to both queues. It would be duplicated! That's... not what we want.

Instead, we're going to create two fanout exchanges, and each exchange will route all of its messages to a separate queue. We'll have two exchanges and two queues.

Configuring a Second Exchange

Let's configure this inside of messenger.yaml. Under options add exchange then name set to, how about, messages_high_priority. Below this, add queues with just one key below: messages_high set to null.

... lines 3 - 19
... lines 21 - 26
... line 28
name: messages_high_priority
messages_high: ~
... lines 34 - 42

This config has three effects. First, because we have the auto_setup feature enabled, the first time we talk to RabbitMQ, Messenger will create the messages_high_priority exchange, the messages_high queue and bind them together. The second effect is that when we send messages to this transport they will be sent to the messages_high_priority exchange. The third and final effect is that when we consume from this transport, Messenger will read messages from the messages_high queue.

If that still doesn't make complete sense... don't worry: let's see this in action. First, make sure that your worker is not running: our's is stopped. Now let's go over and delete three photos - one, two, three - and upload four photos.

Cool! Let's see what happened in RabbitMQ! Inside the manager, click "Exchanges". Nice! We do have a new messages_high_priority exchange! The original messages exchange still sends all of its messages to a messages queue... but the new exchange sends all of its messages to a queue called messages_high. That's thanks to our queues config.

And... what's inside each queue? Go check it out! It's exactly what we want: the 3 deleted messages are waiting in the messages queue and the 4 newly-uploaded photos are in messages_high. Each transport is successfully getting their messages into a separate queue! And that means that we can consume them independently.

At the command line, we would normally tell Messenger to consume from async_priority_high and then async to get our prioritized delivery. But to clearly show what's happening, let's consume them independently for now. Start by consuming messages from the async transport:

php bin/console messenger:consume -vv async

It starts processing the ImagePostDeletedEvent objects... and stops after those three. It's done! That queue is empty. The command did not read the messages from messages_high. To do that, consume the async_priority_high transport:

php bin/console messenger:consume -vv async_priority_high

There we go! The simplest... but not fanciest... way to have prioritized transports with AMQP is to send each transport to a different exchange and configure it to route to a different queue. Later... we'll see the fancier way.

Before we get there, remember when I had you comment-out the DelayStamp before we started using RabbitMQ? Next, I'll show you why: we'll re-add that DelayStamp and see the crazy way that messages are "delayed" in RabbitMQ.

Leave a comment!

Login or Register to join the conversation
Mehul-J Avatar
Mehul-J Avatar Mehul-J | posted 1 year ago

Hi, I am using first time RabbitMq . In rabbitmq inside query the ready is always 0 . Even after messenger is consumed . And the state is in running. Why is this not changing???? Can anyone help

Mehul-J Avatar

I got it. before worker is consumed i get ready. after worker consumed it will be zero


Hey Nagashree,

Glad you were figured it out yourself!


Dariia M. Avatar
Dariia M. Avatar Dariia M. | posted 3 years ago | edited

Hello! Can you provide the origin source of messenger configuration for amqp queues, please? It's the beginning of "Configuring a Second Exchange" paragraph in this lesson, changing the configuration of messenger.yaml file.
I didn't find this in official documentation neither in results on php bin/console config:dump framework messenger command.


Hey Dariia M.

Yes it's not fully described in official doncumentation, not in config:dump. I think it is so because it's DSN based configuration but you can find it in source code <a href="https://github.com/symfony/symfony/blob/efbe75291869ab944c62b48de9a0c2257ccace8f/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php#L80&quot;&gt; here </a>

Hope it will help you!


Dariia M. Avatar
Dariia M. Avatar Dariia M. | sadikoff | posted 3 years ago | edited

Hello sadikoff
Thank you for response, now I know where to find it :)

Cat in space

"Houston: no signs of life"
Start the conversation!

This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2