Flag of Ukraine
SymfonyCasts stands united with the people of Ukraine

Transportes de alta prioridad

Keep on Learning!

If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.

Start your All-Access Pass
Buy just this tutorial for $12.00

With a Subscription, click any sentence in the script to jump to that part of the video!

Login Subscribe

Los dos mensajes que enviamos al transporte async sonAddPonkaToImage y DeletePhotoFile, que se encargan de eliminar el archivo físico del sistema de archivos. Y... el segundo no es algo que el usuario note o le importe realmente, es sólo una tarea de mantenimiento. Si ocurriera dentro de 5 minutos o dentro de 10 días, al usuario no le importaría.

Esto crea una situación interesante. Nuestro trabajador maneja las cosas según el principio de "primero en entrar, primero en salir": si enviamos 5 mensajes al transporte, el trabajador los manejará en el orden en que los haya recibido. Esto significa que si se borran un montón de imágenes y luego alguien sube una nueva foto... el trabajador procesará todos esos mensajes de borrado antes de añadir finalmente Ponka a la foto. Y eso... no es lo ideal.

La verdad es que los mensajes de AddPonkaToImage deberían tener una prioridad más alta en nuestro sistema que los de DeletePhotoFile: siempre queremos que AddPonkaToImage se gestione antes que cualquier mensaje de DeletePhotoFile... aunque se hayan añadido primero.

Crear el transporte de "alta" prioridad

Entonces... ¿podemos establecer una prioridad en los mensajes? No exactamente. Resulta que en el mundo de las colas, esto se resuelve creando varias colas y dando a cada una de ellas una prioridad. En Symfony Messenger, eso se traduce en múltiples transportes.

Debajo del transporte async, crea un nuevo transporte llamado, qué tal,async_priority_high. Utilicemos el mismo DSN que antes, que en nuestro caso es doctrine. Debajo, añade options, y luego queue_name ajustado a high. El nombre high no es importante - podríamos usar cualquier cosa. La opción queue_name es específica del transporte Doctrine y, en última instancia, controla el valor de una columna de la tabla, que funciona como una categoría y nos permite tener varias "colas" de mensajes dentro de la misma tabla. Y además, para cualquier transporte, puedes configurar estas opciones como parámetros de consulta en el DSN o bajo esta clave options.

framework:
messenger:
... lines 3 - 10
transports:
... lines 12 - 17
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: high
... lines 22 - 30

En este momento tenemos tres colas, todas ellas almacenadas en la misma tabla de la base de datos, pero con diferentes valores de queue_name. Y ahora que tenemos este nuevo transporte, podemos dirigir AddPonkaToImage a async_priority_high.

framework:
messenger:
... lines 3 - 25
routing:
... line 27
'App\Message\AddPonkaToImage': async_priority_high
... lines 29 - 30

Consumir transportes prioritarios

Si nos detenemos ahora... lo único que hemos hecho realmente es posibilitar el envío de estas dos clases de mensajes diferentes a dos colas distintas. Pero no hay nada especial en async_priority_high. Claro, he puesto la palabra "alto" en su nombre, pero no es diferente de async.

La verdadera magia viene del trabajador. Busca tu terminal donde se esté ejecutando el trabajador y pulsa Control+C para detenerlo. Si sólo ejecutas messenger:consume sin ningún argumento y tienes más de un transporte, te pregunta qué transporte quieres consumir:

php bin/console messenger:consume

Es decir, de qué transporte quieres recibir mensajes. Pero en realidad, puedes leer mensajes de varios transportes a la vez y decirle al trabajador cuál debe leer primero. Fíjate en esto: Yo digo async_priority_high, async.

Esto le dice al trabajador: primero pregunta a async_priority_high si tiene algún mensaje. Si no lo tiene, entonces ve a comprobar el transporte async.

Deberíamos ver esto en acción. Actualizaré la página, borraré un montón de imágenes aquí tan rápido como pueda y luego subiré un par de fotos. Comprueba la salida del terminal:

Se maneja DeletePhotoFile y luego... AddPonkaToImage, otro AddPonkaToImage, otro AddPonkaToImage y... ¡sí! Vuelve a gestionar el DeletePhotoFile de menor prioridad.

Así que, al principio -antes de la carga- sí que consumía unos cuantos mensajes de DeletePhotoFile. Pero en cuanto vio un mensaje en ese transporte async_priority_high, los consumió todos hasta que estuvo vacío. Cuando lo estaba, volvía a consumir mensajes de async.

Básicamente, cada vez que el trabajador busca el siguiente mensaje, comprueba primero el transporte de mayor prioridad y sólo pregunta al siguiente transporte -o transportes- si está vacío.

Y... ¡ya está! Crea un nuevo transporte para el número de "niveles" de prioridad que necesites, y luego dile al comando del trabajador en qué orden debe procesarlos. Ah, y en lugar de utilizar esta forma interactiva de hacer las cosas, puedes ejecutar:

php bin/console messenger:consume async_priority_high async

Perfecto. A continuación, vamos a hablar de una opción que podemos utilizar para facilitar el desarrollo mientras usamos colas... porque tener que recordar siempre que hay que ejecutar el comando trabajador mientras se codifica puede ser un dolor.

Leave a comment!

10
Login or Register to join the conversation
Tim-K Avatar

Hello SymfonyCast-Team,

I learnt something about the PRIORITY that I like to share.

In the video it is said:

Basically, each time the worker looks for the next message, it checks the highest priority transport first and only asks the next transport - or transports - if it's empty.

Which would mean, that if a delay is high but the transport priority is high too (=queue not empty for a long time), the lower priority messages would be "stucked".

I guess this is not quite true. The 'messenger_message'-table has a column called 'available_at', which is the moment after its expiration a message shalll be executed. So I observed, that this is giving the priority (even the other transport is not empty). This becomes important in case of delays (either as stamp or in the retry-strategy).

Maybe you can have a look and share your thoughts if I am wrong.

Cheers!

Reply

Hey Tim K.!

Nice to chat with you... especially about interesting complex things like this! And sorry for the slow reply - I had a family situation.

> guess this is not quite true. The 'messenger_message'-table has a column called 'available_at', which is the moment after its expiration a message shalll be executed. So I observed, that this is giving the priority (even the other transport is not empty). This becomes important in case of delays (either as stamp or in the retry-strategy).

You're correct about the purpose of that available_at column. However, it "should" be the case (I designed this part, so this was my intention, though it doesn't mean that I didn't miss some little detail), that this happens:

A) At the start of the loop, the worker FIRST looks in the highest priority transport. Let's suppose that this has a queue_name (in the messenger_message table) of "high". Then, the query should look something like this:

> SELECT * FROM messenger_message WHERE available_at < NOW() AND queue_name = 'high';

B) Only if the previous query returned an empty string would it then go try the next transport... which maybe has aa queue_name set to "normal":

> SELECT * FROM messenger_message WHERE available_at < NOW() AND queue_name = 'normal';

The point is, unless I mucked something up (very possible, but the code looks ok to me), the available_at column alone is not enough for a message to be processed because we always include the "AND queue_name = '...'" part in the query. So once a message is available, it will still be waiting for its specific transport to "ask" for it.

But if you *are* seeing different behavior, let me know! One thing to check (in case you haven't already) is that each transport you have configured has a different queue_name config in messenger.yaml.

Cheers!

Reply
Tim-K Avatar

Hey Ryan,

thank you for the SQL. I guess that make it much more clear. I think both of our comments have to be merged and can be summerized like this:

Basically, each time the worker looks for the next message, it checks the highest priority transport first and only asks the next transport - or transports - if it's empty if no more available messages (see column 'available_at') exist in that transport (=queue name).

Means: In case of a big backlog (= a lot of messages expired the 'available_at'-moment), the messages with the highest transport are treated first.

Do you share the same understanding?

Cheers
Tim

Background (my use-case):
I created a Mailing-process, that sends out 10.000 customized (=individuel) emails. The user-experience shall be quick and easy (just activating the mailing and continue working and not waiting for a long script to run).

On the "upper level" the command divides the emails into 500-email batches (20 messages on upper level with a BIG delay-stamp (20 min. between each batch)). Now each batch is treated (unpacked) and will create a message (lower level) to send out emails (short delay = 1 sec between each email)... This means I have two queues running in parallel, but the available_at will determine, when the batch is unpacked or email is send. So in my case, only in case of the exact same 'available_at'-moment the queue priority becomes important.

Reply

Hey Tim K.!

Yes, I think that's a perfect description. So I think your system should work just fine :).

Cheers!

Reply
Mark N. Avatar

Hi. Firstly, absolutely loving this tutorial, thanks so much for doing them.

I'm wondering what your recommendations are for consuming prioritised transports with multiple consumers? It feels like you should either have separate consumers per transport (with multiple or higher resourced consumers working on the "high priority" transport) or multiple consumers consuming multiple transports. For some reason it feels strange to have each consuming multiple transports (I guess it feels like you're introducing potential race conditions etc) but as far as I can tell this is the only way to actually keep the prioritisation accurate.

Just wondering if there's a recommended approach as I struggled to find one in the docs.

Reply

Hey Mark N.!

This is an excellent question :). The "priority" feature is fairly basic. What I mean is, in an even more robust system, you might build something that monitors the queue sizes and adjusts the number of workers that are reading from each transport, giving more priority to some transports over others if there are a limited number of total resources/workers. That's probably overkill for most situations - but that's kind of the "ideal" system, because you the system would auto-adjust dynamically.

In the absence of something like that, you have 2 choices (as you said):

A) Make each worker read from the highest transport first, then the next, etc (as described here)
B) Make each worker handle just one transport, but make extra workers for higher priority transports (or workers with more resources).

The problem with (B) is that you could find yourself in a situation where all of the high priority messages have been handled, but the there are a bunch of low priority messages. Instead of "helping out" on the low priority messages, the high priority workers sit idle. Or worse, you could have this the other way around where low priority workers sit idle while the high priority workers are overwhelmed.

So no solution will fit all situations (e.g. you might have a situation where certain messages require SO much memory that they can really only be handled by workers with more resources), but that's why we use method (A). There shouldn't be any race conditions, as queueing systems are *really* good at delivering a message exactly one time: no more and no less. In reality, when you have a worker that consumes 2 transports at once, internally, it first checks for messages on the high priority transports. Then, if there are none, it goes to the next one. Once it finishes a message, it goes back to ask the highest priority transport for messages. So, it's not really "reading from multiple transports". It's more that it has an internal list of transports and each time it needs a message, it goes down the list one-by-one until it finds one :).

Let me know if that helps!

Cheers!

1 Reply
Mark N. Avatar

Very helpful thanks, and such a fast response given how early it is over there! You must tell me which brand of coffee you drink.

I was more thinking about messages potentially being handled in the "incorrect" order for our use-case when I mentioned a race condition, rather than racing to handle the same message, so was a bit of a misnomer on my part. In any case, those edge cases can be solved by throwing the correct exceptions to requeue and reject where appropriate so there's no problem at all.

Going to go with option A) as I agree that it makes little sense to have workers sat idle while others are struggling, particularly if you've souped up that worker's resources.

Thanks again

Reply

Hey Mark N.!

> Very helpful thanks, and such a fast response given how early it is over there! You must tell me which brand of coffee you drink

Haha, I have a rare week where grandma & grandpa are watching my son! So what do I do with my time? Get to work early :)

> I was more thinking about messages potentially being handled in the "incorrect" order for our use-case when I mentioned a race condition, rather than racing to handle the same message, so was a bit of a misnomer on my part. In any case, those edge cases can be solved by throwing the correct exceptions to requeue and reject where appropriate so there's no problem at all.

You are 100% correct about this. But the order of messages is never guaranteed with queue systems (with a few exceptions - there are some queueing systems which have modes to "mostly" guarantee this, but in general, "order" is not a property of queues) - so if order is important, you're correct that (no matter how you do the priority stuff) you would need to have some extra logic to handle this. For those messages that will reject and requeue, just make sure they're on a special transport where you set the max_retries to something really huge or implement a custom retry strategy to retry forever. Another option might be to *avoid* dispatching the message that is "dependent on another message". And instead, dispatch that message from inside the handler that it depends on.

Anyways, it sounds like you're on top of things - good luck!

Cheers!

1 Reply
Default user avatar
Default user avatar N.N. | posted hace 3 años | edited

php bin/console messenger:consume async_priotity_high async
Hey, in the very last cli example you have small typo.
async_prio<b>t</b>ity_high --> async_prio<b>r</b>ity_high

Reply

Hey N.N.

Nice catch! Thank you for pointing us, it was fixed in https://github.com/SymfonyC...

Cheers!

1 Reply
Cat in space

"Houston: no signs of life"
Start the conversation!

Este tutorial está construido con Symfony 4.3, pero funcionará bien en Symfony 4.4 o 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
userVoice