# Amazon SQS

MassTransit.AmazonSQS (opens new window)

MassTransit combines Amazon SQS (Simple Queue Service) with SNS (Simple Notification Service) to provide both send and publish support.

Configuring a receive endpoint will use the message topology to create and subscribe SNS topics to SQS queues so that published messages will be delivered to the receive endpoint queue.

# Minimal Example

In the example below, the Amazon SQS settings are configured.

namespace AmazonSqsConsoleListener;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;

public class Program
{
    public static async Task Main(string[] args)
    {
        await Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.UsingAmazonSqs((context, cfg) =>
                    {
                        cfg.Host("us-east-2", h =>
                        {
                            h.AccessKey("your-iam-access-key");
                            h.SecretKey("your-iam-secret-key");
                        });
                    });
                });
            })
            .Build()
            .RunAsync();
    }
}

# Broker Topology

With SQS/SNS, which supports topics and queues, messages are sent or published to SNS Topics and then routes those messages through subscriptions to the appropriate SQS Queues.

When the bus is started, MassTransit will create SNS Topics and SQS Queues for the receive endpoint.

# Configuration

The configuration includes:

  • The Amazon SQS host
    • Region name: us-east-2
    • Access key and secret key used to access the resources

# Additional Examples

Any topic can be subscribed to a receive endpoint, as shown below. The topic attributes can also be configured, in case the topic needs to be created.

namespace AmazonSqsReceiveEndpoint;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;

public class Program
{
    public static async Task Main(string[] args)
    {
        await Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.UsingAmazonSqs((context, cfg) =>
                    {
                        cfg.Host("us-east-2", h =>
                        {
                            h.AccessKey("your-iam-access-key");
                            h.SecretKey("your-iam-secret-key");
                        });

                        cfg.ReceiveEndpoint("input-queue", e =>
                        {
                            // disable the default topic binding
                            e.ConfigureConsumeTopology = false;

                            e.Subscribe("event-topic", s =>
                            {
                                // set topic attributes
                                s.TopicAttributes["DisplayName"] = "Public Event Topic";
                                s.TopicSubscriptionAttributes["some-subscription-attribute"] = "some-attribute-value";
                                s.TopicTags.Add("environment", "development");
                            });
                        });
                    });
                });
            })
            .Build()
            .RunAsync();
    }
}

# Errata

# Scoping

Because there is only ever one "SQS/SNS" per AWS account it can be helpful to "Scope" your queues and topics. This will prefix all SQS queues and SNS topics with scope value.

namespace AmazonSqsScopedConsoleListener;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;

public class Program
{
    public static async Task Main(string[] args)
    {
        await Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.UsingAmazonSqs((context, cfg) =>
                    {
                        cfg.Host("us-east-2", h =>
                        {
                            h.AccessKey("your-iam-access-key");
                            h.SecretKey("your-iam-secret-key");

                            // specify a scope for all topics
                            h.Scope("dev", true);
                        });

                        // additionally include the queues
                        cfg.ConfigureEndpoints(context, new DefaultEndpointNameFormatter("dev-", false));
                    });
                });
            })
            .Build()
            .RunAsync();
    }
}

# Example IAM Policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SqsAccess",
            "Effect": "Allow",
            "Action": [
                "sqs:SetQueueAttributes",
                "sqs:ReceiveMessage",
                "sqs:CreateQueue",
                "sqs:DeleteMessage",
                "sqs:SendMessage",
                "sqs:GetQueueUrl",
                "sqs:GetQueueAttributes",
                "sqs:ChangeMessageVisibility"
                ],
            "Resource": "arn:aws:sqs:*:YOUR_ACCOUNT_ID:*"
        },{
            "Sid": "SnsAccess",
            "Effect": "Allow",
            "Action": [
                "sns:GetTopicAttributes",
                "sns:CreateTopic",
                "sns:Publish",
                "sns:Subscribe"
            ],
            "Resource": "arn:aws:sns:*:YOUR_ACCOUNT_ID:*"
        },{
            "Sid": "SnsListAccess",
            "Effect": "Allow",
            "Action": [
                "sns:ListTopics"
            ],
            "Resource": "*"
        }
    ]
}