Flag of Ukraine
SymfonyCasts stands united with the people of Ukraine

Middleware Message Lifecycle Logging

Keep on Learning!

If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.

Start your All-Access Pass
Buy just this tutorial for $12.00

With a Subscription, click any sentence in the script to jump to that part of the video!

Login Subscribe

Our middleware is called in two different situations. First, it's called when you initially dispatch the message. For example, in ImagePostController, the moment we call $messageBus->dispatch(), all the middleware are called - regardless of whether or not the message will be handled async. And second, when the worker - bin/console messenger:consume - receives a message from the transport, it passes that message back into the bus and the middleware are called again.

This is the trickiest thing about middleware: trying to figure out which situation you're currently in. Fortunately, Messenger adds "stamps" to the Envelope along the way, and these tell us exactly what's going on.

Was the Message Received from the Transport? ReceivedStamp

For example, when a message is received from a transport, messenger adds a ReceivedStamp. So, if $envelope->last(ReceivedStamp::class), then this message is currently being processed by the worker and was just received from a transport.

... lines 1 - 8
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
class AuditMiddleware implements MiddlewareInterface
{
... lines 13 - 19
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
... lines 22 - 32
if ($envelope->last(ReceivedStamp::class)) {
... line 34
} else {
... line 36
}
... lines 38 - 39
}
}

Let's log that: $this->logger->info() with a special syntax:

[{id}] Received and handling {class}

Then pass $context as the second argument. The $context array is cool for two reasons. First, each log handler receives this and can do whatever it wants with it - usually the $context is printed at the end of the log message. And second, if you use these little {} wildcards, the context values will get filled in automatically!

... lines 1 - 8
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
class AuditMiddleware implements MiddlewareInterface
{
... lines 13 - 19
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
... lines 22 - 32
if ($envelope->last(ReceivedStamp::class)) {
$this->logger->info('[{id}] Received & handling {class}', $context);
} else {
... line 36
}
... lines 38 - 39
}
}

If the message was not just received, say $this->logger->info() and start the same way:

[{id}] Handling or sending {class}

... lines 1 - 8
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
class AuditMiddleware implements MiddlewareInterface
{
... lines 13 - 19
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
... lines 22 - 32
if ($envelope->last(ReceivedStamp::class)) {
$this->logger->info('[{id}] Received & handling {class}', $context);
} else {
$this->logger->info('[{id}] Handling or sending {class}', $context);
}
... lines 38 - 39
}
}

At this point, we know that the message was just dispatched... but we don't know whether or not it will be handled right now or sent to a transport. We'll improve that in a few minutes.

But first, let's try it! Start the worker and tell it to read from the async transport:

php bin/console messenger:consume -vv async

Ah, I think we had a few messages from earlier still in the queue! When that finishes, let's clear the screen. Let's also open up another tab and create the new log file - messenger.log - if it's not already there:

touch var/log/messenger.log

Then, tail it so we can watch the messages:

tail -f var/log/messenger.log

Oh, cool! This already has a few lines from those old messages it just processed. Let's clear that so we have fresh screens to look at.

Testing time! Move over and upload one new photo. Spin back to your terminal and... yea! Both log messages are already there: "Handling or sending" and then "Received and handling" when the message was received from the transport... which was almost instant. We know these log entries are for the same message thanks to the unique id at the beginning.

Determining if Message is Handled or Sent

But... we can do better than just saying "handling or sending". How? This $stack->next()->handle() line is responsible for calling the next middleware... which will then call the next middleware and so on. Because our logging code is above this, it means that our code is potentially being called before some other middleware do their work. In fact, our code is being executed before the core middleware that are responsible for handling or sending the message.

So... how can we determine whether the message will be sent versus handled immediately... before the message is actually sent or handled immediately? We can't!

Check it out: remove the return and instead say $envelope = $stack->next()->handle(). Then, move that line above our code and, at the bottom, return $envelope.

... lines 1 - 11
class AuditMiddleware implements MiddlewareInterface
{
... lines 14 - 20
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
... lines 23 - 35
$envelope = $stack->next()->handle($envelope, $stack);
if ($envelope->last(ReceivedStamp::class)) {
$this->logger->info('[{id}] Received & handling {class}', $context);
} else {
$this->logger->info('[{id}] Handling or sending {class}', $context);
}
return $envelope;
}
}

If we did nothing else... the result would be pretty much the same: we would log the exact same messages... but technically, the log entries would happen after the message was sent or handled instead of before.

But! Notice that when we call $stack->next()->handle() to execute the rest of the middleware, we get back an $envelope... which may contain new stamps! In fact, if the message was sent to a transport instead of being handled immediately, it will be marked with a SentStamp.

Add elseif $envelope->last(SentStamp::class) then we know that this message was sent, not handled. Use $this->logger->info() with our {id} trick and sent {class}.

... lines 1 - 9
use Symfony\Component\Messenger\Stamp\SentStamp;
class AuditMiddleware implements MiddlewareInterface
{
... lines 14 - 20
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
... lines 23 - 37
if ($envelope->last(ReceivedStamp::class)) {
... line 39
} elseif ($envelope->last(SentStamp::class)) {
$this->logger->info('[{id}] Sent {class}', $context);
} else {
... line 43
}
... lines 45 - 46
}
}

Below, now we know that we're definitely "Handling sync". The top message - "Received and handling" is still true, but I'll change this to just say "Received": a message is always handled when it's received, so that was redundant.

... lines 1 - 9
use Symfony\Component\Messenger\Stamp\SentStamp;
class AuditMiddleware implements MiddlewareInterface
{
... lines 14 - 20
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
... lines 23 - 37
if ($envelope->last(ReceivedStamp::class)) {
$this->logger->info('[{id}] Received {class}', $context);
} elseif ($envelope->last(SentStamp::class)) {
$this->logger->info('[{id}] Sent {class}', $context);
} else {
$this->logger->info('[{id}] Handling sync {class}', $context);
}
... lines 45 - 46
}
}

Ok! Let's clear our log screen and restart the worker:

php bin/console messenger:consume -vv async

Upload one photo... then move over... and go to the log file. Yep! Sent, then Received! If we had uploaded 5 photos, we could use the unique id to identify each message individually.

Hit enter a few times: I want to see an even cooler example. Delete a photo and move back over! Remember, this dispatches two messages! The unique id part makes it even more obvious what's going on: DeletePhotoFile was sent to the transport, then DeleteImagePost was handled synchronously... then DeletePhotoFile was received and processed.

Actually, what really happened was this: DeleteImagePost was handled synchronously and, internally, it dispatched DeletePhotoFile which was sent to the transport. The first two messages are a bit out of order because our logging code is always running after we execute the rest of the chain, so after DeleteImagePost was handled. We could improve that by moving the Handling Sync logging logic above the code that calls the rest of the middleware. Yea, this stuff is super powerful... but can be a bit complex to navigate. This logging stuff is probably as confusing as it gets.

Next: the worker handles each message in the order it was received. But... that's not ideal: it's way more important for all AddPonkaToImage messages to be handled before any DeletePhotoFile messages. Let's do that with priority transports.

Leave a comment!

7
Login or Register to join the conversation

Does there a way to use that unique id to remove a failed message from failure transport for example ?

Reply

Hey ahmedbhs

If you have a message hanging in the failed queue, then, you can use Messenger's commands to investigate more about it. There are these 3 commands you can play with and decide what to do with such a message
messenger:failed:remove
messenger:failed:retry
messenger:failed:show

Cheers!

-1 Reply

Hi

if i have routing like this:
`
routing:

'App\Events\SomeEvent': [one, two]

`

Is it possible somehow, (depend on some logic in code) to send message only to "<b>two</b>" transport?
I'm using AWS SQS.

Reply

Hey kwolniak

There is a way to do that, a bit confusing but possible. Here you can learn how https://symfonycasts.com/sc...

Cheers!

Reply

Thanks, I know this "from_transport" thing, but it's not what I need.
When I declare routing like above, SomeEvent message is sent automaticaly to one and two transport. And the same message appear in my case on one and two queue and in most cases it's OK.
But sometimes I want change it on the runtime in the code (for example somehow in middleware or before dispatch) and sent it only to one transport so message will appear only in one queue.
I have found that it should be possible with ampq and routing key but is it possible when I'm using enqueue/sqs?

Reply

Hey kwolniak!

This is currently a shortcoming with Messenger - and probably the one that bothers me the most! Routing is super static - there is no way to hook in at runtime to modify this. I think there should be, but it's not there yet. So, you have 2 options:

1) Do fanciness with routing key and amqp (or something similar in sqs... if there is such a mechanism). This is basically you going outside of the system to get what you want ;).

2) Create 2 different classes: one that routes to both transports and another that routes to only one.

SUPER not ideal, but that's the state of things. Hmm, or you could do something totally nuts and "decorate" (with Symfony service decoration) the messenger.senders_locator service (which is this class https://github.com/symfony/symfony/blob/master/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php) and do some custom logic there. That is the "I'm taking full control" hammer solution to the problem - but because there isn't such a nice solution yet, I won't be disappointed if you do it ;).

Cheers!

1 Reply

Thank you very much for the comprehensive answer!
For now I decided on two separate events and two separate transports, and before dispatch I do some logic.

Thanks anyway :)

Reply
Cat in space

"Houston: no signs of life"
Start the conversation!

This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
userVoice