When learning a new technology, sometimes I just want to see it work. It gives me a baseline to extend my ideas, to see what is possible, and to imagine what it can become.

While reading through documentation or following tutorials, I am often greeted with numerous options and configurations. After seeing all the dazzling capabilities, I always want to see the core, the fundamental. Didn’t you ever just want to experience a simple working copy? Hence, I just want to see it work.

This series aims at minimizing the possibility of having a missing link and encourages you to build your next innovative solution based on what you learned here.

What is Kafka Stream?

According to Kafka, Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.

What do we want to achieve?

Resources

Run Kafka in Docker

Navigate to where the docker-compose.yml is. Run the below command in cmd to start a docker container of Kafka in the background.

docker-compose up -d

Docker-compose.yml

version: "3"
services:
  kafka:
    image: "bitnami/kafka:3.2.3"
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

Let’s publish a message to Kafka

Navigate to where KafkaStream.Producer.csproj is at. Start a Powershell and run the following command.

dotnet run

When you see the message: “Message delivered to input-topic”, that means you did it!

What is in the Producer?

var config = new ProducerConfig
{
	BootstrapServers = "localhost:9092",
	ClientId = "producer-1"
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
	var message = new Message<string, string>
	{
		Key = null, // Set the key if you want to partition the messages
		Value = "Hello, Kafka!"
	};

	try
	{
		var deliveryResult = await producer
                                    .ProduceAsync("input-topic", message);

		Console.WriteLine($"Message delivered to 
                            {deliveryResult.TopicPartitionOffset}");
	}
	catch (ProduceException<string, string> ex)
	{
		Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
	}
}

What is bootstrap-server?

It serves as an entry point for a client to connect to a Kafka broker.

Let’s setup the consumer

Navigate to where KafkaStream.Consumer.csproj is at. Start a Powershell and run the following command.

dotnet run

The initialization will take a few seconds. Then you should see the following messages.

Consuming messages from topic: input-topic
Press any key to exit
Received message: Hello, Kafka! from partition [0] offset 0

There you go!

What is in the Consumer?

const string Topic = "input-topic";

// Configure the consumer
var config = new ConsumerConfig
{
	BootstrapServers = "localhost:9092",
	GroupId = "consumer-group-1",
	AutoOffsetReset = AutoOffsetReset.Earliest,
	EnableAutoCommit = false // Disable auto commit to have more control over offsets
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
	// Subscribe to the topic
	consumer.Subscribe(Topic);

	Console.WriteLine($"Consuming messages from topic: {Topic}");

	// Start consuming messages in a background thread
	var cancellationTokenSource = new CancellationTokenSource();
	var cancellationToken = cancellationTokenSource.Token;

	var consumerThread = new Thread(() =>
	{
		try
		{
			while (true)
			{
				try
				{
					var consumeResult = 
                          consumer.Consume(cancellationToken);

					Console.WriteLine($"Received message: 
                          {consumeResult.Message.Value} from partition 
                          {consumeResult.Partition} offset 
                          {consumeResult.Offset}";

					// Manually commit the offset to mark the message as consumed
					consumer.Commit(consumeResult);
				}
				catch (ConsumeException ex)
				{
					Console.WriteLine($"Error occurred: 
                        {ex.Error.Reason}");
				}
			}
		}
		catch (OperationCanceledException)
		{
			// This exception will be thrown when cancellation is requested.
			Console.WriteLine("Cancellation requested");
		}
		finally
		{
			consumer.Close();
		}
	});

	// Start the consumer thread
	consumerThread.Start();

	// Wait for a key press to exit
	Console.WriteLine("Press any key to exit");
	Console.ReadKey();

	// Request cancellation and wait for the consumer thread to stop
	cancellationTokenSource.Cancel();
	consumerThread.Join();

	Console.WriteLine("End Consumer");
}

What does offset do?

In this context, it tells the consumer where it left off.

Gotcha

While developing your own producer/consumer, if you get an error message indicating the hostname cannot be resolved, check KAFKA_CFG_ADVERTISED_LISTENERS. If you still run into the same issue, consider updating the host file.

Conclusion

In this article, we successfully run Kafka in a docker container, produce a message to a topic, and consume a message from the topic. There are limitless applications of using Kafka. Go ahead and play with the project, and build something interesting of your own!