Coding With Fun
Home Docker Django Node.js Articles Python pip guide FAQ Policy

Apache Kafka consumer group example


May 26, 2021 Apache Kafka


Table of contents


Consumer groups are multithreaded or multi-machine Apache Kafka themes.

Consumer groups

  • Consumers can join group.id using the same information

  • The maximum parallelity of a group is the number of consumers in the group← not partition.

  • Kafka assigns partitions of a topic to consumers in a group so that each partition is used by only one consumer in the group.

  • Kafka guarantees that messages can only be read by one consumer in the group.

  • Consumers can view messages in the order in which they are stored in the log.

Rebalance consumers

Adding more processes/threads will cause Kafka to rebalance. /b10> If any consumer or agent is unable to send a heartbeat to ZooKeeper, it can be reconfigured through the Kafka cluster. /b11> During this rebalancing, Kafka allocates available partitions to available threads, possibly moving partitions to another process.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " &plus; topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

The configuration settings for the Consumer client API master configuration settings are shown below

S.No Settings and instructions
1 The list of boot agents.
2 group.id
Assign a single consumer to a group.
3 enable.auto.commit
If the value is true, auto-implementation is enabled for offsets, otherwise no commits are made.
4 auto.commit.interval.ms
Returns how often the updated consumption offset is written to ZooKeeper.
5 session.timeout.ms
Represents how many milliseconds Kafka waits for ZooKeeper to respond to a request (read or write) before discarding and continuing to consume messages.

Compile

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Perform

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Here, we create a sample group name of my-group for two consumers. /b10> Similarly, you can create your group and the number of consumers in the group.

Input

Open the producer CLI and send some messages -

Test consumer group 01
Test consumer group 02

The output of the first procedure

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

The output of the second procedure

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

I hope you can learn about SimpleConsumer and Consumer Group by using the Java client demo.

Now you know how to use the Java client to send and receive messages. /b10> Let's continue Kafka's integration with big data technology in the next chapter.