
Apache Storm working example


May 26, 2021 Apache Storm



Now that we have gone through the core technical details of Apache Storm, it is time to code a simple scenario.

Scenario - Mobile call log analyzer

A stream of mobile calls and their durations will be given as input to Apache Storm. Storm will group the calls between the same caller and receiver and compute their total number of calls.

Creating a spout

A spout is a component used for data generation. Basically, a spout implements the IRichSpout interface. The IRichSpout interface has the following important methods -

  • open - Provides the spout with an environment to execute in. The executors run this method to initialize the spout.

  • nextTuple - Emits the generated data through the collector.

  • close - Called when the spout is going to shut down.

  • declareOutputFields - Declares the output schema of the tuple.

  • ack - Acknowledges that a specific tuple has been processed.

  • fail - Signals that a specific tuple has not been processed and is to be reprocessed.

open

The signature of the open method is as follows -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Provides the Storm configuration for this spout.

  • context - Provides complete information about the spout's place in the topology, its task ID, and its input and output information.

  • collector - Enables us to emit the tuples that will be processed by the bolts.

nextTuple

The signature of the nextTuple method is as follows -

nextTuple()

nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. So the first line of nextTuple checks whether processing has finished. If so, it should sleep for at least one millisecond to reduce load on the processor before returning.
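As an illustration, here is a minimal sketch of this pattern; the dataQueue field is a hypothetical in-memory queue of pending records (not part of the example spout below), and Utils.sleep is Storm's utility wrapper around Thread.sleep.

import backtype.storm.utils.Utils;

//Inside a spout: emit a record if one is available, otherwise yield the
//thread briefly so ack() and fail() get a chance to run.
@Override
public void nextTuple() {
   String record = dataQueue.poll();   //hypothetical in-memory queue of pending records
   if(record == null) {
      Utils.sleep(1);                  //no work to do - sleep ~1 ms to reduce CPU load
      return;
   }
   this.collector.emit(new Values(record));
}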

close

The signature of the close method is as follows -

close()

declareOutputFields

The signature of the declareOutputFields method is as follows-

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Used to declare output stream IDs, output fields, and so on.

This method is used to specify the output schema of the tuple.

ack

The signature of the ack method is as follows -

ack(Object msgId)

This method acknowledges that a specific tuple has been processed.

fail

The signature of the fail method is as follows -

fail(Object msgId)

This method signals that a specific tuple has not been fully processed. Storm will reprocess that tuple.
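The following is a rough sketch of how a spout could pair ack and fail with message IDs so failed tuples can be replayed. The pending map and the emitReliably helper are assumptions for illustration only and do not appear in the example spout below, which emits tuples without message IDs.

//Assumed field for illustration:
//private Map<Object, Values> pending = new HashMap<Object, Values>();

private void emitReliably(Values tuple, Object msgId) {
   pending.put(msgId, tuple);
   collector.emit(tuple, msgId);               //emit with a message ID so Storm can ack/fail it
}

@Override
public void ack(Object msgId) {
   pending.remove(msgId);                      //fully processed - forget the tuple
}

@Override
public void fail(Object msgId) {
   collector.emit(pending.get(msgId), msgId);  //replay the failed tuple
}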

FakeCallLogReaderSpout

In our scenario, we need to collect call log details. A call log record contains the following information.

  • The calling number
  • The receiving number
  • Duration

Since we do not have real-time call log information, we will generate fake call logs. The fake information will be created using the Random class. The complete program code is given below.

Coding - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout
//interface to access its functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            //Make sure caller and receiver differ (compare Strings with equals, not ==)
            while(fromMobileNumber.equals(toMobileNumber)) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Creating bolts

A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the processing.

The IRichBolt interface has the following methods -

  • prepare - Provides the bolt with an environment to execute in. The executors run this method to initialize the bolt.

  • execute - Processes a single tuple of input.

  • cleanup - Called when the bolt is going to shut down.

  • declareOutputFields - Declares the output schema of the tuple.

prepare

The signature of the prepare method is as follows -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Provides the Storm configuration for this bolt.

  • context - Provides complete information about the bolt's place in the topology, its task ID, input and output information, and more.

  • collector - Enables us to emit the processed tuples.

execute

The signature of the execute method is as follows -

execute(Tuple tuple)

Here, tuple is the input tuple to be processed.

The execute method processes a single tuple at a time. The tuple data can be accessed through the getValue method of the Tuple class. It is not necessary to process the input tuple immediately; multiple tuples can be processed and emitted as a single output tuple. The processed tuples can be emitted using the OutputCollector class.
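For illustration, a minimal execute sketch might look like this. It assumes the "from" and "duration" fields declared by the spout above and a collector field saved in prepare; the anchored emit and explicit ack are optional reliability features, not something the example bolts below require.

@Override
public void execute(Tuple tuple) {
   //Fields can be read by position or by the names declared upstream.
   String from = tuple.getString(0);
   Integer duration = tuple.getIntegerByField("duration");

   //Anchoring the output to the input tuple ties it into Storm's ack tree;
   //the input tuple is then acknowledged explicitly.
   collector.emit(tuple, new Values(from, duration));
   collector.ack(tuple);
}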

cleanup

The signature of the cleanup method is as follows -

cleanup()

declareOutputFields

The signature of the declareOutputFields method is as follows-

declareOutputFields(OutputFieldsDeclarer declarer)

The parameter declarer here is used to declare output stream IDs, output fields, and so on.

This method is used to specify the output schema of the tuple.
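As a sketch, an additional named stream can be declared alongside the default one; the "errors" stream and its "reason" field below are hypothetical, used only to illustrate declareStream.

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
   //Default stream with two fields, as in CallLogCreatorBolt below.
   declarer.declare(new Fields("call", "duration"));

   //A second, named stream (hypothetical "errors" stream); tuples are sent to
   //it with collector.emit("errors", new Values(...)).
   declarer.declareStream("errors", new Fields("reason"));
}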

Call log creator bolt

The call log creator bolt receives the call log tuples. Each call log tuple has a caller number, a receiver number, and the call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The complete code is as follows.

Coding - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call log counter bolt

The call log counter bolt receives a call and its duration as a tuple. It initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple and creates a new entry in the dictionary with the value 1 for every new "call" value; for an entry that already exists, it simply increments the count. In short, this bolt stores each call and its count in the dictionary object. The complete code is as follows.

Coding - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Create a topology

A Storm topology is basically a Thrift structure. The TopologyBuilder class provides a simple and easy way to create complex topologies. The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. Create a topology using the following snippet of code -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

The shuffleGrouping and fieldsGrouping methods help set up stream groupings for the spout and bolts.
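For comparison, here is a rough sketch of the same builder API with a parallelism hint and an alternative grouping; the parallelism hint of 2, the audit-bolt component, and the AuditBolt class are illustrative assumptions, not part of the example application.

//fieldsGrouping routes tuples with the same "call" value to the same task,
//which is what lets CallLogCounterBolt keep a consistent per-pair count.
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt(), 2)
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

//allGrouping copies every tuple to every task of the bolt (AuditBolt is hypothetical).
builder.setBolt("audit-bolt", new AuditBolt())
   .allGrouping("call-log-creator-bolt");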

The local cluster

For development purposes, we can use the LocalCluster object to create a local cluster and then submit the topology using the submitTopology method of the LocalCluster class. One of the parameters of submitTopology is an instance of the Config class. The Config class is used to set configuration options before submitting the topology. This configuration is merged with the cluster configuration at run time and sent to all tasks (spout and bolt) through the prepare method. Once the topology is submitted to the cluster, we wait 10 seconds for the cluster to compute the submitted topology, and then shut down the cluster using the shutdown method of LocalCluster. The complete program code is as follows -

Coding - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //Create the topology using TopologyBuilder
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}
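For a production cluster, the topology would be submitted with StormSubmitter instead of LocalCluster. The following is a rough sketch of that variant; it assumes the cluster settings come from storm.yaml, reuses the same spout/bolt wiring as above, and uses an illustrative worker count.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class LogAnalyserStormCluster {
   public static void main(String[] args) throws Exception {
      Config config = new Config();
      config.setNumWorkers(2);   //illustrative worker count

      TopologyBuilder builder = new TopologyBuilder();
      //... same setSpout/setBolt wiring as in LogAnalyserStorm above ...

      //Submits to the cluster configured in storm.yaml instead of an in-process LocalCluster.
      StormSubmitter.submitTopology("LogAnalyserStorm", config, builder.createTopology());
   }
}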

Building and running the application

The complete application has four Java source files. They are -

  • FakeCallLogReaderSpout.java
  • CallLogCreatorBolt.java
  • CallLogCounterBolt.java
  • LogAnalyserStorm.java

The application can be built using the following command -

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java

The application can run with the following command -

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm

Output

Once the application starts, it outputs full details about the cluster startup process, spout, and bolt processing, and finally the cluster shutdown process. In CallLogCounterBolt, we printed the call and its count details. This information will appear on the console as follows -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Non-JVM languages

Storm topologies are implemented through Thrift interfaces, which makes it easy to submit topologies in any language. Storm supports Ruby, Python, and many other languages. Let's take a look at the Python binding.

Python binding

Python is a general-purpose interpreted, interactive, object-oriented, high-level programming language. Storm supports Python for implementing its topologies. Python supports emitting, anchoring, acking, and logging operations.

As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes using JSON messages over stdin/stdout. First, take a sample bolt, WordCount, that uses the Python binding.

//A multi-language bolt: ShellBolt (backtype.storm.task.ShellBolt) runs
//splitword.py as a sub-process
public static class WordCount extends ShellBolt implements IRichBolt {
   public WordCount() {
      super("python", "splitword.py");
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

The WordCount class here extends ShellBolt, which runs the external process, and implements the IRichBolt interface; the super constructor call tells Storm to run the Python implementation splitword.py as a sub-process. Now create the Python implementation in a file named splitword.py.
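A possible way to wire this multi-language bolt into a topology is sketched below; SentenceSpout and the component names are hypothetical and only illustrate the same TopologyBuilder API used earlier.

TopologyBuilder builder = new TopologyBuilder();

//"sentence-spout" and SentenceSpout are hypothetical; any spout emitting a
//single sentence field would do.
builder.setSpout("sentence-spout", new SentenceSpout());

//The ShellBolt-backed WordCount runs splitword.py as a sub-process.
builder.setBolt("split-bolt", new WordCount())
   .shuffleGrouping("sentence-spout");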

import storm

#BasicBolt acks each input tuple automatically after process returns
class WordCountBolt(storm.BasicBolt):
   def process(self, tup):
      words = tup.values[0].split(" ")
      for word in words:
         storm.emit([word])

WordCountBolt().run()

This is an example Python implementation that splits a given sentence into words. Similarly, you can bind with the other supported languages.
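For completeness, here is a rough sketch using the lower-level storm.Bolt class, where anchoring and acking are done by hand; it assumes storm.emit's anchors argument, storm.ack, and storm.log are available as in the storm.py helper module shipped with Storm's multi-lang resources.

import storm

#Lower-level storm.Bolt: output must be anchored and acked by hand
#(BasicBolt does both automatically).
class SplitBolt(storm.Bolt):
   def process(self, tup):
      storm.log("splitting: %s" % tup.values[0])   #written to the worker log
      for word in tup.values[0].split(" "):
         storm.emit([word], anchors=[tup])         #anchor output to the input tuple
      storm.ack(tup)                               #acknowledge the input tuple

SplitBolt().run()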