If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.
With a Subscription, click any sentence in the script to jump to that part of the video!
Login SubscribeLet's repeat the new exchange setup for the async_priority_high
transport: we want this to deliver to the same direct exchange, but then use a different routing key to route messages to a different queue.
Change the exchange to messages
, set the type to direct
, then use default_publish_routing_key
to automatically attach a routing key called high
to each message.
Below, for the messages_high
queue, this tells Messenger that we want this queue to be created and bound to the exchange. That's cool, but we now need that binding to have a routing key. Set binding_keys
to [high]
.
framework: | |
messenger: | |
... lines 3 - 19 | |
transports: | |
... lines 21 - 37 | |
async_priority_high: | |
... line 39 | |
options: | |
exchange: | |
name: messages | |
type: direct | |
default_publish_routing_key: high | |
queues: | |
messages_high: | |
binding_keys: [high] | |
... lines 48 - 56 |
How can we trigger Messenger to create that new queue and add the new binding? Just perform any operation that uses this transport... like uploading a photo! Ok, go check out the RabbitMQ manager - start with Exchanges.
Yep, we still have just one messages
exchange... but now it has two bindings! If you send a message to this exchange with a high
routing key, it will be sent to message_high
.
Click "Queues" to see... nice - a new messages_high
queue with one message waiting inside.
And... we're done! This new setup has the same end-result: each transport ultimately delivers messages to a different queue. Let's go consume the waiting messages: consume async_priority_high
then async
.
php bin/console messenger:consume -vv async_priority_high async
And it consumes them in the correct order: handling AddPonkaToImage
first because that's in the high priority queue and then moving to messages in the other queue.
By the way, when we consume from the async
transport, for example, behind-the-scenes, it means that Messenger is reading messages from any queue that's configured for that transport. In our app, each transport has config for only one queue, but you could configure multiple queues under a transport and even set different binding keys for each one. But when you consume that transport, you'll be consuming messages from every queue you've configured.
So, let's back up and look at the whole flow. When we dispatch an AddPonkaToImage
object, our Messenger routing config always routes this to the async_priority_high
transport. This causes the message to be sent to the messages
exchange with a routing key set to high
... and the binding logic means that it will ultimately be delivered to the messages_high
queue.
Due to the way that Messenger's routing works - the fact that you route a class to a transport - every message class will always be delivered to the same queue. But what if you did want to control this dynamically? What if, at the moment you dispatch a message, you needed to send that message to a different transport than normal? Maybe you decide that this particular AddPonkaToImage
message is not important and should be routed to async
.
Well... that's just not possible with Messenger: each class is always routed to a specific transport. But this end-result is possible... if you know how to leverage routing keys.
Here's the trick: what if we could publish an AddPonkaToImage
object... but tell Messenger that when it sends it to the exchange, it should use the normal
routing key instead of high
? Yea, the message would technically still be routed to the async_priority_high
transport... but it would ultimately end up in the messages_normal
queue. That would do it!
Is that possible? Totally! Open up ImagePostController
and find where we dispatch the message. After the DelayStamp
, add a new AmqpStamp
- but be careful not to choose AmqpReceivedStamp
- that's something different... and isn't useful for us. This stamp accepts a few arguments and the first one - gasp! - is the routing key to use! Pass this normal
.
... lines 1 - 18 | |
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp; | |
... lines 20 - 24 | |
class ImagePostController extends AbstractController | |
{ | |
... lines 27 - 41 | |
public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus) | |
{ | |
... lines 44 - 64 | |
$envelope = new Envelope($message, [ | |
new DelayStamp(1000), | |
... lines 67 - 68 | |
new AmqpStamp('normal') | |
]); | |
... lines 71 - 73 | |
} | |
... lines 75 - 102 | |
} |
Let's try it! Stop the worker so we can see what happens internally. Then, upload a photo, go to the RabbitMQ manager, click on queues... refresh until you see the message in the right queue... we have to wait for the delay... and there it is! It ended up in messages_normal
.
By the way, if you look inside this AmqpStamp
class, the second and third arguments are for something called $flags
and $attributes
. These are a bit more advanced, but might just come in handy. I'll hit Shift+Shift to open a file called Connection.php
- make sure to open the one in the AmqpExt
directory. Now search for a method called publishOnExchange()
.
When a message is sent to RabbitMQ, this is the low-level method that actually does that sending. Those $flags
and $attributes
from the stamp are used here! Passed as the third and fourth arguments to some $exchange->publish()
method. Hold Cmd or Ctrl and click to jump into that method.
Oh! This jumps us to a "stub" - a "fake" method & declaration... because this class - called AMQPExchange
is not something you'll find in your vendor/
directory. Nope, this class comes from the AMQP PHP extension that we installed earlier.
So, if you find that you really need to control something about how a message is published through this extension, you can do that with the $flags
and $attributes
. The docs above this do a nice job of showing you the options.
And... that's it for AMQP and RabbitMQ! Sure, there's more to learn about RabbitMQ - it's a huge topic on its own - but you now have a firm grasp of its most important concepts and how they work. And unless you need to do some pretty advanced stuff, you understand plenty to work with Messenger.
Next, up until now we've been sending messages from our Symfony app and consuming them from that same app. But, that's not always the case. One of the powers of a "message broker" like RabbitMQ is the ability to send messages from one system and handle them in a totally different system... maybe on a totally different server or written in a totally different language. Craziness!
But if we're going to use Messenger to send messages to a queue that will then be handled by a totally different app... we probably need to encode those messages as JSON... instead of the PHP serialized format we're using now.
Hey @Anton!
Hmm. I do not believe so, the "framework -> messenger -> routing" config is very simple, just routing to which transport each message class should go. If you'd like to centralize this in some way, one option is via an event listener - I think listening to https://github.com/symfony/... would work. In this listener, you could have logic where you add the AmqpStamp based on the class of the message that is being routed. Or, you could make your messages implement some marker interface, and look for that interface from inside your listener.
Anyways, let me know if this is useful or not :).
Cheers!
Hey wtk13!
Sorry for my late reply - I saw how complex your message was and needed to find time to review it. But now I just saw that you seem to have found the answer yourself :). If you still have any questions about it, let me know.
Cheers!
why is it that even if I send a message already routed to an exchange but with a AmqpStamp/binding key linked with another exchange/queue the message will get routed to the exchange that handles that binding key? Even if this is desired outcome would not be considerate to maybe fire a warning saying that message-exchange-binding_key-route combination is overruled?
I want to know the details of what happens behind messenger/ampq-protocol.
Thanks!
Hey Francisc!
Great question :). If you're setting up binding & routing keys like we do in this chapter, you are definitely doing something more advanced and choosing to take more control over the process. For most cases, just "letting Messenger handle it" is the best way. But for some advanced use-cases - and for users that are very familiar with AMQP - this chapter shows how you could take more control.
But let me answer your questions specifically:
why is it that even if I send a message already routed to an exchange but with a AmqpStamp/binding key linked with another exchange/queue the message will get routed to the exchange that handles that binding key?
All of this is a specific to how AMQP works. Apologies if you already know some of this - I just want to give a complete answer.
So first, you are always sending a message to an exchange (not a queue). In this case, both async
and async_priority_high
are configured to send to the same exchange: messages
. AMQP is, sort of, "dumb". When the "messages" exchange receives a message, (because it is a direct exchange) it will look at the "routing key" (e.g. FOO_ROUTING_KEY) of that message and then send it to the 0-to-many queues that have said "Hey! If a FOO_ROUTING_KEY is set on a message, please send to me!". This is called a "binding".
In the Messenger config, each transport that we've configured has one item under "queues". For example:
async:
# ...
queues:
messages_normal:
binding_keys: [normal]
This causes a few things to happen in Messenger:
A) It creates a queue named "messages_normal"
B) It adds a "binding" to the "messages" exchange that says: "If the routing key is 'normal', please send to me".
C) When you "consume": this transport, it will "consume" messages from the "messages_normal" queue.
So ultimately, all of our config will cause 1 exchange to be created and 2 queues to be created, each with one binding key to the one exchange. Phew!
Even if this is desired outcome would not be considerate to maybe fire a warning saying that message-exchange-binding_key-route combination is overruled?
Yes, we could do do that. But at this point, we're doing SUCH custom things with AMQP, that we have to assume you know what you're doing. Heck, it's legal to publish a message with a routing key that corresponds to ZERO "bindings". You might think that this means that the message would get routed to NO queues. And you would be correct. Unless there is some other (3rd party) system that is also creating bindings. That's one of the truly powerful (but complex) things with AMQP: we cannot even assume that our Symfony app is the only thing creating exchanges, queues, publishing keys, etc. So even if the user does something that seems "crazy", they might have a perfectly good reason to do it.
This is all fascinating stuff to learn about it. But until/unless you have a reason to take advanced control over binding & routing keys like this, don't do it ;).
And if I've done a bad job explaining things, please let me know.
Cheers!
Thank you weaverryan for the answer, it help! Thank you also for the symfonycasts tutorials, they are really easy to follow and quite funny.
// composer.json
{
"require": {
"php": "^7.1.3",
"ext-ctype": "*",
"ext-iconv": "*",
"composer/package-versions-deprecated": "^1.11", // 1.11.99
"doctrine/annotations": "^1.0", // v1.8.0
"doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
"doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
"doctrine/orm": "^2.5.11", // v2.6.3
"intervention/image": "^2.4", // 2.4.2
"league/flysystem-bundle": "^1.0", // 1.1.0
"phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
"sensio/framework-extra-bundle": "^5.3", // v5.3.1
"symfony/console": "4.3.*", // v4.3.2
"symfony/dotenv": "4.3.*", // v4.3.2
"symfony/flex": "^1.9", // v1.18.7
"symfony/framework-bundle": "4.3.*", // v4.3.2
"symfony/messenger": "4.3.*", // v4.3.4
"symfony/property-access": "4.3.*", // v4.3.2
"symfony/property-info": "4.3.*", // v4.3.2
"symfony/serializer": "4.3.*", // v4.3.2
"symfony/validator": "4.3.*", // v4.3.2
"symfony/webpack-encore-bundle": "^1.5", // v1.6.2
"symfony/yaml": "4.3.*" // v4.3.2
},
"require-dev": {
"easycorp/easy-log-handler": "^1.0.7", // v1.0.7
"symfony/debug-bundle": "4.3.*", // v4.3.2
"symfony/maker-bundle": "^1.0", // v1.12.0
"symfony/monolog-bundle": "^3.0", // v3.4.0
"symfony/stopwatch": "4.3.*", // v4.3.2
"symfony/twig-bundle": "4.3.*", // v4.3.2
"symfony/var-dumper": "4.3.*", // v4.3.2
"symfony/web-profiler-bundle": "4.3.*" // v4.3.2
}
}
Hi, I'd like to configure a single exchange with multiple queues. I know the way to specify routing_key for the message by adding AmqpStamp directly when publishing a new message, but is there a way to configure it within yml configuration under framework -> messenger -> routing for each message?
Symfony 5.x