Apache Kafka integrates Storm


May 26, 2021 17:00 Apache Kafka


Table of contents


In this chapter, we'll learn how to integrate Kafka with Apache Storm.

About Storm

Storm was originally created by the team of Nathan Marz and BackType. /b10> In a short period of time, Apache Storm becomes the standard for distributed real-time processing systems, allowing you to process large amounts of data. /b11> Storm is very fast, and a reference clock processes more than one million dTs per second per node. /b12> Apache Storm runs continuously, consuming data from the configured source (Spouts) and passing the data to the processing pipeline (Bolts). /b13> Union, Spouts, and Bolt form a topology.

Integration with Storm

Kafka and Storm naturally complement each other, and their powerful collaboration enables real-time streaming analytics of fast-moving big data. /b10> Kafka and Storm integration is designed to make it easier for developers to get and publish data streams from storm topology.

Concept flow

Spouts is the source of the stream. /b10> For example, a nozzle can read metagroups from Kafka Topic and send them as streams. /b11> Bolt consumes input streams, processes and may emit new streams. /b12> Bolt can do anything from running functions, filtering tuons, performing flow aggregations, streaming connections, talking to databases, and so on. /b13> Each node in the Storm topology executes in parallel. /b14> The topology runs indefinitely until it is terminated. /b15> Storm will automatically reassign any failed tasks. /b16> In addition, Storm guarantees no data loss, even if the machine is down and messages are discarded.

Let's learn more about the Kafka-Storm integration API. /b10> There are three main classes integrating Kafka with Storm. They are as follows --

BrokerHosts - ZkHosts & StaticHosts

BrokerHosts is an interface, and ZkHosts and StaticHosts are its two main implementations. /b10> ZkHosts is used to dynamically track Kafka agents by maintaining details in ZooKeeper, while StaticHosts is used to manually/staticly set kafka agents and their details. /b11> ZkHosts is a quick and easy way to access Kafka agents.

ZkHosts' signature is as follows -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

Among them, BrokerZkStr is the ZooKeeper host, and brokerZkPath is the ZooKeeper path to maintain Kafka proxy details.

KafkaConfig API

This API is used to define the configuration settings for the Kafka cluster. Kafka Con-fig's signature is defined as follows

public KafkaConfig(BrokerHosts hosts, string topic)

    Host - BrokerHosts can be ZkHosts / StaticHosts.

    Theme - The name of the topic.

SpoutConfig API

Spoutconfig is an extension of KafkaConfig that supports additional ZooKeeper information.

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Host - BrokerHosts can be any implementation of the BrokerHosts interface

  • Theme - The name of the topic.

  • zkRoot - ZooKeeper root path.

  • The state of the offset consumed by id - spouts stored in Zookeeper. /b10> The ID should uniquely identify your nozzle.

SchemeAsMultiScheme

SchemeAsMultiScheme is an interface that indicates how to convert ByteBuffer consumed from Kafka to a stormy group. /b10> It originates from MultiScheme and accepts the implementation of the Scheme class. /b11> There are many implementations of the Scheme class, one of which is StringScheme, which resolves bytes into a simple string. /b12> It also controls the naming of output fields. /b13> The signature is defined below.

public SchemeAsMultiScheme(Scheme scheme)
  • Scenario - Byte buffer consumed from kafka.

KafkaSpout API

KafkaSpout is our spout implementation, which will integrate with Storm. /b10> It gets messages from the kafka topic and sends them as a group to the Storm ecosystem. /b11> KafkaSpout gets its configuration details from SpoutConfig.

Here's an example code for creating a simple Kafka nozzle.

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Create Bolt

Bolt is a component that uses tuds as inputs, processes tuds, and produces new tuds as output. /b10> Bolt will implement the IRichBolt interface. /b11> In this program, use two Bolt-class WordSplitter-Bolt and WordCounterBolt to do this.

The IRichBolt interface has the following methods -

  • Preparation - Provide Bolt with the environment to perform. /b10> The executor will run this method to initialize the nozzle.

  • Execute - Process input for a single group.

  • Cleanup - called when Bolt wants to close.

  • DeclareOutputFields - Declares the output mode of the yuan group.

Let's create the SplitBolt .java, which implements logical splitting of a sentence into words and CountBolt .java, which implements logical separation of unique words and counting their appearance.

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()&plus;" : " &plus; entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Submit the topology

The Storm topology is basically a Trift structure. /b10> The TopologyBuilder class provides an easy and easy way to create complex topology. /b11>The TopologyBuilder class has a way to set up spout and set up bolt. /b12> Finally, TopologyBuilder has CreativeTopology to create to-pology. /b13> The shufflegrouping and fieldsgrouping methods help set up stream groupings for nozzles and Bolts.

Local Clusters - For development purposes, we can use the LocalCluster object to create a local cluster, and then use the LocalCluster's submitTopology class.

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

Before mobile compilation, Kakfa-Storm integration required curator ZooKeeper client java libraries. /b10> Curator version 2.9.1 supports Apache Storm 0.9.5 (which we use in this tutorial). /b11> Download the jar file specified below and place it in the java class path.

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

After including dependent files, compile the program using the following commands.

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

Perform

Start the Kafka Producer CLI (explained in the previous section), create a new topic called my-first-topic, and provide some sample messages as follows -

hello
kafka
storm
spark
test message
another test message

Now use the following command to execute the application -

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*":. KafkaStormSample

The sample output of this application is as follows -

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2