# Containers

MassTransit supports several dependency injection containers. Since Microsoft introduced its own container, that container has become the most commonly used.

Optional

MassTransit does not require a container, as demonstrated in the configuration example. So if you aren't already using a container, you can get started without having to adopt one. However, when you're ready to use a container, perhaps to deploy your service using the .NET Generic Host, you will likely want to use Microsoft's built-in solution.
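As a rough illustration of running without a container, the sketch below creates the bus directly with the bus factory. It assumes a RabbitMQ broker on localhost with the default guest credentials, and the SubmitOrder contract and the order-service queue name are hypothetical stand-ins for your own.

```csharp
namespace NoContainerSketch
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;

    // Hypothetical message contract, defined only for this sketch
    public interface SubmitOrder
    {
        Guid OrderId { get; }
    }

    public class Program
    {
        public static async Task Main()
        {
            // Create the bus directly from the factory; no container is involved
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // Assumed broker location and credentials
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ReceiveEndpoint("order-service", e =>
                {
                    // A simple message handler, so nothing needs to be resolved from a container
                    e.Handler<SubmitOrder>(context =>
                        Console.Out.WriteLineAsync($"Order Submitted: {context.Message.OrderId}"));
                });
            });

            await busControl.StartAsync();
            try
            {
                Console.WriteLine("Press enter to exit");

                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
```

Because nothing is resolved from a container here, any dependencies a handler or consumer needs must be created and passed in by hand.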

Regardless of which container is used, supported containers have a consistent registration syntax used to add consumers, sagas, and activities, as well as configure the bus. Behind the scenes, MassTransit is configuring the container, including container-specific features such as scoped lifecycles, consistently and correctly. Use of the registration syntax has drastically reduced container configuration support questions.

# Consumer Registration

Uses MassTransit.Extensions.DependencyInjection

To configure a bus using RabbitMQ and register the consumers, sagas, and activities to be used by the bus, call the AddMassTransit extension method. The UsingRabbitMq method can be changed to the appropriate method for the proper transport if RabbitMQ is not being used.

```csharp
namespace MicrosoftContainer
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                x.SetKebabCaseEndpointNameFormatter();

                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });

            var provider = services.BuildServiceProvider();

            var busControl = provider.GetRequiredService<IBusControl>();

            await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
            try
            {
                Console.WriteLine("Press enter to exit");

                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
```
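To illustrate swapping the transport, the sketch below replaces UsingRabbitMq with UsingInMemory, which can be handy for local experiments because no broker is needed. The Ping message and PingConsumer are hypothetical names introduced only for this example, and the short delay is a crude way to let the message be consumed before the bus stops.

```csharp
namespace InMemoryTransportSketch
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    // Hypothetical message and consumer, defined only for this sketch
    public class Ping
    {
        public string Text { get; set; }
    }

    public class PingConsumer :
        IConsumer<Ping>
    {
        public Task Consume(ConsumeContext<Ping> context)
        {
            Console.WriteLine($"Received: {context.Message.Text}");
            return Task.CompletedTask;
        }
    }

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.AddConsumer<PingConsumer>();

                // Same registration syntax; only the transport method changes
                x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
            });

            var provider = services.BuildServiceProvider();

            var busControl = provider.GetRequiredService<IBusControl>();

            await busControl.StartAsync();
            try
            {
                await busControl.Publish(new Ping { Text = "hello" });

                // Crude wait so the consumer has a chance to run before shutdown
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
```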

The AddConsumer method is one of several methods used to register consumers, some of which are shown below.

```csharp
namespace MicrosoftContainerAddConsumer
{
    using System;
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                // Add a single consumer
                x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                // Add a single consumer by type
                x.AddConsumer(typeof(SubmitOrderConsumer), typeof(SubmitOrderConsumerDefinition));

                // Add all consumers in the specified assembly
                x.AddConsumers(typeof(SubmitOrderConsumer).Assembly);

                // Add all consumers in the namespace containing the specified type
                x.AddConsumersFromNamespaceContaining<SubmitOrderConsumer>();
            });
        }
    }
}
```

# Consumer Definition

A consumer definition is used to configure the receive endpoint and pipeline behavior for the consumer. When scanning assemblies or namespaces for consumers, consumer definitions are also found and added to the container. The SubmitOrderConsumer and matching definition are shown below.

```csharp
namespace ContainerConsumers
{
    using System;
    using System.Threading.Tasks;
    using ContainerContracts;
    using GreenPipes;
    using MassTransit;
    using MassTransit.ConsumeConfigurators;
    using MassTransit.Definition;
    using Microsoft.Extensions.Logging;

    class SubmitOrderConsumer :
        IConsumer<SubmitOrder>
    {
        ILogger<SubmitOrderConsumer> _logger;

        public SubmitOrderConsumer(ILogger<SubmitOrderConsumer> logger)
        {
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            _logger.LogInformation("Order Submitted: {OrderId}", context.Message.OrderId);

            await context.Publish<OrderSubmitted>(new
            {
                context.Message.OrderId
            });
        }
    }

    class SubmitOrderConsumerDefinition :
        ConsumerDefinition<SubmitOrderConsumer>
    {
        public SubmitOrderConsumerDefinition()
        {
            // override the default endpoint name
            EndpointName = "order-service";

            // limit the number of messages consumed concurrently
            // this applies to the consumer only, not the endpoint
            ConcurrentMessageLimit = 8;
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
        {
            // configure message retry with millisecond intervals
            endpointConfigurator.UseMessageRetry(r => r.Intervals(100,200,500,800,1000));

            // use the outbox to prevent duplicate events from being published
            endpointConfigurator.UseInMemoryOutbox();
        }
    }
}
```

# Endpoint Definition

To configure the endpoint for a consumer registration, or override the endpoint configuration in the definition, the Endpoint method can be added to the consumer registration. This will create an endpoint definition for the consumer, and register it in the container. This method is available on consumer and saga registrations, with separate execute and compensate endpoint methods for activities.

```csharp
namespace MicrosoftContainerAddConsumerEndpoint
{
    using System;
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition))
                    .Endpoint(e =>
                    {
                        // override the default endpoint name
                        e.Name = "order-service-extreme";

                        // set to true to mark the endpoint as temporary
                        // (may be non-durable, auto-delete, etc.)
                        e.Temporary = false;

                        // specify an optional concurrent message limit for the consumer
                        e.ConcurrentMessageLimit = 8;

                        // only use if needed, a sensible default is provided, and a reasonable
                        // value is automatically calculated based upon ConcurrentMessageLimit if
                        // the transport supports it.
                        e.PrefetchCount = 16;
                    });

                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}
```

When the endpoint is configured after the AddConsumer method, the configuration overrides any endpoint configuration in the consumer definition.

# Bus Configuration

In the above examples, the bus is configured by the UsingRabbitMq method, which is passed two arguments: context, the registration context used to configure endpoints, and cfg, the bus factory configurator used to configure the bus. The above examples use the default conventions to configure the endpoints. Alternatively, endpoints can be configured explicitly. When configuring endpoints manually, however, the ConfigureEndpoints method should not be used, because duplicate endpoints may result.

ConfigureEndpoints uses an IEndpointNameFormatter to generate endpoint names, which by default uses a PascalCase formatter. There are two additional endpoint name formatters included, snake and kebab case.

For the SubmitOrderConsumer, the endpoint names would be:

| Formatter | Name |
|---|---|
| Default | SubmitOrder |
| Snake Case | submit_order |
| Kebab Case | submit-order |

All of the included formatters trim the Consumer, Saga, or Activity suffix from the end of the class name. If the consumer name is generic, the generic parameter type is used instead of the generic type.
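To make the naming rules concrete, here is a small standalone sketch that mimics the suffix trimming and case conversion for a plain class name. EndpointNameSketch and its methods are hypothetical helpers written for this illustration. They are not MassTransit's formatter implementation, and they do not handle the generic-type rule.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper that imitates the documented naming rules; not part of MassTransit
public static class EndpointNameSketch
{
    static readonly string[] Suffixes = { "Consumer", "Saga", "Activity" };

    // Remove a trailing Consumer, Saga, or Activity suffix from the class name
    public static string TrimSuffix(string className)
    {
        foreach (var suffix in Suffixes)
        {
            if (className.Length > suffix.Length && className.EndsWith(suffix, StringComparison.Ordinal))
                return className.Substring(0, className.Length - suffix.Length);
        }

        return className;
    }

    // Insert the separator at each lower-to-upper case boundary, then lowercase the result
    static string Separate(string className, string separator)
    {
        var trimmed = TrimSuffix(className);

        return Regex.Replace(trimmed, "(?<=[a-z0-9])(?=[A-Z])", separator).ToLowerInvariant();
    }

    public static string SnakeCase(string className) => Separate(className, "_");

    public static string KebabCase(string className) => Separate(className, "-");
}

public static class Program
{
    public static void Main()
    {
        const string className = "SubmitOrderConsumer";

        Console.WriteLine(EndpointNameSketch.TrimSuffix(className)); // SubmitOrder
        Console.WriteLine(EndpointNameSketch.SnakeCase(className));  // submit_order
        Console.WriteLine(EndpointNameSketch.KebabCase(className));  // submit-order
    }
}
```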

The endpoint name formatter can be set as shown below.

```csharp
namespace MicrosoftContainerFormatter
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                // choose one formatter; both calls appear here only to show the available options
                x.SetKebabCaseEndpointNameFormatter();

                x.SetSnakeCaseEndpointNameFormatter();

                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}
```

The endpoint formatter can also be passed to the ConfigureEndpoints method as shown.

```csharp
namespace MicrosoftContainerFormatterInline
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransit.Definition;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context, KebabCaseEndpointNameFormatter.Instance);
                });
            });
        }
    }
}
```

To explicitly configure endpoints, use the ConfigureConsumer and/or ConfigureConsumers methods.

```csharp
namespace MicrosoftContainerConfigureConsumer
{
    using System;
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ReceiveEndpoint("order-service", e =>
                    {
                        e.ConfigureConsumer<SubmitOrderConsumer>(context);
                    });
                });
            });
        }
    }
}
```

When using ConfigureConsumer, the EndpointName, PrefetchCount, and Temporary properties of the consumer definition are not used.
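The plural ConfigureConsumers method mentioned earlier could be used along the following lines to place every registered consumer on a single, explicitly configured endpoint. This is a sketch only: the two message types, their consumers, and the order-events queue name are hypothetical and introduced just for this example.

```csharp
namespace ConfigureConsumersSketch
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    // Hypothetical contracts and consumers, defined only for this sketch
    public class OrderAccepted
    {
        public Guid OrderId { get; set; }
    }

    public class OrderRejected
    {
        public Guid OrderId { get; set; }
    }

    public class OrderAcceptedConsumer :
        IConsumer<OrderAccepted>
    {
        public Task Consume(ConsumeContext<OrderAccepted> context)
        {
            Console.WriteLine($"Accepted: {context.Message.OrderId}");
            return Task.CompletedTask;
        }
    }

    public class OrderRejectedConsumer :
        IConsumer<OrderRejected>
    {
        public Task Consume(ConsumeContext<OrderRejected> context)
        {
            Console.WriteLine($"Rejected: {context.Message.OrderId}");
            return Task.CompletedTask;
        }
    }

    public class Program
    {
        public static void Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.AddConsumer<OrderAcceptedConsumer>();
                x.AddConsumer<OrderRejectedConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    // ConfigureEndpoints is deliberately not called; the endpoint is explicit
                    cfg.ReceiveEndpoint("order-events", e =>
                    {
                        // configure all registered consumers on this one endpoint
                        e.ConfigureConsumers(context);
                    });
                });
            });
        }
    }
}
```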

# Saga Registration

To add a state machine saga, use the AddSagaStateMachine methods. For a consumer saga, use the AddSaga methods.

Important

State machine sagas should be added before class-based sagas, and the class-based saga methods should not be used to add state machine sagas. This may be simplified in the future, but for now, be aware of this registration requirement.

```csharp
containerBuilder.AddMassTransit(r =>
{
    // add a state machine saga, with the in-memory repository
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();

    // add a consumer saga with the in-memory repository
    r.AddSaga<OrderSaga>()
        .InMemoryRepository();

    // add a saga by type, without a repository. The repository should be registered
    // in the container elsewhere
    r.AddSaga(typeof(OrderSaga));

    // add a state machine saga by type, including a saga definition for that saga
    r.AddSagaStateMachine(typeof(OrderState), typeof(OrderStateDefinition));

    // add all saga state machines in the specified assembly
    r.AddSagaStateMachines(Assembly.GetExecutingAssembly());

    // add all sagas in the specified assembly
    r.AddSagas(Assembly.GetExecutingAssembly());

    // add sagas from the namespace containing the type
    r.AddSagasFromNamespaceContaining<OrderSaga>();
    r.AddSagasFromNamespaceContaining(typeof(OrderSaga));
});
```
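The registration snippet above refers to OrderState and OrderStateMachine without showing them. As a rough idea of what such types might look like, the sketch below defines a minimal state machine saga and registers it with the Microsoft container and the in-memory transport. It assumes a MassTransit version in which the state machine types live in the Automatonymous namespace (consistent with the GreenPipes usings elsewhere on this page), and the OrderSubmitted contract and the Submitted state are hypothetical.

```csharp
namespace SagaRegistrationSketch
{
    using System;
    using Automatonymous;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    // Hypothetical event contract, defined only for this sketch
    public interface OrderSubmitted
    {
        Guid OrderId { get; }
    }

    // Saga instance: the state stored for each order
    public class OrderState :
        SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }
    }

    public class OrderStateMachine :
        MassTransitStateMachine<OrderState>
    {
        public OrderStateMachine()
        {
            // store the current state name in the CurrentState property
            InstanceState(x => x.CurrentState);

            // correlate the event to a saga instance by order id
            Event(() => OrderSubmitted, x => x.CorrelateById(context => context.Message.OrderId));

            Initially(
                When(OrderSubmitted)
                    .TransitionTo(Submitted));
        }

        public State Submitted { get; private set; }
        public Event<OrderSubmitted> OrderSubmitted { get; private set; }
    }

    public class Program
    {
        public static void Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                // state machine saga with the in-memory repository
                x.AddSagaStateMachine<OrderStateMachine, OrderState>()
                    .InMemoryRepository();

                x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}
```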

To add a saga registration and configure the saga's endpoint in the same expression, chain the Endpoint method onto the registration. An endpoint definition is then created automatically.

```csharp
containerBuilder.AddMassTransit(r =>
{
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .NHibernateRepository()
        .Endpoint(e =>
        {
            e.Name = "order-state";
            e.ConcurrentMessageLimit = 8;
        });
});
```

Supported saga persistence storage engines are documented in the saga documentation section.