Kafka Consumers :
The main source for this article is the book Kafka: The Definitive Guide (Real-Time Data and Stream Processing at Scale), available for free online using this link.
Consumers and consumer groups :
A consumer sends fetch requests to the brokers that lead the partitions of a topic in order to retrieve their records.
Initialization of a consumer :
To initialize a consumer we only need three configuration parameters; we will see further configurations later in this article :
- Bootstrap servers : a list of broker addresses. It doesn’t need to contain all the brokers’ addresses: a single broker is enough, since every broker keeps cached metadata about all the other brokers and the leader partitions they host. It is still highly recommended to list more than one broker in case one of them goes down.
- Key and value deserializers : deserialization is the process during which the byte arrays received from Kafka are turned back into messages in the desired format (JSON, plain text, Avro…). We can write our own custom deserializers or simply use the Avro deserializers, which make this process very easy.
- Group id : indicates the consumer group the consumer belongs to.
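As an illustration, here is a minimal sketch of a consumer created with only these three settings; the broker addresses, the group name and the topic name ("customers") are placeholders, and string deserializers are assumed for both key and value:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Brokers used for the initial connection; one is enough, two or more is safer
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // Deserializers turn the received byte arrays back into keys and values
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group this consumer belongs to
        props.put("group.id", "customer-consumers");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("customers"));
            while (true) {
                // poll() fetches the next batch of records and drives the heartbeat/rebalance machinery
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```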
What is a consumer group :
Suppose we have an application that consumes messages from a topic, transforms them and inserts them into a database. An insert into the database takes a significant amount of time (say 100 ms), while the topic receives new messages much faster (less than 1 ms apart). A single consumer might not be able to handle the load. The solution is the consumer group concept: instead of a single consumer, the application runs a group of consumers that share the work.
Within a consumer group, each partition is assigned to at most one consumer (a consumer can own several partitions, but a partition is never shared). So if we have 5 partitions and 10 consumers within the same consumer group, 5 of the consumers will be idle.
If a consumer crashes or stops working, the partitions it owned are reassigned to the remaining consumers. This event is called a rebalance. It is coordinated by a special Kafka broker called the group coordinator. Each consumer periodically sends the group coordinator a “heartbeat” to let it know that it is still alive.
If the group coordinator doesn’t receive any heartbeat from a consumer for a certain amount of time, it considers it dead and starts a rebalance. During a classic rebalance, consumers stop working [update: it is now possible to rebalance without interrupting all the consumers, which is called a cooperative rebalance].
There are two types of rebalances :
· Eager rebalance : all the consumers stop working until the reassignment of partitions to consumers is done. This generates latency.
· Cooperative rebalance : only the consumers whose partitions change owner stop working and give them up; the others keep theirs. It might take a few iterations before reaching a stable assignment. A configuration sketch follows below.
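On the client side, cooperative rebalancing is enabled by choosing a cooperative assignor. A minimal sketch, where the broker address and group name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CooperativeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "customer-consumers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Cooperative-sticky assignor: a rebalance only moves the partitions that change owner
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... subscribe and poll as usual
    }
}
```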
The main configurations for a Kafka consumer :
- Fetch.min.bytes : the minimum amount of data the broker should return for a fetch request. If the consumer polls the broker and fewer bytes than fetch.min.bytes are available, the broker waits for more records to arrive before responding, much like batching on the producer side. If we have a lot of consumers, raising fetch.min.bytes reduces the load on the brokers.
- Fetch.max.wait.ms : when using fetch.min.bytes, the broker waits until it has enough data to send to the consumer. Fetch.max.wait.ms sets a limit on that waiting time.
- Fetch.max.bytes : the maximum amount of data the broker returns for each fetch request.
- Max.partition.fetch.bytes : limits the amount of data returned per partition. It should be larger than the maximum message size the broker accepts (message.max.bytes), otherwise the consumer could get stuck on a message it cannot fetch. In practice it is better not to set it too high: after a rebalance, additional partitions may be assigned to the same consumer, and if each poll returns too much data the consumer might take too long to process it between polls and trigger another rebalance.
- Session.timeout.ms : if the broker doesn’t receive a heartbeat from the consumer within this amount of time, the consumer is considered dead.
- Heartbeat.interval.ms : the interval between heartbeats sent to the group coordinator. It is usually set to one third of session.timeout.ms.
- Auto.offset.reset : defines what the consumer does when it has no committed offset, or an invalid one, for a partition. Depending on the value, it restarts either from the earliest valid offset ("earliest", possible duplicates) or from the latest one ("latest", possible data loss).
- Max.poll.interval.ms : the maximum interval allowed between two calls to poll(); beyond it the consumer is considered dead. It is independent from session.timeout.ms: heartbeats are sent by a background thread, while poll() is called from the main thread. By default it is set to 5 minutes, but if processing your records can take longer you should consider increasing it (when debugging locally as well).
- Partition.assignment.strategy : the PartitionAssignor is the class responsible for assigning partitions to consumers. The two classic strategies are:
Range : suppose two consumers C1 and C2 consume two topics T1 and T2, each with 3 partitions. The assignor takes each topic separately and distributes its partitions as evenly as possible (here it can’t be perfectly even, because 3 partitions don’t divide between 2 consumers). Eventually C1 gets partitions 0 and 1 of both topics and C2 gets partition 2 of both topics, so C1 ends up with more partitions than C2.
RoundRobin : the assignor takes all the partitions from all the subscribed topics and assigns them one by one, sequentially, to the consumers. The consumers then end up with (almost) the same number of partitions.
- Client.id : used to identify the consumer in the logs.
- Client.rack : normally a consumer fetches messages from the leader replica of a partition, but for some reasons (cost, latency…) it can be configured to consume from a follower replica located in the same zone as the consumer.
- Group.instance.id : used to make the consumer a static member of the group, so it keeps the same identity across restarts.
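To make the list above concrete, here is a sketch of how some of these settings could be passed to the consumer, on top of the Properties object from the first sketch. All values are purely illustrative, not recommendations:

```java
// ... bootstrap.servers, group.id and deserializers as in the first sketch ...
props.put("fetch.min.bytes", "1048576");           // wait until ~1 MB of data is available per fetch
props.put("fetch.max.wait.ms", "500");             // ...but never wait longer than 500 ms
props.put("max.partition.fetch.bytes", "2097152"); // at most 2 MB per partition per fetch
props.put("session.timeout.ms", "45000");          // declared dead after 45 s without heartbeats
props.put("heartbeat.interval.ms", "15000");       // roughly a third of the session timeout
props.put("auto.offset.reset", "earliest");        // or "latest"
props.put("max.poll.interval.ms", "300000");       // maximum allowed time between poll() calls (5 min)
props.put("client.id", "billing-consumer-1");      // placeholder name, shows up in the logs
```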
Offsets and commits :
Unlike most JMS brokers, Kafka does not track acknowledgments for every record; instead, consumers use Kafka to track their position (offset) in each partition, i.e. which records have already been processed. The action of updating the current position in the partition is called a commit.
How committing works : after processing messages, a consumer commits its position by producing it to a special internal topic called __consumer_offsets. When a consumer crashes or a new consumer joins the consumer group, a rebalance is triggered and consumers may end up with different partitions; they pick up the work from the latest committed offsets stored in __consumer_offsets. If we commit a message before processing it, we might lose data => the consumer crashes, and the consumer that takes over considers the message treated and moves on to the next one even though it was never processed. If we process a message before committing it, we might get duplicate data.
There are several ways to commit offsets; here are some of them:
1- Auto commit :
The default behaviour, and a widely used one, is to set enable.auto.commit to true. The consumer then commits every 5 seconds (configurable via auto.commit.interval.ms). If a rebalance happens 3 seconds after the last commit, all the messages processed during those 3 seconds will be consumed again and therefore duplicated.
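For reference, auto commit is controlled by two consumer settings (the values shown are the defaults):

```java
props.put("enable.auto.commit", "true");      // default: commit offsets automatically
props.put("auto.commit.interval.ms", "5000"); // default: every 5 seconds
```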
2- Synchronous commit :
We can decide to commit offsets when the application chooses to, rather than on a timer, by setting enable.auto.commit to false. We then add code like the following at the end of the processing loop:
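A sketch of what that could look like, reusing the consumer from the first sketch (with enable.auto.commit set to false):

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // ... process the record ...
    }
    try {
        // Blocks until the broker acknowledges the commit of the offsets
        // returned by the last poll(); retries transient errors itself
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // Reaching this point means the commit failed with a non-retriable error
        System.err.println("Commit failed: " + e.getMessage());
    }
}
```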
The drawback here is a decline in throughput, since we have to wait for the broker’s answer to every commit.
3- Asynchronous commit :
With commitAsync() we commit asynchronously; the drawback is that failed commits are only logged, not retried, in order to preserve the ordering guarantee of committed offsets.
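A sketch of an asynchronous commit with a callback, placed where commitSync() was in the previous example:

```java
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Failed commits are only logged; they are not retried, to avoid
        // committing an older offset on top of a newer one
        System.err.println("Commit failed for offsets " + offsets + ": " + exception);
    }
});
```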
4- Commit a specific offset :
Imagine you consume data from a partition in large batches (for instance because you increased the fetch.min.bytes property). You may want to commit offsets in the middle of a batch, so that you don’t have to re-consume the whole batch just because the consumer was lost while processing it. To do that, we can commit specific offsets, for example :
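A sketch, again reusing the consumer from the first sketch; committing every 1000 records is an arbitrary choice, and the fragment assumes imports for TopicPartition, OffsetAndMetadata, HashMap and Map:

```java
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    int count = 0;
    for (ConsumerRecord<String, String> record : records) {
        // ... process the record ...
        // Remember the position of the *next* record to read in this partition
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                           new OffsetAndMetadata(record.offset() + 1));
        // Commit in the middle of the batch, every 1000 records
        if (++count % 1000 == 0) {
            consumer.commitAsync(currentOffsets, null);
        }
    }
    // Make sure the last offsets of the batch are committed before the next poll
    consumer.commitSync(currentOffsets);
}
```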
A consumer can only read messages once they have been replicated to all the in-sync replicas.
Unclean leader election :
An unclean leader election can happen when, for example, a partition has 2 follower replicas and both of them are out of sync, and then the leader, the only in-sync replica left, goes down. (Reminder: consumers can only consume messages once they are present on all the in-sync replicas.)
In this case we can either allow an out-of-sync replica to become the leader as soon as it comes back online => possible data loss and inconsistency, or wait for the old leader to come back online => less availability. The parameter responsible for that choice is unclean.leader.election.enable: setting it to true means that we allow out-of-sync replicas to become leaders.
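This is a broker/topic-level setting rather than a consumer setting. As an illustration, it can be overridden per topic through the AdminClient; the broker address and topic name below are placeholders:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UncleanElectionToggle {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "customers");
            // false favours consistency over availability; true allows out-of-sync replicas to lead
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("unclean.leader.election.enable", "false"),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(op))).all().get();
        }
    }
}
```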