# Job Consumers

Message consumers are intended to be used similarly to web controllers: consumer instances are short-lived, usually existing only for the time it takes to consume a single message. There are, however, scenarios where the processing initiated by a message takes a long time (for example, more than a couple of minutes). Rather than waiting for the processing to complete, which would prevent subsequent messages from being consumed, job consumers are created and run outside of the message transport (but are still initiated by a message).

A job consumer is a specialized consumer type designed to execute jobs. It is defined by implementing the IJobConsumer<T> interface, where T is the job message type. Job consumers are typically used for long-running tasks, such as converting a video file, but can be used for any task. To use job consumers, Conductor, a set of managed services included with MassTransit, must be configured.

New and Improved

Job consumers replace Turnout, an earlier MassTransit feature that was poorly supported and very limited. Turnout is now deprecated; job consumers offer a much better developer experience and are better integrated.

# IJobConsumer

namespace MassTransit.JobService
{
    using System.Threading.Tasks;


    /// <summary>
    /// Defines a message consumer that runs a job asynchronously, without waiting for it to complete.
    /// The job is monitored by the Conductor services, which track the job, limit concurrency, etc.
    /// </summary>
    /// <typeparam name="TJob">The job message type</typeparam>
    public interface IJobConsumer<in TJob> :
        IConsumer
        where TJob : class
    {
        Task Run(JobContext<TJob> context);
    }
}
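
As a rough illustration of implementing the interface, the sketch below assumes the ConvertVideo job contract from the configuration example that follows. It also assumes that JobContext<TJob> exposes the job message as context.Job and a cancellation token as context.CancellationToken. The namespace, the class name, and the simulated work are hypothetical.

```csharp
namespace JobSystem.Sketches // hypothetical namespace, for illustration only
{
    using System;
    using System.Threading.Tasks;
    using JobSystem.Jobs; // ConvertVideo contract from the configuration example
    using MassTransit.JobService;

    // Hypothetical consumer name; not part of MassTransit
    public class CancellableConvertVideoJobConsumer :
        IJobConsumer<ConvertVideo>
    {
        public async Task Run(JobContext<ConvertVideo> context)
        {
            // Assumption: context.Job carries the job message
            Console.WriteLine("Converting video {0} to {1}", context.Job.VideoId, context.Job.Format);

            // Simulated work; passing the token lets the delay end early if the job is canceled
            await Task.Delay(TimeSpan.FromMinutes(3), context.CancellationToken);
        }
    }
}
```

Passing the token to awaited calls is a design choice: it lets long-running work stop promptly instead of running to completion after the job has been canceled.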

# Configuration

The example below configures a job consumer on a receive endpoint whose name is generated by an IEndpointNameFormatter from the consumer type.

namespace JobSystem.Jobs
{
    using System;

    public interface ConvertVideo
    {
        Guid VideoId { get; }
        string Format { get; }
    }
}

namespace JobSystemConsoleService
{
    using System;
    using System.Threading.Tasks;
    using JobSystem.Jobs;
    using MassTransit;
    using MassTransit.Conductor;
    using MassTransit.JobService;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var options = new ServiceInstanceOptions()
                    .EnableInstanceEndpoint();

                cfg.ServiceInstance(options, instance =>
                {
                    instance.ConfigureJobServiceEndpoints();

                    var queueName = instance.EndpointNameFormatter.Consumer<ConvertVideoJobConsumer>();

                    instance.ReceiveEndpoint(queueName, e =>
                    {
                        e.Consumer(() => new ConvertVideoJobConsumer(), c =>
                        {
                            // Lambda parameter renamed so it does not clash with the outer "options" local
                            c.Options<JobOptions<ConvertVideo>>(jobOptions => jobOptions
                                .SetJobTimeout(TimeSpan.FromMinutes(15))
                                .SetConcurrentJobLimit(10));
                        });
                    });
                });
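            });

            // Start the bus so the configured endpoints begin consuming, and stop it on exit
            await busControl.StartAsync();
            try
            {
                Console.WriteLine("Press enter to exit");
                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }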
        }

        public class ConvertVideoJobConsumer :
            IJobConsumer<ConvertVideo>
        {
            public async Task Run(JobContext<ConvertVideo> context)
            {
                // simulate converting the video
                await Task.Delay(TimeSpan.FromMinutes(3));
            }
        }
    }
}

In this example, the job timeout and the number of concurrent jobs allowed are specified using JobOptions<T> when configuring the consumer. The same job options can also be specified using a consumer definition.
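
A consumer definition carrying the same options might look like the sketch below. It is a sketch under assumptions, not the library's documented form: the definition class name is hypothetical, the ConfigureConsumer signature is the two-parameter form, and the MassTransit.Definition and MassTransit.ConsumeConfigurators namespaces are assumed locations that may differ between MassTransit versions.

```csharp
namespace JobSystemConsoleService
{
    using System;
    using JobSystem.Jobs;
    using MassTransit;
    using MassTransit.ConsumeConfigurators; // assumed location of IConsumerConfigurator<T>
    using MassTransit.Definition;           // assumed location of ConsumerDefinition<T>
    using MassTransit.JobService;

    // Hypothetical definition class for the consumer declared inside Program above
    public class ConvertVideoJobConsumerDefinition :
        ConsumerDefinition<Program.ConvertVideoJobConsumer>
    {
        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<Program.ConvertVideoJobConsumer> consumerConfigurator)
        {
            // Same job options as the inline configuration
            consumerConfigurator.Options<JobOptions<ConvertVideo>>(jobOptions => jobOptions
                .SetJobTimeout(TimeSpan.FromMinutes(15))
                .SetConcurrentJobLimit(10));
        }
    }
}
```

Keeping the options in a definition places them alongside the consumer type rather than in the bus configuration, which can help when the same consumer is hosted by more than one service.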

# Client

To submit jobs to the job consumer, use the service client to create a request client, as shown, and send the request. The JobId of the submitted job is assigned the value of the request's RequestId.

namespace JobSystemClient
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using JobSystem.Jobs;
    using MassTransit;
    using MassTransit.Contracts.JobService;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq();

            var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));

            await busControl.StartAsync(source.Token);
            try
            {
                var serviceClient = busControl.CreateServiceClient();

                var requestClient = serviceClient.CreateRequestClient<ConvertVideo>();

                do
                {
                    string value = await Task.Run(() =>
                    {
                        Console.WriteLine("Enter video format (or quit to exit)");
                        Console.Write("> ");
                        return Console.ReadLine();
                    });

                    if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                        break;

                    var response = await requestClient.GetResponse<JobSubmissionAccepted>(new
                    {
                        VideoId = NewId.NextGuid(),
                        Format = value
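                    });

                    // The response confirms that the job was accepted and carries its JobId
                    Console.WriteLine("Job submitted: {0}", response.Message.JobId);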
                }
                while (true);
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}

# Supporting Endpoints

When the job service endpoints are configured, a set of saga state machines is also configured to track job execution across multiple service instances. This ensures that each job execution is tracked, faults are observed, and retry attempts are scheduled.

Conductor is used to manage the service instance, including the service instance endpoint. Each service instance has its own endpoint which is used to communicate with the job consumers executing on that instance.