Apache Kafka consumer groups … don’t use them in the “wrong” way !

8In this blog post I’d like to focus the attention on how the “automatic” and “manual” partitions assignment can interfere with each other even breaking things. I’d like to give an advice on using them in the right way avoiding to mix them in the same scenario or being aware of what you are doing.

The consumer group experience

In Apache Kafka, the consumer group concepts is a way for achieving two things :

  • having consumers as part of the same consumer group means providing the “competing consumers” pattern with whom the messages from topic partitions are spread across the members of the group; each consumer receive messages from one or more partitions (“automatically” assigned to it) and the same messages won’t be received by the other consumers (assigned to different partitions). In this way we can scale the number of the consumers up to the number of the partitions (having one consumer reading only one partition); in this case a new consumer joining the group will be in a idle state without being assigned to any partition.
  • having consumers as part of different consumer groups means providing the “publish/subscribe” pattern where the messages from topic partitions are sent to all the consumers across the different groups. It means that inside the same consumer group we’ll have the rules explained above but across different groups, the consumers will receive the same messages. It’s useful when the messages inside a topic are of interest for different applications which will process them in different ways; we want that the all the interested applications will receive all the same messages from the topic.

Another great advantage of consumers grouping is the re-balancing feature. When a consumer joins a group, if there are still enough partitions available (so we haven’t reached the limit of one consumer per partition), a re-balancing starts and the partitions will be reassigned to the current consumers plus the new one. In the same way, if a consumer leaves a group, the partitions will be reassigned to the remaining consumers.

What I have told so far it’s really true using the subscribe() method provided by the KafkaConsumer API. This method enforces you to assign the consumer to a consumer group, setting the “group.id” property, because it’s needed for re-balancing. In any case, it’s not the consumer to decide the partitions it wants to read for; in general the first consumer which joins the group doing the assignment while other consumers join the group.

How things can be “broken”

Other than using the subscribe() method, there is another way for a consumer for reading from topic partitions : it’s using the assign() method and in this case, the consumer is able to specify the topic partitions it wants to read for.

This type of approach can be useful when you know exactly where some specific messages will be written (the partition) and you want to read directly from there. Of course, you lose the re-balancing feature in this case and it is the first big difference with using the subscribe way.

Another difference is that with “manual” assignment you can avoid to specify a consumer group (so the “group.id” property) for the consumer : it will be just empty. In any case it’s better specifying it.

Most of the people use the subscribing way, leveraging “automatic” assignment and re-balancing feature; using both the way together can brake something … let’s see.

Imagine to have a single “test” topic with only two partitions (P0 and P1) and a consumer C1 which subscribe to the topic as part of the consumer group G1. This consumer will be assigned to both the partitions receiving messages from them. Now, let’s start a new consumer C2 which is configured to be part of the same consumer group G1 but … it uses the assign way for asking for the partitions P0 and P1 explicitly.

subscribe_assignNow we have broken something ! What ? Maybe you are asking …

Both the consumer C1 and C2 will receive messages from the topic, so from both partitions P0 and P1, but … they are part of the same consumer group G1 ! So we have “broken” what we said in the previous paragraph about “competing consumers” when they are part of the same consumer group. So you are experiencing a “publish/subscribe” pattern but with consumers within the same consumer group.

What about offsets commits ?

In general you should avoid a scenario like the one described before even because there is a real side effect on that. Starting from the version 0.8.2.0, the offsets committed by the consumers aren’t saved in Zookeeper but on a partitioned and replicated topic named “__consumer_offsets” which is hosted on the Kafka brokers in the cluster.

When a consumer commits some offsets (for different partitions), it really sends a message to the broker to the “__consumer_offsets” topic and such message has the following structure :

  • key = [group, topic, partition]
  • value = offset

Coming back to the previous scenario what does it mean ?

Having C1 and C2 as part of the same consumer group but able to receive from the same partitions (both P0 and P1) it could happen something like the following :

  • C1 commits offset X for partition P0 writing a message like this :
    • key = [G1, “test”, P0], value = X
  • C2 commits offset Y for partition P0 writing a message like this :
    • key = [G1, “test”, P0], value = Y

So the consumer C2 has overwritten the committed offset for the same partition P0 of the consumer C1 and maybe X was less than Y; if C1 crashes and restarts it will lose messages starting to read from Y (remember Y > X).

Something like that can’t happen with consumers which use only the subscribe way for being assigned to partitions, because as part of the same consumer group they’ll receive different partitions so the key for the offset commit message will be always different.

[Update 28/07/2017]

As a confirmation that mixing subscribe and assign isn’t a good thing to do, after a discussion with one of my colleagues, Henryk Konsek, it turned out that if you try to call both methods on the same consumer, the client library throws the following exception :

java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive

Conclusion

The consumer groups mechanism in Apache Kafka works really well and leveraging on that for scaling consumers and having “automatic” partitions assignment with re-balancing is a great plus. There are cases where you would need to assign partitions “manually” but in that case pay attention on what could happen if you mix both solutions.

So … let’s consume from Apache Kafka but with judgment and the awareness of what we are doing.

Leave a comment