May 26, 2021 Apache Storm
Now that we have gone through the core technical details of Apache Storm, it is time to work through a simple scenario.
Mobile calls and their durations will be used as input to Apache Storm, which processes the stream, groups calls between the same caller and receiver, and counts their total number of calls.
Spout is a component used for data generation. Basically, a spout implements the IRichSpout interface.
The "IRichSpout" interface has the following important methods -
open - Provides the spout with an environment to execute. The executors will run this method to initialize the spout.
nextTuple - Emits the generated data through the collector.
close - This method is called when a spout is going to shut down.
declareOutputFields - Declares the output schema of the tuple.
ack - Acknowledges that a specific tuple has been processed.
fail - Specifies that a specific tuple has not been fully processed and should be reprocessed.
The signature of the open method is as follows -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - Provides the Storm configuration for this spout.
context - Provides complete information about the spout's place within the topology, its task id, and its input and output information.
collector - Enables us to emit the tuples that will be processed by the bolts.
The signature of the nextTuple method is as follows -
nextTuple()
nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. Hence, the first line of nextTuple should check whether processing has finished. If so, it should sleep for at least one millisecond to reduce load on the processor before returning.
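This back-off behaviour can be sketched in plain Java, independent of the Storm API (the class and the queue here are illustrative, not part of Storm):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the nextTuple() contract: emit at most one record per
// call, and sleep briefly when there is no work so the thread is not spun idle.
class IdleAwareSpout {
   private final Queue<String> pending = new ArrayDeque<>();

   public void offer(String record) {
      pending.add(record);
   }

   // Returns the emitted record, or null when the spout chose to back off.
   public String nextTuple() {
      String record = pending.poll();
      if (record == null) {
         try {
            Thread.sleep(1); // no work to do: yield instead of busy-waiting
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         }
         return null;
      }
      return record; // a real spout would call collector.emit(...) here
   }
}
```

In a real spout the emit would go through SpoutOutputCollector; the sleep is the part that keeps ack() and fail() responsive.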
The signature of the close method is as follows -
close()
The signature of the declareOutputFields method is as follows -
declareOutputFields(OutputFieldsDeclarer declarer)
declarer - It is used to declare output stream ids, output fields, and so on.
This method is used to specify the output schema of the tuple.
The signature of the ack method is as follows -
ack(Object msgId)
This method acknowledges that a specific tuple has been processed.
The signature of the fail method is as follows -
fail(Object msgId)
This method informs that a specific tuple has not been fully processed. Storm will reprocess the specific tuple.
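The ack/fail contract can be modelled with a small plain-Java sketch that tracks in-flight tuples by message id (a simplified illustration, not Storm's actual bookkeeping):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of reliable emission: keep each tuple until it is acked,
// and keep it available for replay when it fails.
class PendingTracker {
   private final Map<Object, String> inFlight = new HashMap<>();

   public void emit(Object msgId, String tuple) {
      inFlight.put(msgId, tuple); // remember the tuple until ack/fail
   }

   public void ack(Object msgId) {
      inFlight.remove(msgId); // fully processed: forget it
   }

   // fail returns the tuple so the caller can re-emit it
   public String fail(Object msgId) {
      return inFlight.get(msgId); // still tracked: eligible for replay
   }

   public int pendingCount() {
      return inFlight.size();
   }
}
```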
In our scenario, we need to collect the call log details. The information in a call log contains the caller number, the receiver number, and the call duration.
Since we do not have real-time call log information, we will generate fake call logs. The fake information will be created using the Random class. The complete program code is given below.
import java.util.*;

//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;

   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;

   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            while(fromMobileNumber.equals(toMobileNumber)) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }

            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
Bolt is a component that takes tuples as input, processes the tuples, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the operations.
The IRichBolt interface has the following methods -
prepare - Provides the bolt with an environment to execute. The executors will run this method to initialize the bolt.
execute - Processes a single tuple of input.
cleanup - Called when a bolt is going to shut down.
declareOutputFields - Declares the output schema of the tuple.
The signature of the prepare method is as follows -
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf - Provides the Storm configuration for this bolt.
context - Provides complete information about the bolt's place within the topology, its task id, input and output information, and more.
collector - Enables us to emit the processed tuples.
The signature of the execute method is as follows -
execute(Tuple tuple)
Here, tuple is the input tuple to be processed.
The execute method processes a single tuple at a time. The tuple data can be accessed through the getValue method of the Tuple class. It is not necessary to process the input tuple immediately; multiple tuples can be processed and output as a single output tuple. The processed tuples can be emitted by using the OutputCollector class.
The signature of the cleanup method is as follows -
cleanup()
The signature of the declareOutputFields method is as follows -
declareOutputFields(OutputFieldsDeclarer declarer)
The parameter declarer here is used to declare the output stream id, the output field, and so on.
This method is used to specify the output schema of the tuple.
The call log creator bolt receives the call log tuple. The call log tuple has the caller number, the receiver number, and the call duration. This bolt simply creates a new value, "call", by combining the caller number and the receiver number. The complete code is as follows.
//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implements the IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
The call log counter bolt receives the call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple, creates a new entry in the dictionary object for every new "call" value in the tuple, and sets the value 1 in the dictionary object. For an already existing entry, it just increments its value. In simple terms, this bolt saves the call and its count in the dictionary object. The complete program code is as follows.
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)) {
         counterMap.put(call, 1);
      } else {
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry : counterMap.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
The Storm topology is basically a Thrift structure.
The TopologyBuilder class provides a simple and easy way to create complex topologies.
The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology.
Create a topology using the following snippets of code -
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
The shuffleGrouping and fieldsGrouping methods help to set up the stream groupings for the spout and bolts.
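The intent of fieldsGrouping can be illustrated with a minimal plain-Java model: the target task is derived from the hash of the grouping field, so every tuple with the same "call" value lands on the same task. This sketch only models the idea; it is not Storm's internal routing code:

```java
// Illustrative model of fields grouping: equal field values always map to the
// same task index, which is what lets CallLogCounterBolt count correctly.
class FieldsGroupingSketch {
   public static int taskFor(String fieldValue, int numTasks) {
      // floorMod keeps the index in [0, numTasks) even for negative hash codes
      return Math.floorMod(fieldValue.hashCode(), numTasks);
   }
}
```

Shuffle grouping, by contrast, may send successive tuples to different tasks, which is fine for CallLogCreatorBolt because it is stateless.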
For development purposes, we can use the LocalCluster object to create a local cluster and then submit the topology using the submitTopology method of the LocalCluster class.
One of the parameters of "submitTopology" is an instance of the "Config" class.
The Config class is used to set configuration options before submitting a topology.
This configuration option is merged with the cluster configuration at runtime and sent to all tasks (spout and bolt) using the prepare method.
Once the topology is submitted to the cluster, we wait 10 seconds for the cluster to compute the submitted topology, and then shut down the cluster using the "shutdown" method of "LocalCluster".
The complete program code is as follows -
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm to submit the topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception {
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);

      //Stop the topology
      cluster.shutdown();
   }
}
The complete application has four Java files.
They are -
FakeCallLogReaderSpout.java
CallLogCreatorBolt.java
CallLogCounterBolt.java
LogAnalyserStorm.java
The application can be built using the following command -
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java
The application can run with the following command -
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm
Once the application starts, it outputs full details about the cluster startup process, spout, and bolt processing, and finally the cluster shutdown process.
In CallLogCounterBolt, we printed the call and its count details.
This information will appear on the console as follows -
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
The Storm topology is implemented through the Thrift interface, which makes it easy to submit topologies in any language. Storm supports Ruby, Python, and many other languages. Let's take a look at the Python binding.
Python is a general-purpose interpreted, interactive, object-oriented, high-level programming language. Storm supports Python to implement its topology. Python supports emitting, anchoring, acking, and logging operations.
As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes through JSON messages over stdin/stdout. Take a sample bolt, WordCount, that supports the Python binding.
public static class WordCount extends ShellBolt implements IRichBolt {
   public WordCount() {
      super("python", "splitword.py");
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
Here the class WordCount implements the IRichBolt interface and runs with the Python implementation specified through the super method arguments "python" and "splitword.py". Now create the Python implementation in a file named splitword.py.
import storm

class WordCountBolt(storm.BasicBolt):
   def process(self, tup):
      words = tup.values[0].split(" ")
      for word in words:
         storm.emit([word])

WordCountBolt().run()
This is a sample Python implementation that splits the words in a given sentence. Similarly, you can bind with other supported languages.
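Behind the scenes, the shell bolt talks to splitword.py through Storm's multilang protocol: each message is a JSON object followed by a line containing "end". The helper below hand-rolls a message of that shape purely for illustration; real Storm builds these messages through its own multilang serializer, and the field values here are assumptions:

```java
// Sketch of a multilang tuple message as sent to a subprocess over stdin:
// a JSON object terminated by an "end" line.
class MultilangMessageSketch {
   public static String tupleMessage(String id, String comp, String stream, String value) {
      return "{\"id\": \"" + id + "\", \"comp\": \"" + comp
            + "\", \"stream\": \"" + stream
            + "\", \"tuple\": [\"" + value + "\"]}\nend";
   }
}
```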