# Azure Service Bus

MassTransit.Azure.ServiceBus.Core

To configure Azure Service Bus, use the connection string (from the Azure portal) to configure the host as shown below.

namespace ServiceBusConsoleListener
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransit.Azure.ServiceBus.Core;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                x.UsingAzureServiceBus((context, cfg) =>
                {
                    cfg.Host("connection-string");
                });
            });
        }
    }
}

Additional host properties include:

Property Description
TokenProvider Use a specific token provider, such as a managed identity token provider, to access the namespace
TransportType Change the transport type from the default (AMQP) to use WebSockets

Azure Service Bus queues includes an extensive set a properties that can be configured. All of these are optional, MassTransit uses sensible defaults, but the control is there when needed.

namespace ServiceBusReceiveEndpoint
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransit.Azure.ServiceBus.Core;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                x.UsingAzureServiceBus((context, cfg) =>
                {
                    cfg.Host("connection-string");

                    cfg.ReceiveEndpoint("input-queue", e =>
                    {
                        // all of these are optional!!

                        e.PrefetchCount = 100;

                        // number of "threads" to run concurrently
                        e.MaxConcurrentCalls = 100;

                        // default, but shown for example
                        e.LockDuration = TimeSpan.FromMinutes(5); 

                        // lock will be renewed up to 30 minutes
                        e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
                    });
                });
            });
        }
    }
}
Property Type Description
PrefetchCount int The number of unacknowledged messages that can be processed concurrently (default based on CPU count)
MaxConcurrentCalls int How many concurrent messages to dispatch (transport-throttled)
LockDuration TimeSpan How long to hold message locks (max is 5 minutes)
MaxAutoRenewDuration TimeSpan How long to renew message locks (maximum consumer duration)
RequiresSession bool If true, a message SessionId must be specified when sending messages to the queue
MaxDeliveryCount int How many times the transport will redeliver the message on negative acknowledgment. This is different from retry, this is the transport redelivering the message to a receive endpoint before moving it to the dead letter queue.

# Azure Functions

Azure Functions is a consumption-based compute solution that only runs code when there is work to be done. MassTransit supports Azure Service Bus and Azure Event Hubs when running as an Azure Function.

The Sample Code is available for reference as well.

The functions host.json file needs to have messageHandlerOptions > autoComplete set to true. If this isn't set to true, MassTransit will try to set it to true for you. This is so that the message is acknowledged by the Azure Functions runtime, which removes it from the queue once processing has completed successfully.

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true
      }
    },
    "logLevel": {
      "MassTransit": "Debug",
      "Sample.AzureFunctions.ServiceBus": "Information"
    }
  },
  "extensions": {
    "serviceBus": {
      "prefetchCount": 32,
      "messageHandlerOptions": {
        "autoComplete": true,
        "maxConcurrentCalls": 32,
        "maxAutoRenewDuration": "00:30:00"
      }
    },
    "eventHub": {
      "maxBatchSize": 64,
      "prefetchCount": 256,
      "batchCheckpointFrequency": 1
    }
  }
}

This settings for prefetchCount, maxConcurrentCalls, and maxAutoRenewDuration are the most important – these wild directly affect the performance of an Azure Function.

The function should include a Startup class, which is called on startup by the Azure Functions framework. The example below configures MassTransit, registers the consumers for use, and adds a scoped type for an Azure function class.

using MassTransit;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

[assembly: FunctionsStartup(typeof(Sample.AzureFunctions.ServiceBus.Startup))]

namespace Sample.AzureFunctions.ServiceBus
{
    public class Startup :
        FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            builder.Services
                .AddScoped<SubmitOrderFunctions>() // add your functions as scoped
                .AddMassTransitForAzureFunctions(cfg =>
                {
                    cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
                });
        }
    }
}

::: NOTE Azure Functions using Azure Service Bus or Azure Event Hubs require the queue, subscription, topic, or event hub to exist prior to starting the function service. If the messaging entity does not exist, the function will not be bound, and messages or events will not be delivered. Service Bus messages sent or published by MassTransit running inside an Azure Function will, however, create the appropriate topics and/or queues as needed. :::

# Azure Service Bus

The bindings for using MassTransit with Azure Service Bus are shown below.

using MassTransit.WebJobs.ServiceBusIntegration;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;


public class SubmitOrderFunctions
{
    const string SubmitOrderQueueName = "input-queue";
    readonly IMessageReceiver _receiver;

    public SubmitOrderFunctions(IMessageReceiver receiver)
    {
        _receiver = receiver;
    }

    [FunctionName("SubmitOrder")]
    public Task SubmitOrderAsync([ServiceBusTrigger(SubmitOrderQueueName)]
        Message message, CancellationToken cancellationToken)
    {
        return _receiver.HandleConsumer<SubmitOrderConsumer>(SubmitOrderQueueName, message, cancellationToken);
    }
}

In the example above, the HandleConsumer method is used to configure a specific consumer on the message receiver.

Message receivers are cached by entityName. Once a message receiver has been used, the configuration cannot be changed.

To configure the consumer pipeline, such as to add UseMessageRetry middleware, use a ConsumerDefinition for the consumer type.

# Event Hub

The bindings for using MassTransit with Azure Event Hub are shown below. In addition to the event hub name, the Connection must also be specified.

At least I think so, the documentation isn't great and I only found this approach when digging through the extension source code.

using MassTransit.WebJobs.EventHubsIntegration;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;

public class AuditOrderFunctions
{
    const string AuditOrderEventHubName = "input-hub";
    readonly IEventReceiver _receiver;

    public AuditOrderFunctions(IEventReceiver receiver)
    {
        _receiver = receiver;
    }

    [FunctionName("AuditOrder")]
    public Task AuditOrderAsync([EventHubTrigger(AuditOrderEventHubName, Connection = "AzureWebJobsEventHub")]
        EventData message, CancellationToken cancellationToken)
    {
        return _receiver.HandleConsumer<AuditOrderConsumer>(AuditOrderEventHubName, message, cancellationToken);
    }
}

WARNING

With this refresh of the Azure Function code, it is no longer possible to send messages to other event hubs. Messages published or sent are done so using Azure Service Bus.

# Testing Locally

To test locally, a settings files must be created. Connections strings for the various services, along with the Application Insights connection string, can be configured.

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "AzureWebJobsStorage": "",
    "AzureWebJobsServiceBus": "",
    "AzureWebJobsEventHub": "",
    "FUNCTIONS_EXTENSION_VERSION": "~3",
    "APPINSIGHTS_INSTRUMENTATIONKEY": "",
    "APPLICATIONINSIGHTS_CONNECTION_STRING": "InstrumentationKey="
  }
}