# Kafka

Kafka is supported as a Rider, and supports consuming and producing messages from/to Kafka topics. The Confluent .NET client is used, and has been tested with the community edition (running in Docker).

# Topic Endpoints

Uses MassTransit.RabbitMQ (opens new window), MassTransit.Kafka (opens new window), MassTransit.Extensions.DependencyInjection (opens new window)

Note: the following examples are using the RabbitMQ Transport. You can also use InMemory Transport to achieve the same effect when developing. With that, there is no need to install MassTransit.RabbitMQ. x.UsingInMemory((context,config) => config.ConfigureEndpoints(context));

To consume a Kafka topic, configure a Rider within the bus configuration as shown.

namespace KafkaConsumer
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

                x.AddRider(rider =>
                {
                    rider.AddConsumer<KafkaMessageConsumer>();

                    rider.UsingKafka((context, k) =>
                    {
                        k.Host("localhost:9092");

                        k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
                        {
                            e.ConfigureConsumer<KafkaMessageConsumer>(context);
                        });
                    });
                });
            });
        }

        class KafkaMessageConsumer :
            IConsumer<KafkaMessage>
        {
            public Task Consume(ConsumeContext<KafkaMessage> context)
            {
                return Task.CompletedTask;
            }
        }

        public interface KafkaMessage
        {
            string Text { get; }
        }
    }
}

A TopicEndpoint connects a Kafka Consumer to a topic, using the specified topic name. The consumer group specified should be unique to the application, and shared by a cluster of service instances for load balancing. Consumers and sagas can be configured on the topic endpoint, which should be registered in the rider configuration. While the configuration for topic endpoints is the same as a receive endpoint, there is no implicit binding of consumer message types to Kafka topics. The message type is specified on the TopicEndpoint as a generic argument.

# Configure Topology

When client has required permissions and CreateIfMissing is configured, topic can be created on startup and deleted on shutdown

namespace KafkaTopicTopology
{
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

                x.AddRider(rider =>
                {
                    rider.UsingKafka((context, k) =>
                    {
                        k.Host("localhost:9092");

                        k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e =>
                        {
                            e.CreateIfMissing(t =>
                            {
                                t.NumPartitions = 2; //number of partitions
                                t.ReplicationFactor = 1; //number of replicas
                            });
                        });
                    });
                });
            });
        }


        public interface KafkaMessage
        {
        }
    }
}

# Producers

Producing messages to Kafka topics requires the producer to be registered. The producer can then be used to produce messages to the specified Kafka topic. In the example below, messages are produced to the Kafka topic as they are entered by the user.

namespace KafkaProducer
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));

                x.AddRider(rider =>
                {
                    rider.AddProducer<KafkaMessage>("topic-name");

                    rider.UsingKafka((context, k) =>
                    {
                        k.Host("localhost:9092");
                    });
                });
            });

            var provider = services.BuildServiceProvider();

            var busControl = provider.GetRequiredService<IBusControl>();

            await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
            try
            {
                var producer = provider.GetRequiredService<ITopicProducer<KafkaMessage>>();
                do
                {
                    string value = await Task.Run(() =>
                    {
                        Console.WriteLine("Enter text (or quit to exit)");
                        Console.Write("> ");
                        return Console.ReadLine();
                    });

                    if("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
                        break;

                    await producer.Produce(new
                    {
                        Text = value
                    });
                }
                while (true);
            }
            finally
            {
                await busControl.StopAsync();
            }
        }

        public interface KafkaMessage
        {
            string Text { get; }
        }
    }
}

# Producing and Consuming Multiple Message Types on a Single Topic

There are situations where you might want to produce / consume events of different types on the same Kafka topic. A common use case is to use a single topic to log ordered meaningful state change events like SomethingRequested, SomethingStarted, SomethingFinished.

Confluent have some documentation about how this can be implemented on the Schema Registry side:

Unfortunately, it is not yet widely supported in client tools and products (opens new window) and there is limited documentation about how to support this in your own applications.

However, it is possible... The following demo uses the MassTransit Kafka Rider with custom Avro (opens new window) serializer / deserializer implementations and the Schema Registry to support multiple event types on a single topic:

MassTransit-Kafka-Demo (opens new window)

The custom serializers / deserializer implementations leverage the wire format used by the standard Confluent schema-based serializers, which includes the schema id in the data stored for each message. This is also good news for interoperability with non-MassTransit applications.

Warning: It's a little hacky and only supports the Avro format, but there's enough there to get you started.