Consumers

Consumer is a widely used noun for something that consumes something. In MassTransit, a consumer consumes one or more message types when configured on or connected to a receive endpoint. MassTransit supports several consumer types: consumers, sagas, saga state machines, routing slip activities, handlers, and job consumers.

Message Consumers

A message consumer, the most common consumer type, is a class that consumes one or more message types. For each message type, the class implements IConsumer<TMessage> and its Consume method.

public interface IConsumer<in TMessage> :
    IConsumer
    where TMessage : class
{
    Task Consume(ConsumeContext<TMessage> context);
}

Messages must be reference types, either a record, interface, or class. See the messages concept page for more details.
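As an illustration, the SubmitOrder and OrderSubmitted messages used in the examples on this page might be declared as records like the ones below. This is a minimal sketch: only the OrderId property is implied by the consumer example, and its Guid type is an assumption.

```csharp
using System;

// Hypothetical message contracts for the examples on this page.
// Only OrderId is implied by the consumer example; its Guid type is assumed.
public record SubmitOrder
{
    public Guid OrderId { get; init; }
}

public record OrderSubmitted
{
    public Guid OrderId { get; init; }
}
```

Both types are reference types, which satisfies the `where TMessage : class` constraint on IConsumer<TMessage>.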

An example message consumer that consumes the SubmitOrder message type is shown below.

class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await context.Publish<OrderSubmitted>(new
        {
            context.Message.OrderId
        });
    }
}

To add a consumer and automatically configure a receive endpoint for the consumer, call one of the AddConsumer methods and call ConfigureEndpoints as shown below.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingInMemory((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

Automatic receive endpoint configuration by calling ConfigureEndpoints is highly recommended. Several configuration options, covered in the configuration section, can be used to change the default conventions and customize endpoints.

MassTransit embraces The Hollywood Principle, "Don't call us, we'll call you." Control flows from MassTransit to the developer's code in response to an event, which in this case is the delivery of a message by the transport. This behavior is similar to ASP.NET which creates controllers and invokes action methods on receipt of an HTTP request. When a message is delivered from the transport on a receive endpoint and the message type is consumed by the consumer, MassTransit creates a container scope, resolves a consumer instance, and executes the Consume method passing a ConsumeContext containing the message.

The Consume method returns a Task that is awaited by MassTransit. While the Consume method is executing, the message is unavailable to other receive endpoints. If the Task completes successfully, the message is acknowledged and removed from the queue.

If the Task faults in the event of an exception, or is canceled (explicitly, or via an OperationCanceledException), the consumer instance is released and the exception is propagated back up the pipeline. If the exception does not trigger a retry, the default pipeline will move the message to an error queue.
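Whether an exception triggers a retry depends on the retry policy configured in the pipeline. The sketch below adds a bus-level retry policy to the registration shown earlier; the retry count and interval are assumed values, not recommendations, and SubmitOrderConsumer is the consumer defined earlier on this page.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    // SubmitOrderConsumer is the consumer defined earlier on this page
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingInMemory((context, cfg) =>
    {
        // assumed policy: retry a faulted Consume up to 3 times, 1 second apart;
        // once retries are exhausted the exception continues up the pipeline
        cfg.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(1)));

        cfg.ConfigureEndpoints(context);
    });
});
```

The retry middleware is added before ConfigureEndpoints so that it applies to the endpoints configured afterwards.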

When a consumer is configured on a receive endpoint, the consumer message types (one for each IConsumer<T>) are used to configure the receive endpoint's consume topology. The consume topology is then used to configure the broker so that published messages are delivered to the queue. The broker topology varies by transport. For example, if the configuration above used RabbitMQ instead of the in-memory transport, it would result in the creation of an exchange for the SubmitOrder message type and a binding from that exchange to an exchange with the same name as the queue (the latter exchange then being bound directly to the queue).
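For reference, a RabbitMQ variant of the registration shown earlier might look like the sketch below. The host name, virtual host, and credentials are placeholder assumptions for a local broker, and the MassTransit.RabbitMQ package is assumed to be referenced.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    // SubmitOrderConsumer is the consumer defined earlier on this page
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        // placeholder connection settings for a local broker (assumed)
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        // creates the receive endpoint and its exchange/queue bindings
        cfg.ConfigureEndpoints(context);
    });
});
```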

If the queue is persistent (AutoDelete is false, which is the default), the topology remains in place even after the bus has stopped. When the bus is recreated and started, the broker entities are checked and reconfigured as needed. Any messages waiting in the queue will continue to be delivered to the receive endpoint once the bus is started.

Batch Consumers

In some scenarios, high message volume can lead to consumer resource bottlenecks. If a system is publishing thousands of messages per second, and has a consumer that is writing the content of those messages to some type of storage, the storage system might not be optimized for thousands of individual writes per second. It may, however, perform better if writes are performed in batches. For example, receiving one hundred messages and then writing the content of those messages using a single storage operation may be significantly more efficient (and faster).

MassTransit supports receiving multiple messages and delivering those messages to the consumer in a batch.

To create a batch consumer, consume the Batch<T> interface, where T is the message type. That consumer can then be configured using the container integration, with the batch options specified in a consumer definition. The example below consumes batches of the Message type; limits such as the batch size (for example, up to 100 messages) and the number of concurrent batches (for example, up to 10) are set through the batch options rather than in the consumer itself.

class BatchMessageConsumer :
    IConsumer<Batch<Message>>
{
    public async Task Consume(ConsumeContext<Batch<Message>> context)
    {
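        // each element is a ConsumeContext<Message> wrapping one message in the batch
        for (int i = 0; i < context.Message.Length; i++)
        {
            ConsumeContext<Message> message = context.Message[i];

            // process message.Message here
        }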
    }
}
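The batch options mentioned above could be set in a consumer definition along the lines of the sketch below. BatchMessageConsumerDefinition is a hypothetical name, the prefetch count and time limit are assumed values, and the two-parameter ConfigureConsumer override follows the signature used elsewhere on this page (newer MassTransit versions may expect an additional registration context parameter).

```csharp
using MassTransit;

// Sketch only: BatchMessageConsumer and Message are the types from the example above.
class BatchMessageConsumerDefinition :
    ConsumerDefinition<BatchMessageConsumer>
{
    public BatchMessageConsumerDefinition()
    {
        // assumed value: prefetch enough messages to fill several batches
        Endpoint(x => x.PrefetchCount = 1000);
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<BatchMessageConsumer> consumerConfigurator)
    {
        consumerConfigurator.Options<BatchOptions>(options => options
            .SetMessageLimit(100)      // up to 100 messages per batch
            .SetConcurrencyLimit(10)   // up to 10 batches processed concurrently
            .SetTimeLimit(s: 1));      // assumed: deliver a partial batch after 1 second
    }
}
```

The prefetch count is set high enough to cover the message limit multiplied by the concurrency limit, so that full batches can be assembled without waiting on the broker.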

Setting PrefetchCount and ConcurrentMessageLimit

Every transport has its own limitations that may constrain the batch size. For instance, Amazon SQS fetches ten messages at a time, making it an optimal batch size. It is best to experiment and see what works best in your environment.

Definitions

Consumer definitions are used to specify the behavior of consumers so that they can be automatically configured. Definitions may be explicitly added by AddConsumer or discovered automatically using any of the AddConsumers methods.

An example consumer definition is shown below. For a complete configuration reference, see the configuration section.

public class SubmitOrderConsumerDefinition :
    ConsumerDefinition<SubmitOrderConsumer>
{
    public SubmitOrderConsumerDefinition()
    {
        // override the default endpoint name, for whatever reason
        EndpointName = "ha-submit-order";

        // limit the number of messages consumed concurrently
        // this applies to the consumer only, not the endpoint
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(r => r.Interval(5, 1000));
        endpointConfigurator.UseInMemoryOutbox();
    }
}
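A definition such as this one can be registered explicitly alongside its consumer, or discovered by scanning an assembly. The sketch below shows both options as assumptions about a typical setup; use one or the other, not both.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    // option 1: add the consumer with its definition explicitly
    x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

    // option 2 (alternative): scan an assembly for consumers and their definitions
    // x.AddConsumers(typeof(SubmitOrderConsumer).Assembly);

    x.UsingInMemory((context, cfg) =>
    {
        // endpoint name, concurrency limit, and retry come from the definition
        cfg.ConfigureEndpoints(context);
    });
});
```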

Skipped Messages

A receive endpoint may receive messages that have no consumer. This can happen when a consumer is removed (or disconnected) from the receive endpoint, when a message type is removed from a consumer, or when a message is mistakenly sent to the receive endpoint.

If this occurs, the unconsumed message is moved to a _skipped queue (prefixed by the original queue name). The original message content is retained and additional headers are added to identify the host that skipped the message.

It may be necessary to use the broker management tools to remove an exchange binding or topic subscription for a message type that is no longer consumed by the receive endpoint to prevent further skipped messages.