Apache Kafka Live App (Twitter)


May 26, 2021 17:00 Apache Kafka



Let's build a real-time application that reads the latest Twitter feeds and extracts their hashtags. Earlier, we saw the integration of Storm and Spark with Kafka. In both cases, we created a Kafka producer (using the CLI) to send messages to the Kafka ecosystem. Storm and Spark then used Kafka consumers to read those messages and inject them into their respective ecosystems. So, in practice, we need to create a Kafka producer that will:

  • Read the Twitter feed using the Twitter Streaming API,
  • Process the feeds,
  • Extract the hashtags, and
  • Send them to Kafka.

Once Kafka receives the hashtags, the Storm/Spark integration reads them and passes them into the Storm/Spark ecosystem.
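To make the "extract the hashtags" step concrete, here is a minimal sketch that pulls hashtags out of plain tweet text with a regular expression. It is only an illustration: the producer in this chapter relies on Twitter4j's `getHashtagEntities()` instead, and the class name `HashtagExtractor`, the method `extract`, and the sample tweet are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of twitter4j or Kafka.
public class HashtagExtractor {
    // A '#' followed by one or more ASCII letters, digits, or underscores.
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    // Returns the hashtags found in the tweet text, without the leading '#'.
    public static List<String> extract(String tweetText) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (m.find()) {
            tags.add(m.group(1));
        }
        return tags;
    }

    public static void main(String[] args) {
        // Made-up sample text, standing in for a real tweet.
        for (String tag : extract("Best #burger in town #food #foodie")) {
            System.out.println("Hashtag: " + tag);
        }
    }
}
```

Each extracted string corresponds to one message that the producer would send to the Kafka topic.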

Twitter Streaming API

The Twitter Streaming API can be accessed from any programming language. "Twitter4j" is an open-source, unofficial Java library that provides easy access to the Twitter Streaming API through a listener-based framework for receiving tweets. To access the Twitter Streaming API, we need to sign in to a Twitter developer account and obtain the following OAuth authentication details.

  • ConsumerKey
  • ConsumerSecret
  • AccessToken
  • AccessTokenSecret

After you create your developer account, download the "twitter4j" jar files and place them on the Java classpath.

The full Twitter Kafka producer code (KafkaTwitterProducer.java) is listed below -

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      final LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key> "
            + "<twitter-consumer-secret> <twitter-access-token> "
            + "<twitter-access-token-secret> "
            + "<topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0];
      String consumerSecret = args[1];
      String accessToken = args[2];
      String accessTokenSecret = args[3];
      String topicName = args[4];
      // Everything after the topic name is treated as a search keyword.
      String[] keyWords = Arrays.copyOfRange(args, 5, args.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName()
            //    + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreenName());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            // System.out.println("Got a status deletion notice id:"
            //    + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" +
            //    numberOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId +
            //    "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  topicName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}
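The main loop in the listing polls the queue without blocking. Every empty poll sleeps for 100 ms and increments `i`, so the program stops after ten empty polls in total, not ten in a row. The sketch below isolates that loop so its termination behaviour is easier to see. It is a simplified stand-in: strings replace `Status` objects, a list replaces the Kafka producer, and the names `QueueDrainSketch` and `drain` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-alone sketch of the polling loop; no Twitter or Kafka involved.
public class QueueDrainSketch {
    // Drains the queue until maxEmptyPolls empty polls have been seen in total,
    // sleeping sleepMillis after each empty poll (the listing uses 10 and 100).
    public static List<String> drain(LinkedBlockingQueue<String> queue,
                                     int maxEmptyPolls, long sleepMillis) {
        List<String> sent = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            String item = queue.poll();          // non-blocking; null when the queue is empty
            if (item == null) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;                       // stop draining if interrupted
                }
                emptyPolls++;
            } else {
                sent.add(item);                  // the listing calls producer.send(...) here
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        queue.offer("food");
        queue.offer("foodie");
        System.out.println(drain(queue, 10, 100));
    }
}
```

Because the counter is cumulative, a keyword with little traffic ends the run after roughly one second of accumulated idle time. Resetting the counter after each successful poll would instead require ten consecutive empty polls before exiting.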

Compilation

Compile the application using the following command -

javac -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer.java

Execution

Open two consoles. Run the compiled application in one console, as shown below.

java -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer \
   <twitter-consumer-key> \
   <twitter-consumer-secret> \
   <twitter-access-token> \
   <twitter-access-token-secret> \
   my-first-topic food

Run any of the Spark/Storm applications explained in the previous chapter in the other console. The main point to note is that the topic used must be the same in both cases. Here, we use "my-first-topic" as the topic name.

Output

The output of this application depends on the keywords and the current Twitter feed. A sample output (with the Storm integration) is shown below.

. . .
food : 1
foodie : 2
burger : 1
. . .
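The counts in the sample output come from the Storm application of the previous chapter, which is not reproduced here. As a rough illustration of what that counting step does with the hashtags read from the topic, here is a minimal in-memory sketch. It does not use the Storm or Spark APIs; the class name `HashtagCountSketch` and the input list are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the counting done downstream of Kafka.
public class HashtagCountSketch {
    // Counts occurrences of each hashtag, keeping first-seen order.
    public static Map<String, Integer> count(List<String> hashtags) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String tag : hashtags) {
            counts.merge(tag, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up messages, as if they had been read from the topic.
        List<String> received = Arrays.asList("food", "foodie", "burger", "foodie");
        count(received).forEach((tag, n) -> System.out.println(tag + " : " + n));
    }
}
```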