Flag of Ukraine
SymfonyCasts stands united with the people of Ukraine

Intercambio de prioridades AMQP

Keep on Learning!

If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.

Start your All-Access Pass
Buy just this tutorial for $12.00

With a Subscription, click any sentence in the script to jump to that part of the video!

Login Subscribe

La idea de nuestros transportes async y async_priority_high es que podamos enviar algunos mensajes a async_priority_high y otros a async, con el objetivo de que esos mensajes acaben en diferentes "cubos"... o, más técnicamente, en diferentes "colas". Entonces podemos ordenar a nuestro trabajador que lea primero todos los mensajes de la cola a la que esté vinculado async_priority_high antes de leer los mensajes de la cola a la que esté vinculado el transporte async.

La opción nombre_cola en Doctrine

Esto ya funcionaba antes con Doctrine, gracias a esta opción queue_name: high. El valor por defecto de esta opción es... default. Como recordatorio, voy a entrar rápidamente en mi base de datos:

mysql -u root messenger_tutorial

Y veré cómo era esa tabla:

DESCRIBE messenger_messages;

Sí, la columna queue_name era la clave para que esto funcionara. Los mensajes que se enviaban a async_priority_high tenían un queue_name fijado en high, y los que se enviaban al transporte async tenían un valor de default. Así que, aunque sólo teníamos una tabla de base de datos, funcionaba como dos colas: cuando consumíamos el transporteasync_priority_high, consultaba todos los mensajesWHERE queue_name="high".

El problema es que esta opción queue_name es específica del transporte Doctrine, y no tiene ningún efecto cuando se utiliza AMQP.

¿Enrutamiento de mensajes a... una cola?

Pero... a alto nivel... nuestro objetivo es el mismo: necesitamos dos colas. Necesitamos el transporteasync_priority_high para enviar mensajes a una cola y el transporte asyncpara enviar mensajes a otra cola.

Pero con AMQP... no envías un mensaje directamente a una cola... lo envías a un intercambio... y luego es responsabilidad del intercambio mirar sus reglas internas y averiguar a qué cola, o colas, debe ir realmente ese mensaje.

Esto significa que para que un mensaje llegue a una cola, tenemos que ajustar las cosas a nivel de intercambio. Y hay dos formas diferentes de hacerlo. En primer lugar, podríamos seguir teniendo un único intercambio y luego añadir algunas reglas internas -llamadas vinculaciones- para enseñar al intercambio que algunos mensajes deben ir a una cola y otros mensajes deben ir a otra cola. Te mostraré cómo hacerlo un poco más adelante.

La segunda opción no es tan genial, pero es un poco más sencilla. Por defecto, cuando Messenger crea un intercambio, lo hace de tipo fanout. Eso significa que cuando se envía un mensaje a este intercambio, se dirige a todas las colas que estén vinculadas a él. Así que si añadimos un segundo enlace a una segunda cola -quizásmessages_high_priority -, cada mensaje que se envíe a este intercambio se dirigirá a ambas colas. ¡Se duplicaría! Eso... no es lo que queremos.

En su lugar, vamos a crear dos intercambios fanout, y cada intercambio dirigirá todos sus mensajes a una cola distinta. Tendremos dos intercambios y dos colas.

Configurar un segundo intercambio

Vamos a configurar esto dentro de messenger.yaml. Debajo de options añade exchangey luego name con, qué tal, messages_high_priority. Debajo de esto, añadequeues con una sola clave: messages_high ajustada a null.

framework:
messenger:
... lines 3 - 19
transports:
... lines 21 - 26
async_priority_high:
... line 28
options:
exchange:
name: messages_high_priority
queues:
messages_high: ~
... lines 34 - 42

Esta configuración tiene tres efectos. En primer lugar, como tenemos activada la función auto_setup, la primera vez que hablemos con RabbitMQ, Messenger creará el intercambiomessages_high_priority, la cola messages_high y los enlazará. El segundo efecto es que cuando enviemos mensajes a este transporte se enviarán al intercambio messages_high_priority. El tercer y último efecto es que cuando consumamos desde este transporte, Messenger leerá los mensajes de la cola messages_high.

Si esto aún no tiene todo el sentido... no te preocupes: vamos a ver esto en acción. En primer lugar, asegúrate de que tu trabajador no está en marcha: el nuestro está parado. Ahora vamos a eliminar tres fotos -una, dos y tres- y a subir cuatro fotos.

¡Genial! ¡Veamos qué ha pasado en RabbitMQ! Dentro del gestor, haz clic en "Intercambios". ¡Bien! ¡Tenemos un nuevo intercambio messages_high_priority! El intercambio originalmessages sigue enviando todos sus mensajes a una cola messages... pero el nuevo intercambio envía todos sus mensajes a una cola llamada messages_high. Eso es gracias a nuestra configuración queues.

Y... ¿qué hay dentro de cada cola? ¡Ve a comprobarlo! Es exactamente lo que queremos: los 3 mensajes borrados están esperando en la cola messages y las 4 fotos recién subidas están en messages_high. ¡Cada transporte está recibiendo con éxito sus mensajes en una cola distinta! Y eso significa que podemos consumirlos de forma independiente.

En la línea de comandos, normalmente le diríamos a Messenger que consuma deasync_priority_high y luego de async para obtener nuestra entrega priorizada. Pero para mostrar claramente lo que ocurre, vamos a consumirlos de forma independiente por ahora. Empieza consumiendo mensajes del transporte async:

php bin/console messenger:consume -vv async

Comienza a procesar los objetos de ImagePostDeletedEvent... y se detiene después de esos tres. ¡Ya ha terminado! La cola está vacía. El comando no ha leído los mensajes de messages_high. Para ello, consume el transporte async_priority_high:

php bin/console messenger:consume -vv async_priority_high

¡Ya está! La forma más sencilla... pero no más elegante... de tener transportes priorizados con AMQP es enviar cada transporte a un intercambio diferente y configurarlo para que se dirija a una cola diferente. Más adelante... veremos la forma más elegante.

Antes de llegar ahí, ¿recuerdas cuando te hice comentar el DelayStamp antes de que empezáramos a usar RabbitMQ? A continuación, te mostraré por qué: volveremos a añadir ese DelayStampy veremos la forma loca en que se "retrasan" los mensajes en RabbitMQ.

Leave a comment!

6
Login or Register to join the conversation
Mehul-J Avatar
Mehul-J Avatar Mehul-J | posted hace 1 año

Hi, I am using first time RabbitMq . In rabbitmq inside query the ready is always 0 . Even after messenger is consumed . And the state is in running. Why is this not changing???? Can anyone help

Reply
Mehul-J Avatar

I got it. before worker is consumed i get ready. after worker consumed it will be zero

Reply

Hey Nagashree,

Glad you were figured it out yourself!

Cheers!

Reply
Dariia M. Avatar
Dariia M. Avatar Dariia M. | posted hace 3 años | edited

Hello! Can you provide the origin source of messenger configuration for amqp queues, please? It's the beginning of "Configuring a Second Exchange" paragraph in this lesson, changing the configuration of messenger.yaml file.
I didn't find this in official documentation neither in results on php bin/console config:dump framework messenger command.

Reply

Hey Dariia M.

Yes it's not fully described in official doncumentation, not in config:dump. I think it is so because it's DSN based configuration but you can find it in source code <a href="https://github.com/symfony/symfony/blob/efbe75291869ab944c62b48de9a0c2257ccace8f/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php#L80&quot;&gt; here </a>

Hope it will help you!

Cheers!

Reply
Dariia M. Avatar
Dariia M. Avatar Dariia M. | sadikoff | posted hace 3 años | edited

Hello sadikoff
Thank you for response, now I know where to find it :)

Reply
Cat in space

"Houston: no signs of life"
Start the conversation!

Este tutorial está construido con Symfony 4.3, pero funcionará bien en Symfony 4.4 o 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
userVoice