# Kafka

Kafka is supported as a Rider, adding the ability to consume messages from and produce messages to Kafka topics. The Confluent .NET client is used under the hood, and has been tested with the community edition (running in Docker).

# Topic Endpoints

Uses MassTransit.RabbitMQ, MassTransit.Kafka, MassTransit.Extensions.DependencyInjection

Note: the following examples use the RabbitMQ transport. The in-memory transport can be used to the same effect during development, in which case there is no need to install MassTransit.RabbitMQ: `x.UsingInMemory((context,config) => config.ConfigureEndpoints(context));`

To consume a Kafka topic, configure a Rider within the bus configuration as shown.

namespace KafkaConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
                    {
                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}

A TopicEndpoint connects a Kafka Consumer to a topic, using the specified topic name. The consumer group specified should be unique to the application, and shared by a cluster of service instances for load balancing (but it is possible to consume messages from multiple groups using separate endpoints). Consumers and sagas can be configured on the topic endpoint, which should be registered in the rider configuration. While the configuration for topic endpoints is the same as a receive endpoint, there is no implicit binding of consumer message types to Kafka topics. The message type is specified on the TopicEndpoint as a generic argument.

# Wildcard support

Kafka supports subscribing to multiple topics at once by specifying a regular expression (also called a wildcard) as the topic name; every topic matching the pattern is consumed:

namespace KafkaWildcardConsumer;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<KafkaMessage>("^topic-[0-9]*", "consumer-group-name", e =>
                    {
                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                    });
                });
            });
        });
    }

    class KafkaMessageConsumer :
        IConsumer<KafkaMessage>
    {
        public Task Consume(ConsumeContext<KafkaMessage> context)
        {
            return Task.CompletedTask;
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}

# Configuration

Configuration can be supplied through the Confluent client configs, or by using the configurators to override individual settings.
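For example, client-level settings can be overridden on the topic endpoint configurator. A minimal sketch, assuming the `AutoOffsetReset` property exposed by the endpoint configurator and the `Confluent.Kafka.AutoOffsetReset` enum:

```csharp
rider.UsingKafka((context, k) =>
{
    k.Host("localhost:9092");

    k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
    {
        // Confluent consumer setting surfaced by the configurator:
        // read from the earliest offset when the group has no committed offset
        e.AutoOffsetReset = AutoOffsetReset.Earliest;

        e.ConfigureConsumer<KafkaMessageConsumer>(context);
    });
});
```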

# Checkpoint

The rider implementation takes full responsibility for checkpointing (committing consumed offsets); this behavior cannot be changed. The checkpointer can be configured per topic through the following properties:

| Name | Description | Default |
|------|-------------|---------|
| CheckpointInterval | Checkpoint frequency based on time | 1 min |
| CheckpointMessageCount | Checkpoint every X messages | 5000 |
| MessageLimit | Checkpointer buffer size without blocking consumption | 10000 |

Please note, each topic partition has its own checkpointer, and the configuration is applied per partition, not to the entire topic.

During a graceful shutdown, the checkpointer will try to checkpoint all messages consumed so far. A forced shutdown should be avoided, as it can lead to the same message being consumed more than once.
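The checkpoint properties above are set on the topic endpoint configurator. A sketch applying the documented defaults explicitly:

```csharp
k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
{
    e.CheckpointInterval = TimeSpan.FromMinutes(1); // checkpoint at least once a minute
    e.CheckpointMessageCount = 5000;                // ...or after 5000 messages, per partition
    e.MessageLimit = 10000;                         // checkpointer buffer size, per partition

    e.ConfigureConsumer<KafkaMessageConsumer>(context);
});
```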

# Scalability

Riders are designed with performance in mind, handling each topic partition within a separate thread pool. Consumption within the same partition can also be scaled up by using the message Key: as long as keys differ, messages are processed concurrently, all without sacrificing ordering.

| Name | Description | Default |
|------|-------------|---------|
| ConcurrentConsumerLimit | Number of Confluent consumer instances within the same endpoint | 1 |
| ConcurrentDeliveryLimit | Number of messages delivered concurrently within the same partition and key. Increasing this value breaks ordering; helpful for topics where ordering is not required | 1 |
| ConcurrentMessageLimit | Number of messages processed concurrently across different keys (preserving per-key ordering). When the key is the same for the entire partition, ConcurrentDeliveryLimit is used instead | 1 |
| PrefetchCount | Number of messages to prefetch from the Kafka topic into memory | 1000 |

WARNING

ConcurrentConsumerLimit is a very powerful setting: because a Confluent consumer reads one partition at a time, it allows creating multiple consumers that read from separate partitions. However, having more consumers than the total number of partitions results in idle consumers.
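These settings are likewise configured on the topic endpoint. A sketch using the property names from the table above (values are illustrative, not recommendations):

```csharp
k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
{
    e.ConcurrentConsumerLimit = 3;  // each consumer reads one partition at a time
    e.ConcurrentMessageLimit = 10;  // up to 10 distinct keys processed concurrently per partition
    e.PrefetchCount = 1000;         // messages buffered in memory ahead of consumption

    e.ConfigureConsumer<KafkaMessageConsumer>(context);
});
```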

# Configure Topology

WARNING

Kafka is not intended to have its topology created during startup. Topics should be created beforehand with the correct number of partitions and replicas.

When the client has the required permissions and CreateIfMissing is configured, the topic can be created on startup:

namespace KafkaTopicTopology;

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

            x.AddRider(rider =>
            {
                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");

                    k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
                    {
                        e.CreateIfMissing(t =>
                        {
                            t.NumPartitions = 2; //number of partitions
                            t.ReplicationFactor = 1; //number of replicas
                        });
                    });
                });
            });
        });
    }


    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}

# Producers

Producing messages to Kafka topics requires the producer to be registered. The producer can then be used to produce messages to the specified Kafka topic. In the example below, messages are produced to the Kafka topic as they are entered by the user.

namespace KafkaProducer;

using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

            x.AddRider(rider =>
            {
                rider.AddProducer<KafkaMessage>("topic-name");

                rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
            });
        });

        var provider = services.BuildServiceProvider();

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
        try
        {
            var producer = provider.GetRequiredService<ITopicProducer<KafkaMessage>>();
            do
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("Enter text (or quit to exit)");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                    break;

                await producer.Produce(new
                {
                    Text = value
                });
            } while (true);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }

    public record KafkaMessage
    {
        public string Text { get; init; }
    }
}

# Tombstone message

A record with the same key as the record to be deleted is produced to the same topic and partition with a null payload. These records are called tombstones. This can be done by setting a custom value serializer when producing:

namespace KafkaTombstoneProducer;

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

            x.AddRider(rider =>
            {
                rider.AddProducer<string, KafkaMessage>("topic-name");

                rider.UsingKafka((context, k) => { k.Host("localhost:9092"); });
            });
        });

        var provider = services.BuildServiceProvider();

        var busControl = provider.GetRequiredService<IBusControl>();

        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
        try
        {
            var producer = provider.GetRequiredService<ITopicProducer<string, KafkaMessage>>();
            do
            {
                string value = await Task.Run(() =>
                {
                    Console.WriteLine("Enter text (or quit to exit)");
                    Console.Write("> ");
                    return Console.ReadLine();
                });

                if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                    break;

                await producer.Produce("key", new { }, Pipe.Execute<KafkaSendContext<string, KafkaMessage>>(context =>
                {
                    context.ValueSerializer = new TombstoneSerializer<KafkaMessage>();
                }));
            } while (true);
        }
        finally
        {
            await busControl.StopAsync();
        }
    }

    class TombstoneSerializer<T> :
        IAsyncSerializer<T>
    {
        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            return Task.FromResult(Array.Empty<byte>());
        }
    }

    public record KafkaMessage
    {
    }
}

Note: a null message cannot be consumed and will always be skipped.

# Producing and Consuming Multiple Message Types on a Single Topic

There are situations where you might want to produce / consume events of different types on the same Kafka topic. A common use case is to use a single topic to log ordered meaningful state change events like SomethingRequested, SomethingStarted, SomethingFinished.

Confluent has some documentation about how this can be implemented on the Schema Registry side.

Unfortunately, it is not yet widely supported in client tools and products, and there is limited documentation about how to support it in your own applications.

However, it is possible. The following demo uses the MassTransit Kafka Rider with custom Avro serializer / deserializer implementations and the Schema Registry to support multiple event types on a single topic:

MassTransit-Kafka-Demo

The custom serializer / deserializer implementations leverage the wire format used by the standard Confluent schema-based serializers, which includes the schema id in the data stored for each message. This is also good news for interoperability with non-MassTransit applications.
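For reference, that wire format is a five-byte header in front of the serialized payload: a magic byte (0) followed by the schema id as a big-endian 32-bit integer. A minimal sketch of reading that header (a hypothetical helper, not part of MassTransit or Confluent.Kafka):

```csharp
using System;
using System.Buffers.Binary;

static class ConfluentWireFormat
{
    // Confluent wire format: [0x00][4-byte big-endian schema id][serialized payload]
    public static (int SchemaId, ReadOnlyMemory<byte> Payload) Parse(byte[] data)
    {
        if (data.Length < 5 || data[0] != 0)
            throw new InvalidOperationException("Not Confluent schema-framed data");

        int schemaId = BinaryPrimitives.ReadInt32BigEndian(data.AsSpan(1, 4));
        return (schemaId, data.AsMemory(5));
    }
}
```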

Warning: It's a little hacky and only supports the Avro format, but there's enough there to get you started.