Custom Transport Serializer

If an external system sends messages to a queue that we're going to read, those messages will probably be sent as JSON or XML. We added a message formatted as JSON. To read those, we set up a transport called external_messages. But when we consumed that JSON message... it exploded! Why? Because the default serializer for every transport is the PhpSerializer. Basically, it's trying to call unserialize() on our JSON. That's...uh... not gonna work.
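To see why that fails, here is a minimal standalone sketch (plain PHP, no Symfony needed). It assumes the queue body is the same small JSON document used in this tutorial; the real PhpSerializer does more than a bare unserialize() call, so treat this as an illustration of the format mismatch only.

```php
<?php
// The same JSON body that the external system put on the queue.
$body = '{"emoji": 2}';

// unserialize() expects PHP's own serialization format, not JSON.
// Given JSON it returns false; "@" only silences the accompanying notice/warning.
$result = @unserialize($body);

var_dump($result); // bool(false)

// For comparison, this is what PHP serialization of the same data looks like.
echo serialize(['emoji' => 2]), PHP_EOL; // a:1:{s:5:"emoji";i:2;}
```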

Nope, if you're consuming messages that came from an external system, you're going to need a custom serializer for your transport. Creating a custom serializer is... actually a very pleasant experience.

Creating the Custom Serializer Class

Inside of our src/Messenger/ directory... though this class could live anywhere... let's create a new PHP class called ExternalJsonMessageSerializer. The only rule is that this needs to implement SerializerInterface. But, careful! There are two interfaces called SerializerInterface: one is from the Serializer component. We want the other one: the one from the Messenger component. I'll go to the "Code Generate" menu - or Command + N on a Mac - and select "Implement Methods" to add the two that this interface requires: decode() and encode().

// ...
namespace App\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // TODO: Implement decode() method.
    }

    public function encode(Envelope $envelope): array
    {
        // TODO: Implement encode() method.
    }
}

The encode() Method

The idea is beautifully simple: when we send a message through a transport that uses this serializer, the transport will call the encode() method and pass us the Envelope object that contains the message. Our job is to turn that into a string format that can be sent to the transport. Oh, well, notice that this returns an array. But if you look at the SerializerInterface, this method should return an array with two keys: body - the body of the message - and headers - any headers that should be sent.
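As a rough illustration of that return shape, here is a standalone sketch in plain PHP. The function name buildEncodedEnvelope() is hypothetical and not part of Messenger; it only shows the two keys that encode() is expected to return, using this tutorial's emoji payload as the body.

```php
<?php
// Hypothetical helper: builds the same array shape that encode() must return.
function buildEncodedEnvelope(int $emojiIndex): array
{
    return [
        // the message itself, as a string the transport can store
        'body' => json_encode(['emoji' => $emojiIndex]),
        // any extra metadata to send along with the message
        'headers' => [],
    ];
}

$encoded = buildEncodedEnvelope(2);
echo $encoded['body'], PHP_EOL; // {"emoji":2}
```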

Nice, right? But... we're actually never going to send any messages through our external transport... so we don't need this method. To prove that it will never be called, throw a new Exception with:

Transport & serializer not meant for sending messages

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    // ...

    public function encode(Envelope $envelope): array
    {
        throw new \Exception('Transport & serializer not meant for sending messages');
    }
}

That'll give me a gentle reminder in case I do something silly and route a message to a transport that uses this serializer by accident.

Tip

Actually, if you want your messages to be redelivered, you do need to implement the encode() method. See the code-block on this page for an example, which includes a small update to decode().

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...

        // in case of redelivery, unserialize any stamps
        $stamps = [];
        if (isset($headers['stamps'])) {
            $stamps = unserialize($headers['stamps']);
        }

        return new Envelope($message, $stamps);
    }

    public function encode(Envelope $envelope): array
    {
        // this is called if a message is redelivered for "retry"
        $message = $envelope->getMessage();

        // expand this logic later if you handle more than
        // just one message class
        if ($message instanceof LogEmoji) {
            // recreate what the data originally looked like
            $data = ['emoji' => $message->getEmojiIndex()];
        } else {
            throw new \Exception('Unsupported message class');
        }

        $allStamps = [];
        foreach ($envelope->all() as $stamps) {
            $allStamps = array_merge($allStamps, $stamps);
        }

        return [
            'body' => json_encode($data),
            'headers' => [
                // store stamps as a header - to be read in decode()
                'stamps' => serialize($allStamps)
            ],
        ];
    }
}
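The foreach loop in that encode() method is needed because $envelope->all() returns the stamps grouped by stamp class, while the Envelope constructor wants one flat list. Here is a small standalone sketch of that flatten-and-round-trip step. It uses plain strings as stand-ins for real stamp objects (an assumption made only so that it runs without Symfony).

```php
<?php
// Stand-in for $envelope->all(): stamps grouped by class name.
// Plain strings replace real stamp objects so this runs without Symfony.
$stampsByClass = [
    'DelayStamp' => ['delay #1'],
    'RedeliveryStamp' => ['redelivery #1', 'redelivery #2'],
];

// Flatten the groups into one list, as encode() does.
$allStamps = [];
foreach ($stampsByClass as $stamps) {
    $allStamps = array_merge($allStamps, $stamps);
}

// Store the list as a single header string, then read it back as decode() does.
$header = serialize($allStamps);
$restored = unserialize($header);

var_dump($restored === $allStamps); // bool(true)
```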

The decode() Method

The method that we need to focus on is decode(). When a worker consumes a message from a transport, the transport calls decode() on its serializer. Our job is to read the message from the queue and turn that into an Envelope object with the message object inside. If you check out the SerializerInterface one more time, you'll see that the argument we're passed - $encodedEnvelope - is really just an array with the same two keys we saw a moment ago: body and headers.

Let's separate the pieces first: $body = $encodedEnvelope['body'] and $headers = $encodedEnvelope['headers']. The $body will be the raw JSON in the message. We'll talk about the headers later: they're empty right now.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];
        // ...
    }

    // ...
}

Turning JSON into the Envelope

Ok, remember our goal here: to turn this JSON into a LogEmoji object and then put that into an Envelope object. How? Let's keep it simple! Start with $data = json_decode($body, true) to turn the JSON into an associative array.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        // ...
    }

    // ...
}
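If the second argument to json_decode() is new to you, this tiny standalone sketch shows the difference it makes, using the same sample body as the message in our queue:

```php
<?php
$body = '{"emoji": 2}';

// true => decode JSON objects into associative arrays instead of stdClass objects
$data = json_decode($body, true);
var_dump($data['emoji']); // int(2)

// Without the second argument, we would get an object instead.
$object = json_decode($body);
var_dump($object->emoji); // int(2)
```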

I'm not doing any error-checking yet... like to check that this is valid JSON - we'll do that a bit later. Now say: $message = new LogEmoji($data['emoji']) because emoji is the key in the JSON that we've decided will hold the $emojiIndex.

// ...
use App\Message\Command\LogEmoji;
// ...

class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        $message = new LogEmoji($data['emoji']);
        // ...
    }

    // ...
}
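Until that error-checking is in place, invalid JSON or a missing emoji key means null gets passed to the LogEmoji constructor. Here is one possible shape for those checks, as a standalone sketch. The helper name extractEmojiIndex() is hypothetical, and it throws a plain \InvalidArgumentException so that it runs without Symfony. Inside decode() you would more likely throw Messenger's MessageDecodingFailedException, which the class above already imports.

```php
<?php
/**
 * Hypothetical helper (not part of the tutorial code): validates the raw
 * body and returns the emoji index, or throws if the message is unusable.
 */
function extractEmojiIndex(string $body): int
{
    $data = json_decode($body, true);

    // json_last_error() tells us whether the JSON itself was valid
    if (JSON_ERROR_NONE !== json_last_error()) {
        throw new \InvalidArgumentException('Invalid JSON: '.json_last_error_msg());
    }

    // valid JSON is not enough: we need an integer under the "emoji" key
    if (!is_array($data) || !isset($data['emoji']) || !is_int($data['emoji'])) {
        throw new \InvalidArgumentException('Missing or non-integer "emoji" key');
    }

    return $data['emoji'];
}

echo extractEmojiIndex('{"emoji": 2}'), PHP_EOL; // 2

try {
    extractEmojiIndex('{"emoji": null}');
} catch (\InvalidArgumentException $e) {
    echo $e->getMessage(), PHP_EOL; // Missing or non-integer "emoji" key
}
```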

Finally, we need to return an Envelope object. Remember: an Envelope is just a small wrapper around the message itself... and it might also hold some stamps. At the bottom, return new Envelope() and put $message inside.

// ...
use App\Message\Command\LogEmoji;
use Symfony\Component\Messenger\Envelope;
// ...

class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        $message = new LogEmoji($data['emoji']);

        // in case of redelivery, unserialize any stamps
        $stamps = [];
        if (isset($headers['stamps'])) {
            $stamps = unserialize($headers['stamps']);
        }

        return new Envelope($message, $stamps);
    }

    // ...
}

Configuring the Serializer on the Transport

Done! We rock! This is already a fully functional serializer that can read messages from a queue. But our transport won't just start "magically" using it: we need to configure that. And... we already know how! We learned earlier that each transport can have a serializer option. Below the external transport, add serializer and set this to the id of our service, which is the same as the class name: App\Messenger\... and then I'll go copy the class name: ExternalJsonMessageSerializer.

framework:
    messenger:
        # ...
        transports:
            # ...
            external_messages:
                # ...
                serializer: App\Messenger\ExternalJsonMessageSerializer
        # ...

This is why we created a separate transport with a separate queue: we only want the external messages to use our ExternalJsonMessageSerializer. The other two transports - async and async_priority_high - will still use the simpler PhpSerializer... which is perfect.

Ok, let's try this! First, find an open terminal and tail the logs:

tail -f var/log/dev.log

And I'll clear the screen. Then, in my other terminal, I'll consume messages from the external_messages transport:

php bin/console messenger:consume -vv external_messages

Perfect! There are no messages yet... so it's just waiting. But we're hoping that when we publish this message to the queue, it will be consumed by our worker, decoded correctly, and that an emoji will be logged! Ah, ok - let's try it. Publish! Oh, then move back over to the terminal.... there it is! We got an important message: cheese: it received the message and handled it down here.

So... we did it! We rock!

But... when we created the Envelope, we didn't put any stamps into it. Should we have? Does a message that goes through the "normal" flow have some stamps on it that we should manually add here? Let's dive into the workflow of a message and its stamps, next.

Comments

  • 2020-05-25 Diego Aguiar

    Hey Sven

    Looks like this Github issue is related to your situation https://github.com/symfony/...
    sadly they haven't fixed the problem yet

    Cheers!

  • 2020-05-23 Sven

    Hi, there is only one part of the Symfony Messenger I do not understand.

    At the beginning of the tutorial we have gotten the following error

    Could not decode message using PHP serialization: {
    "emoji": 2
    }.

    Now my question is, why is the failed message not stored in the failed (doctrine queue_name=failed) transport for later handling?

    In the messenger.yaml the following is configured


    framework:
        messenger:
            # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
            failure_transport: failed
            transports:

                external_messages:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        auto_setup: false # it doesn't create this queue
                        queues:
                            messages_from_external: ~

                failed: 'doctrine://default?queue_name=failed'

  • 2019-11-11 Victor Bocharsky

    Hey Stephane,

    I'm glad you were able to fix the problem by yourself! And thank you for sharing your solution with others. Though, the fix might depend. If it's ok to have no emoji index - then allowing null is something you would need here. But if every LogEmoji message should contain the index, i.e. should be an integer number - then you should search for the alternative fix. Most probably a LogEmoji message without Emoji index was saved to the DB somehow and now can't be unserialized and processed. So, the fix depends on your needs and your implementation

    Cheers!

  • 2019-11-10 Stéphane

    I found the solution: add ? before int and clean up my array of emojis.

  • 2019-11-10 Stéphane

    Hi,

    When I try the command php bin/console messenger:consume -vv external_messages, I get this error message:

    [Symfony\Component\Debug\Exception\FatalThrowableError]
    Argument 1 passed to App\Message\Command\LogEmoji::__construct() must be of the type integer, null given, called in /home/stephane/Hubic/www/
    knpuniversity/messenger/src/Messenger/ExternalJsonMessageSerializer.php on line 19

    Do you have any idea what the problem is?
    Thanks

  • 2019-11-04 weaverryan

    Hey Mathieu Piot!

    Thanks for posting and verifying that this fixed things :). I talked to a co-maintainer on Messenger and what he does is the same basic idea, but with one simple change. To encode the stamps, he uses PHP's serialize() function and then puts it on a header - I think just a single header (e.g. X-Stamps). Then, in decode, he uses unserialize() on that header. And that makes sense: this header would ONLY exist if the message were being redelivered from Messenger. And so, using native PHP serialization for this one field (and JSON for the actual message body) makes a lot of sense.

    Cheers!

  • 2019-11-04 Mathieu Piot

    Hi @wweaverryan !

    Thanks a lot for your reply. In the end I went with Solution 1, adding encoding and decoding of stamps to the custom serializer. That avoids the unexpected bug of the third solution.

    This is what I did: https://gist.github.com/mpi... (I can post it here if needed, but it's a bit long)
    In my serializer I've added 2 functions: encodeStamps and decodeStamps (they only take care of DelayStamp and RedeliveryStamp, with a light encoding).

    Thanks again :)

  • 2019-11-04 weaverryan

    Hey Mathieu Piot!

    Wow, awesome question. Honestly, even though I wrote the retry logic... I totally did *not* think about the retry stuff here. You're right that, on a high level, there are a few options here. First, for others, if we receive a message from an external transport while consuming from some transport called "from_external", on failure, Messenger will try to *send* to that same transport.

    So, the most natural solution is (1). Basically, make sure your "from_external" transport is configured so that when it *sends*, it will end up in the same queue that it's receiving from (so, you may need to setup some binding keys). Then, as you mentioned, you would need to put the RedeliveryStamp stuff into the message somehow (e.g. headers)... and make sure that you also read it in decode(). That's indeed a tricky flow - I just asked another contributor on Messenger to verify if I'm missing something. I'll update you, assuming I hear from him (everyone is busy!).

    Anyways, let me know what you think - and if you get the flow working. I may need to update the tutorial with some notes related to this. Btw, another (not-as-cool) solution is to set your retry limit to 1 and leverage a failure transport. But then you need to manually retry (via the retry commands) after just one failure... which is kinda lame.

    Cheers!

  • 2019-10-31 Mathieu Piot

    Hi,
    Just a question about the custom serializer. I tried to play with it a little, and if we only implement the decode part and the handler fails, then Messenger wants to send the message again to the same queue with a RetryStamp and a DelayStamp.

    In this example, and in the way I've done it, the client application only has a Command message, which is created by the custom serializer when we receive the EventMessage from another application. But when it fails, we need to re-encode it into an array to process it again.

    If we do it simply, this creates a kind of infinite loop (the Delay and Retry stamps are not added). What is the best way to handle this?

    1. In Serializer->encode(), do I add some headers for the stamps? (By using the Symfony\Component\Messenger\Transport\Serialization\SerializerInterface)
    2. Try to requeue the Command object in another queue in place of the Event?
    3. I tried to implement the same EventMessage classes twice: one that is sent without handlers, and a second with the handler. I use the Symfony serializer to recreate the same message. It works well for retry #1, then #2, but after that the consumer finishes with this error:


    In Serializer.php line 123:

    [Symfony\Component\Messenger\Exception\MessageDecodingFailedException]
    Could not decode stamp: The type of the "headers" attribute for class "Symfony\Component\Debug\Exception\FlattenException" must be one of "array" ("NULL" given)..

    Thanks a lot :)