If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.
With a Subscription, click any sentence in the script to jump to that part of the video!
Login SubscribeNuestro middleware es llamado en dos situaciones diferentes. En primer lugar, se llama cuando se envía inicialmente el mensaje. Por ejemplo, enImagePostController
, en el momento en que llamamos a $messageBus->dispatch()
, se llama a todo el middleware -independientemente de si el mensaje se gestionará de forma asíncrona o no. Y en segundo lugar, cuando el trabajador - bin/console messenger:consume
- recibe un mensaje del transporte, lo devuelve al bus y se llama de nuevo al middleware.
Esto es lo más complicado del middleware: intentar averiguar en qué situación se encuentra actualmente. Afortunadamente, Messenger añade "sellos" al Envelope
a lo largo del camino, y éstos nos dicen exactamente lo que está pasando.
Por ejemplo, cuando se recibe un mensaje de un transporte, Messenger añade unReceivedStamp
. Así, si $envelope->last(ReceivedStamp::class)
, entonces este mensaje está siendo procesado por el trabajador y acaba de ser recibido desde un transporte.
... lines 1 - 8 | |
use Symfony\Component\Messenger\Stamp\ReceivedStamp; | |
class AuditMiddleware implements MiddlewareInterface | |
{ | |
... lines 13 - 19 | |
public function handle(Envelope $envelope, StackInterface $stack): Envelope | |
{ | |
... lines 22 - 32 | |
if ($envelope->last(ReceivedStamp::class)) { | |
... line 34 | |
} else { | |
... line 36 | |
} | |
... lines 38 - 39 | |
} | |
} |
Vamos a registrarlo: $this->logger->info()
con una sintaxis especial:
[{id}] Recibido y gestionado {clase}
A continuación, pasa $context
como segundo argumento. La matriz $context
es genial por dos razones. En primer lugar, cada gestor de registro lo recibe y puede hacer lo que quiera con él; normalmente el $context
se imprime al final del mensaje de registro. Y en segundo lugar, si utilizas estos pequeños comodines de {}
, ¡los valores del contexto se rellenarán automáticamente!
... lines 1 - 8 | |
use Symfony\Component\Messenger\Stamp\ReceivedStamp; | |
class AuditMiddleware implements MiddlewareInterface | |
{ | |
... lines 13 - 19 | |
public function handle(Envelope $envelope, StackInterface $stack): Envelope | |
{ | |
... lines 22 - 32 | |
if ($envelope->last(ReceivedStamp::class)) { | |
$this->logger->info('[{id}] Received & handling {class}', $context); | |
} else { | |
... line 36 | |
} | |
... lines 38 - 39 | |
} | |
} |
Si el mensaje no se acaba de recibir, di $this->logger->info()
y empieza de la misma manera:
[{id}] Manejo o envío de {clase}
... lines 1 - 8 | |
use Symfony\Component\Messenger\Stamp\ReceivedStamp; | |
class AuditMiddleware implements MiddlewareInterface | |
{ | |
... lines 13 - 19 | |
public function handle(Envelope $envelope, StackInterface $stack): Envelope | |
{ | |
... lines 22 - 32 | |
if ($envelope->last(ReceivedStamp::class)) { | |
$this->logger->info('[{id}] Received & handling {class}', $context); | |
} else { | |
$this->logger->info('[{id}] Handling or sending {class}', $context); | |
} | |
... lines 38 - 39 | |
} | |
} |
En este punto, sabemos que el mensaje se acaba de enviar... pero no sabemos si se va a manejar en este momento o se va a enviar a un transporte. Lo mejoraremos en unos minutos.
Pero antes, ¡probemos! Pon en marcha el trabajador y dile que lea del transporte async
:
php bin/console messenger:consume -vv async
Ah, ¡creo que tenemos algunos mensajes de antes todavía en la cola! Cuando termine, despejemos la pantalla. Abramos también otra pestaña y creemos el nuevo archivo de registro - messenger.log
- si no está ya ahí:
touch var/log/messenger.log
Luego, colócalo en la cola para que podamos ver los mensajes:
tail -f var/log/messenger.log
¡Oh, qué bien! Esto ya tiene unas cuantas líneas de los antiguos mensajes que acaba de procesar. Borremos eso para tener pantallas frescas que mirar.
¡Hora de probar! Muévete y sube una nueva foto. Vuelve a tu terminal y... ¡sí! Los dos mensajes de registro ya están ahí: "Manipulando o enviando" y luego "Recibido y manipulando" cuando se recibió el mensaje del transporte... que fue casi instantáneo. Sabemos que estas entradas de registro son para el mismo mensaje gracias al identificador único que aparece al principio.
Pero... podemos hacer algo mejor que decir "manipulación o envío". ¿Cómo? Esta línea$stack->next()->handle()
se encarga de llamar al siguiente middleware... que a su vez llamará al siguiente middleware y así sucesivamente. Como nuestro código de registro está por encima de esto, significa que nuestro código está siendo potencialmente llamado antes de que otros middleware hagan su trabajo. De hecho, nuestro código se está ejecutando antes que el middleware principal que es responsable de manejar o enviar el mensaje.
Entonces... ¿cómo podemos determinar si el mensaje será enviado o manejado inmediatamente... antes de que el mensaje sea realmente enviado o manejado inmediatamente? No podemos!
Compruébalo: elimina el return
y en su lugar di$envelope = $stack->next()->handle()
. Luego, mueve esa línea por encima de nuestro código y, al final, return $envelope
.
... lines 1 - 11 | |
class AuditMiddleware implements MiddlewareInterface | |
{ | |
... lines 14 - 20 | |
public function handle(Envelope $envelope, StackInterface $stack): Envelope | |
{ | |
... lines 23 - 35 | |
$envelope = $stack->next()->handle($envelope, $stack); | |
if ($envelope->last(ReceivedStamp::class)) { | |
$this->logger->info('[{id}] Received & handling {class}', $context); | |
} else { | |
$this->logger->info('[{id}] Handling or sending {class}', $context); | |
} | |
return $envelope; | |
} | |
} |
Si no hiciéramos nada más... el resultado sería prácticamente el mismo: registraríamos exactamente los mismos mensajes... pero técnicamente, las entradas de registro se producirían después de que el mensaje se enviara o gestionara en lugar de antes.
Pero! fíjate en que cuando llamamos a $stack->next()->handle()
para ejecutar el resto del middleware, obtenemos de vuelta un $envelope
... ¡que puede contener nuevos sellos! De hecho, si el mensaje fue enviado a un transporte en lugar de ser manejado inmediatamente, se marcará con un SentStamp
.
Si añadimos elseif
$envelope->last(SentStamp::class)
, sabremos que ese mensaje fue enviado, no gestionado. Utiliza $this->logger->info()
con nuestro truco {id}
y sent {class}
.
... lines 1 - 9 | |
use Symfony\Component\Messenger\Stamp\SentStamp; | |
class AuditMiddleware implements MiddlewareInterface | |
{ | |
... lines 14 - 20 | |
public function handle(Envelope $envelope, StackInterface $stack): Envelope | |
{ | |
... lines 23 - 37 | |
if ($envelope->last(ReceivedStamp::class)) { | |
... line 39 | |
} elseif ($envelope->last(SentStamp::class)) { | |
$this->logger->info('[{id}] Sent {class}', $context); | |
} else { | |
... line 43 | |
} | |
... lines 45 - 46 | |
} | |
} |
A continuación, ahora sabemos que definitivamente estamos "Manejando la sincronización". El mensaje superior "Recibido y manipulado" sigue siendo cierto, pero lo cambiaré para que sólo diga "Recibido": un mensaje siempre se manipula cuando se recibe, así que eso era redundante.
... lines 1 - 9 | |
use Symfony\Component\Messenger\Stamp\SentStamp; | |
class AuditMiddleware implements MiddlewareInterface | |
{ | |
... lines 14 - 20 | |
public function handle(Envelope $envelope, StackInterface $stack): Envelope | |
{ | |
... lines 23 - 37 | |
if ($envelope->last(ReceivedStamp::class)) { | |
$this->logger->info('[{id}] Received {class}', $context); | |
} elseif ($envelope->last(SentStamp::class)) { | |
$this->logger->info('[{id}] Sent {class}', $context); | |
} else { | |
$this->logger->info('[{id}] Handling sync {class}', $context); | |
} | |
... lines 45 - 46 | |
} | |
} |
De acuerdo Vamos a borrar nuestra pantalla de registro y a reiniciar el trabajador:
php bin/console messenger:consume -vv async
Sube una foto... luego pasa... y ve al archivo de registro. ¡Sí! ¡Enviada, y luego Recibida! Si hubiéramos subido 5 fotos, podríamos utilizar el id único para identificar cada mensaje individualmente.
Pulsa intro varias veces: Quiero ver un ejemplo aún más genial. ¡Elimina una foto y vuelve a pasar por encima! ¡Recuerda que esto envía dos mensajes! La parte del id. único hace aún más evidente lo que está ocurriendo: DeletePhotoFile
se envió al transporte, luego DeleteImagePost
se gestionó de forma sincrónica... y después se recibió y procesóDeletePhotoFile
.
En realidad, lo que ocurrió fue lo siguiente: DeleteImagePost
se manejó de forma sincrónica e, internamente, despachó DeletePhotoFile
que se envió al transporte. Los dos primeros mensajes están un poco desordenados porque nuestro código de registro se ejecuta siempre después de que se ejecute el resto de la cadena, así que después de que se manejaraDeleteImagePost
. Podríamos mejorar esto moviendo la lógica de registro deHandling Sync
por encima del código que llama al resto del middleware. Sí, este material es súper potente... pero puede ser un poco complejo de navegar. Este asunto del registro es probablemente lo más confuso que puede haber.
A continuación: el trabajador gestiona cada mensaje en el orden en que lo ha recibido. Pero... eso no es lo ideal: es mucho más importante que todos los mensajes de AddPonkaToImage
se gestionen antes que cualquier mensaje de DeletePhotoFile
. Hagamos eso con transportes prioritarios.
Hey ahmedbhs
If you have a message hanging in the failed queue, then, you can use Messenger's commands to investigate more about it. There are these 3 commands you can play with and decide what to do with such a message
messenger:failed:remove
messenger:failed:retry
messenger:failed:show
Cheers!
Hi
if i have routing like this:
`
routing:
'App\Events\SomeEvent': [one, two]
`
Is it possible somehow, (depend on some logic in code) to send message only to "<b>two</b>" transport?
I'm using AWS SQS.
Hey kwolniak
There is a way to do that, a bit confusing but possible. Here you can learn how https://symfonycasts.com/sc...
Cheers!
Thanks, I know this "from_transport" thing, but it's not what I need.
When I declare routing like above, SomeEvent message is sent automaticaly to one and two transport. And the same message appear in my case on one and two queue and in most cases it's OK.
But sometimes I want change it on the runtime in the code (for example somehow in middleware or before dispatch) and sent it only to one transport so message will appear only in one queue.
I have found that it should be possible with ampq and routing key but is it possible when I'm using enqueue/sqs?
Hey kwolniak!
This is currently a shortcoming with Messenger - and probably the one that bothers me the most! Routing is super static - there is no way to hook in at runtime to modify this. I think there should be, but it's not there yet. So, you have 2 options:
1) Do fanciness with routing key and amqp (or something similar in sqs... if there is such a mechanism). This is basically you going outside of the system to get what you want ;).
2) Create 2 different classes: one that routes to both transports and another that routes to only one.
SUPER not ideal, but that's the state of things. Hmm, or you could do something totally nuts and "decorate" (with Symfony service decoration) the messenger.senders_locator
service (which is this class https://github.com/symfony/symfony/blob/master/src/Symfony/Component/Messenger/Transport/Sender/SendersLocator.php) and do some custom logic there. That is the "I'm taking full control" hammer solution to the problem - but because there isn't such a nice solution yet, I won't be disappointed if you do it ;).
Cheers!
Thank you very much for the comprehensive answer!
For now I decided on two separate events and two separate transports, and before dispatch I do some logic.
Thanks anyway :)
// composer.json
{
"require": {
"php": "^7.1.3",
"ext-ctype": "*",
"ext-iconv": "*",
"composer/package-versions-deprecated": "^1.11", // 1.11.99
"doctrine/annotations": "^1.0", // v1.8.0
"doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
"doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
"doctrine/orm": "^2.5.11", // v2.6.3
"intervention/image": "^2.4", // 2.4.2
"league/flysystem-bundle": "^1.0", // 1.1.0
"phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
"sensio/framework-extra-bundle": "^5.3", // v5.3.1
"symfony/console": "4.3.*", // v4.3.2
"symfony/dotenv": "4.3.*", // v4.3.2
"symfony/flex": "^1.9", // v1.18.7
"symfony/framework-bundle": "4.3.*", // v4.3.2
"symfony/messenger": "4.3.*", // v4.3.4
"symfony/property-access": "4.3.*", // v4.3.2
"symfony/property-info": "4.3.*", // v4.3.2
"symfony/serializer": "4.3.*", // v4.3.2
"symfony/validator": "4.3.*", // v4.3.2
"symfony/webpack-encore-bundle": "^1.5", // v1.6.2
"symfony/yaml": "4.3.*" // v4.3.2
},
"require-dev": {
"easycorp/easy-log-handler": "^1.0.7", // v1.0.7
"symfony/debug-bundle": "4.3.*", // v4.3.2
"symfony/maker-bundle": "^1.0", // v1.12.0
"symfony/monolog-bundle": "^3.0", // v3.4.0
"symfony/stopwatch": "4.3.*", // v4.3.2
"symfony/twig-bundle": "4.3.*", // v4.3.2
"symfony/var-dumper": "4.3.*", // v4.3.2
"symfony/web-profiler-bundle": "4.3.*" // v4.3.2
}
}
Does there a way to use that unique id to remove a failed message from failure transport for example ?