Messenger! Queue work for Later

5:05:05

What you'll be learning

Symfony's Messenger component gives you a beautiful system for designing your code around "message" objects and decoupled "handlers" that do the work!

... wait... that's not right, it's really...

Symfony's Messenger component gives you the power to execute code asynchronously via queues and workers!

Actually, it's both! Messenger is one of the newest parts of Symfony and we absolutely ❤️ it. In this tutorial, we'll create an important application that asynchronously adds our favorite cat (Ponka) to all of your favorite photos. Along the way, we'll learn to:

  • Create message classes
  • Add & configure "handler" classes that do the work
  • Dispatch messages through the bus
  • Configure "transports" for async messages
  • Route messages to transports
  • Execute "worker" processes that "consume" queued messages
  • Things fail!? Retry messages and set up the "failure transport"
  • Middleware
  • Add & understand message stamps
  • Prioritized transports
  • A little CQRS: a Query bus
  • Testing & Dev: handling messages "sync"
  • Deployment and supervisor
  • RabbitMQ & AMQP
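
To make the vocabulary in this list concrete, here is a minimal, framework-free sketch in Python. It is not Symfony's API: `AddPonkaToImage`, `Bus`, `InMemoryTransport` and `run_worker` are hypothetical names chosen for illustration. It only shows the shape of the idea: a message is a plain data object, a handler does the work, and routing decides whether the handler runs right away or only after the message has waited in a queue for a worker.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class AddPonkaToImage:
    """A message: plain data describing the work, with no logic of its own."""
    image_id: int


class AddPonkaToImageHandler:
    """A handler: a callable object that does the work for one message type."""

    def __init__(self):
        self.processed = []

    def __call__(self, message):
        # Real work (editing the photo) would happen here.
        self.processed.append(message.image_id)


class InMemoryTransport:
    """Hypothetical stand-in for a real queue backend."""

    def __init__(self):
        self.queue = deque()

    def send(self, message):
        self.queue.append(message)


class Bus:
    def __init__(self, handlers, routing=None):
        self.handlers = handlers        # message class -> handler
        self.routing = routing or {}    # message class -> transport

    def dispatch(self, message):
        transport = self.routing.get(type(message))
        if transport is not None:
            transport.send(message)     # async: queue it for a worker
        else:
            self.handle(message)        # sync: run the handler right now

    def handle(self, message):
        self.handlers[type(message)](message)


def run_worker(bus, transport):
    """Consume queued messages in order until the queue is empty."""
    handled = 0
    while transport.queue:
        bus.handle(transport.queue.popleft())
        handled += 1
    return handled


handler = AddPonkaToImageHandler()
transport = InMemoryTransport()
bus = Bus({AddPonkaToImage: handler}, routing={AddPonkaToImage: transport})

bus.dispatch(AddPonkaToImage(image_id=1))
bus.dispatch(AddPonkaToImage(image_id=2))
queued_before = len(transport.queue)   # both messages are waiting, none handled
handled_count = run_worker(bus, transport)
```

Leaving the message class out of `routing` makes `dispatch()` call the handler immediately, which is the "sync" behavior in this sketch.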

Let's get to work!


Your Guides
Ryan Weaver


Questions? Conversation?

  • 2020-06-03 Vladimir Sadicov

    Egor Ushakov, of course it depends on what you need, but building a special service can be the easiest thing to do in that case :)

    Cheers!

  • 2020-06-03 Egor Ushakov

    Hey, Vladimir Sadicov!
    Yeah, I knew the idea was trash :) Frankly, I just wanted to "cut a corner", because in the Redis case I need to build a special Redis storage service. As I understand it, I cannot use the "native" Symfony namespaced cache pool based on cache.adapter.redis, since CacheInterface is not enough for this use case. Moreover, I need to implement the same "namespacing" concept for this "special" Redis store. OK... As I said, I just wanted to "hack the system" using the stamp technique :) Thank you, Vladimir!

  • 2020-06-03 Vladimir Sadicov

    Hello again Egor Ushakov :)

    From my point of view, this breaks the main idea of a queue. It looks like you want a Soviet queue, where you can go and take anything from the queue by some criteria rather than in order. In a normal queue, messages should be processed by a handler one by one, in some sort of order, when they are ready to be processed, and not when you are ready to take them.

    BTW I think you can use shared Redis connection with TTL on keys. It can totally fit your idea.
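
    As a rough illustration of the TTL idea, here is a Python sketch under loud assumptions: `TtlStore` is a hypothetical in-memory stand-in for a shared Redis connection, and the key names and the 60-second window are taken from the question in this thread, not from any library. Each callback id is stored with an expiry; a callback that arrives in time takes the entity id out, and one that arrives too late finds nothing.

```python
import time


class TtlStore:
    """Hypothetical in-memory stand-in for a key-value store with per-key expiry."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._items = {}   # key -> (value, expires_at)

    def set(self, key, value, ttl_seconds):
        self._items[key] = (value, self._clock() + ttl_seconds)

    def pop(self, key):
        """Return and delete the value, or None if it is missing or expired."""
        value, expires_at = self._items.pop(key, (None, 0.0))
        if self._clock() >= expires_at:
            return None
        return value


class FakeClock:
    """Controllable clock so the example does not need to sleep."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
store = TtlStore(clock=clock)

# Callback arrives after 3 seconds: the entity id is still there.
store.set("callback:abc", 42, ttl_seconds=60)
clock.now = 3
in_time = store.pop("callback:abc")

# Callback arrives after 61 seconds: the key has expired.
store.set("callback:def", 43, ttl_seconds=60)
clock.now = 3 + 61
too_late = store.pop("callback:def")
```

    With a real Redis server the expiry would be enforced by the server itself; the manual clock check above only imitates that behavior.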

    I hope I haven't missed anything and have understood your idea correctly!

    Cheers!

  • 2020-06-01 Egor Ushakov

    I suspect this is a strange idea, but I can't get it out of my head. I have an async queue for handling "core" domain events; call them "global" events for now. There are several bundles that do work on behalf of the "core" app, each in a completely different way. Some of them "aggressively" interact with external APIs, sending GETs and POSTs. Others just wait for callbacks from external systems to the bundles' own controllers. Each bundle lives its own life while executing a "global" task. While executing "local" tasks, a bundle generates some temporary data that should be known neither to the "core" app nor to neighbouring bundles.

    What if I store this data (literally one to three ids) in "local" queues, in a message body with a DelayStamp, and later retrieve those messages from the "local" queues by, for example, a UniqueIdStamp? To be more specific: in the middle of a bundle's work, a $messageId (UUID) is generated, and it can be coupled with the id of the "core" entity. A couple of seconds later, when the bundle's callback controller is hit, the callback's request contains a $messageId, which is the only means of retrieving the "core" entity by its coupled id and relating the entity to the data received from the callback.

    In this case, the value of DelayStamp::$delay could be the timeout for waiting for the callback. Say we'd like to throw away the result of the operation if it arrives later than 60 seconds. We generate a message with new DelayStamp(60000) (the delay is given in milliseconds) and new UniqueIdStamp($messageId). Suppose we receive the callback after 3 seconds: we retrieve the message by $messageId, extract $entityId from the message body, and remove the message from the queue. If we still haven't received the callback within 60 seconds, a handler processes the message and sends an error message to the world.

    Is it possible to retrieve a message from the queue (by id, stamps, pointers, whatever) and remove it? Does the idea sound reasonable, or should I forget it ASAP?

  • 2020-05-19 Egor Ushakov

    You are my hero, Vladimir Sadicov!

  • 2020-05-18 Vladimir Sadicov

    Hey Egor Ushakov

    In theory, you should run the consume command with a separate debug configuration, and you should be able to connect it to PhpStorm, because it's a CLI process that consumes these async messages.

    Cheers!

  • 2020-05-16 Egor Ushakov

    Hi! Any ideas on how to use Xdebug to debug message handlers for messages routed through async transports? In my scenario, the message is dispatched after persisting an entity. Thus the "request-response" cycle, which could be handled with the pair of a browser Xdebug extension and PhpStorm listening to `php-xdebug`, finishes before the message handler hits the breakpoint. Is var_dump'ing (and the like) the only way to look into the guts of an app in this case?

  • 2020-02-03 Victor Bocharsky

    Hey William,

    I'm not sure I completely understand your task. But usually, cron jobs are not something you generate dynamically. You create a Symfony command and then set it up to be executed on some schedule. Figuring out how often this command should run is a more manual process. And inside the command, you need logic that checks whether any work should be done. If not (for example, there are no entities in the DB that should be processed), then just return. If some work should be done, then process it. Something like this.
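
    The pattern described here can be sketched in Python rather than as a Symfony command: cron (or any scheduler) runs the same entry point on a fixed interval, and the code itself decides whether anything is due. `PlannedTask`, `run_due_tasks` and the in-memory task list are hypothetical stand-ins for an entity and a database query.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class PlannedTask:
    """Hypothetical stand-in for a scheduling entity stored in the database."""
    name: str
    run_at: datetime
    done: bool = False


def run_due_tasks(tasks, now):
    """Run every task that is due and not yet done; return the names that ran."""
    due = [t for t in tasks if not t.done and t.run_at <= now]
    if not due:
        return []          # nothing to do on this run: just return
    for task in due:
        # The real work for the task would happen here.
        task.done = True
    return [t.name for t in due]


tasks = [
    PlannedTask("send-report", datetime(2020, 2, 3, 9, 0)),
    PlannedTask("cleanup", datetime(2020, 2, 4, 9, 0)),
]

# Three scheduled runs of the same entry point at different times.
first_run = run_due_tasks(tasks, now=datetime(2020, 2, 3, 9, 5))
second_run = run_due_tasks(tasks, now=datetime(2020, 2, 3, 9, 10))
third_run = run_due_tasks(tasks, now=datetime(2020, 2, 4, 9, 0))
```

    The schedule itself stays static (for example, one run every few minutes), while the rows in the database decide what each run actually does.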

    I hope this helps!

    Cheers!

  • 2020-01-25 william bridge

    Hi Sir, thank you very much for your reply. Please, I have a question. I would like to do task scheduling: I would like to save some tasks in the database and then execute a scheduled method dynamically, depending on the entries in the database. So I want to read a scheduling entity and generate cron jobs with some service.

    Please, how can I achieve this with Symfony?

  • 2020-01-20 Victor Bocharsky

    Hey William,

    To solve this task, you would need a different tool called cron. With this tool, which is readily available on Unix systems, you can set up any task to be executed on a specific schedule, like every hour, day, month, etc. Please read about it; there's plenty of information all over the internet. Messenger, in turn, just helps solve the problem of queues.

    I hope this helps!

    Cheers!

  • 2020-01-18 william bridge

    Hi Sir. Please, I have a question. Is it possible to run scheduled tasks with Messenger, like executing a task every day, on one day of the week, or at a certain time? Or is there another solution? Please help. Thank you.

  • 2019-07-30 Diego Aguiar

    Hey ElGovanni

    Yes, that's a good use case. Writing a big file is another good example

    Cheers!

  • 2019-07-30 ElGovanni

    "it's useful for any tasks that are heavy"

    Like sending emails with Symfony Mailer?

  • 2019-07-24 weaverryan

    Hey @ElGovanni!

    I think, even if you don’t have any heavy work, Messenger enables this “command bus design pattern”. If you love that pattern, it’s awesome. If you don’t care so much, then yea, there’s much less use for it. So, it’s useful for any tasks that are heavy OR if you just want to leverage the command bus pattern.

    Cheers!

  • 2019-07-24 ElGovanni

    Cool feature, but after 14 episodes I still can't think of any use case for Messenger.

  • 2019-07-24 Victor Bocharsky

    Hey skyCatalysm,

    Thank you for your interest in this screencast, your vote is counted. We do plan to record it some day, but unfortunately, we don't have any estimates yet: there is too much good stuff we would like to work on before it.

    Thank you for your patience!

    Cheers!

  • 2019-07-23 Kegan VanSickle

    Awesome series and perfect timing for me as well! I have a big project coming up that will utilize the Messenger component.

  • 2019-07-19 skyCatalysm

    Hi! Will SymfonyCasts ever cover the Mercure bundle? And when?

  • 2019-07-16 Skylar Scotlynn Gutman

    I get what you're saying and it makes sense. I was going to swallow the whole dragon and test everything from the message dispatch to the results of the handler's work in one test.

    Thanks again for your time. It's always appreciated.

  • 2019-07-16 weaverryan

    Hey Skylar Scotlynn Gutman!

    I don't think that specific topic will make it into this tutorial, so I'll give you some tips here :). The main reason is that (delightfully) there is nothing special about unit testing a handler. If you think about it, a handler is no different than any other service you might build in your app - it's a service, it uses dependency injection and it has a public method that "does" something. The only difference is that the method is called __invoke, which means that when you're calling it in your test, instead of $myService->myMethod('fooArg') it will be $myService('fooArg') (you execute the object like a function).

    The one other interesting thing about handlers is that they (unless you're using Messenger as a query bus... but, different topic) never return anything. That's not a strict requirement, but you typically don't, because Messenger doesn't care about your return value (and neither should you when you dispatch to Messenger). This means that your unit tests will typically be testing what a handler function *does* instead of what it returns - e.g. you might test that it calls flush() on the entity manager. If you find that there are some other things internally that you'd like to test but are having trouble (for example, you want to test that an entity was created and with the correct data... before being saved), it's probably a sign that you should refactor that small piece you want to test into its own service, test that, then autowire the new service into your handler.
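
    The same testing approach can be sketched in Python instead of PHP, purely as a language-neutral illustration: `__call__` plays the role of `__invoke`, and `SaveImage`, `SaveImageHandler` and the `entity_manager` mock are hypothetical names. The test calls the handler like a function and asserts on what it did, not on what it returned.

```python
from dataclasses import dataclass
from unittest.mock import Mock


@dataclass
class SaveImage:
    """Hypothetical message carrying the data the handler needs."""
    filename: str


class SaveImageHandler:
    """Hypothetical handler: a plain service with one callable entry point."""

    def __init__(self, entity_manager):
        self.entity_manager = entity_manager

    def __call__(self, message):
        self.entity_manager.persist(message.filename)
        self.entity_manager.flush()
        # Intentionally no return value.


def test_handler_persists_and_flushes():
    entity_manager = Mock()
    handler = SaveImageHandler(entity_manager)

    # Execute the object like a function.
    result = handler(SaveImage("ponka.jpg"))

    # Assert on side effects rather than on a return value.
    assert result is None
    entity_manager.persist.assert_called_once_with("ponka.jpg")
    entity_manager.flush.assert_called_once_with()


test_handler_persists_and_flushes()
```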

    Let me know if this helps! If I missed the heart of your question, definitely tell me ;).

    Cheers!

  • 2019-07-16 Skylar Scotlynn Gutman

    Hello guys,

    Thank you for this tutorial.

    I was wondering if you could help me write a unit test for the message handler. How would you go about it? Maybe you could add a chapter on the topic.

  • 2019-06-18 Huy Nguyen

    wow, I would love to watch this!!!!

  • 2019-06-15 Amin Behravesh

    I've been waiting for this course for a long time.
    Great job.