Real-time Analytics with Storm and Cassandra
上QQ阅读APP看书,第一时间看更新

Customizing Storm spouts

You have explored and understood WordCount topology provided by the Storm-starter project in previous chapters. Now it's time we move on to the next step, the do it yourself journey with Storm; so let's take up the next leap and do some exciting stuff with our own spouts that read from various sources.

Creating FileSpout

Here we will create our own spout to read the events or tuples from a file source and emit them into the topology; we would substitute spout in place of RandomSentenceSpout we used in the WordCount topology in the previous chapter.

To start, copy the project we created in Chapter 2, Getting Started with Your First Topology, into a new project and make the following changes in RandomSentenceSpout to make a new class called FileSpout within the Storm-starter project.

Now we will make changes in FileSpout so that it reads sentences from a file as shown in the following code:

public class FileSpout extends BaseRichSpout {
  //declaration section
  SpoutOutputCollector _collector;
  DataInputStream in ;
  BufferedReader br;
  Queue qe;

  //constructor
    public FileSpout() {
        qe = new LinkedList();
    }

  // the messageId builder method
  private String getMsgId(int i) {
    return (new StringBuilder("#@#MsgId")).append(i).toString();
    }

  //The function that is called at every line being read by  readFile
  //method and adds messageId at the end of each line and then add
  // the line to the linked list
    private void queueIt() {
      int msgId = 0;
      String strLine;
      try {
          while ((strLine = br.readLine()) != null) {
              qe.add((new  StringBuilder(String.valueOf(strLine))).append("#@#"  + getMsgId(msgId)).toString());
              msgId++;
          }
      } catch (IOException e) {
          e.printStackTrace();
      } catch (Exception e) {
          e.printStackTrace();
      }
    }

  //function to read line from file at specified location 
  private void readFile() {
        try {
          FileInputStream fstream = new  FileInputStream("/home/mylog"); in =  new DataInputStream(fstream);
          br = new BufferedReader(new InputStreamReader( in ));
          queueIt();
          System.out.println("FileSpout file reading done");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

  //open function that is called at the time of spout  initialization
  // it calls the readFile method that reads the file , adds  events 
  // to the linked list to be fed to the spout as tuples
  @
    Override
    public void open(Map conf, TopologyContext context,  SpoutOutputCollector  collector) {
      _collector = collector;
      readFile();
    }

  //this method is called every 100 ms and it polls the list
  //for message which is read off as next tuple and emit the spout  to
  //the topology. When queue doesn't have any events, it reads the
  //file again calling the readFile method
    @
    Override
    public void nextTuple() {
      Utils.sleep(100);
      String fullMsg = (String) qe.poll();
      String msg[] = (String[]) null;
      if (fullMsg != null) {
          msg = (new String(fullMsg)).split("#@#");
          _collector.emit(new Values(msg[0]));
          System.out.println((new StringBuilder("nextTuple done  ")).append(msg[1]).toString());
      } else {
          readFile();
      }
    }

  @
  Override
  public void ack(Object id) {}

  @
  Override
  public void fail(Object id) {}

  @
  Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
  }
}

Tip

Downloading the example code

You can download the example code files for all the Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Tweaking WordCount topology to use FileSpout

Now we need to fit FileSpout into our WordCount topology and execute it. To do this, you need to change one line of code in WordCount topology and instantiate FileSpout instead of RandomSentenceSpout in TopologyBuilder, as shown here:

public static void main(String[] args) throws Exception {
  TopologyBuilder builder = new TopologyBuilder();
//builder.setSpout("spout", new RandomSentenceSpout(), 5);
  builder.setSpout("spout", new FileSpout(), 1);

This one line change will take care of instantiation of the new spout that will read from the specified file /home/mylog (please create this file before you execute the program). Here is a screenshot of the output for your reference:

Tweaking WordCount topology to use FileSpout

The SocketSpout class

As a next step to understand the spouts better, let's create a SocketSpout class. Assuming that you are proficient in writing Socket Server or Producer, I will walk you through the process of creating a custom SocketSpout class to consume a socket output in the Storm topology:

public class SocketSpout extends BaseRichSpout{
  static SpoutOutputCollector collector;
  //The socket
    static Socket myclientSocket;
    static ServerSocket myserverSocket;
    static int myport;

  public SocketSpout(int port){
    myport=port;
  }

  public void open(Map conf,TopologyContext context,  SpoutOutputCollector collector){
    _collector=collector;
    myserverSocket=new ServerSocket(myport);
  }

  public void nextTuple(){
    myclientSocket=myserverSocket.accept();
    InputStream incomingIS=myclientSocket.getInputStream();
    byte[] b=new byte[8196];
    int len=b.incomingIS.read(b);
    _collector.emit(new Values(b));
  }
}