Prime Time with Messenger: Queues, Workers & more Fun! (Ryan Weaver)

Tip

SymfonyCon 2019 Amsterdam presentation by Ryan Weaver.

Talk slides

In Symfony 4.4, Messenger will lose its experimental label and officially become stable! In this talk, we'll go through Messenger from the ground-up: learning about messages, message handlers, transports and how to consume messages asynchronously via worker commands. Everything you need to make your app faster by delaying work until later.

We'll also talk about the many new features that were added to Messenger since Symfony 4.3, like retries, the failure transport and support for using Doctrine and Redis as transports.

Let's get to work with Messenger!

Alright. Hello. Welcome. It's ah, I guess that was my subtle cue to begin so we can stay on time here. All right, so let's get into this. We have lots of things to talk about. So my name is Ryan. I work on SymfonyCasts. So I do a lot of tutorials on SymfonyCasts. Hopefully you've heard of us at this point. Um, and what else to know about me? Where's, where's Leanna? This is the fun part. She doesn't come to the conferences anymore. There she is. How many people have met Leanna? Let's get a fair picture of, yeah, round of applause. There we go!

So if you don't know Leanna, this is what she looks like. She enjoys conversation, high fives. Basically making friends with everyone. So afterwards, if you would like to meet Leanna, she would like to meet you. She's all the way up here in the second row and or you can tweet at her right now. You can become remote friends by tweeting at her. That would also be awesome. She is responsible amongst actually running the company for, hold on. Here we go for all the animations on SymfonyCasts. So if you enjoy some of the ridiculous things we do, like it's almost always her. And most importantly, I am the father of my son Beckett. I am a dad. So this means that I get to show pictures of my kid to people and since the room is very tight, there's not really a way to get away from it.

So this is my mandatory "there is my son" picture. Okay, so messenger. I think at this point we mostly understand what messenger is. It's a Facebook product for sending chats to your friends. I use it all the time. It's very nice. It's also a component in Symfony for sending messages, and if you haven't used it before, that still makes it sound like it's kind of just Facebook messenger. You're like, wait, Facebook messenger is also how I send messages. So what does that actually mean? If you haven't used it before, some of these words won't really have much meaning to you. It's technically a message bus, and a message bus can also be used as a command bus, an event bus or a query bus. I'm not going to get into the details of the differences between those. It's of course also a school bus sometimes, and maybe most importantly, or maybe the reason that you're here, is that it allows you to run code asynchronously, or so you're told.

Stroopwafel Ordering System

So let's get started with messenger. We are going to do something very important. We are going to make our stroopwafel ordering system. So we're going to make an endpoint, or I guess I already made an endpoint, where I can make a POST request to send my stroopwafel order. And here's my very complex looking, here we go, controller code. Just going to point my computer at myself a bit more. There we go. Okay, so I'm taking some data, I'm creating some variables and okay, this is the really important part. This is our proprietary secret stroopwafel baking machine recipe. Okay? Here's what we do. We put some dough on the waffle iron. Okay. That takes some time. Right? That takes about 500 milliseconds. We're fast, but it still takes some time. And then we add some caramel and if you have a topping, we add the topping and then at the very end we give it to you.

Right. So that's our very important system here. Takes a little bit of time for us to do this. And then boom, we have stroopwafel. How many people have had stroopwafel since they've been here? Okay, good. Yeah, like half. If you're like, what is a stroopwafel just Google it. You should have them here, you can get them fresh here. I've never had a fresh one until a few days ago. I've had them just not fresh. Yeah, well they're warm and the caramel kind of drips out the side. Okay. So this was delicious, but can we organize our code? Of course we can. So maybe, maybe, maybe something that we decide to do is a stroopwafel is a topping, what size you want and who you want to deliver it to. That's like what you tell the person when you want your, your stroopwafel. So maybe we decide to put those three arguments, those three parameters into just kind of an object.

Organizing Our Code

Okay. So anytime now that we want to make a stroopwafel, we can just create a StroopwafelOrder. And that's kind of the thing that we'll deal with. And this doesn't change our, our code much right now inside of our code, we can take those three variables and put them into an object. Our stroopwafel machine is now using the order variable. It's using this object to call, get size, get toppings. Uh, this is a very, very small change. It's like, okay, maybe the code is a little bit cleaner. Um, okay, if you're looking at the controller now, so we have this nice object, but we still have all of our code in the middle. So what do we do as Symfony developers we say, Oh, you should refactor that logic into a service and you can have nice skinny controllers. Okay, so let's refactor our logic into a service.
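The slide with the message class isn't part of this transcript, so here is a minimal sketch of what such an object could look like. The property names, the getter names and the example values are assumptions based on the three things mentioned above (topping, size, and who it's for), not the code from the talk.

```php
<?php

namespace App\Message;

// Hypothetical message class: a plain data holder with no behavior.
final class StroopwafelOrder
{
    private string $size;
    private ?string $topping;
    private string $customer;

    public function __construct(string $size, ?string $topping, string $customer)
    {
        $this->size = $size;
        $this->topping = $topping;
        $this->customer = $customer;
    }

    public function getSize(): string
    {
        return $this->size;
    }

    // Null means "no topping".
    public function getTopping(): ?string
    {
        return $this->topping;
    }

    public function getCustomer(): string
    {
        return $this->customer;
    }
}

// Example values are made up for illustration.
$order = new StroopwafelOrder('large', 'chocolate', 'Leanna');
```

Nothing here depends on messenger. A message is just a PHP object that carries the data a handler will need later.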

So this is our StroopwafelOrderHandler. We need the logger, so I autowire the logger into it. Cool. And then I decide to make a function called handleOrder. That could be called anything, right? I'm just making this up. This is my service to centralize my code, and then I just move all the logic into that class. So now in our controller, we create our StroopwafelOrder object, put this stuff in it, and then we call the handleOrder method on our new service and pass it the object. Cool. So we haven't talked about messenger at all yet. That was just a sort of service refactoring 101: just moving code around, putting it into a service. Nothing special at all about that. But we're already very close to using messenger. So when we started, we were calling the StroopwafelOrderHandler directly.

We're calling that ah, or actually not when we started, but there's sort of multiple layers we can do here. So most commonly if you have a service that does some work, you normally just call a method on it directly. Like, I need to make a stroopwafel. So let me get the service that does that and I will call a method on it. And we can do that by passing the maybe three arguments to it, like our, our, our size and our toppings and who it's for or we can go a little bit fancier, right? And have our model object that has those properties inside of it. And we could pass that to the service. But in both cases, we're just passing ah we're calling the method directly on the service. So when we introduce messenger or a message bus, it's just a small difference here.

A message bus is just a middleman. So instead of you calling the method on the service directly, you take this, uh, the, your StroopwafelOrder object and you call dispatch on messenger. It's kind of this middle thing. And then it figures out which service to call and calls it for you. So really is like, what is a message bus? It's just a middle layer. Instead of you calling a service directly, you call this middle layer and then it figures out who to call. That's it. So if message bus idea is somewhat new to you, I hope you feel very underwhelmed at this moment. You're like, that's it. That's what a message bus is. Yes, that's what a message bus is. It's a very simple concept. So in diagram format, so let's say we have these kind of three messages on the left, these three things we want to do.
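To make the "middleman" idea concrete, here is a toy bus in plain PHP. It is not Symfony's MessageBus: the TinyBus class, its register method and the two message classes are all hypothetical names invented for this sketch.

```php
<?php

// Toy "middleman": maps a message class to the callable that does the work.
final class TinyBus
{
    /** @var array<string, callable> */
    private array $handlers = [];

    public function register(string $messageClass, callable $handler): void
    {
        $this->handlers[$messageClass] = $handler;
    }

    public function dispatch(object $message)
    {
        $class = get_class($message);
        if (!isset($this->handlers[$class])) {
            throw new LogicException("No handler registered for {$class}");
        }

        // The caller never touches the handler directly; the bus picks it.
        return ($this->handlers[$class])($message);
    }
}

final class BakeStroopwafel {}
final class BrewCoffee {}

$bus = new TinyBus();
$bus->register(BakeStroopwafel::class, fn (BakeStroopwafel $m) => 'chef');
$bus->register(BrewCoffee::class, fn (BrewCoffee $m) => 'barista');

echo $bus->dispatch(new BrewCoffee()), PHP_EOL;
```

The calling code only knows about the message and the bus. Which service does the work is decided in one central place.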

We want to bake a stroopwafel, brew coffee or clean up the bathroom. And then on the right here we have our people, or our services, that do that work. So for example, baking a stroopwafel is done by the chef. Instead of telling the chef directly to do that, we're going to pass that to the bus and the bus is going to say, hm, this is work that's done by the chef. Okay? Same thing with brew coffee: that's the barista. And clean up the bathroom, that's maybe somebody else, the custodian. Cool. Okay, so now we're ready to use messenger.

Configuring Messenger

So let's install it, composer require messenger. Of course we give you this nice big, blue message. It looks really good. We'll talk about all this stuff though. And the key thing with the message bus, there has to be a connection between the message and the handler.

How does the message bus know that when we give it a StroopwafelOrder, it should call this service and this method on that service? And by the way, I won't show it, but you can configure it so that one message has multiple handlers. So you can say, here's my one message, and maybe you need to have three different handlers called for that. That's legal. I'll just do the one-to-one relationship. So how does messenger know what to call when we pass it a StroopwafelOrder? When messenger was created, that could've been done in any way. It could have been all in YAML. The way that we came up with, which I really like, is a two step process. And of course this is Symfony, so if you want to configure this in a harder way, you can definitely do that.

There's always a harder, more manual way of doing things. I'm going to show you the, uh, the most straight forward, the way that most people will do it, but you always have more control over this process. So the main way we do it is we say, okay, you take that service object and you now make it implement MessageHandler Interface. And all that does is that tells messenger to be aware that this is apparently a MessageHandler. It's not enough to tell it what messages it handles, but it kind of makes the system aware of it. This is actually a rare empty interface. There's no methods you implement. It's a purely a marker interface. So that the system is aware of it. Step two is, messenger needs to know what method to call on your handler. So even if it knows that this message, should go to this handler, it's like, okay, but what method would I call on it?

So the easiest way to solve that was to use PHP's __invoke. So again, you can configure this, but out of the box you're supposed to make your handlers have an __invoke method. That's what messenger expects there to be unless you tell it otherwise. And then the key part of it, and it's just really simple, is that once we have added the interface here, Symfony looks at this class, looks at the __invoke method and then looks at the one argument and reads its type hint. It's like just the simplest thing ever. It says, apparently the StroopwafelOrderHandler should be called when a StroopwafelOrder is passed to the message bus, because that's what this class is saying. So that's how the connection's made between the two. And like we like to do in Symfony, we like to make that connection without having to go to another YAML file.
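The type-hint lookup described above can be imitated with PHP's reflection API. This is only a sketch of the idea, not Symfony's actual implementation: guessHandledMessage is a hypothetical helper, and the handler below leaves out the marker interface so the snippet runs without the framework.

```php
<?php

final class StroopwafelOrder {}

// In a Symfony 4.4 app this class would also implement the marker interface
// Symfony\Component\Messenger\Handler\MessageHandlerInterface.
final class StroopwafelOrderHandler
{
    public function __invoke(StroopwafelOrder $order): void
    {
        // Bake the stroopwafel here.
    }
}

// Hypothetical helper: find out which message class a handler accepts
// by reading the type hint of the single __invoke() argument.
function guessHandledMessage(string $handlerClass): string
{
    $method = new ReflectionMethod($handlerClass, '__invoke');
    $parameters = $method->getParameters();

    if (count($parameters) !== 1) {
        throw new LogicException('Expected exactly one argument on __invoke().');
    }

    $type = $parameters[0]->getType();
    if (!$type instanceof ReflectionNamedType) {
        throw new LogicException('The __invoke() argument needs a class type hint.');
    }

    return $type->getName();
}

echo guessHandledMessage(StroopwafelOrderHandler::class), PHP_EOL;
```

Because the link between message and handler lives in the method signature, renaming or retyping the argument changes which messages reach the handler.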

So everything's happening inside of the message class and the handler class. Okay. Then using the message bus really couldn't be simpler. There's a new service, MessageBusInterface. You autowire that into your controller or into a service, and then it really only has, oh hey, I have that backwards. It really only has one method on it, which I have backwards here with the red and green: messageBus, arrow, dispatch. There's no other method on it. You dispatch the message into it and then it takes care of the rest. So at this point that message is still synchronous though. This was really just a design pattern. We just used a message bus to call our handler for us. We added a middleman, but we didn't really make any significant changes to our application. So that's kind of the first half of messenger. The second half is, okay, now how do we make this asynchronous?

So in general, why would you have a message bus? Why are we adding this middle layer? It was easier just for me to call my service directly. There was really no problem with that. By centralizing things through the message bus, you open up a lot of possibilities. Anytime in your code, if you do something in 10 different places and you suddenly centralize that code, that gives you very powerful opportunities to do something every time you perform that action. So a simple example is you can now log something each time a message is dispatched. That's a silly example, but the point is you might be dispatching many messages to do lots of work from all around your application, and you can now write some centralized code that is executed every single time a message goes through the message bus. So there's many more interesting things you can do, but you can run code at that time. Oh yeah, here's a great example, and we actually have a middleware that does this. You could wrap each handler automatically in a transaction. That's not something that happens out of the box in messenger, but you can add that. People like Tobias Nyholm use that all the time. So in his handlers, he just kind of writes doctrine code and there's something else that wraps that in a transaction and then commits the transaction afterwards.

Um, and really the main thing, the most important thing for messenger is it means that you can actually have something in the center, intercept the message, not call the handler and then just store it somewhere. And if we've stored the message somewhere, then you can start to think like, hey, if we have this message sitting over here, I don't know, maybe we save it on the file system, that's not really that important, then we could maybe save it for later and then later read that message and then pass it to the handler. So that's kind of the key thing with the message bus. It gives you that hook for saving these messages somewhere else for later. So the way that you do that is called Transports. Which by chance is also a word that's used in the new mailer component. So the transport is where you want a message to be sent for storage elsewhere.

There are three supported out of the box. We have AMQP, which is RabbitMQ. We have the doctrine transport, which we are going to use because it's probably the most familiar and easiest for people, and which literally means that these messages are stored in a database table. And then we also have a Redis transport that you can put things in as well. And you can also use lots of other ones via third party libraries. But in core we have those three things. So we'll make sure we have doctrine installed, and maybe you already have it installed in your project. And then I'm going to uncomment the MESSENGER_TRANSPORT_DSN and use the doctrine transport. In that doctrine://default, the default is referring to your default connection in doctrine. So if you have more than one connection, you would choose whichever connection you want.

So it's nice because there's no, um, you don't have to worry about configuring the database twice. You just say reuse my connection. Okay, so then inside of our configuration file, so config/packages/messenger.yaml we don't really have anything in here at this point. We're going to add something now, which is we're going to configure a transport called async. And that name is not important at all. So you can choose your favorite name for that. You're going to see how we reference that key in a second. We're just calling it async because it seems like a pretty good name. So we're saying this async transport should send to doctrine. Okay, so if we ran our code right now, it's still synchronous, meaning the request is still waiting for us to, you know, do the whole stroopwafel thing before the response is returned. So it hasn't changed that we've just added a transport.

Async Configuration

So the way that you actually say, I want this message to be asynchronous, is you route it to a transport. So that's the last piece here. We have a routing key and we're going to say App\Message\StroopwafelOrder should go to async. The moment we do that, when it goes into the message bus, there's something that sends it to the doctrine transport and then just stops. So the handler is never called. And we get that response back very quickly. Okay. So what does it mean to be sent to the doctrine transport? Quite literally, you have a table in your database. I mean, to be fully honest, only about six months ago, the whole world of queuing was still new to me. So being able to use a doctrine transport to learn and see things in a familiar atmosphere was very, very useful.
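For reference, the transport and the routing rule described above could be written roughly like this. The talk edits config/packages/messenger.yaml; this sketch shows an assumed equivalent in Symfony's PHP configuration format (a hypothetical config/packages/messenger.php) and assumes MESSENGER_TRANSPORT_DSN is set to doctrine://default in your .env file.

```php
<?php
// config/packages/messenger.php (assumed PHP counterpart of the YAML from the talk)

use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;

return static function (ContainerConfigurator $container): void {
    $container->extension('framework', [
        'messenger' => [
            'transports' => [
                // "async" is an arbitrary name; the DSN decides where messages go.
                'async' => '%env(MESSENGER_TRANSPORT_DSN)%',
            ],
            'routing' => [
                // Routing a message class to a transport is what makes it asynchronous.
                'App\Message\StroopwafelOrder' => 'async',
            ],
        ],
    ]);
};
```

With only the transports section the message would still be handled synchronously. The routing entry is what stops the handler from being called right away.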

So that's why I'm using it here. Once you've sent that first message there, you literally have a new table in your database called messenger_messages, and it looks like that craziness. The key thing here is you're actually seeing a PHP serialized object. So that message you put into there has just been serialized and stored in a database table. And there's a couple of other properties, like when was it created and things like that. But it's basically just sitting there in your database. Okay. Oh, and as I mentioned up here, the table is created automatically. So you're like, where did this table come from? It's just created automatically for you. You don't need an entity for it. It happens behind the scenes. Okay, so this is great. So now they're being stored in the database table. The next question is, how do we actually say, I want you to read them and handle them?

This is the thing that's called a worker. Worker being a fancy word for a command line process that reads that message out of the database table and then sends it back through the message bus, but this time saying, actually handle it. So in Symfony it's just a bin/console command: bin/console messenger:consume -vv. I do the -vv for very verbose. You can actually see what's happening if you do the -vv. So we do that. We're going to get a couple of log entries. It's going to say, I received the message, then it's going to say, this message was handled by that class, meaning it was handled successfully. Oh sorry, those are actually split. So first it says the message is being handled, basically. And then at the end it says it was handled successfully and acknowledging to transport. That's a queuing word. Acknowledging to transport is a fancy way of saying I removed it from the database table: we successfully handled it, so it's no longer in the database table, and now it's gone and it's handled. And then this process just hangs there and waits for more messages. Okay. And then we have stroopwafel. So we still get a stroopwafel, we just didn't have to wait for it. Yes. Stroopwafel. Yes.

So messenger, uh, done, uh, the end, I'm just kidding, don't, don't, yeah, don't clap or anything yet. Not actually the end because that's only been 20 minutes, but we could actually stop right there. We have a, a messenger tutorial on SymfonyCasts and I was like reviewing it for this talk. And I realized that in about the first five videos we talked about like all the main things, like you could, it's 45 chapters, 45 videos, the first five is enough to give you your 90% use case. Uh, and then you can dive way deeper after that. Um, but it really is just that simple. So the one thing you, even if you don't want to go too much deeper, you probably need to know a little bit more about that worker. Um, if you've never done this type of system before, and one of the obvious questions is how do I run this on production?

Running workers on Production

So as I mentioned, you run this command and it just sits there and waits and it finds another message. It processes it, waits, finds another message, processes it. Um, so do we just run this manually on production and hope that it runs forever and doesn't error out at any point? Definitely not. So actually you want your worker process to die every now and then. So, uh, you know, one of the questions with this stuff is like, hey, PHP isn't really meant to be a long running process. Aren't there memory leaks or this or that? Maybe, uh, the point is like, you should actually celebrate this. Uh, don't expect your, your worker to handle 10 million messages in a row without having any problems or any memory leaks or any errors or like that. Um, you want your worker to die and then when it dies you'll just restart it again.

But you want it to die on your terms. What you don't want is for it to die because something in your code is hanging onto variables and your memory is getting bigger, and then suddenly in the middle of making a stroopwafel you hit the memory limit. That's not a great failure, because you might be halfway done with the stroopwafel and it might be burning on the waffle iron when your worker dies. So you want it to die, but you want it to die on your terms. What that means is there are options to the worker which basically say: only handle 10 messages, then die, just stop, just exit. Or run messages until you take up 128 megabytes of memory. Or run messages for a maximum of 10 minutes. And actually the more aggressive you are in this, kind of the safer you are, because it's less likely that you're going to end up in the middle of handling a message.
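Here is a toy model of "dying on your terms": the loop checks its limits between messages and never in the middle of one. It is not Symfony's Worker class, and shouldStop is a hypothetical function. In the real messenger:consume command these limits correspond to the --limit, --memory-limit and --time-limit options.

```php
<?php

// Toy sketch: decide between messages whether the worker should exit.
function shouldStop(int $handledCount, int $memoryBytes, float $elapsedSeconds): bool
{
    $maxMessages = 10;                    // "only handle 10 messages, then die"
    $maxMemoryBytes = 128 * 1024 * 1024;  // "until you take up 128 megabytes"
    $maxSeconds = 10 * 60;                // "a maximum of 10 minutes"

    return $handledCount >= $maxMessages
        || $memoryBytes >= $maxMemoryBytes
        || $elapsedSeconds >= $maxSeconds;
}

$startedAt = microtime(true);
$handled = 0;

while (!shouldStop($handled, memory_get_usage(true), microtime(true) - $startedAt)) {
    // A real worker would fetch and fully handle one message here.
    $handled++;
}

echo "Exiting cleanly after {$handled} messages", PHP_EOL;
```

Because the check only runs once a message is finished, hitting a limit never leaves a stroopwafel half-baked.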

And then something just, uh, something just dies. By the way, because I won't mention it later. If a message did get halfway through and then you just died, just like a really fatal error, that message will be re-handled. That's kind of a property of a queuing system is like the queuing system will redeliver the message later unless you've actually told it, I did finish this. You still don't want it to die halfway through because it still might mean you did half of the work. And so if you handle again later, you'll do half of the work again and then the rest of the work, um, but it will get redelivered. So that means that we need something on production to make sure that this process is always running. And when it exits, it always restarts. This is kind of the one infrastructure thing you need to worry about if you're on a kind of a really, uh, not really.

But if you're on a traditional server setup, this is most traditionally done with supervisor. We have documentation on that on Symfony. It's just a little configuration file. And you say, make sure that command never stops running, which really means when it dies, that supervisor will start it again. And you can also tell it to run three of these commands at once, cause that's totally legal. Three workers or 10 workers or 100 workers so that you're processing the messages faster. Um, or if you're on like a platform as a service, like SymfonyCloud, Platform.SH, Heroku, these are usually built in. There's usually a section that says like workers and you give it the command and then and then it runs it. So another issue with this is, think about it. If you, when you make a new deploy, you probably already have a worker process running and maybe it will run for five more minutes before it like hits some limit and then, and then exits and then restarts again.

So when you deploy your code, does the worker process see the new code? And it actually doesn't, cause if you think about it, when it was booted up, that was before your deploy. So it got all of those classes in memory. And then when you deploy changes to those classes, they're not there until that worker actually exits and restarts. So again, you actually want your workers to exit and restart in this. So one of the features new in 4.3 and I'll talk about lots of features that are new in 4.3 and 4.4 is very simply just a way to stop all your workers. So on deploy, this is a deploy step. You say messenger:stop-workers and they'll gracefully exit, they'll finish the message that they are currently working on and then they'll close themselves and then you know your Supervisord or whatever is going to then spin up a new one and it solves this problem. So this is like one of the little things that, it seems little but it makes messenger a lot more usable now than one year ago because this is something you had to kind of worry about handling yourself.

Running Locally

Oh, also what about running locally? That's another thing. When you're doing things locally and you have your worker in the background, if you are actively working on your handler, you need to remember to stop the worker and restart it, or else it doesn't see the new code. I do this all the time. So if you use the Symfony binary, which I highly recommend, that has a symfony run -d command, which just executes things. The -d means run as a daemon, and then you can tell it to run symfony console messenger:consume. And then my favorite thing, I just learned this, I think this is maybe only a week old: it has a watch on it. So this basically says, I want you to run that in the background and watch for changes on these directories, and then restart the process if you see changes to those directories. So it kind of takes care of restarting itself in the background. This also, if I remember correctly, stops itself when you do your server stop. So if you're using the web server, a server stop is going to stop all the background processes.

Envelopes & Stamps

All right. Um, so I do have to talk a little bit about envelopes and stamps as a core concept. It's not a new concept if you've used messenger or saw Samuel's presentation last year. So I'll just talk a little bit about it. Um, you're sending messages into message bus. So we thought that was a really cool idea. So Nicolas Grekas thought it would be really cute if we, if we put the message, if we were able to put the messages into envelopes and then put stamps on them. I mean, it's just adorable. So we have this concept in, in a Symfony and messenger of messages inside envelopes with stamps on them. And probably most of the time you don't know or care that this is happening. Um, until you do. So, first of all, one of the things that you can do is instead of dispatching a message directly, you can create this Envelope object.

That's an object in the core of Symfony. It's not something I created. You create this Envelope object, you put the message inside of it and then you actually dispatch the envelope. This makes no difference. In fact, if you look inside the MessageBus class, if you pass it a message object, something that's not an envelope, it creates an envelope for you and puts it inside. So our message was always in an envelope. We don't have to worry about it ourselves if we don't care, but you are allowed to put it inside of an envelope. Why would you put it inside of an envelope? Because you want to add a stamp. So the message itself is only concerned about containing information that's needed by the handler. Like what is the topping, what is the size and who is this stroopwafel for. But sometimes there's other information that you kind of need to attach to the message.

Most commonly, configuration for the transport. So this is a real stamp called the DelayStamp, where you can actually tell the transport, I want to delay this message five seconds before it's processed. Another really common stamp if you're using RabbitMQ is an AMQP stamp, which lets you control more specific parameters that are special to AMQP. So if you use AMQP, you can attach a routing key. The way you attach a routing key to a specific message is via a stamp. There's not a ton of these stamps, but they exist and that's how you control various things. Also, this is fun for debugging how the system works. The message bus dispatch method returns an envelope. It's kind of your same envelope, but as it goes through the process, other people add stamps to it, and you can dump this and it's just kind of interesting to see what it adds.
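Wrapping a message in an envelope with a DelayStamp could look like the sketch below. The Envelope, DelayStamp and MessageBusInterface classes are real parts of symfony/messenger, but dispatchWithDelay is a hypothetical helper, and the snippet only does something useful inside an app where the component is installed and a bus is available.

```php
<?php

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

// Hypothetical helper: $order is whatever message object you want to send.
function dispatchWithDelay(MessageBusInterface $bus, object $order): Envelope
{
    // DelayStamp takes milliseconds, so 5000 asks the transport to wait five seconds.
    $envelope = new Envelope($order, [new DelayStamp(5000)]);

    // dispatch() returns an Envelope, possibly carrying stamps added along the way,
    // which is handy to dump while debugging.
    return $bus->dispatch($envelope);
}
```

The message class itself stays untouched. The delay is metadata for the transport, not data for the handler.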

We can see our delay stamp on top. There's actually something called the BusNameStamp. It's an internal thing, but it actually records what bus you were sent to, in case you have multiple buses in your app. In this case it has a SentStamp. That's actually proof that your message was sent to a transport versus handled immediately. If it were handled, it would have a HandledStamp. So it really is like, as this message gets delivered, in a sense there are other parties stamping it along the way: it went to this country, then it went to this country and then it went here, adding information and tracking things along the way. At the bottom there's a TransportMessageIdStamp, which is actually a stamp that contains the ID in the database that this got stored on, since we're using the doctrine transport.

Priority Transports

All right, so another new feature in Symfony 4.3, was priority transports, which was a really important feature for me. So let's say we have a StroopwafelOrder and then we get a CoffeeOrder and then another StroopwafelOrder then um and then somebody tells us that we need to clean the bathroom, another CoffeeOrder, somebody tells us we need to wash the dishes. Um, these come in whatever order they come in, but it's not necessarily the order that we want to process them. Uh, probably if we go clean the bathroom while someone's waiting for their coffee order, it's not going to be a great user experience. Okay. So the way to handle this, um, it's not the only way to handle this, but the, typically the way this is handled in the queuing world and the way that's handled inside of messenger is not by adding a priority to messages.

This is something you can do. For example, in AMQP they have this concept of prioritizing messages. But most of the time it's not about prioritizing the messages. It's about two different queues, two different transports (transports, queues, those are kind of synonyms), and one of them is a higher priority than the other one. So in this case, we're going to create a second transport called async_high_priority, and we are still going to use the doctrine transport. But this time, when you have a transport, you can pass various options to it. One of the options in the doctrine transport is a queue_name. And I'm just going to set this to high priority. What that literally means in the database is that doctrine has a column on that table called queue_name. If you don't set this, it's default. If you set it, it's set to whatever your value is here.

So we have one database table, but we're sort of creating two queues in that table. You know, the ability to select all from the default queue and then select all from the high priority queue. But they're all still sitting in the same table. So we have a second transport now, and now we're able to send some messages to the high priority one and some messages to the normal one. Now at this point there is nothing different between those two transports. So if you're trying to figure out why one is more important than the other one, other than the name, there's nothing. They are two perfectly normal, identical transports or queues. The important thing is when you handle these with your worker. So earlier when we ran messenger:consume, we just said messenger:consume. This time we're going to say messenger:consume and we're going to pass the names of our transports in the order that they should be prioritized.

So async_priority_high first and then async afterwards. Each time the worker goes and looks for new messages, it first asks the first transport, async_priority_high, if there are any messages there. If there are, it handles them. If there aren't, then it goes and asks the next transport if there are any there. And every time it handles one message, it goes back to the beginning. So if it does handle one in the low priority transport, as soon as it finishes, it doesn't keep working on low priority ones. It goes back to the beginning and says, okay, now are there any messages in the high priority transport?
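
The two-transport setup described above could be sketched roughly like this. The file path, the MESSENGER_TRANSPORT_DSN variable (assumed to hold a Doctrine DSN such as doctrine://default), the App\Message class names and the choice of which message goes where are assumptions for illustration. The queue name is written as high_priority here because the talk only says it is set to "high priority".

```yaml
# config/packages/messenger.yaml (assumed location)
framework:
    messenger:
        transports:
            # Normal transport: with the Doctrine transport, queue_name defaults to "default"
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

            # Same DSN, so the same table, but rows are stored under another queue_name
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high_priority

        routing:
            # Hypothetical message classes: customer orders go to the high priority transport
            'App\Message\CoffeeOrder': async_priority_high
            'App\Message\CleanBathroom': async
```

With a config like this, the worker would be started as php bin/console messenger:consume async_priority_high async, so the order of the arguments decides which transport is asked first.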

Failures & Retries

All right, so another thing that was new in 4.3 that was very, very important is: what happens on failure? Because it's really important. If something fails during a normal web request, at least the user would see a 500 error and they would know that it didn't work, and maybe they would refresh, and maybe we'd get an error in our logs or something like that. So it's even more important when you do something asynchronously that you know that it actually works. You don't want somebody to do a reset password, and then you send them the email asynchronously, and it fails and they never get the email. That's kind of lame, they're just sitting there waiting for it. So what should happen when there's an error handling a message? Let's introduce our error.

So we have a CoffeeOrderHandler. We're not very good at keeping inventory of our coffee beans, so about four out of five times our handler throws an OutOfBeansException. Okay. So I know that's small, so I'm going to describe what this is going through. Somebody orders coffee. We already have our worker that's looking for messages. It finds this message, it's really excited. It receives the message and then it says: error while handling this message. And then it says: sending for retry number one, 1000 millisecond delay. Because that's a really important thing. One of the most common ways a handler can fail is because you're talking to a third party API or even your database. You're doing something over the network, and obviously network connections can fail. And when they do, it's usually temporary. You probably want to wait at least a little bit of time rather than retry immediately. So this delays one second and then it sends it back. Well, I should say it sends it back to the transport and says, wait at least one second before you give this back to me to handle. Then it consumes it again. It has another error, and it sends it for retry number two. This time it delays it for two seconds. After it fails the third time, it delays it for four seconds.

And then finally, in this case, it actually got it the fourth time. Finally we had beans. It handled it successfully and everyone was happy. So the obvious question is, what if it keeps failing? Does this just get retried forever? This is all dictated by what's called a retry_strategy, and what you see here are the default settings. Um, this is code I stole from you, I think. The settings here: max_retries is three, the delay is 1000 milliseconds, so one second, and there's a multiplier of two. What that means is you get an exponential delay on there. So the first delay is one second, the second delay is that times two, so two seconds, the third one is two times two, so four, and then eight. If you kept doing retries, it would be 16 seconds, 32 seconds.
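
As a sketch, the default retry settings listed above would sit under a transport like this. The transport name async and the env variable are assumptions. The values are the defaults the talk mentions, and the comments spell out the resulting delays.

```yaml
# config/packages/messenger.yaml (assumed location)
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    # Give up after 3 retries (4 attempts in total)
                    max_retries: 3
                    # Delay before the first retry, in milliseconds
                    delay: 1000
                    # Each following delay is the previous one times this value:
                    # retry 1 waits 1000 ms, retry 2 waits 2000 ms, retry 3 waits 4000 ms
                    multiplier: 2
```

Put differently, the delay before retry number n is delay times multiplier to the power of n minus 1, which is where the one, two, four second sequence comes from.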

So you can give it this nice curve of: if we're still having problems, maybe we need to wait a little bit longer. And by the way, it's one second by default, but I know of use cases where people are delaying by a day, and that's totally legal. You might have a single transport where the types of things that we receive from it really should just be tried again tomorrow if they're having problems, and you could put that here. There's also a service option, because of course this is Symfony and everything is extendable, so you can control the retry strategy however you want. All right? But still, what happens after three times? So this is another new feature in Symfony 4.3 called the failure_transport. And you do two things.

Well, I guess first you make a third transport, which we're calling failed. The name is not important. I'm still using the Doctrine transport and I'm giving this another queue name. So if you look at the bottom of this, this is me just creating a third transport, no different than any other transport. But that top key, failure_transport, points to it, and that activates the failure transport functionality. Before we did this, if a message failed three times, it was discarded from the queue and gone forever. If you activate the failure transport, after the three failures it will actually get sent to that failure transport. So this says: hey, I've failed three times, I'm now going to send to the failure transport. So you now have this one transport, this one queue, that starts to fill up with messages.
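
A minimal sketch of what that configuration might look like. The transport name failed and the failure_transport key come from the talk. The env variable, the async transport and the exact queue_name value are assumptions.

```yaml
# config/packages/messenger.yaml (assumed location)
framework:
    messenger:
        # Top-level key: messages that exhaust their retries are sent to this transport
        failure_transport: failed

        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

            # A perfectly normal third transport; only the key above makes it special
            failed:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: failed
```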

Now you could handle this queue like normal, you could actually consume from this queue, but that's not the way it's meant to be used, because these are things that failed three times, or more, depending on your configuration. So there might actually be a problem you need to look into. So there's a set of new commands where you can actually look at these and see what went on. We can say ./bin/console messenger:failed:show to list all the messages that have failed and when they failed, and we can get more information about one of them specifically. It's going to tell us a history of when it failed and when it was redelivered. You can see the seconds of delay between those retries, the error class, and what message is inside. And also the full stack trace, so you can see what happened to cause it.

I think this is actually just the most recent error. We don't keep track of all of the stack traces, just the most recent one. And then of course you can retry it. And there's also a command to say, you know what, we're good, we're going to just remove that. It's never going to work, so just let it fail.

Fake Transports

So also new in 4.3, just because I want to show you guys all the new stuff that has happened in the last 12 months: fake transports, I call them. One of the things that occurred to me is that if you are handling things asynchronously, you might not want to do that while you're developing locally. Or you might want to, but you might not want a designer on your team to have things running asynchronously and have to have a worker set up.

If you have the whole infrastructure dockerized, maybe you can do that easily and it automatically consumes them. But if you want to make all of your messages handled synchronously, you can do that with the sync transport. And it's just that simple: it's sync://. It's a fake transport. It's like, yes, I am sending it to this transport, quick, handle it immediately, and then it just handles it immediately. The other fake transport is for the test environment. In the test environment you're like, ah, I don't really want or need things sent to a real queue when I'm doing my functional test. So we have some functional tests here, which make the request to the StroopwafelOrder endpoint. Maybe I don't have my queuing system set up, or I just don't want to send to the queue.

So the way you can handle that, in this case in .env.test, is to use the in-memory:// transport, which was almost called the null transport, because we're sending it to /dev/null. We're basically saying, yeah, we'll send it somewhere and throw it away. The one subtle difference is that it hangs onto the messages for the rest of that request. So that helps you at the bottom of your test. This is the bottom of my test: I assert the response is successful, and then I can actually go and get the container from that last request and fetch that transport from the container. The service ID is always messenger.transport. followed by the name of that transport, so messenger.transport.async here. And that transport has a couple of methods on it, like: give me all the messages that were sent to you. So you can assert that you actually sent one message, and you can get the message out of it and make sure it looks correct.
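
The talk sets the DSN through .env.test. An equivalent sketch using per-environment config files might look like the following, where the file paths and the transport name async are assumptions. The first override makes every message sent to async get handled immediately during local development:

```yaml
# config/packages/dev/messenger.yaml (assumed location)
framework:
    messenger:
        transports:
            # Fake transport: "sending" a message handles it right away, no worker needed
            async: 'sync://'
```

The second keeps sent messages in memory for the duration of the request, so a functional test can inspect them:

```yaml
# config/packages/test/messenger.yaml (assumed location)
framework:
    messenger:
        transports:
            # Fake transport: nothing reaches a real queue, but sent messages are remembered
            async: 'in-memory://'
```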

Mailer Support

Mailer support. I put this in here because it's really cool, but it's just one slide. Could we send our emails async through Messenger? Sure, just add one line to your routing. When Fabien built Mailer, he put something in there that said: if Messenger is present, I will dispatch a message through Messenger in order to send emails. So the hook point is already there. All we need to do is say: when that message is dispatched through Messenger, I want it to go to an async transport. And that's it. So this is a really interesting hook point for third party libraries and things like that to say: hey, we're doing some work for you. By the way, in order to do that work, we're following the message bus pattern, which you don't care about, except that it allows you to delay this until later if you choose to. So you can imagine somebody putting in their bundle's README: if you want to do this work later, just add this line to your routing, and you're opting in to doing that work later.
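
The one routing line would look roughly like this, assuming a transport named async already exists. SendEmailMessage is the message class that Mailer dispatches when Messenger is available.

```yaml
# config/packages/messenger.yaml (assumed location)
framework:
    messenger:
        routing:
            # Emails sent through Mailer are queued on "async" instead of being sent immediately
            'Symfony\Component\Mailer\Messenger\SendEmailMessage': async
```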

More New Features

So, the summary of the things that I don't have time to actually talk about. If you were waiting for the Redis transport, that landed in 4.3. If you are using it but waiting for delay support, that was the one thing missing in the Redis transport: it couldn't delay, so your retries were immediate. That's there in 4.4. And in 4.3 we changed the serializer to PhpSerializer. How many of you used Messenger before 4.3? Yeah, several hands. And did you ever have any weird situations where the data was kind of missing? Yeah, I was already making eye contact.

Yeah. I'm getting a big nod. Yeah. So in Symfony 4.2 and before, the object was serialized to JSON using the Symfony serializer. The Symfony serializer is really great. But it turns out that if you don't construct your class just the way it wants it, the message probably serializes correctly, but when it gets deserialized, some of your properties are missing. So in 4.3, we were like, look, if your Symfony application is responsible for sending and receiving the same message, let's just use PHP serialization. The other one is still there if you're sending to a queue for a third party to consume, so you can still use JSON, but out of the box it uses the PHP serializer. It basically just removed these WTF moments when you couldn't figure out why things weren't working.
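
For the third-party case, a hedged sketch of opting one transport back into the Symfony serializer. The transport name external and its DSN variable are hypothetical. messenger.transport.symfony_serializer is the service ID the framework registers for the Symfony serializer, and this assumes the Serializer component is installed.

```yaml
# config/packages/messenger.yaml (assumed location)
framework:
    messenger:
        transports:
            # Consumed by our own app: keeps the default PHP serialization
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

            # Hypothetical transport read by another system, so its messages are sent as JSON
            external:
                dsn: '%env(EXTERNAL_TRANSPORT_DSN)%'
                serializer: messenger.transport.symfony_serializer
```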

There are also events in 4.3, so you can hook in and run code while the worker is doing things. And this is a really good one. It doesn't sound interesting until you hit the bug it fixes: between each message being handled by the worker, the entity manager is now cleared. We were getting problems where you would have one message handled here and you'd query for an entity. Five minutes later you would have another message that would query for the same entity, and your data would be out of date, because Doctrine is very smart and says: hey, you already queried for this, I'm not going to make a second query, I'm just going to return to you the five-minute-old data. So that's no longer a problem. They really are independent of each other. from_transport, I won't talk about that in detail, but this is the ability to have a message with two handlers.

But one handler goes to one transport and the other handler goes to another transport, like high priority and low priority. API Platform integration: that's not really new, but once you get used to Messenger, there's a really good way to make, not really a custom operation, but let's say a custom operation on API Platform, where when the user sends data to it, you actually handle that through Messenger. So you make a message with all the input you want and then it goes through Messenger. And most interestingly, or at least interestingly depending on who you are, it's stable now. Yes. Yeah.

You know, I have to stop very soon, but I was listening to Samuel Roze's presentation last year, and right at the very end he said these very famous words. He said: it's still experimental, but I don't think it will change very much for Symfony 4.3. Check out the Symfony 4.3 Messenger changelog. We surprised ourselves. We've gotten those things behind us. If you look at the Symfony 4.4 changelog, it's much, much smaller. It is stable. We changed lots of things. We've had lots to learn. And the most important thing is that it has the experimental label off of it, which means that even if there was something that we wanted to change, now it's going to happen in a backwards compatible way. So it's actually safe to use. So to put this all together, Messenger allows you to use a message bus, or use the pattern of a message bus, save work for later, retry, and introspect on errors so you don't just lose them into thin air. And it's ready for production. So please use it. I love using it. I've only been using it for seven, eight months now, so it's still relatively new to me, but I absolutely love it. Alright, so thank you guys very much.
