Custom Transport Serializer
If an external system sends messages to a queue that we're going to read, those messages will probably be sent as JSON or XML. We added a message formatted as JSON. To read those, we set up a transport called external_messages. But when we consumed that JSON message... it exploded! Why? Because the default serializer for every transport is the PhpSerializer. Basically, it's trying to call unserialize() on our JSON. That's... uh... not gonna work.
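To see the mismatch in isolation, here is a minimal sketch in plain PHP. It only approximates what happens inside the PhpSerializer, and the JSON payload is an assumed example, not necessarily the exact message from the queue:

```php
<?php

// A JSON body like the one an external system might publish
// (the exact payload is an assumption for illustration).
$jsonBody = '{"emoji": 2}';

// unserialize() expects PHP's own serialization format, so JSON input fails.
// The @ suppresses the notice/warning PHP emits for the malformed input.
$fromPhpFormat = @unserialize($jsonBody);
var_dump($fromPhpFormat); // bool(false)

// json_decode() understands the payload.
$fromJson = json_decode($jsonBody, true);
var_dump($fromJson['emoji']); // int(2)
```

The body is perfectly valid data: it's just in a format that the default serializer was never meant to read.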
Nope, if you're consuming messages that came from an external system, you're going to need a custom serializer for your transport. Creating a custom serializer is... actually a very pleasant experience.
Creating the Custom Serializer Class
Inside of our src/Messenger/ directory... though this class could live anywhere... let's create a new PHP class called ExternalJsonMessageSerializer. The only rule is that this needs to implement SerializerInterface. But, careful! There are two interfaces named SerializerInterface: one is from the Serializer component. We want the other one: the one from the Messenger component. I'll go to the "Code Generate" menu - or Command + N on a Mac - and select "Implement Methods" to add the two that this interface requires: decode() and encode().
// ...
namespace App\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // TODO: Implement decode() method.
    }

    public function encode(Envelope $envelope): array
    {
        // TODO: Implement encode() method.
    }
}
The encode() Method
The idea is beautifully simple: when we send a message through a transport that uses this serializer, the transport will call the encode() method and pass us the Envelope object that contains the message. Our job is to turn that into a format that can be sent to the transport. Notice that this returns an array, not a string: if you look at the SerializerInterface, this method should return an array with two keys: body - the body of the message - and headers - any headers that should be sent.
Nice, right? But... we're actually never going to send any messages through our external transport... so we don't need this method. To make sure we notice if it ever does get called, throw a new Exception with:
Transport & serializer not meant for sending messages
// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    // ...
    public function encode(Envelope $envelope): array
    {
        throw new \Exception('Transport & serializer not meant for sending messages');
    }
}
That'll give me a gentle reminder in case I do something silly and route a message to a transport that uses this serializer by accident.
Tip
Actually, if you want your messages to be redelivered, you do need to implement the encode() method. The code block below shows an example, which includes a small update to decode().
// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        // in case of redelivery, unserialize any stamps
        $stamps = [];
        if (isset($headers['stamps'])) {
            $stamps = unserialize($headers['stamps']);
        }

        return new Envelope($message, $stamps);
    }

    public function encode(Envelope $envelope): array
    {
        // this is called if a message is redelivered for "retry"
        $message = $envelope->getMessage();

        // expand this logic later if you handle more than
        // just one message class
        if ($message instanceof LogEmoji) {
            // recreate what the data originally looked like
            $data = ['emoji' => $message->getEmojiIndex()];
        } else {
            throw new \Exception('Unsupported message class');
        }

        $allStamps = [];
        foreach ($envelope->all() as $stamps) {
            $allStamps = array_merge($allStamps, $stamps);
        }

        return [
            'body' => json_encode($data),
            'headers' => [
                // store stamps as a header - to be read in decode()
                'stamps' => serialize($allStamps)
            ],
        ];
    }
}
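The key idea in that encode() is that the body it produces must look exactly like what the external system originally sent, so that decode() can read it again on retry. Here is a small standalone sketch of that round trip. The LogEmoji class below is a stand-in for App\Message\Command\LogEmoji (the int type is an assumption), and bodyToMessage() and messageToBody() are hypothetical helpers that only mirror the body handling, not the stamps:

```php
<?php

// Stand-in for App\Message\Command\LogEmoji, reduced to what the serializer uses.
// The int type on the emoji index is an assumption.
final class LogEmoji
{
    private $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }

    public function getEmojiIndex(): int
    {
        return $this->emojiIndex;
    }
}

// Hypothetical helper mirroring the body handling in decode().
function bodyToMessage(string $body): LogEmoji
{
    $data = json_decode($body, true);

    return new LogEmoji($data['emoji']);
}

// Hypothetical helper mirroring the body handling in encode().
function messageToBody(LogEmoji $message): string
{
    return json_encode(['emoji' => $message->getEmojiIndex()]);
}

$originalBody = '{"emoji":2}';
$message = bodyToMessage($originalBody);
$retryBody = messageToBody($message);

var_dump($retryBody === $originalBody); // bool(true)
```

If the two helpers ever drift apart - say, encode() writes a different key than decode() reads - a redelivered message would fail to decode even though the first delivery worked.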
The decode() Method
The method that we need to focus on is decode(). When a worker consumes a message from a transport, the transport calls decode() on its serializer. Our job is to read the message from the queue and turn that into an Envelope object with the message object inside. If you check out the SerializerInterface one more time, you'll see that the argument we're passed - $encodedEnvelope - is really just an array with the same two keys we saw a moment ago: body and headers.
Let's separate the pieces first: $body = $encodedEnvelope['body'] and $headers = $encodedEnvelope['headers']. The $body will be the raw JSON in the message. We'll talk about the headers later: they're empty right now.
// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];
        // ...
    }
    // ...
}
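For reference, this is roughly the array that decode() would receive for our external message. It's a sketch: the payload value is an assumed example, and the ?? fallback is a defensive variation for the case where a transport omits the headers key, not something the code above does:

```php
<?php

// Assumed shape of what decode() receives for an externally published message.
$encodedEnvelope = [
    'body' => '{"emoji": 2}',
    'headers' => [],
];

$body = $encodedEnvelope['body'];

// Defensive variation (an assumption, not part of the tutorial code):
// fall back to an empty array if no headers were provided.
$headers = $encodedEnvelope['headers'] ?? [];

var_dump(is_string($body)); // bool(true)
var_dump($headers === []);  // bool(true)
```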
Turning JSON into the Envelope
Ok, remember our goal here: to turn this JSON into a LogEmoji object and then put that into an Envelope object. How? Let's keep it simple! Start with $data = json_decode($body, true) to turn the JSON into an associative array.
// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        // ...
    }
    // ...
}
I'm not doing any error-checking yet... like to check that this is valid JSON - we'll do that a bit later. Now say: $message = new LogEmoji($data['emoji']) because emoji is the key in the JSON that we've decided will hold the $emojiIndex.
// ...
use App\Message\Command\LogEmoji;
// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        $message = new LogEmoji($data['emoji']);
        // ...
    }
    // ...
}
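As a preview of the kind of error-checking this code skips, here is one possible sketch in plain PHP. The parseEmojiPayload() function is a hypothetical helper, not part of the tutorial code, and it throws a generic \InvalidArgumentException to stay standalone. Inside the real serializer, the MessageDecodingFailedException that the class already imports would likely be the more natural choice:

```php
<?php

/**
 * Hypothetical helper: validates the raw JSON body before a message is built.
 */
function parseEmojiPayload(string $body): array
{
    $data = json_decode($body, true);

    // json_decode() returns null on failure, so ask PHP whether an error occurred.
    if (JSON_ERROR_NONE !== json_last_error()) {
        throw new \InvalidArgumentException('Invalid JSON: '.json_last_error_msg());
    }

    // Valid JSON is not enough: we also need the key that holds the emoji index.
    if (!is_array($data) || !isset($data['emoji'])) {
        throw new \InvalidArgumentException('Missing "emoji" key');
    }

    return $data;
}

$data = parseEmojiPayload('{"emoji": 2}');
var_dump($data['emoji']); // int(2)

try {
    parseEmojiPayload('not json');
} catch (\InvalidArgumentException $e) {
    echo "rejected\n";
}
```

Without a check like this, a malformed body would make $data null, and the failure would surface later and less clearly when the LogEmoji object is created.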
Finally, we need to return an Envelope object. Remember: an Envelope is just a small wrapper around the message itself... and it might also hold some stamps. At the bottom, return new Envelope() and put $message inside.
// ...
use App\Message\Command\LogEmoji;
use Symfony\Component\Messenger\Envelope;
// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        $message = new LogEmoji($data['emoji']);

        // in case of redelivery, unserialize any stamps
        $stamps = [];
        if (isset($headers['stamps'])) {
            $stamps = unserialize($headers['stamps']);
        }

        return new Envelope($message, $stamps);
    }
    // ...
}
Configuring the Serializer on the Transport
Done! We rock! This is already a fully functional serializer that can read messages from a queue. But our transport won't just start "magically" using it: we need to configure that. And... we already know how! We learned earlier that each transport can have a serializer option. Below the external transport, add serializer and set this to the id of our service, which is the same as the class name: App\Messenger\... and then I'll go copy the class name: ExternalJsonMessageSerializer.
framework:
    messenger:
        # ...
        transports:
            # ...
            external_messages:
                # ...
                serializer: App\Messenger\ExternalJsonMessageSerializer
        # ...
This is why we created a separate transport with a separate queue: we only want the external messages to use our ExternalJsonMessageSerializer. The other two transports - async and async_priority_high - will still use the simpler PhpSerializer... which is perfect.
Ok, let's try this! First, find an open terminal and tail the logs:
tail -f var/log/dev.log
And I'll clear the screen. Then, in my other terminal, I'll consume messages from the external_messages transport:
php bin/console messenger:consume -vv external_messages
Perfect! There are no messages yet... so it's just waiting. But we're hoping that when we publish this message to the queue, it will be consumed by our worker, decoded correctly, and that an emoji will be logged! Ah, ok - let's try it. Publish! Oh, then move back over to the terminal... there it is! We got an important message: cheese. It received the message and handled it down here.
So... we did it! We rock!
But... when we created the Envelope, we didn't put any stamps into it. Should we have? Does a message that goes through the "normal" flow have some stamps on it that we should manually add here? Let's dive into the workflow of a message and its stamps, next.
Would utilizing a custom serializer be the best place to pull in a third party serializer? For instance, I want to utilize the CloudEvent PHP SDK to standardize our messages that we produce in our Symfony apps but also that we'll need to consume from external non Symfony apps. Would I then just utilize the serialize method in the encode() method and the opposite, deserialize in the decode() method? We're utilizing Kafka which really seems to be quite overcomplicated having to utilize messenger->enqueue->messenger-enqueue-transport.
I'm also figuring out how best to integrate the CloudEventImmutable object within our message/message handler class. I think a good approach would be to have a single message class that utilizes the CloudEvent object and then just pass in the application data through parameters to a wrapper class as opposed to creating multiple messages and message handlers. Curious as to best practices for this type of configuration. Loved the series, it was great having the deep dive on a lot of the internals of messenger!