Apache Kafka Quick Guide


May 26, 2021 17:00 Apache Kafka


Table of contents


Apache Kafka - Introduction

In big data, a lot of data is used. /b10> We have two main challenges with data. T he first challenge is how to collect large amounts of data, and the second challenge is to analyze the collected data. /b11> To overcome these challenges, you must need a messaging system.

Kafka is designed for distributed high throughput systems. /b10> Kafka tends to work well as a more traditional alternative to messaging agents. /b11>Kafka has better throughput, built-in partitioning, replication, and inherent fault tolerance than other messaging systems, making it ideal for large-scale messaging applications.

What is a messaging system?

The messaging system is responsible for transferring data from one application to another, so the application can focus on the data without worrying about how to share it. /b10> Distributed messaging is based on the concept of reliable message queues. /b11> Messages are queued asynchronously between the client application and the messaging system. /b12>There are two types of message patterns available - one is point-to-point and the other is publish-subscribe (pub-sub) messaging systems. /b13> Most message patterns follow pub-sub.

Point-to-point messaging system

In a point-to-point system, messages are kept in the queue. /b10> One or more consumers can consume messages in the queue, but a particular message can only be consumed by up to one consumer. /b11> Once the consumer reads the message in the queue, it disappears from the queue. /b12> A typical example of this system is an order processing system where each order is processed by one order processor, but multiple order processors can also work at the same time. /b13> The following image describes the structure.

Apache Kafka Quick Guide

Publish - Subscribe to the messaging system

In the publish-subscribe system, messages are retained in the topic. /b10> Unlike a point-to-point system, consumers can subscribe to one or more topics and use all messages in that topic. /b11> In a publish-subscribe system, a message producer is called a publisher and a message consumer is called a subscriber. /b12>A real-life example is Dish TV, which publishes different channels such as sports, movies, music, etc., and anyone can subscribe to their own channel set and get their subscription to the channel when available.

Apache Kafka Quick Guide

What is Kafka?

Apache Kafka is a distributed publishing -subscription messaging system and a powerful queue that can process large amounts of data and enable you to deliver messages from one endpoint to another. /b10> Kafka is suitable for offline and online messaging consumption. /b11> Kafka messages remain on disk and are replicated within the cluster to prevent data loss. /b12> Kafka is built on the ZooKeeper sync service. /b13> It integrates well with Apache Storm and Spark for real-time streaming data analysis.

Benefits

Here are a few of Kafka's benefits -

  • Reliability - Kafka is distributed, partitioned, replicated and fault-05med.

  • Scalability - Kafka messaging systems are easily scaled without downtime.

  • Durability - Kafka uses distributed commit logs, which means that messages remain on disk as quickly as possible, so it is persistent.

  • Performance - Kafka has high throughput for both publishing and subscribing messages. /b10> Even if many TB messages are stored, it maintains stable performance.

Kafka is very fast and guarantees zero downtime and zero data loss.

Case

Kafka can be used in many use cases. Some of them are listed below -

  • Metrics - Kafka is typically used to manipulate monitoring data. /b10> This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.

  • Log aggregation solution - Kafka can be used to collect logs from multiple services across organizations and make them available to multiple servers in a standard format.

  • Streaming - Popular frameworks, such as Storm and Spark Streaming, read data from topics, process them, and write processed data to new topics for use by users and applications. /b10> Kafka's durability is also useful in the context of stream processing.

Kafka is needed

Kafka is a unified platform for processing all real-time data feeds. /b10> Kafka supports low-latency messaging and provides a guarantee of fault tolerance in the event of a machine failure. /b11> It has the ability to handle a large number of different consumers. /b12> Kafka is very fast, executing 2 million writes per second. /b13> Kafka saves all the data to disk, which essentially means that all writes go to the page cache of the operating system (RAM). /b14> This makes it very efficient to transfer data from the page cache to the network socket.

Apache Kafka - Basic

Before you dive into Kafka, you must understand the main terms such as topics, brokers, producers and consumers. /b11>The following diagram illustrates the main terminology, and the table describes the chart components in detail.

Apache Kafka Quick Guide

In the image above, the theme is configured as three partitions. /b10> Partition 1 has two offset factors 0 and 1. Partition 2 has four offset factors 0, 1, 2 and 3. Partition 3 has an offset factor 0. The id of the copy of the offset factor is the same as the id of the server hosting it.

Suppose that if the replication factor for the topic is set to 3, Kafka creates three identical copies of each partition and places them in a cluster so that they can be used for all of its operations. /b10> To balance the load in the cluster, each agent stores one or more of these partitions. /b11> Multiple producers and consumers can publish and retrieve messages at the same time.

S.No Components and instructions
1

Topics (Themes)

A message flow that belongs to a specific category is called a topic. /b10> The data is stored in the topic.

The theme is split into partitions. /b10> For each theme, Kafka saves a partition of The Mini Mom. /b11> Each such partition contains messages for an immedible ordered sequence. /b12> A partition is implemented as a set of segmented files of equal size.

2

Partition (partition)

A topic can have many partitions, so it can handle any amount of data.

3

Partition offset (partition offset)

Each partition message has a unique sequence identity called offset.

4

Replicas of partition (partition backup)

A copy is just a backup of a partition. /b10> Replicas never read or write data. /b11> They are used to prevent data loss.

5

Brokers (Brokers)

  • An agent is a simple system that maintains published data. /b10> Each agent can have zero or more partitions per topic. /b11> Suppose that if there are N partitions in a topic and N agents, each agent will have one partition.

  • Assuming that there are N partitions in a topic and more than N agents (n plus m), the first N agent will have one partition, and the next M agent will not have any partitions for that particular topic.

  • Assuming that there are N partitions in a topic and less than N agents (n-m), each agent will have one or more partition shares between them. /b10> This scenario is not recommended because the load distributions between agents are not equal.

6

Kafka Cluster

Kafka has multiple agents called Kafka clusters. /b10> Kafka clusters can be extended without downtime. /b11> These clusters are used to manage the persistence and replication of message data.

7

Producers (producers)

The producer is the publisher of a message sent to one or more Kafka topics. /b10> Producers send data to Kafka brokers. /b11> Whenever a producer publishes a message to an agent, the agent simply attachs the message to the last segment file. /b12> In effect, the message is attached to the partition. /b13> Producers can also send messages to partitions of their choice.

8

Consumers (Consumers)

Consumers reads data from brokers. /b10> Consumers subscribe to one or more topics and use published messages by extracting data from the agent.

9

Leader (Leader)

Leader is the node responsible for all reads and writes for a given partition. Each partition has a server acting as leader

10

Follower (follower)

The node that follows the leader's instructions is called Follower. /b10> If the leader fails, a follower will automatically become the new leader. /b11> Followers, as normal consumers, pull messages and update their own data stores.


Apache Kafka - Cluster architecture

Take a look at the illustration below. /b10>It shows Kafka's cluster diagram.

Apache Kafka Quick Guide

The following table describes each component shown in the figure above.

S.No Components and instructions
1

Broker (Agent)

Kafka clusters typically consist of multiple agents to maintain load balancing. /b10> Kafka agents are stateless, so they use ZooKeeper to maintain their cluster state. /b11>A Kafka proxy instance can handle hundreds of thousands of reads and writes per second, and each Broker can process TB messages without performance impact. /b12> Kafka broker leadership elections can be completed by ZooKeeper.

2

ZooKeeper

ZooKeeper is used to manage and coordinate Kafka agents. /b10> The ZooKeeper service is primarily used to notify producers and consumers of any new agents in the Kafka system or agent failures in the Kafka system. /b11> According to Zookeeper, notifications of agents' presence or failures are received, and then products and consumers take decisions and begin coordinating their tasks with some other agents.

3

Producers (Producers)

Producers push data to brokers. /b10> When a new agent starts, all producers search for it and automatically send messages to the new agent. /b11> Kafka producers do not wait for confirmation from the agent and send messages as quickly as the agent can handle.

4

Consumers (Consumers)

Because the Kafka agent is stateless, this means that consumers must maintain how much messages have been consumed by using partition offsets. /b10> If the consumer confirms a specific message offset, it means that the consumer has consumed all previous messages. /b11> The consumer makes an asynchronous pull request to the agent to have a byte buffer ready to consume. /b12> Consumers can simply regress or jump to any point in the partition by providing an offset value. /b13> The consumer offset value is notified by ZooKeeper.

Apache Kafka - WorkFlow

So far, we've discussed kafka's core concepts. /b10> Let's take a look at Kafka's workflow now.

Kafka is just a collection of topics divided into one or more partitions. /b10> Kafka partitions are linear, ordered sequences of messages, each of which is identified by their index, called offset. /b11> All data in a Kafka cluster is a partition federation that is not connected. /b12> The incoming message is written at the end of the partition and is read in consumer order. /b13> Provides persistence by copying messages to different agents.

Kafka provides pub-sub- and queue-based messaging systems in a fast, reliable, durable, fault-0200s and zero-downtime manner. /b10> In both cases, producers simply send messages to the topic, and consumers can choose any type of messaging system based on their needs. /b11> Let's follow the steps in the next section to learn how consumers choose the messaging system of their choice.

Publish - The workflow for subscribing to messages

Here's a step-by-step workflow for Pub-Sub messages -

  • Producers periodically send messages to topics.

  • The Kafka agent stores all messages in the partition configured for that particular topic. /b10> It ensures that messages are shared equally between partitions. /b11> If the producer sends two messages and has two partitions, Kafka stores a message in the first partition and a second message in the second partition.

  • Consumers subscribe to specific topics.

  • Once the consumer subscribes to the topic, Kafka will provide the consumer with the current offset of the topic and save the offset in the Zookeyer series.

  • Consumers will periodically request new messages from Kafka, such as 100 Ms.

  • Once Kafka receives messages from producers, it forwards them to consumers.

  • The consumer will receive the message and process it.

  • Once the message is processed, the consumer sends an acknowledgement to the Kafka agent.

  • Once Kafka receives confirmation, it changes the offset to the new value and updates it in Zookeeper. /b10> Because the offset is maintained in Zookeeper, consumers can correctly read the next message, even during server violence.

  • The above process will be repeated until the consumer stops the request.

  • Consumers can always fall back/jump to the desired topic offset and read all subsequent messages.

The workflow for the queue message/user group

In a queue messaging system instead of a single consumer, a group of consumers with the same group ID subscribes to the topic. /b10> Simply put, consumers who subscribe to topics with the same Group ID are considered a single group, and messages are shared between them. /b11> Let's examine the actual workflow of this system.

  • Producers send messages to a topic at regular intervals.

  • Kafka stores all messages in the partition configured for that particular topic, similar to the previous scenario.

  • A single consumer subscribes to a specific topic, assuming Topic-01 is Group-1.

  • Kafka interacts with consumers in the same way as publish-subscribe messages until new consumers subscribe to the same topic Topic-01 1 with the same group ID.

  • Once new consumers arrive, Kafka switches its operations to shared mode and shares data between the two consumers. /b10> This share continues until the number of users reaches the number of partitions configured for that particular topic.

  • Once the number of consumers exceeds the number of partitions, the new consumer will not receive any further messages until the existing consumer unsubscribes from any one consumer. /b10> This occurs because each consumer in Kafka will be assigned at least one partition, and once all partitions are assigned to existing consumers, new consumers will have to wait.

  • This feature is also known as a consumer group. /b10> Similarly, Kafka offers the best of both systems in a very simple and efficient way.

ZooKeeper's role

A key dependency of Apache Kafka is Apache Zookeeper, a distributed configuration and synchronization service. /b10> Zookeeper is a coordination interface between Kafka agents and consumers. /b11> The Kafka server shares information through the Zookeyer cluster. /b12> Kafka stores basic metadata in Zookeeper, such as information about topics, agents, consumer offsets (queue readers), and so on.

Because all critical information is stored in Zookeeper, and it usually replicates this data as a whole, the Kafka agent/Zookeyer failure does not affect the state of the Kafka cluster. /b10> Kafka will return to state once Zookeeper restarts. /b11> This brings zero downtime to Kafka. /b12> The leadership election between Kafka's agents was also completed by using Zookeyer in the event of a leader's defeat.

To learn more about Zookeeper, see Zookeyer

Let's move on to the next chapter on how to install Java, ZooKeeper and Kafka on your machine.

Apache Kafka - Installation steps

Here are the steps to install Java on the machine.

Step 1 - Verify the Java installation

Hopefully you've installed java on your machine, so you just need to verify it using the command below.

$ java -version

If java is successfully installed on your machine, you can see the installed version of Java.

Step 1.1 - Download JDK

If you are not downloading Java, download the latest version of JDK by visiting the link below and downloading the latest version.

http://www.oracle.com/technetwork/java/javase/downloads/index.html

The latest version is now JDK 8u 60, and the file is "jdk-8u60-linux-x64.tar.gz". /b10> Please download the file on your machine.

Step 1.2 - Extract the file

Typically, the file being downloaded is stored in the download folder, verified, and the tar settings are extracted using the following commands.

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

Step 1.3 - Move to the selection directory

To make java available to all users, move the extracted java content to usr/local/java/folder.

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

Step 1.4 - Set the path

To set the path JAVA_HOME variables, add the following command to the .bashrc file.

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

All changes are now applied to the currently running system.

$ source ~/.bashrc

Step 1.5 - Java substitution

Use the following command to change Java Alternatives.

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Step 1.6 - Now verify java with the verification command (java -version) described in step 1.

Step 2 - ZooKeeper frame installation

Step 2.1 - Download ZooKeeper

To install the ZooKeeper framework on your computer, visit the link below and download the latest version of zooKeeper.

http://zookeeper.apache.org/releases.html

The latest version of ZooKeeper is now 3.4.6 (ZooKeeper-3.4.6.tar.gz).

Step 2.2 - Extract the tar file

Use the following commands to extract the tar file

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

Step 2.3 - Create a profile

Use the command "conf/ zoo.cfg" to open a profile named conf/zoo .cfg and set all of the following parameters as a starting point.

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

Once the profile is successfully saved and returned to the terminal again, you can start the zookeyer server.

Step 2.4 - Start the ZooKeeper server

$ bin/zkServer.sh start

After executing this command, you will get a response like the following -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

Step 2.5 - Start the CLI

$ bin/zkCli.sh

After entering the command above, you will be connected to the zookeyer server and will receive the following response.

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Step 2.6 - Stop the Zookeeper server

After connecting the server and doing everything, you can use the following command to stop the zookeeper server -

$ bin/zkServer.sh stop

Now you've successfully installed Java and ZooKeeper on your machine. /b10> Let's take a look at the steps to install Apache Kafka.

Step 3 - Apache Kafka installation

Let's continue with the following steps to install Kafka on your machine.

Step 3.1 - Download Kafka

To install Kafka on your machine, please click on the link below -

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

Now the latest version, kafka_2.11-0.9.0.0.tgz, will be downloaded to your computer.

Step 3.2 - Unzip the tar file

Use the following command to extract the tar file -

$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

You have now downloaded the latest version of Kafka on your machine.

Step 3.3 - Start the server

You can start the server by giving the following command -

$ bin/kafka-server-start.sh config/server.properties

After the server starts, you'll see the following response on the screen:

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

Step 4 - Stop the server

After all the operations have been performed, the server can be stopped using the following command -

$ bin/kafka-server-stop.sh config/server.properties

Now that we've discussed Kafka installation, we can learn how to do the basics of Kafka in the next chapter.

Apache Kafka - Basic operation

First let's start with a single-node single-agent configuration, and then we migrate our settings to a single-node multi-agent configuration.

Hopefully you can now install Java, ZooKeeper and Kafka on your machine. /b10> Before migrating to Kafka Cluster Setup, you first need to start ZooKeeper, because Kafka Cluster uses ZooKeeper.

Start ZooKeeper

Open a new terminal and type the following command -

bin/zookeeper-server-start.sh config/zookeeper.properties

To start Kafka Broker, type the following command -

bin/kafka-server-start.sh config/server.properties

After you start Kafka Broker, type the command jps on the ZooKeeper terminal and you'll see the following response -

821 QuorumPeerMain
928 Kafka
931 Jps

Now you can see two daemons running on the terminal, QuorumPeerMain is the ZooKeeper daemon, and the other is the Kafka daemon.

Single-node - single-agent configuration

In this configuration, you have a ZooKeeper and proxy id instance. Here's how to configure it -

Create a Kafka theme - Kafka provides a command kafka-topics.sh utility called "Create" to create a topic on the server. /b10> Open the new terminal and type the following example.

Grammar

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

We've just created a theme called Hello-Kafka that contains a partition and a copy factor. The output created above will be similar to the following output -

Output - Create the theme Hello-Kafka

After you create a theme, you can get notifications in the Kafka proxy terminal window and log the creation of the theme specified in "/tmp/kafka-logs /" in the confeg/server.properties file.

The list of topics

To get a list of topics in the Kafka server, you can use the following command -

Grammar

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

Since we've created a theme, it lists only Hello-Kafka. /b10> Suppose you create more than one theme, and you'll get the theme name in the output.

Start the producer to send a message

Grammar

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

From the above syntax, the producer command line client needs two main parameters -

Agent List - The list of agents we want to send mail to. /b10> In this case, we only have one agent. /b11>The Config /server.properties file contains the proxy port ID because we know that our agent is listening to port 9092, so you can specify it directly.

Topic Name - The following is an example of a topic name.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

Producers will wait for input from stdin and publish to the Kafka cluster. /b10> By default, each new line is published as a new message, and then the default producer property is specified in the config/producer.properties file. /b11> You can now type a few lines of messages in the terminal, as shown below.

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

Start the consumer to receive the message

Similar to producers, default consumer properties are specified in the config/consumer.proper-ties file. /b10> Open a new terminal and type the following message message syntax.

Grammar

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

Finally, you can enter messages from the producer's terminal and see them appear at the consumer's terminal. /b10> So far, you have a very good understanding of a single-node cluster with a single agent. /b11> Now let's move on to multiple proxy configurations.

Single-node multi-agent configuration

Start the ZooKeeper server before entering multiple proxy cluster settings.

Create multiple Kafka Brokers - We already have a Kafka proxy instance in configuration/server.properties. /b10> Now we need multiple proxy instances, so copy the existing server.prop-erties file into two new profiles and rename it server-one.properties and server-two.properties. Then edit the two new files and assign the following changes -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start multiple agents - After all the changes are made on three servers, open three new terminals and start each agent one by one.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Now we have three different brokers running on the machine. /b10> Try it yourself, check all the daemons by typing jps on the ZooKeeper terminal, and you'll see a response.

Create a theme

Let's specify the replication factor value as three for this topic because we have three different agents running. /b10> If you have two agents, the assigned replica value will be two.

Grammar

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication"

The Describe command is used to check which agent is listening to the currently created topic, as shown below -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

From the above output, we can conclude that the first line gives a summary of all partitions, showing the topic name, the number of partitions, and the replication factors we have selected. /b10> In the second row, each node will be the leader of the randomly selected portion of the partition.

In our case, we see that our first broker.id 0 is the leader. /b10> Then Repricas:0,2,1 means that all agents copy the theme and the last Isr is a collection of in-sync replicas. /b11> Well, this is a subset of the replicas that are currently alive and caught up by the leaders.

Start the producer to send a message

This procedure remains the same as in a single agent setting.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

Start the consumer to receive the message

This procedure remains the same as shown in the single agent settings.

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

Basic topic actions

In this chapter, we'll discuss a variety of basic topic actions.

Modify the theme

You've learned how to create themes in Kafka Cluster. Now let's modify the created topic with the following command

Grammar

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka" with single partition count and one replica factor. 
Now using “alter" command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Delete the theme

To remove the theme, you can use the following syntax.

Grammar

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note - If delete.topic.enable is not set to true, this operation will have no effect

Apache Kafka - Simple producer example

Let's use the Java client to create an application for publishing and using messages. /b10> The Kafka producer client includes the following APIs.

KafkaProducer API

Let's look at the most important set of Kafka producer APIs in this section. /b10> The central part of the Kafka Producer API is the Kafka Producer class. /b11> The Kafka Producer class provides an option to connect the Kafka agent in its constructor to the following methods.

  • The Kafka Producer class provides a send method to send messages to the topic asynchronously. Send() is signed as follows

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - Producers manage buffers for records waiting to be sent.

  • Callback - A user-provided callback (null indicates no callback) that is performed when the server confirms the record.

  • The Kafka Producer class provides a flush method to ensure that all previously sent messages are actually completed. The syntax of the flush method is as follows -

public void flush()
  • The Kafka Producer class provides a partitionFor method, which helps you get partition metadata for a given topic. /b10> This can be used to customize partitions. The signature of this method is as follows -

public Map metrics()

It returns a map of internal metrics maintained by the producer.

  • public void close() - The Kafka Producer class provides a closed method block until all previously sent requests are complete.

Producer API

The central part of the producer API is the producer class. /b10> Producer classes provide an option to connect Kafka agents in their constructors by the following methods.

Producer class

The producer class provides a send method to send a message to a single or multiple topics using the following signatures.


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,"async")
ProducerConfig config = new ProducerConfig(prop);

There are two types of producers - Synchronized and Asynchronous.

The same API configuration also applies to sync producers. /b10> The difference between them is that the sync generator sends messages directly, but in the background. /b11> Asynchronous producers are the first choice when you want higher throughput. /b12> In previous versions, such as 0.8, an asynchronous producer did not call backend() to register an error handler. /b13> This is only available in the current version 0.9.

public void close()

The producer class provides the Close method to close the producer pool connection to all Kafka agents.

Configure the settings

The following table lists the main configuration settings for the Producer API to better understand -

S.No Configure settings and instructions
1

client.id

Identify the producer application

2

producer.type

Synchronized or asynchronous

3

acks

The standard under the acks configuration control producer request is complete.

4

Retry

If the producer request fails, a specific value is used to automatically retry.

5

Bootstrapping agent list.

6

linger.ms

If you want to reduce the number of requests, you linger.ms set the value to something larger than a value.

7

key.serializer

The key of the serializer interface.

8

value.serializer

Value.

9

batch.size

The size of the buffer.

10

buffer.memory

Controls the total amount of memory that producers can use to buffer.

ProducerRecord API

ProducerRecord is sent to Kafka Cluster. The key/value pair of the ProducerRecord class constructor, which is used to create records with partitions, keys, and value pairs using the following signatures.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - The user-defined topic name that will be attached to the record.

  • Partition - Partition count

  • Key - The key that will be included in the record.

  • Value − Record contents
public ProducerRecord (string topic, k key, v value)

The ProducerRecord class constructor is used to create records with keys, value pairs, and no partitions.

  • Theme - Create a theme to assign records.

  • Key - The key of the record.

  • Value - Record content.

public ProducerRecord (string topic, v value)

The ProducerRecord class creates a record without partitions and keys.

  • Theme - Create a theme.

  • Value - Record content.

The ProducerRecord class method is listed in the table below -

S.No Class methods and descriptions
1

public string topic()

The topic is attached to the record.

2

public K key()

The key that will be included in the record. /b10> If you don't have such a key, null will reopen here.

3

public V value()

Record the contents.

4

partition()

The recorded partition count

SimpleProducer application

Before you create an application, start the ZooKeeper and Kafka agents, and then use the create topic command to create your own themes in the Kafka agent. /b10> After that, create a java class called Sim-pleProducer .java, and then type the following code.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer"
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully");
               producer.close();
   }
}

Compilation - You can compile your application using the following commands.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

Execute - You can execute the application using the following command.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

A simple example of a consumer

So far, we've created a producer that sends messages to the Kafka cluster. /b10> Now let's create a message for consumers to consume Kafka clusters. /b11> The KafkaConsumer API is used to consume messages from the Kafka cluster. /b12> The constructors of the KafkaConsumer class are defined below.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - Returns a map configured by the consumer.

The KafkaConsumer class has the following important methods listed in the following table.

S.No Methods and instructions
1

public java.util.Set< TopicPar- tition> assignment()

Gets the set of partitions currently assigned by the user.

2

public string subscription()

Subscribe to a given list of topics to get dynamically signed partitions.

3

public void sub-scribe(java.util.List< java.lang.String> topics,ConsumerRe-balanceListener listener)

Subscribe to a given list of topics to get dynamically signed partitions.

4

public void unsubscribe()

Unsubscribe from a given list of partitions.

5

public void sub-scribe(java.util.List< java.lang.String> topics)

Subscribe to a given list of topics to get dynamically signed partitions. /b10> If the given list of topics is empty, it is considered the same as unsubscribe().

6

public void sub-scribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)

The parameter pattern refers to the subscription pattern in the format of the regular expression, while the listener parameter gets notifications from the subscription mode.

7

public void as-sign(java.util.List< TopicPartion> partitions)

Manually assign a list of partitions to the customer.

8

poll()

Use one of the subscription/assignment APIs to get data for a specified topic or partition. /b10> If the topic was not booked before polling the data, this returns an error.

9

public void commitSync()

Submit the commit offset returned by the last poll() for all sub-compilation lists of topics and partitions. /b10> The same action applies to cometAsyn().

10

public void seek(TopicPartition partition,long offset)

Gets the current offset value that the consumer will use in the next poll() method.

11

public void resume()

Restore the paused partition.

12

public void wakeup()

Wake up consumers.

ConsumerRecord API

The ConsumerRecord API is used to receive records from the Kafka cluster. /b10> This API consists of the subject name, the partition number from which records are received, and the offset to the records in the Kafka partition. /b11>The ConsumerRecord class is used to create consumer records with specific topic names, partition counts, and .lt; key, value, and value. /b12> That's right. /b13> It has the following signatures.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - The subject name of the subject recorded by the consumer received from the Kafka cluster.

  • Partition - The partition of the theme.

  • Key - The key recorded, if no key exists null will be returned.

  • Value - Record content.

ConsumerRecords API

The ConsumerRecords API acts as a container for ConsumerRecord. /b10> This API is used to hold a list of ConsumerRecords for each partition of a particular topic. /b11> Its constructors are defined below.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - Returns a partition map for a specific topic.

  • Records - ConsumerRecord's return list.

The ConsumerRecords class defines the following methods.

S.No Method and description
1

public int count()

The number of records for all topics.

2

public Set partitions()

A partitioned set with data in this record set (if no data is returned, the set is empty).

3

public Iterator iterator()

The iterator allows you to iterate through the collection, get or re-move elements.

4

public List records()

Gets a list of records for a given partition.

Configure the settings

The configuration settings for the Consumer client API master configuration settings are as follows -

S.No Settings and instructions
1

The list of boot agents.

2

group.id

Assign a single consumer to a group.

3

enable.auto.commit

If the value is true, auto-implementation is enabled for offsets, otherwise no commits are made.

4

auto.commit.interval.ms

Returns how often the updated consumption offset is written to ZooKeeper.

5

session.timeout.ms

Represents how many milliseconds Kafka waits for ZooKeeper to respond to a request (read or write) before discarding and continuing to consume messages.

SimpleConsumer application

The producer application steps remain unchanged here. /b10> First, start your ZooKeeper and Kafka agents. /b11> Then create a SimpleConsumer .java using a Java class called SimpleCon-sumer and type the following code.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " &plus; topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - You can compile your application using the following commands.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

Execute - You can execute the application using the following command

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>

Input - Open generator CLI and send some messages to the topic. /b10> You can type the smple into 'Hello Consumer'.

Output - Here's the output.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

Apache Kafka - Example of a user group

Consumer groups are multithreaded or multi-machine Kafka themes.

Consumer groups

  • Consumers can join group.id using the same information

  • The maximum parallelity of a group is the number of consumers in the group← not partition.

  • Kafka assigns partitions of a topic to consumers in a group so that each partition is used by only one consumer in the group.

  • Kafka guarantees that messages can only be read by one consumer in the group.

  • Consumers can view messages in the order in which they are stored in the log.

Rebalance consumers

Adding more processes/threads will cause Kafka to rebalance. /b10> If any consumer or agent is unable to send a heartbeat to ZooKeeper, it can be reconfigured through the Kafka cluster. /b11> During this rebalancing, Kafka allocates available partitions to available threads, possibly moving partitions to another process.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " &plus; topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Assembly

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Perform

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Here, we create a sample group name of my-group for two consumers. /b10> Similarly, you can create your group and the number of consumers in the group.

Input

Open the producer CLI and send some messages -

Test consumer group 01
Test consumer group 02

The output of the first procedure

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

The output of the second procedure

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Now I hope you can learn about SimpleConsumer and Consume Group by using the Java client demo. /b10> Now you know how to use the Java client to send and receive messages. /b11> Let's continue Kafka's integration with big data technology in the next chapter.

Apache Kafka - Integrated with Storm

In this chapter, we'll learn how to integrate Kafka with Apache Storm.

About Storm

Storm was originally created by the team of Nathan Marz and BackType. /b10> In a short period of time, Apache Storm becomes the standard for distributed real-time processing systems, allowing you to process large amounts of data. /b11> Storm is very fast, and a reference clock processes more than one million dTs per second per node. /b12> Apache Storm runs continuously, consuming data from the configured source (Spouts) and passing the data to the processing pipeline (Bolts). Union, spout, and Bolt
Make a topology.

Integration with Storm

Kafka and Storm naturally complement each other, and their powerful collaboration enables real-time streaming analytics of fast-moving big data. /b10> Kafka and Storm integration is designed to make it easier for developers to get and publish data streams from storm topology.

Concept flow

Spout is the source of the stream. /b10> For example, a spout can read d'ites from Kafka Topic and send them as streams. /b11> Bolt consumes input streams, processes and may emit new streams. /b12> Bolt can do anything from running functions, filtering tuons, performing flow aggregations, streaming connections, talking to databases, and so on. /b13> Each node in the Storm topology executes in parallel. /b14> The topology runs indefinitely until it is terminated. /b15> Storm will automatically reassign any failed tasks. /b16> In addition, Storm guarantees no data loss, even if the machine is down and messages are discarded.

Let's learn more about the Kafka-Storm integration API. /b10> There are three main classes integrating Kafka with Storm. They are as follows --

Brokers - ZkHosts Static host

BrokerHosts is an interface, and ZkHosts and StaticHosts are its two main implementations. /b10> ZkHosts is used to dynamically track Kafka agents by maintaining details in ZooKeeper, while StaticHosts is used to manually/staticly set kafka agents and their details. /b11> ZkHosts is a quick and easy way to access Kafka agents.

ZkHosts' signature is as follows -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

Among them, BrokerZkStr is the ZooKeeper host, and brokerZkPath is the ZooKeeper path to maintain Kafka proxy details.

KafkaConfig API

This API is used to define the configuration settings for the Kafka cluster. Kafka Con-fig's signature is defined as follows

public KafkaConfig(BrokerHosts hosts, string topic)

    Host - BrokerHosts can be ZkHosts / StaticHosts.

    Theme - The name of the topic.

SpoutConfig API

Spoutconfig is an extension of KafkaConfig that supports additional ZooKeeper information.

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Host - BrokerHosts can be any implementation of the BrokerHosts interface

  • Theme - The name of the topic.

  • zkRoot - ZooKeeper root path.

  • The state of the offset consumed by id - spouts stored in Zookeeper. /b10> The ID should uniquely identify your spout.

SchemeAsMultiScheme

SchemeAsMultiScheme is an interface that indicates how to convert byteBuffer consumed from Kafka to a Storm group. /b10> It originates from MultiScheme and accepts the implementation of the Scheme class. /b11> There are many implementations of the Scheme class, one of which is StringScheme, which resolves bytes into a simple string. /b12> It also controls the naming of output fields. /b13> The signature is defined below.

public SchemeAsMultiScheme(Scheme scheme)
  • Scenario - Byte buffer consumed from kafka.

KafkaSpout API

KafkaSpout is our spout implementation, which will integrate with Storm. /b10> It gets messages from the kafka topic and sends them as a group to the Storm ecosystem. /b11> KafkaSpout gets its configuration details from SpoutConfig.

Here's a sample code for creating a simple Kafka spout.

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Create Bolt

Bolt is a component that uses tuds as inputs, processes tuds, and produces new tuds as output. /b10> Bolt will implement the IRichBolt interface. /b11> In this program, use two Bolt-class WordSplitter-Bolt and WordCounterBolt to do this.

The IRichBolt interface has the following methods -

  • Preparation - Provide Bolt with the environment to perform. /b10> The executor will run this method to initialize the spout.

  • Execute - Process input for a single group.

  • Cleanup - called when Bolt wants to close.

  • DeclareOutputFields - Declares the output mode of the yuan group.

Let's create the SplitBolt .java, which implements logical splitting of a sentence into words and CountBolt .java, which implements logical separation of unique words and counting their appearance.

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.