Custom Transport Serializer

Keep on Learning!

If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.

Start your All-Access Pass
Buy just this tutorial for $12.00

With a Subscription, click any sentence in the script to jump to that part of the video!

Login Subscribe

If an external system sends messages to a queue that we're going to read, those messages will probably be sent as JSON or XML. We added a message formatted as JSON. To read those, we set up a transport called external_messages. But when we consumed that JSON message... it exploded! Why? Because the default serializer for every transport is the PhpSerializer. Basically, it's trying to call unserialize() on our JSON. That's...uh... not gonna work.

Nope, if you're consuming messages that came from an external system, you're going to need a custom serializer for your transport. Creating a custom serializer is... actually a very pleasant experience.

Creating the Custom Serializer Class

Inside of our src/Messenger/ directory... though this class could live anywhere.. let's create a new PHP class called ExternalJsonMessengerSerializer. The only rule is that this needs to implement SerializerInterface. But, careful! There are two SerializerInterface: one is from the Serializer component. We want the other one: the one from the Messenger component. I'll go to the "Code Generate" menu - or Command + N on a Mac - and select "Implement Methods" to add the two that this interface requires: decode() and encode().

... lines 1 - 2
namespace App\Messenger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class ExternalJsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
// TODO: Implement decode() method.
}
public function encode(Envelope $envelope): array
{
// TODO: Implement encode() method.
}
}

The encode() Method

The idea is beautifully simple: when we send a message through a transport that uses this serializer, the transport will call the encode() method and pass us the Envelope object that contains the message. Our job is to turn that into a string format that can be sent to the transport. Oh, well, notice that this returns an array. But if you look at the SerializerInterface, this method should return an array with two keys: body - the body of the message - and headers - any headers that should be sent.

Nice, right? But... we're actually never going to send any messages through our external transport... so we don't need this method. To prove that it will never be called, throw a new Exception with:

Transport & serializer not meant for sending messages

... lines 1 - 9
class ExternalJsonMessageSerializer implements SerializerInterface
{
... lines 12 - 22
public function encode(Envelope $envelope): array
{
throw new \Exception('Transport & serializer not meant for sending messages');
}
}

That'll give me a gentle reminder in case I do something silly and route a message to a transport that uses this serializer by accident.

Tip

Actually, if you want your messages to be redelivered, you do need to implement the encode() method. See the code-block on this page for an example, which includes a small update to decode().

... lines 1 - 9
class ExternalJsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
... lines 14 - 19
// in case of redelivery, unserialize any stamps
$stamps = [];
if (isset($headers['stamps'])) {
$stamps = unserialize($headers['stamps']);
}
return new Envelope($message, $stamps);
}
public function encode(Envelope $envelope): array
{
// this is called if a message is redelivered for "retry"
$message = $envelope->getMessage();
// expand this logic later if you handle more than
// just one message class
if ($message instanceof LogEmoji) {
// recreate what the data originally looked like
$data = ['emoji' => $message->getEmojiIndex()];
} else {
throw new \Exception('Unsupported message class');
}
$allStamps = [];
foreach ($envelope->all() as $stamps) {
$allStamps = array_merge($allStamps, $stamps);
}
return [
'body' => json_encode($data),
'headers' => [
// store stamps as a header - to be read in decode()
'stamps' => serialize($allStamps)
],
];
}
}

The decode() Method

The method that we need to focus on is decode(). When a worker consumes a message from a transport, the transport calls decode() on its serializer. Our job is to read the message from the queue and turn that into an Envelope object with the message object inside. If you check out the SerializerInterface one more time, you'll see that the argument we're passed - $encodedEnvelope - is really just an array with the same two keys we saw a moment ago: body and headers.

Let's separate the pieces first: $body = $encodedEnvelope['body'] and $headers = $encodedEnvelope['headers']. The $body will be the raw JSON in the message. We'll talk about the headers later: it's empty right now.

... lines 1 - 9
class ExternalJsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
$body = $encodedEnvelope['body'];
$headers = $encodedEnvelope['headers'];
... lines 16 - 20
}
... lines 22 - 26
}

Turning JSON into the Envelope

Ok, remember our goal here: to turn this JSON into a LogEmoji object and then put that into an Envelope object. How? Let's keep it simple! Start with $data = json_decode($body, true) to turn the JSON into an associative array.

... lines 1 - 9
class ExternalJsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
$body = $encodedEnvelope['body'];
$headers = $encodedEnvelope['headers'];
$data = json_decode($body, true);
... lines 18 - 20
}
... lines 22 - 26
}

I'm not doing any error-checking yet... like to check that this is valid JSON - we'll do that a bit later. Now say: $message = new LogEmoji($data['emoji']) because emoji is the key in the JSON that we've decided will hold the $emojiIndex.

... lines 1 - 4
use App\Message\Command\LogEmoji;
... lines 6 - 9
class ExternalJsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
$body = $encodedEnvelope['body'];
$headers = $encodedEnvelope['headers'];
$data = json_decode($body, true);
$message = new LogEmoji($data['emoji']);
... lines 19 - 20
}
... lines 22 - 26
}

Finally, we need to return an Envelope object. Remember: an Envelope is just a small wrapper around the message itself... and it might also hold some stamps. At the bottom, return new Envelope() and put $message inside.

... lines 1 - 4
use App\Message\Command\LogEmoji;
use Symfony\Component\Messenger\Envelope;
... lines 7 - 9
class ExternalJsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
$body = $encodedEnvelope['body'];
$headers = $encodedEnvelope['headers'];
$data = json_decode($body, true);
$message = new LogEmoji($data['emoji']);
// in case of redelivery, unserialize any stamps
$stamps = [];
if (isset($headers['stamps'])) {
$stamps = unserialize($headers['stamps']);
}
return new Envelope($message, $stamps);
}
... lines 28 - 55
}

Configuring the Serializer on the Transport

Done! We rock! This is already a fully functional serializer that can read messages from a queue. But our transport won't just start "magically" using it: we need to configure that. And.. we already know how! We learned earlier that each transport can have a serializer option. Below the external transport, add serializer and set this to the id of our service, which is the same as the class name: App\Messenger\... and then I'll go copy the class name: ExternalJsonMessengerSerializer.

framework:
messenger:
... lines 3 - 19
transports:
... lines 21 - 50
external_messages:
... line 52
serializer: App\Messenger\ExternalJsonMessageSerializer
... lines 54 - 69

This is why we created a separate transport with a separate queue: we only want the external messages to use our ExternalJsonMessengerSerializer. The other two transports - async and async_priority_high - will still use the simpler PhpSerializer... which is perfect.

Ok, let's try this! First, find an open terminal and tail the logs:

tail -f var/log/dev.log

And I'll clear the screen. Then, in my other terminal, I'll consume messages from the external_messages transport:

php bin/console messenger:consume -vv external_messages

Perfect! There are no messages yet... so it's just waiting. But we're hoping that when we publish this message to the queue, it will be consumed by our worker, decoded correctly, and that an emoji will be logged! Ah, ok - let's try it. Publish! Oh, then move back over to the terminal.... there it is! We got an important message: cheese: it received the message and handled it down here.

So... we did it! We rock!

But... when we created the Envelope, we didn't put any stamps into it. Should we have? Does a message that goes through the "normal" flow have some stamps on it that we should manually add here? Let's dive into the workflow of a message and its stamps, next.

Leave a comment!

This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.9.10
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/orm-pack": "^1.0", // v1.0.6
        "symfony/serializer-pack": "^1.0", // v1.0.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "symfony/debug-pack": "^1.0", // v1.0.7
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/test-pack": "^1.0" // v1.0.6
    }
}