Consumer Sagas

A consumer saga is a class, identified by a CorrelationId, that defines the state persisted by a saga repository. In addition to the state, the saga class may implement interfaces that define the events it handles. This combination of state and behavior in a single class is a consumer saga. The example below defines an order saga initiated by a SubmitOrder message.

Interfaces

InitiatedBy

public record SubmitOrder :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime OrderDate { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        SubmitDate = context.Message.OrderDate;
    }
}

When the saga's receive endpoint receives a SubmitOrder message, the message's CorrelationId property is used to look for an existing saga instance. If no existing instance is found, the repository creates a new saga instance and calls the Consume method on it. After the Consume method completes, the repository saves the newly created instance.
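As an illustration of the flow described above, the following sketch publishes a SubmitOrder message with a newly generated CorrelationId, which would lead the repository to create a new OrderSaga instance. The OrderSubmitter class is hypothetical and not part of MassTransit; it assumes an IPublishEndpoint is supplied by the container and reuses the SubmitOrder record defined earlier.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical helper class, shown only to illustrate how a saga could be initiated.
public class OrderSubmitter
{
    readonly IPublishEndpoint _publishEndpoint;

    public OrderSubmitter(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task<Guid> Submit()
    {
        // A new identifier, so no existing saga instance should match it.
        var orderId = NewId.NextGuid();

        await _publishEndpoint.Publish(new SubmitOrder
        {
            CorrelationId = orderId,
            OrderDate = DateTime.UtcNow
        });

        // Return the identifier so later events can be correlated to the same saga instance.
        return orderId;
    }
}
```

Later events intended for the same saga instance would need to carry the returned identifier.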

Orchestrates

To define an event orchestrated by an existing saga instance, such as OrderAccepted, an additional interface and method are specified.

public record OrderAccepted :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime Timestamp { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>,
    Orchestrates<OrderAccepted>
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context) {...}

    public async Task Consume(ConsumeContext<OrderAccepted> context)
    {
        AcceptDate = context.Message.Timestamp;
    }
}

InitiatedByOrOrchestrates

To define an event that can either initiate a new saga instance or orchestrate an existing one, such as OrderInvoiced, an additional interface and method are specified.

public record OrderInvoiced :
    CorrelatedBy<Guid>
{
    public Guid CorrelationId { get; init; }
    public DateTime Timestamp { get; init; }
    public decimal Amount { get; init; }
}

public class OrderPaymentSaga :
    ISaga,
    InitiatedByOrOrchestrates<OrderInvoiced>
{
    public Guid CorrelationId { get; set; }

    public DateTime? InvoiceDate { get; set; }
    public decimal? Amount { get; set; }

    public async Task Consume(ConsumeContext<OrderInvoiced> context)
    {
        InvoiceDate = context.Message.Timestamp;
        Amount = context.Message.Amount;
    }
}

Observes

To define an event that does not implement the CorrelatedBy interface, such as OrderShipped, and is observed by an existing saga instance, an additional interface and method are specified, along with a CorrelationExpression that matches the message to the saga instance.

public record OrderShipped
{
    public Guid OrderId { get; init; }
    public DateTime ShipDate { get; init; }
}

public class OrderSaga :
    ISaga,
    InitiatedBy<SubmitOrder>,
    Orchestrates<OrderAccepted>,
    Observes<OrderShipped, OrderSaga>
{
    public Guid CorrelationId { get; set; }

    public DateTime? SubmitDate { get; set; }
    public DateTime? AcceptDate { get; set; }
    public DateTime? ShipDate { get; set; }

    public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
    public async Task Consume(ConsumeContext<OrderAccepted> context) {...}

    public async Task Consume(ConsumeContext<OrderShipped> context)
    {
        ShipDate = context.Message.ShipDate;
    }

    public Expression<Func<OrderSaga, OrderShipped, bool>> CorrelationExpression =>
        (saga, message) => saga.CorrelationId == message.OrderId;
}
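Because OrderShipped carries no CorrelationId, the saga instance is matched through the CorrelationExpression, which compares the saga's CorrelationId with the message's OrderId. A minimal sketch of publishing such an event follows; the ShippingNotifier helper is hypothetical, and orderId is assumed to be the CorrelationId of a previously submitted order.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical helper, not part of MassTransit.
public static class ShippingNotifier
{
    public static Task NotifyShipped(IPublishEndpoint publishEndpoint, Guid orderId)
    {
        // OrderId must equal the saga's CorrelationId for the expression to match.
        return publishEndpoint.Publish(new OrderShipped
        {
            OrderId = orderId,
            ShipDate = DateTime.UtcNow
        });
    }
}
```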

Configuration

To add a saga when configuring MassTransit, use the AddSaga method shown below.

services.AddMassTransit(x =>
{
    x.AddSaga<OrderSaga>()
        .InMemoryRepository();
});

Supported saga repositories are documented in the configuration section.
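The AddSaga registration above does not show a transport. The following is a minimal sketch of a fuller registration, assuming the in-memory transport and convention-based endpoint configuration. The choice of UsingInMemory is an assumption made for local experimentation and is not part of the original example; a production system would typically use a durable transport and saga repository.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    // Register the consumer saga with the in-memory repository, as shown above.
    x.AddSaga<OrderSaga>()
        .InMemoryRepository();

    // Assumption: the in-memory transport, suitable only for development and tests.
    x.UsingInMemory((context, cfg) =>
    {
        // Creates receive endpoints for the registered sagas by convention.
        cfg.ConfigureEndpoints(context);
    });
});
```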