May 17, 2021 · Getting Started with Storm
Sometimes you may want to develop a Storm project in a language that isn't JVM-based, either because you prefer that language or because you want to use a library written in it.
Storm is implemented in Java, and all the spouts and bolts in this book are written in Java. So is it possible to write spouts and bolts in a language like Python, Ruby, or JavaScript? The answer is yes!
This is achieved using the multilang protocol.
The multilang protocol is a special protocol implemented by Storm. It uses standard input and standard output as the communication channel between Storm and the spout and bolt processes. Messages pass over this channel as JSON or as lines of plain text.
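Here is a minimal Java sketch of how a single message is framed on that channel. It relies only on what this section describes: the JSON text, followed by a line containing just `end`. `MultilangWriter` and its methods are hypothetical names for illustration. They are not part of Storm's API.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical helper that writes one multilang message: JSON text, then an "end" line. */
final class MultilangWriter {
    private final PrintWriter out;

    MultilangWriter(PrintWriter out) {
        this.out = out;
    }

    /** Writes the payload and the terminator line, then flushes so the other side sees it at once. */
    void send(String json) {
        out.print(json);
        out.print('\n');      // the protocol is line-based, so write "\n" explicitly
        out.print("end\n");   // a line containing only "end" closes the message
        out.flush();
    }

    /** Returns exactly what send() would write, which is handy for inspecting the wire format. */
    static String frame(String json) {
        StringWriter buffer = new StringWriter();
        new MultilangWriter(new PrintWriter(buffer)).send(json);
        return buffer.toString();
    }
}
```

For example, `MultilangWriter.frame("{\"command\":\"sync\"}")` yields the two lines `{"command":"sync"}` and `end`.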
Let's look at a simple example of developing a spout and a bolt in a non-JVM language. In this example, a spout produces the numbers from 1 to 10,000, and a bolt filters them down to the prime numbers. Both are implemented in PHP.
NOTE: This example checks for primes in a very naive way. Better (and certainly more complex) methods exist, but they are beyond the scope of this example.
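The example's check tries every divisor up to n - 1. For comparison, here is a sketch in Java of a small tweak that keeps the method simple. It is still trial division, but it stops at the square root, since any composite n has a divisor d with d * d <= n. This is a swapped-in refinement for illustration, not what the example below uses.

```java
/** Trial division up to sqrt(n): still naive, but far fewer iterations than n - 1. */
final class Primes {
    private Primes() {}

    static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;    // 2 is the only even prime
        // Test odd candidates only; "d <= n / d" is d * d <= n without overflow.
        for (long d = 3; d <= n / d; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }
}
```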
There is a PHP DSL written specifically for Storm, but in this example we'll show our own implementation. Start by defining the topology.
...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
builder.setBolt("prime-numbers-filter", new PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
StormTopology topology = builder.createTopology();
...
NOTE: There is a way to define a topology in a non-JVM language. A Storm topology is a Thrift structure and Nimbus is a Thrift daemon, so you can create and submit a topology in any language you want. However, this is beyond the scope of the book.
There's nothing new here. Let's look at the implementation of NumberGeneratorSpout.
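// Storm 1.x+ package names; on Storm 0.x these classes live under backtype.storm.*
import java.util.Map;
import org.apache.storm.spout.ShellSpout;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
    public NumberGeneratorSpout(Integer from, Integer to) {
        // Runs: php -f NumberGeneratorSpout.php <from> <to>
        super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}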
As you may have noticed, this spout extends ShellSpout. This is a special class provided by Storm to help you run and control spouts written in other languages. In this case, it tells Storm how to execute your PHP script.
The NumberGeneratorSpout PHP script emits tuples to standard output and reads ack or fail signals from standard input.
Before you start implementing the NumberGeneratorSpout.php script, take a closer look at how the multilang protocol works.
The spout generates numbers in order, starting at the from value and ending at the to value passed to the constructor.
Next, take a look at PrimeNumbersFilterBolt. Like the spout, it relies on a Shell class to tell Storm how to execute your PHP script. Storm provides a special class called ShellBolt for this purpose. All you have to do is specify how to run the script and declare the output fields.
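// Storm 1.x+ package names; on Storm 0.x these classes live under backtype.storm.*
import java.util.Map;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
    public PrimeNumbersFilterBolt() {
        super("php", "-f", "PrimeNumbersFilterBolt.php");
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
    // Required by the IRichBolt interface, just like in the spout.
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}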
The constructor just tells Storm how to run the PHP script. It is equivalent to the following command:
php -f PrimeNumbersFilterBolt.php
The PrimeNumbersFilterBolt.php script reads tuples from standard input and processes them. It then writes emits, acks, or fails to standard output. Before we write this script, let's learn more about how the multilang protocol works.
NOTE: Your script can write log messages through Storm's built-in logging mechanism (the log command), so you don't need to implement a logging system yourself.
The multilang protocol has two steps: an initial handshake, then a loop in which the script reads and writes tuples. Let's look at the details of each step and how to implement it in PHP.
To control the whole process (starting and stopping it), Storm needs to know the process ID (PID) of the script it executes. Under the multilang protocol, Storm first sends a JSON object to the new process's standard input (here, the standard input of the PHP process). This object contains the Storm configuration, the topology context, and a PID directory. It looks like this:
{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt"
        },
        "taskid": 3
    },
    "pidDir": "..."
}
The script must create an empty file, named after its PID, in the directory specified by pidDir. It must then write its PID to standard output in JSON format.
For example, if pidDir is /tmp/example and your script's PID is 123, you should create an empty file named /tmp/example/123 and print the lines {"pid": 123} and end to standard output. This allows Storm to keep track of the PID and kill the script process when the component shuts down. Here's the PHP implementation:
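$config = json_decode(read_msg(), true);  // first message: conf, context and pidDir
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));  // create the empty <pidDir>/<pid> file
storm_send(["pid"=>$pid]);                 // reply {"pid": <pid>} followed by "end"
flush();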
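The same handshake can be sketched in Java: create the marker file named after the PID, then build the reply. The `Handshake` class is hypothetical, and the sketch assumes Storm has already created pidDir. `ProcessHandle` needs Java 9 or later. The caller would still write the returned JSON followed by an `end` line, and then flush.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of the handshake: create <pidDir>/<pid>, then build the {"pid": ...} reply. */
final class Handshake {
    private Handshake() {}

    /** Creates an empty file named after the PID inside pidDir and returns the JSON reply. */
    static String register(Path pidDir, long pid) {
        Path marker = pidDir.resolve(Long.toString(pid));
        try {
            if (Files.notExists(marker)) {
                Files.createFile(marker);   // empty marker file, as the PHP fopen/fclose pair creates
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return "{\"pid\": " + pid + "}";
    }

    /** PID of the running JVM (ProcessHandle requires Java 9 or later). */
    static long currentPid() {
        return ProcessHandle.current().pid();
    }
}
```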
The handshake code relies on read_msg, a helper function that reads messages from standard input. Under the multilang protocol, a message can be single-line or multi-line JSON text, and it ends with a line containing only end.
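// Reads one message: all lines up to (not including) a line containing only "end".
function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        if ($l === false) {
            exit(0); // stdin was closed (e.g. Storm stopped the component), so stop
        }
        $line = substr($l,0,-1); // strip the trailing "\n"
        if($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1); // drop the last "\n"
}
// Writes a JSON-encoded message followed by the "end" terminator line.
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}
function write_line($line) {
    echo("$line\n");
}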
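For comparison, here is a Java counterpart of read_msg. It is a hypothetical class, not part of Storm. It collects lines until one is exactly `end` and returns null on end of stream instead of looping forever. Unlike the PHP `substr` trick, `BufferedReader.readLine()` strips the line terminator for you.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical Java counterpart of read_msg(): collects lines until a line that is exactly "end". */
final class MultilangReader {
    private final BufferedReader in;

    MultilangReader(BufferedReader in) {
        this.in = in;
    }

    /** Returns the message body (lines joined with "\n"), or null if the stream ends before "end". */
    String readMessage() {
        StringBuilder msg = new StringBuilder();
        boolean first = true;
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.equals("end")) {
                    return msg.toString();
                }
                if (!first) {
                    msg.append('\n');   // keep multi-line JSON intact
                }
                msg.append(line);
                first = false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null;   // end of stream: the caller should shut down
    }
}
```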
NOTE: The call to flush() is very important. Without it, output can sit in a buffer, and your script may hang forever waiting for input from Storm while Storm waits for output from your script. So flush the buffer as soon as your script has written output.
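The deadlock in this note comes from output sitting in a buffer. The following Java sketch illustrates buffering in general; it does not model PHP's own output layer. It shows that data written to a buffered writer doesn't reach the underlying stream, which stands in for the pipe to Storm, until flush() is called. `FlushDemo` is a hypothetical name.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Shows why flushing matters: buffered output is invisible to the reader until flush(). */
final class FlushDemo {
    private FlushDemo() {}

    /** Returns {bytes visible before flush, bytes visible after flush}. */
    static int[] visibleBytes(String message) {
        ByteArrayOutputStream pipe = new ByteArrayOutputStream();   // stands in for the pipe to Storm
        BufferedWriter out = new BufferedWriter(new OutputStreamWriter(pipe, StandardCharsets.UTF_8));
        try {
            out.write(message);
            int before = pipe.size();   // the message is still in the writer's buffer
            out.flush();
            int after = pipe.size();    // now it has reached the "pipe"
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```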
Reading and writing tuples is the most important step of the whole process, and how you implement it depends on the spout or bolt you are developing.
For a spout, this means emitting tuples. For a bolt, it means looping over incoming tuples: processing them, emitting new tuples, and acking or failing each input tuple.
Let's take a look at the spout that emits the numbers.
$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<=$to) { // include $to itself, so 1..10000 really emits 10000
            storm_emit(array("$from"));
            $task_ids = read_msg(); // Storm replies to each emit with the target task IDs
            $from++;
        } else {
            sleep(1); // nothing left to emit
        }
    }
    storm_sync(); // tell Storm this command has been handled
    flush();
}
The script gets the from and to arguments from the command line and starts looping. Each iteration reads one command from Storm. A next command means Storm is ready for the spout to emit the next tuple. After each emit, Storm replies with the list of task IDs the tuple was sent to, which is why the script calls read_msg() right after storm_emit().
Once all the numbers have been sent and there are no more tuples to emit, the script sleeps for a second instead.
Storm waits for a sync message before sending the next command, so every iteration ends with storm_sync(). Each command is read with read_msg() and decoded with json_decode().
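With the I/O stripped away, the spout's decision logic fits in a few lines. The following Java sketch is hypothetical (the class name and the List-of-strings return type are illustration choices). It maps each incoming command to the messages the script writes back. It does not model the task-ID reply that the real loop must read between the emit and the sync, or the one-second sleep.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of NumberGeneratorSpout's loop logic, with I/O abstracted away. */
final class NumberSpoutLogic {
    private long next;
    private final long to;

    NumberSpoutLogic(long from, long to) {
        this.next = from;
        this.to = to;
    }

    /**
     * "next": emit one number if any are left, then sync. Any other command: just sync.
     * In the real protocol the script reads Storm's task-ID reply between the emit and the sync.
     */
    List<String> handle(String command) {
        List<String> replies = new ArrayList<>();
        if (command.equals("next") && next <= to) {
            replies.add("{\"command\":\"emit\",\"tuple\":[\"" + next + "\"]}");
            next++;
        }
        replies.add("{\"command\":\"sync\"}");
        return replies;
    }
}
```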
For bolts, there is a slight difference.
while(true) {
    $msg = read_msg();
    // Decode large integers (such as tuple IDs) as strings; see the note below.
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) { // only tuples carry an "id"
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]); // ack every input tuple, prime or not
        flush();
    }
}
The loop reads messages from standard input and decodes each one to determine whether it is a tuple. If it is, the bolt checks whether the number is prime. If so, it emits the number as a new tuple; otherwise, it ignores it. Either way, it acks the input tuple.
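The bolt's per-tuple decision can be sketched the same way in Java. `PrimeFilterLogic` is a hypothetical class that mirrors the PHP: the same naive isPrime, an emit only for primes, and an ack in every case. As storm_ack does with "$id", the id is echoed back as a JSON string.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of PrimeNumbersFilterBolt's per-tuple logic, with I/O abstracted away. */
final class PrimeFilterLogic {
    private PrimeFilterLogic() {}

    /** Same naive trial division as the PHP isPrime(). */
    static boolean isPrime(long n) {
        if (n < 2) return false;
        for (long i = 2; i < n; i++) {
            if (n % i == 0) return false;
        }
        return true;
    }

    /** Messages to write for one input tuple: an emit if the value is prime, then always an ack. */
    static List<String> handle(String id, String value) {
        List<String> out = new ArrayList<>();
        if (isPrime(Long.parseLong(value))) {
            out.add("{\"command\":\"emit\",\"tuple\":[\"" + value + "\"]}");
        }
        out.add("{\"command\":\"ack\",\"id\":\"" + id + "\"}");
        return out;
    }
}
```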
NOTE: The JSON_BIGINT_AS_STRING option passed to json_decode works around a data-conversion problem between Java and PHP. Some of the large numbers sent by Java lose precision in PHP, which can cause problems. To avoid this, tell PHP to treat large numbers as strings, and don't put double quotes around numbers when writing them into JSON messages. This option requires PHP 5.4.0 or later.
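Why does precision get lost? When an integer is too large for PHP's native integer type, json_decode turns it into a float, which is a 64-bit double with only 53 bits of mantissa. The short Java sketch below reproduces that effect on an arbitrary 64-bit ID by forcing it through a double. `BigIdPrecision` is a hypothetical name.

```java
/** Illustrates the precision problem: a 64-bit ID does not survive a round trip through a double. */
final class BigIdPrecision {
    private BigIdPrecision() {}

    /** What you get back if the ID is stored as a double, as a float-decoding parser would do. */
    static long viaDouble(long id) {
        return (long) (double) id;
    }

    /** Keeping the ID as a string (the idea behind JSON_BIGINT_AS_STRING) loses nothing. */
    static long viaString(long id) {
        return Long.parseLong(Long.toString(id));
    }
}
```

For an odd ID above 2^62 in magnitude, such as -6955786537413359385, the nearest double is a multiple of 1024, so `viaDouble` returns a different number while `viaString` returns the original.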
Emit, ack, fail, and log messages are all structured as follows:
emit
{
    "command": "emit",
    "tuple": ["foo", "bar"]
}
The tuple array contains the values of the tuple you emit.
ack
{
    "command": "ack",
    "id": 123456789
}
Here, id is the ID of the tuple you are processing.
fail
{
    "command": "fail",
    "id": 123456789
}
This is the same as ack: id is the ID of the tuple you are processing, and only the command differs.
log
{
    "command": "log",
    "msg": "some message to be logged by storm."
}
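The four message shapes above can be built with a few string helpers. The Java sketch below is a hypothetical `StormMessages` class, not a Storm API. It includes just enough JSON string escaping for quotes, backslashes, and control characters. A real project would more likely use a JSON library.

```java
/** Hypothetical helpers that build the emit, ack, fail and log messages shown above. */
final class StormMessages {
    private StormMessages() {}

    /** Minimal JSON string quoting: escapes quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    static String emit(String... values) {
        StringBuilder tuple = new StringBuilder("[");
        for (int i = 0; i < values.length; i++) {
            if (i > 0) tuple.append(',');
            tuple.append(quote(values[i]));
        }
        tuple.append(']');
        return "{\"command\":\"emit\",\"tuple\":" + tuple + "}";
    }

    static String ack(long id) { return "{\"command\":\"ack\",\"id\":" + id + "}"; }

    static String fail(long id) { return "{\"command\":\"fail\",\"id\":" + id + "}"; }

    static String log(String msg) { return "{\"command\":\"log\",\"msg\":" + quote(msg) + "}"; }
}
```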
Here's the full PHP code.
Your spout (NumberGeneratorSpout.php):
<?php
// Reads one message: all lines up to (not including) a line containing only "end".
function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        if ($l === false) {
            exit(0); // stdin was closed (e.g. Storm stopped the component), so stop
        }
        $line = substr($l,0,-1);
        if ($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function write_line($line) {
    echo("$line\n");
}
function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}
// Writes a JSON-encoded message followed by the "end" terminator line.
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}
function storm_sync() {
    storm_send(array("command" => "sync"));
}
function storm_log($msg) {
    $msg = array("command" => "log", "msg" => $msg);
    storm_send($msg);
    flush();
}
// Handshake: create <pidDir>/<pid> and report the PID to Storm.
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
// Main loop: emit one number per "next" command, then sync.
$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<=$to) {
            storm_emit(array("$from"));
            $task_ids = read_msg(); // Storm replies to each emit with the target task IDs
            $from++;
        } else {
            sleep(1); // nothing left to emit
        }
    }
    storm_sync();
    flush();
}
?>
Your bolt (PrimeNumbersFilterBolt.php):
<?php
// Naive primality check: tries every divisor from 2 to $number - 1.
function isPrime($number) {
    if ($number < 2) {
        return false;
    }
    if ($number==2) {
        return true;
    }
    for ($i=2; $i<=$number-1; $i++) {
        if ($number % $i == 0) {
            return false;
        }
    }
    return true;
}
// Reads one message: all lines up to (not including) a line containing only "end".
function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        if ($l === false) {
            exit(0); // stdin was closed (e.g. Storm stopped the component), so stop
        }
        $line = substr($l,0,-1);
        if ($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function write_line($line) {
    echo("$line\n");
}
function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}
// Writes a JSON-encoded message followed by the "end" terminator line.
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}
function storm_ack($id) {
    storm_send(["command"=>"ack", "id"=>"$id"]);
}
function storm_log($msg) {
    $msg = array("command" => "log", "msg" => "$msg");
    storm_send($msg);
}
// Handshake: create <pidDir>/<pid> and report the PID to Storm.
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
// Main loop: emit primes, ack every input tuple.
while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) { // only tuples carry an "id"
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
        flush();
    }
}
?>
NOTE: All script files must be saved in a subdirectory called multilang/resources in your project directory. This subdirectory is included in the jar file that is sent to the worker processes. If you don't put the scripts in this directory, Storm can't run them and will throw an error.