Kafka producers :

Rida kejji
4 min readApr 23, 2022

--

This article is the first of a serie of writings about Kafka. My principal source for this article is the book : Real-Time Data and Stream Processing at Scale. Available for free online using this link.

Mandatory configurations to create a kafka producer :

There are many important configurations to start a producer, the following list maps out the mandatory ones. Later in this article I will go through the main producers configurations :

  • Bootstrap.servers : it is a list of broker’s ip addresses. It doesn’t need to contain all the brokers’ ip addresses. A single broker address is enough since every broker has a cached metadata list of all the brokers addresses along with the leader partitions that they host. It is very recommended to put more than one broker address in case a broker goes down.
  • Key. serializer : serialization is the operation during which a message in any format (Json, PlainText, Avro…) is transformed into a byte array. We can create our custom serializers or just use the Avro format, which makes this operation very easy.
  • Value.serializer : same thing as the key serializer but for the value

Kafka only treats byte arrays !!

Send() function :

The send function that sends messages to Kafka returns an object of type Future<RecordProducer>.

It is asynchronous by default. We have 3 ways of producing messages :

Fire and forget :

We send a message and don’t really care if it arrives. Most of the time the messages arrive since Kafka is highly available and are retired. But if a message is lost we won’t know.

Synchronous send :

To make the send() function synchronous we can simply add a .get() at the end just like all promise functions :

Sending messages synchronously lowers the throughputs. If the network round-trip time between the producer and Kafka is 10ms, sending 100 messages will take 1 second.

Asynchrounous send :

When calling send() asynchronously, sending 100 messages wouldn’t take any time at all. We just need to add a CallBack function. So that if an exception is returned we handle it.

Producer’s main configurations :

Here are the main configuration parameters for a Kafka producer :

  • client.id : this is a logical parameter, it is used to identify the producer application. It is pretty usefull for debugging and for monitoring visibility
  • Acks :

This is one of the most important parameters for producers. As we saw earlier when calling the send() function we get a response from the Kafka broker, meaning that the message had been correctly received by Kafka.

When this parameter is set to 1, this success message is returned as soon as the leader replica gets it. When it is set to “all” or “-1”the success message is returned once all the replicats have received it (more precisely the in-sync replicats). When it is set to 0 it is equivalent to a fire and forget way of sending records.

When Acks are set to 1 or 0 we have lower reliability but higher thoughput. When Acks are set to all or (-1) we have high reliability but lower throughput, since we need all the in-sync replicats to get the message which will cause latency.

  • retry.backoff.ms : When producing batches of message synchronously, we get a success message when they arrive to the broker. We might as well receive an error. Some errors are called “retriable”, which means that by retrying we might be able to perform successfully our produce operation. For example (no leader for the partition). We will just wait until a leader is elected and send the produce request again. This parameter defines in milliseconds the time to wait before retrying to send a batch of messages to the broker (by default it is set to 100ms). It is recommended to set it to the time required by your broker to restart when recovering from a failure.

Examples of non retryable errors are : Message too large, Acls not found ..

  • Batch size : the batch is the “buffer” that gathers messages before sending them. when increasing the batch.size.bytes we enhance the throughput to use less CPU.
  • ligner.ms : This parameter indicates the maximum waiting time before sending the batch eventhough it is not completely full.
  • max.request.size.bytes : The maximum size that can be produced for each request. This is not directly related to the number of records/ messages. For example for a max request size bytes of 1Mb, we can have 1000 messages of 1Ko
  • Max.Flight.Request.Per.Second : This parameter controls the number of records that can be sent asynchronously. More precisly it indicates the number of records that can be sent without waiting for a successful message from the Kafka broker. if it is set to 1, it is equivalent to a synchronous producing of messages.

--

--

Rida kejji
Rida kejji

No responses yet