Coding With Fun
Home Docker Django Node.js Articles Python pip guide FAQ Policy

Apache Storm Trident


May 26, 2021 Apache Storm


Table of contents


Trident is an extension of Storm. L ike Storm, Trident was also developed by Twitter. es.

Trident uses spout and bolt, but these lower-level components are automatically generated by Trident before execution. Trident has functions, filters, joins, groupings, and aggregations.

Trident processes the stream as a series of batches, called transactions. ts. n this way, Trident is different from Storm in that it performs a group of units.

The batch concept is very similar to database transactions. E ach transaction is assigned a transaction ID. T he transaction is considered successful once all of its processing is complete. nd. or each batch, Trident calls beginCommit at the beginning of the transaction and commits at the end.

Trident topology

The Trident API exposes a simple option to create a Trident topology using the Trident Topology class. Basically, the Trident topology receives the input stream from the outflow and performs an orderly sequence of operations (filtering, aggregation, grouping, etc.) on the flow. The Storm tuogroup is replaced by the Trident tuogroup and the bolt is replaced by the operation. A simple Trident topology can be created as follows -

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident Tuples is a list of named values. T he Trident Tuple interface is a data model for trident topology. The TridentTuple interface is the basic unit of data that can be processed by trident topology.

Trident Spout

Trident spout is similar to Storm spout, with additional options to use Trident's functionality. I n fact, we can still use IRichSpout, which we use in storm topology, but it is inherently non-transactional and we will not be able to use the benefits provided by Trident.

The basic spout with all the features that use Trident's features is "ITridentSpout". ut. he other spouts are IBatchSpout, IPartitioned TridentSpout and IOpaquePartitioned TridentSpout.

In addition to these generic spouts, Trident has many samples implemented by Trident spout. tc. we can use to send a named list of trident tuples without worrying about batching, parallelity, etc.

FeederBatchSpout creation and data feeds can be done as follows -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident operation

Trident relies on "Trident operations" to handle the input flow of Trident tuples. b20> ns. trident tuples grouping and aggregation. L et's go through the most important and frequently used operations.

Filter

A filter is an object that performs an input validation task. The Trident filter gets a subset of the trident tuples field as input and returns true or false depending on whether certain conditions are met. If true is returned, the unit is saved in the output stream; Otherwise, the group is removed from the stream. The filter will basically inherit from the BaseFilter class and implement the isKey method. Here is an example implementation of a filter operation -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

You can use the "each" method to call the filter functionality in the topology. T he Fields class can be used to specify inputs (a subset of trident tuple). The sample code is as follows -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Function

A function is an object that performs simple operations on a single trident tuple. It requires a subset of the trident tuple field and emits zero or more new trident tuple fields.

The function basically inherits and implements the execute method from the BaseFunction class. Here's an example implementation:

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Similar to filtering operations, you can use each method to invoke function operations in your topology. The sample code is as follows -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Polymerization

Aggregation is an object that performs aggregation operations on an input batch or partition or stream. T rident has three types of aggregations. They are as follows -

  • aggregate - aggregation of each batch of trident tuple individually. D uring the aggregation process, the groups are first re-partitioned using global grouping to combine all partitions of the same batch into a single partition.

  • partitionAggregate - aggregates each partition instead of the entire trident tuple. T he output of the partition collection completely replaces the input group. T he output of the partition collection contains a single field group.

  • Persistentaggregate - Aggregates all trident tuples in all batches and stores the results in memory or database.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Aggregation operations can be created using the Combiner Aggregator, Reducer Aggregator, or generic Aggregator interfaces. The Count aggregator used in the example above is one of the built-in aggregators, implemented using the "CombinerAgregator" implementation, which is as follows -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Group

A grouping operation is a built-in operation that can be called by the groupBy method. The groupBy method repartitions the flow by executing the partyBy on the specified field, and then in each partition, it groups the groups of group fields equal. Typically, we use "groupBy" and "persistentAggregate" to get group aggregations. The sample code is as follows -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merge and connect

Merges and connections can be done by using the Merge and Connect methods, respectively. Merge one or more streams. Joining is similar to merging, except for the fact that the trident tuple field from both sides is used to check and connect the two streams. In addition, joining will only work at the bulk level. The sample code is as follows -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

State maintenance

Trident provides a mechanism for state maintenance. State information can be stored in the topology itself, otherwise it can be stored in a separate database. The reason is to maintain a state and retry the failed group if any of the metagroups fail during processing. This can cause problems when updating the status because you are not sure whether the status of this metagroup has been updated before. If the metagroup has failed before the state is updated, retrying the yuan group stabilizes the state. However, if the metagroup fails after updating the state, retrying the same group increases the count in the database again and makes the state unstable. The following steps are required to ensure that messages are processed only once -

  • Small batch processing of odd groups.

  • Assign a unique ID to each batch. If the batch is retried, the same unique ID is given.

  • Status updates are sorted between batches. For example, a second batch of status updates would not be possible until the status updates for the first batch were complete.

Distributed RPC

Distributed RPC is used to query and retrieve Trident topology results. S torm has a built-in distributed RPC server. T he distributed RPC server receives RPC requests from the client and passes them to the topology. b20> el.

When do I use Trident?

In many use cases, if the requirement is to process a query only once, we can do so by writing a topology in Trident. b20> herefore, Trident will be useful for use cases that require one-time processing. te.

Trident's working instance

We'll convert the Call Log Analyzer application developed in the last section to the Trident framework. T hanks to its advanced API, Trident applications will be easier than normal storms. b20> inally, we'll start the DRPC server using the LocalDRPC class and search for some keywords using the Excelcute method of the LocalDRPC class.

Format the call information

The purpose of the FormatCall class is to format call information that includes caller numbers and recipient numbers. The complete program code is as follows -

Code: FormatCall .java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

The purpose of the CSVSplit class is to split the input string based on "comma(,)" and emit each word in the string. T his function is used to resolve input parameters for distributed queries. The complete code is as follows -

Code: CSVSplit .java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log analyzer

This is the main application. I nitially, the application will use FeederBatchSpout to initialize Trident Topology and provide caller information. T rident topology flows can be created using the newStream method of the Trident Topology class. b22> newDRCPStream method of the Trident Topology class. Y ou can use the LocalDRPC class to create a simple DRCP server. /b21> LocalDRPC has an execute method for searching for some keywords. T he complete code is as follows.

Code: LogAnalyser Trident .java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Build and run applications

The complete application has three Java codes. They are as follows -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

You can build your application using the following commands -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

You can run the application using the following command -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Output

Once the application is started, the application outputs full details about the cluster startup process, operational processing, DRPC server and client information, and the final cluster shutdown process. ow.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends