# Configuration

MassTransit can be used in most .NET application types. Commonly used application types are documented below, including the package references used and the minimal configuration required. More thorough configuration details can be found throughout the documentation.

The configuration examples all use the EventContracts.ValueEntered message type. The message type is defined only once, in the source code of the console publisher example.

# Configuration

Uses MassTransit, MassTransit.RabbitMQ

MassTransit is easily configured in ASP.NET Core or .NET Generic Host applications (using .NET Core 3.1 or later). The built-in configuration will:

  • Add several interfaces (and their implementations)
    • IBusControl (singleton)
    • IBus (singleton)
    • IReceiveEndpointConnector (singleton)
    • ISendEndpointProvider (scoped)
    • IPublishEndpoint (scoped)
  • Add a hosted service to start and stop the bus (or buses)
  • Add health checks for the bus and receive endpoints
  • Use ILoggerFactory to create log writers
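As a rough illustration of how the scoped interfaces listed above might be used, the sketch below injects ISendEndpointProvider into an application class and sends a command to a queue. The SubmitValue contract, the ValueSender class, and the value-service queue name are hypothetical and are not part of the original examples. The class is assumed to be registered as scoped in the container.

```csharp
namespace AspNetCoreSender
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;

    // hypothetical command contract, used for illustration only
    public interface SubmitValue
    {
        string Value { get; }
    }

    // hypothetical application class, assumed to be registered as scoped
    public class ValueSender
    {
        readonly ISendEndpointProvider _sendEndpointProvider;

        public ValueSender(ISendEndpointProvider sendEndpointProvider)
        {
            _sendEndpointProvider = sendEndpointProvider;
        }

        public async Task Send(string value)
        {
            // "queue:value-service" is an assumed queue name
            var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:value-service"));

            await endpoint.Send<SubmitValue>(new
            {
                Value = value
            });
        }
    }
}
```

Because ISendEndpointProvider and IPublishEndpoint are scoped, they are best resolved from a scope (such as an HTTP request) rather than captured by a singleton.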

To configure MassTransit so that it can be used to send/publish messages, the configuration below is recommended as a starting point.

namespace AspNetCorePublisher
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq();
            });

            // OPTIONAL, but can be used to configure the bus options
            services.AddOptions<MassTransitHostOptions>()
                .Configure(options =>
                {
                    // if specified, waits until the bus is started before
                    // returning from IHostedService.StartAsync
                    // default is false
                    options.WaitUntilStarted = true;

                    // if specified, limits the wait time when starting the bus
                    options.StartTimeout = TimeSpan.FromSeconds(10);

                    // if specified, limits the wait time when stopping the bus
                    options.StopTimeout = TimeSpan.FromSeconds(30);
                });
        }
    }
}

In this example, MassTransit is configured to connect to RabbitMQ (which should be accessible on localhost) and publish messages. The messages can be published from a controller as shown below.

namespace AspNetCorePublisher
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.AspNetCore.Mvc;

    public class ValueController :
        ControllerBase
    {
        readonly IPublishEndpoint _publishEndpoint;

        public ValueController(IPublishEndpoint publishEndpoint)
        {
            _publishEndpoint = publishEndpoint;
        }

        [HttpPost]
        public async Task<ActionResult> Post(string value)
        {
            await _publishEndpoint.Publish<ValueEntered>(new
            {
                Value = value
            });

            return Ok();
        }
    }
}
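The publisher configuration above calls UsingRabbitMq without arguments and relies on a broker being reachable on localhost. When the broker settings need to be stated explicitly, something like the following sketch may serve as a starting point. The host name, virtual host, and guest credentials are assumed values and should be replaced with real settings.

```csharp
namespace AspNetCorePublisherExplicitHost
{
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) =>
                {
                    // assumed values: a local broker, the default virtual host,
                    // and the default guest account
                    cfg.Host("localhost", "/", h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });
                });
            });
        }
    }
}
```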

# Consumers

To configure a bus using RabbitMQ and register the consumers, sagas, and activities to be used by the bus, call the AddMassTransit extension method. The UsingRabbitMq method can be changed to the appropriate method for the proper transport if RabbitMQ is not being used.

namespace MicrosoftContainer
{
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.Hosting;

    public class Program
    {
        public static async Task Main(string[] args)
        {
            await Host.CreateDefaultBuilder(args)
                .ConfigureServices(services =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                        x.SetKebabCaseEndpointNameFormatter();

                        x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
                    });
                })
                .Build()
                .RunAsync();
        }
    }
}
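As noted above, UsingRabbitMq can be replaced by the method for another transport. A minimal sketch, assuming the in-memory transport included in the MassTransit package is acceptable (for example, in a test or a single-process demo), might look like this. The MicrosoftContainerInMemory namespace is hypothetical.

```csharp
namespace MicrosoftContainerInMemory
{
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.Hosting;

    public class Program
    {
        public static async Task Main(string[] args)
        {
            await Host.CreateDefaultBuilder(args)
                .ConfigureServices(services =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                        x.SetKebabCaseEndpointNameFormatter();

                        // in-memory transport: no broker is required, and
                        // messages do not survive a process restart
                        x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
                    });
                })
                .Build()
                .RunAsync();
        }
    }
}
```

The consumer registration is unchanged; only the transport method differs.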

The AddConsumer method is one of several methods used to register consumers, some of which are shown below.

namespace MicrosoftContainerAddConsumer
{
    using System;
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;

    public class Program
    {
        public static async Task Main(string[] args)
        {
            await Host.CreateDefaultBuilder(args)
                .ConfigureServices(services =>
                {
                    services.AddMassTransit(x =>
                    {
                        // Add a single consumer
                        x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                        // Add a single consumer by type
                        x.AddConsumer(typeof(SubmitOrderConsumer), typeof(SubmitOrderConsumerDefinition));

                        // Add all consumers in the specified assembly
                        x.AddConsumers(typeof(SubmitOrderConsumer).Assembly);

                        // Add all consumers in the namespace containing the specified type
                        x.AddConsumersFromNamespaceContaining<SubmitOrderConsumer>();
                    });
                })
                .Build()
                .RunAsync();
        }
    }
}

# Consumer Definition

A consumer definition is used to configure the receive endpoint and pipeline behavior for the consumer. When scanning assemblies or namespaces for consumers, consumer definitions are also found and added to the container. The SubmitOrderConsumer and matching definition are shown below.

namespace ContainerConsumers
{
    using System.Threading.Tasks;
    using ContainerContracts;
    using MassTransit;
    using Microsoft.Extensions.Logging;

    class SubmitOrderConsumer :
        IConsumer<SubmitOrder>
    {
        readonly ILogger<SubmitOrderConsumer> _logger;

        public SubmitOrderConsumer(ILogger<SubmitOrderConsumer> logger)
        {
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            _logger.LogInformation("Order Submitted: {OrderId}", context.Message.OrderId);

            await context.Publish<OrderSubmitted>(new
            {
                context.Message.OrderId
            });
        }
    }

    class SubmitOrderConsumerDefinition :
        ConsumerDefinition<SubmitOrderConsumer>
    {
        public SubmitOrderConsumerDefinition()
        {
            // override the default endpoint name
            EndpointName = "order-service";

            // limit the number of messages consumed concurrently
            // this applies to the consumer only, not the endpoint
            ConcurrentMessageLimit = 8;
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
        {
            // configure message retry with millisecond intervals
            endpointConfigurator.UseMessageRetry(r => r.Intervals(100,200,500,800,1000));

            // use the outbox to prevent duplicate events from being published
            endpointConfigurator.UseInMemoryOutbox();
        }
    }
}
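The ContainerContracts namespace used by the consumer is not shown in this section. A plausible sketch of the two message contracts follows. The Guid type for OrderId is an assumption, since the original only shows the property name.

```csharp
namespace ContainerContracts
{
    using System;

    // command consumed by SubmitOrderConsumer
    public interface SubmitOrder
    {
        // the Guid type is assumed; only the property name appears in the original
        Guid OrderId { get; }
    }

    // event published by the consumer once the order has been submitted
    public interface OrderSubmitted
    {
        Guid OrderId { get; }
    }
}
```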

# Endpoint Definition

To configure the endpoint for a consumer registration, or override the endpoint configuration in the definition, the Endpoint method can be added to the consumer registration. This will create an endpoint definition for the consumer, and register it in the container. This method is available on consumer and saga registrations, with separate execute and compensate endpoint methods for activities.

namespace MicrosoftContainerAddConsumerEndpoint
{
    using System;
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition))
                    .Endpoint(e =>
                    {
                        // override the default endpoint name
                        e.Name = "order-service-extreme";

                        // specify whether the endpoint is temporary (a temporary endpoint may be non-durable, auto-delete, etc.)
                        e.Temporary = false;

                        // specify an optional concurrent message limit for the consumer
                        e.ConcurrentMessageLimit = 8;

                        // only use if needed, a sensible default is provided, and a reasonable
                        // value is automatically calculated based upon ConcurrentMessageLimit if 
                        // the transport supports it.
                        e.PrefetchCount = 16;

                        // set if each service instance should have its own endpoint for the consumer
                        // so that messages fan out to each instance.
                        e.InstanceId = "something-unique";
                    });

                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}

When the endpoint is configured after the AddConsumer method, the configuration then overrides the endpoint configuration in the consumer definition. However, it cannot override the EndpointName if it is specified in the constructor. The order of precedence for endpoint naming is explained below.

  1. Specifying EndpointName = "submit-order-extreme" in the constructor which cannot be overridden

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();
    
    public SubmitOrderConsumerDefinition()
    {
        EndpointName = "submit-order-extreme";
    }
    
  2. Specifying .Endpoint(x => x.Name = "submit-order-extreme") in the consumer registration, chained to AddConsumer

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()
        .Endpoint(x => x.Name = "submit-order-extreme");
    
    public SubmitOrderConsumerDefinition()
    {
        Endpoint(x => x.Name = "not used");
    }
    
  3. Specifying Endpoint(x => x.Name = "submit-order-extreme") in the constructor, which creates an endpoint definition

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();
    
    public SubmitOrderConsumerDefinition()
    {
        Endpoint(x => x.Name = "submit-order-extreme");
    }
    
  4. Unspecified, the endpoint name formatter is used (in this case, the endpoint name is SubmitOrder using the default formatter)

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();
    
    public SubmitOrderConsumerDefinition()
    {
    }
    

# Saga Registration

To add a state machine saga, use the AddSagaStateMachine methods. For a consumer saga, use the AddSaga methods.

Important

State machine sagas should be added before class-based sagas, and the class-based saga methods should not be used to add state machine sagas. This may be simplified in the future, but for now, be aware of this registration requirement.

services.AddMassTransit(r =>
{
    // add a state machine saga, with the in-memory repository
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();

    // add a consumer saga with the in-memory repository
    r.AddSaga<OrderSaga>()
        .InMemoryRepository();

    // add a saga by type, without a repository. The repository should be registered
    // in the container elsewhere
    r.AddSaga(typeof(OrderSaga));

    // add a state machine saga by type, including a saga definition for that saga
    r.AddSagaStateMachine(typeof(OrderState), typeof(OrderStateDefinition));

    // add all saga state machines in the specified assembly
    r.AddSagaStateMachines(Assembly.GetExecutingAssembly());

    // add all sagas in the specified assembly
    r.AddSagas(Assembly.GetExecutingAssembly());

    // add sagas from the namespace containing the type
    r.AddSagasFromNamespaceContaining<OrderSaga>();
    r.AddSagasFromNamespaceContaining(typeof(OrderSaga));
});

To add a saga registration and configure its endpoint in the same expression, chain the Endpoint method to the registration. An endpoint definition is then created automatically.

services.AddMassTransit(r =>
{
    r.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .NHibernateRepository()
        .Endpoint(e =>
        {
            e.Name = "order-state";
            e.ConcurrentMessageLimit = 8;
        });
});

Supported saga persistence storage engines are documented in the saga documentation section.
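The OrderState and OrderStateMachine types referenced in the registrations above are not shown in this section. A minimal sketch of what they might look like follows, assuming a MassTransit version in which the state machine types live in the MassTransit namespace. The SagaSketch namespace, the OrderSubmitted contract, and the Submitted state are hypothetical.

```csharp
namespace SagaSketch
{
    using System;
    using MassTransit;

    // hypothetical event contract, correlated by OrderId
    public interface OrderSubmitted
    {
        Guid OrderId { get; }
    }

    // saga instance; CorrelationId is required by SagaStateMachineInstance
    public class OrderState :
        SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }
    }

    public class OrderStateMachine :
        MassTransitStateMachine<OrderState>
    {
        public OrderStateMachine()
        {
            // store the current state name in the CurrentState property
            InstanceState(x => x.CurrentState);

            // correlate the event to a saga instance using the order id
            Event(() => OrderSubmitted, x => x.CorrelateById(context => context.Message.OrderId));

            // the first OrderSubmitted event creates the instance and moves it to Submitted
            Initially(
                When(OrderSubmitted)
                    .TransitionTo(Submitted));
        }

        public State Submitted { get; private set; }
        public Event<OrderSubmitted> OrderSubmitted { get; private set; }
    }
}
```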

# Receive Endpoints

In the above examples, the bus is configured by the UsingRabbitMq method, which is passed two arguments: context, the registration context used to configure endpoints, and cfg, the bus factory configurator used to configure the bus. The above examples use the default conventions to configure the endpoints. Alternatively, endpoints can be configured explicitly. When configuring endpoints manually, the ConfigureEndpoints method may be omitted; otherwise, it should appear after any manually configured receive endpoints.

ConfigureEndpoints uses an IEndpointNameFormatter to generate endpoint names, which by default uses a PascalCase formatter. There are two additional endpoint name formatters included, snake and kebab case.

For the SubmitOrderConsumer, the endpoint names would be:

| Formatter  | Name         |
|------------|--------------|
| Default    | SubmitOrder  |
| Snake Case | submit_order |
| Kebab Case | submit-order |

All of the included formatters trim the Consumer, Saga, or Activity suffix from the end of the class name. If the consumer name is generic, the generic parameter type is used instead of the generic type.
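To make the naming convention concrete, the sketch below reproduces the suffix trimming and case conversion for a simple, non-generic class name. It is a standalone illustration and not MassTransit's implementation; the EndpointNameSketch class and its methods are hypothetical.

```csharp
using System;
using System.Text;

public static class EndpointNameSketch
{
    static readonly string[] Suffixes = { "Consumer", "Saga", "Activity" };

    // remove a trailing Consumer, Saga, or Activity suffix from the class name
    public static string TrimSuffix(string typeName)
    {
        foreach (var suffix in Suffixes)
        {
            if (typeName.Length > suffix.Length && typeName.EndsWith(suffix, StringComparison.Ordinal))
                return typeName.Substring(0, typeName.Length - suffix.Length);
        }

        return typeName;
    }

    // lower-case the name, inserting the separator before each interior upper-case letter
    public static string Separate(string name, char separator)
    {
        var builder = new StringBuilder();

        for (var i = 0; i < name.Length; i++)
        {
            var c = name[i];

            if (char.IsUpper(c) && i > 0)
                builder.Append(separator);

            builder.Append(char.ToLowerInvariant(c));
        }

        return builder.ToString();
    }

    public static void Main()
    {
        var name = TrimSuffix("SubmitOrderConsumer");

        Console.WriteLine(name);                // SubmitOrder
        Console.WriteLine(Separate(name, '_')); // submit_order
        Console.WriteLine(Separate(name, '-')); // submit-order
    }
}
```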

Video

Learn about the default conventions as well as how to tailor the naming style to meet your requirements in this short video.

The endpoint name formatter can be set as shown below.

namespace MicrosoftContainerFormatter
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
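                // kebab case, e.g. submit-order
                x.SetKebabCaseEndpointNameFormatter();

                // alternatively, snake case, e.g. submit_order (choose one formatter)
                x.SetSnakeCaseEndpointNameFormatter();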

                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });
        }
    }
}

The endpoint formatter can also be passed to the ConfigureEndpoints method as shown.

namespace MicrosoftContainerFormatterInline
{
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context, KebabCaseEndpointNameFormatter.Instance);
                });
            });
        }
    }
}

To explicitly configure endpoints, use the ConfigureConsumer and/or ConfigureConsumers methods.

namespace MicrosoftContainerConfigureConsumer
{
    using System;
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ReceiveEndpoint("order-service", e =>
                    {
                        e.ConfigureConsumer<SubmitOrderConsumer>(context);
                    }); 
                });
            });
        }
    }
}

When using ConfigureConsumer, the EndpointName, PrefetchCount, and Temporary properties of the consumer definition are not used.
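Manual and conventional configuration can also be combined. The sketch below configures one receive endpoint explicitly and then calls ConfigureEndpoints, which is expected to configure only the consumers that were not configured manually. The AuditOrderConsumer class and the MicrosoftContainerMixedEndpoints namespace are hypothetical, and the PrefetchCount value is arbitrary.

```csharp
namespace MicrosoftContainerMixedEndpoints
{
    using System.Threading.Tasks;
    using ContainerConsumers;
    using ContainerContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    // hypothetical second consumer, left to the default conventions
    class AuditOrderConsumer :
        IConsumer<SubmitOrder>
    {
        public Task Consume(ConsumeContext<SubmitOrder> context)
        {
            return Task.CompletedTask;
        }
    }

    public class Program
    {
        public static void Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
                x.AddConsumer<AuditOrderConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ReceiveEndpoint("order-service", e =>
                    {
                        // endpoint settings are specified here, since the definition's
                        // EndpointName, PrefetchCount, and Temporary are not used
                        e.PrefetchCount = 16;

                        e.ConfigureConsumer<SubmitOrderConsumer>(context);
                    });

                    // placed after the manually configured receive endpoints
                    cfg.ConfigureEndpoints(context);
                });
            });
        }
    }
}
```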

MassTransit includes an endpoint name formatter (IEndpointNameFormatter) which can be used to automatically format endpoint names based upon the consumer, saga, or activity name. Using the ConfigureEndpoints method will automatically create a receive endpoint for every added consumer, saga, and activity. To automatically configure the receive endpoints, use the updated configuration shown below.

The example sets the kebab-case endpoint name formatter, which will create a receive endpoint named value-entered-event for the ValueEnteredEventConsumer. The Consumer suffix is removed by default. The endpoint is named based upon the consumer name, not the message type, since a consumer may consume multiple message types yet still be configured on a single receive endpoint.

namespace AspNetCoreEndpointListener
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<ValueEnteredEventConsumer>();

                x.SetKebabCaseEndpointNameFormatter();

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context);
                });
            });
        }
    }

    class ValueEnteredEventConsumer :
        IConsumer<ValueEntered>
    {
        readonly ILogger<ValueEnteredEventConsumer> _logger;

        public ValueEnteredEventConsumer(ILogger<ValueEnteredEventConsumer> logger)
        {
            _logger = logger;
        }

        public Task Consume(ConsumeContext<ValueEntered> context)
        {
            _logger.LogInformation("Value: {Value}", context.Message.Value);

            // nothing is awaited, so return a completed task instead of using async
            return Task.CompletedTask;
        }
    }
}

An ASP.NET Core application can also configure receive endpoints. The consumer, along with the receive endpoint, is configured within the AddMassTransit configuration. Separate registration of the consumer is not required (and is discouraged); however, any consumer dependencies should be added to the container separately. Consumers are registered as scoped, and dependencies should be registered as scoped when possible, unless they are singletons.

namespace AspNetCoreListener
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<EventConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ReceiveEndpoint("event-listener", e =>
                    {
                        e.ConfigureConsumer<EventConsumer>(context);
                    });
                });
            });
        }
    }

    class EventConsumer :
        IConsumer<ValueEntered>
    {
        readonly ILogger<EventConsumer> _logger;

        public EventConsumer(ILogger<EventConsumer> logger)
        {
            _logger = logger;
        }

        public Task Consume(ConsumeContext<ValueEntered> context)
        {
            _logger.LogInformation("Value: {Value}", context.Message.Value);

            // nothing is awaited, so return a completed task instead of using async
            return Task.CompletedTask;
        }
    }
}
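To illustrate the point about consumer dependencies, the following sketch registers a scoped dependency separately from the consumer. The IValueAuditor interface, its ConsoleValueAuditor implementation, and the AuditingConsumer class are hypothetical names introduced for this example.

```csharp
namespace AspNetCoreListenerDependencies
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    // hypothetical dependency used by the consumer
    public interface IValueAuditor
    {
        Task Audit(string value);
    }

    public class ConsoleValueAuditor :
        IValueAuditor
    {
        public Task Audit(string value)
        {
            Console.WriteLine("Audited: {0}", value);

            return Task.CompletedTask;
        }
    }

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            // the dependency is added to the container separately, as scoped
            services.AddScoped<IValueAuditor, ConsoleValueAuditor>();

            services.AddMassTransit(x =>
            {
                // the consumer itself is registered only through AddConsumer
                x.AddConsumer<AuditingConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ReceiveEndpoint("event-listener", e =>
                    {
                        e.ConfigureConsumer<AuditingConsumer>(context);
                    });
                });
            });
        }
    }

    class AuditingConsumer :
        IConsumer<ValueEntered>
    {
        readonly IValueAuditor _auditor;

        public AuditingConsumer(IValueAuditor auditor)
        {
            _auditor = auditor;
        }

        public Task Consume(ConsumeContext<ValueEntered> context)
        {
            return _auditor.Audit(context.Message.Value);
        }
    }
}
```

Since the consumer is resolved from a scope for each message, a scoped dependency is expected to be created once per consumed message.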

To configure health checks, which MassTransit will produce when using the MassTransitHostedService, add the health checks to the container and map the readiness and liveness endpoints. The following example also separates the readiness from the liveness health check.

namespace AspNetCorePublisherHealthCheck
{
    using System;
    using MassTransit;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Diagnostics.HealthChecks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Diagnostics.HealthChecks;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddHealthChecks();

            services.Configure<HealthCheckPublisherOptions>(options =>
            {
                options.Delay = TimeSpan.FromSeconds(2);
                options.Predicate = (check) => check.Tags.Contains("ready");
            });

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq();
            });
        }

        public void Configure(IApplicationBuilder app)
        {
            // endpoint routing must be added before UseEndpoints
            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapHealthChecks("/health/ready", new HealthCheckOptions()
                {
                    Predicate = (check) => check.Tags.Contains("ready"),
                });

                endpoints.MapHealthChecks("/health/live", new HealthCheckOptions());
            });
        }
    }
}
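For applications built on the minimal hosting model, the same readiness and liveness mapping might be written as follows. This is a sketch that assumes .NET 6 or later and that the MassTransit health checks carry the ready tag, as the predicate in the example above implies. Excluding all checks from the liveness endpoint is a design choice made for this sketch, not something the original example does.

```csharp
using MassTransit;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddHealthChecks();

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq();
});

var app = builder.Build();

// readiness: run only the checks tagged "ready"
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
    Predicate = check => check.Tags.Contains("ready")
});

// liveness: run no checks, so the endpoint only confirms that the process responds
app.MapHealthChecks("/health/live", new HealthCheckOptions
{
    Predicate = _ => false
});

app.Run();
```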

Optional

MassTransit does not require a container. If you aren't already using a container, you can get started without having to adopt one. However, when you're ready to use a container, perhaps to deploy your service using the .NET Generic Host, MassTransit is ready with fully integrated support for Microsoft.Extensions.DependencyInjection.

Regardless of which container is used, supported containers have a consistent registration syntax used to add consumers, sagas, and activities, as well as configure the bus. Behind the scenes, MassTransit is configuring the container, including container-specific features such as scoped lifecycles, consistently and correctly. Use of the registration syntax has drastically reduced container configuration support questions.

# .NET Generic Host

Uses MassTransit.RabbitMQ

The .NET Generic Host is the preferred way to create standalone services, and a project can easily be created using dotnet new worker. In this example, MassTransit is configured to connect to RabbitMQ (which should be accessible on localhost) and publish messages. As each value is entered, the value is published as a ValueEntered message. No consumers are configured in this example.

namespace EventContracts
{
    public interface ValueEntered
    {
        string Value { get; }
    }
}

namespace ConsoleEventPublisher
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq();

            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            await busControl.StartAsync(source.Token);
            try
            {
                while (true)
                {
                    string value = await Task.Run(() =>
                    {
                        Console.WriteLine("Enter message (or quit to exit)");
                        Console.Write("> ");
                        return Console.ReadLine();
                    });

                    if("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                        break;

                    await busControl.Publish<ValueEntered>(new
                    {
                        Value = value
                    });
                }
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
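The publisher above creates and starts the bus by hand. For comparison, a sketch of a similar publisher built on the .NET Generic Host follows, where the hosted service added by AddMassTransit starts and stops the bus. The WorkerEventPublisher namespace, the ValuePublisher class, and the five-second interval are hypothetical, and a timestamp stands in for the value typed at the console.

```csharp
namespace WorkerEventPublisher
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;

    public class Program
    {
        public static async Task Main(string[] args)
        {
            await Host.CreateDefaultBuilder(args)
                .ConfigureServices(services =>
                {
                    // registers the bus and the hosted service that starts and stops it
                    services.AddMassTransit(x => x.UsingRabbitMq());

                    services.AddHostedService<ValuePublisher>();
                })
                .Build()
                .RunAsync();
        }
    }

    // hypothetical worker that publishes a timestamp every five seconds
    public class ValuePublisher :
        BackgroundService
    {
        readonly IBus _bus;

        public ValuePublisher(IBus bus)
        {
            _bus = bus;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await _bus.Publish<ValueEntered>(new
                {
                    Value = DateTime.UtcNow.ToString("O")
                }, stoppingToken);

                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }
    }
}
```

If the worker must not publish until the bus is running, the WaitUntilStarted option of MassTransitHostOptions can be set so that the host waits for the bus to start.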

Another console application can be created to consume the published events. In this application, the receive endpoint is configured with a consumer that consumes the ValueEntered event. The message contract from the example above, in the same namespace, should be copied to this program as well (it isn't shown below).

namespace ConsoleEventListener
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("event-listener", e =>
                {
                    e.Consumer<EventConsumer>();
                });
            });

            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            await busControl.StartAsync(source.Token);
            try
            {
                Console.WriteLine("Press enter to exit");

                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }
        }

        class EventConsumer :
            IConsumer<ValueEntered>
        {
            public Task Consume(ConsumeContext<ValueEntered> context)
            {
                Console.WriteLine("Value: {0}", context.Message.Value);

                // nothing is awaited, so return a completed task instead of using async
                return Task.CompletedTask;
            }
        }
    }
}