May 26, 2021 Apache Kafka
A consumer group lets several threads or machines consume Apache Kafka topics together.
Consumers join a group by using the same group.id.
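The maximum parallelism of a group is limited by the number of partitions: each partition is assigned to at most one consumer in the group, so if a group has more consumers than the topic has partitions, the extra consumers sit idle.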
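To make the partition limit concrete, here is a toy model that splits a topic's partitions over the consumers of one group using simple range-style arithmetic. The class `ParallelismSketch` and its `assign` method are hypothetical illustrations, not Kafka's `RangeAssignor`; Kafka's real assignors are more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of range-style partition assignment for a single topic.
// Hypothetical illustration only: this is NOT Kafka's RangeAssignor.
public class ParallelismSketch {

    // Returns consumer -> partitions it owns, keeping the given consumer order.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return result;
        }
        int perConsumer = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size(); // the first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions but 4 consumers: one consumer receives no partition at all.
        Map<String, List<Integer>> a = assign(3, List.of("c1", "c2", "c3", "c4"));
        a.forEach((consumer, partitions) -> System.out.println(consumer + " -> " + partitions));
        // prints:
        // c1 -> [0]
        // c2 -> [1]
        // c3 -> [2]
        // c4 -> []
    }
}
```

Adding a fifth consumer to this group would not speed anything up; adding partitions to the topic would.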
Kafka assigns the partitions of a topic to the consumers in a group so that each partition is consumed by exactly one consumer in the group.
As a result, Kafka guarantees that each message is read by only one consumer in the group.
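The one-owner-per-partition rule is what makes each message reach a single group member. The sketch below is a hypothetical simulation (the class `GroupDeliverySketch` is not Kafka code): it routes each message to whichever consumer owns the message's partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Kafka code: within one group every partition has exactly one owner,
// so every message is delivered to exactly one consumer of that group.
public class GroupDeliverySketch {

    // owners.get(p) is the consumer that owns partition p inside the group.
    // partitions[i] is the partition that messages[i] was written to.
    static Map<String, List<String>> deliver(List<String> owners, int[] partitions, String[] messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String owner : owners) {
            received.putIfAbsent(owner, new ArrayList<>());
        }
        for (int i = 0; i < messages.length; i++) {
            received.get(owners.get(partitions[i])).add(messages[i]);
        }
        return received;
    }

    public static void main(String[] args) {
        String[] messages = {"m0", "m1", "m2", "m3"};
        int[] partitions = {0, 1, 0, 1};
        // Two consumers, one partition each: no message is delivered twice.
        System.out.println(deliver(List.of("c1", "c2"), partitions, messages));
        // prints {c1=[m0, m2], c2=[m1, m3]}
    }
}
```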
Within each partition, consumers see messages in the order in which they were stored in the log.
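Ordering is a per-partition property: a partition is an append-only log, and a record's offset is its position in it. The sketch below models that with plain lists. It assumes a hypothetical partitioner based on `String.hashCode()`; Kafka's default partitioner hashes the serialized key with murmur2 instead, but the effect shown (same key, same partition, preserved order) is the same idea.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Kafka code) of per-partition ordering. Each partition is an
// append-only list, and a record's offset is its index in that list.
public class PartitionOrderSketch {

    // Hypothetical partitioner for illustration only; Kafka's default partitioner
    // uses a murmur2 hash of the serialized key.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends each {key, value} pair to the log of the partition its key maps to.
    static List<List<String>> buildLogs(String[][] sends, int numPartitions) {
        List<List<String>> logs = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            logs.add(new ArrayList<>());
        }
        for (String[] kv : sends) {
            logs.get(partitionFor(kv[0], numPartitions)).add(kv[0] + ":" + kv[1]);
        }
        return logs;
    }

    public static void main(String[] args) {
        String[][] sends = {
            {"user-1", "login"}, {"user-2", "login"}, {"user-1", "click"}, {"user-1", "logout"}
        };
        List<List<String>> logs = buildLogs(sends, 2);
        // A consumer reads each partition from low to high offset, so records that
        // share a key come back in the order they were written.
        for (int p = 0; p < logs.size(); p++) {
            for (int offset = 0; offset < logs.get(p).size(); offset++) {
                System.out.printf("partition %d, offset %d: %s%n", p, offset, logs.get(p).get(offset));
            }
        }
    }
}
```

Nothing in this model orders records across different partitions, which matches Kafka: there is no global order over a multi-partition topic.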
Adding more processes or threads to a group causes Kafka to rebalance it. A rebalance is also triggered when a consumer leaves the group or fails to send heartbeats within session.timeout.ms; with the Java KafkaConsumer used in this example, heartbeats go to a broker acting as the group coordinator, not to ZooKeeper. During a rebalance, Kafka reassigns the available partitions to the available consumers, possibly moving partitions from one process to another.
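The sketch below illustrates which partitions change hands when a third consumer joins a two-consumer group. It is a hypothetical model (`RebalanceSketch` is not part of Kafka) that re-splits partitions with the same simple range arithmetic each time membership changes; Kafka's actual assignment strategies differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a rebalance (not Kafka's implementation): partitions are re-split
// with simple range arithmetic whenever group membership changes.
public class RebalanceSketch {

    // Returns owner[p] = consumer that owns partition p. Assumes at least one consumer.
    static String[] owners(int numPartitions, List<String> consumers) {
        String[] owner = new String[numPartitions];
        int perConsumer = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            for (int j = 0; j < count; j++) {
                owner[next++] = consumers.get(i);
            }
        }
        return owner;
    }

    // Lists the partitions whose owner changed between two assignments.
    static List<Integer> moved(String[] before, String[] after) {
        List<Integer> result = new ArrayList<>();
        for (int p = 0; p < before.length; p++) {
            if (!before[p].equals(after[p])) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] before = owners(6, List.of("c1", "c2"));
        String[] after = owners(6, List.of("c1", "c2", "c3")); // a third consumer joins
        System.out.println("moved partitions: " + moved(before, after));
        // prints "moved partitions: [2, 4, 5]"
    }
}
```

Every moved partition is briefly unowned while the group rebalances, which is why frequent membership changes slow a group down.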
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerGroup {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: consumer <topic> <groupname>");
            return;
        }
        String topic = args[0];
        String group = args[1];

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumers that share this group.id split the topic's partitions between them.
        props.put("group.id", group);
        // Commit consumed offsets automatically, once per second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Drop this consumer from the group if no heartbeat arrives for 30 seconds.
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        while (true) {
            // poll(long) matches the 0.9.0.0 client used below; clients 2.0+ prefer poll(Duration).
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```
The main Consumer client API configuration settings used in this example are shown below:
S.No | Setting | Description |
---|---|---|
1 | bootstrap.servers | The list of bootstrap brokers (host:port pairs) the consumer uses for its initial connection to the cluster. |
2 | group.id | The name of the consumer group this consumer belongs to. |
3 | enable.auto.commit | If true, the consumer commits its offsets automatically in the background; if false, no offsets are committed automatically. |
4 | auto.commit.interval.ms | How often, in milliseconds, consumed offsets are committed when enable.auto.commit is true. The Java KafkaConsumer stores these offsets in Kafka, not in ZooKeeper. |
5 | session.timeout.ms | How long, in milliseconds, the broker waits without receiving a heartbeat from the consumer before removing it from the group and triggering a rebalance. |
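The session.timeout.ms rule can be modeled as a gap check on heartbeat timestamps. The sketch below is a simplified, hypothetical model (`SessionTimeoutSketch` is not Kafka's coordinator code), and the 3-second heartbeat spacing in `main` is an assumption chosen for illustration; in this model a gap strictly longer than the timeout expires the member.

```java
// Simplified model (not Kafka's coordinator code): a member is removed from the
// group once the time since its last heartbeat exceeds session.timeout.ms.
public class SessionTimeoutSketch {

    // heartbeatsMs: ascending heartbeat arrival times; the first one marks joining.
    // Returns the time at which the member would be removed, or -1 if it is
    // still alive at endMs.
    static long expiryTime(long[] heartbeatsMs, long endMs, long sessionTimeoutMs) {
        long last = heartbeatsMs[0];
        for (int i = 1; i < heartbeatsMs.length; i++) {
            if (heartbeatsMs[i] - last > sessionTimeoutMs) {
                return last + sessionTimeoutMs;
            }
            last = heartbeatsMs[i];
        }
        return (endMs - last > sessionTimeoutMs) ? last + sessionTimeoutMs : -1;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30_000; // same value as session.timeout.ms in the example
        // Assumed heartbeats every 3 s, then the process stalls for 39 s.
        long[] stalled = {0, 3_000, 6_000, 45_000};
        System.out.println(expiryTime(stalled, 50_000, sessionTimeoutMs)); // prints 36000
        long[] healthy = {0, 10_000, 20_000, 30_000, 40_000};
        System.out.println(expiryTime(healthy, 50_000, sessionTimeoutMs)); // prints -1
    }
}
```

A larger session.timeout.ms tolerates longer pauses (for example, garbage-collection stalls) but also delays the rebalance after a consumer really dies.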
Compile the program:

```shell
javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
```

Then run it in two separate terminals with the same group name:

```shell
# terminal 1
java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group
# terminal 2
java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group
```
Here, we create a sample group named my-group with two consumers.
Similarly, you can create your own group and choose how many consumers it contains.
Open the producer CLI and send some messages, for example:

```
Test consumer group 01
Test consumer group 02
```
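Output of the first process:

```
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
```

Output of the second process:

```
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
```

Each message is delivered to only one of the two consumers. Both show offset = 3 because offsets are numbered per partition: the two messages landed in different partitions of Hello-kafka, and each partition is read by a different member of my-group.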
Hopefully this Java client demo has shown you how SimpleConsumer and consumer groups work.
You now know how to use the Java client to send and receive messages. In the next chapter, we will look at integrating Kafka with other big data technologies.