If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.
With a Subscription, click any sentence in the script to jump to that part of the video!
Login SubscribeOur shiny new external_messages
transport reads messages from this messages_from_external
queue, which we're pretending is being populated by an external application. We're taking this JSON and, in ExternalJsonMessengerSerializer
, decoding it, creating the LogEmoji
object, putting it into an Envelope
, even adding a stamp to it, and ultimately returning it, so that it can then be dispatched back through the message bus system.
This is looking great! But there are two improvements I want to make. First, we haven't been coding very defensively. For example, what if, for some reason, the message contains invalid JSON? Let's check for that: if null === $data
, then throw a new MessageDecodingFailedException('Invalid JSON')
... lines 1 - 6 | |
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; | |
... lines 8 - 10 | |
class ExternalJsonMessageSerializer implements SerializerInterface | |
{ | |
public function decode(array $encodedEnvelope): Envelope | |
{ | |
... lines 15 - 19 | |
if (null === $data) { | |
throw new MessageDecodingFailedException('Invalid JSON'); | |
} | |
... lines 23 - 34 | |
} | |
... lines 36 - 40 | |
} |
I'll show you why we're using this exact exception class in a minute. But let's try this with some invalid JSON and... see what happens. Go restart the worker so it sees our new code:
php bin/console messenger:consume -vv external_messages
Then, in the RabbitMQ manager, let's make a very annoying JSON mistake: add a comma after the last property. Publish that message! Ok, move over and... explosion!
MessageDecodingFailedException: Invalid JSON
Oh, and interesting: this killed our worker process! Yep, if an error happens during the decoding process, the exception does kill your worker. That's not ideal... but in reality... it's not a problem. On production, you'll already be using something like supervisor that will restart the process when it dies.
Let's add code to check for a different possible problem: let's check to see if this emoji
key is missing: if not isset($data['emoji'])
, this time throw a normal exception: throw new \Exception('Missing the emoji key!')
.
... lines 1 - 10 | |
class ExternalJsonMessageSerializer implements SerializerInterface | |
{ | |
public function decode(array $encodedEnvelope): Envelope | |
{ | |
... lines 15 - 23 | |
if (!isset($data['emoji'])) { | |
throw new \Exception('Missing the emoji key!'); | |
} | |
... lines 27 - 34 | |
} | |
... lines 36 - 40 | |
} |
Ok, move over and restart the worker:
php bin/console messenger:consume -vv external_messages
Back in Rabbit, remove the extra comma and change emoji
to emojis
. Publish! Over in the terminal... great! It exploded! And other than the exception class... it looks identical to the failure we saw before:
Exception: Missing the emoji key!
But... something different did just happen. Try running the worker again:
php bin/console messenger:consume -vv external_messages
Woh! It exploded! Missing the emoji key. Run it again:
php bin/console messenger:consume -vv external_messages
The same error! This is the difference between throwing a normal Exception
in the serializer versus the special MessageDecodingFailedException
. When you throw a MessageDecodingFailedException
, your serializer is basically saying:
Hey! Something went wrong... and I do want to throw an exception. But, I think we should discard this message from the queue: there is no point to trying it over and over again. kthxbai!
And that's super important. If we don't discard this message, each time our worker restarts, it will fail on that same message... over-and-over again... forever. Any new messages will start piling up behind it in the queue.
So let's change the Exception
to MessageDecodingFailedException
. Try it now:
... lines 1 - 10 | |
class ExternalJsonMessageSerializer implements SerializerInterface | |
{ | |
public function decode(array $encodedEnvelope): Envelope | |
{ | |
... lines 15 - 23 | |
if (!isset($data['emoji'])) { | |
throw new MessageDecodingFailedException('Missing the emoji key!'); | |
} | |
... lines 27 - 34 | |
} | |
... lines 36 - 40 | |
} |
php bin/console messenger:consume -vv external_messages
It will explode the first time... but the MessageDecodingFailedException
should have removed it from the queue. When we run the worker now:
php bin/console messenger:consume -vv external_messages
Yep! The message is gone and the queue is empty.
Next, let's add one more superpower to this serializer. What if that outside system actually sends our app many different types of message - not only a message to log emojis, but maybe also messages to delete photos or cook some pizza! How can our serializer figure out which messages are which... and which message object to create?
Hey SWORP!
Yea, that makes sense :). Have you tried the WorkerMessageFailedEvent
event? It's triggered each time a message fails while being processed by the worker.
Cheers!
So the message is removed from the queue, and lost ? But if it's our fault ? I mean we didn't implemented yet. Could we leave the message but at the end of the queue ? or with a low priority. So we could implement it and then get it ?
Hey Julien,
I suppose failure transport is what you need here: https://symfonycasts.com/sc... . The idea is to retry a few times and if it still fails - moved that message to the failure transport where you will be able to retry it whenever you're ready.
I hope this helps!
Cheers!
// composer.json
{
"require": {
"php": "^7.1.3",
"ext-ctype": "*",
"ext-iconv": "*",
"composer/package-versions-deprecated": "^1.11", // 1.11.99
"doctrine/annotations": "^1.0", // v1.8.0
"doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
"doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
"doctrine/orm": "^2.5.11", // v2.6.3
"intervention/image": "^2.4", // 2.4.2
"league/flysystem-bundle": "^1.0", // 1.1.0
"phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
"sensio/framework-extra-bundle": "^5.3", // v5.3.1
"symfony/console": "4.3.*", // v4.3.2
"symfony/dotenv": "4.3.*", // v4.3.2
"symfony/flex": "^1.9", // v1.18.7
"symfony/framework-bundle": "4.3.*", // v4.3.2
"symfony/messenger": "4.3.*", // v4.3.4
"symfony/property-access": "4.3.*", // v4.3.2
"symfony/property-info": "4.3.*", // v4.3.2
"symfony/serializer": "4.3.*", // v4.3.2
"symfony/validator": "4.3.*", // v4.3.2
"symfony/webpack-encore-bundle": "^1.5", // v1.6.2
"symfony/yaml": "4.3.*" // v4.3.2
},
"require-dev": {
"easycorp/easy-log-handler": "^1.0.7", // v1.0.7
"symfony/debug-bundle": "4.3.*", // v4.3.2
"symfony/maker-bundle": "^1.0", // v1.12.0
"symfony/monolog-bundle": "^3.0", // v3.4.0
"symfony/stopwatch": "4.3.*", // v4.3.2
"symfony/twig-bundle": "4.3.*", // v4.3.2
"symfony/var-dumper": "4.3.*", // v4.3.2
"symfony/web-profiler-bundle": "4.3.*" // v4.3.2
}
}
Is there a way to handle all the exception from Messages the way it is done by Subscribers to
KernelEvents::EXCEPTION
orExceptionListener->onKernelException
?Seems like exceptions occurring when processing messages is some separate event , other than
KernelEvents::EXCEPTION
.For example, need to send email to admin, in case some problem occurs with the queue.
Thanks!