Even if I refresh the page, now that our messages aren't being handled immediately... the four most recent photos don't have Ponka in them. That's tragic! Instead, those messages were sent to the doctrine transport and are waiting patiently inside of a messenger_messages table.
So... how can we read these back out and process them? We need something that can fetch each row one-by-one, deserialize each message back into PHP, then pass it to the message bus to be actually handled. That "thing" is a special console command. Run:
php bin/console messenger:consume
You won't see any output... yet... but, unless we messed something up, this is doing exactly what we need: reading each message, deserializing it, and sending it back to the bus for handling.
So... let's go refresh. Woh! It did work! All 4 messages now have Ponka on them! We're saved!
To make this more interesting, as you can see, it says to run this command with -vv if you want to see what it's doing behind-the-scenes. But... interesting, once the command finished reading and handling all 4 messages... it didn't quit: it's still running. And if we restart it with -vv on the end:
php bin/console messenger:consume -vv
... it does the same. A command that "handles" messages from a queue is called a "worker". And the job of a worker is to watch and wait for new messages to be added to the queue and handle them the instant one is added. It waits and runs... forever! Well, that's not totally true - but more on that later when we talk about deployment.
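To make the worker idea concrete, here is a minimal sketch in plain PHP, with no Symfony classes involved. DemoMessage, InMemoryQueue and drainQueue() are hypothetical names invented for this illustration: the queue stands in for the messenger_messages table, and the handler callable stands in for the message bus. It is not how Messenger is implemented, just the same fetch, deserialize, handle, acknowledge cycle in miniature.

```php
<?php
// Hypothetical stand-in for a message class such as AddPonkaToImage.
final class DemoMessage
{
    public $imagePostId;

    public function __construct(int $imagePostId)
    {
        $this->imagePostId = $imagePostId;
    }
}

// Hypothetical in-memory queue standing in for the messenger_messages table.
final class InMemoryQueue
{
    private $rows = [];   // row id => serialized message
    private $nextId = 1;

    public function send($message): void
    {
        $this->rows[$this->nextId++] = serialize($message);
    }

    // Returns [id, message] for the oldest row, or null when the queue is empty.
    public function get(): ?array
    {
        foreach ($this->rows as $id => $payload) {
            return [$id, unserialize($payload)];
        }

        return null;
    }

    // "Acknowledge": the message was handled, so its row can be removed.
    public function ack(int $id): void
    {
        unset($this->rows[$id]);
    }

    public function count(): int
    {
        return count($this->rows);
    }
}

// Handles every waiting message, then returns how many were handled.
// A long-running worker would sleep and poll again instead of returning.
function drainQueue(InMemoryQueue $queue, callable $handler): int
{
    $handled = 0;
    while (($row = $queue->get()) !== null) {
        [$id, $message] = $row;
        $handler($message);   // stands in for passing the message to the bus
        $queue->ack($id);     // acknowledge only after the handler returned
        $handled++;
    }

    return $handled;
}
```

Sending two DemoMessage objects and then calling drainQueue() with a closure handles them in the order they were sent and leaves the queue empty.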
Let's peek back over in our "queue" - the messenger_messages table:
SELECT * FROM messenger_messages \G
Yep! This holds zero rows because all those messages were processed and removed from the queue. Back at the browser, let's upload... how about... 5 new photos. Woh... that was awesome fast!
Ok, ok, move back to the terminal that's running the worker! We can see it doing its job! It says: "Received message", "Message handled by AddPonkaToImageHandler" then "AddPonkaToImage was handled successfully (acknowledging)". That last part, "acknowledging", means that Messenger notified the Doctrine transport that the message was handled and can be removed from the queue.
Then... it keeps going to the next message... and the next... and the next... until it's done. So if we refresh... Ponka was added to all of these! Let's do it again - upload 5 more photos. And... let's refresh and watch... there's Ponka! We can see them being handled little-by-little. So much wonderful Ponka!
Ok, this would be cooler if our JavaScript automatically refreshed the image when Ponka was added... instead of me needing to refresh the page... but that's a totally different topic, and one that's covered by the Mercure component in Symfony.
And... that's it! This messenger:consume command is something that you'll have running on production all the time. Heck, you might decide to run multiple worker processes. Or, you could even deploy your app to a totally different server - one that's not handling web requests - and run the worker processes there! Then, handling these messages wouldn't use any resources from your web server. We'll talk more about deployment later.
Because right now... we have a problem... a kinda weird problem. Refresh the page. Hmm, the original photos all say something like:
Ponka visited 13 minutes ago. Ponka visited 11 minutes ago.
But, since we made things asynchronous, these all say:
Ponka is napping. Check back soon.
Open up the ImagePost entity and find the $ponkaAddedAt property. This is a datetime field, which records when Ponka was added to the photo. The message on the front-end comes from this value.
For the original ones... back when the whole process was synchronous, this field was set successfully. But now... it looks like it isn't. Let's check the database to be sure. Over in MySQL, run:
SELECT * FROM image_post \G
All the way back in the beginning... ponka_added_at was being set. But now they're all null. So... our images are being processed correctly, but, for some reason, this field in the database is not. If we look inside AddPonkaToImageHandler... yea... right here: $imagePost->markPonkaAsAdded(). That sets the property. So... why isn't it saving?
Let's figure out what's going on next and learn a bit more about some "best practices" when it comes to building your message class.
Hi Tim K.
First I'd recommend you reinstall vendors/ and try again. Also, you said that it appeared after the upgrade to 5.4. I think you already tried to clear the caches manually, so you also need to check for Doctrine updates; maybe something is out of sync.
Cheers!
Hello Vladimir,
Thanks for your support!
1) yes, I deleted already the vendor-directory and re-installed with `composer install`.
2) yes, i cleared the cache `./bin/console cache:clear`
3) All other parts of the project seem to work (all test run smoothly ... code-coverage 70%)
4) `composer update` doesn't show anything left to install (--> Is this what you mean by checking Doctrine updates?)
Any other idea?
Cheers!
Hm... which OS and PHP version? Do you have multiple PHP versions? Probably the CLI version is not the same as the one your web server is using.
Hi Vladimir,
here is local dev-environment:
- MacOS (Monterey 12.3)
- `php -v` (CLI) --> 7.4.27
- `phpinfo()` (webserver) --> 7.4.27
not aware of multiple PHP versions... how would I validate this?
That is OK. Try to run composer outdated to see the list of outdated packages, and also try to remove the cache manually.
Also, I found that you posted an issue on GitHub, and you have platform.php 7.3 as a requirement. Can you drop it and update the packages? Also try to test it without the xdebug extension.
cheers!
Hey Vladimir,
I just saw that my answer has not been posted/sent... so once again... (I hope it's not double now).
Here is the result:
1) I removed the cache manually (`rm -rf /var/cache`) --> no effect, same error
2) `composer outdated` gave this result
doctrine/dbal                       2.13.8  3.3.4  Powerful PHP database abst...
doctrine/doctrine-migrations-bundle 2.2.3   3.2.2  Symfony DoctrineMigrations...
doctrine/migrations                 2.3.5   3.4.1  PHP Doctrine Migrations pr...
doctrine/persistence                2.4.0   2.4.1  The Doctrine Persistence p...
laminas/laminas-code                3.4.1   3.5.1  Extensions to the PHP Refl...
league/html-to-markdown             4.10.0  5.1.0  An HTML-to-markdown conver...
psr/container                       1.1.1   2.0.1  Common Container Interface...
symfony/phpunit-bridge              v5.4.3  v6.0.3 Provides utilities for PHP...
3) removed `platform.php 7.3` and ran `composer update`: --> 3 packages installed --> no effect, same error
- Upgrading doctrine/persistence (2.4.0 => 2.4.1)
- Upgrading laminas/laminas-zendframework-bridge (1.4.1 => 1.5.0)
- Upgrading psr/container (1.1.1 => 1.1.2)
4) removed `ext-xdebug` (and added platform.php 7.3 back for my prod-env) --> no effect, same error
Hey!
Sorry for the long wait. I honestly have no idea what is wrong. Perhaps you can make a bug reproducer on GitHub, so I can look at the code and test it on my server.
Cheers!
Hello Vladimir.
Thanks again for support!!!
I'm sorry, but I will not be able to publish the code.
Do you think it could be connected to my local Apache server? I know that the macOS built-in and Homebrew versions sometimes fight against each other.
I'm not talking about sharing your private code, but perhaps you can bootstrap a stub project that reproduces the issue so we can investigate it =)
Hello Vladimir,
installing the latest doctrine-migrations-bundle (3.2.2) solved the issue (see also GitHub). But it's strange that only the Messenger consumption via the CLI caused an issue. The rest of the website worked perfectly with the previous version.
Anyway ... many thanks for supporting me!!
That's surprising! I'm happy that you found a way to solve the issue. I guess something works differently in the CLI, which is why you didn't face the problem on the site, maybe some error handling... not sure =)
You are welcome! Cheers! And happy coding!
Communicating with IoT devices
I'm wondering how to use Messenger to communicate with IoT devices (using the mqtt protocol, but I think Rabbit can already handle that),
so that I can send commands to devices (like "open the door"), and receive messages from devices (like "someone with ID card ### wants to get in", or simply a reply to the previous command "open", like "door opened").
Do I have to manage 2 (or more) queues?
Thanks and sorry for that stupid question which denotes I don't understand really how to do things... :((
Hey Carlo L.!
Hmm. I'm not really sure - I'm not familiar with mqtt. However, my impression is that this may be a custom enough protocol that you would want to use it directly vs using Messenger to try to use mqtt. My impression is based on the fact that the mqtt support in Rabbit is a plugin, which tells me that it's some special functionality built on top of RabbitMQ. So my guess is that Messenger wouldn't automatically fit well into that, but I could be wrong. In theory, yes, you could "publish" messages (e.g. open the door) to one transport in Messenger and also listen to another transport for messages like (someone with ID card ### wants to get in). That part makes sense to me. But I just don't know what mqtt really is, so I can't give a solid answer.
Cheers!
Hi @weaverryan!
I figured out how to work with the mqtt protocol inside RabbitMQ: basically it's about (manually) configuring binding keys in RabbitMQ and Messenger, and managing routing keys in messages with AmqpStamp when dispatching (because each device has its own queue). And, of course, creating a custom serializer because communications are in JSON.
(a brief summary: I'm communicating with IoT devices: I send a command message - like "open", the device reply with an event message - "opened"-; or the device may spontaneously transmit an event message - "someone wants to get in", and awaits for my reply - command or event in this case?? it's a reply to an event-).
Now when dispatching a command message, my custom serializer encode method correctly sends data, but (someone) calls the decode method too on the same outgoing message: how to avoid that?? I'm sending this message outside, I don't want to handle it. It's as if, when putting a message on the bus, it gets sent and received at the same time (I'm expecting that when I dispatch, the encode method is called, and when I receive, the decode one, but not both).
(another question: I'm doing correctly in the decode method using a switch/case, based on incoming json, and instantiating and returning the correct message class? But if there's no message to instantiate, what to return from decode and let go on? Maybe I'm doing that in the wrong place?)
Which is the correct way to work with a command message that expects a possible answer?
Sorry for my poor english and thanks for your great work.
Cheers!
Hey Carlo L.!
Nice work! This is generally what I had in mind, but getting it all to work is a whole other thing.
> Now when dispatching a command message, my custom serializer encode method correctly sends data, but (someone) calls the decode method too on the same outgoing message: how to avoid that??
Hmm, I would not expect that. Can you find a stack trace to see who is doing that? That method *should* only be called when your transport is *receiving* a message: https://github.com/symfony/... - so it sounds to me like, somehow, your transport is receiving the same message that you're sending...
Why would this be happening? If you're exclusively using amqp, then when you dispatch a message into messenger, it will be routed to amqp, and sent to amqp. And that's it. It should not be sent to amqp AND then also handled in your app. And even if it *were*, it wouldn't be decoded: decoding only happens when a message is truly being *received* from the outside. Is it possible that your routing rules are routing the message through amqp and then right back to your transport? You'd want to make sure that your transport is not "listening" on some queue where the message is ultimately sent. This stuff gets complicated, but we talk a bit about it around here: https://symfonycasts.com/sc...
> (another question: I'm doing correctly in the decode method using a switch/case, based on incoming json, and instantiating and returning the correct message class? But if there's no message to instantiate, what to return from decode and let go on? Maybe I'm doing that in the wrong place?)
Hmm. So you have a situation where you receive a message... but you simply have nothing to do? Like you say "I see this message, but I don't need to do any work or handle it in any way". Is that the idea? If so, I'd create a NoopMessage and a NoopHandler that simply does... nothing. Then instantiate and return NoopMessage in this case.
> Which is the correct way to work with a command message that expects a possible answer?
Afaik, you don't really get an "answer". It's more (and you described this above in your message), that you send a message ("open door") and then, 5ms later, something sends YOU a message "door opened". In this situation, I would have an OpenDoorMessage that I originally dispatch. This is then sent to Amqp and goes through your custom serializer. Then, 5ms later, you will have a transport that "receives" the "door opened" message. In decode() of your custom serializer, you turn this into a DoorOpenedMessage. Then, in a DoorOpenedHandler, you do some work - like maybe save in the database that the door is now open. Or, maybe you do nothing in that handler :).
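Here is a rough sketch of the mapping step described above, written as plain PHP so it runs without Symfony. The JSON shape (an "event" key and a "deviceId" key), the 'door_opened' event name and the messageFromDeviceJson() function are all assumptions made up for this example. In a real app this logic would sit inside decode() of a class implementing Messenger's SerializerInterface, and the message would be wrapped in an Envelope before being returned.

```php
<?php
final class DoorOpenedMessage
{
    private $deviceId;

    public function __construct(string $deviceId)
    {
        $this->deviceId = $deviceId;
    }

    public function getDeviceId(): string
    {
        return $this->deviceId;
    }
}

// Returned when an incoming payload needs no work at all.
final class NoopMessage
{
}

/**
 * Maps a raw device payload to a message object.
 * The "event" and "deviceId" keys are an assumed JSON shape.
 *
 * @return DoorOpenedMessage|NoopMessage
 */
function messageFromDeviceJson(string $json)
{
    $data = json_decode($json, true);
    if (!is_array($data)) {
        throw new InvalidArgumentException('Payload is not a JSON object or array.');
    }

    switch ($data['event'] ?? null) {
        case 'door_opened':
            return new DoorOpenedMessage((string) ($data['deviceId'] ?? ''));
        default:
            // Nothing to do for this event: return a message whose handler does nothing.
            return new NoopMessage();
    }
}
```

Returning a NoopMessage for unknown events keeps the switch total: every payload that parses produces some message, so the worker can acknowledge it and move on.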
I don't know if these answers help - but let me know.
Cheers!
I'm not getting any output to the console when running:
bin/console messenger:consume -vvv -e test
Is this normal for messenger when in test mode and does anyone know how to turn on output? (things like message received, displaying exceptions etc).
Hey Fox C.
I'm afraid you'll have to smash your computer with a hammer! J/K
What's happening is that, for the test environment, some log channels are disabled by default. You can enable them by adding this piece of config to your test monolog.yaml file:
# config/packages/test/monolog.yaml
monolog:
    handlers:
        # ... (other handlers)
        console:
            type: console
            process_psr_3_messages: false
            channels: ["!event", "!doctrine", "!console"]
Cheers!
Extra info:
- the command runs and processes messages correctly (just no output)
- symfony framework 4.4
My messages aren't being consumed because of a time zone issue that I can't figure out. They are stored with a created_at that is 6 hours ahead.
My php ini time zone setting is correct.
In the terminal I have running the symfony local web server, it shows logs for "Application" and "Web Server".
The [Web Server] logs show the correct time. The [Application] logs show a time 6 hours ahead.
On the symfony web profiler's phpinfo page, underneath the "date" header, the "Default timezone" shows as America/Detroit, but underneath, it shows the "date.timezone" Directive has Local Value and Master Value set to Europe/Berlin.
Why are these different and how can I fix the "Application" time zone?
Hi Nick F.,
Hm... interesting issue. First let's try to debug it:
dd([
    date_default_timezone_get(),
    ini_get('date.timezone')
]);
Do it somewhere in a controller to see the values.
Cheers!
In our company, we would like to implement RabbitMQ to do a few things on our sites. I have read a little bit about "supervisor" and the possibilities for working with RabbitMQ. In our team, we would like (if possible) to dynamically change the number of worker processes. For example, if 50 messages are waiting in the queue, one worker is enough to process them, but if the number of messages increases dramatically in a short period of time, it would be best if supervisor dynamically changed the number of workers processing those messages. I read that supervisor cannot change this dynamically without stopping the previous workers. Is there another tool that allows this without shutting down the running workers?
Hey Cristóbal Rejdych!
Excellent question! First, there is no tool at this point in Symfony itself to help with this. It would be great if there were - it's something that I would love to see (but time is short!). Full disclosure - this is a feature that Laravel *does* have (Horizon), but Symfony does not have yet. You can build it yourself by creating your own custom console command. Its job is to run in a loop, read the size of each queue, and create worker processes (e.g. messenger:consume command processes) based on their size. Ultimately, you run supervisord to ensure that the main command you've created always has 1 process running. This gives you control over spinning up as many (or as few) worker processes as you want. However, you'll need to make sure you fork the process so that the child worker processes keep running in case the main command dies. Here is an old example in Symfony (from a no-longer-existing server:start command) that you can check out - https://github.com/symfony/...
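As a small sketch of the "based on their size" step only: the function below turns a queue size into a worker count. The name desiredWorkerCount() and the default numbers (50 messages per worker, between 1 and 8 workers) are assumptions picked for illustration, not recommendations. Reading the real queue size and starting or stopping processes is deliberately left out.

```php
<?php
/**
 * Decides how many worker processes should be running for a given backlog.
 * All thresholds are illustrative assumptions.
 */
function desiredWorkerCount(
    int $queuedMessages,
    int $messagesPerWorker = 50,
    int $minWorkers = 1,
    int $maxWorkers = 8
): int {
    if ($messagesPerWorker < 1) {
        throw new InvalidArgumentException('messagesPerWorker must be at least 1.');
    }

    // One worker per started block of $messagesPerWorker messages...
    $needed = (int) ceil($queuedMessages / $messagesPerWorker);

    // ...clamped so we never drop below the minimum or exceed the maximum.
    return max($minWorkers, min($maxWorkers, $needed));
}
```

With these defaults, 50 waiting messages give 1 worker, 51 give 2, and a very large backlog is capped at 8. The main command's loop would compare this number with the count of currently running child processes and start or stop workers to close the gap.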
There are also other options, that are more related to your infrastructure. For example, you could have some external process that monitors the queue size and then spins up new "worker containers" (if your infrastructure is set up in such a way).
I hope this helps!
Cheers!
Is it possible to not refresh the page every time? Is there any way to change the image on the front end automatically?
Hey ahmedbhs!
Ah yes, definitely! It's kind of a "different topic", which is why we didn't cover it here :). On a high level, you need your frontend JavaScript to be able to get a "notification" from the server that something changed so that it can update the DOM in some way. There are various ways to do this, but the recommended way in our world for sure is Mercure: https://github.com/symfony/...
Unfortunately, we don't have a tutorial on it yet - it's highly asked for. But I hope this will help.
Cheers!
Hello! I have a strange problem with verbose output in the messenger:consume command. In the video everything works fine at 2:20, but my output freezes at the first INFO message no matter what. Example:
`root@7ea27b7cb7c7:/app# php bin/console messenger:consume -vv
[OK] Consuming messages from transports "async".
// The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.
// Quit the worker with CONTROL-C.
22:22:28 INFO [messenger] Received message App\Message\AddPonkaToImage ["message" => App\Message\AddPonkaToImage`
I use Docker with php 7.3.6, symfony/messenger 4.3.9.
There is no error messages in var/log/dev.log (only debug and info), messages are processed successfully.
Other console commands work fine.
Even strace command didn't show me any suspicious behavior.
Do you have any recommendations on how to fix or analyze this problem?
UPD. Thanks to Diego, I analyzed my situation more deeply. The problem is in PhpStorm+Docker. Hope it will help someone in the future :)
Hey Dariia M.
That's weird :)
Does it only happen when verbose output is enabled?
What happens if you try to consume a different message? (Maybe a dummy one)
Hi Diego :)
I've created a DummyMessage - yes, the freezing problem is the same. The output freezes even in non-verbose mode when an error starts to display in the console.
Updated response: well, I've managed to clone my application from Docker to the host machine with PHP 7.4.4, and the freezing output problem does not reproduce there. I don't even know - in Docker the script has 2048 RAM to consume, more than enough, and the logs are clear...
No, the problem is in running the worker from Docker inside the PhpStorm terminal :) Updated my first post in the discussion.
When I upload the project to my server and run ./bin/console messenger:consume -vv, it needs 3 hours to start handling the messages.
But when I change created_at and available_at to 3 hours in the past and run the command again, it works immediately.
What causes this issue?
Hey AhmedWali!
Hmm, it sounds like a possible timezone issue... though I'm not sure. Your server is responsible for both *setting* the original available_at dates AND for setting which values to look for via the queries. So even if your server and database are on different timezones, I would *expect* that it would make no difference. However, I still think this is probably the issue (and I'm missing some detail). What timezone is your server and database in?
Cheers!
Hi,
when I try to run this command:
./bin/console messenger:consume -vv
it works well in Symfony 4, but in Symfony 5 it doesn't work, even though I can see the messages in the database.
I noticed that the log file shows a different datetime format between my Symfony 4 and Symfony 5 projects:
Symfony 4
SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE ["2020-02-01 23:26:53","2020-02-02 00:26:53","default"]
Symfony 5
SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC LIMIT 1 FOR UPDATE ["2020-02-08T17:55:23+01:00","2020-02-08T18:55:23+01:00","default"]
MySQL version: 5.7
How can I fix it?
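For anyone comparing the two parameter styles in the queries above, this small plain-PHP sketch shows that they can describe the same instant. formatInstant() is a hypothetical helper written for this comment, and the Europe/Berlin timezone is only assumed from the +01:00 offset in the log. It does not explain why the two Symfony versions log different formats; it only makes the formats easier to compare.

```php
<?php
// Formats one instant three ways: plain local time, ISO 8601 with offset, plain UTC.
function formatInstant(DateTimeImmutable $instant): array
{
    return [
        'plain' => $instant->format('Y-m-d H:i:s'),
        'atom'  => $instant->format(DATE_ATOM),
        'utc'   => $instant->setTimezone(new DateTimeZone('UTC'))->format('Y-m-d H:i:s'),
    ];
}

// Assumed timezone: Europe/Berlin, which is UTC+01:00 in February.
$instant = new DateTimeImmutable('2020-02-08 17:55:23', new DateTimeZone('Europe/Berlin'));
$formats = formatInstant($instant);
// $formats['plain'] is "2020-02-08 17:55:23"
// $formats['atom']  is "2020-02-08T17:55:23+01:00"
// $formats['utc']   is "2020-02-08 16:55:23"
```

The plain format carries no offset, so whoever reads it has to know which timezone it was written in. That makes it worth checking which timezone PHP and the database each assume when messages seem to become available hours late.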
Hey Ahmed,
Well done! I'm glad you found the problem yourself! And thanks for letting us know you were able to handle it!
Cheers!
Hi !
I do not see any output to the consume command.
I tried -v, -vv, -vvv with no results. I can see in the database that messages are inserted then deleted, so it works well, but still no output of the worker command.
I use Symfony 4.3, php 7.2, the monolog bundle 3.5.0 and launch the command on ubuntu.
Any idea?
Hey Gilles,
Do you run this command manually? Could you write the exact command you're running? Also, do you run this command in prod or dev environment? Could you try to run the command in the dev env? And please, make sure the cache is cleared. Still no output?
Cheers!
Hello Victor,
Thanks, I tried what you suggested: I ran the following command in dev after clearing the cache. Still, it does not display anything, but Messenger is doing its job in the background. Symfony version is @4.3.8.
php ./bin/console messenger:consume -vv async_priority_high async async_priority_low
I should try on another environment and another project to spot the error.
Hey unicolored!
Check that you have this section in your config/packages/dev/monolog.yaml file: https://github.com/symfony/recipes/blob/a4a3f3448d0673e82e10bec9e6c75bbf0fe206ff/symfony/monolog-bundle/3.3/config/packages/dev/monolog.yaml#L16-L19
This is what controls outputting logs to stdout when you're using -v. If that is NOT the problem, let me know :).
Cheers!
Hello Ryan,
Thanks, I do NOT let you know this was indeed the problem :)
Learning everyday! Thanks
Hi,
I'm receiving a "No transport supports the given DSN "doctrine://default"" error after setting up my handler and setting the .env key.
Any ideas?
Hey @Sym
If you have Doctrine installed on your project it should just work out of the box. Double check that you are using Doctrine on your project and if that's the case and the problem persists, please, let us know :)
Cheers!
Thank you for the quick response. I have symfony/orm-pack installed and the following bundles relating to doctrine:
`
...
Doctrine\Bundle\DoctrineCacheBundle\DoctrineCacheBundle::class => ['all' => true],
Doctrine\Bundle\DoctrineBundle\DoctrineBundle::class => ['all' => true],
Doctrine\Bundle\MigrationsBundle\DoctrineMigrationsBundle::class => ['all' => true],
Stof\DoctrineExtensionsBundle\StofDoctrineExtensionsBundle::class => ['all' => true],
...
`
Also, Doctrine is configured in the .env
Was using an outdated version of symfony/messenger. Didn't realise the transport wasn't introduced until 4.3.
// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
Hey SymfonyCast-Team,
I hope you can give me a hand on this. After my update to Symfony 5.4 (and other composer updates), my wonderful transport (sending emails) is exploding and I have the following error, which comes from `/vendor/doctrine/dbal/lib/Doctrine/DBAL/Driver/PDOMySql/Driver.php`.
Do you have an idea what could have happened?
`
Which transports/receivers do you want to consume?
Choose which receivers you want to consume messages from in order of priority.
Hint: to consume from multiple, use a list of their names, e.g. async_emailing_batch, async_emailing_email, failed
Select receivers to consume: [async_emailing_batch]:
[0] async_emailing_batch
[1] async_emailing_email
[2] failed
> 0
[OK] Consuming messages from transports "async_emailing_batch".
// The worker will automatically exit once it has received a stop signal via the messenger:stop-workers command.
// Quit the worker with CONTROL-C.
// Re-run the command with a -vv option to see logs about consumed messages.
In Driver.php line 24:
Attempted to load class "Connection" from namespace "Doctrine\DBAL\Driver\PDO".
Did you forget a "use" statement for e.g. "Symfony\Component\VarDumper\Server\Connection", "Symfony\Component\Messenger\Bridge\Redis\Transport\Connection", "Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection", "Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection", "Symfony\Component\Messenger\Transport\AmqpExt\Connection", "Symfony\Component\Messenger\Transport\Doctrine\Connection", "Symfony\Component\Messenger\Transport\RedisExt\Connection", "Doctrine\DBAL\Connection", "Doctrine\DBAL\Driver\Connection", "Doctrine\DBAL\Driver\DrizzlePDOMySql\Connection", "Doctrine\DBAL\Driver\PDOSqlsrv\Connection", "Doctrine\DBAL\Driver\SQLSrv\Connection", "Doctrine\DBAL\Driver\PDO\Connection", "Doctrine\DBAL\Driver\PDO\SQLSrv\Connection", "Doctrine\DBAL\Driver\IBMDB2\Connection", "Doctrine\DBAL\Driver\OCI8\Connection", "Doctrine\DBAL\Driver\Mysqli\Connection" or "Doctrine\DBAL\Portability\Connection"?
messenger:consume [-l|--limit LIMIT] [-f|--failure-limit FAILURE-LIMIT] [-m|--memory-limit MEMORY-LIMIT] [-t|--time-limit TIME-LIMIT] [--sleep SLEEP] [-b|--bus BUS] [--queues QUEUES] [--no-reset] [--] [<receivers>...]
`
Cheers!
Tim