If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.
With a Subscription, click any sentence in the script to jump to that part of the video!
Login SubscribeRepitamos la configuración del nuevo intercambio para el transporte async_priority_high
: queremos que éste entregue al mismo intercambio directo, pero que utilice una clave de enrutamiento diferente para dirigir los mensajes a una cola distinta.
Cambia el intercambio a messages
, establece el tipo a direct
, y luego utilizadefault_publish_routing_key
para adjuntar automáticamente una clave de enrutamiento llamada high
a cada mensaje.
A continuación, para la cola messages_high
, esto le dice a Messenger que queremos que esta cola se cree y se vincule al intercambio. Eso está bien, pero ahora necesitamos que esa vinculación tenga una clave de enrutamiento. Establece binding_keys
como [high]
.
framework: | |
messenger: | |
... lines 3 - 19 | |
transports: | |
... lines 21 - 37 | |
async_priority_high: | |
... line 39 | |
options: | |
exchange: | |
name: messages | |
type: direct | |
default_publish_routing_key: high | |
queues: | |
messages_high: | |
binding_keys: [high] | |
... lines 48 - 56 |
¿Cómo podemos hacer que Messenger cree esa nueva cola y añada el nuevo enlace? Simplemente realiza cualquier operación que utilice este transporte... ¡como subir una foto! Bien, ve a comprobar el gestor de RabbitMQ: empieza por Intercambios.
Sí, todavía tenemos un solo intercambio messages
... ¡pero ahora tiene dos enlaces! Si envías un mensaje a este intercambio con una clave de enrutamiento high
, se enviará a message_high
.
Haz clic en "Colas" para ver... bien: una nueva cola messages_high
con un mensaje esperando dentro.
Y... ¡hemos terminado! Esta nueva configuración tiene el mismo resultado final: cada transporte entrega finalmente los mensajes a una cola diferente. Vamos a consumir los mensajes en espera: consume async_priority_high
y luego async
.
php bin/console messenger:consume -vv async_priority_high async
Y los consume en el orden correcto: gestionando primero AddPonkaToImage
porque está en la cola de alta prioridad y pasando después a los mensajes de la otra cola.
Por cierto, cuando consumimos desde el transporte async
, por ejemplo, entre bastidores, significa que Messenger está leyendo mensajes de cualquier cola que esté configurada para ese transporte. En nuestra aplicación, cada transporte tiene configurada sólo una cola, pero podrías configurar varias colas bajo un transporte e incluso establecer diferentes claves de enlace para cada una. Pero cuando consumas ese transporte, estarás consumiendo mensajes de todas las colas que hayas configurado.
Volvamos atrás y veamos todo el flujo. Cuando enviamos un objeto AddPonkaToImage
, nuestra configuración de enrutamiento de Messenger siempre lo dirige al transporte async_priority_high
. Esto hace que el mensaje se envíe al intercambio messages
con una clave de enrutamiento establecida en high
... y la lógica de enlace significa que finalmente se entregará a la cola messages_high
.
Debido a la forma en que funciona el enrutamiento de Messenger -el hecho de enrutar una clase a un transporte- cada clase de mensaje se entregará siempre a la misma cola. ¿Pero qué pasaría si quisieras controlar esto dinámicamente? ¿Y si, en el momento de enviar un mensaje, necesitaras enviar ese mensaje a un transporte diferente al normal? Tal vez decidas que ese mensaje concreto deAddPonkaToImage
no es importante y que debe ser enviado a async
.
Bueno... eso no es posible con Messenger: cada clase se enruta siempre a un transporte específico. Pero este resultado final es posible... si sabes cómo aprovechar las claves de enrutamiento.
Este es el truco: ¿qué pasaría si pudiéramos publicar un objeto AddPonkaToImage
... pero decirle a Messenger que cuando lo envíe al intercambio, utilice la clave de enrutamiento normal
en lugar de high
? Sí, el mensaje seguiría siendo técnicamente enrutado al transporte async_priority_high
... pero en última instancia acabaría en la cola demessages_normal
. ¡Eso sería todo!
¿Es posible? ¡Totalmente! Abre ImagePostController
y busca dónde enviamos el mensaje. Después de DelayStamp
, añade un nuevo AmqpStamp
- pero ten cuidado de no elegir AmqpReceivedStamp
- eso es algo diferente... y no nos sirve. Este sello acepta unos cuantos argumentos y el primero -¡juego! - es la clave de enrutamiento a utilizar! Pasa este normal
.
... lines 1 - 18 | |
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp; | |
... lines 20 - 24 | |
class ImagePostController extends AbstractController | |
{ | |
... lines 27 - 41 | |
public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus) | |
{ | |
... lines 44 - 64 | |
$envelope = new Envelope($message, [ | |
new DelayStamp(1000), | |
... lines 67 - 68 | |
new AmqpStamp('normal') | |
]); | |
... lines 71 - 73 | |
} | |
... lines 75 - 102 | |
} |
¡Vamos a probarlo! Detén el trabajador para que podamos ver lo que ocurre internamente. Luego, sube una foto, ve al gestor de RabbitMQ, haz clic en colas... actualiza hasta que veas el mensaje en la cola correcta... tenemos que esperar el retraso... ¡y ahí está! Acabó en messages_normal
.
Por cierto, si miras dentro de esta clase AmqpStamp
, el segundo y tercer argumento son para algo llamado $flags
y $attributes
. Estos son un poco más avanzados, pero pueden resultar útiles. Pulsa Shift+Shift para abrir un archivo llamado Connection.php
- asegúrate de abrir el que está en el directorio AmqpExt
. Ahora busca un método llamado publishOnExchange()
.
Cuando se envía un mensaje a RabbitMQ, éste es el método de bajo nivel que realmente hace ese envío. Aquí se utilizan los $flags
y $attributes
del sello Se pasan como tercer y cuarto argumento a algún método de $exchange->publish()
. Mantén pulsado Cmd o Ctrl y haz clic para saltar a ese método.
Esto nos hace saltar a un "stub" -un método y una declaración "falsos"... porque esta clase -llamada AMQPExchange
no es algo que vayas a encontrar en tu directorio vendor/
. No, esta clase proviene de la extensión AMQP PHP que hemos instalado antes.
Así que, si encuentras que realmente necesitas controlar algo sobre cómo se publica un mensaje a través de esta extensión, puedes hacerlo con las clases $flags
y$attributes
. Los documentos de arriba hacen un buen trabajo mostrándote las opciones.
Y... ¡eso es todo para AMQP y RabbitMQ! Seguro que hay más cosas que aprender sobre RabbitMQ -es un tema enorme por sí mismo-, pero ahora tienes un firme conocimiento de sus conceptos más importantes y de cómo funcionan. Y, a menos que necesites hacer cosas muy avanzadas, entiendes bastante para trabajar con Messenger.
A continuación, hasta ahora hemos estado enviando mensajes desde nuestra aplicación Symfony y consumiéndolos desde esa misma aplicación. Pero no siempre es así. Uno de los poderes de un "broker de mensajes" como RabbitMQ es la capacidad de enviar mensajes desde un sistema y manejarlos en un sistema totalmente diferente... quizás en un servidor totalmente diferente o escrito en un lenguaje totalmente diferente. ¡Una locura!
Pero si vamos a utilizar Messenger para enviar mensajes a una cola que luego será manejada por una aplicación totalmente diferente... probablemente necesitemos codificar esos mensajes como JSON... en lugar del formato serializado de PHP que estamos utilizando ahora.
Hey @Anton!
Hmm. I do not believe so, the "framework -> messenger -> routing" config is very simple, just routing to which transport each message class should go. If you'd like to centralize this in some way, one option is via an event listener - I think listening to https://github.com/symfony/... would work. In this listener, you could have logic where you add the AmqpStamp based on the class of the message that is being routed. Or, you could make your messages implement some marker interface, and look for that interface from inside your listener.
Anyways, let me know if this is useful or not :).
Cheers!
Hey wtk13!
Sorry for my late reply - I saw how complex your message was and needed to find time to review it. But now I just saw that you seem to have found the answer yourself :). If you still have any questions about it, let me know.
Cheers!
why is it that even if I send a message already routed to an exchange but with a AmqpStamp/binding key linked with another exchange/queue the message will get routed to the exchange that handles that binding key? Even if this is desired outcome would not be considerate to maybe fire a warning saying that message-exchange-binding_key-route combination is overruled?
I want to know the details of what happens behind messenger/ampq-protocol.
Thanks!
Hey Francisc!
Great question :). If you're setting up binding & routing keys like we do in this chapter, you are definitely doing something more advanced and choosing to take more control over the process. For most cases, just "letting Messenger handle it" is the best way. But for some advanced use-cases - and for users that are very familiar with AMQP - this chapter shows how you could take more control.
But let me answer your questions specifically:
why is it that even if I send a message already routed to an exchange but with a AmqpStamp/binding key linked with another exchange/queue the message will get routed to the exchange that handles that binding key?
All of this is a specific to how AMQP works. Apologies if you already know some of this - I just want to give a complete answer.
So first, you are always sending a message to an exchange (not a queue). In this case, both async
and async_priority_high
are configured to send to the same exchange: messages
. AMQP is, sort of, "dumb". When the "messages" exchange receives a message, (because it is a direct exchange) it will look at the "routing key" (e.g. FOO_ROUTING_KEY) of that message and then send it to the 0-to-many queues that have said "Hey! If a FOO_ROUTING_KEY is set on a message, please send to me!". This is called a "binding".
In the Messenger config, each transport that we've configured has one item under "queues". For example:
async:
# ...
queues:
messages_normal:
binding_keys: [normal]
This causes a few things to happen in Messenger:
A) It creates a queue named "messages_normal"
B) It adds a "binding" to the "messages" exchange that says: "If the routing key is 'normal', please send to me".
C) When you "consume": this transport, it will "consume" messages from the "messages_normal" queue.
So ultimately, all of our config will cause 1 exchange to be created and 2 queues to be created, each with one binding key to the one exchange. Phew!
Even if this is desired outcome would not be considerate to maybe fire a warning saying that message-exchange-binding_key-route combination is overruled?
Yes, we could do do that. But at this point, we're doing SUCH custom things with AMQP, that we have to assume you know what you're doing. Heck, it's legal to publish a message with a routing key that corresponds to ZERO "bindings". You might think that this means that the message would get routed to NO queues. And you would be correct. Unless there is some other (3rd party) system that is also creating bindings. That's one of the truly powerful (but complex) things with AMQP: we cannot even assume that our Symfony app is the only thing creating exchanges, queues, publishing keys, etc. So even if the user does something that seems "crazy", they might have a perfectly good reason to do it.
This is all fascinating stuff to learn about it. But until/unless you have a reason to take advanced control over binding & routing keys like this, don't do it ;).
And if I've done a bad job explaining things, please let me know.
Cheers!
Thank you weaverryan for the answer, it help! Thank you also for the symfonycasts tutorials, they are really easy to follow and quite funny.
// composer.json
{
"require": {
"php": "^7.1.3",
"ext-ctype": "*",
"ext-iconv": "*",
"composer/package-versions-deprecated": "^1.11", // 1.11.99
"doctrine/annotations": "^1.0", // v1.8.0
"doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
"doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
"doctrine/orm": "^2.5.11", // v2.6.3
"intervention/image": "^2.4", // 2.4.2
"league/flysystem-bundle": "^1.0", // 1.1.0
"phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
"sensio/framework-extra-bundle": "^5.3", // v5.3.1
"symfony/console": "4.3.*", // v4.3.2
"symfony/dotenv": "4.3.*", // v4.3.2
"symfony/flex": "^1.9", // v1.18.7
"symfony/framework-bundle": "4.3.*", // v4.3.2
"symfony/messenger": "4.3.*", // v4.3.4
"symfony/property-access": "4.3.*", // v4.3.2
"symfony/property-info": "4.3.*", // v4.3.2
"symfony/serializer": "4.3.*", // v4.3.2
"symfony/validator": "4.3.*", // v4.3.2
"symfony/webpack-encore-bundle": "^1.5", // v1.6.2
"symfony/yaml": "4.3.*" // v4.3.2
},
"require-dev": {
"easycorp/easy-log-handler": "^1.0.7", // v1.0.7
"symfony/debug-bundle": "4.3.*", // v4.3.2
"symfony/maker-bundle": "^1.0", // v1.12.0
"symfony/monolog-bundle": "^3.0", // v3.4.0
"symfony/stopwatch": "4.3.*", // v4.3.2
"symfony/twig-bundle": "4.3.*", // v4.3.2
"symfony/var-dumper": "4.3.*", // v4.3.2
"symfony/web-profiler-bundle": "4.3.*" // v4.3.2
}
}
Hi, I'd like to configure a single exchange with multiple queues. I know the way to specify routing_key for the message by adding AmqpStamp directly when publishing a new message, but is there a way to configure it within yml configuration under framework -> messenger -> routing for each message?
Symfony 5.x