Transport: Do Work Later (Async)

So far, we've separated the instructions of what we want to do - we want to add Ponka to this ImagePost - from the logic that actually does that work. And... it's a nice coding pattern: it's easy to test and if we need to add Ponka to an image from anywhere else in our system, it will be super pleasant.

But this pattern unlocks some serious possibilities. Think about it: now that we've isolated the instructions on what we want to do, instead of handling the command object immediately, couldn't we, in theory, "save" that object somewhere... then read and process it later? That's... basically how a queuing system works. The advantage is that, depending on your setup, you could put less load on your web server and give users a faster experience. Like, right now, when a user clicks to upload a file, it takes a few seconds before it finally pops over here. It's not the biggest deal, but it's not ideal. If we can fix that easily, why not?

Hello Transports

In Messenger, the key to "saving work for later" is a system called transports. Open up config/packages/messenger.yaml. See that transports key? The details are actually configured in .env.

Here's the idea: we're going to say to Messenger:

Yo! When I create an AddPonkaToImage object, instead of handling it immediately, I want you to send it somewhere else.

That "somewhere else" is a transport. And a transport is usually a "queue". If you're new to queueing, the idea is refreshingly simple. A queue is an external system that "holds" onto information in a big list. In our case, it will hold onto serialized message objects. When we send it another message, it adds it to the list. Later, you can read those messages from the queue one by one and handle them and, each time you're done with a message, the queue removes it from the list.
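To make that concrete, here's a tiny sketch of the idea in plain PHP. It is not how Messenger or any real transport is implemented: InMemoryQueue and its send(), get() and ack() methods are made-up names for this illustration, and the AddPonkaToImage class here is a simplified stand-in that only holds an id.

```php
<?php

// Simplified stand-in for the tutorial's message class (assumption: holds only an id).
final class AddPonkaToImage
{
    private $imagePostId;

    public function __construct(int $imagePostId)
    {
        $this->imagePostId = $imagePostId;
    }

    public function getImagePostId(): int
    {
        return $this->imagePostId;
    }
}

// Hypothetical in-memory queue. Real transports keep the list outside the PHP process.
final class InMemoryQueue
{
    private $messages = []; // serialized messages, keyed by id
    private $nextId = 1;

    // "Send": serialize the message and add it to the end of the list.
    public function send(object $message): int
    {
        $id = $this->nextId++;
        $this->messages[$id] = serialize($message);

        return $id;
    }

    // "Get": return [id, message] for the oldest entry, or null if the list is empty.
    public function get(): ?array
    {
        if ([] === $this->messages) {
            return null;
        }
        $id = array_key_first($this->messages);

        return [$id, unserialize($this->messages[$id])];
    }

    // "Ack": the message was handled, so remove it from the list.
    public function ack(int $id): void
    {
        unset($this->messages[$id]);
    }

    public function count(): int
    {
        return count($this->messages);
    }
}

$queue = new InMemoryQueue();
$queue->send(new AddPonkaToImage(1));
$queue->send(new AddPonkaToImage(2));

// Later: read the messages one by one, handle each, then acknowledge it.
$handled = [];
while (null !== ($item = $queue->get())) {
    [$id, $message] = $item;
    $handled[] = $message->getImagePostId(); // stand-in for the real handler work
    $queue->ack($id);
}
```

The important part is the split: sending just stores the serialized object, and the handling happens in a separate loop that could run at a completely different time.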

Sure... robust queuing systems have a lot of other bells and whistles... but that really is the main concept.

Transport Types

There are a bunch of queueing systems available, like RabbitMQ, Amazon SQS, Kafka, and queueing at the supermarket. Out of the box, Messenger supports three: amqp, doctrine and redis. amqp basically means RabbitMQ, but technically means any system that implements the "AMQP" spec. AMQP is the most powerful... but unless you're already a queueing pro and want to do something crazy, these all work exactly the same.

Oh, and if you need to talk to some unsupported transport, Messenger integrates with another library called Enqueue, which supports a bunch more.

Activating the doctrine Transport

Because I'm already using Doctrine in this project, let's use the doctrine transport. Uncomment the environment variable for that.

# .env
# ...
###> symfony/messenger ###
# ...
MESSENGER_TRANSPORT_DSN=doctrine://default
# ...
###< symfony/messenger ###

See this ://default part? That tells the Doctrine transport that we want to use the default Doctrine connection. Yep, it'll re-use the connection you've already set up in your app to store the message inside a new table. More on that soon.
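If you like to see things in code, here's a rough way to picture how that DSN breaks down, using PHP's built-in parse_url(). This is only an illustration: whether Messenger splits the DSN exactly this way internally is an assumption, and the variable names are made up.

```php
<?php

// Split the transport DSN into its parts with PHP's built-in parse_url().
$dsn = 'doctrine://default';
$parts = parse_url($dsn);

$transportType = $parts['scheme'];  // the part before "://": which kind of transport
$connectionName = $parts['host'];   // the part after "://": which Doctrine connection

printf("transport=%s connection=%s\n", $transportType, $connectionName);
```

So the piece before :// picks the transport type and the piece after it names the Doctrine connection to re-use.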

Now, back in messenger.yaml, uncomment this async transport, which uses that MESSENGER_TRANSPORT_DSN environment variable we just created. The name - async - isn't important - that could be anything. But, in a second, we'll start referencing that name.

framework:
    messenger:
        # ...
        transports:
            # https://symfony.com/doc/current/messenger.html#transports
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        # ...

Routing to Transports

At this point... yay! We've told Messenger that we have an async transport. And if we went back and uploaded a file now, it would... make absolutely no difference: it would still be processed immediately. Why?

Because we need to tell Messenger that this message should be sent to that transport, instead of being handled right now.

Back in messenger.yaml, see this routing key? When we dispatch a message, Messenger looks at all of the classes in this list... which is zero right now if you don't count the comment... and looks for our class - AddPonkaToImage. If it doesn't find the class, it handles the message immediately.
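That lookup can be pictured with a few lines of plain PHP. This is a simplified sketch, not Messenger's actual code: resolveTransports() is a hypothetical helper, the two classes are empty stand-ins, and the real routing logic does more than an exact class-name match.

```php
<?php

// Empty stand-in message classes for the illustration.
final class AddPonkaToImage {}
final class DeleteImagePost {} // hypothetical second message type

/**
 * Hypothetical helper: returns the transport names configured for a message.
 * An empty array means "no route found, so handle it right now".
 */
function resolveTransports(object $message, array $routing): array
{
    return $routing[get_class($message)] ?? [];
}

// With an empty routing list, nothing is sent to a transport.
$emptyRouting = [];
$before = resolveTransports(new AddPonkaToImage(), $emptyRouting);

// Once a class is listed, its messages go to the named transport instead.
$routing = ['AddPonkaToImage' => ['async']];
$after = resolveTransports(new AddPonkaToImage(), $routing);
$other = resolveTransports(new DeleteImagePost(), $routing);
```

Here $before and $other come back empty, which stands for "handle immediately", while $after names the async transport.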

Let's tell Messenger to instead send that to the async transport. Set App\Message\AddPonkaToImage to async.

framework:
    messenger:
        # ...
        routing:
            # Route your messages to the transports
            'App\Message\AddPonkaToImage': async

As soon as we do that... it makes a huge difference. Watch how fast the image loads on the right after uploading. Boom! That was faster than before and... Ponka isn't there! Gasp!

Actually, let's try one more - that first image was a little bit slow because Symfony was rebuilding its cache. This one should be nearly instant. It is! Instead of calling our handler immediately, Messenger is sending our message to the Doctrine transport.

Seeing the Queued Message

And... um... what does that actually mean? Find your terminal... or whatever tool you like to use to play with databases. I'll use the mysql client to connect to the messenger_tutorial database. Inside, let's:

SHOW TABLES;

Woh! We expected migration_versions and image_post... but suddenly we have a third table called messenger_messages. Let's see what's in there:

SELECT * FROM messenger_messages;

Nice! It has two rows for our two messages! Let's use the magic \G to format this nicer:

SELECT * FROM messenger_messages \G

Cool! The body holds our object: it's been serialized using PHP's serialize() function... though that can be configured. The object is wrapped inside something called an Envelope... but inside... we can see our AddPonkaToImage object and the ImagePost inside of that... complete with the filename, createdAt date, etc.
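To get a feel for why all of that shows up in the body, here's a small sketch using PHP's serialize() on nested objects. The three classes are simplified stand-ins written for this example (FakeEnvelope in particular is made up and is not Symfony's Envelope class), so the exact string will differ from what you see in your table.

```php
<?php

// Simplified stand-ins for the tutorial's classes (assumptions, not the real ones).
final class ImagePost
{
    private $filename;

    public function __construct(string $filename)
    {
        $this->filename = $filename;
    }

    public function getFilename(): string
    {
        return $this->filename;
    }
}

final class AddPonkaToImage
{
    private $imagePost;

    public function __construct(ImagePost $imagePost)
    {
        $this->imagePost = $imagePost;
    }

    public function getImagePost(): ImagePost
    {
        return $this->imagePost;
    }
}

// Hypothetical wrapper that plays the role of the envelope around the message.
final class FakeEnvelope
{
    private $message;

    public function __construct(object $message)
    {
        $this->message = $message;
    }

    public function getMessage(): object
    {
        return $this->message;
    }
}

// Serializing the outer object also serializes everything nested inside it.
$body = serialize(new FakeEnvelope(new AddPonkaToImage(new ImagePost('ponka.jpg'))));

// Private property names contain NUL bytes, so make them visible before printing.
echo str_replace("\0", '\\0', $body), "\n";

// Later, the string can be turned back into the same object graph.
$restored = unserialize($body);
$filename = $restored->getMessage()->getImagePost()->getFilename();
```

The serialized string carries the class names and property values of every nested object, which is why the ImagePost data is readable right there in the database row.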

Wait... but where did this table come from? By default, if it's not there, Messenger creates it for you. If you don't want that, there's a config option called auto_setup to disable this - I'll show you how later. If you did disable auto setup, you could then use the handy setup-transports command on deploy to create that table for you.

php bin/console messenger:setup-transports

This doesn't do anything now... because the table is already there.

Hey! This was a huge step! Whenever we upload images... they are not being handled immediately: when we upload two more... they're being sent to Doctrine and it is keeping track of them. Thanks Doctrine!

Next, it's time to read those messages one-by-one and start handling them. We do that with a console command called a "worker".

Leave a comment!

  • 2020-04-27 Victor Bocharsky

    Hey Taken,

    Wow, a lot of questions at once... have you watched this course up to the end? Because I suppose you will get answers to almost all your questions in it later :) OK, a few thoughts about your questions from me:

    1. To handle messages you have to run the "bin/console messenger:consume-messages" command, where you can specify a memory limit with the "--memory-limit" option. This command tracks memory usage: each time it handles a new message, it checks whether the memory limit you specified has been reached, and as soon as it finds the limit was hit, it exits. That's why you need to specify a memory limit lower than your max memory. If you set a 128MB memory limit, the command will be stopped only when the memory is over this limit. E.g. if used memory after the 8th message is 121MB, 121 < 128, so the command will start handling a new message; but if used memory after the 9th message is 136MB, 136 > 128, so the command will exit. As you see, if your max allowed memory size were 128MB, the 9th message would fail because there is no free memory left. Usually --memory-limit is set to about half of the max memory limit to avoid "max memory reached" errors.

    2. It depends; by following this course you will see when it's better to use Messenger. In some cases, your code might become cleaner with Messenger even if you use it sync, but sometimes it may just complicate things and lead to bugs if you do it incorrectly. The advantage is that if you don't need async handling yet, but have switched to the Messenger architecture using sync, it will be easier to switch your project to async message handling later if needed - it's just a matter of changing one line in your configuration to use a new transport.

    3. It depends on your use case :) If you have 1000 new users every day, and you send each of them a welcome email after registration - it's totally OK to do it sync, i.e. send the email immediately after each successful registration. But if at some point you need to send 1000 emails to your subscribers - that's another question, and that's where Messenger really may help.

    4. Haha, please, watch the course till the end, I bet you will answer this question yourself ;) But basically, maybe you don't even need it at all. You don't have to use ALL Symfony components in your project :) Usually, people use Messenger to do some heavy operations async, that's the biggest benefit of using it.

    5. Once again, it depends. If those tasks are heavy - you may think of doing them async to return a response to your users faster.

    6. If we're talking about the Messenger component - it depends on which transport you use. If you put some tasks in your queue but use the sync transport - Messenger will handle the message in the same process, so your users will wait for the response until Messenger has finished processing the message. For example, after user registration you send a welcome email. If you do it with Messenger but with the sync transport, the user will see your "You're successfully registered" message only when the email has actually been sent. But if you do it async - Messenger will just add that message to the queue and you will show the user the "You're successfully registered" message faster, even before the email has actually been sent to them. Another process that handles those messages will eventually handle that message and actually send the email. So as a result, we may consider the 2nd option with async the better user experience, as you return the response to the user a little faster and then your system handles the remaining work.

    I hope this is a bit clearer for you. But I'd really suggest you watch the course completely to understand Messenger better.

    Cheers!

  • 2020-04-25 Taken

    Hello guys, I'm new to SymfonyCasts, thank you for the amazing tutorial:

    1/ By what criteria does Symfony Messenger know that it is the right time to process the next message? How does the broker know that there is some memory available and that the handler has finished processing its messages, so it can go on to the next message? How does the broker know that the handler has finished its job? I'm afraid that in some cases it can lead to memory problems.

    2/ Is it a good way to decouple code via Messenger or an event subscriber if the processing is synchronous?

    3/ What is the best way to send 1000 emails per day?

    4/ Are there some use cases where it's not necessary to use Messenger? I still don't understand why to use Symfony Messenger!

    5/ For example, do we use Messenger when we are handling 3 tasks that depend on each other, or not?

    6/ Is a queueing system synchronous or asynchronous processing? I mean, queueing is also synchronous: it processes things in FIFO mode. It's clear that the user is not going to wait for the response, and that helps the user experience.

  • 2020-04-21 Diego Aguiar

    Hey Ahmedbhs

    As far as I know, Symfony Messenger covers the concurrency problems when using Doctrine (the database) as a message transport. Actually, we use it on SymfonyCasts: we have a couple of workers to consume all the messages and we haven't had any issues related to consuming the same message twice.

    Cheers!

  • 2020-04-19 Ahmedbhs

    Hi Rayen, thank you so much for the tutorial,
    * There's a point of view that says "a database is not a message queue." Is this true? I'm sure that a database makes it very easy to centralize information, and we know very well how to back it up and make it redundant.
    A lot of online recommendations say that we should not use a database as a message queue because problems can appear under concurrency. For example: if we make a SELECT request to retrieve the identifier of the next task to be processed, then an UPDATE request to indicate that this task is being processed, then obviously, if two programs do this at the same time, the task may be processed twice.
    * The fact remains that it requires polling the database for each task, that is to say, scanning the database very regularly to see if there are new tasks waiting. It is true that this is not ideal, because it loads the database unnecessarily, but above all it implies latency in the processing.

    I know that the doctrine transport is amazing for the development experience. But I'm wondering whether the Symfony core team has already solved these problems.

    PS: This is the slide deck that talks about why we should not use a database as a queue system: https://fr.slideshare.net/R...

    Thank you!

  • 2019-11-23 Victor Bocharsky

    Hey Roman,

    Interesting question :) Hm, creating a new transport and duplicating messages to it does not mean the messages were successfully handled in other transports, so most probably it's not something you need. Actually, everything should be logged, so maybe it's enough for you to have those logs? I think you should be able to hook into Messenger, it has some events, you can take a look at them here: https://github.com/symfony/... . Or, as an alternative solution, you can create your own middleware and then do whatever you want in it, like storing successfully handled messages in a separate table, etc.

    I hope this helps!

    Cheers!

  • 2019-11-19 Roman Andreev

    Hi! Is there a way to save all the messages even if they were consumed successfully? It may be quite useful. For example, I want to store the history of all messages.
    At the moment, my guess is that I could create my own Transport, but maybe there is a more elegant and simple way?

  • 2019-10-21 Rémi Dck

    Good options! And I had already tested two of them... ;)
    1) I don't really want to save temporarily uploaded files (whooo, so scary, am I working on secret stuff? shhhh!) but I guess I will resign myself to it because, as you said, PHP deletes them at the end of the request. Life is sad but I know this is the right thing to do.
    2) Yes, doing this already, it's fine.
    3) Mmm agree, on the Doctrine transport it'll definitely take too long and I'm working on heavy stuff.

    You gave me something to think about, thank you so much (and I have another question for you, I'll ask it in the right tutorial)

  • 2019-10-20 weaverryan

    Yo @Rémi!

    Ah, really cool question. I totally get what you're doing - you want to process the file upload later so the user doesn't have to wait. Hmm, here's how I would do this, at least approximately - and you can let me know what you think ;).

    1) Move the temporary-uploaded file somewhere that the worker will have access to. If you have just one server, this is easy: just move it somewhere on your filesystem that is not the temporary directory (because I believe PHP deletes the tmp uploaded file at the end of the request - so that's no good!).

    2) Create a new message object that is, sort of, similar to the UploadedFile. It would contain anything you find relevant: like the original filename, the path to where you moved the file on the filesystem... and anything else. This will absolutely be serializable, because it will contain very simple data.

    Another option is to put the binary contents of the file itself onto the message class. However, I think you may run into problems if you do this with the Doctrine transport (which, out of the box, doesn't like binary content, though you could base64_encode it to get around this). Also, this might slow things down even more (not sure, just guessing), because now you would need to send the (for example) 2 MB of data to your queue and your user would need to wait for this to happen.

    Let me know what you think!

    Cheers!

  • 2019-10-19 Rémi Dck

    Hello ! Thanks for this tuto (amazing work!),
    I faced "Serialization of 'Symfony\Component\HttpFoundation\File\UploadedFile' is not allowed" when trying to process uploaded files without saving them on the host. The goal is to merge all of this stuff asynchronously and avoid waiting time for users, but I really want to avoid saving...
    Actually my handler works perfectly while Messenger is synchronous, but I get the error on async. Could you help me? <3

  • 2019-09-06 Houssem D

    It's very clear thank you. I'll test both ;)

  • 2019-09-03 weaverryan

    Hey Houssem D!

    Cool question :). Short answer: both are fine. Slightly-less short answer: RabbitMQ is better, faster and more powerful.

    Here's an analogy. Suppose you want to cache something. Should you use "filesystem" cache or Redis? For many situations, both are just fine. If you have a huge, heavy-traffic, distributed app, Redis will start to make a difference. Or, if you use some advanced functionality of Redis itself (beyond just using it as a key-value store for data), then you'll need Redis.

    The exact same is true for RabbitMQ. It's definitely superior to Doctrine. But, unless you are processing many, many messages, I don't think you would notice much performance difference. Using the Doctrine transport will result in frequent (but fast) queries made to fetch messages... but that's about it. RabbitMQ also has many more advanced features & workflows... which if you don't know about them, then you don't need them... at least not yet ;).

    So, stick with Doctrine unless you have a more complex use case, already know Rabbit or are excited to learn and try it.

    Let me know if this clears things up!

    Cheers!

  • 2019-09-01 Houssem D

    Hi, thanks for this tutorial.
    I have created some messages and I used the doctrine transport like in this tutorial, but I want to ask you, as you are an expert in Symfony: is it better to use the doctrine transport or RabbitMQ? Which one is more efficient and faster? Thanks

  • 2019-08-12 Victor Bocharsky

    Hey Alex,

    Glad you found the problem yourself and were able to fix it! Yeah, each new version might bring new commands, so it's a good idea to check your version first if you don't see some of them in the list.

    And thank you for your feedback! It might be useful for others ;)

    Cheers!

  • 2019-08-07 Alex

    Hi again! I was following the videos using my own project, which was on Symfony version 4.2.7... So, upgrading to 4.3 (like the downloaded code) did the trick. Cheers!

  • 2019-08-07 Alex

    Hi! Right now when I type bin/console messenger, I only see debug:messenger and messenger:consume-messages, so I'm missing the rest of the commands. Any idea why? Thanks!