Apache Kafka simple producer example


May 26, 2021 16:00 Apache Kafka


Let us create an application for publishing and consuming messages using the Java client. The Kafka producer client consists of the following APIs.

KafkaProducer API

Let us look at the most important set of Kafka producer APIs in this section. The central part of the KafkaProducer API is the KafkaProducer class. The KafkaProducer class provides an option to connect to a Kafka broker in its constructor, along with the following methods.

  • The KafkaProducer class provides a send method to send messages to a topic asynchronously. The signature of send() is as follows -

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - The producer manages a buffer of records waiting to be sent.

  • Callback - A user-supplied callback that is executed when the server acknowledges the record (null indicates no callback).

  • The KafkaProducer class provides a flush method to ensure that all previously sent messages have actually completed. The syntax of the flush method is as follows -

public void flush()
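  • The KafkaProducer class provides a partitionsFor method, which returns the partition metadata for a given topic. This can be used for custom partitioning. The signature of this method is as follows -

public List<PartitionInfo> partitionsFor(String topic)

  • The KafkaProducer class provides a metrics method, which returns a map of the internal metrics maintained by the producer. The signature of this method is as follows -

public Map metrics()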

  • public void close() - The KafkaProducer class provides a close method that blocks until all previously sent requests are complete.
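
The contract described by the list above (send() only buffers, the callback runs on acknowledgement, flush() and close() complete outstanding sends) can be pictured with a small in-memory model. This is a toy sketch, not Kafka's implementation: the class ToyBufferedProducer and all of its members are hypothetical names, and an in-memory list stands in for the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Toy model only: it mimics the send/flush/close contract described above
// and does not use or reproduce Kafka's real implementation.
final class ToyBufferedProducer {
    // A buffered record plus the callback to run once it is "acknowledged".
    private static final class Pending {
        final String value;
        final BiConsumer<Long, Exception> callback; // (offset, error); may be null

        Pending(String value, BiConsumer<Long, Exception> callback) {
            this.value = value;
            this.callback = callback;
        }
    }

    private final List<Pending> buffer = new ArrayList<>();
    private final List<String> log = new ArrayList<>(); // stands in for the broker
    private boolean closed = false;

    // send() only buffers the record and returns immediately.
    void send(String value, BiConsumer<Long, Exception> callback) {
        if (closed) {
            throw new IllegalStateException("producer is closed");
        }
        buffer.add(new Pending(value, callback));
    }

    // flush() delivers everything buffered so far and fires the callbacks.
    void flush() {
        for (Pending p : buffer) {
            log.add(p.value);
            long offset = log.size() - 1;
            if (p.callback != null) {
                p.callback.accept(offset, null);
            }
        }
        buffer.clear();
    }

    // close() completes outstanding sends before shutting down.
    void close() {
        flush();
        closed = true;
    }

    int pendingCount() { return buffer.size(); }

    List<String> delivered() { return log; }
}
```

In this model a record passed to send() stays in the buffer until flush() or close() is called, which is why a short-lived program should always close its producer before exiting.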

Producer API

The central part of the Producer API is the Producer class. The Producer class provides an option to connect to Kafka brokers in its constructor, along with the following methods.

Producer class

The Producer class provides a send method to send messages to a single topic or to multiple topics using the following signatures.


public void send(KeyedMessage<K,V> message)
- sends the data to a single topic, partitioned by key, using either a sync or an async producer.
public void send(List<KeyedMessage<K,V>> messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);

There are two types of producers - Sync and Async.

The same API configuration applies to the sync producer as well. The difference between them is that a sync producer sends messages directly, whereas an async producer sends messages in the background. An async producer is preferred when you want higher throughput. In earlier releases such as 0.8, an async producer did not have a callback for send() to register error handlers. This is available only in the current release, 0.9.
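
The difference between the two producer types can be sketched with the standard Java concurrency utilities. This is only an illustration of the idea, not the Producer class itself: SyncAsyncSketch and its methods are hypothetical names, and an in-memory list stands in for the broker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Toy model only: contrasts a blocking (sync) send with a background (async) send.
final class SyncAsyncSketch {
    private final List<String> delivered = new CopyOnWriteArrayList<>(); // stands in for the broker
    private final ExecutorService sender = Executors.newSingleThreadExecutor();

    // Async: hand the message to the background thread and return at once.
    Future<?> sendAsync(final String message) {
        return sender.submit(() -> { delivered.add(message); });
    }

    // Sync: the caller waits until the message has been delivered.
    void sendSync(String message) {
        try {
            sendAsync(message).get();
        } catch (Exception e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    // Let outstanding sends finish, then stop the background thread.
    void close() {
        sender.shutdown();
        try {
            sender.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> delivered() { return delivered; }
}
```

The async path lets the caller keep producing while earlier messages are still in flight, which is where the throughput gain comes from; the cost is that a failure is only visible through the returned Future or a callback.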

public void close()

The Producer class provides a close method to close the producer pool connections to all Kafka brokers.

Configure the settings

The following table lists the main configuration settings of the Producer API -

S.No  Configuration setting and description
1     client.id - Identifies the producer application.
2     producer.type - Either sync or async.
3     acks - Controls the criteria under which a producer request is considered complete.
4     retries - If a producer request fails, it is automatically retried up to the specified number of times.
5     bootstrap.servers - The bootstrapping list of brokers.
6     linger.ms - To reduce the number of requests, set linger.ms to a value greater than 0.
7     key.serializer - The serializer class for record keys.
8     value.serializer - The serializer class for record values.
9     batch.size - The size of the batch buffer, in bytes.
10    buffer.memory - Controls the total amount of memory available to the producer for buffering.
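
To see why batching settings such as batch.size reduce the number of requests, consider the following simplified model. It is a sketch under stated assumptions: BatchingSketch is a hypothetical class, it applies only a byte-size limit, and it ignores the time-based waiting that linger.ms adds in the real producer.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: shows how a byte-size limit like batch.size groups records
// into fewer requests. The real producer also applies linger.ms timing.
final class BatchingSketch {
    private final int batchSizeBytes;
    private final List<byte[]> current = new ArrayList<>();
    private int currentBytes = 0;
    private final List<Integer> requests = new ArrayList<>(); // record count per request

    BatchingSketch(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    // Buffer a record; send the batch as one request once it is full.
    void send(byte[] record) {
        current.add(record);
        currentBytes += record.length;
        if (currentBytes >= batchSizeBytes) {
            flush();
        }
    }

    // Send whatever is buffered as a single request.
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        requests.add(current.size());
        current.clear();
        currentBytes = 0;
    }

    List<Integer> requests() { return requests; }
}
```

With a 16-byte limit, ten 4-byte records followed by a final flush() go out as three requests (4, 4, and 2 records) instead of ten.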

ProducerRecord API

A ProducerRecord is a key/value pair that is sent to the Kafka cluster. The ProducerRecord class constructor creates a record with a partition, key, and value using the following signature.

public ProducerRecord(String topic, Integer partition, K key, V value)
  • Topic - The user-defined topic name to which the record will be appended.

  • Partition - The partition number.

  • Key - The key that will be included in the record.

  • Value - The record contents.
public ProducerRecord(String topic, K key, V value)

This ProducerRecord class constructor creates a record with a key and a value but no partition.

  • Topic - The topic to which the record is assigned.

  • Key - The key of the record.

  • Value - The record contents.

public ProducerRecord(String topic, V value)

This ProducerRecord class constructor creates a record with no partition and no key.

  • Topic - The topic to which the record is assigned.

  • Value - The record contents.
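
The three constructors above differ in how much they say about where a record should go. A rough sketch of how a partition could be chosen in each case follows. It is illustrative only: PartitionChoiceSketch is a hypothetical class, the hash is a stand-in (Kafka's own partitioner uses a different hash function), and the way keyless records are spread depends on the client version.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: Kafka's own partitioner uses a different hash function,
// so the partition numbers produced here will not match a real cluster.
final class PartitionChoiceSketch {
    private final AtomicInteger counter = new AtomicInteger();

    int choose(Integer partition, String key, int numPartitions) {
        if (partition != null) {
            return partition; // an explicit partition always wins
        }
        if (key != null) {
            // Hash the key bytes so the same key always maps to the same partition.
            int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
            return (hash & 0x7fffffff) % numPartitions;
        }
        // No partition and no key: rotate through the partitions.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

The practical consequence is that records sharing a key land in the same partition and therefore keep their relative order, while keyless records are simply spread out.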

The ProducerRecord class methods are listed in the following table -

S.No  Class method and description
1     public String topic() - The topic to which the record will be appended.
2     public K key() - The key that will be included in the record. If there is no key, null is returned.
3     public V value() - The record contents.
4     public Integer partition() - The partition number for the record.

SimpleProducer application

Before creating the application, first start ZooKeeper and the Kafka broker, then create your own topic in the Kafka broker using the create topic command. After that, create a Java class named SimpleProducer.java and type in the following code.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named "SimpleProducer"
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign the broker address
      props.put("bootstrap.servers", "localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", "all");
      
      //Number of automatic retries if a request fails (0 disables retrying)
      props.put("retries", 0);
      
      //Specify the batch buffer size, in bytes
      props.put("batch.size", 16384);
      
      //Wait up to 1 ms for more records so that fewer requests are sent
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      //Send ten records whose key and value are both the loop index
      for(int i = 0; i < 10; i++) {
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
      }
      System.out.println("Message sent successfully");
      
      //close() waits for previously sent records to complete
      producer.close();
   }
}

Compilation - You can compile your application using the following commands.

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

Execute - You can execute the application using the following command.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output, open a new terminal and run the consumer CLI command to receive the messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning
0
1
2
3
4
5
6
7
8
9

A simple example of a consumer

So far, we have created a producer that sends messages to the Kafka cluster. Now let us create a consumer to consume messages from the Kafka cluster. The KafkaConsumer API is used to consume messages from the Kafka cluster. The constructor of the KafkaConsumer class is defined below.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - A map of consumer configuration settings.

The important methods of the KafkaConsumer class are listed in the following table.

S.No  Method and description
1     public java.util.Set<TopicPartition> assignment() - Gets the set of partitions currently assigned to the consumer.
2     public java.util.Set<java.lang.String> subscription() - Gets the set of topics the consumer is currently subscribed to.
3     public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener) - Subscribes to the given list of topics to get dynamically assigned partitions.
4     public void unsubscribe() - Unsubscribes from the topics currently subscribed to.
5     public void subscribe(java.util.List<java.lang.String> topics) - Subscribes to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe().
6     public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener) - The pattern argument is the subscription pattern, given as a regular expression, and the listener argument is notified when the partitions assigned for that pattern change.
7     public void assign(java.util.List<TopicPartition> partitions) - Manually assigns a list of partitions to the consumer.
8     poll() - Fetches data for the topics or partitions specified using one of the subscribe/assign APIs. This returns an error if no topics were subscribed to before polling for data.
9     public void commitSync() - Commits the offsets returned by the last poll() for all subscribed topics and partitions. The same operation applies to commitAsync().
10    public void seek(TopicPartition partition, long offset) - Overrides the fetch offset that the consumer will use on the next poll().
11    public void resume() - Resumes the paused partitions.
12    public void wakeup() - Wakes up the consumer.
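
The interplay of poll(), seek() and commitSync() in the table above is easier to follow with a single in-memory partition. The following is a toy sketch, not the KafkaConsumer implementation: ToyPartitionConsumer and its members are hypothetical names, and records are plain strings indexed by offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a single in-memory partition with a consumer position.
// It mirrors the roles of poll(), seek() and commitSync(), not Kafka's implementation.
final class ToyPartitionConsumer {
    private final List<String> log; // records, indexed by offset
    private long position = 0;      // offset of the next record poll() returns
    private long committed = 0;     // last committed position

    ToyPartitionConsumer(List<String> log) {
        this.log = log;
    }

    // Return up to maxRecords records starting at the current position.
    List<String> poll(int maxRecords) {
        int from = (int) Math.min(position, log.size());
        int to = Math.min(from + maxRecords, log.size());
        position = to;
        return new ArrayList<>(log.subList(from, to));
    }

    // Override the offset that the next poll() starts from.
    void seek(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be non-negative");
        }
        position = offset;
    }

    // Remember how far the consumer has read.
    void commitSync() {
        committed = position;
    }

    long committed() { return committed; }
}
```

In this model, calling seek() to an earlier offset makes the next poll() return records that were already read, which is how a consumer can replay data; the committed offset is what a restarted consumer would resume from.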

ConsumerRecord API

The ConsumerRecord API is used to receive records from the Kafka cluster. A record consists of the topic name, the partition number from which the record was received, and an offset that points to the record in the Kafka partition. The ConsumerRecord class is used to create a consumer record with a specific topic name, partition number, and <key, value> pair. It has the following signature.

public ConsumerRecord(String topic, int partition, long offset, K key, V value)
  • Topic - The name of the topic from which the consumer record was received from the Kafka cluster.

  • Partition - The partition of the topic.

  • Offset - The offset of the record in the Kafka partition.

  • Key - The key of the record; if no key exists, null is returned.

  • Value - The record contents.

ConsumerRecords API

The ConsumerRecords API acts as a container for ConsumerRecord. It holds the list of ConsumerRecord objects for each partition of a particular topic. Its constructor is defined below.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<ConsumerRecord<K,V>>> records)
  • TopicPartition - The map key, which identifies a partition of a particular topic.

  • Records - The list of ConsumerRecord objects for that partition.

The ConsumerRecords class defines the following methods.

S.No  Method and description
1     public int count() - The number of records for all topics.
2     public Set<TopicPartition> partitions() - The set of partitions with data in this record set (if no data was returned, the set is empty).
3     public Iterator<ConsumerRecord<K,V>> iterator() - Returns an iterator over all the records in the collection.
4     public List<ConsumerRecord<K,V>> records(TopicPartition partition) - Gets the list of records for the given partition.
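
The container role of ConsumerRecords, a map from partition to the list of records fetched from it, can be sketched with plain collections. This is a simplified stand-in, not the real class: ToyRecords is a hypothetical name, and plain strings such as "t-0" take the place of TopicPartition and ConsumerRecord.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: partitions are plain strings such as "t-0" and records
// are plain strings, standing in for TopicPartition and ConsumerRecord.
final class ToyRecords {
    private final Map<String, List<String>> byPartition;

    ToyRecords(Map<String, List<String>> byPartition) {
        this.byPartition = new LinkedHashMap<>(byPartition);
    }

    // Total number of records across all partitions.
    int count() {
        int total = 0;
        for (List<String> records : byPartition.values()) {
            total += records.size();
        }
        return total;
    }

    // Partitions that have data in this batch.
    Set<String> partitions() {
        return Collections.unmodifiableSet(byPartition.keySet());
    }

    // Records for one partition, or an empty list if it has none.
    List<String> records(String partition) {
        List<String> records = byPartition.get(partition);
        return records == null ? Collections.<String>emptyList() : records;
    }

    // All records, partition by partition (what iterating the container yields).
    List<String> all() {
        List<String> out = new ArrayList<>();
        for (List<String> records : byPartition.values()) {
            out.addAll(records);
        }
        return out;
    }
}
```

Grouping by partition matters because ordering is only meaningful within a partition, so code that cares about order would typically walk partitions() and call records() for each one rather than iterate over everything at once.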

Configure the settings

The main configuration settings for the Consumer client API are listed below -

S.No  Setting and description
1     bootstrap.servers - The bootstrapping list of brokers.
2     group.id - Assigns an individual consumer to a group.
3     enable.auto.commit - If the value is true, offsets are committed automatically; otherwise they are not committed automatically.
4     auto.commit.interval.ms - How often, in milliseconds, the consumed offsets are committed automatically.
5     session.timeout.ms - The timeout, in milliseconds, used to detect consumer failures. If no heartbeat is received from the consumer within this period, the consumer is removed from the group and its partitions are reassigned.

SimpleConsumer application

The producer application steps remain the same here. First, start ZooKeeper and the Kafka broker. Then create a SimpleConsumer application in a Java class named SimpleConsumer.java and type in the following code.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName));
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
            // print the offset, key and value for the consumer records.
            System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
         }
      }
   }
}

Compilation - You can compile your application using the following commands.

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

Execute - You can execute the application using the following command.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>

Input - Open the producer CLI and send some messages to the topic. For example, you can enter 'Hello Consumer' as a simple input.

Output - Here's the output.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer