If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.
With a Subscription, click any sentence in the script to jump to that part of the video!
Login SubscribeThe two messages that we we're sending to the async
transport are AddPonkaToImage
and DeletePhotoFile
, which handles deleting the physical file from the filesystem. And... that second one isn't something the user actually notices or cares about - it's just housekeeping. If it happened 5 minutes from now or 10 days from now, the user wouldn't care.
This creates an interesting situation. Our worker handles things in a first-in-first-out basis: if we send 5 messages to the transport, the worker will handle them in the order in which they were received. This means that if a bunch of images are deleted and then someone uploads a new photo... the worker will process all of those delete messages before finally adding Ponka to the photo. And that... isn't ideal.
The truth is that AddPonkaToImage
messages should have a higher priority in our system than DeletePhotoFile
: we always want AddPonkaToImage
to be handled before any DeletePhotoFile
messages... even if they were added first.
So... can we set a priority on messages? Not exactly. It turns out that in the queueing world, this is solved by creating multiple queues and giving each of those a priority. In Symfony Messenger, that translates to multiple transports.
Below the async
transport, create a new transport called, how about, async_priority_high
. Let's use the same DSN as before, which in our case is using doctrine
. Below, add options
, then queue_name
set to high
. The name high
isn't important - we could use anything. The queue_name
option is specific to the Doctrine transport and ultimately controls the value of a column in the table, which operates like a category and allows us to have multiple "queues" of messages inside the same table. And also, for any transport, you can configure these options as query parameters on the DSN or under this options
key.
framework: | |
messenger: | |
... lines 3 - 10 | |
transports: | |
... lines 12 - 17 | |
async_priority_high: | |
dsn: '%env(MESSENGER_TRANSPORT_DSN)%' | |
options: | |
queue_name: high | |
... lines 22 - 30 |
At this point we have three queues - which are all stored in the same table in the database, but with different queue_name
values. And now that we have this new transport, we can route AddPonkaToImage
to async_priority_high
.
framework: | |
messenger: | |
... lines 3 - 25 | |
routing: | |
... line 27 | |
'App\Message\AddPonkaToImage': async_priority_high | |
... lines 29 - 30 |
If we stopped now... all we've really done is make it possible to send these two different message classes to two different queues. But there's nothing special about async_priority_high
. Sure, I put the word "high" in its name, but it's no different than async
.
The real magic comes from the worker. Find your terminal where the worker is running and hit Control+C to stop it. If you just run messenger:consume
without any arguments and you have more than one transport, it asks you which transport you want to consume:
php bin/console messenger:consume
Meaning, which transport do you want to receive messages from. But actually, you can read messages from multiple transports at once and tell the worker which should be read first. Check this out: I'll say async_priority_high, async
.
This tells the worker: first ask async_priority_high
if it has any messages. If it doesn't, then go check the async
transport.
We should be able to see this in action. I'll refresh the page, delete a bunch of images here as fast as I can and then upload a couple of photos. Check the terminal output:
It's handles DeletePhotoFile
then... AddPonkaToImage
, another AddPonkaToImage
, another AddPonkaToImage
and... yea! It goes back to handling the lower-priority DeletePhotoFile
.
So, in the beginning - before we uploaded - it did consume a few DeletePhotoFile
messages. But as soon as it saw a message on that async_priority_high
transport, it consumed all of those until it was empty. When it was, it then returned to consuming messages from async
.
Basically, each time the worker looks for the next message, it checks the highest priority transport first and only asks the next transport - or transports - if it's empty.
And... that's it! Create a new transport for however many different priority "levels" you need, then tell the worker command which order to process them. Oh, and instead of using this interactive way of doing things, you can run:
php bin/console messenger:consume async_priority_high async
Perfect. Next, let's talk about one option we can use to make it easier to develop while using queues... because always needing to remember to run the worker command while coding can be a pain.
// composer.json
{
"require": {
"php": "^7.1.3",
"ext-ctype": "*",
"ext-iconv": "*",
"composer/package-versions-deprecated": "^1.11", // 1.11.99
"intervention/image": "^2.4", // 2.4.2
"league/flysystem-bundle": "^1.0", // 1.1.0
"sensio/framework-extra-bundle": "^5.3", // v5.3.1
"symfony/console": "4.3.*", // v4.3.2
"symfony/dotenv": "4.3.*", // v4.3.2
"symfony/flex": "^1.9", // v1.9.10
"symfony/framework-bundle": "4.3.*", // v4.3.2
"symfony/messenger": "4.3.*", // v4.3.4
"symfony/orm-pack": "^1.0", // v1.0.6
"symfony/serializer-pack": "^1.0", // v1.0.2
"symfony/validator": "4.3.*", // v4.3.2
"symfony/webpack-encore-bundle": "^1.5", // v1.6.2
"symfony/yaml": "4.3.*" // v4.3.2
},
"require-dev": {
"symfony/debug-pack": "^1.0", // v1.0.7
"symfony/maker-bundle": "^1.0", // v1.12.0
"symfony/test-pack": "^1.0" // v1.0.6
}
}