High Priority Transports

The two messages that we're sending to the async transport are AddPonkaToImage and DeletePhotoFile, which handles deleting the physical file from the filesystem. And... that second one isn't something the user actually notices or cares about - it's just housekeeping. If it happened 5 minutes from now or 10 days from now, the user wouldn't care.

This creates an interesting situation. Our worker handles things on a first-in, first-out basis: if we send 5 messages to the transport, the worker will handle them in the order in which they were received. This means that if a bunch of images are deleted and then someone uploads a new photo... the worker will process all of those delete messages before finally adding Ponka to the photo. And that... isn't ideal.

The truth is that AddPonkaToImage messages should have a higher priority in our system than DeletePhotoFile: we always want AddPonkaToImage to be handled before any DeletePhotoFile messages... even if they were added first.

Creating the "high" Priority Transport

So... can we set a priority on messages? Not exactly. It turns out that in the queueing world, this is solved by creating multiple queues and giving each of those a priority. In Symfony Messenger, that translates to multiple transports.

Below the async transport, create a new transport called, how about, async_priority_high. Let's use the same DSN as before, which in our case is using doctrine. Below, add options, then queue_name set to high. The name high isn't important - we could use anything. The queue_name option is specific to the Doctrine transport and ultimately controls the value of a column in the table, which operates like a category and allows us to have multiple "queues" of messages inside the same table. And also, for any transport, you can configure these options as query parameters on the DSN or under this options key.

framework:
    messenger:
        # ... lines 3 - 10
        transports:
            # ... lines 12 - 17
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: high
        # ... lines 22 - 30
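
As a rough sketch of the other way to pass that option, the same transport could put queue_name on the DSN as a query parameter instead of under options. This assumes MESSENGER_TRANSPORT_DSN resolves to doctrine://default, which isn't shown in this section, so treat the literal DSN below as a placeholder for your own value:

```yaml
framework:
    messenger:
        transports:
            async_priority_high:
                # Assumption: the env var's value is "doctrine://default".
                # The query parameter takes the place of the "options" key.
                dsn: 'doctrine://default?queue_name=high'
```

Both forms should end up with the same value in the queue_name column, so which one you pick is mostly a matter of taste. Keeping the option under options leaves the DSN itself in the environment variable.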

At this point we have three queues - which are all stored in the same table in the database, but with different queue_name values. And now that we have this new transport, we can route AddPonkaToImage to async_priority_high.

framework:
    messenger:
        # ... lines 3 - 25
        routing:
            # ... line 27
            'App\Message\AddPonkaToImage': async_priority_high
            # ... lines 29 - 30

Consuming Prioritized Transports

If we stopped now... all we've really done is make it possible to send these two different message classes to two different queues. But there's nothing special about async_priority_high. Sure, I put the word "high" in its name, but it's no different than async.

The real magic comes from the worker. Find your terminal where the worker is running and hit Control+C to stop it. If you just run messenger:consume without any arguments and you have more than one transport, it asks you which transport you want to consume:

php bin/console messenger:consume

Meaning, which transport do you want to receive messages from. But actually, you can read messages from multiple transports at once and tell the worker which should be read first. Check this out: I'll say async_priority_high, async.

This tells the worker: first ask async_priority_high if it has any messages. If it doesn't, then go check the async transport.

We should be able to see this in action. I'll refresh the page, delete a bunch of images here as fast as I can and then upload a couple of photos. Check the terminal output:

It handles DeletePhotoFile then... AddPonkaToImage, another AddPonkaToImage, another AddPonkaToImage and... yea! It goes back to handling the lower-priority DeletePhotoFile.

So, in the beginning - before we uploaded - it did consume a few DeletePhotoFile messages. But as soon as it saw a message on that async_priority_high transport, it consumed all of those until it was empty. When it was, it then returned to consuming messages from async.

Basically, each time the worker looks for the next message, it checks the highest priority transport first and only asks the next transport - or transports - if it's empty.

And... that's it! Create a new transport for however many different priority "levels" you need, then tell the worker command the order in which to process them. Oh, and instead of using this interactive way of doing things, you can run:

php bin/console messenger:consume async_priority_high async

Perfect. Next, let's talk about one option we can use to make it easier to develop while using queues... because always needing to remember to run the worker command while coding can be a pain.

Tim-K

Hello SymfonyCast-Team,

I learnt something about the PRIORITY that I'd like to share.

In the video it is said:

Basically, each time the worker looks for the next message, it checks the highest priority transport first and only asks the next transport - or transports - if it's empty.

Which would mean that if a delay is high but the transport priority is high too (= queue not empty for a long time), the lower-priority messages would be "stuck".

I guess this is not quite true. The 'messenger_message' table has a column called 'available_at', which is the moment after which a message shall be executed. I observed that this determines the priority (even if the other transport is not empty). This becomes important in case of delays (either as a stamp or in the retry strategy).

Maybe you can have a look and share your thoughts if I am wrong.

Cheers!

Hey Tim K.!

Nice to chat with you... especially about interesting complex things like this! And sorry for the slow reply - I had a family situation.

> I guess this is not quite true. The 'messenger_message' table has a column called 'available_at', which is the moment after which a message shall be executed. I observed that this determines the priority (even if the other transport is not empty). This becomes important in case of delays (either as a stamp or in the retry strategy).

You're correct about the purpose of that available_at column. However, it "should" be the case (I designed this part, so this was my intention, though it doesn't mean that I didn't miss some little detail), that this happens:

A) At the start of the loop, the worker FIRST looks in the highest priority transport. Let's suppose that this has a queue_name (in the messenger_message table) of "high". Then, the query should look something like this:

> SELECT * FROM messenger_message WHERE available_at < NOW() AND queue_name = 'high';

B) Only if the previous query returned no rows would it then go try the next transport... which maybe has a queue_name set to "normal":

> SELECT * FROM messenger_message WHERE available_at < NOW() AND queue_name = 'normal';

The point is, unless I mucked something up (very possible, but the code looks ok to me), the available_at column alone is not enough for a message to be processed because we always include the "AND queue_name = '...'" part in the query. So once a message is available, it will still be waiting for its specific transport to "ask" for it.

But if you *are* seeing different behavior, let me know! One thing to check (in case you haven't already) is that each transport you have configured has a different queue_name config in messenger.yaml.

Cheers!

Tim-K

Hey Ryan,

thank you for the SQL. I guess that makes it much clearer. I think both of our comments have to be merged and can be summarized like this:

Basically, each time the worker looks for the next message, it checks the highest priority transport first and only asks the next transport - or transports - if no more available messages (see column 'available_at') exist in that transport (= queue name).

Meaning: in case of a big backlog (= a lot of messages past their 'available_at' moment), the messages in the highest-priority transport are handled first.

Do you share the same understanding?

Cheers
Tim

Background (my use-case):
I created a mailing process that sends out 10,000 customized (= individual) emails. The user experience shall be quick and easy (just activating the mailing and continuing to work, not waiting for a long script to run).

On the "upper level", the command divides the emails into 500-email batches (20 messages on the upper level with a BIG delay stamp (20 min. between each batch)). Each batch is then unpacked and creates one "lower level" message per email to send (short delay = 1 sec between each email)... This means I have two queues running in parallel, but available_at determines when a batch is unpacked or an email is sent. So in my case, the queue priority only becomes important when messages have the exact same 'available_at' moment.

Hey Tim K.!

Yes, I think that's a perfect description. So I think your system should work just fine :).

Cheers!

Mark N. | posted 2 years ago

Hi. Firstly, absolutely loving this tutorial, thanks so much for doing them.

I'm wondering what your recommendations are for consuming prioritised transports with multiple consumers? It feels like you should either have separate consumers per transport (with multiple or higher resourced consumers working on the "high priority" transport) or multiple consumers consuming multiple transports. For some reason it feels strange to have each consuming multiple transports (I guess it feels like you're introducing potential race conditions etc) but as far as I can tell this is the only way to actually keep the prioritisation accurate.

Just wondering if there's a recommended approach as I struggled to find one in the docs.

Hey Mark N.!

This is an excellent question :). The "priority" feature is fairly basic. What I mean is, in an even more robust system, you might build something that monitors the queue sizes and adjusts the number of workers that are reading from each transport, giving more priority to some transports over others if there are a limited number of total resources/workers. That's probably overkill for most situations - but that's kind of the "ideal" system, because the system would auto-adjust dynamically.

In the absence of something like that, you have 2 choices (as you said):

A) Make each worker read from the highest transport first, then the next, etc (as described here)
B) Make each worker handle just one transport, but make extra workers for higher priority transports (or workers with more resources).

The problem with (B) is that you could find yourself in a situation where all of the high priority messages have been handled, but there are a bunch of low priority messages. Instead of "helping out" on the low priority messages, the high priority workers sit idle. Or worse, you could have this the other way around where low priority workers sit idle while the high priority workers are overwhelmed.

So no solution will fit all situations (e.g. you might have a situation where certain messages require SO much memory that they can really only be handled by workers with more resources), but that's why we use method (A). There shouldn't be any race conditions, as queueing systems are *really* good at delivering a message exactly one time: no more and no less. In reality, when you have a worker that consumes 2 transports at once, internally, it first checks for messages on the high priority transports. Then, if there are none, it goes to the next one. Once it finishes a message, it goes back to ask the highest priority transport for messages. So, it's not really "reading from multiple transports". It's more that it has an internal list of transports and each time it needs a message, it goes down the list one-by-one until it finds one :).

Let me know if that helps!

Cheers!

Mark N.

Very helpful thanks, and such a fast response given how early it is over there! You must tell me which brand of coffee you drink.

I was more thinking about messages potentially being handled in the "incorrect" order for our use-case when I mentioned a race condition, rather than racing to handle the same message, so was a bit of a misnomer on my part. In any case, those edge cases can be solved by throwing the correct exceptions to requeue and reject where appropriate so there's no problem at all.

Going to go with option A) as I agree that it makes little sense to have workers sat idle while others are struggling, particularly if you've souped up that worker's resources.

Thanks again

Hey Mark N.!

> Very helpful thanks, and such a fast response given how early it is over there! You must tell me which brand of coffee you drink

Haha, I have a rare week where grandma & grandpa are watching my son! So what do I do with my time? Get to work early :)

> I was more thinking about messages potentially being handled in the "incorrect" order for our use-case when I mentioned a race condition, rather than racing to handle the same message, so was a bit of a misnomer on my part. In any case, those edge cases can be solved by throwing the correct exceptions to requeue and reject where appropriate so there's no problem at all.

You are 100% correct about this. But the order of messages is never guaranteed with queue systems (with a few exceptions - there are some queueing systems which have modes to "mostly" guarantee this, but in general, "order" is not a property of queues) - so if order is important, you're correct that (no matter how you do the priority stuff) you would need to have some extra logic to handle this. For those messages that will reject and requeue, just make sure they're on a special transport where you set the max_retries to something really huge or implement a custom retry strategy to retry forever. Another option might be to *avoid* dispatching the message that is "dependent on another message". And instead, dispatch that message from inside the handler that it depends on.
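
A rough sketch of what such a "special transport" might look like in messenger.yaml. The transport name async_ordered, its queue name and all of the numbers are hypothetical placeholders, not values from the tutorial's code:

```yaml
framework:
    messenger:
        transports:
            # Hypothetical transport for messages that may be rejected and
            # requeued many times until the message they depend on is handled.
            async_ordered:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: ordered
                retry_strategy:
                    # Arbitrary "really huge" value
                    max_retries: 1000
                    # Milliseconds to wait before a retry
                    delay: 5000
                    # 1 keeps the delay constant instead of growing it
                    multiplier: 1
```

With a constant delay, a message that keeps failing is retried at a steady pace and doesn't back off further each time. Whether that fits depends on how long the dependency usually takes to be handled.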

Anyways, it sounds like you're on top of things - good luck!

Cheers!

N.N. | posted 3 years ago | edited

php bin/console messenger:consume async_priotity_high async
Hey, in the very last CLI example you have a small typo.
async_priotity_high --> async_priority_high

Hey N.N.

Nice catch! Thank you for pointing that out to us; it was fixed in https://github.com/SymfonyC...

Cheers!

This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}