Tracking Messages with Middleware & a Stamp

We somehow want to attach a unique id - just some string - that stays with the message forever: whether it's handled immediately, sent to a transport, or even retried multiple times.

Creating a Stamp

How can we attach extra... "stuff" to a message? By giving it our very-own stamp! In the Messenger/ directory, create a new PHP class called UniqueIdStamp. Stamps also have just one rule: they implement MessengerEnvelopeMetadataAwareContainerReaderInterface. Nah I'm kidding - that would be a silly name. They just need to implement StampInterface.

// ...
namespace App\Messenger;

use Symfony\Component\Messenger\Stamp\StampInterface;

class UniqueIdStamp implements StampInterface
{
    // ...
}

And... that's it! This is an empty interface that just serves to "mark" objects as stamps. Inside... we get to do whatever we want... as long as PHP can serialize this stamp... which basically means: as long as it holds simple data. Let's add a private $uniqueId property, then a constructor with no arguments. Inside, say $this->uniqueId = uniqid(). At the bottom, go to Code -> Generate - or Command+N on a Mac - and generate the getter... which will return a string.

// ...
namespace App\Messenger;

use Symfony\Component\Messenger\Stamp\StampInterface;

class UniqueIdStamp implements StampInterface
{
    private $uniqueId;

    public function __construct()
    {
        $this->uniqueId = uniqid();
    }

    public function getUniqueId(): string
    {
        return $this->uniqueId;
    }
}
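
Because the stamp travels with the message to the transport, it is worth checking that the id survives a serialization round-trip. The following is a minimal sketch, assuming the transport encodes the envelope with PHP's native serialize(). The StampInterface declared here is a local stand-in for Symfony's empty marker interface, added only so the snippet runs on its own; in the app you would keep the real use statement.

```php
<?php

// Local stand-in for Symfony\Component\Messenger\Stamp\StampInterface,
// which is an empty marker interface. Declared here only so the sketch runs alone.
interface StampInterface
{
}

class UniqueIdStamp implements StampInterface
{
    private $uniqueId;

    public function __construct()
    {
        $this->uniqueId = uniqid();
    }

    public function getUniqueId(): string
    {
        return $this->uniqueId;
    }
}

$stamp = new UniqueIdStamp();

// Round-trip through PHP's native serialization, as a transport might do.
$restored = unserialize(serialize($stamp));

// unserialize() does not call the constructor, so the original id is kept.
var_dump($restored->getUniqueId() === $stamp->getUniqueId());
```

The id is generated once, in the constructor, and every later copy of the stamp simply carries the stored string. That is what lets the id stay with the message "forever".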

Stamp, done!

Stamping... um... Attaching the Stamp

Next, inside AuditMiddleware, before we call the next middleware - which will call the rest of the middleware and ultimately handle or send the message - let's add the stamp.

But, be careful: we need to make sure that we only attach the stamp once. As we'll see in a minute, a message may be passed to the bus - and so, to the middleware - many times! Once when it's initially dispatched and again when it's received from the transport and handled. If handling that message fails and is retried, it would go through the bus even more times.

So, start by checking if null === $envelope->last(UniqueIdStamp::class). If it is, say $envelope = $envelope->with(new UniqueIdStamp()).

// ...
class AuditMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        if (null === $envelope->last(UniqueIdStamp::class)) {
            $envelope = $envelope->with(new UniqueIdStamp());
        }
        // ...
    }
}

Envelopes are Immutable

There are a few interesting things here. First, each Envelope is "immutable", which means that, just due to the way that class was written, you can't change any data on it. When you call $envelope->with() to add a new stamp, it doesn't actually modify the Envelope. Nope, internally, it makes a clone of itself plus the new stamp.

That's... not very important except that you need to remember to say $envelope = $envelope->with() so that $envelope becomes the newly stamped object.
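
To make the "remember to reassign" point concrete, here is a small sketch of the clone-on-write idea. SketchEnvelope is a hypothetical, simplified stand-in written for this example; it is not Symfony's Envelope source, and it only models the behavior described above.

```php
<?php

// Simplified stand-in for Messenger's Envelope: it illustrates the
// "return a stamped clone" idea only and is not Symfony's actual code.
final class SketchEnvelope
{
    private $stamps = [];

    public function with($stamp): self
    {
        $cloned = clone $this;      // copy the current envelope
        $cloned->stamps[] = $stamp; // add the stamp to the copy only

        return $cloned;
    }

    public function stampCount(): int
    {
        return count($this->stamps);
    }
}

$envelope = new SketchEnvelope();

// Forgetting to reassign: the stamped copy is thrown away.
$envelope->with(new stdClass());
var_dump($envelope->stampCount()); // int(0)

// Reassigning: $envelope now points at the stamped copy.
$envelope = $envelope->with(new stdClass());
var_dump($envelope->stampCount()); // int(1)
```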

Fetching Stamps

Also, when it comes to stamps, an Envelope could, in theory, hold multiple stamps of the same class. The $envelope->last() method says:

Give me the most recently added UniqueIdStamp or null if there are none.
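
One way to picture this is as stamps grouped by class name, with last() reading the end of one group. The helpers below (addStamp(), lastStamp()) and the NoteStamp class are hypothetical names made up for this illustration; they are not functions or classes from Symfony.

```php
<?php

// Hypothetical helper: returns a new array with the stamp added to its class group.
function addStamp(array $stampsByClass, $stamp): array
{
    $stampsByClass[get_class($stamp)][] = $stamp;

    return $stampsByClass;
}

// Hypothetical helper: most recently added stamp of a class, or null if none.
function lastStamp(array $stampsByClass, string $class)
{
    if (empty($stampsByClass[$class])) {
        return null;
    }

    $stamps = $stampsByClass[$class];

    return end($stamps);
}

// Hypothetical stamp used only for this demonstration.
class NoteStamp
{
    public $text;

    public function __construct(string $text)
    {
        $this->text = $text;
    }
}

$stamps = [];
var_dump(lastStamp($stamps, NoteStamp::class)); // NULL

$stamps = addStamp($stamps, new NoteStamp('first'));
$stamps = addStamp($stamps, new NoteStamp('second'));

echo lastStamp($stamps, NoteStamp::class)->text, PHP_EOL; // second
echo count($stamps[NoteStamp::class]), PHP_EOL;           // 2
```

This is why the null check in the middleware matters: without it, every pass through the bus would add another UniqueIdStamp, and last() would return a different id each time.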

Dumping the Unique Id

Thanks to our work, below the if statement - regardless of whether this message was just dispatched... or just received from a transport... or is being retried - our Envelope should have exactly one UniqueIdStamp. Fetch it off with $stamp = $envelope->last(UniqueIdStamp::class). I'm also going to add a little hint to my editor so that it knows that this is specifically a UniqueIdStamp.

// ...
class AuditMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        if (null === $envelope->last(UniqueIdStamp::class)) {
            $envelope = $envelope->with(new UniqueIdStamp());
        }

        /** @var UniqueIdStamp $stamp */
        $stamp = $envelope->last(UniqueIdStamp::class);
        // ...
    }
}

To see if this is working, let's dump($stamp->getUniqueId()).

// ...
class AuditMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        if (null === $envelope->last(UniqueIdStamp::class)) {
            $envelope = $envelope->with(new UniqueIdStamp());
        }

        /** @var UniqueIdStamp $stamp */
        $stamp = $envelope->last(UniqueIdStamp::class);
        dump($stamp->getUniqueId());

        return $stack->next()->handle($envelope, $stack);
    }
}

Let's try it! If we've done our job well, for an asynchronous message, that dump() will be executed once when the message is dispatched and again inside of the worker when it's received from the transport and handled.
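
That expectation (the same id on every pass through the bus) can be simulated without a browser or a worker. This is only a sketch under stated assumptions: SketchEnvelope and auditPass() are hypothetical stand-ins for the real Envelope and for AuditMiddleware::handle(), and the transport is modeled as a plain serialize()/unserialize() round-trip.

```php
<?php

// Stand-ins so the sketch runs without Symfony; all names here are hypothetical.
class UniqueIdStamp
{
    private $uniqueId;

    public function __construct()
    {
        $this->uniqueId = uniqid();
    }

    public function getUniqueId(): string
    {
        return $this->uniqueId;
    }
}

final class SketchEnvelope
{
    private $stamps = [];

    public function with($stamp): self
    {
        $cloned = clone $this;
        $cloned->stamps[get_class($stamp)][] = $stamp;

        return $cloned;
    }

    public function last(string $class)
    {
        if (empty($this->stamps[$class])) {
            return null;
        }

        $stamps = $this->stamps[$class];

        return end($stamps);
    }
}

// Same guard as in AuditMiddleware: only stamp when no id exists yet.
function auditPass(SketchEnvelope $envelope, array &$seenIds): SketchEnvelope
{
    if (null === $envelope->last(UniqueIdStamp::class)) {
        $envelope = $envelope->with(new UniqueIdStamp());
    }

    $seenIds[] = $envelope->last(UniqueIdStamp::class)->getUniqueId();

    return $envelope;
}

$seenIds = [];

// Pass 1: the message is dispatched during the web request.
$envelope = auditPass(new SketchEnvelope(), $seenIds);

// The envelope is serialized for the transport, then decoded by the worker.
$received = unserialize(serialize($envelope));

// Pass 2: the worker hands the received envelope back to the bus.
$received = auditPass($received, $seenIds);

// Pass 3: a retry goes through the bus once more.
auditPass(unserialize(serialize($received)), $seenIds);

var_dump(count(array_unique($seenIds))); // int(1): the same id on every pass
```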

Refresh the page just to be safe, then upload an image. To see if our dump() was hit, I'll use the link on the web debug toolbar to open up the profiler for that request. Click "Debug" on the left and... there it is! Our unique id! In a few minutes, we'll make sure that this code is also executed in the worker.

And because middleware are executed for every message, we should also be able to see this when deleting an image. Delete one, then open up the profiler for the DELETE request and click "Debug". Ha! This time there are two distinct unique ids because deleting dispatches two different messages.

Next, let's actually do something useful with this! Inside of our middleware, we're going to log the entire lifecycle of a single message: when it's originally dispatched, when it's sent to a transport and when it's received from a transport and handled. To figure out which part of the process the message is currently in, we're going to once again use stamps. But instead of creating new stamps, we'll read the core stamps.

Comments

AymDev | posted 3 years ago | edited

I really thought the MessengerEnvelopeMetadataAwareContainerReaderInterface thing was real at first ..!

Balaram M. | posted 1 year ago | edited

Hi there,

Is there a way to get the retry count in a Symfony message handler?

The Messenger consumer command throws an error and retries the message 5 times, according to the configuration. I want to get the retry count in the handler so I can add some logic accordingly.


Hi!

For anyone looking, this question (a very interesting question!) is answered over here: https://symfonycasts.com/sc...

Cheers!

SamuelVicent | posted 1 year ago

Hi, I would like to track the state of all messages in a tracking table. Does anyone know if this functionality is implemented or planned? Thanks a lot!


Hey SamuelVicent!

You have a few options here. The easiest, but the one that contains the least information, would be to use the "doctrine" transport so that your messages are stored in a database. Then you can query them to get some information. But, this table structure is "fixed" and it's meant to help Messenger - not to give you all the information that you need, for example :).

So, if you need more information or control, I would register an event listener (or several) and update a separate table as things happen. If you look in the Worker class - https://github.com/symfony/... - you'll see a number of different events you can listen to (search for dispatchEvent).

But, let's back up: what kind of tracking would you like to do?

Cheers!

SamuelVicent | weaverryan | posted 1 year ago

Hi, first of all, thanks for the feedback.
Let me explain my experience with this:

A few weeks ago we created two listeners, for WorkerMessageFailedEvent and WorkerMessageHandledEvent.
We use MariaDB as the database engine for tracking the state of each message (we only track messages of type SendEmailMessage).
It worked to some extent, but we had problems with table deadlocks. We saw a solution in Symfony GitHub issue 42345.
We have been working on a solution using a custom middleware and playing with stamps, but we still don't have robust code.
We will try to apply the GitHub solution to see if the event approach works.

That's the reason why I asked whether you know of a message tracking implementation.
Thanks a lot :)


Hey SamuelVicent!

Hmm, that's very interesting about the deadlocks! Let me know how it goes, I'd like to know :).

Cheers!

SamuelVicent | weaverryan | posted 1 year ago

Hi Ryan,
We finally used listeners for tracking handled and failed messages.
We had to use RabbitMQ as the transport for managing messages, and a MariaDB database for the tracking (to know which messages were sent properly and which were not).
Apart from this, using MariaDB as the transport gave us deadlock problems in a master-to-master setup (database sync between 2 servers).
Hope it helps as an experience :)
Regards and merry Christmas!


Thanks for sharing all of that! And yes, I think that, in the real world, using the database as a transport (at least MySQL/MariaDB - the Pgsql queue transport is much more robust in Messenger... because pgsql is much more robust) can cause deadlock issues.

Anyways, I always like to hear solutions that work nicely - thanks for sharing and a merry christmas back to you!

Dariia M. | posted 3 years ago

I understand the main idea about adding stamps without duplicates, but I don't understand the code design in the Messenger source code. It's very confusing to me :)
Why do the methods "last" and "with" have different signatures? Why do we need to check for a stamp by FQCN, but can't instantiate a stamp by FQCN inside the "with" method? What's the purpose of keeping stamps inside the envelope in an array of "FQCN" => "stamp instance" as "key" => "value"?


Hey Dariia M.

The with() method clones the Envelope and, if you pass in any stamps, adds them to the new cloned envelope object. It's just a way to get a copy of your current envelope with the stamps it already has, plus any new ones.

The last() method only finds the latest stamp added to your envelope for a given FQCN. Since an envelope can have many stamps of the same type, storing them in an array indexed by their FQCN is a handy way to fetch them by type.

Cheers!

Dariia M. | MolloKhan | posted 3 years ago | edited

Hi MolloKhan
Thank you for the detailed explanation, now everything is clear to me!


This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}