# Observers

MassTransit supports several message observers allowing received, consumed, sent, and published messages to be monitored. There is a bus observer as well, so that the bus life cycle can be monitored.

WARNING

Observers should not be used to modify or intercept messages. To intercept messages to add headers or modify message content, create a new or use an existing middleware component.

# Receive

To observe messages as they are received by the transport, create a class that implements the IReceiveObserver interface, and connect it to the bus as shown below.

namespace MassTransit
{
    using System;
    using System.Threading.Tasks;


    /// <summary>
    /// An observer that can monitor a receive endpoint to track message consumption at the
    /// endpoint level.
    /// </summary>
    public interface IReceiveObserver
    {
        /// <summary>
        /// Called when a message has been delivered by the transport is about to be received by the endpoint
        /// </summary>
        /// <param name="context">The receive context of the message</param>
        /// <returns></returns>
        Task PreReceive(ReceiveContext context);

        /// <summary>
        /// Called when the message has been received and acknowledged on the transport
        /// </summary>
        /// <param name="context">The receive context of the message</param>
        /// <returns></returns>
        Task PostReceive(ReceiveContext context);

        /// <summary>
        /// Called when a message has been consumed by a consumer
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The message consume context</param>
        /// <param name="duration">The consumer duration</param>
        /// <param name="consumerType">The consumer type</param>
        /// <returns></returns>
        Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType)
            where T : class;

        /// <summary>
        /// Called when a message being consumed produced a fault
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The message consume context</param>
        /// <param name="duration">The consumer duration</param>
        /// <param name="consumerType">The consumer type</param>
        /// <param name="exception">The exception from the consumer</param>
        /// <returns></returns>
        Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception)
            where T : class;

        /// <summary>
        /// Called when the transport receive faults
        /// </summary>
        /// <param name="context">The receive context of the message</param>
        /// <param name="exception">The exception that was thrown</param>
        /// <returns></returns>
        Task ReceiveFault(ReceiveContext context, Exception exception);
    }
}

To configure a receive observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectReceiveObserver bus method can be used instead.

services.AddReceiveObserver<ReceiveObserver>();
services.AddReceiveObserver(provider => new ReceiveObserver());

# Consume

If the ReceiveContext isn't fascinating enough for you, perhaps the actual consumption of messages might float your boat. A consume observer implements the IConsumeObserver interface, as shown below.

namespace MassTransit
{
    using System;
    using System.Threading.Tasks;


    /// <summary>
    /// Intercepts the ConsumeContext
    /// </summary>
    public interface IConsumeObserver
    {
        /// <summary>
        /// Called before a message is dispatched to any consumers
        /// </summary>
        /// <param name="context">The consume context</param>
        /// <returns></returns>
        Task PreConsume<T>(ConsumeContext<T> context)
            where T : class;

        /// <summary>
        /// Called after the message has been dispatched to all consumers - note that in the case of an exception
        /// this method is not called, and the DispatchFaulted method is called instead
        /// </summary>
        /// <param name="context"></param>
        /// <returns></returns>
        Task PostConsume<T>(ConsumeContext<T> context)
            where T : class;

        /// <summary>
        /// Called after the message has been dispatched to all consumers when one or more exceptions have occurred
        /// </summary>
        /// <param name="context"></param>
        /// <param name="exception"></param>
        /// <returns></returns>
        Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception)
            where T : class;
    }
}

To configure a consume observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectConsumeObserver bus method can be used instead.

services.AddConsumeObserver<ConsumeObserver>();
services.AddConsumeObserver(provider => new ConsumeObserver());

# Consume Message

Okay, so it's obvious that if you've read this far you want a more specific observer, one that only is called when a specific message type is consumed. We have you covered there too, as shown below.

namespace MassTransit
{
    using System;
    using System.Threading.Tasks;


    /// <summary>
    /// Intercepts the ConsumeContext
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public interface IConsumeMessageObserver<in T>
        where T : class
    {
        /// <summary>
        /// Called before a message is dispatched to any consumers
        /// </summary>
        /// <param name="context">The consume context</param>
        /// <returns></returns>
        Task PreConsume(ConsumeContext<T> context);

        /// <summary>
        /// Called after the message has been dispatched to all consumers - note that in the case of an exception
        /// this method is not called, and the DispatchFaulted method is called instead
        /// </summary>
        /// <param name="context"></param>
        /// <returns></returns>
        Task PostConsume(ConsumeContext<T> context);

        /// <summary>
        /// Called after the message has been dispatched to all consumers when one or more exceptions have occurred
        /// </summary>
        /// <param name="context"></param>
        /// <param name="exception"></param>
        /// <returns></returns>
        Task ConsumeFault(ConsumeContext<T> context, Exception exception);
    }
}

To connect the observer, use the ConnectConsumeMessageObserver method before starting the bus.

The ConsumeObserver<T> interface may be deprecated at some point, it's sort of a legacy observer that isn't recommended.

# Send

Okay, so, incoming messages are not your thing. We get it, you're all about what goes out. It's cool. It's better to send than to receive. Or is that give? Anyway, a send observer is also available.

namespace MassTransit
{
    using System;
    using System.Threading.Tasks;


    /// <summary>
    /// Observes messages as they are sent to transports. These should not be used to intercept or
    /// filter messages, in that case a filter should be created and registered on the transport.
    /// </summary>
    public interface ISendObserver
    {
        /// <summary>
        /// Called before the message is sent to the transport
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The message send context</param>
        /// <returns></returns>
        Task PreSend<T>(SendContext<T> context)
            where T : class;

        /// <summary>
        /// Called after the message is sent to the transport (and confirmed by the transport if supported)
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The message send context</param>
        /// <returns></returns>
        Task PostSend<T>(SendContext<T> context)
            where T : class;

        /// <summary>
        /// Called when the message fails to send to the transport, including the exception that was thrown
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The message send context</param>
        /// <param name="exception">The exception from the transport</param>
        /// <returns></returns>
        Task SendFault<T>(SendContext<T> context, Exception exception)
            where T : class;
    }
}

To configure a send observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectSendObserver bus method can be used instead.

services.AddSendObserver<SendObserver>();
services.AddSendObserver(provider => new SendObserver());

# Publish

In addition to send, publish is also observable. Because the semantics matter, absolutely. Using the MessageId to link them up as it's unique for each message. Remember that Publish and Send are two distinct operations so if you want to observe all messages that are leaving your service, you have to connect both Publish and Send observers.

namespace MassTransit
{
    using System;
    using System.Threading.Tasks;


    /// <summary>
    /// Observes messages as they are published via a publish endpoint. These should not be used to intercept or
    /// filter messages, in that case a filter should be created and registered on the transport.
    /// </summary>
    public interface IPublishObserver
    {
        /// <summary>
        /// Called before the message is sent to the transport
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The message send context</param>
        /// <returns></returns>
        Task PrePublish<T>(PublishContext<T> context)
            where T : class;

        /// <summary>
        /// Called after the message is sent to the transport (and confirmed by the transport if supported)
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The message send context</param>
        /// <returns></returns>
        Task PostPublish<T>(PublishContext<T> context)
            where T : class;

        /// <summary>
        /// Called when the message fails to send to the transport, including the exception that was thrown
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The message send context</param>
        /// <param name="exception">The exception from the transport</param>
        /// <returns></returns>
        Task PublishFault<T>(PublishContext<T> context, Exception exception)
            where T : class;
    }
}

To configure a public observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectPublishObserver bus method can be used instead.

services.AddPublishObserver<PublishObserver>();
services.AddPublishObserver(provider => new PublishObserver());

# Bus

To observe bus life cycle events, create a class which implements IBusObserver, as shown below.

namespace MassTransit
{
    using System;
    using System.Threading.Tasks;


    /// <summary>
    /// Used to observe events produced by the bus
    /// </summary>
    public interface IBusObserver
    {
        /// <summary>
        /// Called after the bus has been created.
        /// </summary>
        /// <param name="bus"></param>
        /// <returns></returns>
        void PostCreate(IBus bus);

        /// <summary>
        /// Called when the bus fails to be created
        /// </summary>
        /// <param name="exception"></param>
        /// <returns></returns>
        void CreateFaulted(Exception exception);

        /// <summary>
        /// Called when the bus is being started, before the actual Start commences.
        /// </summary>
        /// <param name="bus"></param>
        /// <returns></returns>
        Task PreStart(IBus bus);

        /// <summary>
        /// Called once the bus has started and is running
        /// </summary>
        /// <param name="bus"></param>
        /// <param name="busReady">
        /// A task which is completed once the bus is ready and all receive endpoints are ready.
        /// </param>
        /// <returns></returns>
        Task PostStart(IBus bus, Task<BusReady> busReady);

        /// <summary>
        /// Called when the bus fails to start
        /// </summary>
        /// <param name="bus"></param>
        /// <param name="exception"></param>
        /// <returns></returns>
        Task StartFaulted(IBus bus, Exception exception);

        /// <summary>
        /// Called when the bus is being stopped, before the actual Stop commences.
        /// </summary>
        /// <param name="bus"></param>
        /// <returns></returns>
        Task PreStop(IBus bus);

        /// <summary>
        /// Called when the bus has been stopped.
        /// </summary>
        /// <param name="bus"></param>
        /// <returns></returns>
        Task PostStop(IBus bus);

        /// <summary>
        /// Called when the bus failed to Stop.
        /// </summary>
        /// <param name="bus"></param>
        /// <param name="exception"></param>
        /// <returns></returns>
        Task StopFaulted(IBus bus, Exception exception);
    }
}

To configure a bus observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddBusObserver<BusObserver>();
services.AddBusObserver(provider => new BusObserver());

# Receive Endpoint

namespace MassTransit
{
    using System.Threading.Tasks;


    /// <summary>
    /// Used to observe the events signaled by a receive endpoint
    /// </summary>
    public interface IReceiveEndpointObserver
    {
        /// <summary>
        /// Called when the receive endpoint is ready to receive messages
        /// </summary>
        /// <param name="ready"></param>
        /// <returns></returns>
        Task Ready(ReceiveEndpointReady ready);

        /// <summary>
        /// Called when the receive endpoint is being stopped, prior to actually stopping
        /// </summary>
        /// <param name="stopping"></param>
        /// <returns></returns>
        Task Stopping(ReceiveEndpointStopping stopping);

        /// <summary>
        /// Called when the receive endpoint has completed
        /// </summary>
        /// <param name="completed"></param>
        /// <returns></returns>
        Task Completed(ReceiveEndpointCompleted completed);

        /// <summary>
        /// Called when the receive endpoint faults
        /// </summary>
        /// <param name="faulted"></param>
        /// <returns></returns>
        Task Faulted(ReceiveEndpointFaulted faulted);
    }
}

To configure a receive endpoint observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddReceiveEndpointObserver<ReceiveEndpointObserver>();
services.AddReceiveEndpointObserver(provider => new ReceiveEndpointObserver());

# State Machine Event

To observe events consumed by a saga state machine, use an IEventObserver<T> where T is the saga instance type.

namespace MassTransit
{
    using System;
    using System.Threading.Tasks;


    public interface IEventObserver<TSaga>
        where TSaga : class, ISaga
    {
        /// <summary>
        /// Called before the event context is delivered to the activities
        /// </summary>
        /// <param name="context">The event context</param>
        /// <returns></returns>
        Task PreExecute(BehaviorContext<TSaga> context);

        /// <summary>
        /// Called before the event context is delivered to the activities
        /// </summary>
        /// <typeparam name="T">The event data type</typeparam>
        /// <param name="context">The event context</param>
        /// <returns></returns>
        Task PreExecute<T>(BehaviorContext<TSaga, T> context)
            where T : class;

        /// <summary>
        /// Called when the event has been processed by the activities
        /// </summary>
        /// <param name="context">The event context</param>
        /// <returns></returns>
        Task PostExecute(BehaviorContext<TSaga> context);

        /// <summary>
        /// Called when the event has been processed by the activities
        /// </summary>
        /// <typeparam name="T">The event data type</typeparam>
        /// <param name="context">The event context</param>
        /// <returns></returns>
        Task PostExecute<T>(BehaviorContext<TSaga, T> context)
            where T : class;

        /// <summary>
        /// Called when the activity execution faults and is not handled by the activities
        /// </summary>
        /// <param name="context">The event context</param>
        /// <param name="exception">The exception that was thrown</param>
        /// <returns></returns>
        Task ExecuteFault(BehaviorContext<TSaga> context, Exception exception);

        /// <summary>
        /// Called when the activity execution faults and is not handled by the activities
        /// </summary>
        /// <typeparam name="T">The message type</typeparam>
        /// <param name="context">The event context</param>
        /// <param name="exception">The exception that was thrown</param>
        /// <returns></returns>
        Task ExecuteFault<T>(BehaviorContext<TSaga, T> context, Exception exception)
            where T : class;
    }
}

To configure an event observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddEventObserver<T, EventObserver<T>>();
services.AddEventObserver<T>(provider => new EventObserver<T>());

# State Machine State

To observe state changes that happen in a saga state machine, use an IStateObserver<T> where T is the saga instance type.

namespace MassTransit
{
    using System.Threading.Tasks;


    public interface IStateObserver<TSaga>
        where TSaga : class, ISaga
    {
        /// <summary>
        /// Invoked prior to changing the state of the state machine
        /// </summary>
        /// <param name="context">The instance context of the state machine</param>
        /// <param name="currentState">The current state (after the change)</param>
        /// <param name="previousState">The previous state (before the change)</param>
        /// <returns></returns>
        Task StateChanged(BehaviorContext<TSaga> context, State currentState, State previousState);
    }
}

To configure a state observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddStateObserver<T, StateObserver<T>>();
services.AddStateObserver<T>(provider => new StateObserver<T>());