May 26, 2021 Apache Kafka
Let's use the Java client to create an application for publishing and using messages. /b10> The Kafka producer client includes the following APIs.
Let's look at the most important set of Kafka producer APIs in this section.
/b10>
The central part of the Kafka Producer API is
the Kafka Producer
class.
/b11>
The Kafka Producer class provides an option to connect the Kafka agent in its constructor to the following methods.
The Kafka Producer class provides a send method to send messages to the topic asynchronously. Send() is signed as follows
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord - Producers manage buffers for records waiting to be sent.
Callback - A user-provided callback (null indicates no callback) that is performed when the server confirms the record.
The Kafka Producer class provides a flush method to ensure that all previously sent messages are actually completed. The syntax of the flush method is as follows -
public void flush()
The Kafka Producer class provides a partitionFor method, which helps you get partition metadata for a given topic. /b10> This can be used to customize partitions. The signature of this method is as follows -
public Map metrics()
It returns a map of internal metrics maintained by the producer.
public void close() - The Kafka Producer class provides a closed method block until all previously sent requests are complete.
The central part of the producer API is
the producer
class.
/b10>
Producer classes provide an option to connect Kafka agents in their constructors by the following methods.
The producer class provides a send method to send a message to a single or multiple topics using the following signatures.
public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,"async") ProducerConfig config = new ProducerConfig(prop);
There are two types of producers - Synchronized and Asynchronous.
The same API configuration also applies to
sync producers.
/b10>
The difference between them is that the sync generator sends messages directly, but in the background.
/b11>
Asynchronous producers are the first choice when you want higher throughput.
/b12>
In previous versions, such as 0.8, an asynchronous producer did not call backend() to register an error handler.
/b13>
This is only available in the current version 0.9.
The producer class provides the Close method to close the producer pool connection to all Kafka agents.
The following table lists the main configuration settings for the Producer API to better understand -
S.No | Configure settings and instructions |
---|---|
1 |
client.id Identify the producer application |
2 |
producer.type Synchronized or asynchronous |
3 |
acks The standard under the acks configuration control producer request is complete. |
4 |
Retry If the producer request fails, a specific value is used to automatically retry. |
5 |
Bootstrapping agent list. |
6 |
linger.ms If you want to reduce the number of requests, you linger.ms set the value to something larger than a value. |
7 |
key.serializer The key of the serializer interface. |
8 |
value.serializer Value. |
9 |
batch.size The size of the buffer. |
10 |
buffer.memory Controls the total amount of memory that producers can use to buffer. |
ProducerRecord is sent to Kafka Cluster. The key/value pair of the ProducerRecord class constructor, which is used to create records with partitions, keys, and value pairs using the following signatures.
public ProducerRecord (string topic, int partition, k key, v value)
Topic - The user-defined topic name that will be attached to the record.
Partition - Partition count.
Key - The key that will be included in the record.
public ProducerRecord (string topic, k key, v value)
The ProducerRecord class constructor is used to create records with keys, value pairs, and no partitions.
Theme - Create a theme to assign records.
Key - The key of the record.
Value - Record content.
public ProducerRecord (string topic, v value)
The ProducerRecord class creates a record without partitions and keys.
Theme - Create a theme.
Value - Record content.
The ProducerRecord class method is listed in the table below -
S.No | Class methods and descriptions |
---|---|
1 |
public string topic() The topic is attached to the record. |
2 |
public K key() The key that will be included in the record. /b10> If you don't have such a key, null will reopen here. |
3 |
public V value() Record the contents. |
4 |
partition() The recorded partition count |
Before you create an application, start the ZooKeeper and Kafka agents, and then use the create topic command to create your own themes in the Kafka agent.
/b10>
After that, create a
java class called Sim-pleProducer .java,
and then type the following code.
//import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named “SimpleProducer" public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", “localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", “all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); Producer<String, String> producer = new KafkaProducer <String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println(“Message sent successfully"); producer.close(); } }
Compilation - You can compile your application using the following commands.
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java
Execute - You can execute the application using the following command.
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>
Output
Message sent successfully To check the above output open new terminal and type Consumer CLI command to receive messages. >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning 1 2 3 4 5 6 7 8 9 10
So far, we've created a producer that sends messages to the Kafka cluster. /b10> Now let's create a message for consumers to consume Kafka clusters. /b11> The KafkaConsumer API is used to consume messages from the Kafka cluster. /b12> The constructors of the KafkaConsumer class are defined below.
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - Returns a map configured by the consumer.
The KafkaConsumer class has the following important methods listed in the following table.
S.No | Methods and instructions |
---|---|
1 |
public java.util.Set< TopicPar- tition> assignment() Gets the set of partitions currently assigned by the user. |
2 |
public string subscription() Subscribe to a given list of topics to get dynamically signed partitions. |
3 |
public void sub-scribe(java.util.List< java.lang.String> topics,ConsumerRe-balanceListener listener) Subscribe to a given list of topics to get dynamically signed partitions. |
4 |
public void unsubscribe() Unsubscribe from a given list of partitions. |
5 |
public void sub-scribe(java.util.List< java.lang.String> topics) Subscribe to a given list of topics to get dynamically signed partitions. /b10> If the given list of topics is empty, it is considered the same as unsubscribe(). |
6 |
public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener) The parameter pattern refers to the subscription pattern in the format of the regular expression, while the listener parameter gets notifications from the subscription mode. |
7 |
public void as-sign(java.util.List< TopicPartion> partitions) Manually assign a list of partitions to the customer. |
8 |
poll() Use one of the subscription/assignment APIs to get data for a specified topic or partition. /b10> If the topic was not booked before polling the data, this returns an error. |
9 |
public void commitSync() Submit the commit offset returned by the last poll() for all sub-compilation lists of topics and partitions. /b10> The same action applies to cometAsyn(). |
10 |
public void seek(TopicPartition partition,long offset) Gets the current offset value that the consumer will use in the next poll() method. |
11 |
public void resume() Restore the paused partition. |
12 |
public void wakeup() Wake up consumers. |
The ConsumerRecord API is used to receive records from the Kafka cluster. /b10> This API consists of the subject name, the partition number from which records are received, and the offset to the records in the Kafka partition. /b11>The ConsumerRecord class is used to create consumer records with specific topic names, partition counts, and .lt; key, value, and value. /b12> That's right. /b13> It has the following signatures.
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
Topic - The subject name of the subject recorded by the consumer received from the Kafka cluster.
Partition - The partition of the theme.
Key - The key recorded, if no key exists null will be returned.
Value - Record content.
The ConsumerRecords API acts as a container for ConsumerRecord. /b10> This API is used to hold a list of ConsumerRecords for each partition of a particular topic. /b11> Its constructors are defined below.
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)
TopicPartition - Returns a partition map for a specific topic.
Records - ConsumerRecord's return list.
The ConsumerRecords class defines the following methods.
S.No | Method and description |
---|---|
1 |
public int count() The number of records for all topics. |
2 |
public Set partitions() A partitioned set with data in this record set (if no data is returned, the set is empty). |
3 |
public Iterator iterator() The iterator allows you to iterate through the collection, get or re-move elements. |
4 |
public List records() Gets a list of records for a given partition. |
The configuration settings for the Consumer client API master configuration settings are as follows -
S.No | Settings and instructions |
---|---|
1 |
The list of boot agents. |
2 |
group.id Assign a single consumer to a group. |
3 |
enable.auto.commit If the value is true, auto-implementation is enabled for offsets, otherwise no commits are made. |
4 |
auto.commit.interval.ms Returns how often the updated consumption offset is written to ZooKeeper. |
5 |
session.timeout.ms Represents how many milliseconds Kafka waits for ZooKeeper to respond to a request (read or write) before discarding and continuing to consume messages. |
The producer application steps remain unchanged here.
/b10>
First, start your ZooKeeper and Kafka agents.
/b11>
Then create a
SimpleConsumer .java
using a Java class called
SimpleCon-sumer
and type the following code.
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0){ System.out.println("Enter topic name"); return; } //Kafka consumer configuration settings String topicName = args[0].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)) //print the topic name System.out.println("Subscribed to topic " + topicName); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
Compilation - You can compile your application using the following commands.
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java
Execute - You can execute the application using the following command
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>
Input - Open generator CLI and send some messages to the topic. /b10> You can type the smple into 'Hello Consumer'.
Output - Here's the output.
Subscribed to topic Hello-Kafka offset = 3, key = null, value = Hello Consumer