Kafka offset management

In this blog I will try to explain Kafka offsets and how to manage them.

Before reading this blog you should have a basic idea of Kafka, its producers, consumers, and consumer groups. You can read about them here: https://kafka.apache.org/documentation/

What is an offset :- A Kafka topic receives messages across a distributed set of partitions, where they are stored. Each partition keeps the messages it has received in sequential order, and each message is identified by an offset, also known as a position.

As we all know, Kafka is a highly scalable and distributed messaging system in which multiple consumers can consume data from different partitions. But what happens if a consumer in the group goes down, or a rebalance is triggered, and another consumer starts reading from the same partition? Will Kafka ensure that the new consumer starts reading from the exact point where the previous one left off? The answer is no, that is not Kafka's responsibility. Kafka will only let the new consumer read from the offset committed by the previous consumer. It provides different offset mechanisms, so it is the application developer's responsibility to choose the right one for the application's role.

Kafka offers two types of offsets:

  • Current offset
  • Committed offset

Current offset :-

The current offset is information that the consumer itself keeps track of while it reads data from Kafka partitions. Because of it, the consumer knows from which position it should start reading on its next poll.

The problem is that when a rebalance is triggered, all consumers start reading from the last committed offset, and the current offset information (even for the events being processed right now) is lost, so there is a chance that the same record will be processed twice.
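
You can observe the current offset through the consumer API itself: position() returns the offset of the next record the consumer will fetch from a partition. Here is a minimal sketch, assuming a consumer built by the same createConsumer helper used in the examples below and a placeholder topic name:

import java.time.Duration
import scala.collection.JavaConverters._   // use scala.jdk.CollectionConverters._ on Scala 2.13+
import org.apache.kafka.clients.consumer.KafkaConsumer

val consumer: KafkaConsumer[String, String] = createConsumer   // assumed helper
consumer.subscribe(List("my-topic").asJava)                    // placeholder topic

// poll() triggers partition assignment and fetches some records
val records = consumer.poll(Duration.ofMillis(100))
println(s"fetched ${records.count()} records")

// For each assigned partition, position() is the current offset,
// i.e. the offset the next poll() will start reading from
consumer.assignment().asScala.foreach { tp =>
  println(s"current offset for $tp = ${consumer.position(tp)}")
}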

Committed offset :-

The committed offset is the offset that the consumer has committed back to Kafka. It tells Kafka that the fetched records up to that point have been processed successfully by the consumer (see the sketch after the list below for how to inspect it).
Kafka provides two ways of committing the offset:

  • Auto commit
  • Manual commit
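
The committed offset for a partition can also be read back from the broker via the consumer. A small sketch, reusing the consumer from the previous sketch and assuming kafka-clients 2.4 or newer, with a placeholder topic and partition:

import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("my-topic", 0)   // placeholder topic / partition
// Returns the last committed offset per partition (the value is null if
// nothing has been committed for that partition yet)
val committed = consumer.committed(Collections.singleton(tp)).asScala
committed.foreach { case (partition, offsetAndMetadata) =>
  println(s"committed offset for $partition = $offsetAndMetadata")
}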

Auto commit :-

The easiest way to commit offsets is to let the consumer do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset returned by poll(). The five-second interval is the default and is controlled by the auto.commit.interval.ms setting. Just like everything else in the consumer, automatic commits are driven by the poll loop. Whenever you poll, the consumer checks whether it is time to commit, and if it is, it commits the offsets it returned in the last poll.

Before using this convenient option, it is important to understand the consequences.

Problem:-

Consider an example. We create a consumer with the default auto-commit strategy, which commits every five seconds, and the job of the consumer is to poll records from Kafka and insert them into a database.
Suppose the consumer reads some data and inserts it into the database within three seconds, and just after those three seconds a rebalance is triggered. After rebalancing, all consumers start reading from the committed offset, but our committed offset is three seconds old. In this case, all the events processed during those three seconds will be processed twice.
It is possible to configure the commit interval to commit more frequently and reduce the window in which records are duplicated, but it is impossible to eliminate duplicates completely.

Automatic commits are convenient, but they don't give developers enough control to avoid duplicate messages. To solve this type of issue, manual commit comes into the picture.

Note :- By default, enable.auto.commit is true.

props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "5000")

These settings need to be set in the consumer properties.
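
For context, a minimal consumer configuration with auto commit enabled might look like the sketch below; the broker address, group id, and deserializer classes are placeholder values:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
props.put("group.id", "my-consumer-group")         // placeholder consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "true")            // already the default
props.put("auto.commit.interval.ms", "5000")       // default interval: 5 seconds

// With auto commit there is no explicit commit call in the poll loop;
// offsets from a previous poll are committed on a later poll once the
// interval has elapsed
val consumer = new KafkaConsumer[String, String](props)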

Manual commit :-

Manual commit is straightforward: you just commit the offset after processing the records successfully.
Note :- For this method, you need to set the enable.auto.commit property to false.

Kafka provides two types of manual commit –

1) Commit Sync :-

With this method, the consumer commits the offset synchronously and retries if a commit fails for a recoverable reason. It is a useful and reliable method, but it is a blocking one: it blocks your call until the commit operation completes, retrying as long as the error is recoverable.

Example :-

props.put("enable.auto.commit", "false")
  def consume(topic: List[String]) = {
    def read(consumer: KafkaConsumer[K, V]) = {
      while (true) {
        val a = consumer.poll(Duration.ofMillis(100))
        a.asScala.map(record => record.value()).toList.foreach{record =>
          // Process data here
        }
        consumer.commitSync()
      }
    }
    val consumer = createConsumer
    consumer.subscribe(topic.asJava)
    read(consumer)
 }

2) Commit Async :-

This method commits the offset without blocking. The drawback is that commitAsync will not retry, but there is a valid reason for that behavior.

Let's take an example. Suppose you try to commit offset 100 and it fails for a recoverable reason, and you want to retry after a few seconds. Since this is an asynchronous call, you initiate another commit without knowing that the previous one is still pending; say this time you commit offset 125, and that commit succeeds while the commit of 100 is still waiting to be retried. What should happen now? Obviously you would not want to commit 100 after 125, as that could cause problems. That is why commitAsync is not designed to retry.
This behavior is usually not a problem, because you know that if one commit fails, a later, higher commit will succeed.

Example :-

props.put("enable.auto.commit", "false")
  def consume(topic: List[String]) = {
    def read(consumer: KafkaConsumer[K, V]) = {
      while (true) {
        val a = consumer.poll(Duration.ofMillis(100))
        a.asScala.map(record => record.value()).toList.foreach{record =>
          // Process data here
        }
        consumer.commitAsync()
      }
    }
    val consumer = createConsumer
    consumer.subscribe(topic.asJava)
    read(consumer)
 }
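
If you at least want to know when an asynchronous commit fails, commitAsync also accepts an optional OffsetCommitCallback. A minimal sketch, using the same consumer and logger as in the example above and only logging the failed offsets instead of retrying:

import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

consumer.commitAsync(new OffsetCommitCallback {
  override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata],
                          exception: Exception): Unit = {
    // exception is null when the commit succeeded
    if (exception != null)
      log.error(s"Commit failed for offsets $offsets", exception)
  }
})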

Note :- If the failed commit is the last one (for example, just before shutdown), this method can cause a problem, because there will be no higher commit afterwards to make up for it. To solve this issue, we can mix commitSync and commitAsync.

Combining Synchronous and Asynchronous Commits:-

Normally, occasional failures to commit without retrying are not a huge problem because if the problem is temporary, the following commit will be successful. But if we know that this is the last commit before we close the consumer, or before a rebalance, we want to make extra sure that the commit succeeds.
Therefore, a common pattern is to combine commitAsync() with commitSync() just before shutdown.
Here is an example of how it works:

try {
  while (true) {
    val records = consumer.poll(Duration.ofMillis(100))
    records.asScala.map(record => record.value()).toList.foreach { record =>
      // Process data here
    }
    // Fast, non-blocking commit while everything is fine
    consumer.commitAsync()
  }
} catch {
  case e: Exception => log.error("Unexpected error", e)
} finally {
  // Last commit before shutdown: block and retry until it succeeds
  consumer.commitSync()
  consumer.close()
}

While everything is fine, we use commitAsync. It is faster, and if one commit fails, the next commit will serve as a retry.

But if we are closing, there is no "next commit." We call commitSync(), because it will retry until it succeeds or suffers an unrecoverable failure.

Note :- But are we sure that this method completely solves our problem? Suppose that in this example a poll returns 100 records from Kafka and the consumer commits the offset only after processing them all successfully. What if a rebalance is triggered after processing 50 records, or an exception occurs after processing 50 records?

By default, commitSync and commitAsync commit the latest offset returned by poll, not the offset of the record currently being processed. If we get an error while processing the 50th record, the consumer has no idea which offset it should actually commit, and it would commit the latest offset, which is 100. To handle this kind of situation, the rebalance listener comes into the picture.
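
A useful building block here (and something a rebalance listener implementation typically relies on) is that commitSync and commitAsync also have overloads that take an explicit map of offsets, so you can commit exactly up to the record you have processed. A minimal sketch inside the poll loop; committing on every record is shown only for clarity and would be too expensive in practice:

import java.time.Duration
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

val records = consumer.poll(Duration.ofMillis(100))
records.asScala.foreach { record =>
  // Process the record here

  // Commit only this partition, up to the next offset to be read (+1)
  val tp = new TopicPartition(record.topic(), record.partition())
  val offset = new OffsetAndMetadata(record.offset() + 1)
  consumer.commitSync(Collections.singletonMap(tp, offset))
}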


Consumer Rebalance listener:-

If your consumer is about to lose ownership of a partition, you will want to commit the offset of the last event you have processed; this is exactly where a rebalance listener is useful.

For this method, there are two things that you need to know -

How to commit a particular offset?
How to know that a rebalance is triggered?

To achieve this, the Kafka API lets us pass a ConsumerRebalanceListener when subscribing to topics. This listener has two methods -


1) onPartitionsRevoked :-

The API calls onPartitionsRevoked just before it takes your partitions away, so this is where you can commit your offsets.


2) onPartitionsAssigned :-

The API calls this method after rebalancing completes and before you start consuming records from the newly assigned partitions. It is useful when you want to know which partitions have been assigned to this consumer.

Note :- In this approach, the rebalance listener maintains a list of offsets that have been processed and are ready to commit, and it commits them when the partitions are about to be taken away.

Here is an example of that :-

 def consume(topic: List[String]) = {
    val consumer: KafkaConsumer[K, V] = createConsumer
    // The listener below commits the collected offsets when partitions are revoked
    val rebalanceListener = new RebalanceListener[K, V](consumer)
    consumer.subscribe(topic.asJava, rebalanceListener)

    def read(consumer: KafkaConsumer[K, V]) = {
      try {
        while (true) {
          val records = consumer.poll(Duration.ofMillis(100))
          records.asScala.foreach { record =>
            // Process record.value() here

            // Mark this offset as ready to commit (+1 because the committed
            // offset is the position of the next record to read)
            rebalanceListener.addOffset(record.topic(), record.partition(), record.offset() + 1)
          }
          consumer.commitAsync() // Commit before the next poll
        }
      } catch {
        case e: Exception => log.error("Unexpected error", e)
      }
      finally {
        consumer.close()
      }
    }

    read(consumer)
  }

As you can see in this example, after processing each record we tell the listener that this particular offset is ready to be committed. The listener does not commit it immediately; it just maintains a list of offsets together with their partition and topic information, and when a rebalance is triggered it commits those offsets.
In this code we create an instance of the RebalanceListener class, which holds the code for committing the offsets; you can find this class in my repo, whose link is provided below.
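
The RebalanceListener class itself is not listed in this post. As a rough sketch of what such a class might look like (my approximation, not necessarily the exact class in the repo), it collects offsets through addOffset and commits them in onPartitionsRevoked:

import java.util.{Collection => JCollection}
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

class RebalanceListener[K, V](consumer: KafkaConsumer[K, V]) extends ConsumerRebalanceListener {
  // Offsets that have been processed and are ready to be committed
  private val currentOffsets = mutable.Map[TopicPartition, OffsetAndMetadata]()

  def addOffset(topic: String, partition: Int, offset: Long): Unit =
    currentOffsets += new TopicPartition(topic, partition) -> new OffsetAndMetadata(offset)

  // Called just before the partitions are taken away: commit what has been processed
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    consumer.commitSync(currentOffsets.asJava)

  // Called after the rebalance completes, before consumption resumes
  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    println(s"Partitions assigned: ${partitions.asScala.mkString(", ")}")
}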

You can find the whole code in my Repo - here.

So this is all about Kafka offset management. You can use any of these methods according to your application's needs. Hope you like this blog 🙂

References:

  • Kafka official documentation
  • Kafka: The Definitive Guide (book)
