
Storm Bolts


May 17, 2021 Storm getting Started




Bolts

As you have seen, bolts are a key component of a Storm cluster. In this chapter you'll learn about the bolt lifecycle, some bolt design strategies, and a few examples.

Bolt life cycle

A bolt is a component that takes tuples as input and produces new tuples as output. When implementing a bolt, you typically implement the IRichBolt interface. Bolt objects are created on the client machine, serialized into the topology, and submitted to the cluster. The cluster then starts worker processes that deserialize the bolt, call its prepare method, and finally start processing tuples.

NOTE: If a bolt needs to initialize member properties, pass the values in through its constructor when you build the topology; because the bolt is serialized when the topology is submitted to the cluster, those values are serialized along with it.
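For example, here is a minimal sketch of that idea (the class and field names are illustrative, not from the original example, and it extends Storm's BaseRichBolt helper class to keep it short):

class PrefixBolt extends BaseRichBolt {
    // Set on the client when the topology is built; serialized together
    // with the bolt object when the topology is submitted to the cluster.
    private final String prefix;
    private OutputCollector collector;

    public PrefixBolt(String prefix) {
        this.prefix = prefix;
    }

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // The constructor-supplied value is available on every worker.
        collector.emit(new Values(prefix + tuple.getString(0)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}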

Bolt structure

A bolt has the following methods:

declareOutputFields(OutputFieldsDeclarer declarer)
    Declares the output schema for the bolt
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
    Called only once, just before the bolt starts processing tuples
execute(Tuple input)
    Processes a single input tuple
cleanup()
    Called when the bolt is about to shut down

Here's an example in which a bolt splits a sentence into a list of words:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}  

As you can see, this is a very simple bolt. It is worth mentioning that in this example there is no message guarantee. This means that if the bolt drops a message for some reason -- whether because the bolt crashes or because the program intentionally drops it -- the spout that generated the message will not receive any notification, and neither will any other spouts or bolts.

In many cases, however, you want to make sure that messages are processed throughout the topology.

Reliable bolts and unreliable bolts

As mentioned earlier, Storm can guarantee that every message emitted by a spout is fully processed by all bolts. Whether your bolts actually provide that guarantee, however, is a design decision that is up to you.

A topology is a tree structure through which messages (tuples) travel along one or more branches. Each node in the tree calls ack(tuple) or fail(tuple), so Storm knows when a message has failed and can notify the spout(s) that emitted it. Since a Storm topology runs in a highly parallelized environment, the best way to keep track of the originating spout is to include a reference to the originating tuple when emitting new tuples. This technique is called anchoring. Let's modify the SplitSentence bolt from the previous example so that it guarantees message processing.

class SplitSentence implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word"));
    }
}  

Anchoring happens when collector.emit(tuple, new Values(word)) is called. As mentioned earlier, anchoring lets Storm trace a tuple back to its originating spout. The calls collector.ack(tuple) and collector.fail(tuple) tell the spout what happened to each message. When every tuple in the message tree has been processed, Storm considers the tuple emitted by the spout fully processed. If the message tree is not fully processed within the configured timeout, the tuple is considered failed. The default timeout is 30 seconds.

NOTE: You can change a topology's timeout by modifying its Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration.
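For example, a minimal sketch (assuming the standard Config API) of raising the timeout to 60 seconds when configuring a topology:

Config conf = new Config();
// Tuples whose message tree is not fully acked within 60 seconds are considered failed.
conf.setMessageTimeoutSecs(60);
// Equivalent to: conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);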

Of course, the spout needs to take message failure into account and retry or discard the message accordingly.

NOTE: Every message you process must either be acked or failed. Storm uses memory to track each tuple, so if you don't call one of these two methods, the task will eventually run out of memory.
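To illustrate the spout side of this, here is a sketch of a spout that re-emits failed messages and forgets acked ones. The class name, the pending map, and the retry-forever policy are illustrative assumptions, not part of the original example.

class RetryingSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final Queue<String> input = new LinkedList<String>();       // filled from some external source (omitted)
    private final Map<Integer, String> pending = new HashMap<Integer, String>();
    private int nextId = 0;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        String sentence = input.poll();
        if (sentence != null) {
            int msgId = nextId++;
            pending.put(msgId, sentence);                 // remember it until it is acked
            collector.emit(new Values(sentence), msgId);  // the message id enables ack/fail callbacks
        }
    }

    public void ack(Object msgId) {
        pending.remove(msgId);                            // fully processed, stop tracking it
    }

    public void fail(Object msgId) {
        String sentence = pending.get(msgId);
        if (sentence != null) {
            collector.emit(new Values(sentence), msgId);  // retry; a real spout might give up after a few attempts
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}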

Multiple data streams

A bolt can emit tuples to multiple streams by using emit(streamId, tuple), where the argument streamId is a string that identifies the stream. You then decide which stream to subscribe to when wiring the topology with TopologyBuilder.
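A minimal sketch of what that looks like (the stream names "words" and "errors", and the component and class names used with TopologyBuilder, are illustrative placeholders):

// Inside a bolt: declare two named streams and emit to them selectively.
// 'collector' is the OutputCollector saved in prepare(), as in the earlier examples.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("words", new Fields("word"));
    declarer.declareStream("errors", new Fields("error"));
}

public void execute(Tuple tuple) {
    String sentence = tuple.getString(0);
    if (sentence == null || sentence.isEmpty()) {
        collector.emit("errors", tuple, new Values("empty sentence"));
    } else {
        for (String word : sentence.split(" ")) {
            collector.emit("words", tuple, new Values(word));
        }
    }
    collector.ack(tuple);
}

// When building the topology, each downstream bolt subscribes to a specific stream.
TopologyBuilder builder = new TopologyBuilder();
builder.setBolt("splitter", new SentenceSplitterBolt()).shuffleGrouping("sentence-spout");
builder.setBolt("counter", new WordCounter()).shuffleGrouping("splitter", "words");
builder.setBolt("error-logger", new ErrorLogger()).shuffleGrouping("splitter", "errors");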

Multi-anchoring

To join or aggregate data streams in a bolt, you need to buffer tuples in memory. To guarantee message processing in this scenario, you have to anchor the emitted tuple to more than one input tuple. You do this by passing a list of tuples to the emit method.

...
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, values);
...  

This way, whenever the bolt calls ack or fail, every message tree the emitted tuple belongs to is notified, and because the tuple is anchored to multiple input tuples, all of the related spouts get notified.
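As an illustration, here is a sketch of a bolt that buffers one tuple, pairs it with the next one, and anchors the emitted tuple to both inputs. The pairing logic and class name are illustrative, not from the original text.

class PairBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Tuple buffered;   // tuples must be buffered in memory until a pair is complete

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        if (buffered == null) {
            // Wait for the second tuple of the pair. Note that a buffered tuple
            // that waits too long may still hit the topology message timeout.
            buffered = tuple;
            return;
        }
        List<Tuple> anchors = new ArrayList<Tuple>();
        anchors.add(buffered);
        anchors.add(tuple);
        // The emitted tuple now belongs to both message trees.
        collector.emit(anchors, new Values(buffered.getString(0), tuple.getString(0)));
        collector.ack(buffered);
        collector.ack(tuple);
        buffered = null;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("first", "second"));
    }
}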

Using IBasicBolt to ack automatically

As you may have noticed, message acknowledgment is needed in many cases. To simplify this, Storm provides another interface for implementing bolts, IBasicBolt. For objects implementing this interface, the ack method is called automatically after the execute method completes.

class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}  

NOTE: The BasicOutputCollector used to emit messages automatically anchors them to the input tuple passed to execute.