AMQP Internals: Exchanges & Queues

We've just changed our messenger configuration to send messages to a cloud-based RabbitMQ instance instead of sending them to Doctrine to be stored in the database. And after we made that change... everything... just kept working! We can send messages like normal and consume them with the messenger:consume command. That's awesome!

But I want to look a bit more at how this works... what's actually happening inside of RabbitMQ. Stop the worker... and then let's go delete a few images: one, two, three. This should have caused three new messages to be sent to Rabbit.

When we were using the Doctrine transport, we could query a database table to see these. Can we do something similar with RabbitMQ? Yea... we can! RabbitMQ comes with a lovely tool called the RabbitMQ Manager. Click to jump into it.

Aw yea, we've got data! And if we learn what some of these terms mean... this data will even start to make sense!

Exchanges

The first big concept in RabbitMQ is an exchange... and, for me, this was the most confusing part of learning how Rabbit works. When you send a message to RabbitMQ, you send it to a specific exchange. Most of these exchanges were automatically created for us... and you can ignore them. But see that messages exchange? That was created by our application and, right now, all messages that Messenger transports to RabbitMQ are being sent to this exchange.

You won't see the name of this exchange in our messenger config yet, but each transport that uses AMQP has an exchange option and it defaults to messages. See this "Type" column? Our exchange is a type called fanout. Click into this exchange to get more info... and open up "bindings". This exchange has a "binding" to a "queue" that's... by coincidence... also called "messages".

Exchanges Send to Queues

And this is where things can get a little confusing... but it's really a simple idea. The two main concepts in RabbitMQ are exchanges and queues. We're a lot more familiar with the idea of a queue. When we used the Doctrine transport type, our database table was basically a queue: it was a big list of queued messages... and when we ran the worker, it read messages from that list.

In RabbitMQ, queues have the same role: queues hold messages and we read messages from queues. So then... what the heck do these exchange things do?

The key difference between the Doctrine transport type and AMQP is that with AMQP you do not send a message directly to a queue. You can't say:

Hey RabbitMQ! I would like to send this message to the important_stuff queue.

Nope, in RabbitMQ, you send messages to an exchange. Then, that exchange will have some config that routes that message to a specific queue... or possibly multiple queues. The "Bindings" represent that config.

The simplest type of exchange is this fanout type. It says that each message that's sent to this exchange should be sent to all the queues that have been bound to it... which in our case is just one. The "binding" rules can get a lot smarter - sending different messages to different queues - but let's worry about that later. For now, this whole fancy setup means that every message will ultimately end up in a queue called messages.
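
To make the fanout rule concrete, here is a minimal in-memory sketch in plain PHP. It does not talk to RabbitMQ at all: the FanoutExchange class, its bind() and publish() methods, and the second "audit" queue are hypothetical names invented for illustration. They only mimic the routing rule described above, where every bound queue receives each message.

```php
<?php

declare(strict_types=1);

// Toy model only: this class is hypothetical and does not use RabbitMQ
// or the php-amqp extension. It mimics the fanout routing rule.
final class FanoutExchange
{
    /** @var array<string, SplQueue> queues bound to this exchange, keyed by name */
    private $bindings = [];

    public function bind(string $queueName, SplQueue $queue): void
    {
        $this->bindings[$queueName] = $queue;
    }

    // A fanout exchange has no routing logic: every bound queue gets the message.
    public function publish(string $message): void
    {
        foreach ($this->bindings as $queue) {
            $queue->enqueue($message);
        }
    }
}

$messagesQueue = new SplQueue();
$auditQueue = new SplQueue(); // hypothetical second queue, not part of our app

$exchange = new FanoutExchange();
$exchange->bind('messages', $messagesQueue);
$exchange->bind('audit', $auditQueue);

// Three deleted images means three published messages.
foreach (['delete image 1', 'delete image 2', 'delete image 3'] as $message) {
    $exchange->publish($message);
}

echo count($messagesQueue), ' ready in "messages"', PHP_EOL; // 3 ready in "messages"
echo count($auditQueue), ' ready in "audit"', PHP_EOL;       // 3 ready in "audit"
```

With only one binding, as in our app right now, the same loop would simply leave 3 messages in the one messages queue.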

Let's click on the Queues link on top. Yep, we have exactly one queue: messages. And... hey! It has 3 messages "Ready" inside of it, waiting for us to consume them!

auto_setup Exchange & Queues

By the way... who created the messages exchange and messages queue? Are they... just standard to RabbitMQ? Rabbit does come with some exchanges out-of-the-box, but these were created by our app. Yep, like with the Doctrine transport type, Messenger's AMQP transport has an auto_setup option that defaults to true. This means that it will detect if the exchange and queue it needs exist, and if they don't, it will automatically create them. Yep, Messenger took care of creating the exchange, creating the queue and tying them together with the exchange binding. Both the exchange name and queue name are options that you can configure on your transport... and both default to the word messages. We'll see that config a bit later.

Send to an Exchange, Read from a Queue

To summarize all of this: we send a message to an exchange and it forwards it to one or more queues based on some internal rules. Whoever is "sending" - or "producing" - the message just says:

Go to the exchange called "messages"

... and in theory... the "sender" doesn't really know or care what queue that message will end up in. Once the message is in a queue... it just sits there... and waits!

The second part of the equation is your "worker" - the thing that consumes messages. The worker is the opposite of the sender: it doesn't know anything about exchanges. It just says:

Hey! Give me the next message in the "messages" queue.

We send messages to exchanges, RabbitMQ routes those to queues, and we consume from those queues. The exchange is a new, extra layer... but the end-result is still pretty simple.

Phew! Before we try to run our worker, let's upload 4 photos. Then... if you look at the messages queue... and refresh... there it is! It has 7 messages!

Consuming from the Queue

As a reminder, we're sending AddPonkaToImage messages to async_priority_high and ImagePostDeletedEvent to async. The idea is that we can put different messages into different queues and then consume messages in the async_priority_high queue before consuming messages in the async queue. The problem is that... right now... everything is ending up in the same, one queue!

Check this out - find your terminal and only consume from the async transport. This should cause only the ImagePostDeletedEvent messages to be consumed:

php bin/console messenger:consume -vv async

And... yup, it does handle a few ImagePostDeletedEvent objects. But if you keep watching... once it finishes those, it does start processing the AddPonkaToImage messages.

We have such a simple AMQP setup right now that we've introduced a bug: our two transports are actually sending to the exact same queue... which kills our ability to consume them in a prioritized way. We'll fix that next by using two exchanges.
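
The bug can be reproduced with a small in-memory sketch in plain PHP. Nothing here touches RabbitMQ or Messenger: the transport names async and async_priority_high and the message class names come from our app, but the $transportToQueue map and the $send and $consume closures are hypothetical helpers made up for this illustration. The only assumption it encodes is the one described above: both transports end up using the same messages queue.

```php
<?php

declare(strict_types=1);

// Toy model only: no RabbitMQ involved. With the default settings,
// both transports resolve to the same "messages" queue.
$queues = ['messages' => new SplQueue()];

$transportToQueue = [
    'async' => 'messages',
    'async_priority_high' => 'messages',
];

// Hypothetical helper: sending through a transport lands in its queue.
$send = function (string $transport, string $message) use ($queues, $transportToQueue): void {
    $queues[$transportToQueue[$transport]]->enqueue($message);
};

// Hypothetical helper: consuming from a transport drains its queue.
$consume = function (string $transport) use ($queues, $transportToQueue): array {
    $handled = [];
    $queue = $queues[$transportToQueue[$transport]];
    while (!$queue->isEmpty()) {
        $handled[] = $queue->dequeue();
    }

    return $handled;
};

// Three deleted images, then four uploaded photos.
for ($i = 0; $i < 3; $i++) {
    $send('async', 'ImagePostDeletedEvent');
}
for ($i = 0; $i < 4; $i++) {
    $send('async_priority_high', 'AddPonkaToImage');
}

// We asked for "async" only, yet the AddPonkaToImage messages come back too.
$handled = $consume('async');
echo count($handled), ' messages handled', PHP_EOL; // 7 messages handled
echo implode(', ', array_unique($handled)), PHP_EOL;
```

Pointing each transport at its own queue in the map is enough to make the sketch behave, which is the same idea as the real fix.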

Oh, but if you flip back over to the RabbitMQ manager - you can see all the messages being consumed. Cool stuff.

Genesis | posted 1 year ago

Could you tell me how to resolve this problem: the queue section of the RabbitMQ dashboard doesn't show any consumers reading from this queue, although in the console I can see that consumers are handling messages.

Hey Genesis!

Hmm, I'm not sure about that. I honestly can't remember if this was showing up correctly for me or not - I was more focused on watching the number of messages in the queue increase then decrease. Sorry I can't offer more insight!

Cheers!

Boufares | posted 2 years ago

Hello,
how do I create a RabbitMQ queue or exchange from Symfony?

Hey Limozaza,

Messenger takes care of it automatically: you just need the auto_setup option set to true, which should be the default. See this config example: https://symfony.com/doc/cur... . If the exchange and queue do not exist, Messenger will create them automatically. Actually, that's what we're talking about in this video.

Also, take a look at further videos where we're talking about exchange as well.

I hope this helps!

Cheers!

This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}