# Configuration

MassTransit can be configured in most .NET application types. Several commonly used application types are documented below, each with the package references it uses and the minimal configuration required. More thorough configuration details can be found throughout the documentation.

Containers

Unless the application type requires a dependency injection container, the examples below do not use one. To learn about container configuration, including examples, refer to the containers section.

The configuration examples all use the EventContracts.ValueEntered message type. The message type is only included in the first example's source code.

# Console App

Uses MassTransit.RabbitMQ

A console application, such as an application created using dotnet new console, has a Main entry point in the Program.cs file by default. In this example, MassTransit is configured to connect to RabbitMQ (which should be accessible on localhost) and publish messages. As each value is entered, the value is published as a ValueEntered message. No consumers are configured in this example.

namespace EventContracts
{
    public interface ValueEntered
    {
        string Value { get; }
    }
}

namespace ConsoleEventPublisher
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq();

            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            await busControl.StartAsync(source.Token);
            try
            {
                do
                {
                    string value = await Task.Run(() =>
                    {
                        Console.WriteLine("Enter message (or quit to exit)");
                        Console.Write("> ");
                        return Console.ReadLine();
                    });

                    if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                        break;

                    await busControl.Publish<ValueEntered>(new
                    {
                        Value = value
                    });
                }
                while (true);
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
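
The example above does not specify a host, which is why RabbitMQ is expected on localhost. When the broker runs elsewhere, the host can be specified explicitly. The following is a minimal sketch, assuming a broker at the hypothetical host name rabbitmq.example.internal, the default virtual host, and hypothetical credentials; none of these values come from the example above, so substitute the ones for the actual environment.

```csharp
namespace ConsoleHostConfiguration
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using MassTransit;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // Hypothetical host name and credentials, shown for illustration only.
                cfg.Host("rabbitmq.example.internal", "/", h =>
                {
                    h.Username("app-user");
                    h.Password("app-password");
                });
            });

            // Cancel the start if it takes longer than 10 seconds.
            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            await busControl.StartAsync(source.Token);
            try
            {
                Console.WriteLine("Bus started, press enter to exit");

                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
```

In a real application the credentials would normally come from configuration or a secret store rather than being written in the source.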

Another console application can be created to consume the published events. In this application, the receive endpoint is configured with a consumer that consumes the ValueEntered event. The message contract from the example above, in the same namespace, should be copied to this program as well (it isn't shown below).

namespace ConsoleEventListener
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("event-listener", e =>
                {
                    e.Consumer<EventConsumer>();
                });
            });

            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            await busControl.StartAsync(source.Token);
            try
            {
                Console.WriteLine("Press enter to exit");

                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }
        }

        class EventConsumer :
            IConsumer<ValueEntered>
        {
            public async Task Consume(ConsumeContext<ValueEntered> context)
            {
                Console.WriteLine("Value: {0}", context.Message.Value);
            }
        }
    }
}

# ASP.NET Core

Uses MassTransit.AspNetCore, MassTransit.RabbitMQ

MassTransit fully integrates with ASP.NET Core, including:

  • Microsoft Extensions Dependency Injection container configuration, including consumer, saga, and activity registration. The MassTransit interfaces are also registered:
    • IBusControl (singleton)
    • IBus (singleton)
    • ISendEndpointProvider (scoped)
    • IPublishEndpoint (scoped)
  • The MassTransitHostedService to automatically start and stop the bus
  • Health checks for the bus and receive endpoints
  • Bus logging configured to use the ILoggerFactory from the container

To produce messages from an ASP.NET Core application, the configuration below is used.

namespace AspNetCorePublisher
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq();
            });

            services.AddMassTransitHostedService();
        }
    }
}

In this example, MassTransit is configured to connect to RabbitMQ (which should be accessible on localhost) and publish messages. The messages can be published from a controller as shown below.

namespace AspNetCorePublisher
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.AspNetCore.Mvc;

    public class ValueController :
        ControllerBase
    {
        readonly IPublishEndpoint _publishEndpoint;

        public ValueController(IPublishEndpoint publishEndpoint)
        {
            _publishEndpoint = publishEndpoint;
        }

        [HttpPost]
        public async Task<ActionResult> Post(string value)
        {
            await _publishEndpoint.Publish<ValueEntered>(new
            {
                Value = value
            });

            return Ok();
        }
    }
}

An ASP.NET Core application can also configure receive endpoints. The consumer, along with the receive endpoint, is configured within the AddMassTransit configuration. Separate registration of the consumer is not required (and is discouraged); however, any consumer dependencies should be added to the container separately. Consumers are registered as scoped, and their dependencies should also be registered as scoped when possible, unless they are singletons.

namespace AspNetCoreListener
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<EventConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ReceiveEndpoint("event-listener", e =>
                    {
                        e.ConfigureConsumer<EventConsumer>(context);
                    });
                });
            });

            services.AddMassTransitHostedService();
        }
    }

    class EventConsumer :
        IConsumer<ValueEntered>
    {
        ILogger<EventConsumer> _logger;

        public EventConsumer(ILogger<EventConsumer> logger)
        {
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<ValueEntered> context)
        {
            _logger.LogInformation("Value: {Value}", context.Message.Value);
        }
    }
}
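
The consumer above depends only on ILogger, which the host already provides. To illustrate the point about registering consumer dependencies separately, the following sketch adds a scoped dependency of its own. IValueFormatter and UpperCaseValueFormatter are hypothetical types introduced only for this illustration, and the message contract is repeated so the sketch compiles on its own.

```csharp
namespace EventContracts
{
    // Same contract as in the first example, repeated so the sketch compiles on its own.
    public interface ValueEntered
    {
        string Value { get; }
    }
}

namespace AspNetCoreListenerWithDependency
{
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;

    // Hypothetical dependency, introduced only for this sketch.
    public interface IValueFormatter
    {
        string Format(string value);
    }

    public class UpperCaseValueFormatter :
        IValueFormatter
    {
        public string Format(string value)
        {
            return value?.ToUpperInvariant();
        }
    }

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            // The dependency is registered separately from the consumer, as scoped.
            services.AddScoped<IValueFormatter, UpperCaseValueFormatter>();

            services.AddMassTransit(x =>
            {
                // The consumer itself is registered only through AddConsumer.
                x.AddConsumer<EventConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ReceiveEndpoint("event-listener", e =>
                    {
                        e.ConfigureConsumer<EventConsumer>(context);
                    });
                });
            });

            services.AddMassTransitHostedService();
        }
    }

    class EventConsumer :
        IConsumer<ValueEntered>
    {
        readonly IValueFormatter _formatter;
        readonly ILogger<EventConsumer> _logger;

        public EventConsumer(IValueFormatter formatter, ILogger<EventConsumer> logger)
        {
            _formatter = formatter;
            _logger = logger;
        }

        public Task Consume(ConsumeContext<ValueEntered> context)
        {
            _logger.LogInformation("Value: {Value}", _formatter.Format(context.Message.Value));

            return Task.CompletedTask;
        }
    }
}
```

Because both the consumer and the formatter are scoped, each consumed message should receive its own instance of each.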

MassTransit includes an endpoint name formatter (IEndpointNameFormatter) which can be used to automatically format endpoint names based upon the consumer, saga, or activity name. Using the ConfigureEndpoints method will automatically create a receive endpoint for every added consumer, saga, and activity. To automatically configure the receive endpoints, use the updated configuration shown below.

The example sets the kebab-case endpoint name formatter, which will create a receive endpoint named value-entered-event for the ValueEnteredEventConsumer. The Consumer suffix is removed by default. The endpoint is named based upon the consumer name, not the message type, since a consumer may consume multiple message types yet still be configured on a single receive endpoint.

namespace AspNetCoreEndpointListener
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<ValueEnteredEventConsumer>();

                x.SetKebabCaseEndpointNameFormatter();

                x.UsingRabbitMq((context, cfg) =>
                {
                    cfg.ConfigureEndpoints(context);
                });
            });

            services.AddMassTransitHostedService();
        }
    }

    class ValueEnteredEventConsumer :
        IConsumer<ValueEntered>
    {
        ILogger<ValueEnteredEventConsumer> _logger;

        public ValueEnteredEventConsumer(ILogger<ValueEnteredEventConsumer> logger)
        {
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<ValueEntered> context)
        {
            _logger.LogInformation("Value: {Value}", context.Message.Value);
        }
    }
}
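
The naming rule described above (remove the Consumer suffix, then convert the remaining name to kebab case) can be illustrated with a small standalone sketch. This is not MassTransit's implementation; EndpointNameSketch and ToKebabCase are hypothetical names used only to show how ValueEnteredEventConsumer maps to value-entered-event, and the sketch ignores cases such as generic types or acronyms.

```csharp
using System;
using System.Text;

public static class EndpointNameSketch
{
    // Hypothetical helper for illustration; not part of MassTransit.
    public static string ToKebabCase(string consumerTypeName)
    {
        const string suffix = "Consumer";

        // Remove the "Consumer" suffix, unless the name is only the suffix.
        string name = consumerTypeName;
        if (name.Length > suffix.Length && name.EndsWith(suffix, StringComparison.Ordinal))
            name = name.Substring(0, name.Length - suffix.Length);

        // Insert a hyphen before each upper-case letter (except the first)
        // and lower-case every character.
        var builder = new StringBuilder();
        for (int i = 0; i < name.Length; i++)
        {
            char c = name[i];
            if (char.IsUpper(c) && i > 0)
                builder.Append('-');

            builder.Append(char.ToLowerInvariant(c));
        }

        return builder.ToString();
    }

    public static void Main()
    {
        // Prints "value-entered-event"
        Console.WriteLine(ToKebabCase("ValueEnteredEventConsumer"));
    }
}
```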

To configure health checks, which MassTransit will produce when using the MassTransitHostedService, add the health checks to the container and map the readiness and liveness endpoints. The following example also separates the readiness from the liveness health check.

namespace AspNetCorePublisherHealthCheck
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Diagnostics.HealthChecks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Diagnostics.HealthChecks;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddHealthChecks();

            services.Configure<HealthCheckPublisherOptions>(options =>
            {
                options.Delay = TimeSpan.FromSeconds(2);
                options.Predicate = (check) => check.Tags.Contains("ready");
            });

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq();
            });

            services.AddMassTransitHostedService();
        }

        public void Configure(IApplicationBuilder app)
        {
            // Endpoint routing must be added to the pipeline before UseEndpoints.
            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapHealthChecks("/health/ready", new HealthCheckOptions()
                {
                    Predicate = (check) => check.Tags.Contains("ready"),
                });

                endpoints.MapHealthChecks("/health/live", new HealthCheckOptions());
            });
        }
    }
}

# Topshelf Service

Uses MassTransit.RabbitMQ, Topshelf

MassTransit recommends running consumers, sagas, and activities in an autonomous (standalone) service. Services can be managed by the operating system and monitored using application performance monitoring tools. With .NET Core, this can be any application built using the .NET Core Generic Host. However, when using the .NET Framework (the full framework, such as .NET Framework 4.6.1), a Windows service may be built using Topshelf.

Topshelf or .NET Core Generic Host

Before the .NET Core Generic Host became available, Topshelf was recommended to create a Windows service. Now that the Generic Host has built-in support for Windows services and Linux daemons, Topshelf is no longer required.

The example below configures a receive endpoint, which is created and started with the service.

namespace TopshelfListener
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Topshelf;

    public class Program
    {
        public static int Main()
        {
            return (int)HostFactory.Run(cfg => cfg.Service(x => new EventConsumerService()));
        }

        class EventConsumerService :
            ServiceControl
        {
            IBusControl _bus;

            public bool Start(HostControl hostControl)
            {
                _bus = ConfigureBus();
                _bus.Start(TimeSpan.FromSeconds(10));

                return true;
            }

            public bool Stop(HostControl hostControl)
            {
                _bus?.Stop(TimeSpan.FromSeconds(30));

                return true;
            }

            IBusControl ConfigureBus()
            {
                return Bus.Factory.CreateUsingRabbitMq(cfg =>
                {
                    cfg.ReceiveEndpoint("event-listener", e =>
                    {
                        e.Consumer<EventConsumer>();
                    });
                });
            }
        }

        class EventConsumer :
            IConsumer<ValueEntered>
        {
            public async Task Consume(ConsumeContext<ValueEntered> context)
            {
                Console.WriteLine("Value: {0}", context.Message.Value);
            }
        }
    }
}
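
For comparison, the same receive endpoint can be hosted with the .NET Core Generic Host mentioned above instead of Topshelf. The following is a minimal sketch, assuming the Microsoft.Extensions.Hosting, MassTransit.AspNetCore, and MassTransit.RabbitMQ packages are referenced; the GenericHostListener namespace is a hypothetical name, and the message contract is repeated so the sketch compiles on its own.

```csharp
namespace EventContracts
{
    // Same contract as in the first example, repeated so the sketch compiles on its own.
    public interface ValueEntered
    {
        string Value { get; }
    }
}

namespace GenericHostListener
{
    using System;
    using System.Threading.Tasks;
    using EventContracts;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;

    public class Program
    {
        public static async Task Main(string[] args)
        {
            await Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<EventConsumer>();

                        x.UsingRabbitMq((context, cfg) =>
                        {
                            cfg.ReceiveEndpoint("event-listener", e =>
                            {
                                e.ConfigureConsumer<EventConsumer>(context);
                            });
                        });
                    });

                    // Starts the bus with the host and stops it on shutdown.
                    services.AddMassTransitHostedService();
                })
                .Build()
                .RunAsync();
        }
    }

    class EventConsumer :
        IConsumer<ValueEntered>
    {
        public Task Consume(ConsumeContext<ValueEntered> context)
        {
            Console.WriteLine("Value: {0}", context.Message.Value);

            return Task.CompletedTask;
        }
    }
}
```

To run this as a Windows service or a Linux daemon, the host builder would additionally call UseWindowsService (from the Microsoft.Extensions.Hosting.WindowsServices package) or UseSystemd (from the Microsoft.Extensions.Hosting.Systemd package); those calls are left out here to keep the sketch minimal.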