If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.
With a Subscription, click any sentence in the script to jump to that part of the video!
Login SubscribeRun:
php bin/console messenger:consume --help
We saw earlier that this has an option called --time-limit
, which you can use to tell the command to run for 60 minutes and then exit. The command also has two other options - --memory-limit
- to tell the command to exit once its memory usage is above a certain level - or --limit
- to tell it to run a specific number of messages and then exit. All of these are great options to use because we really don't want our messenger:consume
command to run too long: we really just want it to handle a few messages, then exit. Restarting the worker is handled by Supervisor and doesn't take a huge amount of resources. All of these options cause the worker to exit gracefully, meaning, it only exits after a message has been fully handled, never in the middle of it. But, if you let your worker run too long and it runs out of memory... that would cause it to exit in the middle of handling a message and... well... that's not great. Use these options. You can even use all of them at once.
There's also a completely different situation when you want all of your workers to restart: whenever you deploy. We've seen why many times already: whenever we make a change to our code, we've been manually restarting the messenger:consume
command so that the worker sees the new code. The same thing will happen on production: when you deploy, your workers won't see the new code until they exit and are restarted. Right now, that could take up to six minutes to happen! That is not okay. Nope, at the moment we deploy, we need all of or worker processes to exit, and we need that to happen gracefully.
Fortunately, Symfony has our back. Once again, run ps -A
to see the worker processes.
ps -A | grep messenger:consume
Now, pretend we've just deployed. To stop all the workers, run:
php bin/console messenger:stop-workers
Check the processes again:
ps -A | grep messenger:consume
Ha! Perfect! The two new process ids prove that the workers were restarted! How does this work? Magic! I mean, caching. Seriously.
Behind the scenes, this command sends a signal to each worker that it should exit. But the workers are smart: they don't exit immediately, they finish whatever message they're handling and then exit: a graceful exit. To send this signal, Symfony actually sets a flag in the cache system - and each worker checks this flag. If you have a multi-server setup, you'll need to make sure that your Symfony "app cache" is stored in something like Redis or Memcache instead of the filesystem so that everyone can read those keys.
There's one more detail you need to think about and it's due to the asynchronous nature of handling messages. Open up AddPonkaToImage
. Imagine that our site is currently deployed and the AddPonkaToImage
class looks like this. When someone uploads an image, we serialize this class and send it to the transport.
Imagine now that we have a bunch of these messages sitting in the queue at the moment we deploy a new version of our site. In this new version, we've refactored the AddPonkaToImage
class: we've renamed $imagePostId
to $imagePost
. What will happen when those old versions of AddPonkaToImage
are loaded from the queue?
The answer... the new $imagePost
property will be null... and some non-existent $imagePostId
property would be set instead. And that would probably cause your handler some serious trouble. So, if you need to tweak some properties on an existing message class, you have two options. First, don't: create a new message class instead. Then, after you deploy, remove the old message class. Or second, update the message class but, temporarily, keep both the old and new properties and make your handler smart enough to look for both. Again, after one deploy, or really, once you're sure all the old messages have been processed, you can remove the old stuff.
And... that's it! Use Supervisor to keep your processes running and the messenger:stop-workers
command to restart on deploy. You are ready to put this stuff into production.
Before we keep going, I'm going to find my terminal and run:
supervisorctl -c /usr/local/etc/supervisord.ini stop messenger-consume:*
That stops the two processes. Now I'll run my worker manually:
php bin/console messenger:consume -vv async_priority_high async
This just makes life easier and more obvious locally: I can see the output from my worker.
Next: we've talked about commands & command handlers. Now it's time to talk about events and event handlers, how we can use Messenger as an event bus and... what the heck that means.
Hey Roman A.!
Ok, good question - this can be a bit of a confusing part - and it depends on your setup and deploy process.
If I run messenger:stop-workers command, supervisor anyway will restart all workers. Did I understand correctly?
Yes, you do understand correctly!
Here's the super important thing:
A) On deploy, you somehow need to stop/restart your workers so that they see the new code. The messenger:stop-workers
is one easy way to do this.
B) BUT, however you stop/restart your workers, it needs to be done gracefully. What I mean is, you can't stop the workers in the middle of handling a message (as you correctly already knew). The messenger:stop-workers
command does this.
So basically, your issue is a bit specific to Docker / your deployment strategy. It looks like (tell me if I'm wrong) that you're deploying with Docker. And so, as part of your deploy, you stop the old containers. Is that right? If so, you probably don't need to call messenger:stop-workers
because stopping the container will stop those workers anyways. But that doesn't solve your problem. The real important part is that you need to "kill" your old "worker" containers gracefully. If you send a SIGTERM, then Messenger will finish its current message and THEN exit - https://github.com/symfony/symfony/blob/f4ff77cc0867d46c944594451c46689aa55c5ffb/src/Symfony/Component/Messenger/EventListener/StopWorkerOnSigtermSignalListener.php#L24 - so you should keep the old containers alive until that happens. Also if you are using Docker, you might not need supervisor. A Docker container itself is centered around a process. It depends on your setup, but if you start a container that runs a worker, often people will have another Docker mechanism set up so that if that process (and thus the container) ever quits, a new one is restarted. That's sort of... built-in supervisor :).
Let me know if this makes sense. A lot of this depends on how you're deploying... and I'm making a lot of assumptions about that... so I could be totally wrong :).
Cheers!
Hi, @weaverryan ! Thank you for your good answer!
You understood correctly that I'm deploying with Docker. My deploying strategy looks like:
1. Pull new docker image from the registry
2. Stop service by docker-compose down
3. Delete old containers
4. Start a new container with service
Will my workers die gracefully If I just use "docker-compose down"? Must I call "messenger:stop-workers" before I kill my container?
It is very interesting about workers without the supervisor. Could you explain a little bit more about how I can avoid using the supervisor and use only Docker? I understand that it is kinda out of Symfony scope, but maybe you can share some links about that theme?
Hey Roman A.!
I'm not a Docker expert, so take what I say with a "grain of salt", but I'll do my best to give you some advice :).
> Will my workers die gracefully If I just use "docker-compose down"? Must I call "messenger:stop-workers" before I kill my container?
It will shutdown gracefully. Well, more specifically, here's what happens:
A) docker-compose down sends a TERM signal to the process
B) If messenger:consume is currently handling a message, it will *ignore* TERM (which is basically a "request" to terminate) until it finishes the message. Once it finishes the message fully, it will exit.
The one catch is that, by default, docker-compose down will only "wait" 10 seconds for the process to quit before it takes the container down anyways. That's the "timeout" argument on that command and it's configurable. So you should configure it to be higher than the longest a message should take to handle, to avoid the container quitting too early.
> It is very interesting about workers without the supervisor. Could you explain a little bit more about how I can avoid using the supervisor and use only Docker?
I'm not sure how you're configuring Docker now, but here are two different ways of doing things:
A) You start a container that runs supervisor that runs messenger:consume. If you do this, I think it will all work correctly - when you "down" the Docker container, that should kill both supervisor and messenger:consume "gracefully", but I've not tested it.
B) You start a container whose "process" is actually "php bin/console messenger:consume" itself - along with some argument like --memory-limit=128M (you should ALWAYS pass some flags like this to messenger:consume so that it doesn't run forever - regardless of how you're running all of this). This will mean that the process will exit occasionally... which means that the container will shut down. Certain docker infrastructure setups are built to handle this: you will have other pieces that constantly restart containers (if I understand things correctly) as soon as they exit. So basically, whenever the process exits (and so, the container stops), something else re-creates the container.
I hope that helps. I'm VERY much not an expert on the infrastructure & deployment stuff with Docker - so I'm talking at a "high level" here :).
Cheers!
Just a tip: processing TERM signal in messenger requires pcntl php extension.
Another tip: supervisor has option stopwaitsecs (The number of seconds to wait for the OS to return a SIGCHLD to supervisord after the program has been sent a stopsignal. If this number of seconds elapses before supervisord receives a SIGCHLD from the process, supervisord will attempt to kill it with a final SIGKILL. Default: 10). You should probably increase it to prevent undesired sigkills.
One more idea. Of course if it is possible in a particular project. For my projects mostly this should work, if I am right.
This should work, right?
Hey Thomas,
Sounds correct, yes. Well, if you're using a Symfony Cloud / Platform.sh for example - it should be already done behind the scene for you. But with your custom deploy system the strategy should be like this I think. Well, probably we can even simplify it to this:
messenger:stop-workers
commandThen the supervisor should take care of re-running the workers again for you. I think it should be enough too.
Cheers!
Hi,
IIRC Stopping supervisor will stop all workers it created, so you don't need to run messenger:stop-workers
if you are stopping supervisor. This stop-workers command is helpful when you need to restart messenger without stopping supervisor.
Cheers
First a wee note: on Ubuntu, ps -A
is not returning information on the messenger processes, however ps -S
seems to be doing the trick.
My problem is that when running messenger:stop-workers
, the worker does not stop probably because of APCu. To keep things simple during debugging, I have stopped Supervisor and am running messenger:consume
manually in a tab, and messenger:stop-workers
in a separate tab. The worker in the first tab, however, does not seem to receive the stop. This is also confirmed by the results of ps
. I can confirm that there are no messages getting processed which might keep the worker from gracefully stopping.
This is my cache.yaml config
framework:
cache:
app: '%cache_adapter%' # in services.yaml, cache_adapter: cache.adapter.apcu
pools:
cache.flysystem.psr6:
adapter: cache.app
It looks like when I comment out the app: '%cache_adapter%'
line, messenger:stop-workers
starts working.
Hey apphancer!
First a wee note: on Ubuntu,
ps -A
is not returning information on the messenger processes, howeverps -S
seems to be doing the trick
Ha! This is why we can't have nice things :p. I just checked man ps
on my Mac and then on an (older) Ubuntu version and both of these flags basically had completely different meanings. 🙃 Thanks for the note.
My problem is that when running
messenger:stop-workers
, the worker does not stop probably because of APCu
That is possible - it's possible that your CLI php is not using APCu. Unfortunately (but by design), if the caching system fails (like because the APCu extension isn't installed for your CLI php), the cache component doesn't fail - it just doesn't cache (it's done this way so that a caching failure doesn't take down your site). I would run php -m
at your terminal and check to see if apcu is there. Ubuntu is funny because they usually use a different php.ini file for your CLI vs the web. And so, you might have it configured only for one but not the other. Run php --ini
to see what the CLI php.init path is.
It looks like when I comment out the
app: '%cache_adapter%'
line,messenger:stop-workers
starts working
This definitely tells me that the APCU cache is failing in the CLI... or in both places. Btw, if you wanted, you could temporarily put debug code in this class - https://github.com/symfony/symfony/blob/5.x/src/Symfony/Component/Cache/Adapter/ApcuAdapter.php - to see what's going on, like in the constructor and also in doSave().
Let me know what you find out!
Cheers!
I also was mislead by this `ps -A`, maybe adding a note for linux users: go for the classic `ps aux`
Hey Jean-Christophe,
Thank you for sharing your solution with others and confirming it works for you on Linux systems!
Cheers!
php -m returns apcu as present. But my bad: the result of bin/console messenger:stop-workers
is a beautiful green [OK] Signal successfully sent to stop any running workers.
which made me miss out the line above: WARNING [cache] Failed to save key "workers.restart_requested_timstamp" of type double...
So I went into the ini file for PHP cli and added apc.enable_cli = On
. I am no longer getting that warning now, but the worker is still not receiving the stop signal.
I've added a bit of debug code into the ApcuAdapter class. Inside the doSave() method when running bin/console messenger:stop-workers
, it looks like it is successfully persisting the data, e.g.
array(1) {
[0]=>
string(46) "XxIXZBaWIQ:workers.restart_requested_timestamp"
}
This same array is passed in doFetch() every few seconds while the worker is running. However, apcu_fetch($ids, $ok) returns an empty array therefore return $values; is also empty.
Not sure if this is something due to a misconfiguration with APCu. Everything works fine with the PDO adapter, so probably will leave it to PDO considering soon I will probably switch to Redis anyway.
Hey apphancer!
Good debugging! Unfortunately, I don't have an answer about why it's ultimately not seeing that cache key - it's definitely a misconfiguration with APCu, but I can't imagine *what* the misconfiguration is. It might not be worth the time digging in if you have other caching options - I would just use them :).
Cheers!
I have a question about an "interrupted" message. This can be because I accidently stopped it ungracefully, or a server instance (EC2 on AWS) is rebooted, or just plain crashed. In my tests I
1. start messenger
2. do something in my app to send a message
3. when the terminal starts to output I Ctrl + C and stop messenger
The database still has a row of data in it at this point
now when I restart messenger, nothing happens. How can I get messenger to "restart" the handler code?
EDIT: I should add I am using `doctrine://default`
It would be awesome if you could pass in a "max age" for the "deliveredAt" property so the worker will re-consume the message if "deviveredAt" is > X seconds old.
I didn't see anything obvious to do that in the vendor code....
acually I DO SEE!
vendor/symfony/messenger/Transport/Doctrine/Connection.php:line 281
private function createAvailableMessagesQueryBuilder(): QueryBuilder
{
$now = new \DateTime();
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
return $this->createQueryBuilder()
->where('m.delivered_at is null OR m.delivered_at < ?')
->andWhere('m.available_at <= ?')
->andWhere('m.queue_name = ?')
->setParameters([
$redeliverLimit,
$now,
$this->configuration['queue_name'],
. . .
Now I just need to figure out if I can set "$this->configuration['redeliver_timeout']", it's default is 3600, 1 hour! 15 minutes would be more then enough for me. :)
I am a dope! RTFM, https://symfony.com/doc/cur..., under options "redeliver_timeout"
Can't see the forest for the trees. ugh, quitting time.
I really like the new Messenger component. And this is a great course! Thanks, Ryan!
My deployment tool (<a href="https://deployer.org/">PHP Deployer</a>) always clears the cache when deploying. I guessphp bin/console messenger:stop-workers
won't work after clearing the cache?
Any ideas how I would be able to restart my workers after clearing the cache?
Hey JBMan!
Hmm, an excellent question! What command are you using to clear the cache? The cache "pool" that's created for the "restart" signal uses cache.app
as its parent. And cache.app
is meant to be a store that is persistent across deploys. That's a fancy way of saying that Symfony has two main cache systems: cache.system
(which is cleared between deploys) and cache.app
(which persists between deploys). So... it shouldn't be a problem. But, what are you seeing?
Cheers!
Using symfony for many years and first time read about difference of meaning for two cache systems!
Hey Yanosh,
Yeah, probably still a little-known fact. I'm glad you found something interesting for you :)
Cheers!
PHP Deployer creates a completely new directory when deploying a new version of my app. So var/cache
will be empty after the deployment.
I didn't know cache.app
is meant to be persistent across deploys. Based on your answer I started using Redis for the cache.app
pool. After that the worker restarts work fine.
Thank you very much for your help!
Hey JBMan!
Woo! Awesome :).
I didn't know cache.app is meant to be persistent across deploys. Based on your answer I started using Redis for the cache.app pool.
Well-done. Yes, I don't know if the purpose of cache.app is as obvious as it should be - I might tweak the recipe to add some more comments around it in cache.yaml
. Anyways, I'm glad we got it sorted!
Cheers!
// composer.json
{
"require": {
"php": "^7.1.3",
"ext-ctype": "*",
"ext-iconv": "*",
"composer/package-versions-deprecated": "^1.11", // 1.11.99
"doctrine/annotations": "^1.0", // v1.8.0
"doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
"doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
"doctrine/orm": "^2.5.11", // v2.6.3
"intervention/image": "^2.4", // 2.4.2
"league/flysystem-bundle": "^1.0", // 1.1.0
"phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
"sensio/framework-extra-bundle": "^5.3", // v5.3.1
"symfony/console": "4.3.*", // v4.3.2
"symfony/dotenv": "4.3.*", // v4.3.2
"symfony/flex": "^1.9", // v1.18.7
"symfony/framework-bundle": "4.3.*", // v4.3.2
"symfony/messenger": "4.3.*", // v4.3.4
"symfony/property-access": "4.3.*", // v4.3.2
"symfony/property-info": "4.3.*", // v4.3.2
"symfony/serializer": "4.3.*", // v4.3.2
"symfony/validator": "4.3.*", // v4.3.2
"symfony/webpack-encore-bundle": "^1.5", // v1.6.2
"symfony/yaml": "4.3.*" // v4.3.2
},
"require-dev": {
"easycorp/easy-log-handler": "^1.0.7", // v1.0.7
"symfony/debug-bundle": "4.3.*", // v4.3.2
"symfony/maker-bundle": "^1.0", // v1.12.0
"symfony/monolog-bundle": "^3.0", // v3.4.0
"symfony/stopwatch": "4.3.*", // v4.3.2
"symfony/twig-bundle": "4.3.*", // v4.3.2
"symfony/var-dumper": "4.3.*", // v4.3.2
"symfony/web-profiler-bundle": "4.3.*" // v4.3.2
}
}
I am a little bit confused...
If I run messenger:stop-workers command, supervisor anyway will restart all workers. Did I understand correctly?
But how about this case:
1. I run command "messenger:stop-workers"
2. Supervisor spawns new workers
3. Some of them start to handle new messages
4. I kill my docker container with workers inside
5. Some messages become kinda inconsistent
Can you explain a little more about stopping workers?