Persisting Saga Instances

Sagas are stateful event-based message consumers -- they retain state. Therefore, saving state between events is important. Without persistent state, a saga would consider each event a new event, and orchestration of subsequent events would be meaningless.

Specifying saga persistence

In order to store the saga state, you need to use one form of saga persistence. There are several types of storage that MassTransit supports, all of those, which are included to the main distribution, are listed below. There is also a in-memory unreliable storage, which allows to temporarily store your saga state. It is useful to try things out since it does not require any infrastructure.

Simple initialization of a state machine saga with persistence looks like this:

var sagaStateMachine = new ShoppingCartStateMachine();
var repository = new InMemorySagaRepository<ShoppingCart>();
var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
    var host = x.Host(new Uri("rabbitmq://localhost"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    x.ReceiveEndpoint(host, "shopping_cart_state", e =>
    {
        e.StateMachineSaga(sagaStateMachine, repository);
    });
});

It is important to notice that the saga repository object is a singleton. It does not hold any state inside the class instance and only performs operations on the saga state objects that are send to it to persist and retrieve.

There are two types of saga repository:

  • Query repository
  • Identity-only repository

Depending on the persistence mechanism, repository implementation can be either identity-only or identity plus query.

When using identity-only repository, such as Azure Service Bus message session or Redis, you can only use correlation by identity. This means that all events that the saga receives, must hold the saga correlation id, and the correlation for each event can only use CorrelateById method to define the correlation.

Query repository by definition support identity correlation too, but in addition support other properties of events being received and saga state properties. Such correlations are defined using CorrelateBy method and you can use any logical expression that involve the event data and saga state data to establish such correlation. Repository implementation such as Entity Framework, NHibernate and Marten support correlation by query. Of course, in-memory repository supports it as well.

Identity

Saga instances are identified by a unique identifier (Guid), represented by the CorrelationId on the saga instance. Events are correlated to the saga instance using either the unique identifier, or alternatively using an expression that correlates properties on the saga instance to each event. If the CorrelationId is used, it's always a one-to-one match, either the saga already exists, or it's a new saga instance. With a correlation expression, the expression might match to more than one saga instance, so care should be used -- because the event would be delivered to all matching instances.

Seriously, don't sent an event to all instances -- unless you want to watch your messages consumers lock your entire saga storage engine.

It is strongly advised to have CorrelationId as your table/document key. This will enable better concurrency handling and will make the saga state consistent.

Publishing and Sending From Sagas

Sagas are completely message-driven and therefore not only consume but also publish events and send commands. However, if your saga received a lot of messages coming roughly at the same time and the endpoint is set to process multiple messages in parallel - this can lead to a conflict between message processing and saga persistence.

This means that there could be more than one saga state updates that are being persisted at the same time. Depending on the saga repository type, this might fail for different reasons - versioning issue, row or table lock or eTag mismatch. All those problems are basically saying that you are having a concurrency issue.

It is normal for the saga repository to throw an exception in such case but if your saga is publishing messages, they were already published but the saga state has not been updated. MassTransit will eventually use retry policy on the endpoint and more messages will be send, potentially leading to mess. Or, if there are no retry policies configured, messages might be sent indicating that the process needs to continue but saga instance will be in the old state and will not accept any further messages because they will come in a wrong state.

This issue is common and can be solved by postponing the message publish and send operations until all persistence work is done. All messages that should be published, are collected in a buffer, which is called Outbox. MassTransit implements this feature and it can be configured by adding these lines to your endpoint configuration:

c.ReceiveEndpoint("queue", e =>
{
    e.UseInMemoryOutbox();
    // other endpoint configuration here
}

Storage Engines

MassTransit supports several storage engines, including NHibernate, Entity Framework, MongoDB and Redis. Each of these are setup in a similar way, but examples are shown below for each engine.

Optimistic vs pessimistic concurrency

Most persistence mechanisms for sagas supported by MassTransit need some way to guarantee ACID when processing sagas. Because there can be multiple threads consuming multiple bus events meant for the same saga instance, they could end up overwriting each other (race condition).

Relational databases can easily handle this by setting the transaction type to serializable or (page/row) locking. This would be considered as pessimistic concurrency.

Another way to handle concurrency is to have some attribute like version or timestamp, which updates every time a saga is persisted. By doing that we can instruct the database only to update the record if this attribute matches between what we are trying to persist and what is stored in the database record we are trying to update.

This is type of concurrency is called an optimistic concurrency. It doesn't guarantee your unit of work with the database will succeed (must retry after these exceptions), but it also doesn't block anybody else from working within the same database page (not locking the table/page).

So, which one should I use?

For almost every scenario, it is recommended using the optimistic concurrency, because most state machine logic should be fairly quick.

If the chosen persistence method supports optimistic concurrency, race conditions can be handled rather easily by specifying a retry policy for concurrency exceptions or using generic retry policy.

Entity Framework

Entity Framework seems to be the most common ORM for class-SQL mappings, and SQL is still widely used for storing data. So it's a win to have it supported out of the box by MassTransit.

Concurrency handling

Fortunately, Entity Framework is a repository pattern itself, and has a column type [Timestamp] (a.k.a. IsRowVersion with fluent mapping), which will check the value of the column when it starts it's "unit of work" and if that value is the same when updating that row in the database - everybody is happy! But, if that column value is different, then it has been updated elsewhere. So, the Entity Framework will throw a DbUpdateConcurrencyException, which can be handled by a retry policy to fix the concurrency violation.

Configuration and usage

The code-first mapping example below shows the basics of getting started. The lines have been commented where the additional optimistic concurrency column is needed.

public class SagaInstance : SagaStateMachineInstance
{
    public SagaInstance(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    protected SagaInstance()
    {
    }

    public string CurrentState { get; set; }
    public string ServiceName { get; set; }
    public Guid CorrelationId { get; set; }
    public byte[] RowVersion { get; set; } // For Optimistic Concurrency
}

public class SagaInstanceMap : SagaClassMapping<SagaInstance>
{
    public SagaInstanceMap()
    {
        Property(x => x.CurrentState);
        Property(x => x.ServiceName, x => x.Length(40));
        Property(x => x.RowVersion).IsRowVersion(); // For Optimistic Concurrency
    }
}

Important: The SagaClassMapping has default mapping for the CorrelationId as a database generated primary key. If you use your own mapping, you must follow the same convention, otherwise there is a big chance to get deadlock exceptions in case of high throughput.

The repository is then created on the context factory:

SagaDbContextFactory contextFactory = () => 
    new SagaDbContext<SagaInstance, SagaInstanceMap>(_connectionString);

var repository = new EntityFrameworkSagaRepository<SagaInstance>(
    contextFactory, optimistic: true); // true For Optimistic Concurrency, false is default for pessimistic

Lastly, the snippet below is only needed for optimistic concurrency, because the saga should retry processing if failure occurred when writing to the database. This snippet is adding retry policies middleware to the saga receive endpoint from the first section.

x.ReceiveEndpoint(host, "shopping_cart_state", e =>
{
    e.UseRetry(x => 
        {
            x.Handle<DbUpdateConcurrencyException>();
            x.Interval(5, TimeSpan.FromMilliseconds(100)));
        }); // Add the retry middleware for optimistic concurrency
    e.StateMachineSaga(sagaStateMachine, repository);
});

Hence, that if you have retry policy without an exception filter, it will also handle the concurrency exception, so explicit configuration is not required in this case.

MongoDB

MongoDB is an easy to use saga repository, because setup is easy. There is no need for class mapping, the saga instances can be persisted easily using a MongoDB collection.

Concurrency

MongoDb saga persistence requires that saga instance classes implement IVersionedSaga interface. This interface has a Version property, which allows the saga persistence to handle optimistic concurrency.

Configuration and usage

public class SagaInstance : SagaStateMachineInstance, IVersionedSaga
{
    public SagaInstance(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    protected SagaInstance() { }

    public string CurrentState { get; set; }
    public string ServiceName { get; set; }
    public Guid CorrelationId { get; set; }
    public int Version { get; set; }
}

The saga repository is created using the simple syntax:

var database = new MongoClient("mongodb://127.0.0.1").GetDatabase("sagas");
var repository = new MongoDbSagaRepository<SagaInstance>(database);

Each saga instance will be placed in a collection specific to the instance type.

NHibernate

Although NHibernate is not being actively developed recently, it is still widely used and is supported by MassTransit for saga storage. The example below shows the code-first approach to using NHibernate for saga persistence.

Concurrency

NHibernate natively supports multiple concurrency handling mechanisms. The easiest is probably adding a Version property of type int to the saga instance class and map it to the column with the same name. NHibernate will use it by default.

Configuration and usage

public class SagaInstance : SagaStateMachineInstance
{
    public SagaInstance(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    protected SagaInstance() { }

    public string CurrentState { get; set; }
    public string ServiceName { get; set; }
    public Guid CorrelationId { get; set; }
    public int Version { get; set; } // for optimistic concurrency
}

public class SagaInstanceMap : SagaClassMapping<SagaInstance>
{
    public SagaInstanceMap()
    {
        Property(x => x.CurrentState);
        Property(x => x.ServiceName, x => x.Length(40));
        Property(x => x.Version); // for optimistic concurrency
    }
}

The SagaClassMapping base class maps the CorrelationId of the saga, and handles some of the basic bootstrapping of the class map. All other properties, including the property for the CurrentState (if you're using state machine sagas), must be mapped by the developer. Once mapped, the ISessionFactory can be created using NHibernate directly. From the session factory, the saga repository can be created.

Important: The SagaClassMapping has default mapping for the CorrelationId as a database generated primary key. If you use your own mapping, you must follow the same convention, otherwise there is a big chance to get deadlock exceptions in case of high throughput.

ISessionFactory sessionFactory = CreateSessionFactory();
var repository = new NHibernateSagaRepository<SagaInstance>(sessionFactory);

Redis

Redis is a very popular key-value store, which is known for being very fast.

Redis does not support queries, therefore Redis saga persistence only supports correlation by id. If you try to use correlation by expressions, you will get a "not implemented" exception.

Saga persistence for Redis uses ServiceStack.Redis library and it support both BSD-licensed v3.9.71 and the latest commercial versions as well.

Saga instance class must implement IHasGuid interface and the Id property, that must return the value of the CorrelationId property:

public class SagaInstance : 
    SagaStateMachineInstance, IHasGuidId
{
    public Guid CorrelationId { get; set; }
    public Guid Id => CorrelationId;
    public string CurrentState { get; set; }

    public string CustomData { get; set; }
}

Concurrency

Redis saga persistence does not acquire locking on the database record when writing it so potentially you can have write conflict in case the saga is updating its state frequently (hundreds of times per second). To resolve this, the saga instance can implement the IVersionedSaga interface and include the Version property:

public int Version { get; set; }

When version of the instance that is being updated will be lower than the expected version, the saga repository will trow an exception and force the message to be retried, potentially resolving the issue.

Redis client initialization

The Redis saga repository requires ServiceStack.Redis.IRedisClientsManager as constructor parameter. For containerless initialization the code would look like:

var redisConnectionString = "redis://localhost:6379";
var repository = new RedisSagaRepository<SagaInstance>(
    new RedisManagerPool(redisConnectionString));

If you use a container, you can use the code like this (example for Autofac):

var redisConnectionString = "redis://localhost:6379";
builder.Register<IRedisClientsManager>(c => 
    new RedisManagerPool(redisConnectionString)).SingleInstance();
builder.RegisterGeneric(typeof(RedisSagaRepository<>))
    .As(typeof(ISagaRepository<>)).SingleInstance();

Marten

Marten is an open-source library that provides an API to the PostgreSQL JSONB storage, influenced by RavenDb client API. It allows to use PotgreSQL as schema-less NoSQL document storage. Unlike typical document databases, PostgreSQL JSONB storage provides you the ACID-compliant transactional store with full consistency.

To use Marten and PostgreSQL as saga persistence, you need to install MassTransit.Marten NuGet package and add some code.

First, your saga state class needs to mark the correlationId property with the [Identity] attribute. By this you inform Marten that correlationId will be used as the primary key.

public class SampleSaga : ISaga
{
    [Identity]
    public Guid CorrelationId { get; set; }
    public string State { get; set; }
    public string SomeProperty { get; set; }
}

Then you need to initialize the document store and the repository. Repository needs the store as its constructor parameter.

var connectionString =
    "server=localhost;port=5432;database=test;user id=test;password=test;";
var store = DocumentStore.For(connectionString);
var repository = new MartenSagaRepository<SampleSaga>(store);

If you use a container, you can use the code like this (example for Autofac):

var connectionString =
    "server=localhost;port=5432;database=test;user id=test;password=test;";
builder.Register<IDocumentStore>(c => DocumentStore.For(connectionString);
builder.RegisterGeneric(typeof(MartenSagaRepository<>))
    .As(typeof(ISagaRepository<>)).SingleInstance();

Marten will create necessary tables for you. This type of saga repository supports correlation by id and custom expressions.

Concurrency

Marten supports optimistic concurrency by using an eTag-like version field in the metadata. This means that the saga instance class does not need any additional fields for version.

There are two ways to use this feature:

1) Apply [UseOptimisticConcurrency] attribute to the saga instance class

[UseOptimisticConcurrency]
public class SampleSaga : ISaga
{
    [Identity]
    public Guid CorrelationId { get; set; }
    ...
}
  1. Configure the store to use optimistic concurrency for your saga instance class

    var store = DocumentStore.For(_ =>
    {
      // Adds optimistic concurrency checking to Issue
      _.Schema.For<SampleSaga>().UseOptimisticConcurrency(true);
    });
    

Azure Service Bus

Azure Service Bus provides a feature called message sessions, to process multiple messages at once and to store some state on a temporary basis, which can be retrieved by some key.

The latter give us an ability to use this feature as saga state storage. Using message sessions as saga persistence, you can only use Azure Service Bus for both messaging and saga persistence purposes, without needing any additional infrastructure.

There is a limitation for using message sessions - this feature is not supported for AMQP transport.

You have to explicitly enable message sessions when configuring the endpoint, and use parameterless constructor to instantiate the saga repository.

Here is the basic sample of how to use the Azure Service Bus message session as saga repository:

var sagaStateMachine = new MySagaStateMachine();
var repository = new MessageSessionSagaRepository<MySaga>(); 
sbc.ReceiveEndpoint(host, "test_queue", ep =>
{
    ep.RequiresSession = true;
    ep.StateMachineSaga(sagaStateMachine, repository);
});

As mentioned before, the message session allows storing and retrieving any state by some unique key. This means that this type of saga persistence only support correlation by id. So, similar to Redis saga persistence, you cannot use CorrelateBy to specify how to find the saga instance, but only CorrelateById.

results matching ""

    No results matching ""