Flag of Ukraine
SymfonyCasts stands united with the people of Ukraine

El ciclo de vida de un mensaje y sus sellos

Keep on Learning!

If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.

Start your All-Access Pass
Buy just this tutorial for $12.00

With a Subscription, click any sentence in the script to jump to that part of the video!

Login Subscribe

Olvídate de los mensajes asíncronos y de los transportes externos y todas esas cosas. Abre ImagePostController. Como recordatorio, cuando envías un mensaje, en realidad envías un objeto Envelope, que es una simple "envoltura" que contiene el mensaje en sí y puede contener también algunos sellos... que añaden información extra.

Si despachas el objeto mensaje directamente, el bus de mensajes crea unEnvelope para ti y pone tu mensaje dentro. La cuestión es que, internamente, Messenger siempre está trabajando con un Envelope. Y cuando llamas a $messageBus->dispatch(), también devuelve un Envelope: el Envelope final después de que Messenger haya hecho todo su trabajo.

Veamos qué aspecto tiene: dump() toda esa línea $messageBus->dispatch(). Ahora, muévete y sube una foto. Una vez hecho esto, busca esa petición en la barra de herramientas de depuración de la web... y abre el perfilador.

... lines 1 - 25
class ImagePostController extends AbstractController
{
... lines 28 - 42
public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
{
... lines 45 - 71
dump($messageBus->dispatch($envelope));
... lines 73 - 76
}
... lines 78 - 105
}

El sobre y los sellos después del envío

¡Perfecto! Puedes ver que el Envelope final tiene el objeto mensaje original dentro: AddPonkaToImage. Pero este Envelope tiene ahora más sellos.

¡Hora de repasar rápidamente! Cuando enviamos un mensaje al bus de mensajes, éste pasa por una colección de middleware... y cada middleware puede añadir sellos adicionales al sobre. Si amplías stamps en el volcado, ¡vaya! ¡Ahora hay 5 sellos! Los dos primeros - DelayStamp y AmqpStamp - no son un misterio. Los añadimos manualmente cuando enviamos el mensaje originalmente. El último - SentStamp - es un sello que añade el SendMessageMiddleware. Como hemos configurado este mensaje para que se dirija al transporte async_priority_high, elSendMessageMiddleware envía el mensaje a RabbitMQ y luego añade este SentStamp. Esto es una señal - para cualquiera que se preocupe - nosotros, u otro middleware - de que este mensaje fue de hecho "enviado" a un transporte. En realidad, es gracias a este sello que el siguiente middleware que se ejecuta - HandleMessageMiddleware - sabe que no debe manejar este mensaje en este momento. Ve que SentStamp, se da cuenta de que el mensaje fue enviado a un transporte y, por tanto, no hace nada. Lo manejará más tarde.

BusNameStamp: Cómo el trabajador envía al bus correcto

¿Pero qué pasa con este BusNameStamp? Abramos esa clase. Huh, BusNameStampcontiene literalmente... el nombre del bus al que se despachó el mensaje. Si miras en messenger.yaml, en la parte superior, tenemos tres buses:command.bus, event.bus y query.bus. Vale, pero ¿qué sentido tieneBusNameStamp? Es decir, enviamos el mensaje a través del bus de comandos... entonces, ¿por qué es importante que el mensaje tenga un sello que diga esto?

La respuesta tiene que ver con lo que ocurre cuando un trabajador consume este mensaje. El proceso es así. En primer lugar, el comando messenger:consume -que es el "trabajador"- lee un mensaje de una cola. En segundo lugar, el serializador de ese transporte lo convierte en un objeto Envelope con un objeto de mensaje en su interior, como nuestro objeto LogEmoji. Por último, el trabajador envía ese sobre de vuelta al bus de mensajes. Sí, internamente, ¡algo llama $messageBus->dispatch($envelope)!

Espera... pero si tenemos varios buses de mensajes... ¿cómo sabe el trabajador a qué bus de mensajes debe enviar el Sobre? Pues sí Ese es el propósito de este BusNameStamp. Messenger añade este sello para que, cuando el trabajador reciba el mensaje, pueda utilizarlo para enviarlo al bus correcto.

Ahora mismo, en nuestro serializador, no estamos añadiendo ningún sello al Envelope. Como el sello no existe, el trabajador utiliza el default_bus, que es el command.bus. Así que, en este caso... ¡adivinó correctamente! Este mensaje es un comando.

El sello UniqueIdStamp

El último sello que se añadió fue este UniqueIdStamp. Es algo que hemos creado... y se añade a través de un middleware personalizado: AuditMiddleware. Cada vez que se envía un mensaje, este middleware se asegura de que cada Envelopetenga exactamente un UniqueIdStamp. Entonces, cualquiera puede utilizar la cadena de identificación única de ese sello para seguir este mensaje exacto a lo largo de todo el proceso.

Espera... así que si esto se añade normalmente cuando enviamos originalmente un mensaje... ¿debemos añadir manualmente el sello dentro de nuestro serializador para que el Envelopetenga uno?

Míralo de esta manera: un mensaje normal que se envía desde nuestra aplicación ya tendría este sello en el momento en que se publica en RabbitMQ. Cuando un trabajador lo reciba, estará ahí.

Pero... en este caso, como puedes ver claramente, después de recibir el mensaje externo, no estamos añadiendo ese sello. Entonces, ¿es algo que deberíamos añadir aquí para que esto "actúe" como otros mensajes?

¡Gran pregunta! La respuesta es... ¡no! Comprueba los mensajes del registro: ya puedes ver algunos mensajes con esta cadena 5d7bc. Ese es el identificador único. ¡Nuestro mensaje sí tiene un UniqueIdStamp!

¿Cómo? Recuerda que, después de que nuestro serializador devuelva el Envelope, el trabajador lo envía de vuelta a través del bus. Y así, nuestro AuditMiddleware es llamado, añade ese sello y luego registra algunos mensajes al respecto.

Las grandes conclusiones

Para retroceder un poco, hay dos grandes puntos que quiero destacar. En primer lugar, cuando un mensaje se lee y se gestiona a través de un trabajador, se envía a través del bus de mensajes y se ejecuta todo el middleware normal. En el caso de un mensaje que se envía desde nuestra aplicación y que es manejado por ella, pasará dos veces por el middleware.

El segundo punto importante es que cuando consumes un mensaje que fue puesto allí desde un sistema externo, a ese mensaje le pueden faltar algunos sellos que tendría un mensaje normal. Y, en la mayoría de los casos, ¡eso está bien! El DelayStampy el AmqpStamp son irrelevantes porque ambos indican al transporte cómo enviar el mensaje.

Añadir el BusNameStamp

Pero... el BusNameStamp es uno de los que quizás quieras añadir. Seguro que Messenger utilizó el bus correcto en este caso por accidente, ¡pero podemos ser más explícitos!

Entra en ExternalJsonMessengerSerializer. Cámbialo por$envelope = new Envelope() y, al final, devuelve $envelope. Añade el sello con $envelope = $envelope->with() - así es como se añade un sello -new BusNameStamp().

Entonces... hmm... como nuestro transporte y serializador sólo manejan este único mensaje... y como este único mensaje es un comando, querremos poner el bus de comando aquí. Copia el nombre del bus command.bus y pégalo. Añadiré un comentario que diga que esto es técnicamente sólo necesario si necesitas que el mensaje se envíe a través de un bus no predeterminado.

... lines 1 - 7
use Symfony\Component\Messenger\Stamp\BusNameStamp;
... lines 9 - 10
class ExternalJsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
... lines 15 - 26
$envelope = new Envelope($message, $stamps);
// needed only if you need this to be sent through the non-default bus
$envelope = $envelope->with(new BusNameStamp('command.bus'));
return $envelope;
... lines 33 - 61
}

A continuación, nuestro serializador es genial, pero no hemos codificado de forma muy defensiva. ¿Qué pasaría si el mensaje contuviera un JSON no válido... o le faltara el campo emoji? ¿Fallaría nuestra aplicación con gracia... o explotaría?

Leave a comment!

2
Login or Register to join the conversation
Krzysztof H. Avatar
Krzysztof H. Avatar Krzysztof H. | posted hace 1 año | edited

I'm having a problem with the part of code where you apply code in encode function, particularly in


        $allStamps = [];
        foreach ($envelope->all() as $stamps) {
            $allStamps = array_merge($allStamps, $stamps);
        }

        return [
            'body' => json_encode($message),
            'headers' => [
                // store stamps as a header - to be read in decode()
                'stamps' => serialize($allStamps),
                'type' => $type,
            ],
        ];```


where you array_merge stamps and then try to serialize them. I'm having an error:
<blockquote>[Exception]                                
  Serialization of 'Closure' is not allowed  </blockquote>
and after I tried to create a callback function, the consumer goes into an endless loop where it retries to consume message, but the retryCount is still on 1 (retry_strategy is set to default)

How can I surpass this error and handle the messagges in ExternalJsonMessageSerializer in the encode section?
Reply

Hey Krzysztof,

You hit an interesting problem, PHP does not allow to serialize closure functions out of the box but here is an example of how you can make it work https://www.amitmerchant.co....

I'm not sure what's going on with the retry functionality, perhaps it's a side effect of the other error?

Cheers!

Reply
Cat in space

"Houston: no signs of life"
Start the conversation!

Este tutorial está construido con Symfony 4.3, pero funcionará bien en Symfony 4.4 o 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
userVoice