Job Consumers

When a message is delivered from the message broker to a consumer instance, the message is locked by the broker. Once the consumer completes, MassTransit acknowledges the message on the broker, removing it from the queue. While the message is locked, it will not be delivered to another consumer on any bus instance reading from the same queue (the competing consumer pattern). However, if the broker connection is lost, the message is unlocked and redelivered to a new consumer instance. The lock timeout is long enough for most message consumers, so this is rarely an issue in practice for consumers that complete quickly.

However, there are plenty of use cases where consumers may run for a longer duration, from minutes to even hours. In these situations, a job consumer may be used to decouple the consumer from the broker. A job consumer is a specialized consumer designed to execute jobs, defined by implementing the IJobConsumer<T> interface where T is the job message type. Job consumers may be used for long-running tasks, such as converting a video file, but can really be used for any task. Job consumers have additional requirements, such as a database to store the job messages, manage concurrency and retry, and report job completion or failure.

MassTransit/Sample-JobConsumers

MassTransit includes a job service that keeps track of each job, assigns jobs to service instances, and schedules job retries when necessary. The job service uses three saga state machines and the default configuration uses an in-memory saga repository, which is not durable. When using job consumers for production use cases, configuring durable saga repositories is highly recommended to avoid possible message loss. Check out the sample project on GitHub, which includes the Entity Framework configuration for the job service state machines.

When should you use a job consumer?

If you can get the work done in less than 5 minutes, don't use a job consumer. RabbitMQ, Azure Service Bus, and SQS can easily hold a lock on the message for 5 minutes before needing a relock, which is plenty of time for most consumers. If your consumers take longer than 5 minutes, it's time to ask whether you should move to job consumers. Job consumers involve a fair amount of bookkeeping, so be aware of the extra computational cost.

Implementation

To use job consumers, a service instance must be configured (see below).

IJobConsumer

A job consumer implements the IJobConsumer<T> interface, shown below.

public interface IJobConsumer<in TJob> :
    IConsumer
    where TJob : class
{
    Task Run(JobContext<TJob> context);
}
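
As an illustration of the interface above, a minimal job consumer might look like the following sketch. The shape of the ConvertVideo message (a single Path property) is an assumption based on the submit examples on this page, and the delay is only a stand-in for the real conversion work.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Assumed job message shape; only Path is implied by the examples on this page.
public record ConvertVideo
{
    public string Path { get; init; } = "";
}

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // The job message is available on context.Job.
        var path = context.Job.Path;
        Console.WriteLine($"Converting video: {path}");

        // Placeholder for the long-running work. Passing the context's
        // cancellation token lets the delay stop if the job is canceled.
        await Task.Delay(TimeSpan.FromMinutes(3), context.CancellationToken);
    }
}
```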

Configuration

The example below configures a job consumer on a receive endpoint whose name is generated from the consumer type by the configured IEndpointNameFormatter.

services.AddMassTransit(x =>
{
    x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
    {
        cfg.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    });

    x.SetKebabCaseEndpointNameFormatter();

    x.AddJobSagaStateMachines();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

In this example, the job timeout and the number of concurrent jobs allowed are specified using JobOptions<T> when configuring the consumer. The same job options can also be specified in a consumer definition.
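
A consumer definition carrying the same options might look like the sketch below. It assumes the ConvertVideo and ConvertVideoJobConsumer types used in the configuration example, and it assumes a recent MassTransit version in which ConfigureConsumer receives an IRegistrationContext; older versions use an overload without that parameter.

```csharp
using System;
using MassTransit;

// Sketch only: ConvertVideo and ConvertVideoJobConsumer are the types
// from the configuration example and are not declared here.
public class ConvertVideoJobConsumerDefinition :
    ConsumerDefinition<ConvertVideoJobConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<ConvertVideoJobConsumer> consumerConfigurator,
        IRegistrationContext context)
    {
        // Same job options as the inline AddConsumer configuration.
        consumerConfigurator.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    }
}
```

The definition would then be registered alongside the consumer, for example with x.AddConsumer<ConvertVideoJobConsumer, ConvertVideoJobConsumerDefinition>().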

The number of concurrent jobs can only be configured using the JobOptions<TJob>.SetConcurrentJobLimit(int limit) method. The ConcurrentMessageLimit on the consumer definition must not be used with job consumers.

Submitting a job

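To submit a job to the job consumer, you can use the request client as shown in the first method below. In that case, the RequestId is used as the JobId. To submit a job without waiting for a response, publish a SubmitJob<T> message with an explicit JobId, as shown in the second method.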

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<ConvertVideo> client)
{
    _logger.LogInformation("Sending job: {Path}", path);

    Response<JobSubmissionAccepted> response = await client.GetResponse<JobSubmissionAccepted>(new
    {
        path
    });

    return Ok(new
    {
        response.Message.JobId,
        Path = path
    });
}

[HttpPut("{path}")]
public async Task<IActionResult> FireAndForgetSubmitJob(string path, [FromServices] IPublishEndpoint publishEndpoint)
{
    _logger.LogInformation("Sending job: {Path}", path);

    var jobId = NewId.NextGuid();

    await publishEndpoint.Publish<SubmitJob<ConvertVideo>>(new
    {
        JobId = jobId,
        Job = new
        {
            path
        }
    });

    return Ok(new
    {
        jobId,
        Path = path
    });
}

Job Service Endpoints

The job service saga state machines are configured on their own endpoints, using the configured endpoint name formatter. These endpoints are required on at least one bus instance, but it is not necessary to configure them on every bus instance. In the example above, the job service endpoints are configured by the call to AddJobSagaStateMachines. On bus instances that only host job consumers, without the job service saga state machines, a standard ConfigureEndpoints call will suffice.

x.UsingRabbitMq((context, cfg) =>
{
    cfg.ConfigureEndpoints(context);
});
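
Put together, a bus instance that only hosts job consumers could be registered roughly as in the sketch below. It reuses only the calls from the configuration example and leaves out AddJobSagaStateMachines, on the assumption that another bus instance hosts the job service endpoints.

```csharp
// Sketch of a worker-only instance; assumes the job service saga state
// machines are hosted by a different bus instance on the same broker.
services.AddMassTransit(x =>
{
    x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
    {
        cfg.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    });

    // Use the same endpoint name formatter as the job service instance.
    x.SetKebabCaseEndpointNameFormatter();

    // No AddJobSagaStateMachines() call here.

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
```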

For a more detailed example of configuring the job service endpoints, including persistent storage, see the sample mentioned in the box above.