The idea behind our async
and async_priority_high transports was that we can send some messages to async_priority_high and others to async, with the goal that those messages would end up in different "buckets"... or, more technically, in different "queues". Then we can instruct our worker to first read all messages from whatever queue async_priority_high is bound to before reading messages from whatever queue the async transport is bound to.
This did work before with Doctrine, thanks to this queue_name: high option. The default value for this option is... default. As a reminder, I'll quickly log into my database:
mysql -u root messenger_tutorial
And see what that table looked like:
DESCRIBE messenger_messages;
Yep, the queue_name column was the key to making this work. Messages that were sent to async_priority_high had a queue_name set to high, and those sent to the async transport had a value of default. So even though we only had one database table, it functioned like two queues: when we consumed the async_priority_high transport, it queried for all messages WHERE queue_name="high".
The problem is that this queue_name option is specific to the doctrine transport, and it has absolutely no effect when using AMQP.
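As a reminder of what that looked like, here is a rough sketch of the earlier Doctrine-based setup. The queue_name: high option comes from the text above; the %env(MESSENGER_TRANSPORT_DSN)% value is an assumption about how the DSN is supplied and may differ in your project.

```yaml
# messenger.yaml (sketch of the earlier Doctrine-based setup)
framework:
    messenger:
        transports:
            async:
                # assumption: the DSN comes from an env var
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                # no queue_name option here, so the "default" value is used
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # Doctrine-only option: ignored by the AMQP transport
                    queue_name: high
```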
But... on a high level... our goal is the same: we need two queues. We need the async_priority_high transport to send messages to one queue and the async transport to send messages to a different queue.
But with AMQP... you don't send a message directly to a queue... you send it to an exchange... and then it's the exchange's responsibility to look at its internal rules and figure out which queue, or queues, that message should actually go to.
This means that to get a message to a queue, we need to tweak things on the exchange level. And there are two different ways to do this. First, we could continue to have a single exchange and then add some internal rules - called bindings - to teach the exchange that some messages should go to one queue and other messages should go to another queue. I'm going to show you how to do this a bit later.
The second option isn't quite as cool, but it's a bit simpler. By default, when Messenger creates an exchange, it creates it as a fanout type. That means that when a message is sent to this exchange, it's routed to every queue that's bound to it. So if we added a second binding to a second queue - maybe messages_high_priority - then every message that's sent to this exchange would be routed to both queues. It would be duplicated! That's... not what we want.
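To make the problem concrete, this is roughly what the "one fanout exchange, two queues" setup could look like. It is only a sketch of what we do not want. The exchange name and type are written out explicitly here, which is an assumption (normally they are left to the defaults), and so is the env var used for the DSN.

```yaml
# messenger.yaml (sketch only: this setup would duplicate messages)
framework:
    messenger:
        transports:
            async:
                # assumption: the DSN comes from an env var
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: fanout
                    queues:
                        # a fanout exchange copies every message to each bound queue
                        messages: ~
                        messages_high_priority: ~
```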
Instead, we're going to create two fanout exchanges, and each exchange will route all of its messages to a separate queue. We'll have two exchanges and two queues.
Let's configure this inside of messenger.yaml. Under options, add exchange, then name set to, how about, messages_high_priority. Below this, add queues with just one key below it: messages_high set to null.
framework:
    messenger:
        # ... lines 3 - 19
        transports:
            # ... lines 21 - 26
            async_priority_high:
                # ... line 28
                options:
                    exchange:
                        name: messages_high_priority
                    queues:
                        messages_high: ~
    # ... lines 34 - 42
This config has three effects. First, because we have the auto_setup feature enabled, the first time we talk to RabbitMQ, Messenger will create the messages_high_priority exchange, the messages_high queue and bind them together. The second effect is that when we send messages to this transport they will be sent to the messages_high_priority exchange. The third and final effect is that when we consume from this transport, Messenger will read messages from the messages_high queue.
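Put side by side, the two transports could be sketched as follows. The async_priority_high part repeats the config from this chapter. The async part writes out explicitly what the defaults appear to give us (a messages exchange bound to a messages queue), so treat those explicit values, the fanout type lines and the env var used for the DSN as assumptions rather than as the tutorial's actual file.

```yaml
# messenger.yaml (sketch: two fanout exchanges, one queue each)
framework:
    messenger:
        transports:
            async:
                # assumption: the DSN comes from an env var
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # assumption: explicit version of the default behavior
                    exchange:
                        name: messages
                        type: fanout
                    queues:
                        messages: ~
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages_high_priority
                        type: fanout
                    queues:
                        messages_high: ~
```

Because each exchange has exactly one queue bound to it, nothing is duplicated: a message sent to a transport lands in that transport's queue only.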
If that still doesn't make complete sense... don't worry: let's see this in action. First, make sure that your worker is not running: ours is stopped. Now let's go over and delete three photos - one, two, three - and upload four photos.
Cool! Let's see what happened in RabbitMQ! Inside the manager, click "Exchanges". Nice! We do have a new messages_high_priority exchange! The original messages exchange still sends all of its messages to a messages queue... but the new exchange sends all of its messages to a queue called messages_high. That's thanks to our queues config.
And... what's inside each queue? Go check it out! It's exactly what we want: the 3 deleted messages are waiting in the messages queue and the 4 newly-uploaded photos are in messages_high. Each transport is successfully getting its messages into a separate queue! And that means that we can consume them independently.
At the command line, we would normally tell Messenger to consume from async_priority_high and then async to get our prioritized delivery. But to clearly show what's happening, let's consume them independently for now. Start by consuming messages from the async transport:
php bin/console messenger:consume -vv async
It starts processing the ImagePostDeletedEvent objects... and stops after those three. It's done! That queue is empty. The command did not read the messages from messages_high. To do that, consume the async_priority_high transport:
php bin/console messenger:consume -vv async_priority_high
There we go! The simplest... but not fanciest... way to have prioritized transports with AMQP is to send each transport to a different exchange and configure it to route to a different queue. Later... we'll see the fancier way.
Before we get there, remember when I had you comment out the DelayStamp before we started using RabbitMQ? Next, I'll show you why: we'll re-add that DelayStamp and see the crazy way that messages are "delayed" in RabbitMQ.
Hello! Can you please point me to the original source for the Messenger configuration of AMQP queues? I mean the changes to the messenger.yaml file at the beginning of the "Configuring a Second Exchange" section of this lesson.
I didn't find this in the official documentation, nor in the output of the php bin/console config:dump framework messenger command.
Hey Dariia M.
Yes, it's not fully described in the official documentation, nor in config:dump. I think that's because it's DSN-based configuration, but you can find it in the source code here: https://github.com/symfony/symfony/blob/efbe75291869ab944c62b48de9a0c2257ccace8f/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php#L80
Hope this helps!
Cheers!
// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
Hi, I am using RabbitMQ for the first time. In RabbitMQ, inside the queue, the "Ready" count is always 0, even after the messages are consumed, and the state is "running". Why is this not changing? Can anyone help?