20.2.2024
If you've worked with Apache Kafka and .NET, you're likely aware that the out-of-the-box experience of consuming messages using Confluent's client library is geared towards processing records sequentially. If you want to process multiple records in parallel, a few options come to mind:
For some time now, I'd been thinking about building a proof of concept that implemented option 3, not only because there aren't that many options for .NET (there are some though, more on that later), but also because it felt like an interesting problem to tackle. Well, this post is about that proof of concept, a little library I called YakShaveFx.KafkaClientTurbocharger 😁.
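As a baseline for what follows, this is roughly what the out-of-the-box sequential flow looks like with Confluent's client (a minimal sketch, with a made-up topic, group id and handling function):

```csharp
// Minimal sketch of the typical sequential consume loop with Confluent.Kafka
// (topic, group id and the HandleAsync placeholder are made up for illustration).
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "sample-group",
    EnableAutoCommit = false
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("sample-topic");

using var cts = new CancellationTokenSource();

while (!cts.IsCancellationRequested)
{
    // Consume blocks until a record is available (or the token is cancelled)
    var result = consumer.Consume(cts.Token);

    // handle the record, then commit its offset before fetching the next one
    await HandleAsync(result.Message);
    consumer.Commit(result);
}

static Task HandleAsync(Message<string, string> message) => Task.CompletedTask;
```

One record at a time: fetch, handle, commit, repeat. Simple, but the handling of a record has to finish before the next one is even fetched.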
This post will be mostly a high-level look at how this proof of concept was implemented, without going into much detail on the code, which you're free to explore. The implementation itself didn't end up being particularly complex, it just took a bit to figure out a good design approach (and I built on top of some solid foundations 😉).
Note that this proof of concept is based on the premise that we want to handle Kafka records as discrete events, one at a time, much like we would do with other types of messaging systems like RabbitMQ, Azure Service Bus, and others of the sort. This PoC is not targeted at processing a continuous flow of events, in a streaming fashion, which benefits from doing things differently. To know more about discrete events vs continuous flow of events, check out this page.
I imagine most, when thinking about this kind of problem - consuming multiple records, processing them in parallel, managing offsets, managing ordering, etc. - start to consider the typical concurrency primitives and auxiliary data structures (locks, semaphores, auto reset events, concurrent dictionaries, …) that would be needed to implement this sort of thing. While not wrong, there are higher-level alternatives that I feel are often forgotten.
Enter the actor model!
As soon as I started to think about how to implement this, the actor model was the first thing that came to mind - probably because I’ve been trying for years to find a scenario where I could use it 😅.
Without going into much detail, a few bullets to summarize the main aspects of the actor model that make it interesting for this kind of problem:
If it makes it easier to picture, even though it’s a silly analogy, imagine that an actor is like a nanoservice inside your microservice - something running autonomously, with its own private data and API to interact with others.
These simple characteristics make the actor model great for building highly concurrent applications, or, in this case, a library to consume Kafka records in parallel. We can start to think about what autonomous components would make sense, how they'll interact, and what data they need to do their job:
And with all this, there are no locks, no concurrent dictionaries, none of that sort of stuff - I mean, somewhere in our actor framework of choice (I used Akka.NET for this development) these primitives are used, but we’re working at a higher level of abstraction, so that’s handled for us. Plus, with this shared nothing/message passing approach, many of the typical concurrency problems simply go away.
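To make the shared-nothing, message-passing idea a bit more concrete, here's a minimal Akka.NET actor (illustrative only, not one of the PoC's actual actors): all of its state is private, and it only mutates that state while handling one message at a time, so no explicit synchronization is needed.

```csharp
// Minimal Akka.NET actor sketch (illustrative only, not one of the PoC's actors).
using Akka.Actor;

public sealed record Increment(int Amount);
public sealed record GetCount;

public sealed class CounterActor : ReceiveActor
{
    // private state, only touched while processing a single message at a time,
    // so no locks or Interlocked calls are needed
    private int _count;

    public CounterActor()
    {
        Receive<Increment>(msg => _count += msg.Amount);
        Receive<GetCount>(_ => Sender.Tell(_count));
    }
}
```

Interacting with it is all message passing: create a system with `ActorSystem.Create("sample")`, spawn the actor with `system.ActorOf(Props.Create(() => new CounterActor()))`, then `counter.Tell(new Increment(1))` to fire and forget, or `await counter.Ask<int>(new GetCount())` when a reply is needed.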
There's much, much more to say about the actor model (I didn't even mention the super important location transparency concept), but hopefully this quick summary is enough to get you through the rest of the post, and maybe also to pique your interest to learn more - maybe knowing that the Halo 4 and 5 online services are powered by Orleans (another .NET actor framework), or that Ericsson achieved impressive reliability using Erlang and the actor model, can be further motivation.
So, we know we're going to use the actor model to design our system, because it simplifies the concurrency aspects; now we need to understand what our exact needs are and how to distribute them.
In summary, we need to:
To implement these requirements, I created an actor for each of them:
Besides these 4 actors, I created a fifth one, which I named the consumer orchestrator. Actors can communicate directly without an orchestrator, but I felt like it would be simpler to manage if I centralized how the whole thing operates.
Besides introducing the orchestrator, I went with a publish/subscribe-like approach, where the 4 actors mentioned earlier receive commands to execute, and publish events when something relevant happens. Note that this is just a semantic convention: these commands and events are normal messages between actors, I simply decided to name them this way to make things easier to understand. Also note that this has nothing to do with the actor model itself, it's just the way I felt like implementing things, and the actor model can accommodate it.
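Just to illustrate the convention (these are made-up names, not necessarily the types used in the library), the commands and events are nothing more than simple immutable message types:

```csharp
// Illustrative command/event message types (made-up names, not the library's actual types).
// They're plain immutable messages; "command" and "event" is purely a naming convention.
using System.Collections.Generic;
using Confluent.Kafka;

// commands, sent by the orchestrator to the other actors
public sealed record FetchRecords(int MaxRecords);
public sealed record HandleRecord(ConsumeResult<string, string> Record);
public sealed record CommitOffsets(IReadOnlyCollection<TopicPartitionOffset> Offsets);

// events, published back when something relevant happens
public sealed record RecordsFetched(IReadOnlyCollection<ConsumeResult<string, string>> Records);
public sealed record RecordHandled(TopicPartitionOffset Offset);
public sealed record OffsetsReadyToCommit(IReadOnlyCollection<TopicPartitionOffset> Offsets);
```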
To better understand the design, and what the basic flow of messages looks like, let's take a look at a diagram.
Let’s follow the flow described in the diagram:
Remember that, even though there's a sequence of commands/events, a full sequence doesn't need to complete before another starts. Instead, there are multiple such sequences happening at the same time, with each actor processing its mailbox messages sequentially, even though those messages may pertain to different sequences. This effectively means the system, as a whole, executes the various sequences concurrently, much like CPU cores interleave work from multiple threads/processes/applications.
Now that we've looked at how the whole parallel consumer works at a high level, let's drill down into a couple of specific subjects, starting with committing the offsets of processed records.
At first glance, one might think there's nothing special about committing offsets, particularly if we're used to handling records sequentially: handle a record, commit, handle another, commit, and so on. Besides the fact that this isn't very efficient (batching commits would be better, to minimize I/O), it doesn't work when we're processing records in parallel. Or rather, it works, but we lose the at least once delivery guarantee that we typically want.
Let's imagine that we configure our consumer with a maximum degree of parallelism of 2. We grab 2 records from Kafka and push them onto the runners. Let's say both records came from the same Kafka partition, one with offset 33 and the other with offset 34. If 33 completes its processing before 34, it can be committed immediately, no worries. However, if 34 is the first to finish processing, we can't just commit it, because we're not sure 33 will be processed successfully. If for some reason the record with offset 33 fails processing, but offset 34 was already committed, we “lost” a record. The record is still in the Kafka topic, so we didn't actually lose the data, hence the quotes, but because the last committed offset is greater than 33, it will never be processed again, so it's effectively lost.
This is where the offset controller actor comes in. For every new record we start handling, we ask this actor to keep track of it. When we complete handling a record, we again inform the offset controller. When there are offsets ready to commit, i.e. all records with offsets up to a certain point were handled successfully, the offset controller notifies the orchestrator, which in turn forwards that information to the client proxy.
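To make this more tangible, here's a simplified sketch of what the tracking logic for a single partition could look like (illustrative, not the library's actual implementation): offsets are tracked in the order they were fetched, and an offset only becomes committable when it and everything before it has completed (following Kafka's convention of committing the offset of the next record to consume, hence the + 1).

```csharp
// Simplified sketch of per-partition offset tracking (illustrative, not the actual code).
using System.Collections.Generic;

public sealed class PartitionOffsetTracker
{
    // offsets currently being handled, in fetch order, with a completion flag
    private readonly List<(long Offset, bool Completed)> _inFlight = new();

    public void Track(long offset) => _inFlight.Add((offset, false));

    // marks an offset as completed and returns the offset that can be committed,
    // if any (the offset of the next record to consume, per Kafka's convention)
    public long? Complete(long offset)
    {
        var index = _inFlight.FindIndex(entry => entry.Offset == offset);
        if (index >= 0) _inFlight[index] = (offset, true);

        long? committable = null;
        while (_inFlight.Count > 0 && _inFlight[0].Completed)
        {
            committable = _inFlight[0].Offset + 1;
            _inFlight.RemoveAt(0);
        }

        return committable;
    }
}
```

Applying it to the earlier scenario: after Track(33) and Track(34), Complete(34) returns null because 33 is still in flight, while the later Complete(33) returns 35, meaning both records are now covered by a single commit.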
Continuing with the previous example, but applying the offset controller logic, the flow would be: