Symfony Messenger with SQS and SNS AWS-Services

Let’s check out how to connect Symfony to the Amazon SQS and SNS services by using the Symfony Messenger component.

Github repo for this article:

What is the Symfony Messenger?

It’s a Message-Bus that can be used for the following design patterns:

  • Command-Bus
  • Query-Bus
  • Event-Bus

We are going to use the Command-Bus pattern, which separates the description of what needs to be done from the place where it is actually done. It’s like an order taken in a restaurant that then gets prepared in the kitchen. In our case we will create a TestMessage object that gets sent to a queue and consumed by a handler.
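To make the restaurant analogy concrete, here is a minimal sketch of the command-bus idea in plain PHP, without Symfony. The names OrderMeal, KitchenHandler and SimpleCommandBus are made up for this illustration; the real Messenger bus adds middleware, transports and routing on top of this basic idea.

```php
<?php

// The command: a plain object that only describes WHAT should happen.
final class OrderMeal
{
    public string $dish;

    public function __construct(string $dish)
    {
        $this->dish = $dish;
    }
}

// The handler: knows HOW (and where) the work is done.
final class KitchenHandler
{
    public function __invoke(OrderMeal $order): string
    {
        return 'Prepared: ' . $order->dish;
    }
}

// A tiny bus (hypothetical): maps a message class to the callable that handles it.
final class SimpleCommandBus
{
    /** @var array<string, callable> */
    private array $handlers = [];

    public function register(string $messageClass, callable $handler): void
    {
        $this->handlers[$messageClass] = $handler;
    }

    public function dispatch(object $message)
    {
        $class = get_class($message);
        if (!isset($this->handlers[$class])) {
            throw new RuntimeException('No handler registered for ' . $class);
        }

        return ($this->handlers[$class])($message);
    }
}

$bus = new SimpleCommandBus();
$bus->register(OrderMeal::class, new KitchenHandler());

echo $bus->dispatch(new OrderMeal('Pasta')), PHP_EOL;
```

The sender only knows the bus and the message class; which handler runs, and where, is decided elsewhere. That is the property we rely on when the handler later runs in a separate consumer process.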

Set up the Symfony project:

Create a Symfony CLI skeleton:

docker run --rm -it -v $PWD:/app composer create-project symfony/skeleton symfony_messenger

Install Symfony Messenger component:

docker run --rm -it -v $PWD:/app composer require messenger

To enable AWS SQS support, install these two packages:

docker run --rm -it -v $PWD:/app composer require sroze/messenger-enqueue-transport enqueue/sqs

Create an SQS queue in your AWS account:

What is SQS?

  • Fully managed queuing service
  • Asynchronous communication that decouples processes: a sender (producer) and a receiver (consumer) exchange messages / events
  • Not reactive: you have to pull (poll) the messages, so there is no real-time delivery
  • Messages get deleted once they have been acknowledged
  • Temporary repository for messages that await to be processed

Limitations:

  • Message size can be between 1 byte and 256 KB
  • Default message retention is 4 days (minimum 60 seconds, maximum 14 days); after that, the messages get dropped
  • Standard queue: nearly unlimited number of transactions per second; guarantees that a message will be delivered at least once, but more than one copy of a message could be delivered, and messages may arrive out of order
  • FIFO queue: guaranteed order, default limit of 300 transactions per second without batching; otherwise it behaves much like the standard queue
  • Visibility timeout: 30 seconds by default, up to 12 hours. If a message is not processed within the visibility timeout, it becomes visible again and can be read by another consumer, so a message could be delivered twice
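Because a standard queue may deliver the same message more than once, handlers are usually written so that a repeated delivery does no harm. The following is a minimal sketch of that idea in plain PHP. It assumes that every message carries a unique ID and that remembering handled IDs in an array is good enough for a demo; a real application would keep them in a database or cache. The class name IdempotentProcessor is hypothetical.

```php
<?php

final class IdempotentProcessor
{
    /** @var array<string, true> IDs that were already handled (in-memory for the demo) */
    private array $seen = [];

    /** @var string[] payloads that were actually processed */
    public array $processed = [];

    /** Returns true if the message was processed, false if it was a duplicate. */
    public function handle(string $messageId, string $payload): bool
    {
        if (isset($this->seen[$messageId])) {
            return false; // duplicate delivery: skip the side effect
        }

        $this->processed[] = $payload; // the "real work" would happen here
        $this->seen[$messageId] = true;

        return true;
    }
}

$processor = new IdempotentProcessor();
$processor->handle('msg-1', 'send welcome mail');
$processor->handle('msg-1', 'send welcome mail'); // redelivered copy is ignored
$processor->handle('msg-2', 'send invoice');

echo count($processor->processed), PHP_EOL;
```

Three deliveries arrive, but only two distinct messages are processed.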

Create the Queue:

Sign in to your AWS management console and filter for SQS:

Create a new Queue with the standard setup:

Click the button that says “Quick-Create Queue” and you should see something like this:

Below in the details Tab you will find the corresponding URL:

https://sqs.YOUR_REGION.amazonaws.com/YOUR_USER_ID/QUEUE_NAME

Now we need a user with credentials that we can use in our Symfony application to send messages to and receive messages from the queue. Let’s go to the Identity and Access Management (IAM) interface:

Click on Users in the left navigation panel and add a new user. Then set a user name and enable programmatic access:

To keep things simple we attach the AmazonSQSFullAccess policy:

Configure the Symfony project:

Create a .env.local file with the AWS params of the created user:

AWS_KEY=xxx
AWS_SECRET=xxx
AWS_REGION=eu-central-1
AWS_TOKEN=xxx
AWS_TOPIC_ARN=arn:aws:sns:eu-central-1:YOUR_ID:TOPIC_NAME

The token param might not be necessary in your case. You can always debug the variables that are visible to your Symfony project with this command:

php bin/console debug:container --env-vars

The last configuration steps are to adjust config/packages/enqueue.yaml:

enqueue:
    sqs:
        transport: 'sqs:?key=%env(resolve:AWS_KEY)%&secret=%env(resolve:AWS_SECRET)%&token=%env(resolve:AWS_TOKEN)%&region=%env(resolve:AWS_REGION)%'
        client: ~
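The transport value is a DSN whose settings travel as a query string. AWS secrets can contain characters such as "+" or "/", so it may help to see how such a string is composed safely. This is only a sketch, assuming the DSN is decoded like a normal URL query string; the helper buildSqsDsn is hypothetical and not part of Enqueue.

```php
<?php

/**
 * Hypothetical helper: builds an "sqs:" DSN like the one in enqueue.yaml.
 * Empty values (e.g. a missing session token) are left out.
 */
function buildSqsDsn(string $key, string $secret, string $region, ?string $token = null): string
{
    $params = array_filter([
        'key' => $key,
        'secret' => $secret,
        'token' => $token,
        'region' => $region,
    ], static function ($value): bool {
        return $value !== null && $value !== '';
    });

    // RFC 3986 encoding turns "+" into %2B and "/" into %2F.
    return 'sqs:?' . http_build_query($params, '', '&', PHP_QUERY_RFC3986);
}

echo buildSqsDsn('AKIAEXAMPLE', 'abc+def/ghi', 'eu-central-1'), PHP_EOL;
```

If your credentials contain such characters and the connection fails, comparing the configured value with an encoded one like this is a cheap first check.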

And the config/packages/messenger.yaml:

framework:
    messenger:
        transports:
            sqs: "enqueue://sqs?topic[name]=test&queue[name]=test&receiveTimeout=3"
        routing:
            'App\Message\TestMessage': sqs
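The Messenger DSN packs several settings into one string. As far as I understand the enqueue bridge, the scheme selects the transport factory, the host names the Enqueue transport from enqueue.yaml, and the query string carries the topic and queue options. The sketch below takes the string apart with PHP’s own URL functions, purely for illustration; describeTransportDsn is a hypothetical helper, not the parser the bridge uses.

```php
<?php

/**
 * Splits a transport DSN into its parts, purely for illustration.
 *
 * @return array{scheme: string, host: string, options: array}
 */
function describeTransportDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new InvalidArgumentException('Not a valid DSN: ' . $dsn);
    }

    $options = [];
    parse_str($parts['query'] ?? '', $options); // "topic[name]=test" becomes a nested array

    return [
        'scheme' => $parts['scheme'],
        'host' => $parts['host'],
        'options' => $options,
    ];
}

$info = describeTransportDsn('enqueue://sqs?topic[name]=test&queue[name]=test&receiveTimeout=3');
print_r($info);
```

Note that queue[name] has to match the name of the queue you created in the AWS console.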

Create a Message Object and a Handler:

Message (I hold the message data):

<?php

namespace App\Message;

class TestMessage
{
    private $payload;

    /**
     * @return mixed
     */
    public function getPayload()
    {
        return $this->payload;
    }

    /**
     * @param mixed $payload
     */
    public function setPayload($payload): void
    {
        $this->payload = $payload;
    }
}
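Before a message object can travel through a queue, it has to be turned into a string and back again. The sketch below shows that round trip with PHP’s native serialize() and unserialize(). It is an assumption that this resembles what happens in your setup, since the actual wire format depends on the serializer Messenger is configured with. The class is repeated without its namespace so the example runs on its own.

```php
<?php

// Same shape as the TestMessage class above, repeated so the sketch is standalone.
class TestMessage
{
    private $payload;

    public function getPayload()
    {
        return $this->payload;
    }

    public function setPayload($payload): void
    {
        $this->payload = $payload;
    }
}

$message = new TestMessage();
$message->setPayload('here is my content');

// Producer side: object -> string that can be stored in the queue.
$body = serialize($message);

// Consumer side: string -> object, restricted to the class we expect.
$received = unserialize($body, ['allowed_classes' => [TestMessage::class]]);

echo $received->getPayload(), PHP_EOL;
```

This is also why a message class should only hold simple, serializable data and no services or open resources.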

Handler (I consume messages from the queue and dump the payload):

<?php

namespace App\MessageHandler;

use App\Message\TestMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

/**
 * Class TestMessageHandler
 *
 * @package App\MessageHandler
 */
class TestMessageHandler implements MessageHandlerInterface
{
    public function __invoke(TestMessage $message)
    {
        dump($message);
    }
}

Let’s see if everything fits together:

php bin/console debug:messenger
Output of: php bin/console debug:messenger

If App\Message\TestMessage is listed together with its TestMessageHandler, we configured everything properly and it works!

Build a command to send data to the SQS Queue

Install the maker bundle to build a command skeleton class quickly:

docker run --rm -it -v $PWD:/app composer require maker
Output of: php bin/console make:command MessageDispatcher

Command Code to dispatch a message to the Queue:

<?php

namespace App\Command;

use App\Message\TestMessage;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\MessageBusInterface;

/**
 * Class MessageDispatcherCommand
 *
 * @package App\Command
 */
class MessageDispatcherCommand extends Command
{
    protected static $defaultName = 'app:message:dispatcher';

    /**
     * @var MessageBusInterface
     */
    private $messageBus;

    /**
     * MessageDispatcherCommand constructor.
     */
    public function __construct(MessageBusInterface $messageBus)
    {
        $this->messageBus = $messageBus;
        $this->setDescription('Dispatch a message to the configured queuing system');
        parent::__construct(self::$defaultName);
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $message = new TestMessage();
        $message->setPayload(date('Y-m-d H:i:s') . ' here is my content');
        $this->messageBus->dispatch($message);

        $io->success('Message dispatched successfully!');

        return 0;
    }
}

The key step is that the MessageBus gets injected automatically (autowiring). All that is left is to dispatch our self-made TestMessage, and that’s it! Really easy!

So let’s run the command:

Output of: php bin/console app:message:dispatcher

Debug Messages in SQS

In your AWS management console you can inspect the received messages quite easily:

Now you can click on any message available in the Queue and see what has been added:

Run the simple messenger consumer to see what’s in the queue:

Output of: php bin/console messenger:consume

Our Handler gets invoked and dumps the given payload. The messenger:consume command also acknowledges the consumed messages, so they get removed from the queue.
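The relationship between handling and acknowledging can be sketched with an in-memory stand-in for the queue. This is not the SQS API and not Messenger’s worker; FakeQueue and consumeOnce are hypothetical names. The sketch only illustrates the assumed rule that a message is removed after its handler succeeded and stays available otherwise.

```php
<?php

// In-memory stand-in for a queue; the real SQS API is not used here.
final class FakeQueue
{
    /** @var array<string, string> message ID => body */
    private array $messages = [];

    public function send(string $id, string $body): void
    {
        $this->messages[$id] = $body;
    }

    /** @return array<string, string> */
    public function receive(): array
    {
        return $this->messages; // receiving alone does NOT remove anything
    }

    public function acknowledge(string $id): void
    {
        unset($this->messages[$id]); // comparable to deleting the message in SQS
    }

    public function count(): int
    {
        return count($this->messages);
    }
}

function consumeOnce(FakeQueue $queue, callable $handler): void
{
    foreach ($queue->receive() as $id => $body) {
        try {
            $handler($body);
            $queue->acknowledge($id); // only acknowledged after the handler succeeded
        } catch (Throwable $e) {
            // Left in the queue, so it can be delivered again later.
        }
    }
}

$queue = new FakeQueue();
$queue->send('msg-1', 'ok');
$queue->send('msg-2', 'broken');

consumeOnce($queue, static function (string $body): void {
    if ($body === 'broken') {
        throw new RuntimeException('Handler failed');
    }
});

echo $queue->count(), PHP_EOL;
```

After one pass, the successfully handled message is gone and the failing one is still waiting in the queue.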

Consume Queue Messages and send notifications with SNS

A pretty common use case is an application that needs to scale to send mass E-Mails. We can handle it by sending all the E-Mail contents to the queue and consuming them whenever there is capacity.

What is SNS?

  • Pub/Sub messaging service: publishers push events to an SNS topic, and the subscribers of that topic get the events pushed to them
  • An SNS topic is a logical access point and communication channel
  • Topics can be encrypted with KMS
  • A topic automatically formats messages according to the subscription protocol (E-Mail, SMS, HTTP/S)
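The publish/subscribe idea from the list above can be sketched in a few lines of plain PHP. Nothing here talks to AWS; the Topic class is a hypothetical in-memory model in which two closures stand in for an E-Mail and an SMS subscription.

```php
<?php

// In-memory illustration of publish/subscribe; no AWS calls involved.
final class Topic
{
    /** @var callable[] */
    private array $subscribers = [];

    public function subscribe(callable $subscriber): void
    {
        $this->subscribers[] = $subscriber;
    }

    /** Pushes the message to every subscriber and returns how many were notified. */
    public function publish(string $message): int
    {
        foreach ($this->subscribers as $subscriber) {
            $subscriber($message);
        }

        return count($this->subscribers);
    }
}

$inbox = [];
$smsLog = [];

$topic = new Topic();
$topic->subscribe(static function (string $message) use (&$inbox): void {
    $inbox[] = 'E-Mail: ' . $message;
});
$topic->subscribe(static function (string $message) use (&$smsLog): void {
    $smsLog[] = 'SMS: ' . $message;
});

$notified = $topic->publish('Order shipped');

print_r($inbox);
print_r($smsLog);
```

In contrast to the queue, the publisher does not wait for anyone to pull: every subscriber receives its own copy as soon as the message is published.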

Create SNS topic & subscription

First of all we need to set up an SNS topic in our AWS management console:

Click on “Create topic”, enter a name, and you get the necessary ARN that you put into your .env.local file as AWS_TOPIC_ARN=xxxxx.

The last step is to set up a subscription for the topic. To keep things simple, just add an E-Mail subscription with your own private E-Mail address.

Select your topic, use the Email protocol and paste your private address.

After this step you receive a confirmation E-Mail, which you need to confirm in order to activate the subscription. Once this is done, the AWS side is ready and we can start with our Symfony setup.

SNS Symfony setup

The nice thing is that the AWS SDK for PHP is already installed as a dependency of the enqueue/sqs package, so we just need to add an SNS service to the DI container and we are ready to go (config/services.yaml):

parameters:
    sns_config:
        region: '%env(AWS_REGION)%'
        credentials:
            key: '%env(AWS_KEY)%'
            secret: '%env(AWS_SECRET)%'
            token: '%env(AWS_TOKEN)%'
        version: '2010-03-31'

services:
    _defaults:
        bind:
            string $topicArn: '%env(AWS_TOPIC_ARN)%'

    Aws\Sns\SnsClient:
        arguments: ['%sns_config%']

We configure a param for the SnsClient Service and we bind the topic environment variable to be used via autowiring in any Service that needs it.

To publish SNS messages we just need to add another MessageHandler and everything is done:

<?php

namespace App\MessageHandler;

use App\Message\TestMessage;
use Aws\Sns\SnsClient;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SnsTopicMessageHandler implements MessageHandlerInterface
{
    /**
     * @var SnsClient
     */
    private $snsClient;

    /**
     * @var string
     */
    private $topicArn;

    /**
     * SnsTopicMessageHandler constructor.
     *
     * @param SnsClient $snsClient
     * @param string $topicArn
     */
    public function __construct(SnsClient $snsClient, string $topicArn)
    {
        $this->snsClient = $snsClient;
        $this->topicArn = $topicArn;
    }

    public function __invoke(TestMessage $message)
    {
        $this->snsClient->publish([
            'TopicArn' => $this->topicArn,
            'Message' => $message->getPayload(),
            'MessageStructure' => 'string'
        ]);
    }
}

The SnsClient and the topicArn get injected automatically, so we just need to configure the params for the publish method and we’re good to go.
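Since the payload of TestMessage is typed as mixed while SNS expects the message as a string, it can be worth preparing the publish parameters in one small place. The following is a sketch under stated assumptions: buildPublishArgs is a hypothetical helper, JSON is just one possible way to encode non-string payloads, and the 100-character cut reflects my understanding of the SNS subject limit. The optional Subject parameter becomes the subject line for E-Mail subscriptions.

```php
<?php

/**
 * Hypothetical helper that prepares the argument array for SnsClient::publish().
 * Non-string payloads are JSON-encoded because SNS expects the message as a string.
 */
function buildPublishArgs(string $topicArn, $payload, ?string $subject = null): array
{
    $args = [
        'TopicArn' => $topicArn,
        'Message' => is_string($payload) ? $payload : json_encode($payload),
    ];

    if ($subject !== null && $subject !== '') {
        // Assumption: SNS limits subjects to 100 characters, so longer ones are cut.
        $args['Subject'] = substr($subject, 0, 100);
    }

    return $args;
}

$args = buildPublishArgs(
    'arn:aws:sns:eu-central-1:YOUR_ID:TOPIC_NAME',
    ['order' => 42, 'status' => 'shipped'],
    'Order update'
);

print_r($args);
```

Inside the handler, the resulting array could then be passed to $this->snsClient->publish().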

First we run the message dispatcher again:

php bin/console app:message:dispatcher

Then let’s consume the message and publish it to your SNS topic, which sends the E-Mail:

php bin/console messenger:consume

Summary

We have created a setup that can send messages to an SQS queue and have also shown how to forward them to SNS. There are endless possibilities for architectural concepts to design highly scalable systems. The Symfony Messenger component is really flexible and easy to use. Paired with the power of AWS, you can build almost everything that comes to your mind.
