Defense Against the Dark Arts of Rebalancing in Kafka Streams

A practitioner’s guide. For everyone who is in too deep and wants to pull back the curtain on one of the most elegant yet maddening aspects of Apache Kafka: the consumer group rebalancing protocol.

Whether you’re just starting to operate your own Kafka Streams applications and wondering what goes on beneath the surface, or have been running them for years, this blog post is for you.

Kafka’s consumer group rebalancing protocol is sophisticated, and Kafka Streams pushes the protocol to its limits. So before you can productively debug gnarly rebalancing issues, you need to understand how the system works. This post will build the necessary context before arming you with the spells to banish rebalancing-related problems from your system.

Read it all now to prepare yourself for future battles or skip to the “Incident 911” sub-sections for concrete tips and specific references that can help in an emergency.

Here’s how it breaks down:

Rebalancing 101: An introduction to the consumer group rebalancing protocol. This section covers the architectural components that play important roles in the act of rebalancing, the reasons rebalances are triggered, and what happens during a rebalance.

The Why covers the specific causes of rebalancing that apply to regular consumer groups as well as Kafka Streams, and how to identify them based on the logs you have.

Rebalancing in Kafka Streams: The middle section turns the focus from consumer groups in general to the specific example of Kafka Streams, and how rebalancing plays a role in your Streams applications.

Look to Rebalances Triggered by Streams for a logging-based guide to a special subset of rebalances that are unique to Kafka Streams.

Practical Defense Lessons: In the final section, we drill down on the lessons in practical defense against real-world rebalancing issues.

See How to Read a Rebalance for a general guide to approaching tough situations and tips for breaking down rebalances with concrete logging examples.

Rebalancing 101

As any great wizard will tell you, the first step to conquering your enemy is to understand them. So let’s jump right in to the rebalancing protocol and get to know it:

The What

First up, what even is a rebalance? Fundamentally, rebalancing is an elegant solution to the most ancient of distributed systems problems: dynamic scheduling. We have a set of resources representing discrete units of parallelizable work, and want to make sure they are distributed across the available workers so that each worker has something to do and each unit of work gets a share of execution time. In this case the workers are the consumers in a consumer group, and the units of work are the partitions of the topics that group is subscribed to.

Simply put, rebalancing is the protocol by which partitions are scheduled (or as we say, assigned) onto the available consumers. Although both the consumers and the partitions are “persistent” in the sense that a consumer is never finished working on a partition, it’s possible for the number of either to change over time. For example, when topics matching the consumers’ subscription are created, there will be new partitions to process. On the flipside, consumers may be added and removed from the group through scaling operations or node failures. To make sure all partitions are being processed and all consumers are being utilized following a change to either the work or the workers of the group, we need to automatically redistribute the current partitions across the remaining consumers: to “rebalance” them.

The When

So rebalancing is what we call the reassignment of partitions to consumers after a change in the group. But what exactly causes it — when does a rebalance occur?

At a high level, you can break up the causes into two categories: changes in the topic partitions and changes in the group membership. The former is typically less interesting, or at least less common in problematic situations, as topics tend to be fairly static for the lifetime of an application. Group membership, on the other hand, is far more dynamic — both intentionally and unintentionally — and more nuanced than you might think.

The Who

Before we drill down into the causes of a rebalance, let’s introduce all the players:

  1. Group Member: A consumer group is, obviously, made up of a group of consumers, which may be running in the same physical process or spread out across multiple nodes. Typically all consumers in a group will subscribe to the same set of topics (though this is not strictly required). The consumers belonging to the same group are referred to as “group members” and are identified by a shared value of the group.id config, which is often referred to as the consumer group’s name.

  2. Group Coordinator: The group itself is managed by a special broker called the “group coordinator”, who is responsible for handling offset commits and orchestrating rebalances. When debugging issues with rebalancing, the group coordinator logs can provide essential insights, as we’ll see shortly. To identify the coordinator for a particular consumer group among the logs from all brokers in the cluster, search for that consumer group’s name. Since a given broker node may be the coordinator for multiple consumer groups at a time, filtering its logs by the group name also helps cut out the noise.

    Generally speaking all members of a consumer group are interchangeable, and it is the group coordinator, not any one consumer, that manages and tracks the membership of the group.

  3. Group Leader: There is, however, a special consumer in each group that is chosen — arbitrarily and essentially at random by the group coordinator — to be the “group leader”. Despite the name, it doesn’t do much in the way of “leading” the group, and is only differentiated from the others when it is called on to perform the partition assignment during a rebalance. By pushing the assignment into the group leader we can keep the partition allocation logic on the client side, allowing each consumer group application to customize the assignment algorithm if desired, while still maintaining server-side control of the consumer group protocol through the group coordinator.

  4. Polling Thread: A consumer can only participate in a rebalance when the application thread using it invokes the poll method. We dub the application thread polling the consumer as the “polling thread” due to the vital role that poll plays in rebalancing and the consumer group membership in general: this is the mechanism by which a consumer can “join the group”.

    The consumer’s poll API actually has two distinct responsibilities: fetching records and maintaining group membership. Inside each poll, the consumer may (1) send new fetch requests to the brokers and return already-received records from the consumer’s internal fetch buffer, and (2) send special rebalancing-related requests to the group coordinator to initiate, continue, or complete a rebalance. Examples of the latter are the JoinGroup and SyncGroup requests, though we’ll cover their specific roles later in this post. Also of note is the role of the polling itself; repeated calls to poll will indicate to the group coordinator that the polling thread is still alive and actively processing the partitions assigned to its consumer. (We’ll sketch a minimal poll loop right after this list.)

  5. Heartbeat Thread: Separately, each consumer has a background “heartbeat thread” which shoots off repeated “heartbeat” requests to the group coordinator to confirm that the application process itself is alive and healthy. As with all requests, the group coordinator must send a response, and it can actually take advantage of the frequent heartbeat requests to communicate messages and calls to action. It might use the heartbeat response to send a special signal to rejoin the group when a rebalance is triggered, since the group coordinator cannot itself initiate contact with the consumers in a group.
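As promised, here is a minimal sketch of a plain consumer poll loop, to make the roles of the group.id config and the polling thread concrete. The topic name, group name, and bootstrap servers below are made up for illustration:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MinimalPollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "otter-app");               // the consumer group's "name"
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() only registers interest -- the consumer doesn't join the group until poll()
            consumer.subscribe(List.of("input-topic"));

            while (true) {
                // each poll() both fetches records AND keeps up the group membership:
                // sending JoinGroup/SyncGroup requests when needed and proving that
                // the polling thread is still alive and making progress
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // keep per-batch work short: going longer than max.poll.interval.ms
                    // between polls will get this member kicked out of the group
                    System.out.printf("%s-%d@%d: %s%n",
                        record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}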

In summary, here are the various actors at play in a consumer group:

[Figure: legend of the consumer group actors]

When there’s no rebalancing going on, the interactions between these players during normal processing will look something like this:

[Figure: normal polling interactions between the consumer group actors]

The Where

Now that we’ve introduced the different participants, we can start putting together a picture of what is happening where — and most importantly, which set of logs to check for specific questions!

First, while any member of the group can trigger a rebalance, they can’t and don’t communicate directly with each other. So all of their requests have to go to, and through, the group coordinator. The actual trigger to rebalance can be either a consumer sending one of those special requests to the group coordinator (specifically a JoinGroup request), or it can be simply a decision made by the group coordinator’s own internal logic. But it’s important to understand that whoever and whatever originally caused a rebalance, it’s ultimately up to the group coordinator to broadcast the rebalance event to the other consumers.

Broker Logs

Typically the group coordinator will do so by sending a “rejoin” signal to each consumer via the response to its next heartbeat request. Before this occurs, the broker will print one of the most useful logs for understanding, tracking, and debugging rebalances on the broker side:

"Preparing to rebalance group <group_id> in state <current_state> with old generation <previous_generation_id> (__consumer_offsets-<group_offsets_partition>) (reason: <reason>)”
  1. The <current_state> will always be PreparingRebalance, as the group transitions to this state just before logging this line, and the <group_offsets_partition> just refers to which partition of the internal __consumer_offsets topic holds committed offsets for this group (a detail you can typically ignore). More interesting here are the <previous_generation_id> and <reason>.
  2. The generation_id, or often just “generation”, is a simple monotonically increasing counter that identifies each new stable state of the consumer group. It is incremented after every successful rebalance and marks the set of confirmed consumers in the group. Any consumer that successfully participated in that rebalance is considered part of that generation of the group. In the context of the above log line, the <previous_generation_id> reports the generation_id of the last successful rebalance — that means if you see the above line logged repeatedly with the same generation_id each time, you know the rebalances are failing and will need to investigate why!
  3. The <reason> reports the cause of the rebalance from the broker’s point of view. You may need to look at the client-side logs to understand the true root cause of a rebalance that was originally prompted by a consumer. Luckily, thanks to recent protocol and logging improvements such as KIP-800, these days the consumer can communicate its reason for requesting a rebalance to the group coordinator. If your cluster (i.e. your brokers) is on version 3.2.0 or above, the reason will contain details about why a consumer requested the group to rebalance.

The specific log messages mentioned in this blog post are from version 3.6.0 of Apache Kafka, and many of the lines have changed across recent releases. You should always make sure to look at the exact wording of the logs in the source code for the specific version you are running.

Client Logs

Of course, if you also (or only!) have the client-side logs, you can ask the consumer itself why it decided to trigger a rebalance or rejoin the group. A great line to anchor any debugging session is this message that gets logged when the consumer internally flags the need to rejoin the group for an ongoing or new rebalance:

"Request joining group due to: <client_reason>"

Note that this may be logged by the polling thread, for example when first starting up or due to externally enforced rebalances (as used by Kafka Streams). It can also be logged by the heartbeat thread, like when the group coordinator sends a signal to rejoin in the heartbeat response. We’ll dig into all the gory details around what causes rebalances and cover the most common and useful to recognize <client_reason> possibilities in the following section. But it’s important to be able to differentiate between the actual cause of a rebalance and the symptom(s), so that you know when and where to look. For example, if the heartbeat thread receives the signal to rejoin in its response from the group coordinator, the <client_reason> logged will be that the "group is already rebalancing". In this case, as the log line suggests, the consumer you are looking at is only rebalancing because someone else triggered one first. You’ll need to check the other consumers (and/or the group coordinator) to figure out the root cause of that particular rebalance.

The Why (Incident 911)

So, what are the actual reasons a group may have for rebalancing? Let’s drill down into the specific scenarios to explain why a rebalance is occurring, and more importantly, how you can identify the root cause. When debugging rebalances, you would ideally collect the complete set of logs from all consumers in the group as well as from the group coordinator. Unfortunately the reality seems to be that folks often have access to only one or the other, so we’ll cover what to look out for in both the consumer and broker logs: both to help those who might only have one of those available, and because they complement each other and tell a much more complete story when read together.

Heartbeat Expiration

[Figure: rebalance triggered by heartbeat expiration]

Heartbeat expiration occurs when the heartbeat thread fails to contact the group coordinator within the configured session.timeout.ms, causing the group coordinator to assume the consumer has failed and trigger a rebalance to kick it out of the group. Generally this happens when the process itself has stopped, in which case there would be no further client-side logging. However, it’s also possible that the group coordinator connection simply faltered, for example due to network partitions or dropped packets, or, in extreme cases, due to thread starvation or long GC pauses.

If the heartbeat thread just can’t get a heartbeat request and response across a faulty network within the session.timeout.ms, you should see a line in the client-side logs that includes this phrase:

"session timed out without receiving a heartbeat response"

If the heartbeat thread was stalled for some reason, when it finally reconnects/recovers and manages to get a heartbeat request through to the group coordinator, the response will inform it of the expiration and include a signal to the consumer to rejoin the group. When this happens, you should see it logging this:

"Attempt to heartbeat with <sent_generation> and group instance id <group.instance.id> failed due to <error>, resetting generation"

As you can tell, detecting heartbeat expiration on the client side can be tricky and involves some nuance. If you’re looking for a reliable way to tell when this has happened and have access to the broker logs, just look for this message as the <reason> logged by the group coordinator when it’s preparing a rebalance:

"removing member <member_id> on heartbeat expiration"

New Member Joins

When a new consumer starts up and calls poll for the first time, it will attempt to join the group by sending a special kind of message called a JoinGroup request. Note that this first JoinGroup request from a new member is not actually used to join the group, but rather to get a member id assigned. In other words, the initial JoinGroup request is always doomed to fail and must be sent again for the actual rebalance — so if you see this in the client logs, don’t worry, it just means a new member joined:

"Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)"

You can confirm this by checking the <reason> logged by the broker:

"Member <member_id> joining group during <current_state>; client reason: <client_reason>"

Existing Member Leaves

Next we have the reverse of the above case — when a consumer is no longer available to actively process partitions, it will send another kind of special message called a LeaveGroup request. This will inform the group coordinator to immediately trigger a rebalance so that any partitions it was assigned can be redistributed to the remaining members of the group. The corresponding <reason> given in the broker logs will be:

"Removing member <member_id> on LeaveGroup; client reason: <leave_reason>"

And on the client side, the consumer will log:

"Member <member_id> sending LeaveGroup request to coordinator due to <leave_reason>

There are three distinct scenarios that can lead to the consumer proactively leaving the group, with the following possible values for <leave_reason>:

  1. "the consumer is being closed": The first and most simple is when the consumer is closed, which typically happens because an external event like a scale-down has triggered the application to shut down. If this log message looks unfamiliar to Kafka Streams users, it’s because in Streams, we override the default consumer behavior so that it does not send a LeaveGroup message when the Kafka Streams application, and correspondingly its consumer, is closed. This means an instance of a Streams app that has been shutdown will actually wait to be kicked from the group on heartbeat expiration rather than proactively leave, and is the reason it can take up to the session.timeout.ms for the group to rebalance after scaling down. This is an intentional tradeoff to improve overall group and partition assignment stability in the face of temporary shutdowns typical of cloud environments and rolling redeployments.
  2. "the consumer unsubscribed from all topics": The second reason you may see a consumer proactively leave the group is when it has unsubscribed from all topics. Technically any change to a consumer’s subscription will trigger a rebalance, but if it calls unsubscribe or subscribes to an empty set of topics, the consumer decides to just leave the group completely since it obviously can’t take on any of the partition workload if it’s not subscribed to anything. For this reason, unsubscribe is sometimes used to reset a consumer’s partition assignment and position in the group, for example in Kafka Streams when the local state is corrupted or a transaction times out.
  3. "consumer poll timeout has expired." Lastly, we have one of the more insidious causes of consumers dropping out of the group: poll interval timeout. If your application spends longer than the configured max.poll.interval.ms between calls to poll, it’s assumed to be blocked or otherwise incapacitated and in need of another consumer to take over the partitions it was supposed to be processing. So whereas the session.interval.ms is used to indicate that the heartbeat thread, and therefore the entire process, is alive and unblocked, the max.poll.interval.ms indicates whether the polling thread is still actively working on the partitions it owns. If you ever see this message in your client logs, it’s time to check on your application’s health or else consider increasing the max.poll.interval.ms.

One interesting thing to note is that, unlike the first two scenarios, the heartbeat thread is actually the one sending the LeaveGroup request in the final case. Hopefully this makes sense if you think about it: of course the heartbeat thread would need to be responsible for informing the group coordinator that the polling thread is stuck, since by definition the polling thread can’t/won’t!
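Since it’s easy to mix up which timeout governs which thread, here is a small sketch of the three consumer configs discussed above. The values are illustrative only, not recommendations:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class LivenessTimeouts {
    // Illustrative values only -- tune these for your own workload.
    public static Properties configure(Properties props) {
        // How long the group coordinator will go without hearing a heartbeat before it
        // declares this member dead and triggers a rebalance to kick it out of the group.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45_000);

        // How often the background heartbeat thread pings the group coordinator.
        // Should be a small fraction of the session timeout.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3_000);

        // The maximum allowed gap between calls to poll() on the polling thread before
        // the consumer is assumed to be stuck and gives up its spot in the group.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);

        return props;
    }
}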

Enforced Rebalances

It’s also possible for applications to externally trigger a rebalance by forcing the consumer to send out a JoinGroup request. When this happens, you should be able to find the cause logged as the <client_reason> in this line:

"Request joining group due to <client_reason>"

On the broker side, you’ll see the <reason> logged as:

"Updating metadata for member <member_id> during <current_state>; client reason: <client_reason>"

While this should rarely be needed for plain consumer client apps, enforced rebalances are heavily relied on by Kafka Streams. The specific examples of <client_reason> are covered in the Rebalances Triggered by Streams section below.
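For reference, the consumer API behind an enforced rebalance is Consumer#enforceRebalance, which asks the group to rebalance the next time poll is called. A minimal sketch, assuming a client version recent enough to accept a reason string (older clients only have the no-arg variant):

import org.apache.kafka.clients.consumer.Consumer;

public class ForcedRebalance {
    // Ask the group coordinator to rebalance the group on this consumer's next poll().
    // With KIP-800 the reason string is forwarded to the broker and shows up as the
    // "client reason" in the log lines above.
    public static void requestRebalance(Consumer<?, ?> consumer, String why) {
        consumer.enforceRebalance(why);
    }
}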

Failures & Retries

Failures are not technically a trigger for rebalancing in themselves, at least not in the absence of an ongoing or already-triggered rebalance. But sometimes you can get into a loop where rebalances are continuously failing and being retried, making it difficult to narrow in on what is ultimately causing the rebalancing storm. There are many different kinds of failures, so you’ll need to debug this on a case-by-case basis, but searching for the phrases JoinGroup failed and SyncGroup failed in the consumer logs is usually a good place to start.

The How

With all that under our belt, it’s time to summon your courage, young Gryffindor, and dive into the rebalancing protocol itself. First, let’s take a high-level look from the group coordinator’s point of view, and dissect some of these terms we’ve been throwing around. We can divide each rebalance into two distinct phases: the JoinGroup and the SyncGroup.

[Figure: the two phases of a rebalance]
  • JoinGroup Phase: As we briefly introduced earlier, any consumer can initiate a rebalance by sending a JoinGroup request. In fact, during a rebalance, every single member has to send a JoinGroup request to the group coordinator, in a process that’s referred to as “joining the group” (or “re-joining” if they were already part of an active consumer group). The coordinator will wait up to the max.poll.interval.ms for all the currently known members of the group to rejoin. If an application instance fails to call poll within this timeout after a rebalance has begun, that consumer will miss the rebalance and drop out of the group, and all of its partitions will be reassigned to other consumers. You can think of each rebalance as effectively a reset of the consumer group. Everyone has to participate by re-joining the group in order to remain a part of it!
  • SyncGroup Phase: Once the group coordinator has received a JoinGroup request from all known members of the group OR waited for up to the max.poll.interval.ms, it will send a JoinGroup response to each of the consumers. In turn, those consumers will confirm their membership by sending out another special message called a SyncGroup request. The group coordinator waits for all consumers who had sent a JoinGroup to send this SyncGroup request, then finally sends out the SyncGroup responses to complete the rebalancing process. Once all the SyncGroup responses have been sent, the group coordinator transitions the group to “Stable” and bumps the generation_id.

The ConsumerPartitionAssignor

On that note, let’s talk a bit about partition assignment. As the name suggests, this interface is where the ultimate decision of how to allocate partition resources is made. But if you peek behind the curtain, you’ll find that it’s more than just the assignment algorithm. Here are the three notable APIs of the assignor:

public interface ConsumerPartitionAssignor {


    /**
     * Return serialized data that will be included in the {@link Subscription}
     * sent to the leader and can be leveraged in {@link #assign(Cluster, GroupSubscription)} 
     * (e.g. local host/rack information)
     */
    default ByteBuffer subscriptionUserData(Set<String> topics) {
        return null;
    }


    /**
     * Perform the group assignment given the member subscriptions and current
     * cluster metadata.
     */
    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);


    /**
     * Callback which is invoked when a group member receives its assignment
     * from the leader.
     */
    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
    }

    ...
}

It’s important to note that each member of the group will have its own instance of the ConsumerPartitionAssignor, and all of them will invoke the #subscriptionUserData and #onAssignment callbacks (as we’ll see shortly). Only one assignor instance will actually be asked to perform the assignment via the assign API, however. Remember how the group coordinator will pick one of the consumers to elect as group leader for each rebalance? Well, this is why: to perform the assignment.
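To make that division of labor concrete, here is a deliberately naive sketch of a custom assignor that just deals partitions out round-robin. It assumes all members share the same subscription (as is typical), ignores userdata and stickiness entirely, and is meant only to illustrate the shape of the API, not to be used for real workloads:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class NaiveRoundRobinAssignor implements ConsumerPartitionAssignor {

    // The name under which this strategy is advertised to the group coordinator.
    @Override
    public String name() {
        return "naive-round-robin";
    }

    // Invoked only on the group leader, with the Subscriptions of every member.
    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();

        // Sort the members so the assignment is deterministic across rebalances.
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members);

        // Gather every partition of every subscribed topic (assumes a shared subscription).
        TreeSet<String> topics = new TreeSet<>();
        subscriptions.values().forEach(s -> topics.addAll(s.topics()));
        List<TopicPartition> partitions = new ArrayList<>();
        for (String topic : topics) {
            List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
            if (infos == null) {
                continue;
            }
            for (PartitionInfo info : infos) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }

        // Deal the partitions out to the members one at a time.
        Map<String, List<TopicPartition>> assigned = new HashMap<>();
        members.forEach(m -> assigned.put(m, new ArrayList<>()));
        for (int i = 0; i < partitions.size(); i++) {
            assigned.get(members.get(i % members.size())).add(partitions.get(i));
        }

        // Wrap each member's partition list in an Assignment (no custom userdata here).
        Map<String, Assignment> assignments = new HashMap<>();
        assigned.forEach((member, parts) -> assignments.put(member, new Assignment(parts)));
        return new GroupAssignment(assignments);
    }
}

A plain consumer application would opt in to something like this via the partition.assignment.strategy config. In Kafka Streams you don’t get a say: the StreamsPartitionAssignor is hard-coded, as we’ll see below.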

Partition Assignment

Now that you’ve been armed with an understanding of the rebalancing protocol, we can rewind back to the beginning of a rebalance and step through the process once again, this time from the client’s point of view, to understand the vital role of the ConsumerPartitionAssignor.

Step 1: Rejoin Group & Send JoinGroup Request

As we already know, once a rebalance is initiated, all members of the consumer group are prompted to rejoin by sending a JoinGroup request to the group coordinator. This is more than just a simple Alohomora spell for being let back into the group, however. Each consumer has to encode a Subscription in the JoinGroup request, which contains required metadata like the topic subscription and currently assigned partitions, optional configurations such as rack.id or group.instance.id, and any optional “userdata” like the set of standby tasks or stateful tasks in Kafka Streams. So each consumer begins a rebalance with a call to the assignor’s #subscriptionUserData and packages this into the Subscription that it then sends off in the JoinGroup request.

Step 2: Receive JoinGroup Response & Send SyncGroup Request

Eventually each consumer gets the JoinGroup response and immediately sends out its SyncGroup request. But the SyncGroup is the final piece of communication a consumer sends out during the rebalance, so if the partition assignment is to occur on the client side, this has to be when and where it happens.

And this is indeed where the assignor steps in, but not for everyone! All regular group members simply receive an empty JoinGroup response, and send out an empty SyncGroup request in return. For them, the JoinGroup response and SyncGroup request are simply sparks in the air: just a signal that they’re still here, still active, still waiting for their assignment.

The group leader on the other hand, has important things to do during this step. The group coordinator has collected the Subscriptions from all the JoinGroup requests, and consolidated them into a single object. This is what gets passed to the group leader in its JoinGroup response, and ultimately becomes the GroupSubscription that is handed to the ConsumerPartitionAssignor’s assign method, along with metadata about the cluster itself. We’ll save all the gory details of the assignment for another time (perhaps another blog post) but for now, all you need to understand is that a single instance of the ConsumerPartitionAssignor (specifically, the group leader’s assignor) gets the Subscriptions for the entire group and uses them to decide on a mapping of topic partitions to consumers. The group leader encodes the set of partitions assigned to each consumer, along with any other optional user metadata, in an “Assignment” for each member of the group. Finally, it packs these into a GroupAssignment embedded in its own SyncGroup request, and sends it off to continue the rebalance.

Step 3: Receive SyncGroup Response & Stabilize Generation

Once the SyncGroup requests have all been sent, the group coordinator locates the one from the group leader and dissects it into the individual per-consumer Assignments, which it then forwards to the corresponding consumer in its SyncGroup response. From now on, as far as the group coordinator is concerned, the rebalance is over (and a new one can begin). Then it’s just a matter of waiting for each consumer to receive and process its SyncGroup response to officially join the group on the latest stable generation. At this point the group leader is once again indistinguishable from any other member of the group. If all goes well, the consumers will just update their partition assignment, invoke the #onAssignment callback to notify their personal assignor (even if that “assignor” was not actually involved in the assignment), and resume normal processing.

WARN: the Kafka consumer will attempt to enforce the timeout passed in to #poll regardless of an active rebalance, and therefore it may not always complete a rebalance within a single invocation of #poll!

Using a short poll timeout is useful when you want to continue processing while a rebalance is in progress, for example while waiting on the JoinGroup/SyncGroup response. It’s good to keep in mind that these steps can occur in a single invocation of #poll or in separate calls for each.
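From the application’s point of view, the easiest place to observe these steps in a plain consumer app is a ConsumerRebalanceListener, which is invoked around the revocation and assignment of partitions during a rebalance (Kafka Streams registers its own listener internally, so this sketch is only for those using the consumer client directly). The topic name below is hypothetical:

import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceObserver implements ConsumerRebalanceListener {

    // Invoked for partitions that are being taken away from this consumer: a good
    // place to commit offsets or flush any state tied to those partitions.
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked: " + partitions);
    }

    // Invoked once the SyncGroup response has delivered this member's Assignment.
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
    }

    public static void register(KafkaConsumer<String, String> consumer) {
        // the listener rides along with the subscription
        consumer.subscribe(List.of("input-topic"), new RebalanceObserver());
    }
}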

Rebalancing in Kafka Streams

Unlike the consumer client, which allows users to choose from a set of built-in partition assignors or even plug in a custom implementation of ConsumerPartitionAssignor of their own, the assignor in Kafka Streams is hard-coded. A Streams application requires a specific assignor, called the StreamsPartitionAssignor — but not necessarily (or not just) to enforce a specific assignment strategy. The StreamsPartitionAssignor is used to dip into the dark arts and perform a bit of magic unrelated to assigning partitions. By taking advantage of the group leader selection, Streams can use the assignor to perform certain actions that only need to happen once, communicate between different instances within the same app that lack a direct network connection, and gate the processing until important initialization & validation steps have completed. It uses the #assign method for things like verifying partition counts and creating internal topics.

It’s important to understand that rebalances are an essential tool of Kafka Streams as much as they are a core functionality, and are used extensively for a variety of features beyond what most would imagine. But rebalancing can seem to be as much an art as a science, and a dark art at that. Because as useful as rebalances are, they can be equally destructive and harmful to your application, or worse: your sanity. Like with any dark magic, they have led many down the path of madness. So let’s take a look at what can happen when things go wrong, and how to defend yourself against the worst of it.

Wands out — we’re going in.

Defense Against the Dark Arts

The first thing to know is constant vigilance! The enemy is all around you — and it’s often Kafka Streams itself that’s intentionally causing the rebalances. We’ll get into the specific causes and kinds of triggered rebalances, but for now, just know that you can always tell whether the assignor intentionally scheduled a followup rebalance by searching the logs for the phrase "unstable assignment" — if you don’t see that, you can infer that the assignment was “stable” and doesn’t require a followup. Although it’s always a good idea to confirm this by searching for the phrase "stable assignment" instead.

When a StreamThread’s consumer is processing the Assignment it received in its SyncGroup response, among the various other metadata encoded by Streams, it may read out a scheduled time (in epoch milliseconds) at which it’s been requested to enforce a new rebalance. If this happens, it will signal to the StreamThread when the followup rebalance should be triggered. After the rebalance has finished and the StreamThread leaves the poll, it will check this schedule on each iteration of the main processing loop, and inform the consumer to rejoin the group on the next poll if the current time is beyond the scheduled rebalance time. You can tell when this has happened, and which StreamThread was responsible, by looking for this line in the Streams client logs:

"Triggering the followup rebalance scheduled for <SCHEDULED_TIME>"

Rebalances Triggered by Streams (Incident 911)

While there’s nothing wrong with a little rebalancing, sometimes applications seem to fall into a black hole of rebalancing from which they don’t recover. To diagnose a bad case of endless rebalancing, we obviously need to know why the rebalances are happening. Sometimes it may be external factors, such as consumers joining/leaving the group, but other times each rebalance is actually directly inducing the next. Often, debugging rebalancing issues means tracing along a chain of rebalances to find the original event that kicked it off. This means being able to differentiate between the rebalances that are just a normal symptom of the current and/or ongoing events, and the ones that reveal an unexpected or abnormal factor at play. To unwind a tangled mess of rebalancing, you’ll want to be familiar with all the reasons a rebalance might occur, so you can determine if the rebalance makes sense in the given context.

In the earlier section “The Why” above, we covered the kinds of rebalances that the group coordinator or consumer group members can trigger. Now let’s turn the page to the subject of externally-triggered rebalances, and take a close look at each of the cases in which the StreamsPartitionAssignor will request a followup rebalance.

Note: some of these are more niche than others, but we’re including them all for completeness. If you’re experiencing a rebalancing loop emergency, skip the 911 call and focus on the first two reasons below.

Cooperative Followup Rebalance

An immediate followup rebalance that is scheduled when the assignor would like to change which StreamThread an active task is assigned to. To make sure that no two consumers ever own the same task at the same time, no matter how briefly, a partition must be revoked from its original owner before it can be assigned to its new one. This means the task will not be assigned to anyone during the first rebalance, and instead, the assignor will inform the intended owner to trigger a followup rebalance as soon as the current one has finished. Since the previous owner will have had to revoke that partition before it can rejoin the group for the 2nd rebalance, the assignor is now free to give the active task to its new consumer.

In a stable group, there should only ever be exactly one of these followup rebalances. However, if the group membership is constantly changing — for example due to recurring scaling events or consumers missing rebalances — then you will get a new cooperative followup rebalance after each “regular” rebalance. This can compound issues where the group is unstable, for example due to long processing times.

Look for this log from the assignor:

"Requesting followup rebalance be scheduled immediately by {} due to tasks changing ownership."

Or this log on a regular group member:

"Requested to schedule immediate rebalance for new tasks to be safely revoked from current owner"

Warmup Task Probing Rebalance

A delayed followup rebalance scheduled for probing.rebalance.interval.ms from the time of the current rebalance, or 10 minutes by default. This is a potentially recurring rebalance that is used to “probe” the readiness of warmup tasks. These are special standby tasks that are placed on nodes that do not yet have a complete local copy of the changelog contents for a stateful task and would therefore need to spend a lot of time in restoration before they could begin processing. During a rebalance, the assignor is only allowed to place active tasks on nodes for which the local state is within the configured acceptable.recovery.lag from the end of the changelog. If the assignor wants to move an active task to a different node, it will place a warmup task there instead, in order to “warm up” the state in the background without interrupting the active processing. It then schedules probing rebalances to determine when it is finally safe to transition that warmup task into the active task.

Users who monitor the partition assignment balance may find that the group is extremely unbalanced while probing rebalances are ongoing. This can happen when only a subset of nodes have most of the stateful tasks, causing the active tasks to be “locked” onto those nodes. Many systems are impacted by this extreme skew, even if temporary, and it is often outright deadly for applications that are already running at or near the limits of their allotted resources. If/when probing rebalances are detected, it’s always recommended to monitor your application closely until it has stabilized and stopped issuing probing rebalances.

In general, users may run into issues with the increased resource consumption, slow warmup times, or extreme task skew during the probing rebalance period. If you have trouble getting this feature to work, try increasing max.warmup.replicas to enable faster warmups at the cost of higher resource usage spikes, or set acceptable.recovery.lag to MAX_VALUE to effectively disable warmup tasks and move active tasks immediately, if you can afford the downtime while they restore from their changelog topics.
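If you do end up tuning this behavior, these are the Streams configs involved. A sketch with purely illustrative values:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class WarmupTuning {
    // Illustrative values only -- measure before changing these in production.
    public static Properties configure(Properties props) {
        // How far behind the changelog a local state store may be while still being
        // considered caught-up enough to be handed the active task directly.
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);

        // How many warmup replicas may be restoring at once: more warmups means
        // faster task migrations at the cost of bigger resource usage spikes.
        props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);

        // How long to wait between the probing rebalances that check whether a
        // warmup task has caught up and can be promoted to active.
        props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);

        return props;
    }
}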

Quickly identify probing rebalances before they happen by looking for this log by the assignor:

"Requesting followup rebalance be scheduled by {} for {} to probe for caught-up replica tasks"

Or this log by one of the regular group members:

"Requested to schedule next probing rebalance at {} to try for a more balanced assignment"

Version Probing Followup Rebalance

An immediate followup rebalance that is scheduled only during rolling upgrades in which the Subscription schema version has changed. The version probing feature enables live upgrades and downgrades of any Streams application. During such an upgrade, the assignor will inform each member to downgrade its Subscription onto the lowest commonly supported version, to make sure that it’s readable regardless of whether the selected group leader is on the new or old bytecode version.

Once the last member has been upgraded and the entire group is on the same version, one final rebalance will be triggered to have everyone upgrade onto the latest Subscription version. This kind of rebalancing should only ever be seen during a rolling upgrade/downgrade.

Look for this log to know when a version probing rebalance has been triggered:

"Requested to schedule immediate rebalance due to version probing"

Host Endpoint Change With Static Membership

A relatively rare kind of immediate followup rebalance in which the host endpoint has been changed but the new host endpoint has not yet percolated through the group due to static membership preventing an automatic rebalance. This should only be triggered once, and only if the application.server config has been changed.

This log will tell you when this has occurred:

"Requested to schedule immediate rebalance to update group with new host endpoint"

Application Shutdown Rebalance

An immediate, but not followup, rebalance which is enforced in order to transmit an error code to all members of the group. This is the only rebalance on this list that doesn’t occur as a direct result of a previous rebalance, nor is it requested by the assignor. Instead, this kind of rebalance can be triggered by any member of the group at any time. Remember, individual members of the group cannot communicate directly with each other. They can however effectively pass messages to the rest of the group by embedding them in the Subscription and Assignment metadata passed around during a rebalance. Kafka Streams takes advantage of this to let its application instances forward custom messages.

Currently this is only used to send fatal errors that induce an application-wide shutdown when the user opts to SHUTDOWN_APPLICATION with a custom StreamsUncaughtExceptionHandler, but it’s a powerful mechanism made possible by the consumer group rebalancing protocol and our custom StreamsPartitionAssignor.
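For reference, opting in to the application-wide shutdown looks roughly like the sketch below; in practice you would inspect the exception before choosing a response:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ShutdownOnFatalError {
    public static void install(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            // Returning SHUTDOWN_APPLICATION makes the assignor broadcast the error
            // code to every member of the group via an enforced rebalance.
            // The other options are REPLACE_THREAD and SHUTDOWN_CLIENT.
            return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        });
    }
}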

This log will appear on the Streams client node where the shutdown request originated:

"Detected that shutdown was requested. All clients in this app will now begin to shutdown"

Practical Defense Lessons

One of the most valuable tools in being able to understand and debug rebalances is just being able to figure out who did what, when. At the end of this class you should be able to piece together the timeline of a rebalance and triage an issue on your own. This won’t make you suddenly responsible for debugging and solving all rebalancing-related issues at your company (we hope!), but you should be able to triage an incident thoroughly enough to determine a course of action: whether it’s fixing a configuration error, filing a thorough bug report with enough information for the devs to diagnose later, escalating it to a support team in clear language that will get you help the fastest, or just writing up a post-mortem report on an incident (10 inches of parchment minimum!).

How to Read a Rebalance (Incident 911)

Like with all distributed systems, the multiple actors of a consumer group can make it difficult to understand how the system as a whole is behaving. Fortunately, rebalances have unique, monotonically increasing generations to identify them, and a single group leader for each rebalance that can be isolated to understand the driving forces.

NOTE: To dig into an app experiencing difficulties with rebalances, you’ll want to make sure to collect logs across all members of the group. The most important logs are in the StreamsPartitionAssignor and StreamThread on the Streams side, and the ConsumerCoordinator and AbstractCoordinator for the consumer client. So make sure to configure them with at least INFO level logging, and consider setting the StreamsPartitionAssignor to DEBUG.

With that said, the first step is always to identify the group leader of a given rebalance. Of course consumer groups can get quite large, so that’s often easier said than done — not only is there typically more than one copy of a Streams application running on a cluster of nodes, each application instance can have multiple StreamThreads. With one consumer per StreamThread and multiple StreamThreads per node, how do we find the group leader and locate the relevant assignor?

Tip 1: Find the Group Leader

All it takes is a quick grep (or other tool for searching text). Simply search for a piece of one of the log lines from #assign, and narrow your view to the span of time over which the group was rebalancing. For example, the phrase "participating in this rebalance" is useful for locating the group leader of a Streams app, since it’s unlikely to appear outside of this context and produce false positives, occurs at the start of the rebalance, and has been in the StreamsPartitionAssignor since version 3.0.

Let’s say you have an application running on multiple nodes with two StreamThreads each, and have saved the logs from each application instance to a file called node{N}.log. We want to identify the group leader, so we run the following command:

$ grep "participating in this rebalance" node*.log
node0.log: [11:08:54,694] INFO stream-thread [otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-2-consumer] 2 client nodes and 4 consumers participating in this rebalance:
node1.log: [11:09:45,375] INFO stream-thread [otter-app-c4497316-8891-4f1a-b60e-6952b7c56fcf-StreamThread-1-consumer] 2 client nodes and 4 consumers participating in this rebalance:
node0.log: [11:10:57,340] INFO stream-thread [otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer] 2 client nodes and 4 consumers participating in this rebalance:
node0.log: [11:10:57,362] INFO stream-thread [otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer] 2 client nodes and 4 consumers participating in this rebalance:
node0.log: [11:24:30,654] INFO stream-thread [otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer] 2 client nodes and 4 consumers participating in this rebalance:

NOTE: the log prefix for the StreamsPartitionAssignor contains these elements: applicationId-processId-threadId. The first part of the string should be recognizable as your app id. The last part will always be “-StreamThread-n-consumer”, where n is the thread id. Everything in between is a UUID that uniquely identifies the specific process, aka “node”, that the StreamThreads and their consumers are running in.

We actually got quite a few hits on that log line, as you’d expect with recurring rebalances. Take a close look at the example results. Do you see anything suspicious?

Unstable Group Leaders

The first thing that should jump out at you is that the log prefix changes a few times, telling us that the group leader was not the same for all rebalances in this period. The first three lines each come from a different StreamThread. During normal operations, the group leader for the previous rebalance will be chosen for the next one as well. This leadership disruption therefore indicates that there was an issue with the first two members that prevented them from completing successful rebalances, until eventually otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer took over.

Tip 2: Check Rebalance Timing

That’s not all we can learn from the example above. It’s important to always check the timestamps in a rebalance! What else can we learn here?

Closely Grouped Rebalances

The next thing you may notice is that the first two rebalances after the final group leader takes over are very close together in time, both occurring within the same second at 11:10:57. This almost certainly means that the first rebalance failed: but how do we know that, and can we make a guess at why?

Consider everything that has to happen between each instance of this line being logged: finishing the assignment for that first rebalance and sending it back in the SyncGroup request, receiving the SyncGroup response, sending out a new JoinGroup request, and receiving the JoinGroup response — which, remember, includes the Subscription for every other consumer in the group, which means we also had to wait for everyone else to rejoin the group.

It seems very unlikely, if not outright impossible, for all of that to occur within a few milliseconds: especially waiting for all of the other consumers to (re)join. But what if a new rebalance had already been kicked off, perhaps triggered by another member? In that case all the other consumers may have already sent their JoinGroup requests for the new rebalance and were just waiting on the group leader, who failed to notice as quickly due to its assignment responsibilities. If the group leader only had to finish its assignment and go through its own SyncGroup and JoinGroup request/response, it’s much easier to believe this could happen within a single second.

This theory is ultimately borne out by the logs in node0.log, which show that the SyncGroup of the first rebalance did indeed fail due to a second rebalance having been triggered.

[11:10:57,345] INFO [Consumer clientId=otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer, groupId=otter-app] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=742, memberId='otter-app-225712b8-b54d-4043-a7e1-c489ee4f8fd5-StreamThread-1-consumer-7f45268d-547c-47ed-a835-48e7d1503b0f', protocol='stream'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:826)

From here, you can trace back the original cause of the rebalancing by diving into the logs with the help of the two previous incident 911 sections, “The Why” and “Rebalances Triggered by Streams”.

Regularly Spaced Probing Rebalances

Another common case of timestamps telling the whole story is with probing rebalances. If you happen to notice a gap of almost exactly 10 minutes — or whatever you set the probing.rebalance.interval.ms to — then it’s almost certain the recurring rebalances are simply due to a migration of tasks that are taking a long time to warm up. Consider increasing the max.warmup.replicas or even allocating additional standby tasks.

It’s important to note that the task assignment can be pretty much scrambled each time you make a permanent change to the group, such as scaling up or down the number of nodes. You’ll want to be aware of the expected increase in network usage as temporary warmup tasks restore from their changelog topics, as well as possible task skew until the final warmup task has transitioned to active. Monitor the app closely and try not to scale too often to minimize the disruption. For example, unlike the standard wisdom in similar systems, it usually makes sense to do everything all at once: whether that’s adding multiple nodes in a single scale-up event or bouncing multiple nodes at the same time. Ultimately, the longer the app spends in the intermediate state, the more time and money you’ll be spending just moving tasks around.

Unfortunately, with large enough state stores your app can only react so quickly. If you want to operate in a dynamic environment or scale your app to react quickly to current events, probing rebalances can be as much curse as blessing. The most important thing is being able to identify when they are occurring — especially if there’s no end in sight and the rebalancing never terminates, in which case you’ll need to consider your options and can weigh the tradeoffs you might otherwise not have considered (such as disabling warmup tasks altogether).

Tip 3: Track Group Membership

Everyone knows that rebalancing occurs when consumers enter or leave the group, but it’s often assumed that the group membership will be stable provided the operator is not actively adding or removing nodes. Unfortunately the truth is that Kafka Streams and the stability of its assignment is highly sensitive to the set of consumers who are actually participating in each rebalance. In fact, if the group membership is constantly changing, the application will never converge on a task assignment and will continually rebalance with a combination of cooperative and probing rebalances as tasks are moved around between nodes.

To narrow down the reason for excessive rebalancing, it’s therefore important to track the group membership across a series of rebalances during the affected period of time. Fortunately, we can build on what we already know, and in fact use the same search terms to figure out who was in each rebalance. You’ll want to make sure to follow the progression across any changes in group leadership by searching the logs of multiple nodes if necessary, as discussed above. Let’s look at another example:

$ grep "participating in this rebalance" node*.log
node0.log: [18:12:54,694] INFO stream-thread [...] 2 client nodes and 3 consumers participating in this rebalance:
node0.log: [18:12:45,375] INFO stream-thread [...] 2 client nodes and 4 consumers participating in this rebalance:
node0.log: [18:13:57,340] INFO stream-thread [...] 2 client nodes and 3 consumers participating in this rebalance:
node0.log: [18:13:57,362] INFO stream-thread [...] 3 client nodes and 4 consumers participating in this rebalance:
node0.log: [18:14:30,654] INFO stream-thread [...] 2 client nodes and 4 consumers participating in this rebalance:

NOTE: This log message was recently changed to include the summary of node and consumer count explicitly, so in earlier versions (3.5.0 or below) you will need to count this yourself. The assignor will print each node on a separate line, so you’ll want to add a -A <max_nodes> switch to the sample grep command to make sure you get the entire message. It’s kind of a pain, but that’s exactly why we improved it in 3.6.0!

If you ever see a change in the number of consumers (or nodes) between successive rebalances, you immediately know the cause of the rebalance: members coming and going. Run the grep command again with the -A <max_nodes> switch to get the list of consumers and nodes participating each time, then pick out a specific set of rebalances and figure out which node has a consumer missing from one of the rebalances.

Armed with this information, it should be straightforward to rule out the possible causes until you know the true root cause of the changing membership and ultimately the rebalancing itself. We’ll start with the straightforward ones:

  • Timing Out On Heartbeat Expiration: If you have access to the broker logs, this one will be easy to figure out. Check out the “Heartbeat Expiration” section above for the full details, or look for this message in the logs (or even just the term "heartbeat expiration"):

    "Member <memberId> in group <application.id> has failed, removing it from the group"
    
  • Adding/Removing StreamThreads or Application Instances: This is generally a manual operation, so you should know whether or not either of these is the case. When in doubt ask someone with prod access if they’ve been messing with things.

  • Failing/Replacing StreamThreads: A more subtle possibility is that you have a StreamThread dying and being replaced. If the number of consumers bounces back and forth between n and n+1, this might mean you have a thread stuck in a crash loop. Fortunately, you can first ask yourself if you’ve configured a custom StreamsUncaughtExceptionHandler that selects the REPLACE_THREAD option, and rule this out if not. If you aren’t sure, a quick search in the logs for this phrase will confirm whether this is behind your rebalancing:

    "Replacing thread in the streams uncaught exception handler"
    
  • Members Timing Out on Max Poll Interval/Missing Rebalances: Once you’ve ruled out all of the above, most likely you have a consumer that is missing the rebalances for some reason. It’s always possible there is a bug in the rebalancing protocol, but much more likely is that it’s simply failing to call poll within the configured max.poll.interval.ms. This in itself can happen for a number of reasons, so you’ll have to put on your sleuthing hat from here.

    To nudge you in the right direction, remember to line up the timestamps and try to figure out what that consumer was doing when the rest of the group was rejoining the group for that particular rebalance! If you don’t see anything in the logs at all around that time, it’s likely the polling thread is stuck (or greatly lagging) somewhere during its processing. You could have an infinite loop in custom code, or it could be a long write stall in RocksDB on an extremely overloaded node.

    Check out our blog post on sizing and monitoring for some tips on debugging a lagging StreamThread.

Closing Thoughts

It’s nearly time to take the Hogwarts Express back to the real world, but before you go, let’s round up what we’ve learned:

Ultimately, as you can see, a lot of the complexity of rebalancing in Streams — and therefore of operating and debugging it — stems from two main things that Streams does: 1) coupling partitions with state (particularly local state tied to a specific node), and 2) wielding the partition assignor as a dark art with power over many features, yet confining it to run client-side within the group leader — which at the end of the day is simply a consumer like any other, and far from the true “leader” status of a group coordinator or control plane.

The group coordinator actually is going to start taking on more responsibility to simplify the operational and debugging experience for plain consumer group apps, thanks to the next gen rebalancing protocol introduced in KIP-848. But the picture is not so simple for Kafka Streams, which has too much custom logic to hand over control to the brokers, and of course also has to contend with state.

At Responsive, we’re working on making endless rebalances a thing of the past by decoupling state from compute and handing the assignment reins to our controller to make informed decisions from our control plane. We’re currently working on making the partition assignment customizable in Kafka Streams with KIP-924: check out the KIP to follow along or join in on the KIP discussion.

And that’s it! Congratulations on making it to the end. Take these tools out into the world so that the next time you find yourself facing a rebalancing storm, you can approach it head on and without fear — for a fear of rebalancing suggests that what you fear most of all, is fear itself.

But we get it: this stuff is really hard. If you have any specific questions or are worried about your upcoming N.E.W.T. exams, send us an owl — or for quicker results, hop on the Responsive Discord and say hello!


Sophie Blee-Goldman

Founding Engineer
