We've just changed our messenger configuration to send messages to a cloud-based RabbitMQ instance instead of sending them to Doctrine to be stored in the database. And after we made that change... everything... just kept working! We can send messages like normal and consume them with the messenger:consume command. That's awesome!
But I want to look a bit more at how this works... what's actually happening inside of RabbitMQ. Stop the worker... and then let's go delete a few images: one, two, three. This should have caused three new messages to be sent to Rabbit.
When we were using the Doctrine transport, we could query a database table to see these. Can we do something similar with RabbitMQ? Yea... we can! RabbitMQ comes with a lovely tool called the RabbitMQ Manager. Click to jump into it.
Aw yea, we've got data! And if we learn what some of these terms mean... this data will even start to make sense!
The first big concept in RabbitMQ is an exchange... and, for me, this was the most confusing part of learning how Rabbit works. When you send a message to RabbitMQ, you send it to a specific exchange. Most of these exchanges were automatically created for us... and you can ignore them. But see that messages exchange? That was created by our application and, right now, all messages that Messenger transports to RabbitMQ are being sent to this exchange.
You won't see the name of this exchange in our messenger config yet, but each transport that uses AMQP has an exchange option and it defaults to messages. See this "Type" column? Our exchange is a type called fanout. Click into this exchange to get more info... and open up "bindings". This exchange has a "binding" to a "queue" that's... by coincidence... also called "messages".
And this is where things can get a little confusing... but it's really a simple idea. The two main concepts in RabbitMQ are exchanges and queues. We're a lot more familiar with the idea of a queue. When we used the Doctrine transport type, our database table was basically a queue: it was a big list of queued messages... and when we ran the worker, it read messages from that list.
In RabbitMQ, queues have the same role: queues hold messages and we read messages from queues. So then... what the heck do these exchange things do?
The key difference between the Doctrine transport type and AMQP is that with AMQP you do not send a message directly to a queue. You can't say:
Hey RabbitMQ! I would like to send this message to the important_stuff queue.
Nope, in RabbitMQ, you send messages to an exchange. Then, that exchange will have some config that routes that message to a specific queue... or possibly multiple queues. The "bindings" represent that config.
The simplest type of exchange is this fanout type. It says that each message that's sent to this exchange should be sent to all the queues that have been bound to it... which in our case is just one. The "binding" rules can get a lot smarter - sending different messages to different queues - but let's worry about that later. For now, this whole fancy setup means that every message will ultimately end up in a queue called messages.
Let's click on the Queues link on top. Yep, we have exactly one queue: messages. And... hey! It has 3 messages "Ready" inside of it, waiting for us to consume them!
By the way... who created the messages exchange and messages queue? Are they... just standard to RabbitMQ? Rabbit does come with some exchanges out-of-the-box, but these were created by our app. Yep, like with the Doctrine transport type, Messenger's AMQP transport has an auto_setup option that defaults to true. This means that it will detect if the exchange and queue it needs exist, and if they don't, it will automatically create them. Yep, Messenger took care of creating the exchange, creating the queue and tying them together with the exchange binding. Both the exchange name and queue name are options that you can configure on your transport... and both default to the word messages. We'll see that config a bit later.
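To make those defaults concrete, here is a rough sketch of a transport with every default written out by hand. The transport name async, the file path and the MESSENGER_TRANSPORT_DSN env var are assumptions for illustration, and the option names follow the AMQP transport as of Symfony 4.3, so double-check them against the version you're running.

```yaml
# config/packages/messenger.yaml (assumed path)
# Sketch only: spells out the defaults described above.
framework:
    messenger:
        transports:
            async:
                # assumed env var holding the amqp:// DSN
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # create the exchange, queue and binding if they're missing
                    auto_setup: true
                    exchange:
                        name: messages
                        type: fanout
                    queues:
                        # a single queue, also called "messages", bound to the exchange
                        messages: ~
```

Leaving the whole options key out should behave the same way, which is why none of this shows up in our config yet.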
To summarize all of this: we send a message to an exchange and it forwards it to one or more queues based on some internal rules. Whoever is "sending" - or "producing" - the message just says:
Go to the exchange called "messages"
... and in theory... the "sender" doesn't really know or care what queue that message will end up in. Once the message is in a queue... it just sits there... and waits!
The second part of the equation is your "worker" - the thing that consumes messages. The worker is the opposite of the sender: it doesn't know anything about exchanges. It just says:
Hey! Give me the next message in the "messages" queue.
We send messages to exchanges, RabbitMQ routes those to queues, and we consume from those queues. The exchange is a new, extra layer... but the end-result is still pretty simple.
Phew! Before we try to run our worker, let's upload 4 photos. Then... if you look at the messages queue... and refresh... there it is! It has 7 messages!
As a reminder, we're sending AddPonkaToImage messages to async_priority_high and ImagePostDeletedEvent to async. The idea is that we can put different messages into different queues and then consume messages in the async_priority_high queue before consuming messages in the async queue. The problem is that... right now... everything is ending up in the same, single queue!
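As a rough picture of why that happens, the relevant config at this point presumably looks something like the sketch below. The message class namespaces and the env var name are assumptions. What matters is that both transports point at the same DSN and neither sets any exchange or queue options, so both fall back to the messages exchange and the messages queue.

```yaml
# Sketch only: class namespaces and the env var name are assumed.
framework:
    messenger:
        transports:
            # two transports, one DSN, no exchange/queue options
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
            async_priority_high: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\Command\AddPonkaToImage': async_priority_high
            'App\Message\Event\ImagePostDeletedEvent': async
```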
Check this out - find your terminal and only consume from the async transport. This should cause only the ImagePostDeletedEvent messages to be consumed:
php bin/console messenger:consume -vv async
And... yup, it does handle a few ImagePostDeletedEvent objects. But if you keep watching... once it finishes those, it does start processing the AddPonkaToImage messages.
We have such a simple AMQP setup right now that we've introduced a bug: our two transports are actually sending to the exact same queue... which kills our ability to consume them in a prioritized way. We'll fix that next by using two exchanges.
Oh, but if you flip back over to the RabbitMQ manager - you can see all the messages being consumed. Cool stuff.
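For a taste of the two-exchange fix mentioned above, one possible shape is sketched here. The exchange name messages_high_priority and the queue names messages_normal and messages_high are made up for illustration, and the env var name is assumed. The idea is just that each transport gets its own fanout exchange bound to its own queue.

```yaml
# Sketch only: exchange and queue names below are hypothetical.
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: fanout
                    queues:
                        messages_normal: ~
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages_high_priority
                        type: fanout
                    queues:
                        messages_high: ~
```

With something like this in place, consuming from only the async transport should leave the AddPonkaToImage messages alone, because they would be sitting in a different queue.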
Hey Genesis!
Hmm, I'm not sure about that. I honestly can't remember if this was showing up correctly for me or not - I was more focused on watching the number of messages in the queue increase then decrease. Sorry I can't offer more insight!
Cheers!
Hey Limozaza,
Messenger takes care of it automatically. You just need the auto_setup option set to true, which it should be by default. See this config example: https://symfony.com/doc/cur... . If the exchange and queue do not exist, Messenger will create them automatically. Actually, that's what we're talking about in this video.
Also, take a look at further videos where we're talking about exchange as well.
I hope this helps!
Cheers!
// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
Could you tell me how to resolve this problem? The RabbitMQ dashboard's queue section doesn't show any consumers reading from this queue, although in the console I can see that consumers are handling messages.