If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.
With a Subscription, click any sentence in the script to jump to that part of the video!
Login SubscribeWe now know that each message will be retried 3 times - which is configurable - and then, if handling it still fails, it will be "rejected"... which is a "queue" word for: it will be removed from the transport and lost forever.
That's... a bummer! Our last retry happened 14 seconds after our first... but if the handler is failing because a third-party server is temporarily down... then if that server is down for even just 30 seconds... the message will be lost forever! It would be better if we could retry it once the server was back up!
The answer to this is... the failure transport!
First, I'm going to uncomment a second transport. In general, you can have as many transports as you want. This one starts with doctrine://default
. If you look at our .env
file... hey! That's exactly what our MESSENGER_TRANSPORT_DSN
environment variable is set to! Yep, both our async
and new failed
transports are using the doctrine
transport and the default
doctrine connection. But the second one also has this little ?queue_name=failed
option. OooooOOOOooo.
framework: | |
messenger: | |
... lines 3 - 5 | |
transports: | |
... lines 7 - 12 | |
failed: 'doctrine://default?queue_name=failed' | |
... lines 14 - 20 |
Go back to whatever you're using to inspect the database and check out the queue table:
DESCRIBE messenger_messages;
Ah. One of the columns in this table is called queue_name
. This column allows us to create multiple transports that all store messages in the same table. Messenger knows which messages belong to which transport thanks to this value. All the messages sent to the failed
transport will have a failed
value... that could be anything - and messages sent to the async
transport will use the default value... which is default
.
By the way, each transport has a number of different connection options and there are two ways to pass them: either as query parameters like this or via an expanded format where you put the dsn
on its own line and then add an options
key with whatever you need below that.
What options can you put here? Each transport type - like doctrine
or amqp
- has its own set of options. Right now, they're not well-documented, but they are easy to find... if you know where to look. By convention, every transport type has a class called Connection
. I'll press Shift+Shift in PhpStorm, search for Connection.php
... and look for files. There they are! A Connection
class for Amqp, Doctrine and Redis.
Open the one for Doctrine. All of these classes have documentation near the top that describe their options, in this case: queue_name
, table_name
and a few others, including auto_setup
. Earlier, we saw that Doctrine will create the messenger_messages
table automatically if it doesn't exist. If you don't want that to happen, you would set auto_setup
to false
.
The transport with the most options can be seen in the Amqp Connection class. We'll talk about Amqp later in the tutorial.
Anyways, back to it! We now have a new transport called failed
... which, despite its name, is the same as any other transport. If we wanted to, we could route message classes there and consume them, just like we're doing for async
.
But... the purpose of this transport is different. Near the top, there's another key called failure_transport
. Uncomment that and notice that this points to our new failed
transport.
framework: | |
messenger: | |
# Uncomment this (and the failed transport below) to send failed messages to this transport for later handling. | |
failure_transport: failed | |
... lines 5 - 20 |
What does it do? Let's see it in action! First, go restart our worker:
php bin/console messenger:consume -vv
Woh! This time, it asks us which "receiver" - which basically means which "transport" - we want to consume. A worker can read from one or many transports - something we'll talk about later with "prioritized" transports. Let's consume just the async
transport - we'll handle messages from the failed
transport in a different way. And actually, to make life easier, we can pass async
as an argument so that it won't ask us which transport to use each time:
php bin/console messenger:consume -vv async
Now... let's upload some images! Then... over here... pretty quickly, all 4 of those exhaust their retries and are eventually rejected from the transport. Until now, that meant that they were gone forever. But this time... that did not happen. Before removing the message from the queue, it says:
Rejected message
AddPonkaToImage
will be sent to the failure transport "failed"
And then... "Sending message". So, it was removed from the async
transport, but it still exists because it was sent to the "failed" transport.
How can we see what messages have failed and try them again if we think those failure were temporary? With a couple of shiny, new console commands. Let's talk about those next.
Hey Roland W.
Yep you can do it with special exception UnrecoverableMessageHandlingException
Cheers!
As far as I can see the UnrecoverableMessageHandlingException ony prevents retries but not the sending to the failure transport, right?
IIRC it should discard message at all without sending it to failed, hm but why you need retries but not failure?
I would have to throw it in the Message Handler, right? Unfortunatelly I guess I need something that works in an Event Subscriber to be able to call this:
if($event->willRetry() === false) { ... }
Becaus I still want the retries but not the failure transport.
// composer.json
{
"require": {
"php": "^7.1.3",
"ext-ctype": "*",
"ext-iconv": "*",
"composer/package-versions-deprecated": "^1.11", // 1.11.99
"doctrine/annotations": "^1.0", // v1.8.0
"doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
"doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
"doctrine/orm": "^2.5.11", // v2.6.3
"intervention/image": "^2.4", // 2.4.2
"league/flysystem-bundle": "^1.0", // 1.1.0
"phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
"sensio/framework-extra-bundle": "^5.3", // v5.3.1
"symfony/console": "4.3.*", // v4.3.2
"symfony/dotenv": "4.3.*", // v4.3.2
"symfony/flex": "^1.9", // v1.18.7
"symfony/framework-bundle": "4.3.*", // v4.3.2
"symfony/messenger": "4.3.*", // v4.3.4
"symfony/property-access": "4.3.*", // v4.3.2
"symfony/property-info": "4.3.*", // v4.3.2
"symfony/serializer": "4.3.*", // v4.3.2
"symfony/validator": "4.3.*", // v4.3.2
"symfony/webpack-encore-bundle": "^1.5", // v1.6.2
"symfony/yaml": "4.3.*" // v4.3.2
},
"require-dev": {
"easycorp/easy-log-handler": "^1.0.7", // v1.0.7
"symfony/debug-bundle": "4.3.*", // v4.3.2
"symfony/maker-bundle": "^1.0", // v1.12.0
"symfony/monolog-bundle": "^3.0", // v3.4.0
"symfony/stopwatch": "4.3.*", // v4.3.2
"symfony/twig-bundle": "4.3.*", // v4.3.2
"symfony/var-dumper": "4.3.*", // v4.3.2
"symfony/web-profiler-bundle": "4.3.*" // v4.3.2
}
}
Is there a possibility to prevent that certain messages are sent to the failure transport after being rejected?