
Customizing Storm spouts
You explored the WordCount topology provided by the Storm-starter project in the previous chapters. Now it's time to move on to the do-it-yourself part of the journey with Storm: writing our own spouts that read from various sources.
Creating FileSpout
Here we will create our own spout that reads events (tuples) from a file and emits them into the topology. It takes the place of the RandomSentenceSpout we used in the WordCount topology in the previous chapter.
To start, copy the project we created in Chapter 2, Getting Started with Your First Topology, into a new project. Then, within the Storm-starter project, use RandomSentenceSpout as the starting point for a new class called FileSpout.
Now change FileSpout so that it reads sentences from a file, as shown in the following code:
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

// Storm 0.9.x package names; from Storm 1.0 onward they live under org.apache.storm
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class FileSpout extends BaseRichSpout {
    // declaration section
    private static final String DELIMITER = "#@#";
    private SpoutOutputCollector _collector;
    private final Queue<String> qe;

    // constructor
    public FileSpout() {
        qe = new LinkedList<String>();
    }

    // the messageId builder method
    private String getMsgId(int i) {
        return "MsgId" + i;
    }

    // Called by readFile: appends the delimiter and a messageId to every
    // line and adds the tagged line to the linked list
    private void queueIt(BufferedReader br) throws IOException {
        int msgId = 0;
        String strLine;
        while ((strLine = br.readLine()) != null) {
            qe.add(strLine + DELIMITER + getMsgId(msgId));
            msgId++;
        }
    }

    // Reads all lines from the file at the specified location;
    // the reader is closed when the try block exits
    private void readFile() {
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream("/home/mylog")))) {
            queueIt(br);
            System.out.println("FileSpout file reading done");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Called once when the spout is initialized. It calls readFile, which
    // reads the file and adds events to the linked list to be fed to the
    // topology as tuples
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        readFile();
    }

    // Storm calls this method in a loop. Each call sleeps for 100 ms, polls
    // the list for the next message, and emits it into the topology as a tuple.
    // When the queue has no events left, the file is read again via readFile
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String fullMsg = qe.poll();
        if (fullMsg != null) {
            String[] msg = fullMsg.split(DELIMITER);
            // emitted without a message ID, so ack and fail are never invoked
            _collector.emit(new Values(msg[0]));
            System.out.println("nextTuple done " + msg[1]);
        } else {
            readFile();
        }
    }

    @Override
    public void ack(Object id) {}

    @Override
    public void fail(Object id) {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
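The spout queues each line with a message ID attached after a #@# delimiter and splits the two apart again before emitting. The following is a minimal sketch of that tag-and-split pattern on its own, runnable without Storm. The class name LineTagger and the sample sentences are hypothetical and exist only for illustration; they are not part of Storm or of the project code.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical helper (an assumption for illustration): isolates the
// tag-and-split logic so it can be tried without a Storm cluster.
class LineTagger {
    static final String DELIMITER = "#@#";

    // Appends the delimiter and "MsgId<n>" to a line before it is queued.
    static String tag(String line, int msgId) {
        return line + DELIMITER + "MsgId" + msgId;
    }

    // Splits a tagged line into {sentence, messageId}.
    // Assumes the sentence itself does not contain the delimiter.
    static String[] untag(String tagged) {
        return tagged.split(DELIMITER);
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<String>();
        List<String> lines = Arrays.asList(
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away");

        // Producer side: tag every line and queue it.
        int msgId = 0;
        for (String line : lines) {
            queue.add(tag(line, msgId++));
        }

        // Consumer side: poll until the queue is empty.
        String tagged;
        while ((tagged = queue.poll()) != null) {
            String[] parts = untag(tagged);
            System.out.println(parts[1] + " -> " + parts[0]);
        }
    }
}
```

The delimiter must appear exactly once per queued line. If it is appended twice, the split yields an empty string where the message ID is expected.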
Now we need to fit FileSpout into our WordCount topology and execute it. To do this, change one line of code in the WordCount topology so that TopologyBuilder instantiates FileSpout instead of RandomSentenceSpout, as shown here:
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    //builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setSpout("spout", new FileSpout(), 1);
This one-line change instantiates the new spout, which reads from the file /home/mylog (create this file before you run the program). Here is a screenshot of the output for your reference:
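If you do not have a suitable input file at hand, one way to create it is sketched below. This is only an illustration: the class name SampleLogWriter and the sentences are assumptions, and the target path defaults to /home/mylog only because that is the path the spout reads; pass a different path as the first argument if you cannot write there.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical utility (not part of Storm-starter): writes a few sample
// sentences, one per line, for FileSpout to read.
class SampleLogWriter {
    // Illustrative input only; any text with one sentence per line works.
    static final List<String> SENTENCES = Arrays.asList(
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago");

    // Creates or overwrites the target file with the sample sentences.
    static Path write(Path target) throws IOException {
        return Files.write(target, SENTENCES, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path target = Paths.get(args.length > 0 ? args[0] : "/home/mylog");
        write(target);
        System.out.println("Wrote " + SENTENCES.size() + " lines to " + target);
    }
}
```

Because the spout reads the file again whenever its queue runs empty, even a three-line file keeps the topology supplied with tuples.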

As a next step toward understanding spouts better, let's create a SocketSpout class. Assuming that you are comfortable writing a socket server or producer, I will walk you through creating a custom SocketSpout class that consumes socket output in the Storm topology:
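import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Storm 0.9.x package names; from Storm 1.0 onward they live under org.apache.storm
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SocketSpout extends BaseRichSpout {
    private SpoutOutputCollector _collector;
    // The server socket is created in open(), on the worker
    private transient ServerSocket myserverSocket;
    // An instance field, so the port is serialized along with the spout
    private final int myport;

    public SocketSpout(int port) {
        myport = port;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            myserverSocket = new ServerSocket(myport);
        } catch (IOException e) {
            throw new RuntimeException("Could not listen on port " + myport, e);
        }
    }

    // accept() blocks until a producer connects; each connection yields one tuple
    @Override
    public void nextTuple() {
        try (Socket myclientSocket = myserverSocket.accept()) {
            InputStream incomingIS = myclientSocket.getInputStream();
            byte[] b = new byte[8196];
            int len = incomingIS.read(b);
            if (len > 0) {
                // emit only the bytes actually read, decoded as text
                _collector.emit(new Values(new String(b, 0, len, StandardCharsets.UTF_8)));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Same field name as FileSpout, so the downstream bolts need no changes
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}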
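To exercise SocketSpout you need something that writes to its port. The following is a minimal producer sketch, not a definitive implementation. The class name SentenceProducer, the default host localhost, the default port 9999, and the sentences are all assumptions chosen for illustration; use whatever port you pass to the SocketSpout constructor.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical producer (an assumption, not part of Storm-starter).
class SentenceProducer {
    // Opens one connection per sentence, because the spout accepts one
    // connection per nextTuple() call and reads a single buffer from it.
    static void send(String host, int port, String sentence) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(sentence.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999; // assumed port
        String[] sentences = {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away"
        };
        for (String sentence : sentences) {
            send(host, port, sentence);
            Thread.sleep(100); // small pause between connections
        }
    }
}
```

Closing the connection after each sentence signals the end of that message to the reader. A producer that holds one long-lived connection open would need a spout that keeps reading from the same socket across calls.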