Mapping Messages to Classes in a Transport Serializer


We've written our transport serializer to always expect only one type of message to be put into the queue: a message that tells our app to "log an emoji". Your app might be that simple, but it's more likely that this "external" system might send 5 or 10 different types of messages. In that case, our serializer needs to detect which type of message this is and then turn it into the correct message object.

How can we do that? How can we figure out which type of message this is? Do we... just look at what fields the JSON has? We could... but we can also do something smarter.

Refactoring to a switch

Let's start by reorganizing this class a bit. Select the code at the bottom of this method - the stuff related to the LogEmoji object - and then go to the Refactor -> "Refactor This" menu, which is Ctrl+T on a Mac. Refactor this code to a method called createLogEmojiEnvelope.

Tip

To make sure "retries" work correctly, some of the code in this section has been tweaked. See the code blocks on this page for the updated examples!

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        $envelope = $this->createLogEmojiEnvelope($data);

        // in case of redelivery, unserialize any stamps
        $stamps = [];
        if (isset($headers['stamps'])) {
            $stamps = unserialize($headers['stamps']);
        }
        $envelope = $envelope->with(...$stamps);

        return $envelope;
    }

    // ...

    private function createLogEmojiEnvelope($data): Envelope
    {
        if (!isset($data['emoji'])) {
            throw new MessageDecodingFailedException('Missing the emoji key!');
        }

        $message = new LogEmoji($data['emoji']);
        $envelope = new Envelope($message);

        // needed only if you need this to be sent through the non-default bus
        $envelope = $envelope->with(new BusNameStamp('command.bus'));

        return $envelope;
    }
}

Cool! That created a private function down here with that code. I'll add an array type-hint. Back in decode(), we're already calling this method. So, no big change.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    // ...

    private function createLogEmojiEnvelope(array $data): Envelope
    {
        // ...
    }
}

Using Headers for the Type

The key question is: if multiple types of messages are being added to the queue, how can the serializer determine which type of message this is? Well, we could maybe add a type key to the JSON itself. That might be fine. But there's another spot on the message that can hold data: the headers. These work a lot like HTTP headers: they're just "extra" information you can store about the message. Whatever header we put here will make it back to our serializer when the message is consumed.

Ok, so let's add a new header called type set to emoji. I just made that up. I'm not making this a class name... because that external system won't know or care about what PHP classes we use internally to handle this. It's just saying:

This is an "emoji" type of message
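To make that concrete, here's a rough sketch of what the serializer might receive for such a message. The 'body' and 'headers' keys are the ones Messenger passes to decode(); the emoji value of 2 is just a made-up example.

```php
<?php

// Hypothetical input to decode(): the raw JSON body plus the message headers.
$encodedEnvelope = [
    'body' => '{"emoji": 2}',
    'headers' => ['type' => 'emoji'],
];

// The payload lives in the body...
$data = json_decode($encodedEnvelope['body'], true);

// ...and the "what kind of message is this?" hint lives in the headers.
$headers = $encodedEnvelope['headers'];
```

The body carries only the data, while the header describes it. That keeps the external system free of any knowledge about our PHP class names.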

Back in our serializer, let's first check to make sure that header is set: if not isset($headers['type']), then throw a new MessageDecodingFailedException with:

Missing "type" header

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        if (!isset($headers['type'])) {
            throw new MessageDecodingFailedException('Missing "type" header');
        }
        // ...
    }

    // ...
}

Then, down here, we'll use a good, old-fashioned switch statement on $headers['type']. If this is set to emoji, set $envelope to $this->createLogEmojiEnvelope($data) and break.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        if (!isset($headers['type'])) {
            throw new MessageDecodingFailedException('Missing "type" header');
        }

        switch ($headers['type']) {
            case 'emoji':
                $envelope = $this->createLogEmojiEnvelope($data);
                break;
            // ...
        }

        // ...

        return $envelope;
    }

    // ...
}

After this, you would add any other "types" that the external system publishes, like delete_photo. In those cases you would instantiate a different message object and wrap that in the envelope instead. And, if some unexpected "type" is passed, let's throw a new MessageDecodingFailedException with:

Invalid type "%s"

passing $headers['type'] as the wildcard.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        switch ($headers['type']) {
            case 'emoji':
                $envelope = $this->createLogEmojiEnvelope($data);
                break;
            default:
                throw new MessageDecodingFailedException(sprintf('Invalid type "%s"', $headers['type']));
        }
        // ...
}
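Here's a framework-free sketch of how that switch might grow once a second type shows up. Everything in it is an assumption made for illustration: the DeletePhoto class, its id field, the createMessage() function, and the use of InvalidArgumentException as a stand-in for MessageDecodingFailedException so the snippet runs without Symfony installed.

```php
<?php

// Hypothetical stand-ins for the app's message classes.
final class LogEmoji
{
    public $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }
}

final class DeletePhoto
{
    public $photoId;

    public function __construct(int $photoId)
    {
        $this->photoId = $photoId;
    }
}

/**
 * Maps the "type" header to a message object.
 * InvalidArgumentException stands in for MessageDecodingFailedException.
 */
function createMessage(array $headers, array $data): object
{
    if (!isset($headers['type'])) {
        throw new InvalidArgumentException('Missing "type" header');
    }

    switch ($headers['type']) {
        case 'emoji':
            if (!isset($data['emoji'])) {
                throw new InvalidArgumentException('Missing the emoji key!');
            }

            return new LogEmoji($data['emoji']);
        case 'delete_photo':
            // The "id" key is an assumed payload field for this made-up type.
            if (!isset($data['id'])) {
                throw new InvalidArgumentException('Missing the id key!');
            }

            return new DeletePhoto($data['id']);
        default:
            throw new InvalidArgumentException(sprintf('Invalid type "%s"', $headers['type']));
    }
}
```

In the real serializer, each case would build an Envelope around the message, like createLogEmojiEnvelope() does, instead of returning the bare object.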

Tip

To support retries on failure, you also need to re-add the "type" header inside encode():

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    // ...

    public function encode(Envelope $envelope): array
    {
        // ...
        if ($message instanceof LogEmoji) {
            // ...
            $type = 'emoji';
            // ...
        }
        // ...

        return [
            // ...
            'headers' => [
                // store stamps as a header - to be read in decode()
                'stamps' => serialize($allStamps),
                'type' => $type,
            ],
        ];
    }

    // ...
}
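The idea behind that tip, sketched without Symfony: when a message is sent back to the queue for a retry, encode() has to write the same "type" header that decode() will look for later. The encodeMessage() function, the emojiIndex property and the error message below are assumptions for illustration, and the stamps header is left out to keep things short.

```php
<?php

// Hypothetical stand-in for the app's message class.
final class LogEmoji
{
    public $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }
}

/**
 * Turns a message back into the body + headers shape that decode() reads.
 */
function encodeMessage(object $message): array
{
    if ($message instanceof LogEmoji) {
        $data = ['emoji' => $message->emojiIndex];
        $type = 'emoji';
    } else {
        throw new InvalidArgumentException(sprintf('Unsupported message class "%s"', get_class($message)));
    }

    return [
        'body' => json_encode($data),
        'headers' => [
            // same header name and value that decode() switches on
            'type' => $type,
        ],
    ];
}
```

If the header were missing here, a redelivered message would hit the 'Missing "type" header' exception in decode() instead of being retried.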

Kinda cool, right? Let's go stop our worker, then restart it so it sees our new code:

php bin/console messenger:consume -vv external_messages

Back in the Rabbit manager, I'll change the emojis key back to emoji and... publish! In the terminal... sweet! It worked! Now change the type header to something we don't support, like photo. Publish and... yea! An exception killed our worker:

Invalid type "photo".

Ok friends... yea... that's it! Congrats on making it to the end! I hope you enjoyed the ride as much as I did! I mean, handling messages asynchronously... that's pretty fun stuff. The great thing about Messenger is that it works brilliantly out of the box with a single message bus and the Doctrine transport. Or, you can go crazy: create multiple transports, send things to RabbitMQ, create custom exchanges with binding keys or use your own serializer to... well... basically do whatever you want. The power... it's... intoxicating!

So, start writing some crazy handler code and then... handle that work later! And let us know what you're building. As always, if you have some questions, we're there for you in the comments.

Alright friends, seeya next time!

Comments

  • 2020-05-31 weaverryan

    ❤️

  • 2020-05-31 Julien Bonnier

    Yet, another awesome tutorial, thanks Ryan!

  • 2020-02-17 Diego Aguiar

    Yes, it's possible, you can see exactly how here: https://symfony.com/doc/cur...
    But, as the docs say, it's better if you refactor the logic you want to reuse into a service

  • 2020-02-14 Skylar Scotlynn Gutman

    Thanks for the reply. Do you know if it is possible to start a console process from a web request?

  • 2020-02-14 Diego Aguiar

    Hey Skylar Scotlynn Gutman

    Thanks for your kind words :)
    I don't know if RabbitMQ fits your needs. Have you given it a try? Also, if you really want push notifications, you may be interested in the Symfony Mercure component (https://symfony.com/doc/cur... ). I've never used it before, but it seems solid.

    Cheers!

  • 2020-02-14 Skylar Scotlynn Gutman

    Thank you so much for this tutorial! I am more comfortable using messenger. I have already implemented some of your recommendations and the doctrine transport is wonderful and now in use. Please keep up the good work.

    I am looking for a solution that doesn't require the server to constantly poll, so I was thinking to use a push notification service (AWS SNS) to start the messenger:consume process and terminate. Is this possible and how would I go about it?

  • 2019-10-28 weaverryan

    Hey Nicolás Rafales!

    Wow :D. That sounds... crazy - nice work to you and your team :). Sorry for not having the Redis support - it's the transport I have the *least* experience with and I'm not (completely) sure how useful it is to people between the doctrine transport and Rabbit. But, this is obviously a vote from you for "yes, useful!".

    > it could include a workaround for the DelayStamp

    Yea, we still need this for Redis - this was a ping to *try* to push that PR forward (https://github.com/symfony/... for 4.4, but we're very late at this point. What workaround did you do?

    > a message board (mercure fetched from middleware or manual dispatch of events) similar to what Laravel Horizon does it

    In 4.3... when I added a BUNCH of features to Messenger, a Horizon-like thing is something that I had in mind - specifically I was adding events, etc. that would/should make this theoretically possible. Can you tell me more about exactly what you would like to see? Like, do you just want to be able to see a visual representation of how many messages are in each queue, etc.? Stats on how long messages are sitting in queues? The auto-scaling thing that Horizon does where it can increase/balance workers as messages get bigger?

    Anyways, thanks for the REALLY nice message - it means a lot ❤️

    Cheers!

  • 2019-10-25 Nicolás Rafales

    Brilliant! This opens up a bunch of different solutions... Although I'd love some 'casts support for RedisTransport since that's the one I'm using right now. It could include a workaround for the DelayStamp, which I hacked around; a redis-cli interpretation for reading the pub/sub streams; and a message board (mercure fetched from middleware or manual dispatch of events) similar to what Laravel Horizon does.

    At work, we implemented this transport because it was already deployed in our stack. Rabbit was just overkill for the small input we needed, and doctrine is better suited for the failed transport and could overflow mysqld on multiple messaging.
    We ended up with a CQRS implementation of some request lifecycles that get back a response to the user using the mercure bundle.
    We also used API-Platform to fetch collections of the result of the command and events, with a custom provider for the result, which is upserted in Redis as a storage repository for entities with ttl. These entities feed a custom-made adapter for jQuery datatables to populate a "reports" view, and need to be auto-deleted after ttl = 0, and a cronjob should delete the files missing a hash in Redis (wish that could be an event, I hate cronjobs).
    This made it possible for us to use an external service made in Python to get us back large data reports with ugly queries that take too long, and notify the user when done.
    And this was made possible by your casts.

    It has truly been a great ride for us, and especially for me.

    Thanks Ryan, keep making these awesome casts, and making Symfony a better framework.