May 26, 2021 Apache Kafka
In this chapter, we'll learn how to integrate Kafka with Apache Storm.
Storm was originally created by Nathan Marz and the team at BackType. In a short period of time, Apache Storm became the standard for distributed real-time processing systems, allowing you to process large amounts of data. Storm is very fast: a benchmark clocked it at over a million tuples processed per second per node. Apache Storm runs continuously, consuming data from the configured sources (spouts) and passing the data down the processing pipeline (bolts). Combined, spouts and bolts make a topology.
Kafka and Storm naturally complement each other, and their powerful cooperation enables real-time streaming analytics for fast-moving big data. The Kafka and Storm integration is designed to make it easier for developers to ingest and publish data streams from Storm topologies.
A spout is a source of streams. For example, a spout may read tuples off a Kafka topic and emit them as a stream. A bolt consumes input streams, processes them, and possibly emits new streams. Bolts can do anything from running functions, filtering tuples, and performing streaming aggregations and streaming joins to talking to databases, and more. Each node in a Storm topology executes in parallel. A topology runs indefinitely until you terminate it. Storm automatically reassigns any failed tasks. In addition, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped.
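Storm's no-data-loss guarantee rests on acknowledgements. A spout keeps track of each tuple it emits until the downstream bolts ack it, and it replays the tuple if processing fails. The following minimal sketch models only that bookkeeping, in plain Java, so it runs without Storm. `ReplayingSource` and its methods are hypothetical names and are not Storm's spout API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Storm-free model of a reliable spout (not Storm's API):
// an emitted message stays "pending" until it is acked, and a failed
// message is put back on the queue so it is emitted again.
class ReplayingSource {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private long nextId = 0;

    ReplayingSource(String... messages) {
        for (String m : messages) {
            queue.addLast(m);
        }
    }

    /** Emits the next queued message and returns its id, or -1 if the queue is empty. */
    long emit() {
        String msg = queue.pollFirst();
        if (msg == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, msg);
        return id;
    }

    /** The payload of a pending message, or null once it has been acked or failed. */
    String messageFor(long id) {
        return pending.get(id);
    }

    /** Downstream processing succeeded: the message can be forgotten. */
    void ack(long id) {
        pending.remove(id);
    }

    /** Downstream processing failed: queue the message again for a replay. */
    void fail(long id) {
        String msg = pending.remove(id);
        if (msg != null) {
            queue.addFirst(msg);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplayingSource source = new ReplayingSource("hello", "kafka");
        long first = source.emit();                   // "hello" is now pending
        source.fail(first);                           // simulate a bolt failure
        long retry = source.emit();                   // "hello" is emitted again under a new id
        System.out.println(source.messageFor(retry)); // prints: hello
        source.ack(retry);
        source.ack(source.emit());                    // "kafka" succeeds on the first try
        System.out.println(source.pendingCount());    // prints: 0
    }
}
```

A failed message is re-queued at the front so that it is replayed before newer messages. This is a design choice of the sketch only.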
Let's learn more about the Kafka-Storm integration API. There are three main classes for integrating Kafka with Storm: BrokerHosts (with its implementations ZkHosts and StaticHosts), KafkaConfig, and SpoutConfig.
BrokerHosts is an interface, and ZkHosts and StaticHosts are its two main implementations. ZkHosts is used to track Kafka brokers dynamically by maintaining their details in ZooKeeper. StaticHosts is used to set the Kafka brokers and their details manually/statically. ZkHosts is the simple and fast way to access the Kafka brokers.
The ZkHosts constructors are as follows -
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
Here, brokerZkStr is the ZooKeeper host (connection string), and brokerZkPath is the ZooKeeper path under which the Kafka broker details are maintained.
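The following plain-Java sketch makes the difference between ZkHosts and StaticHosts concrete. It rests on some assumptions. An in-memory map stands in for ZooKeeper, and `BrokerSource`, `FixedBrokers`, and `RegistryBrokers` are hypothetical names, not storm-kafka classes. The registry keys follow the `<brokerZkPath>/ids/<brokerId>` paths under which Kafka brokers register themselves. However, the values are simplified to `host:port` strings instead of the JSON that Kafka actually writes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Demo class first so the file can be run directly with `java File.java`.
class BrokerDiscoveryDemo {
    public static void main(String[] args) {
        Map<String, String> zk = new TreeMap<>();      // stands in for ZooKeeper
        zk.put("/brokers/ids/0", "kafka0:9092");
        BrokerSource fixed = new FixedBrokers("kafka0:9092");
        BrokerSource dynamic = new RegistryBrokers(zk, "/brokers");
        zk.put("/brokers/ids/1", "kafka1:9092");       // a broker joins later
        System.out.println(fixed.brokers());           // prints: [kafka0:9092]
        System.out.println(dynamic.brokers());         // prints: [kafka0:9092, kafka1:9092]
    }
}

// Hypothetical stand-in for the BrokerHosts idea; not the storm-kafka interface.
interface BrokerSource {
    List<String> brokers();
}

// StaticHosts-style: the broker list is fixed when the object is created.
class FixedBrokers implements BrokerSource {
    private final List<String> brokers;

    FixedBrokers(String... brokers) {
        this.brokers = Arrays.asList(brokers);
    }

    public List<String> brokers() {
        return brokers;
    }
}

// ZkHosts-style: brokers are re-read from the registry on every call, so
// brokers that join or leave are noticed. Values are simplified to "host:port".
class RegistryBrokers implements BrokerSource {
    private final Map<String, String> registry;
    private final String brokerZkPath;

    RegistryBrokers(Map<String, String> registry, String brokerZkPath) {
        this.registry = registry;
        this.brokerZkPath = brokerZkPath;
    }

    public List<String> brokers() {
        String prefix = brokerZkPath + "/ids/";
        List<String> found = new ArrayList<>();
        for (Map.Entry<String, String> entry : registry.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                found.add(entry.getValue());
            }
        }
        return found;
    }
}
```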
The KafkaConfig API is used to define configuration settings for the Kafka cluster. The signature of KafkaConfig is defined as follows -
public KafkaConfig(BrokerHosts hosts, String topic)
hosts - The BrokerHosts, which can be ZkHosts or StaticHosts.
topic - The name of the topic.
SpoutConfig is an extension of KafkaConfig that supports additional ZooKeeper information.
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)
hosts - The BrokerHosts, which can be any implementation of the BrokerHosts interface.
topic - The name of the topic.
zkRoot - The ZooKeeper root path.
id - The spout stores the state of the offsets it has consumed in ZooKeeper, keyed by this id. The id should uniquely identify your spout.
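The id matters because it names the place where consumed offsets are kept. The following sketch uses an in-memory map instead of ZooKeeper. It assumes a `zkRoot/id/partition_N` path layout for illustration, and `OffsetStore` is a hypothetical class. The sketch shows that spouts with different ids keep separate offsets. It also shows that an id that has never committed an offset finds nothing to resume from. For the same reason, passing a fresh `UUID.randomUUID()` as the id means each run starts over instead of resuming where the previous run stopped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory model of offset storage (ZooKeeper is replaced by a
// HashMap). The zkRoot/id/partition_N layout is an assumption for illustration.
class OffsetStore {
    private final Map<String, Long> nodes = new HashMap<>();

    static String path(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    void commit(String zkRoot, String id, int partition, long offset) {
        nodes.put(path(zkRoot, id, partition), offset);
    }

    /** Committed offset for this spout id and partition, or -1 if none exists yet. */
    long committed(String zkRoot, String id, int partition) {
        Long offset = nodes.get(path(zkRoot, id, partition));
        return offset == null ? -1L : offset;
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        store.commit("/my-first-topic", "spout-a", 0, 42L);
        System.out.println(store.committed("/my-first-topic", "spout-a", 0)); // prints: 42
        System.out.println(store.committed("/my-first-topic", "spout-b", 0)); // prints: -1
    }
}
```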
SchemeAsMultiScheme is a class that dictates how the ByteBuffer consumed from Kafka gets transformed into a Storm tuple. It implements the MultiScheme interface and accepts an implementation of the Scheme interface. There are many Scheme implementations. One of them is StringScheme, which parses the bytes as a simple string. The scheme also controls the naming of the output fields. The signature is defined as follows -
public SchemeAsMultiScheme(Scheme scheme)
scheme - The Scheme that converts the byte buffer consumed from Kafka.
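The adapter idea behind SchemeAsMultiScheme can be sketched in plain Java. A single-tuple scheme decodes one message into one tuple, and a wrapper exposes it through a multi-tuple interface. `SingleScheme`, `ManyScheme`, `Utf8Scheme`, and `SingleAsMany` are hypothetical simplified stand-ins, not Storm's classes. The output field name `"str"` is chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Demo class first so the file can be run directly with `java File.java`.
class SchemeDemo {
    public static void main(String[] args) {
        ManyScheme scheme = new SingleAsMany(new Utf8Scheme());
        ByteBuffer message = ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println(scheme.deserialize(message)); // prints: [[hello kafka]]
        System.out.println(scheme.outputFields());       // prints: [str]
    }
}

// Hypothetical: one message -> one tuple (a list of field values).
interface SingleScheme {
    List<Object> deserialize(ByteBuffer bytes);
    List<String> outputFields();
}

// Hypothetical: one message -> zero or more tuples.
interface ManyScheme {
    List<List<Object>> deserialize(ByteBuffer bytes);
    List<String> outputFields();
}

// StringScheme-like: decode the bytes as UTF-8 into a one-field tuple.
class Utf8Scheme implements SingleScheme {
    public List<Object> deserialize(ByteBuffer bytes) {
        // duplicate() leaves the caller's buffer position untouched
        String text = StandardCharsets.UTF_8.decode(bytes.duplicate()).toString();
        return Collections.<Object>singletonList(text);
    }

    public List<String> outputFields() {
        return Collections.singletonList("str");
    }
}

// SchemeAsMultiScheme-like: wrap the single tuple in a one-element list.
class SingleAsMany implements ManyScheme {
    private final SingleScheme scheme;

    SingleAsMany(SingleScheme scheme) {
        this.scheme = scheme;
    }

    public List<List<Object>> deserialize(ByteBuffer bytes) {
        List<Object> tuple = scheme.deserialize(bytes);
        return tuple == null ? Collections.<List<Object>>emptyList()
                             : Collections.singletonList(tuple);
    }

    public List<String> outputFields() {
        return scheme.outputFields(); // field naming is delegated to the wrapped scheme
    }
}
```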
KafkaSpout is our spout implementation, which integrates with Storm. It fetches messages from a Kafka topic and emits them into the Storm ecosystem as tuples. KafkaSpout gets its configuration details from SpoutConfig.
Here is sample code to create a simple Kafka spout.
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
// Creating SpoutConfig object: hosts, topic, zkRoot, and a unique spout id
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
// Convert the ByteBuffer to a String
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// Assign SpoutConfig to KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, SplitBolt and CountBolt, are used to perform the operations.
The IRichBolt interface has the following methods -
prepare - Provides the bolt with an environment to execute. The executors run this method to initialize the bolt.
execute - Processes a single tuple of input.
cleanup - Called when a bolt is going to shut down.
declareOutputFields - Declares the output schema of the tuple.
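The order in which these methods are called can be illustrated without Storm. `prepare` runs once, `execute` runs once per input tuple, and `cleanup` runs at shutdown. In this hypothetical sketch, `SimpleBolt`, `UpperCaseBolt`, and `BoltRunner` are made-up names, and a plain `List` plays the role of the OutputCollector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Demo class first so the file can be run directly with `java File.java`.
class BoltLifecycleDemo {
    public static void main(String[] args) {
        List<List<Object>> input = new ArrayList<>();
        input.add(Arrays.<Object>asList("storm"));
        input.add(Arrays.<Object>asList("kafka"));
        SimpleBolt bolt = new UpperCaseBolt();
        System.out.println(bolt.declareOutputFields());  // prints: [upper]
        System.out.println(BoltRunner.run(bolt, input));  // prints: [[STORM], [KAFKA]]
    }
}

// Hypothetical interface mirroring IRichBolt's four methods; not Storm's API.
interface SimpleBolt {
    void prepare(List<List<Object>> collector); // where emitted tuples go
    void execute(List<Object> tuple);           // called once per input tuple
    void cleanup();                             // called before the bolt shuts down
    List<String> declareOutputFields();         // names of the emitted fields
}

class UpperCaseBolt implements SimpleBolt {
    private List<List<Object>> collector;

    public void prepare(List<List<Object>> collector) {
        this.collector = collector;
    }

    public void execute(List<Object> tuple) {
        String word = (String) tuple.get(0);
        collector.add(Arrays.<Object>asList(word.toUpperCase(Locale.ROOT)));
    }

    public void cleanup() {
        collector = null; // drop the reference; nothing else to release here
    }

    public List<String> declareOutputFields() {
        return Arrays.asList("upper");
    }
}

class BoltRunner {
    /** Calls prepare once, execute for each tuple, then cleanup, and returns what was emitted. */
    static List<List<Object>> run(SimpleBolt bolt, List<List<Object>> inputs) {
        List<List<Object>> emitted = new ArrayList<>();
        bolt.prepare(emitted);
        for (List<Object> tuple : inputs) {
            bolt.execute(tuple);
        }
        bolt.cleanup();
        return emitted;
    }
}
```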
Let's create SplitBolt.java, which implements the logic to split a sentence into words, and CountBolt.java, which implements the logic to separate unique words and count their occurrences.
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
      // Keep the collector so execute() can emit and ack tuples
      this.collector = collector;
   }
   @Override
   public void execute(Tuple input) {
      // Each incoming tuple carries one Kafka message (a sentence) in field 0
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      for (String word : words) {
         word = word.trim();
         if (!word.isEmpty()) {
            word = word.toLowerCase();
            // Anchor each word to the input tuple so a downstream failure triggers a replay
            collector.emit(input, new Values(word));
         }
      }
      // Mark the input tuple as fully processed by this bolt
      collector.ack(input);
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // Downstream bolts receive a single field named "word"
      declarer.declare(new Fields("word"));
   }
   @Override
   public void cleanup() {}
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt {
   Map<String, Integer> counters;
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }
   @Override
   public void execute(Tuple input) {
      // Each input tuple holds one word emitted by SplitBolt
      String str = input.getString(0);
      if (!counters.containsKey(str)) {
         counters.put(str, 1);
      } else {
         Integer c = counters.get(str) + 1;
         counters.put(str, c);
      }
      collector.ack(input);
   }
   @Override
   public void cleanup() {
      // Print the final counts when the bolt shuts down
      // (Storm only guarantees that cleanup() is called in local mode)
      for (Map.Entry<String, Integer> entry : counters.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // Last bolt in the pipeline: it emits nothing
   }
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
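The splitting and counting logic does not depend on Storm, so you can check it on its own. The following sketch copies the logic of `SplitBolt.execute` and `CountBolt.execute` into static methods of a hypothetical `WordCountLogic` class. It runs that logic on the sample messages used in this chapter. The resulting counts match the sample output: "test" and "message" appear twice, and every other word appears once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper holding the same logic as SplitBolt and CountBolt,
// so it can be exercised without a running Storm cluster.
class WordCountLogic {
    /** Mirrors SplitBolt: split on spaces, trim, drop empty strings, lower-case. */
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    /** Mirrors CountBolt: one counter per distinct word (insertion order kept). */
    static Map<String, Integer> count(List<String> sentences) {
        Map<String, Integer> counters = new LinkedHashMap<>();
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                counters.merge(word, 1, Integer::sum);
            }
        }
        return counters;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("hello", "kafka", "storm", "spark",
                "test message", "another test message");
        // prints: {hello=1, kafka=1, storm=1, spark=1, test=2, message=2, another=1}
        System.out.println(count(messages));
    }
}
```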
The Storm topology is basically a Thrift structure. The TopologyBuilder class provides a simple and easy way to create complex topologies. The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. The shuffleGrouping and fieldsGrouping methods help to set up stream grouping for spouts and bolts.
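The grouping decides which task of a bolt receives each tuple. The hypothetical `Groupings` class in this sketch is not Storm code. It models shuffle grouping as round-robin, whereas Storm distributes tuples randomly but evenly. It models fields grouping as a hash of the field value modulo the number of tasks, so equal values always reach the same task. Because of that property, a counting bolt that runs with more than one task would normally be attached with fieldsGrouping on the "word" field (`new Fields("word")`). With shuffleGrouping, the counts for one word could end up spread over several tasks.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical router contrasting the two groupings; not Storm's implementation.
class Groupings {
    private final int numTasks;
    private final AtomicInteger counter = new AtomicInteger();

    Groupings(int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        this.numTasks = numTasks;
    }

    /** Shuffle-style: ignores the tuple's content and spreads tuples evenly (round-robin here). */
    int shuffleTask() {
        return Math.floorMod(counter.getAndIncrement(), numTasks);
    }

    /** Fields-style: equal field values always map to the same task. */
    int fieldsTask(Object fieldValue) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        Groupings groupings = new Groupings(2);
        for (String word : new String[] {"kafka", "storm", "kafka"}) {
            System.out.println(word + " -> shuffle task " + groupings.shuffleTask()
                    + ", fields task " + groupings.fieldsTask(word));
        }
    }
}
```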
Local Cluster - For development purposes, we can create a local cluster using the LocalCluster object and then submit the topology using the submitTopology method of the LocalCluster class.
import java.util.UUID;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
public class KafkaStormSample {
   public static void main(String[] args) throws Exception {
      Config config = new Config();
      config.setDebug(true);
      // Allow at most one pending (un-acked) tuple per spout task
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      // Read the topic from the beginning rather than from a stored offset
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
      builder.setBolt("word-splitter", new SplitBolt()).shuffleGrouping("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-splitter");
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
      // Let the topology run for 10 seconds, then stop the local cluster
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
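The sample sets `Config.TOPOLOGY_MAX_SPOUT_PENDING` to 1. This setting caps how many tuples a single spout task may have pending (emitted but not yet acked or failed) at any time, so a value of 1 processes one message at a time. The hypothetical `PendingGate` class in this sketch models only that cap. It is not how Storm implements it.

```java
// Hypothetical model of a max-pending cap like Config.TOPOLOGY_MAX_SPOUT_PENDING.
class PendingGate {
    private final int maxPending;
    private int inFlight = 0;

    PendingGate(int maxPending) {
        this.maxPending = maxPending;
    }

    /** Returns true (and counts the tuple as pending) if the cap allows another emit. */
    boolean tryEmit() {
        if (inFlight >= maxPending) {
            return false;
        }
        inFlight++;
        return true;
    }

    /** An ack or a fail for a pending tuple frees one slot. */
    void complete() {
        if (inFlight > 0) {
            inFlight--;
        }
    }

    int pendingCount() {
        return inFlight;
    }

    public static void main(String[] args) {
        PendingGate gate = new PendingGate(1);  // same cap as the sample topology
        System.out.println(gate.tryEmit());     // prints: true
        System.out.println(gate.tryEmit());     // prints: false (one tuple already pending)
        gate.complete();                        // that tuple is acked
        System.out.println(gate.tryEmit());     // prints: true
    }
}
```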
Before moving on to compilation, note that the Kafka-Storm integration needs the Curator ZooKeeper client Java library. Curator version 2.9.1 supports Apache Storm version 0.9.5 (which we use in this tutorial). Download the required Curator jar files and place them in the Java classpath.
After including the dependency files, compile the program using the following command -
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
Start the Kafka Producer CLI (explained in the previous section), create a new topic called my-first-topic, and provide some sample messages as follows -
hello
kafka
storm
spark
test message
another test message
Now use the following command to execute the application -
java -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*":. KafkaStormSample
The sample output of this application is as follows -
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2