Transport for Consuming External Messages
We've just created a new message class & handler... then instantiated it and dispatched it directly into the message bus. Yep, we just did something totally... boring! But... it's actually pretty similar to our real goal! Our real goal is to pretend that an outside system is putting messages into a RabbitMQ queue... probably formatted as JSON... and we will read those messages, transform that JSON into a LogEmoji object and... basically dispatch that through the message bus. It's really the same basic flow: in both cases, we create a LogEmoji object and pass it to Messenger.
    // ...
    class ImagePostController extends AbstractController
    {
        // ...
        public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
        {
            // ...
            //$messageBus->dispatch(new LogEmoji(2));
            // ...
        }
        // ...
    }
Creating a Dedicated Transport
The first step is to create a transport that will read these messages from whatever queue the outside system is placing them into. We'll keep the async and async_priority_high transports: we'll continue to send and receive from those. But now create a new one called, how about: external_messages. I'll use the same DSN because we're still consuming things from RabbitMQ. But for the options, instead of consuming messages from messages_high or messages_normal, we'll consume them from whatever queue that outside system is using - let's pretend it's called messages_from_external. Set that to just ~.
    framework:
        messenger:
            # ...
            transports:
                # ...
                external_messages:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                        exchange:
                            name: messages
                            type: direct
                            default_publish_routing_key: from_external
                        queues:
                            messages_from_external:
                                binding_keys: [from_external]
    # ...
By the way, it is important that we use a different transport that reads from a different queue for these external messages. Why? Because, as you'll see in a few minutes, these external messages will need special logic to decode them back into the correct object. We'll attach that special logic to the transport.
Anyways, above this add auto_setup: false.
Tip
To support retry, you should use auto_setup and configure a few more things. See the tip below for more details.
Ok, there are a few important things happening here. The first is that this queue config means that when we consume from the external_messages transport, Messenger will read messages from a queue called messages_from_external. The second important thing is auto_setup: false. This tells Messenger not to try to create this queue. Why? Well... I guess our app could create that queue... that would probably be fine... but since we're expecting an external system to send messages to this queue, I'm guessing that that system will want to be responsible for making sure it exists.
Oh, and you probably also noticed that I didn't add any exchange config. That was on purpose. An exchange is only used when sending a message. And because we're not planning on ever sending a message through this transport, that part of the transport just won't ever be used.
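For reference, the code block above shows the updated, retry-friendly variant of this transport. The version being described in the text here would look roughly like the sketch below. It's reconstructed from the description only: it assumes auto_setup lives under options, right above queues, and it leaves out the rest of the Messenger config.

```yaml
# config/packages/messenger.yaml (sketch reconstructed from the narration)
framework:
    messenger:
        transports:
            external_messages:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # don't try to create the queue: the external system owns it
                    auto_setup: false
                    queues:
                        # read from this queue; no extra queue options needed
                        messages_from_external: ~
```

Notice there's no exchange key at all in this variant: this transport is only ever read from.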
Tip
Correction: if you're using AMQP and want "retries" to work, you will need to configure a routing & binding key so that if a message needs to be sent to this transport (for redelivery), Messenger can attach the correct binding key so that the message will end up in the messages_from_external queue. See the code block on this page for an updated example.
So with just this, we should be able to consume from the new transport. Spin over to your terminal and run:
php bin/console messenger:consume -vv external_messages
And... it explodes! This is awesome.
Server channel error: 404, message: NOT_FOUND - no queue 'messages_from_external'
We're seeing our auto_setup: false in action! Instead of creating that queue when it didn't exist, it exploded. Love it!
Creating the Queue By Hand
So now, let's pretend that we are that "external" system and we want to create that queue. Copy the queue name - messages_from_external - and, inside the Rabbit Manager, create a new queue with that name. Don't worry about the options - they won't matter for us.
And... hello queue! Let's go see if we can consume messages from it:
php bin/console messenger:consume -vv external_messages
It works! Well... there aren't any messages in the queue yet, but it's happily checking for them.
Putting an "External" Message into the Queue
Now, let's continue to pretend like we are the "external" system that will be sending messages to this queue. On the queue management screen, we can publish a message into the queue. Convenient!
So... what will these messages look like? Well... they can look like anything: JSON, XML, a binary image, ASCII art - whatever we want. We'll just need to make sure that our Symfony app can understand the message - that's something we'll work on in a few minutes.
Let's think: if an outside system wants to send our app a command to log an emoji... and it can choose which emoji via a number... then... maybe the message is JSON that looks like this? An emoji key set to 2:
{
"emoji": 2
}
Publish! Ok, go check the worker! Woh... it exploded! Cool!
Could not decode message using PHP serialization
And then it shows our JSON. Of course! If you're consuming a message that was placed in the queue by an external system... that message probably won't be in the PHP serialized format... and it really shouldn't be. Nope, the message will probably be JSON or XML. The problem is that our transport is trying to transform that JSON into an object by using the default PHP serializer. Literally, it's calling unserialize() on that JSON.
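To see the mismatch outside of Messenger, here's a tiny standalone sketch in plain PHP. The variable names are made up for illustration, and the @ is only there to silence the notice or warning that unserialize() raises when it's handed something that isn't PHP-serialized data.

```php
<?php

// The body our pretend "external" system published to the queue
$body = '{"emoji": 2}';

// What the default PHP serializer effectively attempts: this is not
// PHP-serialized data, so unserialize() gives up and returns false
$fromPhpSerializer = @unserialize($body);

// The same body is perfectly valid JSON
$fromJson = json_decode($body, true);

// For comparison, this is what PHP's own serialization format looks like
$phpFormat = serialize(['emoji' => 2]); // 'a:1:{s:5:"emoji";i:2;}'

var_dump($fromPhpSerializer, $fromJson, $phpFormat);
```

The two formats have nothing in common, so there's no way the default serializer could ever make sense of the external message.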
We need to be smarter: when a transport consumes messages from an external system, it needs to have a custom serializer so we can take control. Let's do that next.
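As a rough preview of what "taking control" means, here's a minimal sketch, in plain PHP, of the kind of transformation such a serializer would need to perform. Lots of assumptions here: the LogEmoji class below is just a stand-in (all we know from the controller is that the real one accepts an integer), getEmojiIndex() and createLogEmojiFromJson() are hypothetical names, and a real Messenger serializer would additionally wrap the message in an Envelope, which is left out.

```php
<?php

// Stand-in for the tutorial's message class (assumed shape: a single integer)
final class LogEmoji
{
    private int $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }

    // Hypothetical getter name, used only in this sketch
    public function getEmojiIndex(): int
    {
        return $this->emojiIndex;
    }
}

// Hypothetical helper: turns the external JSON body into our message object
function createLogEmojiFromJson(string $json): LogEmoji
{
    // JSON_THROW_ON_ERROR makes malformed JSON throw a JsonException
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    // Never trust an outside system: check the shape before using it
    if (!is_array($data) || !isset($data['emoji']) || !is_int($data['emoji'])) {
        throw new InvalidArgumentException('Expected a JSON object with an integer "emoji" key.');
    }

    return new LogEmoji($data['emoji']);
}

// Same end result as "new LogEmoji(2)" in the controller
$message = createLogEmojiFromJson('{"emoji": 2}');
```

Once we have that object, the rest of the flow is identical to dispatching it by hand.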