State is hard. Really hard. You just won't believe how vastly, hugely, mind-bogglingly hard it is. I mean, you may think it's a long way down the road to functional programming, but that's just peanuts to state. Listen...*
Having spent years at Confluent in charge of operating Kafka Streams deployments, we're no strangers to the range of production incidents that can arise. One of the clearest patterns we saw in these incidents was that their frequency and severity were directly correlated with the amount of state, both in overall storage size and in number of state stores:
Stateless applications rarely faced any downtime, while applications that managed terabytes of state would frequently encounter rebalancing storms, sometimes paired with extremely long recovery times.
Understanding the root cause for this correlation requires a little detour explaining the history of Kafka Streams. When it first launched back in 2016, Kafka Streams was developed to depend only on the presence of Kafka. This architectural choice made it extremely easy to deploy Kafka Streams as it was “just a library” — unlike other infrastructure, it did not depend on a Hadoop, YARN or Mesos cluster (Kubernetes was just a baby back then).
One downstream effect of this choice is that all state in Kafka Streams is considered transient — it is more akin to a cache backed by a persistent changelog topic in the Kafka broker. This effect has three main implications, which are explored in the rest of this post.
As the backbone for business-critical applications, Kafka Streams must provide strong processing guarantees. This means that in order to process an event, the state store must reflect the updates from all previously processed events. The default store implementations are materialized locally and are not replicated to other nodes — instead, every write to local storage is first written to a changelog topic persisted in Kafka, and this topic serves as the source of truth and the persistence layer.
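To make that concrete, here's a minimal sketch of a DSL aggregation whose store is backed by a changelog topic (the topic and store names are made up for illustration; changelog logging is actually on by default, we enable it explicitly here just to highlight the mechanism):

```java
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()))
       .groupByKey()
       .count(
           Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("order-counts")
               // Every write to the local RocksDB store is also appended to the
               // store's changelog topic, which is the real source of truth.
               .withLoggingEnabled(Map.of("min.insync.replicas", "2")));
```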
When something triggers data recovery (whether a node failure, a restart with a corrupted local store, an EOS transaction timeout, a scale-out request, or one of the many other operations that shuffle partitions), the local state must be discarded and bootstrapped from scratch. This is achieved by reading the changelog from the beginning of time and inserting the data into local storage. Given the physical limitations of sending data across a network and writing to disk, this operation is understandably expensive in both time and resources.
The expensive restoration process is a huge thorn in the side of operating Kafka Streams, and one that even seasoned application developers find difficult to avoid being pricked by.
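Restoration can't always be avoided, but it can at least be made observable. Here's a minimal sketch of the public [.c-code]StateRestoreListener[.c-code] hook, which you can register via [.c-code]KafkaStreams#setGlobalStateRestoreListener[.c-code] to surface restore progress in your logs or metrics:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Logs restoration progress so long bootstraps show up in your logs/alerts.
// Register with: kafkaStreams.setGlobalStateRestoreListener(new LoggingRestoreListener());
public class LoggingRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("Restoring %s for %s: %d records to replay%n",
            storeName, partition, endingOffset - startingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        // Called after each restored batch; useful for progress gauges.
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName, long totalRestored) {
        System.out.printf("Finished restoring %s for %s (%d records)%n",
            storeName, partition, totalRestored);
    }
}
```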
While bootstrapping state from Kafka changelog topics is (in theory) an elegant and simple approach to durability, long-running operations in distributed systems nearly always have unintended consequences. KAFKA-13295 illustrates this phenomenon well: we determined that long restoration times led to applications failing on transaction timeouts, which in turn caused processing to stall on repeated rebalances.
Understanding this issue requires some background knowledge on how threading in Kafka Streams worked prior to some recent improvements (which we'll describe below). The basic model involves threads and tasks: each thread is assigned a number of tasks, and each task reads from a single partition of one or more topics.
The key thing to know about Kafka Streams threading is that all tasks assigned to a [.c-code]StreamThread[.c-code] must complete restoration before any new records are polled from the input topics and actively processed. You can think of a [.c-code]StreamThread[.c-code] as having different “phases” — if the thread is in the restoration phase, only tasks that need to be restored will be worked on. Once all tasks are done restoring, the thread moves into the “running” phase and resumes actively processing records.
The threading model simplifies the code and works great when restoration is quick, but if restoration takes a long time you can probably guess the consequence: all active tasks assigned to a thread that's restoring state will be blocked, even if there's only one task in need of restoration!
This is especially bad if one of the tasks has started an EOS transaction, which is the issue reported in KAFKA-13295:
An active task begins processing and initializes a transaction, but before the transaction is committed there is a rebalance which assigns a task in need of restoration. Only active tasks commit transactions, and since restoration can take a long time it is likely that the transaction will time out (never be committed), triggering another rebalance and potentially causing tasks on this node to shuffle around, starting the vicious cycle all over again.
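One partial mitigation (it doesn't fix the underlying blocking, it just buys headroom) is to raise the transaction timeout so an open transaction can survive a longer restoration phase. A hedged sketch; the value is illustrative and is capped by the broker-side [.c-code]transaction.max.timeout.ms[.c-code]:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Kafka Streams defaults transaction.timeout.ms to 10 seconds under EOS; give
// open transactions more room to outlive a restoration phase.
props.put(
    StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
    5 * 60 * 1000  // illustrative value, not a universal recommendation
);
```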
This phenomenon of long restorations foreshadowing a rebalance storm is not specific to applications with transactional producers; the Kafka Streams/Kafka Consumer architecture simply isn't well optimized for situations where processing of certain partitions is delayed.
A config is about the most massively useful thing a stateful hitchhiker can have.
Prevent Rebalances. The first strategy to consider is avoiding restores altogether by preventing unnecessary rebalances. There are three helpful configurations for that:
Reduce The Overhead. The next set of improvements attempt to hide the overhead of restoration, so if a rebalance does happen it doesn't disrupt normal operations:
Speed Up Rebalances. Lastly, we have a set of miscellaneous improvements that speed up restores:
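To make these three buckets concrete, here's a rough configuration sketch (the specific values are illustrative placeholders, not recommendations; tune them for your own workload):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();

// 1. Prevent rebalances: tolerate brief pauses and restarts without kicking members out.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60_000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);

// 2. Reduce the overhead: keep warm copies of state on other nodes so a failover
//    doesn't have to replay the changelog from scratch.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

// 3. Speed up restores: let the dedicated restore consumer fetch bigger batches.
props.put(
    StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
    10_000
);
```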
The Encyclopedia Galactica, in its chapter on rebalances, states that they are far too complicated to operate. The Definitive Guide to the Kafka Streams State has this to say on the subject of rebalances: Avoid, if at all possible.
Kafka's consumer rebalance protocol was groundbreaking. When it was first released over a decade ago, dynamic task allocation was something only a few advanced systems could benefit from, but the Kafka client made it simple to dynamically scale applications reading from Kafka, so long as your application was stateless (which many early ones were).
When Kafka Streams entered the picture, state was suddenly easily accessible in your application. It soon became clear that the default assignment mechanism would not keep up: the presence of an extensive bootstrapping period (see the previous section) makes naively shuffling around partitions impractical.
To bring the rebalance protocol up to speed, Streams developers introduced two sets of improvements: the first aims to make partitions as “sticky” as possible, preventing shuffling of partitions and associated state, and the second attempts to intelligently place standby/warmup tasks (replicas) so that the load is well balanced across the nodes in your application.
In the first category, the introduction of the [.c-code]StickyAssignor[.c-code] as a configurable assignor for the Kafka Consumer ensured that partitions were minimally shuffled during a rebalance so that state did not need to move with them. That helped mitigate the aftereffects of rebalances, since consumers would end up with the same partition assignment when it was over, but what about during the rebalances themselves? Until the introduction of the [.c-code]CooperativeStickyAssignor[.c-code], rebalances required every consumer in the group to stop working entirely so they could revoke all of their partitions — even though they would often get the same ones back (it is sticky, after all). By splitting partition migration into two phases, this newer assignor allows nodes to keep ownership of partitions that didn't move (see this excellent blog post by our founding engineer Sophie for a deep dive on this assignor, or KIP-429 if you want to nerd out on the details). Both of these improvements also apply to Kafka Streams; they come out of the box with the required [.c-code]StreamsPartitionAssignor[.c-code] and the default [.c-code]HighAvailabilityTaskAssignor[.c-code].
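Kafka Streams users get this behavior for free from the [.c-code]StreamsPartitionAssignor[.c-code], but if you're running a plain Kafka Consumer you have to opt in yourself. A minimal sketch (the bootstrap servers and group id are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// Opt in to cooperative rebalancing: only partitions that actually move are revoked,
// instead of every consumer dropping all of its partitions on each rebalance.
props.put(
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    CooperativeStickyAssignor.class.getName()
);
```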
For some users, the risk of any rebalance whatsoever is so high that they opt to effectively disable dynamic rebalancing using static membership. By setting the [.c-code]group.instance.id[.c-code] config, Kafka can recognize individual members of the consumer group and hand them back the same partition assignment instead of triggering a rebalance under certain conditions (see KIP-345 for more details on how this works). There are various risks associated with static membership — we don't recommend it in most situations, since the rebalance protocol, while sometimes problematic, is essential to Kafka and is relied upon for many features such as communicating and synchronizing across members of the group.
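If you've weighed those risks and still want static membership, the setup is small. A sketch, with a made-up instance id (it just needs to be unique and stable per node):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Static membership: a stable, per-node id lets the broker hand back the same
// assignment after a bounce instead of triggering a full rebalance.
props.put(
    StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
    "orders-app-node-1"  // hypothetical id; must be unique and stable per node
);
// Pair it with a session timeout long enough to cover a rolling restart.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120_000);
```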
Squarely in the second category of intelligent task assignment is KIP-441, which introduced the concepts of probing rebalances and warmup replicas. This KIP allows the prior owner of a task to keep it, even if the resulting assignment is temporarily unbalanced, until the new owner has caught up, at which point ownership is transferred. Since new owners aren't expected to immediately serve as actives in the cluster, scaling the number of nodes can be done without disruption. Note that this does not reduce the time it takes to scale: the newly provisioned resources still need to bootstrap their state before they can take over active processing.
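The knobs behind this behavior live in [.c-code]StreamsConfig[.c-code]; a rough sketch of tuning them (the values shown are illustrative only):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// How many extra "warmup" task copies may be catching up at once.
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
// A warmup is considered "caught up" once its changelog lag drops below this many records.
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);
// How often a probing rebalance checks whether warmups can be promoted to actives.
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);
```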
These changes only scratch the tip of the complexity iceberg that is stateful task rebalancing. If you're interested in more war stories as well as how to tune rebalances, stay tuned — we have a full blog post planned that deeply explores the rebalance protocol.
The Answer to the Ultimate Question of Life, the Universe, and The Maximum Number of Collocated RocksDB Instances is 42 (well, we actually recommend 30).
Not only is tuning RocksDB notoriously difficult (though the wonderful Kafka Streams committers teamed up with one of the founders of RocksDB to help guide you through it), but the challenge is compounded when there are multiple instances of RocksDB deployed on a single node, which is typically the case in Kafka Streams topologies since each stateful task has its own independent RocksDB instance.
When multiple RocksDB instances are running on a single node, they don't (by default) share a cache or write buffer. Since physical memory on your node is constrained (RocksDB and the JVM can't share the same memory), in practice these buffers are either tuned to be relatively small or flushed before they're full to avoid memory pressure. You can tune this using [.c-code]Options#setWriteBufferSize[.c-code] when configuring a [.c-code]RocksDBConfigSetter[.c-code] in Kafka Streams.
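One common pattern, sketched here along the lines of the bounded-memory example in the Kafka Streams docs (the sizes are made up), is to put every store on the node behind a single shared block cache and write buffer budget via a [.c-code]RocksDBConfigSetter[.c-code]:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

// Register with:
//   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    // Illustrative budgets: one shared off-heap pool for all stores on the node.
    private static final long TOTAL_OFF_HEAP_BYTES = 1L << 30;   // 1 GiB block cache
    private static final long TOTAL_MEMTABLE_BYTES = 256L << 20; // 256 MiB of that for memtables

    private static final Cache CACHE = new LRUCache(TOTAL_OFF_HEAP_BYTES);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
        new WriteBufferManager(TOTAL_MEMTABLE_BYTES, CACHE);

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        // All stores read through one shared block cache instead of private caches.
        tableConfig.setBlockCache(CACHE);
        options.setTableFormatConfig(tableConfig);
        // All stores draw memtable memory from one shared budget.
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        // The per-memtable knob mentioned above; illustrative size.
        options.setWriteBufferSize(32L << 20);
    }

    @Override
    public void close(String storeName, Options options) {
        // The cache and write buffer manager are shared across stores; don't close them here.
    }
}
```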
We learned the hard way (in production) that this invalidates one of RocksDB's fundamental performance assumptions: that random writes can be effectively batched and written to disk in predictably sized chunks, called SSTables. If SSTables are flushed to disk too early, you'll get increased write amplification, more compaction, and poor cache coherency.
This set of problems, unfortunately, is intrinsically tied to the architecture of Kafka Streams and RocksDB — we never discovered any good workarounds during our time at Confluent operating such clusters in production. We can, however, leave you with some rules of thumb to avoid running into issues:
The takeaway here is that there's just as much art as there is science in operating a stateful service, and pairing that state with compute makes the balancing act much more delicate. Most application developers don't want to reason about these considerations; that's why we built Responsive to separate these concerns and manage the more difficult half of the equation — the stateful half — for you.
In the beginning, State and Compute were coupled. This has made a lot of people very angry and been widely regarded as a bad move.
At Responsive, we've been working on solving all of these problems in one go: by removing state from the equation altogether.
Separating storage from compute is a time-tested technique for making the operations of stateful distributed systems tractable, and Kafka Streams is no different. In the Kafka Streams context, it removes the need for state restoration, which in turn makes rebalances cheaper, simpler, and more robust.
This architectural improvement provides better application availability, lower costs thanks to truly elastic compute, and reduced operational overhead for the teams operating these applications. Our next blog post will talk about how we leveraged these characteristics to build an autoscaler for Kafka Streams.
Hopefully this blog helped shine some light on what to do if you hit these problems in production. If you'd rather not think about state at all, you can get started with Responsive today and let us take care of your Kafka Streams operations!
If you have questions about Kafka Streams state you can chat with me (yup, I'll respond to your ping) or the rest of the team on our Discord.
[.c-footer]*So you made it to the end and were wondering what the italicized quotes were all about? Go read the Hitchhiker's Guide to the Galaxy![.c-footer]