When we started working with AMQP, I told you to go into ImagePostController and remove the DelayStamp. This stamp is a way to tell the transport system to wait at least 500 milliseconds before allowing a worker to receive the message. Let's change this to 10 seconds - so 10000 milliseconds.
// ...
class ImagePostController extends AbstractController
{
    // ...
    public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
    {
        // ...
        $envelope = new Envelope($message, [
            new DelayStamp(10000)
        ]);
        // ...
    }
    // ...
}
Now, move over to your terminal and make sure that your worker is not running.
Ok, let's see what happens! Right now both queues are empty. I'll upload 3 photos... then... quick, quick, quick! Go look at the queues. Suddenly, poof! A new queue appeared... with a strange name: delay_messages_high_priority__10000. And it has - dun, dun, dun! - three messages in it.
Let's look inside. Interesting, the messages were delivered here, instead of the normal queue. But then... they disappeared? The graph shows how the messages sitting in this queue went from 3 to 0. But... how? Our worker isn't even running!
Whoa! This page just 404'ed! The queue is gone! Something is attacking our queues!
Head back to the queue list. Yea, that weird "delay" queue is gone... oh, but now the three messages are somehow in messages_high. What the heck just happened?
Well first, to prove that the whole system still works... regardless of what craziness just occurred... let's run our worker and consume from both the async_priority_high and async transports:
php bin/console messenger:consume -vv async_priority_high async
It consumes them and... when we move over, go to the homepage and refresh, yep! Ponka was added to those images.
Ok, let's figure out how this worked. I mean, on the one hand, it's not important: if we had been running our worker the entire time, you would have seen that those messages were in fact delayed by 10 seconds. How you delay messages in RabbitMQ is kinda crazy... but if you don't care about the details, Messenger just takes care of it for you.
But I do want to see how this works... in part because it'll be a great chance to see how some of the more advanced features of AMQP work.
Click on "Exchanges". Surprise! There's a new exchange called delays. And instead of being a fanout type like our other two exchanges, this is a direct exchange. We'll talk about what that means soon.
But the first thing to know is that when Messenger sees that a message should be delayed, it sends it to this exchange instead of sending it to the normal, "correct" exchange. At this moment, the delays exchange has no bindings... but that will change when we send a delayed message.
To be able to really see what's happening, let's increase the delay to 60 seconds.
// ...
class ImagePostController extends AbstractController
{
    // ...
    public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
    {
        // ...
        $envelope = new Envelope($message, [
            new DelayStamp(60000)
        ]);
        // ...
    }
    // ...
}
Ok, upload 3 more photos: we now know that these were just sent to the delays exchange. And... if you refresh that exchange... it has a new binding! This says:
If a message sent here has a "routing key" set to delay_messages_high_priority__60000, then I will send that message to a queue called delay_messages_high_priority__60000
A "routing key" is an extra property that you can set on a message that's sent to AMQP. Normally Messenger doesn't set any routing key, but when a message has a delay, it does. And thanks to this binding, those three messages are sent to the delay_messages_high_priority__60000 queue. This is how a direct exchange works: instead of sending each message to all queues bound to it, it uses the "binding key" rules to figure out which queue - or queues - a message should go to.
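If it helps to see that rule as code, here's a tiny sketch of the difference between a fanout and a direct exchange. This is a toy, in-memory model, not a real AMQP client: the routeMessage() function and the shape of the $bindings array are made up purely for illustration, and the second binding is only there to show that a direct exchange skips queues whose binding key doesn't match.

```php
<?php

declare(strict_types=1);

/**
 * Toy model of exchange routing (hypothetical helper, not an AMQP API).
 *
 * @param string $type       'fanout' or 'direct'
 * @param array  $bindings   list of ['queue' => string, 'bindingKey' => string]
 * @param string $routingKey the routing key set on the message
 *
 * @return string[] names of the queues that would receive the message
 */
function routeMessage(string $type, array $bindings, string $routingKey): array
{
    if (!in_array($type, ['fanout', 'direct'], true)) {
        throw new InvalidArgumentException(sprintf('Unsupported exchange type "%s".', $type));
    }

    $queues = [];
    foreach ($bindings as $binding) {
        // fanout: every bound queue gets a copy, keys are ignored
        // direct: only queues whose binding key equals the routing key
        if ('fanout' === $type || $binding['bindingKey'] === $routingKey) {
            $queues[] = $binding['queue'];
        }
    }

    return array_values(array_unique($queues));
}

$bindings = [
    ['queue' => 'delay_messages_high_priority__60000', 'bindingKey' => 'delay_messages_high_priority__60000'],
    ['queue' => 'delay_messages_high_priority__10000', 'bindingKey' => 'delay_messages_high_priority__10000'],
];

// direct: only the queue with the matching binding key
print_r(routeMessage('direct', $bindings, 'delay_messages_high_priority__60000'));

// fanout: both queues, no matter what the routing key is
print_r(routeMessage('fanout', $bindings, 'delay_messages_high_priority__60000'));
```

With a direct exchange, a message whose routing key matches no binding simply isn't routed to any queue in this model.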
Click into the queue because it's super interesting. It has a few important properties. The first is an x-message-ttl set to 60 seconds. What does that mean? When you set this on a queue, it means that, after a message has been sitting in this queue for 60 seconds, RabbitMQ should remove it... which seems crazy, right? Why would we want messages to only live for 60 seconds... and then be deleted? Well... it's by design... and works together with this second important property: x-dead-letter-exchange.
If a queue has this property, it tells Rabbit that when a message hits its 60 second TTL and needs to be removed, it should not be deleted. Instead, it should be sent to the messages_high_priority exchange.
So, Messenger delivers messages to the delays exchange with a routing key that makes it get sent here. Then, after sitting around for 60 seconds, the message is removed from this queue and sent to the messages_high_priority exchange. Yep, it's delivered to the correct place after 60 seconds!
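To make the "expire, then dead-letter" idea a bit more concrete, here's a small simulation. It is only a sketch: RabbitMQ does all of this inside the broker, and the ToyDelayQueue class, its method names and the fake clock passed in as $nowMs are all invented for this example.

```php
<?php

declare(strict_types=1);

/**
 * Toy delay queue: holds messages until their TTL has passed, then hands
 * them to a "dead letter" callback instead of throwing them away.
 * Hypothetical class for illustration only; not how RabbitMQ is implemented.
 */
final class ToyDelayQueue
{
    /** @var int */
    private $ttlMs;

    /** @var callable */
    private $deadLetter;

    /** @var array list of ['body' => string, 'expiresAt' => int] */
    private $messages = [];

    public function __construct(int $ttlMs, callable $deadLetter)
    {
        $this->ttlMs = $ttlMs;          // plays the role of x-message-ttl
        $this->deadLetter = $deadLetter; // plays the role of x-dead-letter-exchange
    }

    public function publish(string $body, int $nowMs): void
    {
        $this->messages[] = ['body' => $body, 'expiresAt' => $nowMs + $this->ttlMs];
    }

    /** Moves every expired message to the dead-letter target. */
    public function tick(int $nowMs): void
    {
        $remaining = [];
        foreach ($this->messages as $message) {
            if ($message['expiresAt'] <= $nowMs) {
                ($this->deadLetter)($message['body']);
            } else {
                $remaining[] = $message;
            }
        }
        $this->messages = $remaining;
    }

    public function size(): int
    {
        return count($this->messages);
    }
}

// Stand-in for the messages_high_priority exchange
$delivered = [];
$queue = new ToyDelayQueue(60000, function (string $body) use (&$delivered): void {
    $delivered[] = $body;
});

$queue->publish('photo-1', 0);
$queue->tick(59999); // TTL not reached yet: the message keeps waiting
$queue->tick(60000); // TTL reached: the message is dead-lettered, not lost
```

Notice that no worker ever reads from the delay queue: expiring is the only way out, which is why the count dropped from 3 to 0 even though nothing was consuming.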
And then... 404! Even the queue itself is marked as "temporary": once it doesn't have any messages left, it deletes itself.
When you click back to see the Queues, the messages were delivered to the messages_high queue... but that's already empty because our worker consumed them.
So... yea... wow! Whenever we publish a message with a delay, Messenger sets all of this up: it creates the temporary delay queue with the TTL and dead letter exchange settings, adds a binding to the delays exchange to route to this queue, and adds the correct routing key to the message to make sure it ends up in that queue.
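Here's one way to picture those three pieces as plain data. This is a rough sketch, not Messenger's actual code: describeDelaySetup() is a hypothetical helper, the naming pattern is only inferred from the queue names we saw in the RabbitMQ UI, and the queue arguments are limited to the two properties we looked at (the real queue may well carry more).

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Messenger): describes what needs to
 * exist in RabbitMQ for a message to be delayed and then delivered to
 * $targetExchange.
 */
function describeDelaySetup(string $targetExchange, int $delayMs, string $routingKey = ''): array
{
    // Assumption: name pattern guessed from delay_messages_high_priority__60000,
    // where the empty original routing key leaves a double underscore.
    $name = sprintf('delay_%s_%s_%d', $targetExchange, $routingKey, $delayMs);

    return [
        // 1) the temporary delay queue and its arguments
        'queue' => [
            'name' => $name,
            'arguments' => [
                'x-message-ttl' => $delayMs,
                'x-dead-letter-exchange' => $targetExchange,
            ],
        ],
        // 2) the binding on the direct "delays" exchange
        'binding' => ['exchange' => 'delays', 'binding_key' => $name, 'queue' => $name],
        // 3) the routing key set on the published message
        'message_routing_key' => $name,
    ];
}

print_r(describeDelaySetup('messages_high_priority', 60000));
```

Because the delay is part of the queue name, a different delay means a different queue, which matches what we saw when we switched from 10000 to 60000.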
You can really start to see how rich the features are in AMQP... even if you won't need them. The most important feature we just saw was the direct exchange type: an exchange that relies on routing keys to figure out where each message should go.
Next, could we use direct exchanges for our non-delayed messages? Instead of two exchanges that each "fan out" to a separate queue, could we create just one exchange that, by using routing keys, delivers the correct messages to the correct queues? Totally.
Hey @Dang
Yea, you can store your failed messages in RabbitMQ as well, but we kept using Doctrine just for simplicity.
Cheers!
Hi, guys. As always great course, one of your finest.
Can a message be delayed for a whole month? My use case is to schedule a task to check in a month whether a notification has been read and, if not, delete it. Would these AMQP delays be useful for such a task?
Thanks a lot
Hey danresmejia!
> Can a message be delayed for a whole month?
Ha! That's a fun question :). Umm... maybe? I think this is not the normal use-case. It might work in theory (you'd need to make sure AMQP is "persistent" so that messages aren't lost on shutdown), but I'm not sure this would be wise. I could *totally* be wrong and maybe this is normal, I just haven't heard of amqp being used with such long delays. Personally, I would probably do this with a custom console command that "checks for all unread messages that are older than 1 month" and deletes them. Then I'd run that on a CRON job as often as you need :).
Cheers!
Thanks for getting back to me. A cron job is what I do now; it works fine until it doesn't ;) and then debugging is hard. Also, everything we do is in a multi-tenant context, so the command needs to check every notification for every tenant, as each tenant lives in a different database. To me it seems that a message on an async queue is the perfect choice, as I'd know that for every created notification a message would also be created to check its validity in a month.
It's also important to point out that the JMS Job Queue had such a feature, i.e. a message would only be processed on a specific date and time if needed. In that regard, I see that the doctrine transport table has a field called 'available_at', which seems to be related to it. What do you think?
One last idea is to have the handler send a message to the event bus if the notification is not yet ready to be deleted, i.e. not old enough. It could be an alternative if the AMQP implementation doesn't persist delayed messages over reboots or outages.
Cheers!
Hey danresmejia!
> In that regard I see that doctrine transport table has a field called 'available_at' it seems to be related to it. What do you thing?
Especially with the Doctrine transport, I certainly can't see a problem with super-delayed messages like this (I'm less certain about how appropriate this is with AMQP).
So... I think it would be fine to run with this :).
Cheers and sorry for the slow reply!
Hi, thank you for the great tutorials!
There is a wrong command on this page:
php bin/console -vv async_priority_high async
Instead of:
php bin/console messenger:consume -vv async_priority_high async
// composer.json
{
"require": {
"php": "^7.1.3",
"ext-ctype": "*",
"ext-iconv": "*",
"composer/package-versions-deprecated": "^1.11", // 1.11.99
"doctrine/annotations": "^1.0", // v1.8.0
"doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
"doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
"doctrine/orm": "^2.5.11", // v2.6.3
"intervention/image": "^2.4", // 2.4.2
"league/flysystem-bundle": "^1.0", // 1.1.0
"phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
"sensio/framework-extra-bundle": "^5.3", // v5.3.1
"symfony/console": "4.3.*", // v4.3.2
"symfony/dotenv": "4.3.*", // v4.3.2
"symfony/flex": "^1.9", // v1.18.7
"symfony/framework-bundle": "4.3.*", // v4.3.2
"symfony/messenger": "4.3.*", // v4.3.4
"symfony/property-access": "4.3.*", // v4.3.2
"symfony/property-info": "4.3.*", // v4.3.2
"symfony/serializer": "4.3.*", // v4.3.2
"symfony/validator": "4.3.*", // v4.3.2
"symfony/webpack-encore-bundle": "^1.5", // v1.6.2
"symfony/yaml": "4.3.*" // v4.3.2
},
"require-dev": {
"easycorp/easy-log-handler": "^1.0.7", // v1.0.7
"symfony/debug-bundle": "4.3.*", // v4.3.2
"symfony/maker-bundle": "^1.0", // v1.12.0
"symfony/monolog-bundle": "^3.0", // v3.4.0
"symfony/stopwatch": "4.3.*", // v4.3.2
"symfony/twig-bundle": "4.3.*", // v4.3.2
"symfony/var-dumper": "4.3.*", // v4.3.2
"symfony/web-profiler-bundle": "4.3.*" // v4.3.2
}
}
Hi guys,
In this tutorial, you use Doctrine to store failed messages. Why don't you store them in RabbitMQ with that x-dead-letter-exchange? Is there any specific reason? Thanks