Entity Framework

alt NuGet

An example saga instance is shown below, which is orchestrated using an Automatonymous state machine. The CorrelationId will be the primary key, and CurrentState will be used to store the current state of the saga instance.

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public DateTime? OrderDate { get; set; }

    // If using Optimistic concurrency, this property is required
    public byte[] RowVersion { get; set; }
}

The instance properties are configured using a SagaClassMap.

The SagaClassMap has a default mapping for the CorrelationId as the primary key. If you create your own mapping, you must follow the same convention, or at least make it a Clustered Index + Unique, otherwise you will likely experience deadlock exceptions and/or performance issues in high throughput scenarios.
public class OrderStateMap : 
    SagaClassMap<OrderState>
{
    protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
    {
        entity.Property(x => x.CurrentState).HasMaxLength(64);
        entity.Property(x => x.OrderDate);

        // If using Optimistic concurrency, otherwise remove this property
        entity.Property(x => x.RowVersion).IsRowVersion();
    }
}

Include the instance map in a DbContext class that will be used by the saga repository.

public class OrderStateDbContext : 
    SagaDbContext
{
    public OrderStateDbContext(DbContextOptions options)
        : base(options)
    {
    }

    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new OrderStateMap(); }
    }
}

Configuration

Once the class map and associated DbContext class have been created, the saga repository can be configured with the saga registration, which is done using the configuration method passed to AddMassTransit. The following example shows how the repository is configured for using Microsoft Dependency Injection Extensions, which are used by default with Entity Framework Core.

services.AddMassTransit(cfg =>
{
    cfg.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion

            r.AddDbContext<DbContext, OrderStateDbContext>((provider,builder) =>
            {
                builder.UseSqlServer(connectionString, m =>
                {
                    m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                });
            });
        });
});

Database Engines (PostgreSQL)

By default, MassTransit uses Microsoft SQL Server locking statements to handle concurrency. It is important, however, if using PostgreSQL, MySQL or Sqlite that you specify this as part of the setup of the DbContextOptionsBuilder options.

The following shows an example for PostgreSQL

services.AddMassTransit(cfg =>
{
    cfg.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Optimistic; // or use Pessimistic, which does not require RowVersion

            r.AddDbContext<DbContext, OrderStateDbContext>((provider,builder) =>
            {
                builder.UseNpgsql(connectionString, m =>
                {
                    m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                });
            });

            //This line is added to enable PostgreSQL features
            r.UsePostgres();
        });
});

Further, in PostgreSQL, the RowVersion column is a hidden system column which already exists on every table, called xmin with a type xid. For this reason, we do not need to create a new RowVersion column when using "Optimistic" mode. Instead, we simply bind our RowVersion property to a uint type, and apply the correct mappings in our OrderStateMap class.

The example below shows the original OrderState model, using a PostgreSQL RowVersion

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public DateTime? OrderDate { get; set; }

    // If using Optimistic concurrency, this property is required
    public uint RowVersion { get; set; }
}

The state mapping must also be modified to use the xmin column of PostgreSQL

public class OrderStateMap : 
    SagaClassMap<OrderState>
{
    protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
    {
        entity.Property(x => x.CurrentState).HasMaxLength(64);
        entity.Property(x => x.OrderDate);

        // If using Optimistic concurrency, otherwise remove this property
        entity.Property(x => x.RowVersion)
            .HasColumnName("xmin")
            .HasColumnType("xid")
            .IsRowVersion()
    }
}

Shared DbContext

A single DbContext can be registered in the container which can then be used to configure sagas that are mapped by the DbContext. For example, Job Consumers needs three saga repositories, and the Entity Framework Core package includes the JobServiceSagaDbContext which can be configured using the AddSagaRepository method as shown below.

services.AddDbContext<JobServiceSagaDbContext>(builder =>
    builder.UseNpgsql(Configuration.GetConnectionString("JobService"), m =>
    {
        m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
        m.MigrationsHistoryTable($"__{nameof(JobServiceSagaDbContext)}");
    }));

services.AddMassTransit(x =>
{
    x.AddSagaRepository<JobSaga>()
        .EntityFrameworkRepository(r =>
        {
            r.ExistingDbContext<JobServiceSagaDbContext>();
            r.LockStatementProvider = new PostgresLockStatementProvider();
        });
    x.AddSagaRepository<JobTypeSaga>()
        .EntityFrameworkRepository(r =>
        {
            r.ExistingDbContext<JobServiceSagaDbContext>();
            r.LockStatementProvider = new PostgresLockStatementProvider();
        });
    x.AddSagaRepository<JobAttemptSaga>()
        .EntityFrameworkRepository(r =>
        {
            r.ExistingDbContext<JobServiceSagaDbContext>();
            r.LockStatementProvider = new PostgresLockStatementProvider();
        });

    // other configuration, such as consumers, etc.
});

The above code using the standard Entity Framework configuration extensions to add the DbContext to the container, using PostgreSQL. Because the job service state machine receive endpoints are configured by ConfigureJobServiceEndpoints, the saga repositories must be configured separately. The AddSagaRepository method is used to register a repository for a saga that has already been added, and uses the same extension methods as the AddSaga and AddSagaStateMachine methods.

Once configured, the job service sagas can be configured as shown below.

cfg.ServiceInstance(options, instance =>
{
    instance.ConfigureJobServiceEndpoints(js =>
    {
        js.ConfigureSagaRepositories(context);
    });
});
MassTransit/Sample-JobConsumers

MassTransit includes a job service that keeps track of each job, assigns jobs to service instances, and schedules job retries when necessary. The job service uses three saga state machines and the default configuration uses an in-memory saga repository, which is not durable. When using job consumers for production use cases, configuring durable saga repositories is highly recommended to avoid possible message loss. Check out the sample project on GitHub, which includes the Entity Framework configuration for the job service state machines.

The sample above is a working example of this configuration style.

Multiple DbContext

Multiple DbContext can be registered in the container which can then be used to configure sagas that are mapped by the DbContext and injected into other components. Calling the AddDbContext extension method will register a scoped DbContext by default. For simple scenarios where there is a single DbContext this will work. However, in scenarios where there is at least one other DbContext the dotnet command that generates Entity Framework migrations will not work. To resolve this issue, you'll need to perform the following steps:

  1. Make sure that all DbContext has a constructor that takes DbContextOptions<TOptions> instead of DbContextOptions.
  2. Run the Entity Framework Core command to create your migrations as shown below.
    dotnet ef migrations add InitialCreate -c JobServiceSagaDbContext
    
  3. Run the Entity Framework Core command to sync with the database as shown below.
    dotnet ef database update -c JobServiceSagaDbContext