
Storm topology


May 17, 2021 · Getting Started with Storm


Topology

In this chapter, you'll learn how to pass tuples between the components of a Storm topology, and how to publish a topology to a running Storm cluster.

Stream groupings

One of the most important things you need to do when designing a topology is to define how data is exchanged between components (how streams are consumed by the bolts). A stream grouping specifies which stream(s) each bolt consumes and how those streams will be consumed.

NOTE: A node can emit more than one stream of data, and a stream grouping allows us to choose which stream to receive.

The stream grouping is set when the topology is defined, as we saw in Chapter 2:

    builder.setBolt("word-normalizer", new WordNormalizer())
           .shuffleGrouping("word-reader");

In the preceding block of code, a bolt is set on the TopologyBuilder object, and then its source is specified using a shuffle grouping. A stream grouping normally takes the ID of the source component as a parameter, plus other optional parameters depending on the kind of grouping.

NOTE: Each InputDeclarer can have more than one source, and each source can be grouped with a different stream grouping.

Shuffle grouping

The shuffle grouping is the most commonly used grouping. It takes a single parameter (the source component), and the source sends each tuple to a randomly chosen bolt instance, ensuring that each consumer receives roughly the same number of tuples.

A shuffle grouping is useful for atomic operations such as a math operation, where each tuple can be processed independently. However, if the operation can't be randomly distributed, as is the case with word counting in Chapter 2, you need to consider other groupings.
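For instance, a stateless bolt that applies a math operation to each tuple independently can safely consume its input through a shuffle grouping, because no instance needs to see any particular tuple. In this sketch, the number-reader spout and SquareBolt are hypothetical names, not part of the book's example:

    // Any stateless, per-tuple operation can use shuffleGrouping safely
    builder.setBolt("squarer", new SquareBolt(), 4)
           .shuffleGrouping("number-reader");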

Fields grouping

A fields grouping allows you to control how tuples are sent to bolts based on one or more fields of the tuple. It guarantees that a given set of values for those fields is always sent to the same bolt instance. Coming back to the word-count example, if you group the stream by the word field, the word-normalizer bolt will always send tuples containing a given word to the same word-counter bolt instance.

    builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGrouping("word-normalizer", new Fields("word"));

NOTE: All fields used in a fields grouping must exist in the field declaration of the source component.
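For reference, the word field consumed above has to be declared by word-normalizer. A minimal sketch of that declaration (the actual code lives in the Chapter 2 example) could look like this:

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // "word" must be declared here for
        // fieldsGrouping("word-normalizer", new Fields("word")) to be valid
        declarer.declare(new Fields("word"));
    }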

All grouping

An all grouping sends a single copy of each tuple to every instance of the receiving bolt. This kind of grouping is used to send signals to bolts. For example, if you need to refresh a cache, you can send a refresh-cache signal to every bolt. In the word-count example, you can use an all grouping to add the ability to clear the counter cache (see the topologies example):

    public void execute(Tuple input) {
        String str = null;
        try{
            // Tuples arriving on the "signals" stream carry control messages
            if(input.getSourceStreamId().equals("signals")){
                str = input.getStringByField("action");
                if("refreshCache".equals(str))
                    counters.clear();
            }
        }catch (IllegalArgumentException e){
            // Do nothing
        }
        ...
    }

We added an if branch to check the source stream. Storm allows us to declare named streams (if you don't emit a tuple to a named stream, it goes to the stream named "default"); this is an excellent way to identify the source of a tuple, as in this case where we want to identify the signals. In the topology definition, you add a second stream to the word-counter bolt so that each tuple sent from signals-spout on the signals stream is received by every bolt instance.

    builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGrouping("word-normalizer", new Fields("word"))
           .allGrouping("signals-spout", "signals");

For the implementation of signals-spout, refer to the git repository.
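If you don't want to jump to the repository, here is a minimal sketch of what such a spout could look like. This is an illustration rather than the book's implementation, and it assumes a Storm version that provides the BaseRichSpout base class:

    import java.util.Map;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class SignalsSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private boolean sent = false;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            // Emit a single refreshCache signal on the named stream "signals",
            // which word-counter consumes via allGrouping("signals-spout", "signals")
            if(!sent){
                collector.emit("signals", new Values("refreshCache"));
                sent = true;
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("signals", new Fields("action"));
        }
    }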

Custom groupings

You can create your own custom stream grouping by implementing the backtype.storm.grouping.CustomStreamGrouping interface. This gives you the power to decide which bolt(s) receive each tuple.

Let's modify the word-count example so that words with the same initial letter are received by the same bolt.

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    import backtype.storm.grouping.CustomStreamGrouping;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.tuple.Fields;

    public class ModuleGrouping implements CustomStreamGrouping, Serializable{
        int numTasks = 0;

        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size() > 0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    // Pick a task based on the word's first character
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }

        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }

This is a simple implementation of CustomStreamGrouping, where we use the integer value of the word's first character, modulo the number of tasks, to decide which bolt receives the tuple.

To use this custom grouping for word-normalizer, modify the declaration as follows:

    builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());  

Direct grouping

This is a special grouping where the source decides which component receives the tuple. Similarly to the previous example, the source will decide which bolt receives the tuple based on the first letter of the word. To use a direct grouping, in the WordNormalizer bolt, use the emitDirect method instead of emit:

    public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                // Send the tuple directly to the task chosen by getWordCountIndex
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        // Ack the tuple
        collector.ack(input);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }  

The number of tasks is calculated in the prepare method:

    public void prepare(Map stormConf, TopologyContext context, 
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }  

In the topology definition, specify that the stream will be grouped directly:

    builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");  
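One detail worth noting: emitDirect can only be used on a stream that has been declared as a direct stream. A sketch of how WordNormalizer's declareOutputFields could do this, using the boolean overload of declare:

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // true marks the stream as direct, which is required for emitDirect
        declarer.declare(true, new Fields("word"));
    }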

Global grouping

A global grouping sends the tuples generated by all instances of the source to a single target instance (specifically, the task with the lowest ID).
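For illustration, a hypothetical TotalBolt that aggregates all partial counts into one grand total could consume word-counter through a global grouping, since a single task must see every tuple:

    // TotalBolt is hypothetical; globalGrouping routes every tuple to one task
    builder.setBolt("total", new TotalBolt())
           .globalGrouping("word-counter");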

None grouping

At the time of this writing (Storm 0.7.1), this grouping is equivalent to a shuffle grouping. In other words, use this grouping when you don't care how the stream is grouped.
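Its usage mirrors the shuffle grouping; for example, the word-normalizer declaration could equally be written as:

    builder.setBolt("word-normalizer", new WordNormalizer())
           .noneGrouping("word-reader");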

LocalCluster versus StormSubmitter

So far, you've run topologies on your local machine using a utility called LocalCluster. Storm's infrastructure makes it easy to run and debug different topologies on your own computer. But how do you submit your topology to a running Storm cluster? One of Storm's interesting features is that it's easy to run your topology on a real cluster. To do so, you replace LocalCluster with StormSubmitter and call its submitTopology method, which is responsible for sending the topology to the cluster.

Here's the modified code:

    //LocalCluster cluster = new LocalCluster();
    //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf, 
    //builder.createTopology());
    StormSubmitter.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf,
            builder.createTopology());
    //Thread.sleep(1000);
    //cluster.shutdown();

NOTE: When you use StormSubmitter, you can't control the cluster through code as you would with LocalCluster.

Next, package the source into a jar and run the Storm client command to submit the topology to the cluster. If you use Maven, you just need to go to the source directory on the command line and run: mvn package.

Once you have generated the jar, use the storm jar command to submit the topology (see Appendix A for how to install the Storm client). The syntax is: storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3.

For this example, run the following from the topologies source directory:

storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt  

With these commands, you have published the topology to the cluster.

If you want to stop or kill it, run:

storm kill Count-Word-Topology-With-Refresh-Cache  

NOTE: Topology names must be unique.

NOTE: For how to install the Storm client, refer to Appendix A.

DRPC topology

There is a special type of topology known as distributed remote procedure call (DRPC), which executes remote procedure calls (RPCs) using Storm's distributed power (see the figure below). Storm provides the tools to implement DRPC. The first is the DRPC server, which works like a connector between the client and the Storm topology, acting as the source for the topology's spouts. It receives a function to execute and its arguments. Then, for each piece of data that the function operates on, the server assigns a request ID that is used through the topology to identify the RPC request. When the topology executes its final bolt, it must emit the RPC request ID together with the result, so the DRPC server can return the result to the correct client.

[Figure: DRPC topology schematic]

NOTE: A single DRPC server instance can execute many functions. Each function is identified by a unique name.
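As a practical aside (based on the standard Storm command-line client, not something this chapter states explicitly), the DRPC server is typically launched on a cluster node with:

storm drpc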

The second tool Storm provides (used in the example below) is LinearDRPCTopologyBuilder, an abstraction that helps build DRPC topologies. The topology it generates creates a DRPCSpout, which connects to the DRPC server and distributes the data to the rest of the topology, and wraps the bolts so that the result is returned from the last bolt. All bolts added to a LinearDRPCTopologyBuilder are executed in order.

As an example of this type of topology, we'll create a process that performs addition. Although this is a simple example, the concept can be extended to complex distributed computation.

The bolt declares its output as follows:

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id","result"));
    }  

Because this is the only bolt in the topology, it must emit the RPC ID and the result. The execute method is responsible for performing the addition:

    public void execute(Tuple input) {
        // The input tuple is [requestId, arguments], e.g. ["1", "1+2+3"]
        String[] numbers = input.getString(1).split("\\+");
        Integer added = 0;
        if(numbers.length < 2){
            throw new InvalidParameterException("Should be at least 2 numbers");
        }
        for(String num : numbers){
            added += Integer.parseInt(num);
        }
        // Emit the request ID along with the result
        collector.emit(new Values(input.getValue(0), added));
    }

The topology that contains the addition bolt is defined as follows:

    public static void main(String[] args) {
        LocalDRPC drpc = new LocalDRPC();

        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
        builder.addBolt(new AdderBolt(),2);

        Config conf = new Config();
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("drpcder-topology", conf,
            builder.createLocalTopology(drpc));
        String result = drpc.execute("add", "1+-1");
        checkResult(result,0);
        result = drpc.execute("add", "1+1+5+10");
        checkResult(result,17);

        cluster.shutdown();
        drpc.shutdown();
    }  

Create a LocalDRPC object to run the DRPC server locally. Next, create the topology builder and add the bolt to the topology. To test the topology, use the execute method of the DRPC object.

NOTE: Use the DRPCClient class to connect to a remote DRPC server. The DRPC server exposes a Thrift API, so it can be used from many languages, and the API is the same whether the DRPC server runs locally or remotely. To submit a topology to a Storm cluster, call the builder object's createRemoteTopology method instead of createLocalTopology; it uses the Storm configuration parameters.
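For illustration, connecting to a remote DRPC server from Java could look like the following sketch; the host name is hypothetical, and 3772 is the default DRPC Thrift port:

    // Hypothetical host; execute() may throw Thrift/DRPC exceptions
    DRPCClient client = new DRPCClient("drpc1.example.com", 3772);
    String result = client.execute("add", "1+1+5+10");  // returns "17"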