Coding With Fun

Developing Storm components in a non-JVM language


May 17, 2021 Storm getting Started




Developing in a non-JVM language

Sometimes you may want to develop a Storm project in a language that isn't JVM-based, either because you prefer that language or because you need a library written in it.

Storm is implemented in Java, and all the spouts and bolts you have seen in this book are written in Java. Is it possible to write spouts and bolts in a language such as Python, Ruby, or JavaScript? The answer is yes.

This is achieved with the multilang protocol.

The multilang protocol is a special protocol implemented by Storm that uses standard input and standard output as the communication channel between Storm and the processes that run your spouts and bolts. Messages are passed over this channel as JSON or as plain lines of text.
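To make the framing concrete, here is a minimal Java sketch, assuming only the convention described in this section: each message is a piece of JSON text followed by a line containing just end. The class and method names (MultilangFraming, writeMessage, frame) are hypothetical illustrations and are not part of Storm's API.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;

// Hypothetical helper illustrating multilang framing: JSON text, then a line "end".
class MultilangFraming {
    static void writeMessage(Writer out, String json) throws IOException {
        out.write(json);
        out.write("\nend\n");
        // Flush right away so the peer process is never left waiting on a buffer.
        out.flush();
    }

    // Convenience wrapper that returns the framed message as a string.
    static String frame(String json) {
        StringWriter buffer = new StringWriter();
        try {
            writeMessage(buffer, json);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // StringWriter does not actually throw
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        // Prints {"command":"sync"} followed by a line containing end
        System.out.print(frame("{\"command\":\"sync\"}"));
    }
}
```

Passing a writer over System.out to writeMessage would produce the same on-the-wire form a script writes to Storm.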

Let's look at a simple example of developing a spout and a bolt in a non-JVM language. In this example there is a spout that produces the numbers from 1 to 10,000 and a bolt that keeps only the prime numbers, both implemented in PHP.

NOTE: In this example we use a very naive method to check whether a number is prime. There are better, and certainly more complex, methods, but they are beyond the scope of this example.
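For reference, the naive check used later by the PHP bolt is plain trial division. The Java sketch below mirrors it (isPrimeNaive) and, for comparison only, shows the common cheap improvement of stopping at the square root (isPrimeSqrt). Both names are illustrative, and neither method is part of the original example.

```java
// Trial-division primality checks; PrimeCheck is a hypothetical illustration.
class PrimeCheck {
    // Mirrors the PHP isPrime(): try every divisor from 2 to n - 1.
    static boolean isPrimeNaive(long n) {
        if (n < 2) return false;
        for (long i = 2; i <= n - 1; i++) {
            if (n % i == 0) return false;
        }
        return true;
    }

    // Same answers with far fewer divisions: a composite n has a divisor <= sqrt(n).
    static boolean isPrimeSqrt(long n) {
        if (n < 2) return false;
        for (long i = 2; i * i <= n; i++) {
            if (n % i == 0) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        int count = 0;
        for (int n = 1; n <= 10_000; n++) {
            if (isPrimeNaive(n)) count++;
        }
        System.out.println(count); // prints 1229
    }
}
```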

There is a PHP DSL implemented specifically for Storm; in this example we show our own implementation. Start by defining the topology.

...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
builder.setBolt("prime-numbers-filter", new PrimeNumbersFilterBolt())
        .shuffleGrouping("numbers-generator");
StormTopology topology = builder.createTopology();
...

NOTE: There is also a way to define the topology itself in a non-JVM language. Since a Storm topology is a Thrift structure and Nimbus is a Thrift daemon, you can create and submit topologies in any language you want. However, this is beyond the scope of this book.

There's nothing new here. Let's look at the implementation of NumberGeneratorSpout.

import java.util.Map;
// Storm classes: org.apache.storm.* in Storm 1.0 and later, backtype.storm.* in earlier releases.

public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
    public NumberGeneratorSpout(Integer from, Integer to) {
        // Equivalent to running: php -f NumberGeneratorSpout.php <from> <to>
        super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

As you may have noticed, this spout extends ShellSpout. This is a special class provided by Storm to help you run and control spouts written in other languages. In this case, it tells Storm how to execute your PHP script.

The NumberGeneratorSpout PHP script emits tuples to standard output and reads ack or fail signals from standard input.

Before implementing the NumberGeneratorSpout.php script, let's take a closer look at how the multilang protocol works.

The spout generates numbers in sequence, from the from parameter up to the to parameter passed to its constructor.

Next, take a look at PrimeNumbersFilterBolt. Like the spout, this class uses the shell approach described earlier: it tells Storm how to execute your PHP script. Storm provides a special class called ShellBolt for this purpose, and the only things you have to do are specify how to run the script and declare the fields it emits.

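import java.util.Map;
public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
    public PrimeNumbersFilterBolt() {
        super("php", "-f", "PrimeNumbersFilterBolt.php");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    // Required by IRichBolt; this bolt needs no component-specific configuration.
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}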

The constructor simply tells Storm how to run the PHP script. It is equivalent to the following command:

php -f PrimeNumbersFilterBolt.php

The PrimeNumbersFilterBolt.php script reads tuples from standard input, processes them, and writes emit, ack, or fail messages to standard output. Before writing this script, let's learn more about how the multilang protocol works.

  1. Start a handshake
  2. Start the loop
  3. Read/write tuples

NOTE: Your script can also log messages through Storm's built-in logging mechanism, so you don't need to implement a logging system yourself.

Let's take a look at each of these steps in detail and how to implement them in PHP.

Start a handshake

To control the script process (starting and stopping it), Storm needs to know the process ID (PID) of the script it runs. According to the multilang protocol, the first thing that happens when your process starts is that Storm sends a JSON object to the process's standard input (in this example, the standard input of the PHP process). This object contains the Storm configuration, the topology context, and a PID directory. It looks like this:

{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt"
        },
        "taskid": 3
    },
    "pidDir": "..."
}  

The script must create an empty file, named after its PID, in the directory given by pidDir, and then write its PID to standard output as a JSON message.

For example, if pidDir is /tmp/example and your script's PID is 123, you should create an empty file named /tmp/example/123 and print the lines {"pid":123} and end to standard output. This allows Storm to keep track of the PID and kill the script process when it shuts down. Here's the PHP implementation:

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
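The reply side of the handshake can be sketched in Java as well. This assumes Java 9 or later (for ProcessHandle) and that pidDir has already been extracted from the incoming JSON with a JSON library, which is not shown; the Handshake class and its reply method are hypothetical illustrations that build the reply JSON by hand.

```java
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the script's half of the handshake.
class Handshake {
    // Creates an empty file named after this process's PID in pidDir,
    // then reports the PID to Storm as {"pid":<pid>} followed by "end".
    static long reply(Path pidDir, PrintStream out) {
        long pid = ProcessHandle.current().pid();
        try {
            // Like PHP's fopen(..., "w"): creates the file or truncates an existing one.
            Files.write(pidDir.resolve(Long.toString(pid)), new byte[0]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        out.print("{\"pid\":" + pid + "}\nend\n");
        out.flush(); // without this, Storm may wait forever for the reply
        return pid;
    }
}
```

A script would call something like Handshake.reply(Paths.get(pidDir), System.out) right after reading the handshake message.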

The handshake code relies on read_msg, a helper function that reads messages from standard input. According to the multilang protocol, a message can be single-line or multiline JSON text, and each message ends with a line containing only end. The helper functions look like this:

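// Reads one message: every line up to (but not including) the "end" line.
function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        $line = substr($l,0,-1);   // strip the trailing newline
        if($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);    // drop the final newline
}
// Writes a JSON-encoded message followed by the "end" terminator.
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
    flush(); // never leave a message sitting in the output buffer
}
function write_line($line) {
    echo("$line\n");
}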
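The same reading logic in Java might look like the sketch below (MessageReader is a hypothetical name). It joins every line up to the terminating end line, so multiline JSON comes back intact. Unlike the PHP read_msg, which has no end-of-input check and would loop forever if Storm closed the pipe, this sketch fails explicitly.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.StringJoiner;

// Hypothetical Java counterpart of read_msg(): reads lines until one equals "end".
class MessageReader {
    static String readMessage(BufferedReader in) {
        StringJoiner msg = new StringJoiner("\n"); // rejoin multi-line JSON with newlines
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.equals("end")) {
                    return msg.toString();
                }
                msg.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        throw new IllegalStateException("input closed before an \"end\" line");
    }
}
```

In a real script the reader would wrap standard input, for example new BufferedReader(new InputStreamReader(System.in)).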

NOTE: The call to flush() is very important. If the output stays in a buffer, your script may hang forever waiting for input from Storm while Storm waits forever for output from your script. So it's important to flush the buffer as soon as your script has written something.

Start the loop and read/write tuples

This is the most important step of the whole process, and its implementation depends on whether you are developing a spout or a bolt.

If it's a spout, start emitting tuples. If it's a bolt, loop over the incoming tuples, process them, emit any results, and ack or fail each one.

Let's take a look at the spout that emits the numbers.

$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<=$to) {          // <= so that the last number (to) is emitted too
            storm_emit(array("$from"));
            $task_ids = read_msg(); // Storm replies with the IDs of the receiving tasks
            $from++;
        } else {
            sleep(1);               // nothing left to emit
        }
    }
    storm_sync();
}

The script gets the from and to arguments from the command line and starts iterating. Each next message it receives from Storm means that Storm is ready for the spout to emit the next tuple.

Once all the numbers have been sent and there are no more tuples to emit, the spout sleeps for a second.

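Before sending the next command, Storm waits for a sync message (sent by storm_sync()), which tells it that the script is ready for the next one. Likewise, after each emit Storm replies with the list of task IDs that received the tuple, which is why the script calls read_msg() right after storm_emit(). To read a command, call read_msg() and decode the JSON.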
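Here is a rough Java sketch of the same spout loop, useful for following the message flow. It is a simplified illustration, not Storm's API: NumberSpoutLoop is a hypothetical name, commands are recognized by a naive substring match on whitespace-stripped JSON instead of a real JSON parser, and the one-second sleep of the PHP version is omitted.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;

// Hypothetical sketch of the spout loop: emit on "next", read task IDs, always sync.
class NumberSpoutLoop {
    private long next;
    private final long to;

    NumberSpoutLoop(long from, long to) {
        this.next = from;
        this.to = to;
    }

    // Reads one message (lines up to "end"); returns null at end of input.
    static String readMessage(BufferedReader in) throws IOException {
        StringBuilder msg = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null) {
            if (line.equals("end")) return msg.toString();
            msg.append(line).append('\n');
        }
        return null;
    }

    static void send(PrintWriter out, String json) {
        out.print(json + "\nend\n");
        out.flush();
    }

    // Handles one command from Storm; returns false when there is no more input.
    boolean step(BufferedReader in, PrintWriter out) throws IOException {
        String cmd = readMessage(in);
        if (cmd == null) return false;
        boolean isNext = cmd.replaceAll("\\s", "").contains("\"command\":\"next\"");
        if (isNext && next <= to) {
            send(out, "{\"command\":\"emit\",\"tuple\":[\"" + next + "\"]}");
            readMessage(in); // Storm answers each emit with the receiving task IDs
            next++;
        }
        // "ack" and "fail" commands, and "next" with nothing left, only need a sync.
        send(out, "{\"command\":\"sync\"}");
        return true;
    }

    // Runs the loop over canned input and returns everything the spout wrote.
    static String run(String input, long from, long to) {
        BufferedReader in = new BufferedReader(new StringReader(input));
        StringWriter buffer = new StringWriter();
        PrintWriter out = new PrintWriter(buffer);
        NumberSpoutLoop spout = new NumberSpoutLoop(from, to);
        try {
            while (spout.step(in, out)) {
                // keep handling commands until the input runs out
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toString();
    }
}
```

Feeding run() a few next commands, each emit answered with a task ID list such as [3], shows the emit/sync rhythm: one emit plus one sync per number, then only syncs once the range is exhausted.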

For bolts, there is a slight difference.

while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {   // skip messages that are not tuples (they have no "id")
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}

The loop reads tuples from standard input. Each message is decoded from JSON to determine whether it is a tuple. If it is, the bolt checks whether the value is a prime number: if so, it emits a new tuple containing that number; otherwise the value is ignored. Finally, the tuple is acked regardless of the result.
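The per-tuple decision can be sketched in Java as follows, assuming the message has already been decoded (for example with a JSON library, not shown) into the tuple ID and its first value, both kept as strings. PrimeFilterLogic is a hypothetical name; its isPrime stops at the square root, which gives the same answers as the PHP loop with less work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the bolt's per-tuple logic: emit primes, always ack.
class PrimeFilterLogic {
    static boolean isPrime(long n) {
        if (n < 2) return false;
        for (long i = 2; i * i <= n; i++) {
            if (n % i == 0) return false;
        }
        return true;
    }

    // Returns the framed messages written for one tuple: an optional emit, then an ack.
    static List<String> handleTuple(String id, String value) {
        List<String> out = new ArrayList<>();
        if (isPrime(Long.parseLong(value))) {
            out.add("{\"command\":\"emit\",\"tuple\":[\"" + value + "\"]}\nend\n");
        }
        // The tuple is acked whether or not it was emitted again; the ID stays a string.
        out.add("{\"command\":\"ack\",\"id\":\"" + id + "\"}\nend\n");
        return out;
    }
}
```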

NOTE: The JSON_BIGINT_AS_STRING flag passed to json_decode works around a data conversion problem between Java and PHP. Some of the large numbers sent by Java lose precision when PHP decodes them, which can cause problems. To avoid this, tell PHP to decode large numbers as strings, and send them back as quoted strings in JSON messages (which is why storm_ack puts the ID in double quotes). This flag requires PHP 5.4.0 or later.
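To see the precision problem this note refers to, assume a large integer ends up stored as a double-precision float, which is what PHP's json_decode does with integers that do not fit its native integer type unless JSON_BIGINT_AS_STRING is given. A double has a 53-bit significand, so in the Java sketch below (BigIntPrecision is an illustrative name) the value 2^53 + 1 comes back off by one, while a string round trip keeps it exact.

```java
// Illustrates why large integer IDs should be kept as strings rather than doubles.
class BigIntPrecision {
    // What happens to a 64-bit integer that passes through a double.
    static long roundTripThroughDouble(long value) {
        return (long) (double) value;
    }

    // Keeping the digits as text preserves the exact value.
    static long roundTripThroughString(long value) {
        return Long.parseLong(Long.toString(value));
    }

    public static void main(String[] args) {
        long id = 9_007_199_254_740_993L; // 2^53 + 1
        System.out.println(roundTripThroughDouble(id)); // 9007199254740992
        System.out.println(roundTripThroughString(id)); // 9007199254740993
    }
}
```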

Emit, ack, fail, and log messages are all structured as follows:

emit

{
    "command": "emit",
    "tuple": ["foo", "bar"]
}  

The array contains the values of the tuple you are emitting.

ack

{
    "command": "ack",
    "id": 123456789
}  

Here, id is the ID of the tuple you are processing.

fail

{
    "command": "fail",
    "id": 123456789
}   

This is the same as ack: id is the ID of the tuple you are processing.

log

{
    "command": "log",
    "msg": "some message to be logged by storm."
}   
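As a sketch, the four message shapes above can be produced by a few string builders. MultilangMessages is a hypothetical helper, not part of Storm; it assumes values contain no characters that need JSON escaping (a real implementation should use a JSON library) and, like the examples above, writes id as a number.

```java
import java.util.StringJoiner;

// Hypothetical builders for the emit, ack, fail, and log messages.
// Each result still has to be followed by an "end" line when it is sent.
class MultilangMessages {
    static String emit(String... values) {
        StringJoiner tuple = new StringJoiner(",", "[", "]");
        for (String value : values) {
            tuple.add("\"" + value + "\"");
        }
        return "{\"command\":\"emit\",\"tuple\":" + tuple + "}";
    }

    static String ack(long id) {
        return "{\"command\":\"ack\",\"id\":" + id + "}";
    }

    static String fail(long id) {
        return "{\"command\":\"fail\",\"id\":" + id + "}";
    }

    static String log(String msg) {
        return "{\"command\":\"log\",\"msg\":\"" + msg + "\"}";
    }
}
```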

Here's the full PHP code.

Your spout (NumberGeneratorSpout.php):

<?php
function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        $line = substr($l,0,-1);
        if ($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function write_line($line) {
    echo("$line\n");
}
function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
    flush(); // never leave a message sitting in the output buffer
}
function storm_sync() {
    storm_send(array("command" => "sync"));
}
function storm_log($msg) {
    $msg = array("command" => "log", "msg" => $msg);
    storm_send($msg);
    flush();
}
// Handshake: create the PID file and report the PID
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
// Main loop: emit one number per "next" command, then sync
$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<=$to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();
            $from++;
        } else {
            sleep(1);
        }
    }
    storm_sync();
}
?>

Your bolt (PrimeNumbersFilterBolt.php):

<?php
function isPrime($number) {
    if ($number < 2) {
        return false;
    }
    if ($number==2) {
        return true;
    }
    for ($i=2; $i<=$number-1; $i++) {
        if ($number % $i == 0) {
            return false;
        }
    }
    return true;
}
function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        $line = substr($l,0,-1);
        if ($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function write_line($line) {
    echo("$line\n");
}
function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
    flush(); // never leave a message sitting in the output buffer
}
function storm_ack($id) {
    storm_send(["command"=>"ack", "id"=>"$id"]);
}
function storm_log($msg) {
    $msg = array("command" => "log", "msg" => "$msg");
    storm_send($msg);
}
// Handshake: create the PID file and report the PID
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
// Main loop: emit primes, ack every tuple
while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}
?>

NOTE: All script files must be saved in a subdirectory called multilang/resources in your project directory. This subdirectory is included in the JAR file that is sent to the worker processes. If you don't put the scripts in this directory, Storm won't be able to run them and will throw an error.