Flag of Ukraine
SymfonyCasts stands united with the people of Ukraine

AMQP with RabbitMQ

Keep on Learning!

If you liked what you've learned so far, dive in!
Subscribe to get access to this tutorial plus
video, code and script downloads.

Start your All-Access Pass
Buy just this tutorial for $12.00

With a Subscription, click any sentence in the script to jump to that part of the video!

Login Subscribe

Open up your .env file and check out the MESSENGER_TRANSPORT_DSN setting. We've been using the doctrine transport type. The doctrine://default string says that messages should be stored using Doctrine's default connection. In config/packages/messenger.yaml, we're referencing this environment variable for both the async and async_priority_high transports.

So... yep! We've been storing messages in a database table. It was quick to set up, easy to use - because we already understand databases - and robust enough for most use-cases.

Hello AMQP... RabbitMQ

But the industry standard "queueing system" or "message broker" is not a database table, it's something called AMQP, or "Advanced Message Queuing Protocol". AMQP is... not itself a technology... it's a "standard" for how a, so-called, "message broker system" should work. Then, different queuing systems can "implement" this standard. Honestly, usually when someone talks about AMQP, they're talking about one specific tool: RabbitMQ.

Here's the idea: in the same way that you launch a "database server" and make queries to it, you can launch a "Rabbit MQ instance" then send messages to it and receive messages from it. On a high level... it doesn't work much differently than our simple database table: you put messages in... then ask for them later.

So... what are the advantages of using RabbitMQ instead of Doctrine? Maybe... nothing! What I mean is, if you just use the standard Messenger features and never dig deeper, both will work just fine. But if you have a highly-scaled system or want to use some advanced, RabbitMQ-specific features, well... then... RabbitMQ is the answer!

What are those more advanced features? Well, stick with me over the next few chapters and you'll start to uncover them.

Launching an Instance via CloudAMQP.com

The easiest way to spin up a RabbitMQ instance is via cloudamqp.com: an awesome service for cloud-based RabbitMQ... with a free tier so we can play around! After logging in, create a new instance, give it a name, select any region... yep we do want the free tier and... "Create instance".

AMQP Transport Configuration

Cool! Click into the new instance to find... a beautiful AMQP connection string! Copy that, go find our .env file... and paste over doctrine://default. You can also put this into a .env.local file... which is what I would normally do so I can avoid committing these credentials.

Tip

The URL that you copied will now start with amqps:// (with an "s"!). That is "secure" AMQP. Change it to amqp:// to get things working. Support for SSL was introduced in Symfony 5.2, but requires extra configuration.

Anyways, the amqp:// part activates the AMQP transport in Symfony... and the rest of this contains a username, password and other connection details. As soon as we make this change, both our async and async_priority_high transports... are now using RabbitMQ! That was easy!

Oh, but notice that I am still using doctrine for my failure transport... and I'm going to keep that. The failure transport is a special type of transport... and it turns out that the doctrine transport type actually has the most features for reviewing failed messages. You can use AMQP for this, but I recommend Doctrine.

Before we try this, I want to make one other change. Open up src/Controller/ImagePostController.php and find the create() method. This is the controller that's executed whenever we upload a photo... and it's responsible for dispatching the AddPonkaToImage command. It also adds a 500 millisecond delay via this stamp. Comment that out for now... I'll show you why we're doing this a bit later.

... lines 1 - 23
class ImagePostController extends AbstractController
{
... lines 26 - 40
public function create(Request $request, ValidatorInterface $validator, PhotoFileManager $photoManager, EntityManagerInterface $entityManager, MessageBusInterface $messageBus)
{
... lines 43 - 63
$envelope = new Envelope($message, [
//new DelayStamp(500)
]);
... lines 67 - 69
}
... lines 71 - 98
}

The AMQP PHP Extension

Ok! Other than removing that delay, all we've done is swap our transport config from Doctrine to AMQP. Let's... see if things still work! First, make sure your worker is not running... to begin with. Then, find your browser, select a photo and... it worked! Well, hold on... because you may have gotten a big AJAX error. If you did, open the profiler for that request. I'm pretty sure I know what error you'll see:

Attempted to load class "AMQPConnection" from the global namespace. Did you forget a "use" statement?

Why... no we did not! Under the hood, Symfony's AMQP transport type uses a PHP extension called... well... amqp! It's an add-on to PHP - like xdebug or pdo_mysql - that you'll probably need to install.

The pain with PHP extensions is that installing them can vary based on your system. For Ubuntu, you may be able to run

sudo apt-get install php-amqp

Or you might use pecl, like I did with my Homebrew Mac install:

pecl install amqp

Once you do manage to get it installed, make sure to restart the Symfony web server so that it sees the change. If you're having issues getting this configured, let us know in the comments and we'll do our best to help!

When it is all configured, you should be able upload a photo with no errors. And... because this had no errors... it... probably just got sent to RabbitMQ? When I refresh, it says "Ponka is napping"... because nothing has consumed our message yet. Well, let's see what happens. Find your terminal and consume messages from both of our transports:

php bin/console messenger:consume -vv async_priority_high async

And... there it is! It received the message, handled it... and it's done! When we refresh the page... there's Ponka! It worked! Switching from Doctrine to RabbitMQ was as simple as changing our connection string.

Next, let's dig deeper into what just happened behind the scenes: what does it mean to "send" a message to RabbitMQ or "get" a message from it? Oh, and you're going to love the RabbitMQ debugging tools.

Leave a comment!

21
Login or Register to join the conversation
Default user avatar

While i am creting the instance for Rabbitmq on https://api.cloudamqp.com/
It was generating the connection URL like "amqps://siyjukin:8pBsS8JxGr..." host instead of "amqp://siyjukin:8pBsS8JxGr9..."

Because of this i was geting the below error

No transport supports the given Messenger DSN "amqps://vdfhbywr:qfdJGzTGWS...".

1 Reply

Hey Balu,

Unfortunately your comment got spam filter, I just approved it. So, after you changed it from "amqps" to "amqp" - everything works fine now?

Cheers!

Reply
Stefan T. Avatar

Yep,this is solution you should note it.

Reply

Hello! How to xdebug AMQP message handler? Breakpoint not working, however if i use doctrine message handler (i previously comment in messenger.yaml line with message) xdebug work perfect P.s. I use PHPStorm with Xdebug

Reply

Hey @Mepcuk ,

I think you can use dump() debug function inside the message handle, run the Messenger worker with symfony console messenger:consume, do something to trigger that message creation and wait when the Messenger handles it, then open the console where you you're running the messenger:consume to see the dump in the console :) Thanks to the VarDump component the dump will be well-formatted in the console output :)

Another way to debug it - temporarily add MESSENGER_TRANSPORT_DSN=sync:// to your .env.local (make sure no more MESSENGER_TRANSPORT_DSN declarations there that will overwrite the var). Thanks to this message the Messenger will work sync instead of async, i.e. it will run the message handler in the same request and so you will see the dump on the Symfony's WDT. Or use dd() to stop the execution and see the dump right on the page :)

I hope this helps!

Cheers!

Reply
Tomasz-I Avatar
Tomasz-I Avatar Tomasz-I | posted 7 months ago

Hello!
I was wondering if you have a good solution for production and development apps?
I have RabbitMQ on a server and supervisor process for handling Worker for the app from main domain (production). On the same server, on different domain a have a development version of the app. In order to user RabbitMQ there I should add another process for Supervisord with another worker so that it consumes messages from Development app. But actually these are the same messages, sent to the same RabbitMQ instance. So what should I do? Have different names for transport for both Production and Development apps? Any ideas?

Reply

Hey Tomasz,

I believe the easiest thing to do is to create another account for dev and use those credentials locally. Otherwise, you'll need to configure Messenger to send your messages to a different RabbitMQ exchange.

I hope it helps. Cheers!

Reply
Daniel W. Avatar
Daniel W. Avatar Daniel W. | posted 2 years ago

Hi, when my consumer consumes the message from rabbitMQ I get an exception:

[Symfony\Component\Messenger\Exception\MessageDecodingFailedException]
Could not decode message using PHP serialization: ��-.

Is it because the message was created and dispatched by hand in rabbitmq UI and not with symfony messenger?

Reply

Hey Daniel W.!

Woh! That's very interesting! When you dispatched the message by hand, what format did you use for the data? Was it JSON? Did you actually try to serialize a PHP object and "paste" it in?

If you used JSON, then the general answer of how to make Messenger understand custom created message bodies is a a bit later in the tutorial: https://symfonycasts.com/sc...

If you pasted a serialized object by hand, the answer is probably that you should first base64_encode the PHP class before sticking it into the body. PHP serialization creates binary characters... which don't play well in some transports. That's why we base64_encode when we encode a message - https://github.com/symfony/... - and base64_decode when we read it back in (well, technically, the decoding is optional): https://github.com/symfony/...

Let me know if that helps!

Cheers!

1 Reply
Daniel W. Avatar

Ahh I was too unpatient experimenting again. Yea I need a custom serializer for handling external messages.
I kinda expected it I just thought that if no serializer is defined the messenger would handle the content as pain text.
Thanks a lot.

Reply
Nourheine Avatar
Nourheine Avatar Nourheine | posted 2 years ago

hello , i used amqp cloud , create an instance and add its url in my messenger transport dsn, i got this error:
No transport supports the given DSN:***,
is there extra econfiguartion to add in my app ?
thanks in advance

Reply

Hey!

Are you in Symfony 5.1? The Ampq transport was move to its own library, read more info here https://symfony.com/doc/current/messenger.html#amqp-transport
if thats the case just install composer require symfony/amqp-messenger

Cheers

Reply
Default user avatar

While i am creting the instance for Rabbitmq on https://api.cloudamqp.com/
It was generating the connection URL like "amqps://siyjukin:8pBsS8JxGr..." host instead of "amqp://siyjukin:8pBsS8JxGr9..."

Because of this i was geting the below error

No transport supports the given Messenger DSN "amqps://vdfhbywr:qfdJGzTGWS...".

Reply
sadikoff Avatar sadikoff | SFCASTS | Balu | posted 2 years ago | edited

Hey Balu

Sorry for late reply, your comment was somehow missed in multiple comments =( So have you tried to remove this letter "s" from url? and use it?

Cheers!

Reply

My error was You cannot use the "Symfony\Component\Messenger\Transport\AmqpExt\Connection" as the "amqp" extension is not installed.

I'm using Docker with php:7.3.8-fpm-alpine3.10.

I added to my Dockerfile something equvalent to:

<br />RUN apk add rabbitmq-c rabbitmq-c-dev<br />RUN pecl install amqp<br />RUN docker-php-ext-enable amqp<br />RUN apk del --purge rabbitmq-c-dev<br />

Sould you have the same problem.

Cheers

Reply
Default user avatar
Default user avatar Henry Vallenilla | julien_bonnier | posted 2 years ago | edited

Hi julien_bonnier I am having the same issue, did you solve it? Thx

Reply

That was a while ago, but I'm pretty sure I was posting my solution and not just asking for help. Are you using Docker? If so, did you try to add the pecl package and enable the extension as mentioned in my previous post?

Reply

Hey julien_bonnier!

Hmm, I'm not sure! Here's what I would check. Open any page in your site, wait for the web debug toolbar to open, hover over the Symfony version on the bottom right, and then click View phpinfo(). This will show you the phpinfo() details. Is AMQP here? My guess is that it is not here... and so there's somethiung wrong with your Docker setup. It could be that your web browser is using a different php container (sometimes people have 2 containers for php - one for the cli and one for php-fpm) or something else.

Cheers!

Reply
Default user avatar
Default user avatar Henry Vallenilla | weaverryan | posted 2 years ago

Hi, I am having the same issue, did you solve it? Thanx

Reply

Hey Henry Vallenilla!

What operating system are you on and how do you have PHP installed? Are you using Docker? Installing PHP extensions is different for every setup, unfortunately :p.

Cheers!

Reply
Cat in space

"Houston: no signs of life"
Start the conversation!

This tutorial is built with Symfony 4.3, but will work well on Symfony 4.4 or 5.

What PHP libraries does this tutorial use?

// composer.json
{
    "require": {
        "php": "^7.1.3",
        "ext-ctype": "*",
        "ext-iconv": "*",
        "composer/package-versions-deprecated": "^1.11", // 1.11.99
        "doctrine/annotations": "^1.0", // v1.8.0
        "doctrine/doctrine-bundle": "^1.6.10", // 1.11.2
        "doctrine/doctrine-migrations-bundle": "^1.3|^2.0", // v2.0.0
        "doctrine/orm": "^2.5.11", // v2.6.3
        "intervention/image": "^2.4", // 2.4.2
        "league/flysystem-bundle": "^1.0", // 1.1.0
        "phpdocumentor/reflection-docblock": "^3.0|^4.0", // 4.3.1
        "sensio/framework-extra-bundle": "^5.3", // v5.3.1
        "symfony/console": "4.3.*", // v4.3.2
        "symfony/dotenv": "4.3.*", // v4.3.2
        "symfony/flex": "^1.9", // v1.18.7
        "symfony/framework-bundle": "4.3.*", // v4.3.2
        "symfony/messenger": "4.3.*", // v4.3.4
        "symfony/property-access": "4.3.*", // v4.3.2
        "symfony/property-info": "4.3.*", // v4.3.2
        "symfony/serializer": "4.3.*", // v4.3.2
        "symfony/validator": "4.3.*", // v4.3.2
        "symfony/webpack-encore-bundle": "^1.5", // v1.6.2
        "symfony/yaml": "4.3.*" // v4.3.2
    },
    "require-dev": {
        "easycorp/easy-log-handler": "^1.0.7", // v1.0.7
        "symfony/debug-bundle": "4.3.*", // v4.3.2
        "symfony/maker-bundle": "^1.0", // v1.12.0
        "symfony/monolog-bundle": "^3.0", // v3.4.0
        "symfony/stopwatch": "4.3.*", // v4.3.2
        "symfony/twig-bundle": "4.3.*", // v4.3.2
        "symfony/var-dumper": "4.3.*", // v4.3.2
        "symfony/web-profiler-bundle": "4.3.*" // v4.3.2
    }
}
userVoice