Transport for Consuming External Messages

We've just created a new message class & handler... then instantiated it and dispatched it directly into the message bus. Yep, we just did something totally... boring! But... it's actually pretty similar to our real goal! Our real goal is to pretend that an outside system is putting messages into a RabbitMQ queue... probably formatted as JSON... and we will read those messages, transform that JSON into a LogEmoji object and... basically dispatch that through the message bus. It's really the same basic flow: in both cases, we create a LogEmoji object and pass it to Messenger.

// ...
class ImagePostController extends AbstractController
{
    // ...
    public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
    {
        // ...
        //$messageBus->dispatch(new LogEmoji(2));
        // ...
    }
    // ...
}
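To make that "same basic flow" idea concrete, here's a rough, standalone sketch of turning a JSON string into a LogEmoji object. A few things here are assumptions and not the tutorial's real code: the payload shape (an emoji key holding an integer), the stand-in LogEmoji class with its getEmojiIndex() getter, and the hypothetical createLogEmojiFromJson() helper. It skips Messenger entirely and only shows the JSON-to-object step.

```php
<?php

// Stand-in for the app's LogEmoji message class. Only the constructor
// argument (an emoji index) comes from the text; the getter is assumed.
final class LogEmoji
{
    private int $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }

    public function getEmojiIndex(): int
    {
        return $this->emojiIndex;
    }
}

// Hypothetical helper: turn the external system's JSON into a message object.
function createLogEmojiFromJson(string $json): LogEmoji
{
    // JSON_THROW_ON_ERROR makes malformed JSON throw instead of returning null.
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data) || !isset($data['emoji']) || !is_int($data['emoji'])) {
        throw new InvalidArgumentException('Expected a JSON object with an integer "emoji" key.');
    }

    return new LogEmoji($data['emoji']);
}

$message = createLogEmojiFromJson('{"emoji": 2}');

// In the real app, this is the object that would go to $messageBus->dispatch().
echo $message->getEmojiIndex(), PHP_EOL; // prints 2
```

Whichever way the object gets built, by hand in a controller or from JSON, Messenger ends up with the same LogEmoji instance.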

Creating a Dedicated Transport

The first step is to create a transport that will read these messages from whatever queue the outside system is placing them into. We'll keep the async and async_priority_high transports: we'll continue to send and receive from those. But now create a new one called, how about: external_messages. I'll use the same DSN because we're still consuming things from RabbitMQ. But for the options, instead of consuming messages from messages_high or messages_normal, we'll consume them from whatever queue that outside system is using - let's pretend it's called messages_from_external. Set that to just ~.

framework:
    messenger:
        # ...
        transports:
            # ...
            external_messages:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: direct
                        default_publish_routing_key: from_external
                    queues:
                        messages_from_external:
                            binding_keys: [from_external]
        # ...

By the way, it is important that we use a different transport that reads from a different queue for these external messages. Why? Because, as you'll see in a few minutes, these external messages will need special logic to decode them back into the correct object. We'll attach that special logic to the transport.

Anyways, above the queues key, add auto_setup: false.

Tip

To support retry, you should use auto_setup and configure a few more things. See the tip below for more details.

Ok, there are a few important things happening here. The first is that this queue config means that when we consume from the external_messages transport, Messenger will read messages from a queue called messages_from_external. The second important thing is auto_setup: false. This tells Messenger not to try to create this queue. Why? Well... I guess our app could create that queue... that would probably be fine... but since we're expecting an external system to send messages to this queue, I'm guessing that that system will want to be responsible for making sure it exists.

Oh, and you probably also noticed that I didn't add any exchange config. That was on purpose. An exchange is only used when sending a message. And because we're not planning on ever sending a message through this transport, that part of the transport just won't ever be used.

Tip

Correction: if you're using AMQP and want "retries" to work, you will need to configure a routing & binding key so that if a message needs to be sent to this transport (for redelivery), Messenger can attach the correct binding key so that the message will end up in the messages_from_external queue. See the code block on this page for an updated example.

So with just this, we should be able to consume from the new transport. Spin over to your terminal and run:

php bin/console messenger:consume -vv external_messages

And... it explodes! This is awesome.

Server channel error: 404, message: NOT_FOUND - no queue 'messages_from_external'

We're seeing our auto_setup: false in action! Instead of creating that queue when it didn't exist, it exploded. Love it!

Creating the Queue By Hand

So now, let's pretend that we are that "external" system and we want to create that queue. Copy the queue name - messages_from_external - and, inside the Rabbit Manager, create a new queue with that name. Don't worry about the options - they won't matter for us.

And... hello queue! Let's go see if we can consume messages from it:

php bin/console messenger:consume -vv external_messages

It works! Well... there aren't any messages in the queue yet, but it's happily checking for them.

Putting an "External" Message into the Queue

Now, let's continue to pretend like we are the "external" system that will be sending messages to this queue. On the queue management screen, we can publish a message into the queue. Convenient!

So... what will these messages look like? Well... they can look like anything: JSON, XML, a binary image, ASCII art - whatever we want. We'll just need to make sure that our Symfony app can understand the message - that's something we'll work on in a few minutes.

Let's think: if an outside system wants to send our app a command to log an emoji... and it can choose which emoji via a number... then... maybe the message is JSON that looks like this? An emoji key set to 2:

{
  "emoji": 2
}

Publish! Ok, go check the worker! Whoa... it exploded! Cool!

Could not decode message using PHP serialization

And then it shows our JSON. Of course! If you're consuming a message that was placed in the queue by an external system... that message probably won't be in the PHP serialized format... and it really shouldn't be. Nope, the message will probably be JSON or XML. The problem is that our transport is trying to transform that JSON into an object by using the default PHP serializer. Literally, it's calling unserialize() on that JSON.
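If you want to see that mismatch outside of Messenger, here's a tiny standalone sketch. It only uses PHP's built-in unserialize() and json_decode() on the same JSON body we just published; it isn't Messenger's actual serializer code, just a demonstration of why the native PHP format can't read JSON.

```php
<?php

// The JSON body published from the RabbitMQ management screen.
$body = '{"emoji": 2}';

// PHP's native serialization format looks nothing like JSON, so unserialize()
// fails and returns false. The @ only silences the notice/warning it emits.
$fromPhpFormat = @unserialize($body);
var_dump($fromPhpFormat); // bool(false)

// json_decode() understands the payload just fine.
$data = json_decode($body, true);
var_dump($data['emoji']); // int(2)
```

Same bytes, two very different results: that's why a transport that reads from an external system can't rely on the default PHP serializer.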

We need to be smarter: when a transport consumes messages from an external system, it needs to have a custom serializer so we can take control. Let's do that next.

Leave a comment!

  • 2020-01-08 weaverryan

    Ah, actually, I'm still wrong - I forgot what I was doing when I fixed this originally :). The code block on THIS page *should* have been updated when I added this note, but I missed that detail. I've just fixed that: the messenger.yaml code - https://symfonycasts.com/sc... - now shows how to set things up with routing & binding keys. You will *also* need to make a tweak in the custom serializer in the next chapter - but there's a separate note there for that.

    Phew! If that doesn't make sense, please let me know.

    Cheers!

  • 2020-01-08 weaverryan

    Ah! My mistake! Thanks for asking :). The code blocks are on the NEXT page - https://symfonycasts.com/sc... - the video doesn't include a few necessary details, but the related code blocks DO contain the necessary code. See the TIP in this section - https://symfonycasts.com/sc... - you need to implement encode() and a small change in decode().

    Let me know if this helps! I'll update the note :). Cheers!

  • 2020-01-07 carlosbuenosvinoszamora

    Same here!

  • 2020-01-06 Victor Bocharsky

    Hey Ruben,

    Thank you for such a nice feedback! That's exactly our mission, make explanation short and easy to understand! :)

    Cheers!

  • 2020-01-05 hawk
    Correction: if you want "retries" to work, you will need to configure
    routing & binding keys so that if a message is sent to this transport
    (for redelivery) that it will end up in the messages_from_external queue.
    See the code block on this page for an updated example!

    I can't find the mentioned code block with updated example...

  • 2019-10-28 Ruben Hazenbosch

    Very nice Tutorials! You explain it in an understanding manner, and make it easy to dive into the deep:) Keep them coming!