Envelopes & Stamps
Keep on Learning!
If you liked what you've learned so far, dive in! Subscribe to get access to this tutorial plus video, code and script downloads.
With a Subscription, click any sentence in the script to jump to that part of the video!
Login SubscribeWe just got a request from Ponka herself... and when it comes to this site, Ponka is the boss. She thinks that, when a user uploads a photo, her image is actually being added a little bit too quickly. She wants it to take longer: she wants it to feel like she's doing some really epic work behind the scenes to get into your photo.
I know, it's kind of a silly example - Ponka is so weird when you talk to her before her shrimp breakfast and morning nap . But... it is an interesting challenge: could we somehow not only say: "handle this later"... but also "wait at least 5 seconds before handling it?".
Envelope: A Great Place to put a Message
Yep! And it touches on some super cool parts of the system called stamps and envelopes. First, open up ImagePostController and go up to where we create the AddPonkaToImage object. AddPonkaToImage is called the "message" - we know that. What we don't know is that, when you pass your message to the bus, internally, it gets wrapped inside something called an Envelope.
Now, this isn't an especially important detail except that, if you have an Envelope, you can attach extra config to it via stamps. So yes, you literally put a message in an envelope and then attach stamps. Is this your favorite component or what?
Anyways, those stamps can carry all sorts of info. For example, if you're using RabbitMQ, you can configure a few things about how the message is delivered, like something called a "routing key". Or, you can configure a delay.
Put the Message into the Envelope, then add Stamps
Check this out: say $envelope = new Envelope() and pass it our $message. Then, pass this an optional second argument: an array of stamps.
| // ... lines 1 - 15 | |
| use Symfony\Component\Messenger\Envelope; | |
| // ... lines 17 - 23 | |
| class ImagePostController extends AbstractController | |
| { | |
| // ... lines 26 - 40 | |
| public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus) | |
| { | |
| // ... lines 43 - 63 | |
| $envelope = new Envelope($message, [ | |
| // ... line 65 | |
| ]); | |
| // ... lines 67 - 69 | |
| } | |
| // ... lines 71 - 98 | |
| } |
Include just one: new DelayStamp(5000). This indicates to the transport... which is kind of like the mail carrier... that you'd like this message to be delayed 5 seconds before it's delivered. Finally, pass the $envelope - not the message - into $messageBus->dispatch().
| // ... lines 1 - 17 | |
| use Symfony\Component\Messenger\Stamp\DelayStamp; | |
| // ... lines 19 - 23 | |
| class ImagePostController extends AbstractController | |
| { | |
| // ... lines 26 - 40 | |
| public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus) | |
| { | |
| // ... lines 43 - 63 | |
| $envelope = new Envelope($message, [ | |
| new DelayStamp(5000) | |
| ]); | |
| $messageBus->dispatch($envelope); | |
| // ... lines 68 - 69 | |
| } | |
| // ... lines 71 - 98 | |
| } |
Yep, the dispatch() method accepts raw message objects or Envelope objects. If you pass a raw message, it wraps it in an Envelope. If you do pass an Envelope, it uses it! The end result is the same as before... except that we're now applying a DelayStamp.
Let's try it! This time we don't need to restart our worker because we haven't changed any code it will use: we only changed code that controls how the message will be delivered. But... if you're ever not sure - just restart it.
I will clear the console so we can watch what happens. Then... let's upload three photos and... one, two, three, four there it is! It delayed 5 seconds and then started processing each like normal. There's not a 5 second delay between handling each message: it just makes sure that each message is handled no sooner than 5 seconds after sending it.
Tip
Support for delays in Redis WAS added in Symfony 4.4.
Side note: In Symfony 4.3, the Redis transport doesn't support delays - but it may be added in the future.
What other Stamps are There?
Anyways, you may not use stamps a ton, but you will need them from time-to-time. You'll probably Google "How do I configure validation groups in Messenger" and learn which stamp controls this. Don't worry, I'll talk about validation later - it's not something that's happening right now.
One other cool thing is that, internally, Messenger itself uses stamps to track and help deliver messages correctly. Check this out: wrap $messageBus->dispatch() in a dump() call.
| // ... lines 1 - 23 | |
| class ImagePostController extends AbstractController | |
| { | |
| // ... lines 26 - 40 | |
| public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus) | |
| { | |
| // ... lines 43 - 66 | |
| dump($messageBus->dispatch($envelope)); | |
| // ... lines 68 - 69 | |
| } | |
| // ... lines 71 - 98 | |
| } |
Let's go over and upload one new image. Then, on the web debug toolbar, find the AJAX request that just finished - it'll be the bottom one - click to open its profiler and then click "Debug" on the left. There it is! The dispatch() method returns an Envelope... which holds the message of course... and now has four stamps! It has the DelayStamp like we expected, but also a BusNameStamp, which records the name of the bus that it was sent to. This is cool: we only have one bus now, but you're allowed to have multiple, and we'll talk about why you might do that later. The BusNameStamp helps the worker command know which bus to send the Envelope to after it's read from the transport.
That SentStamp is basically a marker that says "this message was sent to a transport instead of being handled immediately" and this TransportMessageIdStamp, literally contains the id of the new row in the messenger_messages table... in case that's useful.
You don't really need to care about any of this - but watching what stamps are being added to your Envelope may help you debug an issue or do some more advanced stuff. In fact, some of these will come in handy soon when we talk about middleware.
For now, remove the dump() and then, so I don't drive myself crazy with how slow this is, change the DelayStamp to 500 milliseconds. Shh, don't tell Ponka. After this change... yep! The message is handled almost immediately.
| // ... lines 1 - 23 | |
| class ImagePostController extends AbstractController | |
| { | |
| // ... lines 26 - 40 | |
| public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus) | |
| { | |
| // ... lines 43 - 63 | |
| $envelope = new Envelope($message, [ | |
| new DelayStamp(500) | |
| ]); | |
| $messageBus->dispatch($envelope); | |
| // ... lines 68 - 69 | |
| } | |
| // ... lines 71 - 98 | |
| } |
Next, let's talk about retries and what happens when things go wrong! No joke: this stuff is super cool.
10 Comments
Hi SymfonyCasts.
First i want to thank you for all your work, i have respect for all the energy you put into these videos.
I've learned about messenger a week ago. I think it's great.
Right now I want to start using messenger with an Api Platform application (
https://api-platform.com/docs/core/messenger/).I do this together with doctrine.
What i want to know is this:
When a client calls an url on my api that generates lots of documents and this generating is handled with messenger, is there a way to let the client know that the documents are ready?
Right now i have a listener that listens for the messenger event 'onMessageHandled'. I was thinking i could do something there. Am i on the right track?
And what is best practise, put the documents at a certain location?
Thank you very much in advance and keep up the good work !!!
With kind regards,
Annemieke
Hey @Annemieke-B ,
First of all, thanks for the kind words about our courses!
Yeah, I think you're on the right track. Listening for the
onMessageHandledevent should be a solid approach for notifying the client when document generation is complete.When docs are handled - you can update the status of it in the DB so that the system knows docs were processed. There're a few techniques that should help you to notify your clients:
You can store documents in a secure location, such as a cloud storage bucket or a file server, and provide the client with a URL to access or download the documents. If it's a temporary files - you can run some CRON job that will clean up old docs to save some hdd space. Also, to avoid files leaking you can sign download URLs, IIRC AWS cloud bucket allow this for example. Then even if the user will reshare that URL with other users - it will be invalidated shortly.
I hope this helps and good luck with it!
Cheers!
Thank you very much for the quick response @Victor !
I've looked at both ways, the polling and the websockets. Both have some disadvantages.
The reason the api client wants to receive these documents is so they can create an email including these documents and send this to a 'customer'.
I have suggested to project leader to create this mail ourselves. I think this will work must faster and safer.
Thanks for your help. And please let me know what you think of my solution if you have the time.
With kind regards,
Annemieke
Hey Annemieke,
Indeed, both ways will have pros and cons, so it's mostly about choosing the best option for your specific needs.
Actually, your idea sounds like a good 3rd option. I don't know your specific project of course, but it can be automated this way - it sounds like a double-win.
Cheers!
Hi Ryan, is it possible to delay 5 seconds between handling each message?
Hey @Core!
Hmm, I'm not sure! You can add a
DelayStamp, but I don't think that's what you're referring to. Why do you want to delay exactly? Anyways, one way to do this would be to add a listener to theWorkerMessageHandledEventevent and, inside, literallysleep(5). That might sound a bit crazy, but that's actually what the worker (i.e.messenger:consume) already does between messages: it does ausleep(1000000)(equivalent tosleep(1)) after not finding any new messages before checking again - https://github.com/symfony/symfony/blob/69f46f231b16be1bce1e2ecfa7f9a1fb192cda22/src/Symfony/Component/Messenger/Worker.php#L134Cheers!
Is it possible to set custom "queue name" with parameter on
$messageBus->dispatch($message);
For example, "competition_41".
I need progress bar 0-100% for each "competitions messages".
Hey Dmitriy!
I don't think this is easily possible. The problem is that... queue systems aren't meant to be used this way. For AMQP, for example, you're supposed to send to an "exchange"... which then delivers to one or more queues based on routing rules. It seems like it should be possible, but you're not "supposed to" think about "sending a message directly to a queue".
If you're using AMQP, this is still possible - but you'd need to set some things up manually. It would look something like:
A) Create the 100 queues that you need in AMQP.
B) Set up some custom routing in AMQP so that different routing keys (e.g. competition_41) send to different queues (e.g. competition_41).
C) In Messenger, attach a custom AmqpStamp to the message with whatever routing key you want.
If you're using the Doctrine transport, I don't think making the queue dynamic is at all possible. I don't fully understand what you're trying to accomplish, but I may try to do it a different way.
Cheers!
Hello,
As a side note in your text, you wrote that redis transport delay is not supported in sf4.3, but may be added in future --> I think it's ok now since 4.4 ? https://github.com/symfony/... ? If so, we can update the text :) cheers !
Hey Julien R.!
Excellent idea - we'll add a note!
Cheers!
"Houston: no signs of life"
Start the conversation!