Mapping Messages to Classes in a Transport Serializer

We've written our transport serializer to always expect only one type of message to be put into the queue: a message that tells our app to "log an emoji". Your app might be that simple, but it's more likely that this "external" system might send 5 or 10 different types of messages. In that case, our serializer needs to detect which type of message this is and then turn it into the correct message object.

How can we do that? How can we figure out which type of message this is? Do we... just look at what fields the JSON has? We could... but we can also do something smarter.

Refactoring to a switch

Let's start by reorganizing this class a bit. Select the code at the bottom of this method - the stuff related to the LogEmoji object - and then go to the Refactor -> "Refactor This" menu, which is Ctrl+T on a Mac. Refactor this code to a method called createLogEmojiEnvelope.

Tip

To make sure "retries" work correctly, some of the code in this section has been tweaked. See the code blocks on this page for the updated examples!

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        $envelope = $this->createLogEmojiEnvelope($data);

        // in case of redelivery, unserialize any stamps
        $stamps = [];
        if (isset($headers['stamps'])) {
            $stamps = unserialize($headers['stamps']);
        }
        $envelope = $envelope->with(...$stamps);

        return $envelope;
    }

    // ...
    private function createLogEmojiEnvelope($data): Envelope
    {
        if (!isset($data['emoji'])) {
            throw new MessageDecodingFailedException('Missing the emoji key!');
        }

        $message = new LogEmoji($data['emoji']);
        $envelope = new Envelope($message);

        // needed only if you need this to be sent through the non-default bus
        $envelope = $envelope->with(new BusNameStamp('command.bus'));

        return $envelope;
    }
}

Cool! That created a private function down here with that code. I'll add an array type-hint. Back in decode(), we're already calling this method. So, no big change.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    // ...
    private function createLogEmojiEnvelope(array $data): Envelope
    {
        // ...
    }
}

Using Headers for the Type

The key question is: if multiple types of messages are being added to the queue, how can the serializer determine which type of message this is? Well, we could maybe add a type key to the JSON itself. That might be fine. But, there's another spot on the message that can hold data: the headers. These work a lot like HTTP headers: they're just "extra" information you can store about the message. Whatever header we put here will make it back to our serializer when it's consumed.

Ok, so let's add a new header called type set to emoji. I just made that up. I'm not making this a class name... because that external system won't know or care about what PHP classes we use internally to handle this. It's just saying:

This is an "emoji" type of message
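To make the body/headers split concrete, here's a rough sketch of the array that decode() receives for one message and how the two parts are pulled out. The 'body' and 'headers' keys are the ones Messenger passes to a transport serializer; the emoji value of 2 is just an assumed example payload.

```php
<?php
// Sketch of the array handed to decode() for a single message.
// The emoji value (2) is an assumed example payload.
$encodedEnvelope = [
    'body' => '{"emoji": 2}',
    'headers' => ['type' => 'emoji'],
];

// The payload lives in the body...
$data = json_decode($encodedEnvelope['body'], true);
// ...and the "what kind of message is this?" hint lives in the headers.
$headers = $encodedEnvelope['headers'];

printf("type=%s emoji=%d\n", $headers['type'], $data['emoji']);
```

The external system never needs to know our PHP class names: it only sets the short type string.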

Back in our serializer, let's first check to make sure that header is set: if not isset($headers['type']), then throw a new MessageDecodingFailedException with:

Missing "type" header

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        if (!isset($headers['type'])) {
            throw new MessageDecodingFailedException('Missing "type" header');
        }
        // ...
    }
    // ...
}

Then, down here, we'll use a good, old-fashioned switch case statement on $headers['type']. If this is set to emoji, return $this->createLogEmojiEnvelope().

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        if (!isset($headers['type'])) {
            throw new MessageDecodingFailedException('Missing "type" header');
        }

        switch ($headers['type']) {
            case 'emoji':
                $envelope = $this->createLogEmojiEnvelope($data);
                break;
            // ...
        }
        // ...
        return $envelope;
    }
    // ...
}

After this, you would add any other "types" that the external system publishes, like delete_photo. In those cases you would instantiate a different message object and return that. And, if some unexpected "type" is passed, let's throw a new MessageDecodingFailedException with:

Invalid type "%s"

passing $headers['type'] as the wildcard.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...
        switch ($headers['type']) {
            case 'emoji':
                $envelope = $this->createLogEmojiEnvelope($data);
                break;
            default:
                throw new MessageDecodingFailedException(sprintf('Invalid type "%s"', $headers['type']));
        }
        // ...
    }
    // ...
}
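If the external system also published a delete_photo type, the switch might grow roughly like this. This is a standalone sketch so it runs without Symfony: the DeletePhoto class, the photo_id key and the createMessage() function are all hypothetical, and InvalidArgumentException stands in for MessageDecodingFailedException. The real serializer would also wrap the message in an Envelope.

```php
<?php
// Stand-in for the tutorial's message class, so this runs without Symfony.
class LogEmoji
{
    public $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }
}

// Hypothetical second message type.
class DeletePhoto
{
    public $photoId;

    public function __construct(int $photoId)
    {
        $this->photoId = $photoId;
    }
}

// Mirrors the switch in decode(): one case per supported "type" header.
// InvalidArgumentException stands in for MessageDecodingFailedException.
function createMessage(string $type, array $data)
{
    switch ($type) {
        case 'emoji':
            if (!isset($data['emoji'])) {
                throw new InvalidArgumentException('Missing the emoji key!');
            }

            return new LogEmoji($data['emoji']);
        case 'delete_photo':
            if (!isset($data['photo_id'])) {
                throw new InvalidArgumentException('Missing the photo_id key!');
            }

            return new DeletePhoto($data['photo_id']);
        default:
            throw new InvalidArgumentException(sprintf('Invalid type "%s"', $type));
    }
}

$message = createMessage('delete_photo', ['photo_id' => 5]);
```

Each case validates its own payload, so a bad body fails with a message that names the missing key.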

Tip

To support retries on failure, you also need to re-add the "type" header inside encode():

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    // ...
    public function encode(Envelope $envelope): array
    {
        // ...
        if ($message instanceof LogEmoji) {
            // ...
            $type = 'emoji';
            // ...
        }
        // ...
        return [
            // ...
            'headers' => [
                // store stamps as a header - to be read in decode()
                'stamps' => serialize($allStamps),
                'type' => $type,
            ],
        ];
    }
    // ...
}

Kinda cool, right? Let's go stop our worker, then restart it so it sees our new code:

php bin/console messenger:consume -vv external_messages

Back in the Rabbit manager, I'll change the emojis key back to emoji and... publish! In the terminal... sweet! It worked! Now change the type header to something we don't support, like photo. Publish and... yea! An exception killed our worker:

Invalid type "photo".

Ok friends... yea... that's it! Congrats on making it to the end! I hope you enjoyed the ride as much as I did! I mean, handling messages asynchronously... that's pretty fun stuff. The great thing about Messenger is that it works brilliantly out of the box with a single message bus and the Doctrine transport. Or, you can go crazy: create multiple transports, send things to RabbitMQ, create custom exchanges with binding keys or use your own serializer to... well... basically do whatever you want. The power... it's... intoxicating!

So, start writing some crazy handler code and then... handle that work later! And let us know what you're building. As always, if you have some questions, we're there for you in the comments.

Alright friends, seeya next time!

Leave a comment!

Cameron | posted 2 years ago

Question: why would we use a query bus when we can either create a service or build an API? It's not clear to me what the purpose of this tool is or when to leverage it in a project.

1 Reply

Hey Fox C.!

Hmm... so.... query buses are more of a "programming philosophy" and strategy than a solution. What I mean is, it's about organizing your internal code - not about how your app behaves or anything like that.

For example, if I needed to do some calculation - maybe "calculate average sales " - I'd probably create a service and put a method on it - e.g. SalesStatsCalculator with a getAverageSales(\DateTimeInterface $start, \DateTimeInterface $end): float or something like that.

Someone else who is choosing a CQRS strategy for organizing code (and I am far from an expert on this topic) might choose to use a query bus. In that case, they might create a GetAverageSales message class with the \DateTimeInterface $start, \DateTimeInterface $end as arguments to its constructor. They would then pass that to a bus - $result = $queryBus->dispatch(new GetAverageSales(..., ...)) and expect the result to be sent back through it. So, just a different way to organize code.
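A rough side-by-side of those two styles, as a standalone sketch. ToyQueryBus is a hypothetical stand-in that simply looks up a handler and returns its result, and the fixed 42.0 is a placeholder for a real calculation. With Messenger itself, dispatch() returns an Envelope and the handler's result is read from its HandledStamp (or via HandleTrait).

```php
<?php
// Option 1: a plain service with a method.
class SalesStatsCalculator
{
    public function getAverageSales(DateTimeInterface $start, DateTimeInterface $end): float
    {
        // Placeholder value; a real implementation would query the database.
        return 42.0;
    }
}

// Option 2: a query message plus a handler, sent through a bus.
class GetAverageSales
{
    public $start;
    public $end;

    public function __construct(DateTimeInterface $start, DateTimeInterface $end)
    {
        $this->start = $start;
        $this->end = $end;
    }
}

class GetAverageSalesHandler
{
    private $calculator;

    public function __construct(SalesStatsCalculator $calculator)
    {
        $this->calculator = $calculator;
    }

    public function __invoke(GetAverageSales $query): float
    {
        return $this->calculator->getAverageSales($query->start, $query->end);
    }
}

// Hypothetical toy bus: finds the handler by message class and returns its result.
class ToyQueryBus
{
    private $handlers;

    public function __construct(array $handlers)
    {
        $this->handlers = $handlers;
    }

    public function dispatch($query)
    {
        $handler = $this->handlers[get_class($query)];

        return $handler($query);
    }
}

$calculator = new SalesStatsCalculator();
$queryBus = new ToyQueryBus([GetAverageSales::class => new GetAverageSalesHandler($calculator)]);

$start = new DateTimeImmutable('2024-01-01');
$end = new DateTimeImmutable('2024-01-31');
$result = $queryBus->dispatch(new GetAverageSales($start, $end));
```

Both paths end up running the same calculation: the difference is only in how the calling code is organized.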

Let me know if that clarifies... or if it doesn't ;).

Cheers!

1 Reply

Yet, another awesome tutorial, thanks Ryan!

1 Reply

Thank you so much for this tutorial! I am more comfortable using messenger. I have already implemented some of your recommendations and the doctrine transport is wonderful and now in use. Please keep up the good work.

I am looking for a solution that doesn't require the server to constantly poll, so I was thinking to use a push notification service (AWS SNS) to start the messenger:consume process and terminate. Is this possible and how would I go about it?

1 Reply

Hey Skylar

Thanks for your kind words :)
I don't know if RabbitMQ fits your needs, have you given it a try? Also, if you really want push notifications you may be interested in the Symfony Mercure component (https://symfony.com/doc/cur... ). I've never used it before but it seems solid

Cheers!

Reply

Thanks for the reply. Do you know if it is possible to start a console process from a web request?

Reply

Yes, it's possible, you can see exactly how here: https://symfony.com/doc/cur...
But, as the docs say, it's better if you refactor the logic you want to reuse into a service

Reply
rAfitiiixxx | posted 3 years ago

Brilliant! This opens up a bunch of different solutions... Although I'd love some 'casts support for RedisTransport since that's the one I'm using right now. It could include a workaround for the DelayStamp, which I hacked around; a redis-cli interpretation for reading the pub/sub streams; and a message board (mercure fetched from middleware or manual dispatch of events) similar to what Laravel Horizon does.

At work, we implemented this transport because it was already deployed in our stack, Rabbit was just overkill for the small input we needed, doctrine is better suited for failed transport and could overflow mysqld on multiple messaging.
We ended up with a CQRS implementation of some request lifecycles that get back a response with the user using mercure bundle.
We also used API-Platform to fetch collections of the result of the command and events, with a custom provider for the result, which is upserted in Redis as a storage repository for entities with ttl. These entities feed an adapter custom made for jQuery datatables to populate a "reports" view, and need to be auto-deleted after ttl = 0 and a cronjob should delete the files missing a hash in Redis (wish that could be an event, i hate cronjobs).
This made it possible for us to use an external service written in Python to get back large data reports from ugly queries that take too long, and to notify the user when done.
And this was made possible by your casts.

It has truly been a great ride for us, and especially for me.

Thanks Ryan, keep making these awesome casts, and making Symfony a better framework.

1 Reply

Hey rAfitiiixxx!

Wow :D. That sounds... crazy - nice work to you and your team :). Sorry for not having the Redis support - it's the transport I have the *least* experience with and I'm not (completely) sure how useful it is to people between the doctrine transport and Rabbit. But, this is obviously a vote from you for "yes, useful!".

> it could include a workaround for the DelayStamp

Yea, we still need this for Redis - this was a ping to *try* to push that PR forward (https://github.com/symfony/...) for 4.4, but we're very late at this point. What workaround did you do?

> a message board (mercure fetched from middleware or manual dispatch of events) similar to what Laravel Horizon does it

In 4.3... when I added a BUNCH of features to Messenger, a Horizon-like thing is something that I had in mind - specifically I was adding events, etc that would/should make this theoretically possible. Can you tell me more about exactly what you would like to see? Like, do you just want to be able to see a visual representation of how many messages are in each queue, etc? Stats on how long messages are sitting in queues? The auto-scaling thing that Horizon does where it can increase/balance workers as messages get bigger?

Anyways, thanks for the REALLY nice message - it means a lot ❤️

Cheers!

Reply

Hi,

This section suggests to add "type" key to "headers" (https://symfonycasts.com/screencast/messenger/serializer-classes#using-headers-for-the-type).

The question is "how exactly ?"

Say I am dispatching a message from app 1 (which puts it into queue q1) and worker runs on app 2, the custom serializer is on app 2. How will app 1 add "type" key to headers ?
In fact, how will it add any custom header at all? Envelope only provides the possibility of adding stamps.

app 1 by default adds FQCN of message as value of "type" key in headers (iiuc).

Reply

Hey yarikul!

Apologies for the glacial reply - we had a hiccup in our notification system: it swallowed a few comments

I was imagining, perhaps naively, that the "outside" system sending the message was not some other Symfony app. So the idea was "however you're creating that JSON in that other system, set a header". But, very fair question of "how would I do that in Symfony?".

First:

> app 1 by default adds FQCN of message as value of "type" key in headers (iiuc).

It's almost cheating, but since that app sets the FQCN, if you can simply re-use it by having that same class in app 2, you're done. However, that does require app 1 and app 2 to stay in sync, which in practice might not be a big deal if you "own" both of these.

But if you want to control the headers, here's what I would do:

A) The class responsible for serializing messages to JSON is this one: https://github.com/symfony/symfony/blob/6.3/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php - and its service id is messenger.transport.symfony_serializer
B) I would create a new class, make it implement the same SerializerInterface as that class (the one from Messenger). Give this new class a public function __construct(private SerializerInterface $coreMessengerSerializer). We're "decorating" that core service.
C) In the encode() and decode() methods, call the "inner" serializer (well, decode() might never be called, as app 1 may never be receiving messages). In encode(), after calling the inner serializer, override the type key in the header with whatever you want.

To make Symfony use your new serializer, add #[AsDecorator('messenger.transport.symfony_serializer')] above the new class and you're done.

This will completely replace that service with your service, so everything using it (i.e. all transports currently using the "symfony serializer") will use your new class.

Let me know if that helps :).

Cheers!

Reply
Boris S. | posted 2 years ago

The video assumes the external system adds info to the headers.

But we have different RabbitMQ queues and different binding_keys which do not end up as headers. How would I go about "reading" the binding key in the serializer, or what is the location where I would map binding_keys to consumers?

Reply

Hey @Botris!

Sorry for the slow reply - I had a family issue come up.

> But we have different RabbitMQ queues and different binding_keys which do not end up as headers. How would I go about "reading" the binding key in the serializer, or what is the location where I would map binding_keys to consumers

Hmm. So to make sure I understand correctly, it sounds like you have this situation:

Some external systems dispatch messages to RabbitMQ. And you already have RabbitMQ configured with binding keys to route those messages to various queues.

Is that correct? If so, then you said:

> How would I go about "reading" the binding key in the serializer, or what is the location where I would map binding_keys to consumers?

On the Symfony side of things, since the messages are being sent from an external system, each Messenger transport has only one job: receiving messages from a specific queue (you will not also be sending messages to that queue). In that case, you don't need to worry about the binding_keys config in Symfony at all. All you care about (and you would configure in messenger.yaml) is that you want to have a transport called "foo_bar" and that it should *read* messages from a queue (or queues) named "foo_bar" - via the "queues" option.

You also mentioned:

> The video assumes the external system adds info to the headers

So, somehow, you need to, in your Symfony app, be able to see a message and "recognize" the "meaning" of this message (e.g. this is a message that means we should sign up a user to the newsletter... or this is a message that says we should delete a photo). In our example, as you already know, we assumed that you could add a "hint" via your external systems by setting a header. In your system: what is the way that you will "recognize" one message type from another one? If you have configured things so that each message type has its own, unique queue in Rabbit, then we can definitely work with that :). In that case, each Messenger transport would read from one specific queue, and that transport would have its OWN custom serializer. That serializer would look similar to the one we created in this tutorial, except that it would be hardcoded to know to handle the ONE type of message. For example, if you route "newsletter signup" messages to a "newsletter_signup" queue, then you could configure a "newsletter_signup" transport, which has a custom NewsletterSignupMessengerSerializer serializer, which is hardcoded to know that it should parse and handle the data for that ONE situation.

I did... a lot of guessing about your situation with my answer. So let me know if I was WAY off (it happens a lot ;) ) or if you have other questions.

Cheers!

Reply
Boris S.

Thanks for getting back and taking the time to answer in such detail.
My question was poorly phrased, sorry about that. But what I was looking for is the following:
A source system could emit an event (payment has been made) to multiple queues (receipt_generator, email_confirmation, etc).
RabbitMQ can use a binding key (payment) to make sure the message ends up in the correct queues.

Now I was trying to get Symfony to consume messages based on their binding key, the answer is (I guess) that you can configure (multiple) transports in messenger.yaml and limit each transport based on queues and/or binding_keys. In my scenario limiting a transport to certain queues was enough.

Reply

Hey @Botris!

Lol - no worries - these are complex topics! But now I think I understand :).

> Now I was trying to get Symfony to consume messages based on their binding key, the answer is (I guess) that you can configure (multiple) transports in messenger.yaml and limit each transport based on queues and/or binding_keys

Yes... but with one minor clarification (which may help): you can only (and this is not a Symfony messenger thing, but a "queues" thing) consume from a "queue" - you cannot consume messages based on their binding key. The binding key is used to help GET it to a specific queue. But, ultimately, you are always consuming a specific queue... regardless of what binding key (or routing mechanism) was used to get the message there.

*Anyways*, it sounds like you got it sorted out. But if I'm wrong and I can (try) to help clarify anything, just let me know :).

Cheers!

Reply
Daniel W. | posted 2 years ago

Question:
Is there some way to reject a message if the validation fails because the messageType is not supported without stopping the worker?
In my case I have more "unsupported" messages in my queue than supported ones, so the worker would restart on roughly every 2nd message. I need the worker to reject them and move on.

Reply

Hey Daniel W.!

That's an interesting question.

I think the only way you could accomplish this (but I think it would work just fine) is to ALWAYS return a message from your serializer, even if there is an invalid type. Just create some InvalidType message class... with a handler that does nothing :).
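Here's a minimal sketch of that idea, written without Symfony so it runs on its own. The InvalidType name comes from the suggestion above; InvalidTypeHandler and createMessageOrFallback() are hypothetical names, and in the real serializer the returned message would still be wrapped in an Envelope.

```php
<?php
// Stand-in for the tutorial's message class.
class LogEmoji
{
    public $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }
}

// Message that represents "a type we do not support".
class InvalidType
{
    public $type;

    public function __construct(string $type)
    {
        $this->type = $type;
    }
}

// Handler that does nothing, so the message counts as handled
// and the worker moves on to the next one.
class InvalidTypeHandler
{
    public function __invoke(InvalidType $message): void
    {
        // intentionally empty
    }
}

// Instead of throwing on an unknown type, always return some message.
function createMessageOrFallback(string $type, array $data)
{
    switch ($type) {
        case 'emoji':
            return new LogEmoji($data['emoji']);
        default:
            return new InvalidType($type);
    }
}

$message = createMessageOrFallback('photo', []);
(new InvalidTypeHandler())($message);
```

Keeping the original type on the InvalidType object leaves room to log it later if the no-op handler ever needs to report what it skipped.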

Let me know if that works :).

Cheers!

Reply
Kiuega

Hey! Thank you for this superb training which will help me a lot! Small question on this last video: shouldn't we implement a "switch case" in the encode() function, just like we did in the decode() function, in order to take into account message types other than 'emoji'?

Reply

Hey Kiuega!

I'm glad it was useful :)!

> shouldn't we implement a "switch case" on the encode () function, just like we did on the decode () function, in order to take into account other types of messages than the 'emoji'

Yes! The encode() function isn't currently flexible at all. If it were handling multiple classes, I would also have a switch-case there as well :). To do that with instanceof, you can use a trick:


switch (true) {
    case $message instanceof LogEmoji:
        // ...
        break;
    // more stuff
}

Cheers!

2 Reply
Kiuega

Yes good game ! :D

Reply
Cameron | posted 2 years ago

Cool series. It would be good to see how to implement this into AWS functions and auto scale up for rapid completion of command queue items and also to reduce server costs - maybe more of a DevOps topic but it would be good to see how to create a php FaaS as it opens up a number of possibilities for delivering backends.

Reply

Hey Fox C.

That is a cool idea :). I don't know if we'll do it, but I've added it to our internal idea list and things like Lambda are gaining a lot of popularity (check out Bref if you haven't already https://github.com/brefphp/... )

Cheers!

1 Reply
Bruno D. | posted 2 years ago | edited

After some searching, I couldn't find the answer to this silly question: how is the retry handled for events with multiple listeners when some of them fail and some succeed?
Does the php bin/console messenger:failed:retry -vv trigger the retry of only failed handlers or are they all retried?

e.g.: is there one entry per message/handler or one per message?

Reply

Hey Bruno D.!

Hmm. So first, when a message is retried (not from the failure queue, but just retried after it fails thanks to the normal retry functionality), if a message has multiple handlers/listeners, and 1 fails but 1 succeeds, on the retry, only the 1 handler that failed will be called again - it was done a long time ago on this PR - https://github.com/symfony/...

For reading things from the failure transport like this, I'm not 100% sure. I believe it should function the same way. When a handler is successful, a stamp is attached to your message that indicates that this handler was successful. When a message is sent to the failure transport, that stamp *should* still be there. So, I can't say 100% for sure, but it *should* work correctly: it should only call the 1 handler/listener that failed.

Cheers!

1 Reply
Bruno D.

Thanks @weaverryan and @Diego, it will be very helpful to know where to look. Now that you say this, it makes total sense!

1 Reply

The changed files in this PR may be a good place to start looking https://github.com/symfony/...

Reply

Hey Bruno D.

That's a really good question. To be honest, I'm not sure but I believe it will re-dispatch the message so all the handlers will be called again. You can give it a try by logging something and forcing some handlers to always fail

Cheers!

Reply
Marcin

This was a really good course. Very useful for Symfony developers. Looking forward to more solid courses like this one and not the Vue.js like ones that nobody cares about.

P.S. The switch statement you proposed in the last episode breaks the open-closed principle. Every new message coming from the external system will cause a need to edit this class and add another "case". What about an ambitious course about SOLID? Why go with a Doctrine course that will be very similar to the Doctrine courses in the SF4 and SF3 tutorials?

Reply

Hey @Jack!

Since this is my code, I'll also "chime in" here. I LOVE to break OCP and other design principles :). Ok, to be more fair - I "weigh" whether or not to break it or not. The negative of following OCP in this case is a loss of some simplicity. The class (as written in this tutorial) is *so* easy to read. That doesn't make it correct, but I often prefer readability like this over creating a more "pluggable" system. But, if someone did prefer a pluggable system that didn't violate OCP, I think that would be great :).
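For anyone curious what a more "pluggable" version could look like, here's one possible sketch, not something from the tutorial's code. MessageFactoryRegistry is a hypothetical class that maps the "type" header to a factory callable, so a new type is registered instead of adding a case. InvalidArgumentException stands in for MessageDecodingFailedException so this runs without Symfony.

```php
<?php
// Stand-in for the tutorial's message class.
class LogEmoji
{
    public $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }
}

// Hypothetical registry: "type" header => factory callable.
class MessageFactoryRegistry
{
    private $factories = [];

    public function register(string $type, callable $factory): void
    {
        $this->factories[$type] = $factory;
    }

    public function create(string $type, array $data)
    {
        if (!isset($this->factories[$type])) {
            // The real serializer would throw MessageDecodingFailedException here.
            throw new InvalidArgumentException(sprintf('Invalid type "%s"', $type));
        }

        return ($this->factories[$type])($data);
    }
}

$registry = new MessageFactoryRegistry();
$registry->register('emoji', function (array $data) {
    return new LogEmoji($data['emoji']);
});

$message = $registry->create('emoji', ['emoji' => 3]);
```

The trade-off is the one described above: the serializer no longer changes when a type is added, but the mapping is now spread across registrations instead of sitting in one easy-to-read switch.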

About the tutorial ordering - yea, it's not ideal - Diego is helping with the SOLID tutorial and we also keep him really busy on other things :/. In the mean time, you're correct that the Doctrine course will be similar to the SF4 version, but we need to keep that core set of tutorials up to date. And actually, this one will have some new stuff - like Docker usage for the database and a new library called Foundry for data fixtures.

Oh, and sorry you don't find the Vue course useful - but I *totally* understand. It is currently the 2nd highest watched tutorial on our site (after API Platform), but I absolutely understand that this type of tutorial won't appeal to everyone. We try to keep a "rotation" of topics to keep things interesting for everyone - e.g. instead of creating 3 Vue courses in a row, we do a Vue course, then something more backend focused. It's not perfect, but we're doing our best :).

Cheers!

Reply
Maybe a section with advanced tutorials for more ambitious developers? On the other hand, good developers read documentation and better ones inspect vendor code.
Best!

Reply

Hey Jack,

Thanks for your feedback, it's very useful to us. About the OCP violation: yes, you're right, any new use-case would make us modify that class, so it can never be closed. But unless that's causing you a real problem and you can come up with a dynamic solution, I don't see why to bother. In the end, you will need a switch-case to instantiate the right object anyway: all you've got is a keyword coming from the outside. If you think I'm wrong, please let me know and we can talk about it further :)

BTW, we're actively working on the SOLID tutorial but it's still a bit far from being released. Good stuff takes time to get done :)

Cheers!

Reply
Bruno D.

I agree with you: the `switch` could be extracted (e.g. with a voter-like pattern), but this would be so much more complex that it's clearly not relevant in a tutorial, because the "out of topic" logic needs to remain simple, IMHO.

And for the Vue.js tutorials, while I haven't watched them, I'm quite excited as an SF developer to have the opportunity to bootstrap my experience with SymfonyCasts, which I trust and which is deeply tied to my SF knowledge.

Finally, thanks for those high quality tutorials: they're lively, interesting, and professional without being too serious!

1 Reply

Thanks Bruno D.! We're so happy to know that you find our content valuable :)

Reply
This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}