Routing Slip

Developing applications using a distributed, message-based architecture significantly increases the complexity of performing transactions, where an end-to-end set of steps must be completed entirely, or not at all. In an application using an ACID database, this is typically done using SQL transactions, where partial operations are rolled back if the transaction cannot be completed. However, this doesn't scale when the steps being to include dependencies outside of a single database. And in the distributed, microservices based architectures, the use of a single ACID database is shrinking to completely non-existent.

MassTransit Courier is a mechanism for creating and executing distributed transactions with fault compensation that can be used to meet the requirements previously within the domain of database transactions, but built to scale across a large system of distributed services. Courier also works well with MassTransit sagas, which add transaction monitoring and recoverability.

MassTransit implements the Routing Slip pattern. Leveraging a durable messaging transport and the advanced saga features of MassTransit, routing slips provide a powerful set of components to simplify the use of routing slips in distributed applications. Combining the routing slip pattern with a saga state machine results in a reliable, recoverable, and supportable approach for coordinating and monitoring message processing across multiple services.

In addition to the basic routing slip pattern, MassTransit also supports compensations which allow activities to store execution data so that reversible operations can be undone, using either a traditional rollback mechanism or by applying an offsetting operation. For example, an activity that holds a seat for a patron could release the held seat when compensated.

Activities

In MassTransit Courier, an Activity refers to a processing step that can be added to a routing slip.

To create an activity, create a class that implements the IActivity interface.

public class DownloadImageActivity :
    IActivity<DownloadImageArguments, DownloadImageLog>
{
    Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> context);
    Task<CompensationResult> Compensate(CompensateContext<DownloadImageLog> context);
}

The IActivity interface is generic with two arguments. The first parameter specifies the activity’s argument type and the second parameter specifies the activity’s log type. In the example shown above, DownloadImageArguments is the argument type and DownloadImageLog is the log type. Both parameters may be interface, class or record types. Where the type is a class or a record, the proper accessors should be specified (i.e. { get; set; } or { get; init; }).

Execute Activities

An Execute Activity is an activity that only executes and does not support compensation. As such, the declaration of a log type is not required.

public class ValidateImageActivity :
    IExecuteActivity<ValidateImageArguments>
{
    Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> context);
}

Implementing

An activity must implement two interface methods, Execute and Compensate. The Execute method is called while the routing slip is executing activities and the Compensate method is called when a routing slip faults and needs to be compensated.

When the Execute method is called, an execution argument is passed containing the activity arguments, the routing slip TrackingNumber, and methods to mark the activity as completed or faulted. The actual routing slip message, as well as any details of the underlying infrastructure, are excluded from the execution argument to prevent coupling between the activity and the implementation. An example Execute method is shown below.

async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> execution)
{
    DownloadImageArguments args = execution.Arguments;
    string imageSavePath = Path.Combine(args.WorkPath, 
        execution.TrackingNumber.ToString());

    await _httpClient.GetAndSave(args.ImageUri, imageSavePath);

    return execution.Completed<DownloadImageLog>(new {ImageSavePath = imageSavePath});
}

Completing

Once activity processing is complete, the activity returns an ExecutionResult to the host. If the activity executes successfully, the activity can elect to store compensation data in an activity log which is passed to the Completed method on the execution argument. If the activity chooses not to store any compensation data, the activity log argument is not required. In addition to compensation data, the activity can add or modify variables stored in the routing slip for use by subsequent activities.

In the example above, the activity specifies the DownloadImageLog interface and initializes the log using an anonymous object. The object is then passed to the Completed method for storage in the routing slip before sending the routing slip to the next activity.

Terminating

In some situations, it may make sense to terminate the routing slip without executing any of the subsequent activities in the itinerary. This might be due to a business rule, in which the routing slip shouldn't be faulted, but needs to end immediately.

To terminate a routing slip, call Terminate as shown.

// regular termination
return execution.Terminate();

// terminate and include additional variables in the event
return execution.Terminate(new { Reason = "Not a good time, dude."});

Faulting

By default, if an activity throws an exception, it will be faulted and a RoutingSlipFaulted event will be published (unless a subscription changes the rules). An activity can also return Faulted rather than throwing an exception.

Compensating

When an activity fails, the Compensate method is called for previously executed activities in the routing slip that stored compensation data. If an activity does not store any compensation data, the Compensate method is never called. The compensation method for the example above is shown below.

Task<CompensationResult> Compensate(CompensateContext<DownloadImageLog> compensation)
{
    DownloadImageLog log = compensation.Log;
    File.Delete(log.ImageSavePath);

    return compensation.Compensated();
}

Using the activity log data, the activity compensates by removing the downloaded image from the work directory. Once the activity has compensated the previous execution, it returns a CompensationResult by calling the Compensated method. If the compensating actions could not be performed (either via logic or an exception) and the inability to compensate results in a failure state, the Failed method can be used instead, optionally specifying an Exception.

Using a Routing Slip

A routing slip specifies a sequence of processing steps called activities that are combined into a single transaction. As each activity completes, the routing slip is forwarded to the next activity in the itinerary. When all activities have completed, the routing slip is completed and the transaction is complete.

A key advantage to using a routing slip is it allows the activities to vary for each transaction. Depending upon the requirements for each transaction, which may differ based on things like payment methods, billing or shipping address, or customer preference ratings, the routing slip builder can selectively add activities to the routing slip. This dynamic behavior is in contrast to a more explicit behavior defined by a state machine or sequential workflow that is statically defined (either through the use of code, a DSL, or something like Windows Workflow).

Building

A routing slip contains an itinerary, variables, and activity/compensation logs. It is defined by a message contract, which is used by the underlying Courier components to execute and compensate the transaction. The routing slip contract includes:

  • A tracking number, which should be unique for each routing slip
  • An itinerary, which is an ordered list of activities
  • An activity log, containing an ordered list of previously executed activities
  • A compensation log, containing an order list of previous executed activities which may be compensated if the routing slip faults
  • A collection of variables, which can be mapped to activity arguments
  • A collection of subscriptions, which can be added to notify consumers of routing slip events
  • A collection of exceptions which may have occurred during routing slip execution

Developers are discouraged from directly implementing the RoutingSlip message type and should instead use a RoutingSlipBuilder to create a routing slip. The RoutingSlipBuilder encapsulates the creation of the routing slip and includes methods to add activities (and their arguments), activity logs, and variables to the routing slip. For example, to create a routing slip with two activities and an additional variable, a developer would write:

var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"), 
    new
    {
        ImageUri = new Uri("http://images.google.com/someImage.jpg")
    });
builder.AddActivity("FilterImage", new Uri("rabbitmq://localhost/execute_filterimage"));
builder.AddVariable("WorkPath", @"\dfs\work");

var routingSlip = builder.Build();

Each activity requires a name for display purposes and a URI specifying the execution address. The execution address is where the routing slip should be sent to execute the activity. For each activity, arguments can be specified that are stored and presented to the activity via the activity arguments interface type specify by the first argument of the IActivity interface. The activities added to the routing slip are combined into an Itinerary, which is the list of activities to be executed, and stored in the routing slip.

Managing the inventory of available activities, as well as their names and execution addresses, is the responsibility of the application and is not part of the MassTransit Courier. Since activities are application specific, and the business logic to determine which activities to execute and in what order is part of the application domain, the details are left to the application developer.

Activity Arguments

Each activity declares an activity argument type, which must be an interface. When the routing slip is received by an activity host, the argument type is used to read data from the routing slip and deliver it to the activity.

The argument properties are mapped, by name, to the argument type from the routing slip using:

  • Explicitly declared arguments, added to the itinerary with the activity
  • Implicitly mapped arguments, added as variables to the routing slip

To specify an explicit activity argument, specify the argument value while adding the activity using the routing slip builder.

var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"), new
    {
        ImageUri = new Uri("http://images.google.com/someImage.jpg")
    });

To specify an implicit activity argument, add a variable to the routing slip with the same name/type as the activity argument.

var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"));
builder.AddVariable("ImageUri", "http://images.google.com/someImage.jpg");

If an activity argument is not specified when the routing slip is created, it may be added by an activity that executes prior to the activity that requires the argument. For instance, if the DownloadImage activity stored the image in a local cache, that address could be added and used by another activity to access the cached image.

First, the routing slip would be built without the argument value.

var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"));
builder.AddActivity("ProcessImage", new Uri("rabbitmq://localhost/execute_processimage"));
builder.AddVariable("ImageUri", "http://images.google.com/someImage.jpg");

Then, the first activity would add the variable to the routing slip on completion.

async Task<ExecutionResult> Execute(ExecuteContext<DownloadImageArguments> context)
{
    ...
    return context.CompletedWithVariables(new { ImagePath = ...});
}

The process image activity would then use that variable as an argument value.

async Task<ExecutionResult> Execute(ExecuteContext<ProcessImageArguments> context)
{
    var path = context.Arguments.ImagePath;
}

Executing

Once built, the routing slip is executed, which sends it to the first activity’s execute URI. To make it easy and to ensure that source information is included, an extension method on IBus is available, the usage of which is shown below.

await bus.Execute(routingSlip);

It should be pointed out that if the address for the first activity is invalid or cannot be reached, an exception will be thrown by the Execute method.

Routing Slip Events

During routing slip execution, events are published when the routing slip completes or faults. Every event message includes the TrackingNumber as well as a Timestamp (in UTC, of course) indicating when the event occurred:

  • RoutingSlipCompleted
  • RoutingSlipFaulted
  • RoutingSlipCompensationFailed

Additional events are published for each activity, including:

  • RoutingSlipActivityCompleted
  • RoutingSlipActivityFaulted
  • RoutingSlipActivityCompensated
  • RoutingSlipActivityCompensationFailed

By observing these events, an application can monitor and track the state of a routing slip. To maintain the current state, an Automatonymous state machine could be created. To maintain history, events could be stored in a database and then queried using the TrackingNumber of the routing slip.

Subscriptions

By default, routing slip events are published -- which means that any subscribed consumers will receive the events. While this is useful getting started, it can quickly get out of control as applications grow and multiple unrelated routing slips are used. To handle this, subscriptions were added (yes, added, because they weren't though of until we experienced this ourselves).

Subscriptions are added to the routing slip at the time it is built using the RoutingSlipBuilder.

builder.AddSubscription(new Uri("rabbitmq://localhost/log-events"), 
    RoutingSlipEvents.All);

This subscription would send all routing slip events to the specified endpoint. If the application only wanted specified events, the events can be selected by specifying the enumeration values for those events. For example, to only get the RoutingSlipCompleted and RoutingSlipFaulted events, the following code would be used.

builder.AddSubscription(new Uri("rabbitmq://localhost/log-events"), 
    RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted);

It is also possible to tweak the content of the events to cut down on message size. For instance, by default, the RoutingSlipCompleted event includes the variables from the routing slip. If the variables contained a large document, that document would be copied to the event. Eliminating the variables from the event would reduce the message size, thereby reducing the traffic on the message broker. To specify the contents of a routing slip event subscription, an additional argument is specified.

builder.AddSubscription(new Uri("rabbitmq://localhost/log-events"), 
    RoutingSlipEvents.Completed, RoutingSlipEventContents.None);

This would send the RoutingSlipCompleted event to the endpoint, without any of the variables be included (only the main properties of the event would be present).

Once a subscription is added to a routing slip, events are no longer published -- they are only sent to the addresses specified in the subscriptions. However, multiple subscriptions can be specified -- the endpoints just need to be known at the time the routing slip is built.

Custom

It is also possible to specify a subscription with a custom event, a message that is created by the application developer. This makes it possible to create your own event types and publish them in response to routing slip events occurring. And this includes having the full context of a regular endpoint Send so that any headers or context settings can be applied.

To create a custom event subscription, use the overload shown below.

// first, define the event type in your assembly
public record OrderProcessingCompleted
{
    public Guid TrackingNumber { get; init; }
    public DateTime Timestamp { get; init; }

    public string OrderId { get; init; }
    public string OrderApproval { get; init; }
}

// then, add the subscription with the custom properties
builder.AddSubscription(new Uri("rabbitmq://localhost/order-events"), 
    RoutingSlipEvents.Completed, 
    x => x.Send<OrderProcessingCompleted>(new
    {
        OrderId = "BFG-9000",
        OrderApproval = "ComeGetSome"
    }));

In the message contract above, there are four properties, but only two of them are specified. By default, the base RoutingSlipCompleted event is created, and then the content of that event is merged into the message created in the subscription. This ensures that the dynamic values, such as the TrackingNumber and the Timestamp, which are present in the default event, are available in the custom event.

Custom events can also select with contents are merged with the custom event, using an additional method overload.