A practitioner’s guide. For everyone who is in too deep and wants to peel back the curtains on one of the most elegant yet maddening aspects of Apache Kafka: the consumer group rebalancing protocol.
Whether you’re just starting to operate your own Kafka Streams applications and wondering what goes on beneath the surface or have been using it for years, this blog post is for you.
Kafka’s consumer group rebalancing protocol is sophisticated, and Kafka Streams pushes the protocol to its limits. So before you can productively debug gnarly rebalancing issues, you need to understand how the system works. This post will build the necessary context before arming you with the spells to banish rebalancing-related problems from your system.
[.c-incident-911]Read it all now to prepare yourself for future battles or skip to the “Incident 911” sub-sections for concrete tips and specific references that can help in an emergency.[.c-incident-911]
Here’s how it breaks down:
As any great wizard will tell you, the first step to conquering your enemy is to understand them. So let’s jump right into the rebalancing protocol and get to know it:
First up, what even is a rebalance? Fundamentally, rebalancing is an elegant solution to the most ancient of distributed systems problems: dynamic scheduling. We have a set of resources representing discrete units of parallelizable work, and want to make sure they are distributed across the available workers so that each worker has something to do and each unit of work gets a share of execution time. In this case the workers are the consumers in a consumer group, and the units of work are the partitions of the topics that group is subscribed to.
Simply put, rebalancing is the protocol by which partitions are scheduled (or as we say, assigned) onto the available consumers. Although both the consumers and the partitions are “persistent” in the sense that a consumer is never finished working on a partition, it’s possible for the number of either to change over time. For example, when topics matching the consumers’ subscription are created, there will be new partitions to process. On the flipside, consumers may be added and removed from the group through scaling operations or node failures. To make sure all partitions are being processed and all consumers are being utilized following a change to either the work or the workers of the group, we need to automatically redistribute the current partitions across the remaining consumers: to “rebalance” them.
So rebalancing is what we call the reassignment of partitions to consumers after a change in the group. But what exactly causes it — when does a rebalance occur?
At a high level, you can break up the causes into two categories: changes in the topic partitions and changes in the group membership. The former is typically less interesting, or at least less common in problematic situations, as topics tend to be fairly static for the lifetime of an application. Group membership, on the other hand, is far more dynamic — both intentionally and unintentionally — and more nuanced than you might think.
Before we drill down into the causes of a rebalance, let’s introduce all the players.
In summary, here are the various actors at play in a consumer group:
When there’s no rebalancing going on, the interactions between these players during normal processing will look something like this:
Now that we’ve introduced the different participants, we can start putting together a picture of what is happening where — and most importantly, which set of logs to check for specific questions!
First, while any member of the group can trigger a rebalance, they can’t and don’t communicate directly with each other. So all of their requests have to go to, and through, the group coordinator. The actual trigger to rebalance can be either a consumer sending one of those special requests to the group coordinator (specifically a “JoinGroup” request), or it can be simply a decision made by the group coordinator’s own internal logic. But it’s important to understand that whoever and whatever originally caused a rebalance, it’s ultimately up to the group coordinator to broadcast the rebalance event to the other consumers.
Broker logs
Typically the group coordinator will do so by sending a “rejoin” signal to each consumer in the response to its next heartbeat request. Before this occurs, the broker will print one of the most useful logs for understanding, tracking, and debugging rebalances on the broker side:
[.c-code-block]"Preparing to rebalance group <group_id> in state <current_state> with old generation <previous_generation_id> (__consumer_offsets-<group_offsets_partition>) (reason: <reason>)”[.c-code-block]
[.c-callout]Note: The specific log messages mentioned in this blog post are from version 3.6.0 of Apache Kafka, and many of the lines have changed across recent releases. You should always make sure to look at the exact wording of the logs in the source code for the specific version you are running.[.c-callout]
Client logs
Of course, if you also (or only!) have the client-side logs, you can ask the consumer itself why it decided to trigger a rebalance or rejoin the group. A great line to anchor any debugging session is this message that gets logged when the consumer internally flags the need to rejoin the group for an ongoing or new rebalance:
[.c-code-block]"Request joining group due to: <client_reason>"[.c-code-block]
Note that this may be logged by the polling thread, for example when first starting up or due to externally enforced rebalances (as used by Kafka Streams). It can also be logged by the heartbeat thread, like when the group coordinator sends a signal to rejoin in the heartbeat response. We’ll dig into all the gory details around what causes rebalances and cover the most common <client_reason> values worth recognizing in the following section. But it’s important to be able to differentiate between the actual cause of a rebalance and the symptom(s), so that you know when and where to look. For example, if the heartbeat thread receives the signal to rejoin in its response from the group coordinator, the <client_reason> logged will be that the [.c-code]"group is already rebalancing"[.c-code]. In this case, as the log line suggests, the consumer you are looking at is only rebalancing because someone else triggered one first. You’ll need to check the other consumers (and/or the group coordinator) to figure out the root cause of that particular rebalance.
[.c-incident-911]So, what are the actual reasons a group may have for rebalancing? Let’s drill down into the specific scenarios to explain why a rebalance is occurring, and more importantly, how you can identify the root cause. When debugging rebalances, you would ideally collect the complete set of logs from all consumers in the group as well as from the group coordinator. Unfortunately the reality seems to be that folks often have access to only one or the other, so we’ll cover what to look out for in both the consumer and broker logs: both to help those who might only have one of those available, and because they complement each other and tell a much more complete story when read together.[.c-incident-911]
Heartbeat expiration occurs when the heartbeat thread fails to contact the group coordinator within the configured [.c-code]session.timeout.ms[.c-code], causing the group coordinator to assume the consumer has failed and trigger a rebalance to kick it out of the group. Generally this happens when the process itself has stopped, in which case there would be no further client-side logging. However, it’s also possible that the group coordinator connection simply faltered, for example due to connection issues like network partitions or dropped packets, and in extreme cases, thread starvation or long GC pauses.
If the heartbeat thread just can’t get a heartbeat request and response across a faulty network within the [.c-code]session.timeout.ms[.c-code], you should see a line in the client-side logs that includes this phrase:
[.c-code-block]"session timed out without receiving a heartbeat response"[.c-code-block]
If the heartbeat thread was stalled for some reason, when it finally reconnects/recovers and manages to get a heartbeat request through to the group coordinator, the response will inform it of the expiration and include a signal to the consumer to rejoin the group. When this happens, you should see it logging this:
[.c-code-block]"Attempt to heartbeat with <sent_generation> and group instance id <group.instance.id> failed due to <error>, resetting generation"[.c-code-block]
As you can tell, detecting heartbeat expiration on the client side can be tricky and involves some nuance. If you’re looking for a reliable way to tell when this has happened and have access to the broker logs, just look for this message as the <reason> logged by the group coordinator when it’s preparing a rebalance:
[.c-code-block]"removing member <member_id> on heartbeat expiration"[.c-code-block]
When a new consumer starts up and calls poll for the first time, it will attempt to join the group by sending a special kind of message called a JoinGroup request. Note that this first JoinGroup request from a new member is not actually used to join the group, but rather to get a member id assigned. In other words, the initial JoinGroup request is always doomed to fail and must be sent again for the actual rebalance — so if you see this in the client logs, don’t worry, it just means a new member joined:
[.c-code-block]"Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)"[.c-code-block]
You can confirm this by checking the <reason> logged by the broker:
[.c-code-block]"Member <member_id> joining group during <current_state>; client reason: <client_reason>"[.c-code-block]
Next we have the reverse of the above case — when a consumer is no longer available to actively process partitions, it will send another kind of special message called a LeaveGroup request. This will inform the group coordinator to immediately trigger a rebalance so that any partitions it was assigned can be redistributed to the remaining members of the group. The corresponding <reason> given in the broker logs will be
[.c-code-block]"Removing member <member_id> on LeaveGroup; client reason: <leave_reason>"[.c-code-block]
And on the client side, the consumer will log
[.c-code-block]"Member <member_id> sending LeaveGroup request to coordinator due to <leave_reason>[.c-code-block]
There are three distinct scenarios that can lead to the consumer proactively leaving the group, with the following possible values for <leave_reason>:
One interesting thing to note is that, unlike the first two scenarios, the heartbeat thread is actually the one sending the LeaveGroup request in the final case. Hopefully this makes sense if you think about it: of course the heartbeat thread would need to be responsible for informing the group coordinator that the polling thread is stuck, since by definition the polling thread can’t (or won’t)!
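The most common way for the polling thread to get “stuck” is record processing that takes longer than [.c-code]max.poll.interval.ms[.c-code] between two calls to poll. Continuing the consumer sketch from above (imports omitted, and the [.c-code]process[.c-code] method here is hypothetical):
[.c-code-block]// max.poll.interval.ms bounds how long the polling thread may go between poll calls
// (5 minutes by default); it must be set before the consumer is constructed
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // if handling one batch takes longer than max.poll.interval.ms in total, the
        // heartbeat thread sends the LeaveGroup on the polling thread's behalf
        process(record);
    }
}[.c-code-block]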
It’s also possible for applications to externally trigger a rebalance by forcing the consumer to send out a JoinGroup request. When this happens, you should be able to find the cause logged as the <client_reason> in this line:
[.c-code-block]"Request joining group due to <client_reason>"[.c-code-block]
On the broker side, you’ll see the <reason> logged as
[.c-code-block]"Updating metadata for member <member_id> during <current_state>; client reason: <client_reason>"[.c-code-block]
While this should rarely be needed for plain consumer client apps, enforced rebalances are heavily relied on by Kafka Streams. The specific examples of <client_reason> are covered in the “Rebalances Triggered by Streams” section below.
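For completeness, here’s what an enforced rebalance looks like from a plain consumer app. This is a hedged sketch: the reason-string overload is only available on newer clients (older ones have a no-arg variant), and the reason you pass should show up as the <client_reason> in the logs above:
[.c-code-block]// Ask the group coordinator for a new rebalance on the next poll; Kafka Streams
// uses this same mechanism internally for its enforced rebalances
consumer.enforceRebalance("operator requested a fresh assignment");[.c-code-block]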
Rebalance failures are not technically a trigger for rebalancing in themselves, at least not in the absence of an ongoing or already-triggered rebalance. But sometimes you can get into a loop where rebalances are continuously failing and being retried, making it difficult to narrow in on what is ultimately causing the rebalancing storm. There are many different kinds of failures, so you’ll need to debug this on a case-by-case basis, but searching for the phrases [.c-code]JoinGroup failed[.c-code] and [.c-code]SyncGroup failed[.c-code] in the consumer logs is usually a good place to start.
With all that under our belt, it’s time to summon your courage, young Gryffindor, and dive into the rebalancing protocol itself. First, let’s take a high-level look from the group coordinator’s point of view, and dissect some of these terms we’ve been throwing around. We can divide each rebalance into two distinct phases: the JoinGroup and the SyncGroup.
On that note, let’s talk a bit about the partition assignor. As the name suggests, this interface is where the ultimate decision of how to allocate partition resources is made. But if you peek behind the curtain, you’ll find that it’s more than just the assignment algorithm. Here are the three notable APIs of the assignor:
It’s important to note that each member of the group will have its own instance of the [.c-code]ConsumerPartitionAssignor[.c-code], and all of them will invoke the [.c-code]#subscriptionUserData[.c-code] and [.c-code]#onAssignment[.c-code] callbacks (as we’ll see shortly). Only one assignor instance will be responsible for performing the assignment and invoking the [.c-code]assign[.c-code] API, however. Remember how the group coordinator will pick one of the consumers to elect as group leader for each rebalance? Well, this is why: to perform the assignment.
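To make those three APIs concrete, here’s a toy [.c-code]ConsumerPartitionAssignor[.c-code] sketch. It is not one of the built-in assignors, and it naively assumes every member subscribes to the same topics, but it shows where each callback fits into the flow described below:
[.c-code-block]import java.nio.ByteBuffer;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

public class NaiveRoundRobinAssignor implements ConsumerPartitionAssignor {

    // Called on every member as it prepares its JoinGroup request
    @Override
    public ByteBuffer subscriptionUserData(Set<String> topics) {
        return null; // no custom metadata in this sketch
    }

    // Called only on the group leader, with the Subscriptions of the whole group
    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members);

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        members.forEach(member -> assignment.put(member, new ArrayList<>()));

        // gather every topic any member is subscribed to
        Set<String> allTopics = new TreeSet<>();
        subscriptions.values().forEach(subscription -> allTopics.addAll(subscription.topics()));

        // deal partitions out to members round-robin (ignoring stickiness, state, racks...)
        int next = 0;
        for (String topic : allTopics) {
            Integer numPartitions = metadata.partitionCountForTopic(topic);
            if (numPartitions == null) continue; // topic not in the cluster metadata
            for (int partition = 0; partition < numPartitions; partition++) {
                String member = members.get(next++ % members.size());
                assignment.get(member).add(new TopicPartition(topic, partition));
            }
        }

        Map<String, Assignment> groupAssignment = new HashMap<>();
        assignment.forEach((member, partitions) ->
            groupAssignment.put(member, new Assignment(partitions)));
        return new GroupAssignment(groupAssignment);
    }

    // Called on every member once it receives its Assignment in the SyncGroup response
    @Override
    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
        // a real assignor might update local bookkeeping here
    }

    @Override
    public String name() {
        return "naive-round-robin";
    }
}[.c-code-block]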
Now that you’ve been armed with an understanding of the rebalancing protocol, we can rewind back to the beginning of a rebalance and step through the process once again, this time from the client’s point of view, to understand the vital role of the ConsumerPartitionAssignor.
Step 1: Rejoin Group & Send JoinGroup Request
As we already know, once a rebalance is initiated, all members of the consumer group are prompted to rejoin by sending a JoinGroup request to the group coordinator. This is more than just a simple Alohomora spell for being let back into the group, however. Each consumer has to encode a “Subscription” in the JoinGroup request, which contains required metadata like the topic subscription and currently assigned partitions, optional configurations such as [.c-code]rack.id[.c-code] or [.c-code]group.instance.id[.c-code], and any optional “userdata” like the set of standby tasks or stateful tasks in Kafka Streams. So each consumer begins a rebalance with a call to the assignor’s [.c-code]#subscriptionUserData[.c-code] and packages this into the Subscription that it then sends off in the JoinGroup request.
Step 2: Receive JoinGroup Response & Send SyncGroup Request
Eventually each consumer gets the JoinGroup response and immediately sends out its SyncGroup request. But the SyncGroup is the final piece of communication a consumer sends out during the rebalance, so if the partition assignment is to occur on the client side, this has to be when and where it happens.
And this is indeed where the assignor steps in, but not for everyone! All regular group members simply receive an empty JoinGroup response, and send out an empty SyncGroup request in return. For them, the JoinGroup response and SyncGroup request are simply sparks in the air: just a signal that they’re still here, still active, still waiting for their assignment.
The group leader on the other hand, has important things to do during this step. The group coordinator has collected the Subscriptions from all the JoinGroup requests, and consolidated them into a single object. This is what gets passed to the group leader in its JoinGroup response, and ultimately becomes the GroupSubscription that is handed to the ConsumerPartitionAssignor’s assign method, along with metadata about the cluster itself. We’ll save all the gory details of the assignment for another time (perhaps another blog post) but for now, all you need to understand is that a single instance of the ConsumerPartitionAssignor (specifically, the group leader’s assignor) gets the Subscriptions for the entire group and uses them to decide on a mapping of topic partitions to consumers. The group leader encodes the set of partitions assigned to each consumer, along with any other optional user metadata, in an “Assignment” for each member of the group. Finally, it packs these into a GroupAssignment embedded in its own SyncGroup request, and sends it off to continue the rebalance.
Step 3: Receive SyncGroup Response & Stabilize Generation
Once the SyncGroup requests have all been sent, the group coordinator locates the one from the group leader and dissects it into the individual per-consumer Assignments, which it then forwards to the corresponding consumer in its SyncGroup response. From now on, as far as the group coordinator is concerned, the rebalance is over (and a new one can begin). Then it’s just a matter of waiting for each consumer to receive and process its SyncGroup response to officially join the group on the latest stable generation. At this point the group leader is once again indistinguishable from any other member of the group. If all goes well, the consumers will just update their partition assignment, invoke the [.c-code]#onAssignment[.c-code] callback to notify their personal assignor (even if that “assignor” was not actually involved in the assignment), and resume normal processing.
[.c-callout]🚨 Note: the Kafka consumer will attempt to enforce the timeout passed in to #poll regardless of an active rebalance, and therefore it may not always complete a rebalance within a single invocation of #poll! Using a short poll timeout is useful when you want to continue processing while a rebalance is in progress, for example while waiting on the JoinGroup/SyncGroup response. It’s good to keep in mind that these steps can occur in a single invocation of #poll or in separate calls for each.[.c-callout]
Unlike the Consumer client which allows users to choose from a set of built-in partition assignors or even plug in a custom implementation of ConsumerPartitionAssignor of their own, the assignor in Kafka Streams is hard-coded. A Streams application requires a specific assignor, called the StreamsPartitionAssignor — but not necessarily (or not just) to enforce a specific assignment strategy. The StreamsPartitionAssignor is used to dip into the dark arts and perform a bit of magic unrelated to assigning partitions. By taking advantage of the group leader selection, Streams can use the assignor to perform certain actions that only need to happen once, communicate between different instances within the same app that lack a direct network connection, and gate the processing until important initialization & validation steps have completed. It uses the #assign method for things like verifying partition counts and creating internal topics.
It’s important to understand that rebalances are an essential tool of Kafka Streams as much as they are a core functionality, and are used extensively for a variety of features beyond what most would imagine. But rebalancing can seem to be as much an art as a science, and a dark art at that. Because as useful as rebalances are, they can be equally destructive and harmful to your application, or worse: your sanity. Like with any dark magic, they have led many down the path of madness. So let’s take a look at what can happen when things go wrong, and how to defend yourself against the worst of it.
Wands out — we’re going in.
The first thing to know is constant vigilance! The enemy is all around you — and it’s often Kafka Streams itself that’s intentionally causing the rebalances. We’ll get into the specific causes and kinds of triggered rebalances, but for now, just know that you can always tell whether the assignor intentionally scheduled a followup rebalance by searching the logs for the phrase "unstable assignment" — if you don’t see that, you can infer that the assignment was “stable” and doesn’t require a followup. Although it’s always a good idea to confirm this by searching for the phrase "stable assignment" instead.
When a StreamThread’s consumer is processing the Assignment it received in its SyncGroup response, among the various other metadata encoded by Streams, it may read out a scheduled time (in epoch milliseconds) at which it’s been requested to enforce a new rebalance. If this happens, it will signal to the StreamThread when the followup rebalance should be triggered. After the rebalance has finished and the StreamThread leaves the poll, it will check this schedule on each iteration of the main processing loop, and inform the consumer to rejoin the group on the next poll if the current time is beyond the scheduled rebalance time. You can tell when this has happened, and which StreamThread was responsible, by looking for this line in the Streams client logs:
[.c-code-block]"Triggering the followup rebalance scheduled for <SCHEDULED_TIME>"[.c-code-block]
[.c-incident-911]While there’s nothing wrong with a little rebalancing, sometimes applications seem to fall into a black hole of rebalancing from which they don’t recover. To diagnose a bad case of endless rebalancing, we obviously need to know why the rebalances are happening. Sometimes it may be external factors, such as consumers joining/leaving the group, but other times each rebalance is actually directly inducing the next. Often, debugging rebalancing issues means tracing along a chain of rebalances to find the original event that kicked it off. This means being able to differentiate between the rebalances that are just a normal symptom of the current and/or ongoing events, and the ones that reveal an unexpected or abnormal factor at play. To unwind a tangled mess of rebalancing, you’ll want to be familiar with all the reasons a rebalance might occur, so you can determine if the rebalance makes sense in the given context.[.c-incident-911]
In the earlier section “The Why” above, we covered the kinds of rebalances that the group coordinator or consumer group members can trigger. Now let’s turn the page to the subject of externally-triggered rebalances, and take a close look at each of the cases in which the StreamsPartitionAssignor will request a followup rebalance.
Note: some of these are more niche than others, but we’re including them all for completeness. If you’re experiencing a rebalancing loop emergency you should [.c-strike]hang up and call 911[.c-strike] focus on the first 2 reasons below.
Cooperative Followup Rebalance
An immediate followup rebalance that is scheduled when the assignor would like to change which StreamThread an active task is assigned to. To make sure that no two consumers ever own the same task at the same time, no matter how briefly, we must make sure that a partition is revoked from its original owner before it can be assigned to its new one. This means the task will not be assigned to anyone during the first rebalance, and instead, the assignor will inform the intended owner to trigger a followup rebalance as soon as the current one has finished. Since the previous owner will have had to revoke that partition before it can rejoin the group for the 2nd rebalance, the assignor is now free to give the active task to its new consumer.
In a stable group, there should only ever be exactly one of these followup rebalances. However, if the group membership is constantly changing — for example due to recurring scaling events or consumers missing rebalances — then you will get a new cooperative followup rebalance after each “regular” rebalance. This can compound issues where the group is unstable, for example due to long processing times.
Look for this log from the assignor:
[.c-code-block]"Requesting followup rebalance be scheduled immediately by {} due to tasks changing ownership."[.c-code-block]
Or this log on a regular group member:
[.c-code-block]"Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner"[.c-code-block]
Warmup Task Probing Rebalance
A delayed followup rebalance scheduled for [.c-code]probing.rebalance.interval.ms[.c-code] from the time of the current rebalance, or 10 minutes by default. This is a potentially recurring rebalance that is used to “probe” the readiness of warmup tasks. These are special standby tasks that are placed on nodes that do not yet have a complete local copy of the changelog contents for a stateful task, and which would therefore need to spend a lot of time in restoration before they could begin processing. During a rebalance, the assignor is only allowed to place active tasks on nodes for which the local state is within the configured [.c-code]acceptable.recovery.lag[.c-code] from the end of the changelog. If the assignor wants to move an active task to a different node, it will place a warmup task there instead, in order to “warm up” the state in the background without interrupting the active processing. It then schedules probing rebalances to determine when it is finally safe to transition that warmup task into the active task.
Users who monitor the partition assignment balance may find that the group is extremely unbalanced while probing rebalances are ongoing. This can happen when only a subset of nodes have most of the stateful tasks, causing the active tasks to be “locked” onto those nodes. Many systems are impacted by this extreme skew, even if temporary, and it is often outright deadly for applications that are already running at or near the limits of their allotted resources. If/when probing rebalances are detected, it’s always recommended to monitor your application closely until it has stabilized and stopped issuing probing rebalances.
In general, users may run into issues with the increased resource consumption, slow warmup times, or extreme task skew during the probing rebalance period. If you have trouble getting this feature to work, try increasing [.c-code]max.warmup.replicas[.c-code] to enable faster warmup times at the cost of higher resource usage spikes, or set [.c-code]acceptable.recovery.lag[.c-code] to [.c-code]MAX_VALUE[.c-code] to effectively disable warmup tasks and move active tasks immediately, if you can afford the changelog topic restoration downtime.
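For reference, here’s a hedged sketch of the Streams configs involved; the values shown are the defaults, and the trade-offs are exactly as described above:
[.c-code-block]import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();

// how far behind the changelog end a client may be and still receive the active task
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);
// how many warmup tasks may be restoring at any one time across the group
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
// how long the assignor waits before scheduling the next probing rebalance
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);

// effectively disables warmup tasks: active tasks move immediately, at the cost of
// restoration downtime
// props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, Long.MAX_VALUE);[.c-code-block]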
Quickly identify probing rebalances before they happen by looking for this log by the assignor:
[.c-code-block]"Requesting followup rebalance be scheduled by {} for {} to probe for caught-up replica tasks"[.c-code-block]
Or this log by one of the regular group members:
[.c-code-block]"Requested to schedule next probing rebalance at {} to try for a more balanced assignment"[.c-code-block]
Version Probing Followup Rebalance
An immediate followup rebalance that is scheduled only during rolling upgrades in which the Subscription schema version has changed. The version probing feature enables live upgrades and downgrades of any Streams application. During such an upgrade, the assignor will inform each member to downgrade its Subscription to the lowest commonly supported version, to make sure that it’s readable regardless of whether the selected group leader is on the new or old bytecode version.
Once the last member has been upgraded and the entire group is on the same version, one final rebalance will be triggered to have everyone upgrade onto the latest Subscription version. This kind of rebalancing should only ever be seen during a rolling upgrade/downgrade.
Look for this log to know when a version probing rebalance has been triggered:
[.c-code-block]"Requested to schedule immediate rebalance due to version probing"[.c-code-block]
Host Endpoint Change With Static Membership
A relatively rare kind of immediate followup rebalance in which the host endpoint has been changed but the new host endpoint has not yet percolated through the group due to static membership preventing an automatic rebalance. This should only be triggered once, and only if the [.c-code]application.server[.c-code] config has been changed.
This log will tell you when this has occurred:
[.c-code-block]"Requested to schedule immediate rebalance to update group with new host endpoint"[.c-code-block]
Application Shutdown Rebalance
An immediate, but not followup, rebalance which is enforced in order to transmit an error code to all members of the group. This is the only rebalance on this list that doesn’t occur as a direct result of a previous rebalance, nor is it requested by the assignor. Instead, this kind of rebalance can be triggered by any member of the group at any time. Remember, individual members of the group cannot communicate directly with each other. They can however effectively pass messages to the rest of the group by embedding them in the Subscription and Assignment metadata passed around during a rebalance. Kafka Streams takes advantage of this to let its application instances forward custom messages.
Currently this is only used to send fatal errors that induce an application-wide shutdown when the user opts to [.c-code]SHUTDOWN_APPLICATION[.c-code] with a custom [.c-code]StreamsUncaughtExceptionHandler[.c-code], but it’s a powerful mechanism made possible by the consumer group rebalancing protocol and our custom [.c-code]StreamsPartitionAssignor[.c-code].
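To see how an application opts in to this, here’s a minimal sketch of the handler (the topology and props are assumed to be defined elsewhere):
[.c-code-block]import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

KafkaStreams streams = new KafkaStreams(topology, props);

// returning SHUTDOWN_APPLICATION is what causes the enforced rebalance that broadcasts
// the shutdown request to every member of the group
streams.setUncaughtExceptionHandler(exception ->
    StreamThreadExceptionResponse.SHUTDOWN_APPLICATION);[.c-code-block]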
This log will appear on the Streams client node where the shutdown request originated:
"Detected that shutdown was requested. All clients in this app will now begin to shutdown"
One of the most valuable tools in being able to understand and debug rebalances is just being able to figure out who did what, when. At the end of this class you should be able to piece together the timeline of a rebalance and triage an issue on your own. This won’t make you suddenly responsible for debugging and solving all rebalancing related issues at your company (we hope!), but you should be able to triage an incident thoroughly enough to determine a course of action: whether it’s fixing a configuration error, filing a thorough bug report with enough information for the devs to diagnose later, escalating it to a support team in clear language that will get you help the fastest, or just writing up a post-mortem report on an incident (10 inches of parchment minimum!).
[.c-incident-911]Like with all distributed systems, the multiple actors of a consumer group can make it difficult to understand how the system as a whole is behaving. Fortunately, rebalances have unique, monotonically increasing generations to identify them, and a single group leader for each rebalance that can be isolated to understand the driving forces.[.c-incident-911]
[.c-callout]📜 To dig into an app experiencing difficulties with rebalances, you’ll want to make sure to collect logs across all members of the group. The most important logs are in the StreamsPartitionAssignor and StreamThread on the Streams side, and the ConsumerCoordinator and AbstractCoordinator for the consumer client. So make sure to configure them with INFO level logging — at least.
However, consider setting the StreamsPartitionAssignor to DEBUG.[.c-callout]
With that said, the first step is always to identify the group leader of a given rebalance. Of course consumer groups can get quite large, so that’s often easier said than done — not only is there typically more than one copy of a Streams application running on a cluster of nodes, each application instance can have multiple StreamThreads. With one consumer per StreamThread and multiple StreamThreads per node, how do we find the group leader and locate the relevant assignor?
All it takes is a quick [.c-code]grep[.c-code] (or other tool for searching text). Simply search for a piece of one of the log lines from #assign, and narrow your view to the span of time over which the group was rebalancing. For example, the phrase "participating in this rebalance" is useful for locating the group leader of a Streams app, since it’s unlikely to appear outside of this context and produce false positives, occurs at the start of the rebalance, and has been in the StreamsPartitionAssignor since version 3.0.
Let’s say you have an application running on multiple nodes with two StreamThreads each, and have saved the logs from each application instance to a file called node{N}.log. We want to identify the group leader, so we run the following command:
[.c-code-block]$ grep "participating in this rebalance" node*.log
node0.log: [11:08:54,694] INFO stream-thread [otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-2-consumer] 2 client nodes and 4 consumers participating in this rebalance:
node1.log: [11:09:45,375] INFO stream-thread [otter-app-c4497316-8891-4f1a-b60e-6952b7c56fcf-StreamThread-1-consumer] 2 client nodes and 4 consumers participating in this rebalance:
node0.log: [11:10:57,340] INFO stream-thread [otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer] 2 client nodes and 4 consumers participating in this rebalance:
node0.log: [11:10:57,362] INFO stream-thread [otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer] 2 client nodes and 4 consumers participating in this rebalance:
node0.log: [11:24:30,654] INFO stream-thread [otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer] 2 client nodes and 4 consumers participating in this rebalance:[.c-code-block]
[.c-callout]✅ Note: the log prefix for the StreamsPartitionAssignor contains these elements: [.c-code]applicationId-processId-threadId[.c-code]. The first part of the string should be recognizable as your app id. The last part will always be “-StreamThread-n-consumer”, where n is the thread id. Everything in between is a UUID that uniquely identifies the specific process, aka “node”, that the StreamThreads and their consumers are running in.[.c-callout]
We actually got quite a few hits on that log line, as you’d expect with recurring rebalances. Take a close look at the example results. Do you see anything suspicious?
Unstable Group Leaders
The first thing that should jump out at you is that the log prefix changes a few times, telling us that the group leader was not the same for all rebalances in this period. The first three lines each come from a different StreamThread. During normal operations, the group leader for the previous rebalance will be chosen for the next one as well. This leadership disruption therefore indicates that there was an issue with the first two members that prevented them from completing successful rebalances, until eventually [.c-code]otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer[.c-code] took over.
That’s not all we can learn from the example above. It’s important to always check the timestamps in a rebalance! What else can we learn here?
Closely Grouped Rebalances
The next thing you may notice is that the first two rebalances after the final group leader takes over are very close together in time, both occurring within the same second at 11:10:57. This almost certainly means that the first rebalance failed: but how do we know that, and can we make a guess at why?
Consider everything that has to happen between each instance of this line being logged: finishing the assignment for that first rebalance and sending it back in the SyncGroup request, receiving the SyncGroup response, sending out a new JoinGroup request, and receiving the JoinGroup response — which, remember, includes the Subscription for every other consumer in the group, which means we also had to wait for everyone else to rejoin the group.
It seems very unlikely, if not outright impossible, for all of that to occur within a few milliseconds: especially waiting for all of the other consumers to (re)join. But what if a new rebalance had already been kicked off, perhaps triggered by another member? In that case all the other consumers may have already sent their JoinGroup requests for the new rebalance and were just waiting on the group leader, who failed to notice as quickly due to its assignment responsibilities. If the group leader only had to finish its assignment and go through its own SyncGroup and JoinGroup request/response, it’s much easier to believe this could happen within a single second.
This theory is ultimately borne out by the logs in node0.log, which show that the SyncGroup of the first rebalance did indeed fail due to a second rebalance having been triggered.
[.c-code-block][11:10:57,345] INFO [Consumer clientId=otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer, groupId=otter-app] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=742, memberId='otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer-7f45268d-547c-47ed-a835-48e7d1503b0f', protocol='stream'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:826)[.c-code-block]
From here, you can trace back the original cause of the rebalancing by diving into the logs with the help of the two previous incident 911 sections, “The Why” and “Rebalances Triggered by Streams”.
Regularly Spaced Probing Rebalances
Another common case of timestamps telling the whole story is with probing rebalances. If you happen to notice a gap of almost exactly 10 minutes — or whatever you set the [.c-code]probing.rebalance.interval.ms[.c-code] to — then it’s almost certain the recurring rebalances are simply due to a migration of tasks that are taking a long time to warm up. Consider increasing the [.c-code]max.warmup.replicas[.c-code] or even allocating additional standby tasks.
It’s important to note that the task assignment can be pretty much scrambled each time you make a permanent change to the group, such as scaling up or down the number of nodes. You’ll want to be aware of the expected increase in network usage as temporary warmup tasks restore from their changelog topics, as well as possible task skew until the final warmup task has transitioned to active. Monitor the app closely and try not to scale too often to minimize the disruption. For example, unlike the standard wisdom in similar systems, it usually makes sense to do everything all at once: whether that’s adding multiple nodes in a single scale-up event or bouncing multiple nodes at the same time. Ultimately, the longer the app spends in the intermediate state, the more time and money you’ll be spending just moving tasks around.
Unfortunately, with large enough state stores your app can only react so quickly. If you want to operate in a dynamic environment or scale your app to react quickly to current events, probing rebalances can be as much curse as blessing. The most important thing is being able to identify when they are occurring — especially if there’s no end in sight and the rebalancing never terminates, in which case you’ll need to consider your options and can weigh the tradeoffs you might otherwise not have considered (such as disabling warmup tasks altogether).
Everyone knows that rebalancing occurs when consumers enter or leave the group, but it’s often assumed that the group membership will be stable provided the operator is not actively adding or removing nodes. Unfortunately the truth is that Kafka Streams and the stability of its assignment is highly sensitive to the set of consumers who are actually participating in each rebalance. In fact, if the group membership is constantly changing, the application will never converge on a task assignment and will continually rebalance with a combination of cooperative and probing rebalances as tasks are moved around between nodes.
To narrow down the reason for excessive rebalancing, it’s therefore important to track the group membership across a series of rebalances during the affected period of time. Fortunately, we can build on what we already know, and in fact use the same search terms to figure out who was in each rebalance. You’ll want to make sure to follow the progression across any changes in group leadership by searching the logs of multiple nodes if necessary, as discussed above. Let’s look at another example:
[.c-code-block]$ grep "participating in this rebalance" node*.log
node0.log: [18:12:45,375] INFO stream-thread [...] 2 client nodes and 3 consumers participating in this rebalance:
node0.log: [18:12:54,694] INFO stream-thread [...] 2 client nodes and 4 consumers participating in this rebalance:
node0.log: [18:13:57,340] INFO stream-thread [...] 2 client nodes and 3 consumers participating in this rebalance:
node0.log: [18:13:57,362] INFO stream-thread [...] 3 client nodes and 4 consumers participating in this rebalance:
node0.log: [18:14:30,654] INFO stream-thread [...] 2 client nodes and 4 consumers participating in this rebalance:[.c-code-block]
[.c-callout]🚨 This log message was recently changed to include the summary of node and consumer count explicitly, so in earlier versions (3.5.0 or below) you will need to count this yourself. The assignor will print each node on a separate line, so you’ll want to add a [.c-code]-A <max_nodes>[.c-code] switch to the sample grep command to make sure you get the entire message. It’s kind of a pain, but that’s exactly why we improved it in 3.6.0![.c-callout]
If you ever see a change in the number of consumers (or nodes) between successive rebalances, you immediately know the cause of the rebalance: members coming and going. Run the grep command again with the [.c-code]-A <max_nodes>[.c-code] switch to get the list of consumers and nodes participating each time, then pick out a specific set of rebalances and figure out which node has a consumer missing from one of the rebalances.
Armed with this information, you can start ruling out possible causes until you find the true root cause of the changing membership, and ultimately of the rebalancing itself. We’ll start with the straightforward ones:
It’s nearly time to take the Hogwarts Express back to the real world, but before you go, let’s round up what we’ve learned:
Ultimately, as you can see, a lot of the complexity of rebalancing in Streams — and therefore of operating and debugging it — stems from two main things that Streams does: 1) coupling partitions with state (particularly local state tied to a specific node), and 2) wielding the partition assignor as a dark art with power over many features, yet confined to run as a client-side assignor within the confines of the group leader — which at the end of the day is simply a consumer like any other, and far from the true “leader” status of a group coordinator or control plane.
The group coordinator is actually going to start taking on more responsibility to simplify the operational and debugging experience for plain consumer group apps, thanks to the next-gen rebalancing protocol introduced in KIP-848. But the picture is not so simple for Kafka Streams, which has too much custom logic to hand over control to the brokers, and of course also has to contend with state.
At Responsive, we’re working on making endless rebalances a thing of the past by decoupling state from compute and handing the assignment reins to our controller to make informed decisions from our control plane. We’re currently working on making the partition assignment customizable in Kafka Streams with KIP-924: check out the KIP to follow along or join in on the KIP discussion.
And that’s it! Congratulations on making it to the end. Take these tools out into the world so that the next time you find yourself facing a rebalancing storm, you can approach it head on and without fear — for a fear of rebalancing suggests that what you fear most of all, is fear itself.
But we get it: this stuff is really hard. If you have any specific questions or are worried about your upcoming N.E.W.T. exams, send us an owl — or for quicker results, hop on the Responsive Discord and say hello!