storm实时计算实例(socket实时接入)

Posted 世界那么大,我想去看看

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm实时计算实例(socket实时接入)相关的知识,希望对你有一定的参考价值。

介绍

本文实现了一个简单的流程:实时监听日志文件的新增内容,将其写入socket服务器,再由Storm从socket服务器接入数据并进行计算。

源码

日志监听实时写入socket服务器

 
[java] view plain copy
 
  1. package socket;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.File;       
  5. import java.io.IOException;       
  6. import java.io.InputStreamReader;  
  7. import java.io.PrintWriter;  
  8. import java.io.RandomAccessFile;       
  9. import java.net.Socket;  
  10. import java.util.concurrent.Executors;       
  11. import java.util.concurrent.ScheduledExecutorService;       
  12. import java.util.concurrent.TimeUnit;       
  /**
   * Tails the most recently modified file of a directory and sends every new line to a
   * socket server (see MyServerMulti), which relays the lines to its other clients.
   */
  public class LogViewToSocket {
      /** Byte offset in the current file up to which content has already been forwarded. */
      private long lastTimeFileSize = 0;
      /** Read-only handle on the file currently being tailed. */
      RandomAccessFile randomFile = null;
      /** Absolute path of the file currently being tailed. */
      String newfile = null;
      /** Absolute path of the newest file found by the latest poll. */
      String thisfile = null;

      /**
       * Finds the most recently modified regular file directly inside a directory.
       *
       * @param file directory to scan
       * @return absolute path of the newest file, or an empty string when the directory
       *         cannot be listed or contains no regular file
       */
      public String getNewFile(File file) {
          File[] entries = file.listFiles();
          if (entries == null) {
              // listFiles() returns null when the path is not a directory or cannot be read.
              return "";
          }
          long newestTime = 0;
          String newestPath = "";
          for (File entry : entries) {
              // Sub-directories cannot be tailed, so only regular files are candidates.
              if (entry.isFile() && entry.lastModified() > newestTime) {
                  newestTime = entry.lastModified();
                  newestPath = entry.getAbsolutePath();
              }
          }
          return newestPath;
      }

      /**
       * Starts a background task that, once per second, forwards the lines appended to the
       * newest file of the directory and switches to a newer file when one appears.
       *
       * @param logFile directory containing the log files
       * @param out writer connected to the socket server
       * @throws IOException if the directory holds no readable log file
       */
      public void realtimeShowLog(final File logFile, final PrintWriter out) throws IOException {
          newfile = getNewFile(logFile);
          if (newfile.length() == 0) {
              throw new IOException("No log file found in " + logFile);
          }
          randomFile = new RandomAccessFile(new File(newfile), "r");
          ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
          exec.scheduleWithFixedDelay(new Runnable() {
              public void run() {
                  try {
                      forwardNewLines(logFile, out);
                  } catch (IOException e) {
                      // An exception escaping a scheduled task silently suppresses all later
                      // runs, so report it before propagating.
                      e.printStackTrace();
                      throw new RuntimeException(e);
                  }
              }
          }, 0, 1, TimeUnit.SECONDS);
      }

      /** Sends the lines added since the previous poll, then follows a newer file if any. */
      private void forwardNewLines(File logDir, PrintWriter out) throws IOException {
          if (randomFile.length() < lastTimeFileSize) {
              // The file was truncated in place; start again from its beginning.
              lastTimeFileSize = 0;
          }
          randomFile.seek(lastTimeFileSize);
          String raw;
          while ((raw = randomFile.readLine()) != null) {
              // readLine() widens each byte to one char; going back through ISO-8859-1
              // restores the raw bytes so they can be decoded with the platform charset.
              String line = new String(raw.getBytes("ISO8859-1"));
              System.out.println(line);
              out.println(line);
          }
          // Remember exactly how far we read; using length() here could skip bytes that
          // were appended after the last readLine().
          lastTimeFileSize = randomFile.getFilePointer();
          // PrintWriter never throws; checkError() flushes and reveals a broken connection.
          if (out.checkError()) {
              throw new IOException("Failed to write to the socket server");
          }

          thisfile = getNewFile(logDir);
          if (thisfile.length() > 0 && !thisfile.equals(newfile)) {
              // A newer log file appeared: release the old handle and tail the new file
              // from its start.
              randomFile.close();
              randomFile = new RandomAccessFile(new File(thisfile), "r");
              newfile = thisfile;
              lastTimeFileSize = 0;
          }
      }

      /**
       * Connects to the socket server and starts tailing.
       *
       * @param args optional: server host, server port, log directory
       *             (defaults: 192.168.27.100, 5678, /home/hadoop/test)
       */
      public static void main(String[] args) throws Exception {
          String host = args.length > 0 ? args[0] : "192.168.27.100";
          int port = args.length > 1 ? Integer.parseInt(args[1]) : 5678;
          String logDir = args.length > 2 ? args[2] : "/home/hadoop/test";

          LogViewToSocket view = new LogViewToSocket();
          Socket socket = new Socket(host, port);
          PrintWriter out = new PrintWriter(socket.getOutputStream());
          try {
              // The socket must stay open for as long as the background task runs.
              view.realtimeShowLog(new File(logDir), out);
          } catch (IOException e) {
              socket.close();
              throw e;
          }
      }
  }
 

socket服务器处理

[java] view plain copy
 
  1. import java.io.BufferedReader;    
  2. import java.io.IOException;    
  3. import java.io.InputStreamReader;    
  4. import java.io.PrintWriter;    
  5. import java.net.ServerSocket;    
  6. import java.net.Socket;    
  7. import java.net.SocketAddress;  
  8. import java.util.*;  
  9.     
  /**
   * Relay server: the first client to connect is the data producer; every line it sends
   * is forwarded to all clients that connect afterwards.
   */
  public class MyServerMulti {

      /**
       * Accepts connections on port 5678 forever. Relaying starts when the second client
       * (the first consumer) connects.
       */
      public static void main(String[] args) throws IOException {
          ServerSocket server = new ServerSocket(5678);
          int i = 0;
          // Written by this accept loop and read by the relay thread, so it must be
          // thread-safe; a copy-on-write list also allows removal while iterating.
          final List<PrintWriter> outs = new java.util.concurrent.CopyOnWriteArrayList<PrintWriter>();
          Socket producer = null;
          while (true) {
              Socket socket = server.accept();
              i++;
              System.out.println(i);
              System.out.println(socket.getInetAddress());
              if (i == 1) {
                  // The producer is not a consumer: it must not receive its own lines back.
                  producer = socket;
                  continue;
              }
              outs.add(new PrintWriter(socket.getOutputStream()));
              if (i == 2) {
                  invoke(producer, outs);
              }
          }
      }

      /**
       * Starts a thread that reads lines from the producer, acknowledges each one and
       * forwards it to every consumer, until the producer disconnects or sends "bye".
       *
       * @param client producer connection
       * @param outs writers of the consumer connections; may grow while relaying
       */
      private static void invoke(final Socket client, final List<PrintWriter> outs) throws IOException {
          new Thread(new Runnable() {
              public void run() {
                  BufferedReader in = null;
                  PrintWriter out = null;
                  try {
                      in = new BufferedReader(new InputStreamReader(client.getInputStream()));
                      out = new PrintWriter(client.getOutputStream());

                      String msg;
                      // readLine() returns null once the producer closes the connection.
                      while ((msg = in.readLine()) != null) {
                          System.out.println(msg);
                          // NOTE(review): the producer has to read these acknowledgements,
                          // otherwise its receive buffer eventually fills up - confirm.
                          out.println("Server received " + msg);
                          out.flush();

                          /* Forward the line to every consumer. */
                          for (PrintWriter consumer : outs) {
                              System.out.println("send msg:" + msg);
                              consumer.println(msg);
                              // checkError() flushes and reports a dead connection.
                              if (consumer.checkError()) {
                                  outs.remove(consumer);
                              }
                          }

                          System.out.println(client.getInetAddress());
                          if (msg.equals("bye")) {
                              break;
                          }
                      }
                  } catch (IOException ex) {
                      ex.printStackTrace();
                  } finally {
                      if (in != null) {
                          try {
                              in.close();
                          } catch (IOException ignored) {
                              // Nothing useful can be done when closing fails.
                          }
                      }
                      if (out != null) {
                          out.close();
                      }
                      try {
                          client.close();
                      } catch (IOException ignored) {
                          // Nothing useful can be done when closing fails.
                      }
                  }
              }
          }).start();
      }
  }

storm topology

[java] view plain copy
 
  1. import java.io.BufferedReader;  
  2. import java.io.BufferedWriter;  
  3. import java.io.File;  
  4. import java.io.FileNotFoundException;  
  5. import java.io.FileOutputStream;  
  6. import java.io.FileReader;  
  7. import java.io.FileWriter;  
  8. import java.io.IOException;  
  9. import java.io.InputStreamReader;  
  10. import java.io.OutputStreamWriter;  
  11. import java.io.PrintWriter;  
  12. import java.io.RandomAccessFile;  
  13. import java.net.Socket;  
  14. import java.net.UnknownHostException;  
  15. import java.util.Map;  
  16.    
  17. //import mytest.ThroughputTest.GenSpout;  
  18.    
  19. import backtype.storm.Config;  
  20. import backtype.storm.LocalCluster;  
  21. import backtype.storm.StormSubmitter;  
  22. import backtype.storm.generated.AlreadyAliveException;  
  23. import backtype.storm.generated.InvalidTopologyException;  
  24. import backtype.storm.spout.SpoutOutputCollector;  
  25. import backtype.storm.task.OutputCollector;  
  26. import backtype.storm.task.TopologyContext;  
  27. import backtype.storm.topology.BasicOutputCollector;  
  28. import backtype.storm.topology.OutputFieldsDeclarer;  
  29. import backtype.storm.topology.TopologyBuilder;  
  30. import backtype.storm.topology.base.BaseBasicBolt;  
  31. import backtype.storm.topology.base.BaseRichBolt;  
  32. import backtype.storm.topology.base.BaseRichSpout;  
  33. import backtype.storm.tuple.Fields;  
  34. import backtype.storm.tuple.Tuple;  
  35. import backtype.storm.tuple.Values;  
  36. import backtype.storm.utils.Utils;  
  37. /* 
  38.  *  
  39.  * 
  40.  *  storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true 
  41.  *  
  42.  */  
  43.    
  44. public class SocketProcess {  
  45.          public static class  SocketSpout extends BaseRichSpout {  
  46.    
  47.                    /** 
  48.                     */  
  49.               static Socket sock=null;  
  50.               static BufferedReader in=null;  
  51.               String str=null;  
  52.                    private static final long serialVersionUID = 1L;  
  53.                    private SpoutOutputCollector _collector;  
  54.                    private BufferedReader br;  
  55.                    private String dataFile;  
  56.                    private BufferedWriter bw2;  
  57.                     RandomAccessFile randomFile;  
  58.                     private long lastTimeFileSize = 0;   
  59.                     int cnt=0;  
  60.                    //定义spout文件  
  61.                     SocketSpout(){  
  62.                        
  63.                    }  
  64.    
  65.                    //定义如何读取spout文件  
  66.                    @Override  
  67.                    public void open(Map conf, TopologyContext context,  
  68.                                      SpoutOutputCollector collector) {  
  69.                             // TODO Auto-generated method stub  
  70.                             _collector = collector;  
  71.                             try {  
  72.                                 sock=new Socket("192.168.27.100",5678);  
  73.                                  in=     
  74.                                     new BufferedReader(new InputStreamReader(sock.getInputStream()));     
  75.                             } catch (UnknownHostException e) {  
  76.                                 // TODO Auto-generated catch block  
  77.                                 e.printStackTrace();  
  78.                             } catch (IOException e) {  
  79.                                 // TODO Auto-generated catch block  
  80.                                 e.printStackTrace();  
  81.                             }  
  82.                          
  83.                    }  
  84.    
  85.                    //获取下一个tuple的方法  
  86.                    @Override  
  87.                    public void nextTuple() {  
  88.                             // TODO Auto-generated method stub  
  89.                        if(sock==null){  
  90.                              try {  
  91.                                 sock=new Socket("192.168.27.100",5678);  
  92.                                  in=     
  93.                                         new BufferedReader(new InputStreamReader(sock.getInputStream()));    
  94.                             } catch (UnknownHostException e) {  
  95.                                 // TODO Auto-generated catch block  
  96.                                 e.printStackTrace();  
  97.                             } catch (IOException e) {  
  98.                                 // TODO Auto-generated catch block  
  99.                                 e.printStackTrace();  
  100.                             }   
  101.                        }  
  102.                          
  103.                          
  104.                        while(true){      
  105.                             
  106.                         try {  
  107.                             str = in.readLine();  
  108.                         } catch (IOException e) {  
  109.                             // TODO Auto-generated catch block  
  110.                             e.printStackTrace();  
  111.                         }  
  112.                         System.out.println(str);    
  113.                         _collector.emit(new Values(str));  
  114.                         if(str.equals("end")){      
  115.                             break;      
  116.                             }   
  117.                         }  
  118.                          
  119.                          
  120.                          
  121.                          
  122.                          
  123.                          
  124.                          
  125.                               
  126.                               
  127.                    }  
  128.    
  129.    
  130.                    @Override  
  131.                    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  132.                             // TODO Auto-generated method stub  
  133.                             declarer.declare(new Fields("line"));  
  134.                    }  
  135.                     
  136.          }  
  137.           
  138.    
  139.          public static class Process extends BaseRichBolt{  
  140.    
  141.                    private String _seperator;  
  142.                    private String _outFile;  
  143.                    PrintWriter pw;  
  144.                    private OutputCollector _collector;  
  145.                    private BufferedWriter bw;  
  146.                     
  147.                    public Process(String outFile) {  
  148.                              
  149.                             this._outFile   = outFile;  
  150.                              
  151.                    }  
  152.                     
  153.                    //把输出结果保存到外部文件里面。  
  154.                    @Override  
  155.                    public void prepare(Map stormConf, TopologyContext context,  
  156.                                      OutputCollector collector) {  
  157.                             // TODO Auto-generated method stub  
  158.                             this._collector = collector;  
  159.                             File out = new File(_outFile);  
  160.                             try {  
  161. //                                  br = new BufferedWriter(new FileWriter(out));  
  162.                                      bw = new BufferedWriter(new OutputStreamWriter(   
  163.                              new FileOutputStream(out, true)));   
  164.                             } catch (IOException e1) {  
  165.                                      // TODO Auto-generated catch block  
  166.                                      e1.printStackTrace();  
  167.                             }                  
  168.                    }  
  169.                     
  170.                    //blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。  
  171.                    @Override  
  172.                    public void execute(Tuple input) {  
  173.                             // TODO Auto-generated method stub  
  174.                             String line = input.getString(0);  
  175. //                         System.out.println(line);  
  176.                        //     String[] str = line.split(_seperator);  
  177.                          //   System.out.println(str[2]);  
  178.                             try {  
  179.                                      bw.write(line+",bkeep"+"\n");  
  180.                                      bw.flush();  
  181.                             } catch (IOException e) {  
  182.                                      // TODO Auto-generated catch block  
  183.                                      e.printStackTrace();  
  184.                             }  
  185.                              
  186.                             _collector.emit(new Values(line));  
  187.                    }  
  188.    
  189.                    @Override  
  190.                    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  191.                             // TODO Auto-generated method stub  
  192.                             declarer.declare(new Fields("line"));  
  193.                    }  
  194.                     
  195.          }  
  196.           
  197.          public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{  
  198.                   
  199.                    String outFile   = argv[0]; //输出文件  
  200.                    boolean distribute = Boolean.valueOf(argv[1]);       //本地模式还是集群模式  
  201.                    TopologyBuilder builder = new TopologyBuilder();  //build一个topology  
  202.         builder.setSpout("spout", new  SocketSpout(), 1);   //指定spout  
  203.         builder.setBolt("bolt", new Process(outFile),1).shuffleGrouping("spout");  //指定bolt,包括bolt、process和grouping  
  204.         Config conf = new Config();  
  205.         if(distribute){  
  206.             StormSubmitter.submitTopology("SocketProcess", conf, builder.createTopology());  
  207.         }else{  
  208.                  LocalCluster cluster = new LocalCluster();  
  209.                  cluster.submitTopology("SocketProcess", conf, builder.createTopology());  
  210.         }  
  211.          }         
  212. }  


最后执行
[java] view plain copy
 
  1. storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true  
spout接收从socket服务器实时发送过来的数据,经过topology处理,最终将数据写入out_socket.txt文件。
转:http://blog.csdn.net/u011750989/article/details/18547015

以上是关于storm实时计算实例(socket实时接入)的主要内容,如果未能解决你的问题,请参考以下文章

大数据学习之Storm实时统计网站访问量案例35

转:大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

Storm 第一章 核心组件及编程模型

网站访问量实时统计

Storm简介——实时流式计算介绍

大数据学习之Storm实时计算概述及安装部署33