Flag of Ukraine
SymfonyCasts stands united with the people of Ukraine

Retraso en AMQP: intercambio de letra muerta

Keep on Learning!

If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.

Start your All-Access Pass
Buy just this tutorial for $12.00

With a Subscription, click any sentence in the script to jump to that part of the video!

Login Subscribe

Cuando empezamos a trabajar con AMQP, te dije que entraras en ImagePostController y quitaras el sello DelayStamp. Este sello es una forma de decirle al sistema de transporte que espere al menos 500 milisegundos antes de permitir que un trabajador reciba el mensaje. Vamos a cambiarlo a 10 segundos, es decir, 10000 milisegundos.

... lines 1 - 23
class ImagePostController extends AbstractController
{
... lines 26 - 40
public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
{
... lines 43 - 63
$envelope = new Envelope($message, [
new DelayStamp(10000)
]);
... lines 67 - 69
}
... lines 71 - 98
}

Ahora, dirígete a tu terminal y asegúrate de que tu trabajador no se está ejecutando.

Bien, ¡vamos a ver qué pasa! Ahora mismo ambas colas están vacías. Voy a subir 3 fotos... entonces... ¡rápido, rápido, rápido! Ve a mirar las colas. De repente, ¡puf! ha aparecido una nueva cola... con un nombre extraño: delay_messages_high_priority__10000. Y tiene - ¡dun, dun, dun! - tres mensajes en ella.

Vamos a mirar dentro. Es interesante, los mensajes se entregaron aquí, en lugar de la cola normal. Pero luego... ¿desaparecieron? El gráfico muestra cómo los mensajes en esta cola pasaron de 3 a 0. Pero... ¿cómo? ¡Nuestro trabajador ni siquiera está en marcha!

¡Woh! ¡La página acaba de salir en 404! ¡La cola ha desaparecido! ¡Algo está atacando nuestras colas!

Vuelve a la lista de colas. Sí, esa extraña cola de "retraso" ha desaparecido... oh, pero ahora los tres mensajes están de alguna manera en messages_high. ¿Qué demonios ha pasado?

Bueno, en primer lugar, para demostrar que todo el sistema sigue funcionando... independientemente de la locura que acaba de ocurrir... ejecutemos nuestro trabajador y consumamos desde los transportesasync_priority_high y async:

php bin/console messenger:consume -vv async_priority_high async

Los consume y... cuando nos desplazamos, vamos a la página de inicio y refrescamos, ¡sí! se ha añadido Ponka a esas imágenes.

El intercambio de retrasos

Vale, vamos a ver cómo ha funcionado esto. Por un lado, no es importante: si hubiéramos estado ejecutando nuestro trabajador todo el tiempo, habrías visto que esos mensajes se retrasaron, de hecho, 10 segundos. Cómo se retrasan los mensajes en RabbitMQ es una locura... pero si no te importan los detalles, Messenger se encarga de ello por ti.

Pero quiero ver cómo funciona esto... en parte porque será una gran oportunidad para ver cómo funcionan algunas de las características más avanzadas de AMQP.

Haz clic en "Intercambios". ¡Sorpresa! Hay un nuevo intercambio llamado delays. Y en lugar de ser del tipo fanout como nuestros otros dos intercambios, se trata de un intercambio direct. Pronto hablaremos de lo que eso significa.

Pero lo primero que hay que saber es que cuando Messenger ve que un mensaje debe retrasarse, lo envía a este intercambio en lugar de enviarlo al intercambio normal, el "correcto". En este momento, el intercambio delays no tiene enlaces... pero eso cambiará cuando enviemos un mensaje retrasado.

Para poder ver realmente lo que ocurre, aumentemos el retraso a 60 segundos.

... lines 1 - 23
class ImagePostController extends AbstractController
{
... lines 26 - 40
public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
{
... lines 43 - 63
$envelope = new Envelope($message, [
new DelayStamp(60000)
]);
... lines 67 - 69
}
... lines 71 - 98
}

Bien, sube 3 fotos más: ahora sabemos que se acaban de enviar al intercambio delays. Y... si actualizas ese intercambio... ¡tiene un nuevo enlace! Esto dice:

Si un mensaje enviado aquí tiene una "clave de enrutamiento" establecida en delay_messages_high_priority__60000, entonces enviaré ese mensaje a una cola llamada delay_messages_high_priority__60000

Una "clave de enrutamiento" es una propiedad extra que puedes establecer en un mensaje que se envía a AMQP. Normalmente, Messenger no establece ninguna clave de enrutamiento, pero cuando un mensaje tiene un retraso, sí lo hace. Y gracias a esta vinculación, esos tres mensajes se envían a la cola delay_messages_high_priority__60000. Así es como funciona un intercambio direct: en lugar de enviar cada mensaje a todas las colas vinculadas, utiliza las reglas de la "clave de vinculación" para averiguar a qué cola -o colas- debe ir un mensaje.

Colas de retraso: x-message-ttl y x-deal-letter-exchange

Haz clic en la cola porque es súper interesante. Tiene unas cuantas propiedades importantes. La primera es un x-message-ttl fijado en 60 segundos. ¿Qué significa esto? Cuando estableces esto en una cola, significa que, después de que un mensaje haya estado en esta cola durante 60 segundos, RabbitMQ debería eliminarlo... lo que parece una locura, ¿verdad? ¿Por qué querríamos que los mensajes sólo vivieran durante 60 segundos... y luego fueran eliminados? Bueno... es por diseño... y funciona junto con esta segunda propiedad importante:x-dead-letter-exchange.

Si una cola tiene esta propiedad, le dice a Rabbit que cuando un mensaje alcanza su TTL de 60 segundos y necesita ser eliminado, no debe ser borrado. En su lugar, debe ser enviado al intercambio messages_high_priority.

Así, Messenger entrega los mensajes al intercambio delays con una clave de enrutamiento que hace que se envíen aquí. Después, tras permanecer 60 segundos, el mensaje se elimina de esta cola y se envía a la central messages_high_priority. ¡Sí, se entrega en el lugar correcto después de 60 segundos!

Y entonces... ¡404! Incluso la propia cola está marcada como "temporal": una vez que no le quedan mensajes, se borra.

Cuando vuelves a ver las colas, los mensajes fueron entregados a la cola messages_high... pero ésta ya está vacía porque nuestro trabajador los consumió.

Así que... sí... ¡vaya! Cada vez que publicamos un mensaje con retardo, Messenger configura todo esto: crea la cola de retardo temporal con la configuración del TTL y del intercambio de letras muertas, añade un enlace al intercambio delays para enrutar a esta cola, y añade la clave de enrutamiento correcta al mensaje para asegurarse de que acaba en esa cola.

Realmente puedes empezar a ver lo ricas que son las funciones de AMQP... aunque no las necesites. La característica más importante que acabamos de ver es el tipo de intercambio direct: un intercambio que se basa en las claves de enrutamiento para averiguar a dónde debe ir cada mensaje.

A continuación, ¿podríamos utilizar intercambios directos para nuestros mensajes no retardados? En lugar de dos intercambios que se "abanican" cada uno a una cola distinta, ¿podríamos crear un solo intercambio que, mediante el uso de claves de enrutamiento, entregue los mensajes correctos a las colas correctas? Totalmente.

Leave a comment!

9
Login or Register to join the conversation
Dang Avatar

Hi guys,
In this tutorial, you use the doctrine to store failures messages. Why you don't store them in RabbitMq with that x-dead-letter-exchange ? Is there any specific reason ? Thanks

Reply

Hey @Dang

Yea, you can store your failed messages in RabbitMq as well but we kept using Doctrine just for simplicity reasons.

Cheers!

Reply

Hi, guys. As always great course, one of your finest.

Can a message be delayed for a whole month? My use case is to schedule a task to check in a month if a notification has been read and if not delete it. Would this AMQP delays would be useful for such a task?

Thanks a lot

Reply

Hey danresmejia!

> Can a message be delayed for a whole month?

Ha! That's a fun question :). Umm... maybe? I think this is not the normal use-case. It might work in theory (you'd need to make sure AMQP is "persistent" so that messages aren't lost on shutdown), but I'm not sure this would be wise. I could *totally* be wrong and maybe this is normal, I just haven't heard of amqp being used with such long delays. Personally, I would probably do this with a custom console command that "checks for all unread messages that are older than 1 month" and deletes them. Then I'd run that on a CRON job as often as you need :).

Cheers!

Reply

Thanks for getting back to me. A cron job is what I do now, it works fine until it doesn't ;) and then debugging is hard. Also everything we do is in a multi tenant context so the command needs to check every notification for every tenant as each tenant lives in a different data base. For me it seems that a message on an async queue is the perfect choice as I'd know that for every created notification a message would be also created to check it validity in a month.

Also is important to point out that on the JMS Job Queue we had such feature, i.e. a message would only be processed on an specific date and time if such thing is needed. In that regard I see that doctrine transport table has a field called 'available_at' it seems to be related to it. What do you thing?

One last idea is to have the handler sending a message to the event bus if the notification is not yet ready to be deleted, not old enough. It could be an alternative if the AMQP implementation doesn't persist delayed messages over reboots or outages.

Cheers!

Reply

Hey danresmejia!

> In that regard I see that doctrine transport table has a field called 'available_at' it seems to be related to it. What do you thing?

Especially with the Doctrine transport, I certainly can't see a problem with super-delayed messages like this (I'm less certain about how appropriate this is with AMQP).

So... I think it would be fine to run with this :).

Cheers and sorry for the slow reply!

Reply
php-programmist Avatar
php-programmist Avatar php-programmist | posted hace 3 años

Hi, thank you for great totorials!
There is wrong command at this page:
php bin/console -vv async_priority_high async
Insted of:
php bin/console messenger:consume -vv async_priority_high async

Reply

Hey php-programmist

Good catch! Thanks!

Reply
Cat in space

"Houston: no signs of life"
Start the conversation!

Este tutorial está construido con Symfony 4.3, pero funcionará bien en Symfony 4.4 o 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
userVoice