Custom Transport Serializer

If an external system sends messages to a queue that we're going to read, those messages will probably be sent as JSON or XML. We added a message formatted as JSON. To read those, we set up a transport called external_messages. But when we consumed that JSON message... it exploded! Why? Because the default serializer for every transport is the PhpSerializer. Basically, it's trying to call unserialize() on our JSON. That's...uh... not gonna work.

Nope, if you're consuming messages that came from an external system, you're going to need a custom serializer for your transport. Creating a custom serializer is... actually a very pleasant experience.
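To see why the default serializer chokes on this payload, here is a minimal plain-PHP sketch. It only assumes the body is the `{"emoji": 2}` JSON quoted in the comments below; it does not use Messenger at all.

```php
<?php
// The external system's message body, exactly as it sits in the queue.
$body = '{"emoji": 2}';

// unserialize() expects PHP's own serialization format, so it returns false
// for JSON. The "@" hides the notice/warning PHP raises alongside.
$fromPhpFormat = @unserialize($body);

// json_decode() understands the payload and gives us an associative array.
$fromJson = json_decode($body, true);

var_dump($fromPhpFormat);     // bool(false)
var_dump($fromJson['emoji']); // int(2)
```

This gap is what the custom serializer fills: it replaces the `unserialize()` step with JSON handling of our own.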

Creating the Custom Serializer Class

Inside of our src/Messenger/ directory... though this class could live anywhere... let's create a new PHP class called ExternalJsonMessageSerializer. The only rule is that it needs to implement SerializerInterface. But, careful! There are two SerializerInterface interfaces: one is from the Serializer component. We want the other one: the one from the Messenger component. I'll go to the "Code Generate" menu - or Command + N on a Mac - and select "Implement Methods" to add the two methods that this interface requires: decode() and encode().

// ...
namespace App\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // TODO: Implement decode() method.
    }

    public function encode(Envelope $envelope): array
    {
        // TODO: Implement encode() method.
    }
}

The encode() Method

The idea is beautifully simple: when we send a message through a transport that uses this serializer, the transport will call the encode() method and pass us the Envelope object that contains the message. Our job is to turn that into a string format that can be sent to the transport. Well, almost: notice that this returns an array. If you look at the SerializerInterface, you'll see that this method should return an array with two keys: body - the body of the message - and headers - any headers that should be sent.

Nice, right? But... we're actually never going to send any messages through our external transport... so we don't need this method. To prove that it will never be called, throw a new Exception with:

Transport & serializer not meant for sending messages

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    // ...

    public function encode(Envelope $envelope): array
    {
        throw new \Exception('Transport & serializer not meant for sending messages');
    }
}

That'll give me a gentle reminder in case I do something silly and route a message to a transport that uses this serializer by accident.

Tip

Actually, if you want your messages to be redelivered, you do need to implement the encode() method. See the code-block on this page for an example, which includes a small update to decode().

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        // ...

        // in case of redelivery, unserialize any stamps
        $stamps = [];
        if (isset($headers['stamps'])) {
            $stamps = unserialize($headers['stamps']);
        }

        return new Envelope($message, $stamps);
    }

    public function encode(Envelope $envelope): array
    {
        // this is called if a message is redelivered for "retry"
        $message = $envelope->getMessage();

        // expand this logic later if you handle more than
        // just one message class
        if ($message instanceof LogEmoji) {
            // recreate what the data originally looked like
            $data = ['emoji' => $message->getEmojiIndex()];
        } else {
            throw new \Exception('Unsupported message class');
        }

        $allStamps = [];
        foreach ($envelope->all() as $stamps) {
            $allStamps = array_merge($allStamps, $stamps);
        }

        return [
            'body' => json_encode($data),
            'headers' => [
                // store stamps as a header - to be read in decode()
                'stamps' => serialize($allStamps)
            ],
        ];
    }
}
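One thing to watch with the serialize() call in the tip above: PHP stores private and protected properties with NUL bytes around the class name, and some transports reject such characters in headers (a reader reports exactly this with SQS in the comments). The sketch below is plain PHP with a hypothetical FakeStamp class standing in for a real stamp; wrapping the value in base64 is one possible workaround, not something the tutorial itself does.

```php
<?php
// Hypothetical stand-in for a stamp: any object with a private property.
class FakeStamp
{
    private $delay;

    public function __construct(int $delay)
    {
        $this->delay = $delay;
    }

    public function getDelay(): int
    {
        return $this->delay;
    }
}

$serialized = serialize([new FakeStamp(1000)]);

// Private properties are stored as "\0ClassName\0property", so the raw
// string contains NUL bytes.
$hasNulByte = strpos($serialized, "\0") !== false;

// base64 keeps the header value to plain ASCII characters.
$headerValue = base64_encode($serialized);

// Reading it back, as decode() would: reverse both steps.
$stamps = unserialize(base64_decode($headerValue));
```

If you go this route, encode() and decode() both need the extra base64 step. And because this queue is also written to by an external system, it may be worth passing the allowed_classes option to unserialize() so that only the stamp classes you expect can be instantiated.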

The decode() Method

The method that we need to focus on is decode(). When a worker consumes a message from a transport, the transport calls decode() on its serializer. Our job is to read the message from the queue and turn that into an Envelope object with the message object inside. If you check out the SerializerInterface one more time, you'll see that the argument we're passed - $encodedEnvelope - is really just an array with the same two keys we saw a moment ago: body and headers.
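The symmetry between the two methods can be sketched in plain PHP. The helper name buildEncodedEnvelope() is made up for illustration and is not part of Symfony; it only shows the array shape that encode() returns and decode() receives.

```php
<?php
// Hypothetical helper (not part of Symfony): builds the array shape that
// encode() returns and decode() later receives.
function buildEncodedEnvelope(array $data, array $headers = []): array
{
    return [
        'body' => json_encode($data), // the raw string stored in the queue
        'headers' => $headers,        // extra metadata, if any
    ];
}

$encodedEnvelope = buildEncodedEnvelope(['emoji' => 2]);

$body = $encodedEnvelope['body'];       // still a raw JSON string
$headers = $encodedEnvelope['headers']; // empty for our external message
```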

Let's separate the pieces first: $body = $encodedEnvelope['body'] and $headers = $encodedEnvelope['headers']. The $body will be the raw JSON in the message. We'll talk about the headers later: it's empty right now.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];
        // ...
    }

    // ...
}

Turning JSON into the Envelope

Ok, remember our goal here: to turn this JSON into a LogEmoji object and then put that into an Envelope object. How? Let's keep it simple! Start with $data = json_decode($body, true) to turn the JSON into an associative array.

// ...
class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        // ...
    }

    // ...
}

I'm not doing any error-checking yet... like to check that this is valid JSON - we'll do that a bit later. Now say: $message = new LogEmoji($data['emoji']) because emoji is the key in the JSON that we've decided will hold the $emojiIndex.

// ...
use App\Message\Command\LogEmoji;
// ...

class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        $message = new LogEmoji($data['emoji']);
        // ...
    }

    // ...
}
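The error-checking that is deferred above could look roughly like the following. This is only a sketch in plain PHP: decodeEmojiPayload() is a hypothetical helper, and it throws InvalidArgumentException to stay self-contained. Inside the real serializer, the MessageDecodingFailedException imported at the top of the class is probably the more fitting exception.

```php
<?php
// Hypothetical helper: decodes the body and validates it before the
// message object is built.
function decodeEmojiPayload(string $body): int
{
    $data = json_decode($body, true);

    // json_decode() returns null for invalid JSON; json_last_error()
    // tells that apart from a valid JSON "null".
    if (null === $data && JSON_ERROR_NONE !== json_last_error()) {
        throw new \InvalidArgumentException('Invalid JSON: ' . json_last_error_msg());
    }

    // Guard against valid JSON that lacks the key we rely on.
    if (!is_array($data) || !isset($data['emoji']) || !is_int($data['emoji'])) {
        throw new \InvalidArgumentException('Missing or non-integer "emoji" key');
    }

    return $data['emoji'];
}

$emojiIndex = decodeEmojiPayload('{"emoji": 2}');
```

The second check matters as much as the first: a body like `{}` is valid JSON, but it would pass null to the LogEmoji constructor, which is the type error one reader hits in the comments.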

Finally, we need to return an Envelope object. Remember: an Envelope is just a small wrapper around the message itself... and it might also hold some stamps. At the bottom, return new Envelope() and put $message inside.

// ...
use App\Message\Command\LogEmoji;
use Symfony\Component\Messenger\Envelope;
// ...

class ExternalJsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $body = $encodedEnvelope['body'];
        $headers = $encodedEnvelope['headers'];

        $data = json_decode($body, true);
        $message = new LogEmoji($data['emoji']);

        // in case of redelivery, unserialize any stamps
        $stamps = [];
        if (isset($headers['stamps'])) {
            $stamps = unserialize($headers['stamps']);
        }

        return new Envelope($message, $stamps);
    }

    // ...
}
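Right now decode() assumes every message is a LogEmoji. If the external system ever sends more than one kind of message, one option (suggested by a reader in the comments) is to map a "type" header to a class, so the other party never needs to know your internal class names. The sketch below is plain PHP; the "type" header, the 'emoji' type name, createMessage() and LogEmojiStub are all assumptions made for illustration.

```php
<?php
// Hypothetical stand-in for App\Message\Command\LogEmoji.
final class LogEmojiStub
{
    public $emojiIndex;

    public function __construct(int $emojiIndex)
    {
        $this->emojiIndex = $emojiIndex;
    }
}

// Map of external "type" names to factories that build the message object.
$factories = [
    'emoji' => function (array $data) {
        return new LogEmojiStub($data['emoji']);
    },
];

// Looks up the factory for the "type" header and builds the message.
function createMessage(array $factories, array $headers, array $data)
{
    $type = $headers['type'] ?? null;
    if (null === $type || !isset($factories[$type])) {
        throw new \InvalidArgumentException(sprintf('Unsupported message type "%s"', (string) $type));
    }

    return $factories[$type]($data);
}

$message = createMessage($factories, ['type' => 'emoji'], ['emoji' => 2]);
```

Supporting a new message then means adding one entry to the map instead of growing an if/else chain.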

Configuring the Serializer on the Transport

Done! We rock! This is already a fully functional serializer that can read messages from a queue. But our transport won't just start "magically" using it: we need to configure that. And... we already know how! We learned earlier that each transport can have a serializer option. Below the external transport, add serializer and set this to the id of our service, which is the same as the class name: App\Messenger\... and then I'll go copy the class name: ExternalJsonMessageSerializer.

framework:
    messenger:
        # ...
        transports:
            # ...
            external_messages:
                # ...
                serializer: App\Messenger\ExternalJsonMessageSerializer
            # ...

This is why we created a separate transport with a separate queue: we only want the external messages to use our ExternalJsonMessageSerializer. The other two transports - async and async_priority_high - will still use the simpler PhpSerializer... which is perfect.

Ok, let's try this! First, find an open terminal and tail the logs:

tail -f var/log/dev.log

And I'll clear the screen. Then, in my other terminal, I'll consume messages from the external_messages transport:

php bin/console messenger:consume -vv external_messages

Perfect! There are no messages yet... so it's just waiting. But we're hoping that when we publish this message to the queue, it will be consumed by our worker, decoded correctly, and that an emoji will be logged! Ah, ok - let's try it. Publish! Oh, then move back over to the terminal.... there it is! We got an important message: cheese: it received the message and handled it down here.

So... we did it! We rock!

But... when we created the Envelope, we didn't put any stamps into it. Should we have? Does a message that goes through the "normal" flow have some stamps on it that we should manually add here? Let's dive into the workflow of a message and its stamps, next.

Comments

Emag-L | posted 5 months ago | edited

Hello,
When I try to serialize the stamps ('stamps' => serialize($allStamps)), the following error occurs:

[Exception] Serialization of 'Closure' is not allowed

To fix this issue, add the following line to the encode function, before you collect all the stamps:

$envelope = $envelope->withoutStampsOfType(NonSendableStampInterface::class);

Reply

Hey @Emag-L!

Good tip - the core PhpSerializer does that - https://github.com/symfony/symfony/blob/6.3/src/Symfony/Component/Messenger/Transport/Serialization/PhpSerializer.php#L59

The actual error might be due to Symfony doing something with exceptions that it shouldn't - by complete chance, this issue just came up in a core team chat today :).

Cheers!

Reply
Geoffrey M. | posted 2 years ago

Hello !
I am trying to do the same thing but with SQS, and I have a problem with the stamps:
HTTP 400 returned for "https://sqs.eu-central-1.am...".

Code: InvalidParameterValue
Message: Message (user) attribute 'stamps' value contains invalid binary character '#x0'. You must use only the following supported characters: #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF].
Type: Sender
Detail:

I am using the SQS adapter from Messenger with SF 5.1.
Does it work differently with AWS SQS?

Thank you :)

Reply
Petru

for anyone that has the same issue,

a general good solution is to configure symfony/messenger to use Symfony\Component\Messenger\Transport\Serialization\Serializer instead of Symfony\Component\Messenger\Transport\Serialization\PhpSerializer.

if any specific needs are in place, e.g. for external message support, you might adapt that code in your project to do what you have to do. for example, i recommend do a data type mapping between "type" header and the class you want to use (this is to prevent the need to provide internal fqcn to external party).

it would be nice to have better support for external messages directly in symfony, for example, instead of using fully-qualified class name, you could do a config map in messenger.yaml. until then, do this way, and you'll sleep well at night.

1 Reply

Hey Geoffrey,

Yes, it might be different with different transports. For example, IIRC, the Doctrine transport does not allow you to work with binary files because of a MySQL limitation: binary data cannot be stored in a simple text column. That's a known limitation, and if you need to store binary files, you would probably need to go with a different transport. Hm, I'm not sure about AWS SQS unfortunately: you probably have the same problem as with the Doctrine transport, or it might be something different. The first step would probably be to figure out exactly which chars are invalid, and if you can change them to valid chars, that would be an easy fix. Otherwise, you probably need to go with a different transport unfortunately. Or sometimes, it might be just a misconfiguration.

Cheers!

Reply
Geoffrey M. | Victor | posted 2 years ago

Hi Victor,
The problem was about storing the envelopes in the header of a message to SQS. I added it to the body, and now it works.
Thank you !

Reply

Hey Geoffrey,

Glad you got it working! And thank you for sharing your solution with us! It might be helpful for others

Cheers!

Reply
Janet | posted 3 years ago | edited

Hi there

I'm new to Symfony.

I've been asked to create a service to send emails with Mailer using an async transport (I'm using AMQP atm). I'm supposed to use the Symfony serialiser because even though initially we'll consume the messages ourselves, it's possible that in the future we create them for external consumption.

I'm finding lots of problems while trying to achieve this. I'm now creating a new transport to consume messages from the same queue we're putting them up to. I have also created a new External Json serializer. I'm now thinking I'm going to need to extend SendEmailMessage and overwrite the send method so I can use this for the code that actually sends the messages.

So far I'm not sure how I'm going to convert the data into a Mailer object.

Anyway, I just wanted to know if you have any tips for achieving this.

BTW, thanks for your tutorial. You make it easy to understand!

Regards

Reply

Hey Janet !

Welcome to Symfony! And thanks for the nice note! I might have a few thoughts that could help :).

1) If you plan on possibly creating messages for external consumption in the future, I wouldn't necessarily, immediately use the Json serializer instead of the normal one. It's totally up to you - but I might just get it working first, and tackle using JSON in the future if you ever have that case :).

2) But, if you DO want to serialize to JSON right now, I think you will not need a custom serializer class or multiple transports. You should be able to just have one transport (this transport would be used for both sending and receiving the messages). Then, for the serializer option under that transport, set it to messenger.transport.symfony_serializer. That is the service id of a built-in serializer for the Messenger component. It's quite smart - it uses the serializer component to serialize to JSON and also stores all the class names of the message (and stamps) as headers on the message. Then, when the message is received, it decodes the JSON back into the original objects. The only thing that you need to worry about is having "serializer-friendly" classes - e.g. classes where you have "getter" and "setter" methods (or public properties) so that the serializer can turn the properties into JSON and then re-set the JSON keys back onto the object later.

Let me know if that helps! I could be way off - but it seems like you shouldn't need to do too much work to get this going. In this chapter, we created a custom serializer class because if we are truly receiving messages from an external system, then there is no alternative than for us to write custom code that maps each JSON message into a PHP object by hand. But if the JSON message is actually sent by Messenger, then the built-in messenger.transport.symfony_serializer is awesome because it adds the extra metadata (as headers) so that the message can be deserialized on "receive" without any custom code.

Cheers!

Reply
Janet

Hi @weaverryan

Thanks for your prompt and helpful reply!

I had actually tried what you're suggesting but I keep getting these two errors when I'm running the consumer:

In Serializer.php line 85:

[Symfony\Component\Messenger\Exception\MessageDecodingFailedException]
Could not decode message: Cannot create an instance of Symfony\Component\Mime\RawMessage from serialized data because its constructor requires parameter "message" to be present..

Exception trace:
at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Transport/Serialization/Serializer.php:85
Symfony\Component\Messenger\Transport\Serialization\Serializer->decode() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Transport/AmqpExt/AmqpReceiver.php:68
Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver->getEnvelope() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Transport/AmqpExt/AmqpReceiver.php:47
Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver->get() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Worker.php:98
Symfony\Component\Messenger\Worker->run() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Worker/StopWhenRestartSignalIsReceived.php:54
Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived->run() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:230
Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/console/Command/Command.php:255
Symfony\Component\Console\Command\Command->run() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/console/Application.php:953
Symfony\Component\Console\Application->doRunCommand() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/framework-bundle/Console/Application.php:87
Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/console/Application.php:273
Symfony\Component\Console\Application->doRun() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/framework-bundle/Console/Application.php:73
Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/console/Application.php:149
Symfony\Component\Console\Application->run() at /usr/local/share/symfony-repos/spark-media-symfony/bin/console:42


In AbstractNormalizer.php line 505:

[Symfony\Component\Serializer\Exception\MissingConstructorArgumentsException]
Cannot create an instance of Symfony\Component\Mime\RawMessage from serialized data because its constructor requires parameter "message" to be present.

Exception trace:
at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Normalizer/AbstractNormalizer.php:505
Symfony\Component\Serializer\Normalizer\AbstractNormalizer->instantiateObject() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Normalizer/AbstractObjectNormalizer.php:231
Symfony\Component\Serializer\Normalizer\AbstractObjectNormalizer->instantiateObject() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Normalizer/AbstractObjectNormalizer.php:329
Symfony\Component\Serializer\Normalizer\AbstractObjectNormalizer->denormalize() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Serializer.php:191
Symfony\Component\Serializer\Serializer->denormalize() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Normalizer/AbstractObjectNormalizer.php:428
Symfony\Component\Serializer\Normalizer\AbstractObjectNormalizer->validateAndDenormalize() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Normalizer/AbstractObjectNormalizer.php:463
Symfony\Component\Serializer\Normalizer\AbstractObjectNormalizer->denormalizeParameter() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Normalizer/AbstractNormalizer.php:496
Symfony\Component\Serializer\Normalizer\AbstractNormalizer->instantiateObject() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Normalizer/AbstractObjectNormalizer.php:231
Symfony\Component\Serializer\Normalizer\AbstractObjectNormalizer->instantiateObject() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Normalizer/AbstractObjectNormalizer.php:329
Symfony\Component\Serializer\Normalizer\AbstractObjectNormalizer->denormalize() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Serializer.php:191
Symfony\Component\Serializer\Serializer->denormalize() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/serializer/Serializer.php:142
Symfony\Component\Serializer\Serializer->deserialize() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Transport/Serialization/Serializer.php:83
Symfony\Component\Messenger\Transport\Serialization\Serializer->decode() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Transport/AmqpExt/AmqpReceiver.php:68
Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver->getEnvelope() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Transport/AmqpExt/AmqpReceiver.php:47
Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver->get() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Worker.php:98
Symfony\Component\Messenger\Worker->run() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Worker/StopWhenRestartSignalIsReceived.php:54
Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived->run() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:230
Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/console/Command/Command.php:255
Symfony\Component\Console\Command\Command->run() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/console/Application.php:953
Symfony\Component\Console\Application->doRunCommand() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/framework-bundle/Console/Application.php:87
Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/console/Application.php:273
Symfony\Component\Console\Application->doRun() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/framework-bundle/Console/Application.php:73
Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /usr/local/share/symfony-repos/spark-media-symfony/vendor/symfony/console/Application.php:149
Symfony\Component\Console\Application->run() at /usr/local/share/symfony-repos/spark-media-symfony/bin/console:42

Could this be due to not having the serializer-friendly class(es) you suggested?
I've created a mailer service class, is that where the getter and setter methods should go?

I'd appreciate any more suggestions you have to help.

Thanks very much :0)

Reply

Hey Janet!

Oh boy - it looks like you hit a bug - https://github.com/symfony/symfony/issues/33394

This is why the native, Symfony serializer is a pain for Messenger: the classes need to be constructed just right in order for it to work. In this case, because the RawMessage class doesn't have a getMessage() method on it, the "message" key isn't serialized to JSON and so it's missing during deserialization (I could be wrong about the details, but I'm 95% sure that's what's going on).

There are some workarounds on that issue... but basically, there aren't many easy workarounds. My recommendation might be to create your own EmailMessage class that contains all the data you want on your email. Configure this class to be handled async. Then create an EmailMessageHandler. THIS class would use the EmailMessage to create the real Email object and deliver it via Mailer. You would then not route Mailer messages to be async - in other words do NOT do this - https://symfony.com/doc/current/mailer.html#sending-messages-async - you would now want mailer messages to be sent synchronously... which is fine - because you created your own message that is being handled async.

Let me know if this helps!

Cheers!

Reply
Janet | weaverryan | posted 3 years ago | edited

Thanks weaverryan
That's an interesting idea worth trying. I'll let you know how it goes.
:0)

Reply
Rainer-S | posted 3 years ago | edited

Hi, there is only one part of the Symfony Messenger I do not understand.

At the beginning of the tutorial we have gotten the following error
Could not decode message using PHP serialization: {
"emoji": 2
}.

Now my question is, why is the failed message not stored in the failed (doctrine queue_name=failed) transport for later handling?

In the messenger.yaml the following is configured

framework:
    messenger:
        # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
        failure_transport: failed
        transports:
            external_messages:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    auto_setup: false # it doesn't create this queue
                    queues:
                        messages_from_external: ~

            failed: 'doctrine://default?queue_name=failed'
Reply

Hey Rainer S.

Looks like this Github issue is related to your situation https://github.com/symfony/...
sadly they haven't fixed the problem yet

Cheers!

Reply

Hi,

When I try command php bin/console messenger:consume -vv external_messages, I have this error message :

[Symfony\Component\Debug\Exception\FatalThrowableError]
Argument 1 passed to App\Message\Command\LogEmoji::__construct() must be of the type integer, null given, called in /home/stephane/Hubic/www/
knpuniversity/messenger/src/Messenger/ExternalJsonMessageSerializer.php on line 19

Do you have any idea what the problem is?
Thanks

Reply

I found the solution: add ? before int, and clean up my array of emojis.

Reply

Hey Stephane,

I'm glad you were able to fix the problem by yourself! And thank you for sharing your solution with others. Though, the fix might depend. If it's ok to have no emoji index - then allowing null is something you would need here. But if every LogEmoji message should contain the index, i.e. should be an integer number - then you should search for the alternative fix. Most probably a LogEmoji message without Emoji index was saved to the DB somehow and now can't be unserialized and processed. So, the fix depends on your needs and your implementation

Cheers!

Reply
VERMON | posted 3 years ago | edited

Hi,
Just a question about the custom serializer. I played with it a little: if we only implement the decode part and the handler fails, Messenger wants to send the message again to the same queue with a RetryStamp and a DelayStamp.

In this example, and in the way I've done it, the client application only has a Command message, which is created by the custom serializer when we receive the EventMessage from another application. But when it fails, we need to re-encode it into an array to process it again.

If we do it naively, this creates a kind of infinite loop (the Delay and Retry stamps are not added). What is the best way to handle this?

  1. In Serializer->encode(), add some headers for the stamps? (By using the Symfony\Component\Messenger\Transport\Serialization\SerializerInterface)
  2. Try to requeue the Command object in another queue in place of the Event?
  3. I tried implementing the same EventMessage classes twice: one that sends without handlers, and a second one with the handler. I use the Symfony serializer to recreate the same message. It works well for retry #1, then #2, but after that the consumer stops with this error:

`
In Serializer.php line 123:

[Symfony\Component\Messenger\Exception\MessageDecodingFailedException]
Could not decode stamp: The type of the "headers" attribute for class "Symfony\Component\Debug\Exception\FlattenException" must be one of "array" ("NULL" given)..
`

Thanks a lot :)

Reply

Hey VERMON!

Wow, awesome question. Honestly, even though I wrote the retry logic... I totally did *not* think about the retry stuff here. You're right that, on a high level, there are a few options here. First, for others, if we receive a message from an external transport while consuming from some transport called "from_external", on failure, Messenger will try to *send* to that same transport.

So, the most natural solution is (1). Basically, make sure your "from_external" transport is configured so that when it *sends*, it will end up in the same queue that it's receiving from (so, you may need to setup some binding keys). Then, as you mentioned, you would need to put the RedeliveryStamp stuff into the message somehow (e.g. headers)... and make sure that you also read it in decode(). That's indeed a tricky flow - I just asked another contributor on Messenger to verify if I'm missing something. I'll update you, assuming I hear from him (everyone is busy!).

Anyways, let me know what you think - and if you get the flow working. I may need to update the tutorial with some notes related to this. Btw, another (not-as-cool) solution is to set your retry limit to 1 and leverage a failure transport. But then you need to manually retry (via the retry commands) after just one failure... which is kinda lame.

Cheers!

Reply
VERMON | weaverryan | posted 3 years ago | edited

Hi @weaverryan!

Thanks a lot for your reply. In the end I went with solution 1, adding encoding and decoding of the stamps to the custom serializer. That avoids the unexpected bug of the third solution.

Here is what I did: https://gist.github.com/mpi... (I can post it here if needed, but it's a bit long)
In my serializer I've added 2 functions: encodeStamps and decodeStamps (which just take care of DelayStamp and RedeliveryStamp, with a light encoding).

Thanks again :)

Reply

Hey VERMON!

Thanks for posting and verifying that this fixed things :). I talked to a co-maintainer on Messenger and what he does is the same basic idea, but with one simple change. To encode the stamps, he uses PHP's serialize() function and then puts it on a header - I think just a single header (e.g. X-Stamps). Then, in decode, he uses unserialize() on that header. And that makes sense: this header would ONLY exist if the message were being redelivered from Messenger. And so, using native PHP serialization for this one field (and JSON for the actual message body) makes a lot of sense.

Cheers!

Reply

This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}