Middleware

MassTransit is built using a network of pipes and filters to dispatch messages. A pipe is composed of a series of filters, each of which is a key atom and are described below.

Middleware components are configured using extension methods on any pipe configurator IPipeConfigurator<T>, and the extension methods all begin with Use to separate them from other methods.

To understand how middleware components are built, an understanding of filters and pipes is needed.

Filters

A filter is a middleware component that performs a specific function, and should adhere to the single responsibility principal – do one thing, one thing only (and hopefully do it well). By sticking to this approach, developers are able to opt-in to each behavior without including unnecessary or unwatched functionality.

There are many filters included with GreenPipes, and a whole lot more of them are included with MassTransit. In fact, the entire MassTransit message flow is built around pipes and filters.

Developers can create their own filters. To create a filter, create a class that implements IFilter<T>.

public interface IFilter<T>
    where T : class, PipeContext
{
    void Probe(ProbeContext context);

    Task Send(T context, IPipe<T> next);
}

The Probe method is used to interrogate the filter about its behavior. This should describe the filter in a way that a developer would understand its role when looking at a network graph. For example, a transaction filter may add the following to the context.

public void Probe(ProbeContext context)
{
    context.CreateFilterScope("transaction");
}

The Send method is used to send contexts through the pipe to each filter. Context is the actual context, and next is used to pass the context to the next filter in the pipe. Send returns a Task, and should always follow the .NET guidelines for asynchronous methods.

Creating Filters

Middleware components are configured using extension methods, to make them easy to discover.

To be consistent with MassTransit conventions, middleware configuration methods should start with Use.

An example middleware component that would log exceptions to the console is shown below.

x.UsingInMemory((context,cfg) =>
{
    cfg.UseExceptionLogger();
    
    cfg.ConfigureEndpoints(context);
});

The extension method creates the pipe specification for the middleware component, which can be added to any pipe. For a component on the message consumption pipeline, use ConsumeContext instead of any PipeContext.

public static class ExampleMiddlewareConfiguratorExtensions
{
    public static void UseExceptionLogger<T>(this IPipeConfigurator<T> configurator)
        where T : class, PipeContext
    {
        configurator.AddPipeSpecification(new ExceptionLoggerSpecification<T>());
    }
}

The pipe specification is a class that adds the filter to the pipeline. Additional logic can be included, such as configuring optional settings, etc. using a closure syntax similar to the other configuration classes in MassTransit.

public class ExceptionLoggerSpecification<T> :
    IPipeSpecification<T>
    where T : class, PipeContext
{
    public IEnumerable<ValidationResult> Validate()
    {
        return Enumerable.Empty<ValidationResult>();
    }

    public void Apply(IPipeBuilder<T> builder)
    {
        builder.AddFilter(new ExceptionLoggerFilter<T>());
    }
}

Finally, the middleware component itself is a filter added to the pipeline. All filters have absolute and complete control of the execution context and flow of the message. Pipelines are entirely asynchronous, and expect that asynchronous operations will be performed.

Do not use legacy constructs such as .Wait, .Result, or .WaitAll() as these can cause blocking in the message pipeline. While they might work in same cases, you've been warned!
public class ExceptionLoggerFilter<T> :
    IFilter<T>
    where T : class, PipeContext
{
    long _exceptionCount;
    long _successCount;
    long _attemptCount;

    public void Probe(ProbeContext context)
    {
        var scope = context.CreateFilterScope("exceptionLogger");
        scope.Add("attempted", _attemptCount);
        scope.Add("succeeded", _successCount);
        scope.Add("faulted", _exceptionCount);
    }

    /// <summary>
    /// Send is called for each context that is sent through the pipeline
    /// </summary>
    /// <param name="context">The context sent through the pipeline</param>
    /// <param name="next">The next filter in the pipe, must be called or the pipe ends here</param>
    public async Task Send(T context, IPipe<T> next)
    {
        try
        {
            Interlocked.Increment(ref _attemptCount);

            // here the next filter in the pipe is called
            await next.Send(context);

            Interlocked.Increment(ref _successCount);
        }
        catch (Exception ex)
        {
            Interlocked.Increment(ref _exceptionCount);

            await Console.Out.WriteLineAsync($"An exception occurred: {ex.Message}");

            // propagate the exception up the call stack
            throw;
        }
    }
}

The example filter above is stateful. If the filter was stateless, the same filter instance could be used by multiple pipes — worth considering if the filter has high memory requirements.

Message Type Filters

In many cases, the message type is used by a filter. To create an instance of a generic filter that includes the message type, use the configuration observer.

public class MessageFilterConfigurationObserver :
    ConfigurationObserver,
    IMessageConfigurationObserver
{
    public MessageFilterConfigurationObserver(IConsumePipeConfigurator receiveEndpointConfigurator)
        : base(receiveEndpointConfigurator)
    {
        Connect(this);
    }

    public void MessageConfigured<TMessage>(IConsumePipeConfigurator configurator)
        where TMessage : class
    {
        var specification = new MessageFilterPipeSpecification<TMessage>();

        configurator.AddPipeSpecification(specification);
    }
}

Then, in the specification, the appropriate filter can be created and added to the pipeline.

public class MessageFilterPipeSpecification<T> :
    IPipeSpecification<ConsumeContext<T>>
    where T : class
{
    public void Apply(IPipeBuilder<ConsumeContext<T>> builder)
    {
        var filter = new MessageFilter<T>();

        builder.AddFilter(filter);
    }

    public IEnumerable<ValidationResult> Validate()
    {
        yield break;
    }
}

The filter could then include the message type as a generic type parameter.

public class MessageFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public void Probe(ProbeContext context)
    {        
        var scope = context.CreateFilterScope("messageFilter");
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // do something

        await next.Send(context);
    }
}

The extension method for the above is shown below (for completeness).

public static class MessageFilterConfigurationExtensions
{
    public static void UseMessageFilter(this IConsumePipeConfigurator configurator)
    {
        if (configurator == null)
            throw new ArgumentNullException(nameof(configurator));

        var observer = new MessageFilterConfigurationObserver(configurator);
    }
}

Pipes

Filters are combined in sequence to form a pipe. A pipe configurator, along with a pipe builder, is used to configure and build a pipe.

public interface CustomContext :
    PipeContext
{
    string SomeThing { get; }
}

IPipe<CustomContext> pipe = Pipe.New<CustomContext>(x =>
{   
    x.UseFilter(new CustomFilter(...));
})

The IPipe interface is similar to IFilter, but a pipe hides the next parameter as it is part of the pipe's structure. It is the pipe's responsibility to pass the appropriate next parameter to the individual filters in the pipe.

public interface IPipe<T>
    where T : class, PipeContext
{
    Task Send(T context);
}

Send can be called, passing a context instance as shown.

public class BaseCustomContext :
    BasePipeContext,
    CustomContext
{
    public string SomeThing { get; set; }
}

await pipe.Send(new BaseCustomContext { SomeThing = "Hello" });

PipeContext

The context type has a PipeContext constraint, which is another core atom in GreenPipes. A pipe context can include payloads, which are kept in a last-in, first-out (LIFO) collection. Payloads are identified by type, and can be retrieved, added, and updated using the PipeContext methods:

public interface PipeContext
{
    /// <summary>
    /// Used to cancel the execution of the context
    /// </summary>
    CancellationToken CancellationToken { get; }

    /// <summary>
    /// Checks if a payload is present in the context
    /// </summary>
    bool HasPayloadType(Type payloadType);

    /// <summary>
    /// Retrieves a payload from the pipe context
    /// </summary>
    /// <typeparam name="T">The payload type</typeparam>
    /// <param name="payload">The payload</param>
    /// <returns></returns>
    bool TryGetPayload<T>(out T payload)
        where T : class;

    /// <summary>
    /// Returns an existing payload or creates the payload using the factory method provided
    /// </summary>
    /// <typeparam name="T">The payload type</typeparam>
    /// <param name="payloadFactory">The payload factory is the payload is not present</param>
    /// <returns>The payload</returns>
    T GetOrAddPayload<T>(PayloadFactory<T> payloadFactory)
        where T : class;

    /// <summary>
    /// Either adds a new payload, or updates an existing payload
    /// </summary>
    /// <param name="addFactory">The payload factory called if the payload is not present</param>
    /// <param name="updateFactory">The payload factory called if the payload already exists</param>
    /// <typeparam name="T">The payload type</typeparam>
    /// <returns></returns>
    T AddOrUpdatePayload<T>(PayloadFactory<T> addFactory, UpdatePayloadFactory<T> updateFactory)
        where T : class;

The payload methods are also used to check if a pipe context is another type of context. For example, to see if the SendContext is a RabbitMqSendContext, the TryGetPayload method should be used instead of trying to pattern match or cast the context parameter.

public async Task Send(SendContext context, IPipe<SendContext> next)
{
    if(context.TryGetPayload<RabbitMqSendContext>(out var rabbitMqSendContext))
        rabbitMqSendContext.Priority = 3;

    return next.Send(context);
}
It is entirely the filter's responsibility to call Send on the next parameter. This gives the filter ultimately control over the context and behavior. It is how the retry filter is able to retry – by controlling the context flow.

User-defined payloads are easily added, so that subsequent filters can use them. The following example adds a payload.

public class SomePayload
{
    public int Value { get; set; }
}

public async Task Send(SendContext context, IPipe<SendContext> next)
{
    var payload = context.GetOrAddPayload(() => new SomePayload{Value = 27});

    return next.Send(context);
}