storm WC 2 (根据日志)

Posted tangsonghuai

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm WC 2 (根据日志)相关的知识,希望对你有一定的参考价值。

1.产生虚拟日志

技术图片
package les7.readFileTopo;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class GetData 

    /**
     * @param args
     */
    public static void main(String[] args) 
        File logFile = new File("track.log");
        Random random = new Random();

        String[] hosts =  "movie information" ;
        String[] session_id =  "ABYH6Y4V4SCVXTG6DPB4VH9U12", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" ;
        String[] time =  "2019-03-07 08:40:50", "2019-03-07 08:40:51", "2019-03-07 08:40:52", "2019-03-07 08:40:53",
                "2019-03-07 09:40:49", "2019-03-07 10:40:49", "2019-03-07 11:40:49", "2019-03-07 12:40:49" ;
        
        StringBuffer sbBuffer = new StringBuffer() ;
        for (int i = 0; i < 5000; i++) 
            sbBuffer.append(hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]+"\n");
        
        if(! logFile.exists())
        
            try 
                logFile.createNewFile();
             catch (IOException e) 
                System.out.println("Create logFile fail !");
            
        
        byte[] b = (sbBuffer.toString()).getBytes();
        
        FileOutputStream fs;
        try 
            fs = new FileOutputStream(logFile);
            fs.write(b);
            fs.close();
         catch (Exception e) 
            e.printStackTrace();
        
    

View Code

2.spout自定义数据流入拓扑逻辑

技术图片
package les7.readFileTopo;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReadFileSpout implements IRichSpout

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;            

    SpoutOutputCollector collector = null;
    
    
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) 
        // 初始化方法
        try 
            this.collector = collector;
            this.fis = new FileInputStream("track.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
         catch (Exception e) 
            e.printStackTrace();
        
    

    @Override
    public void close() 
        // TODO 关闭Topo
        
    

    @Override
    public void activate() 
        // TODO 激活Topo
        
    

    @Override
    public void deactivate() 
        // TODO 停用Topo
        
    
    String str = null;
    String[] str01=null;
    @Override
    public void nextTuple() 
        // TODO 核心方法,死循环,获取外部Touple,emit到下一级组件
        try 
            while ((str = this.br.readLine()) != null) 
                //                // 过滤动作
                //
                                str01=str.split("\t");
                                collector.emit(new Values(str));
                //
                                Thread.sleep(3);
                //                //to do
            
         catch (Exception e) 
            // TODO: handle exception
        
    

    @Override
    public void ack(Object msgId) 
        // TODO 如果开启Acker,成功执行Tuple后会回调该4方法,告知Storm框架该Tuple已经被成功执行。
        
    

    @Override
    public void fail(Object msgId) 
        // TODO 如果开启Acker,当失败执行Tuple后会回调该方法,告知Storm框架该Tuple已经被执行失败。
        // 以便我们手工编码实现失败重发,并控制重发次数。
        
    

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
        // TODO 定义输出的列名
        declarer.declare(new Fields("log"));
    

    @Override
    public Map<String, Object> getComponentConfiguration() 
        // TODO 可以在代码里设置一下属性。该方法基本是废弃不用的。
        return null;
    
    

View Code

3.bolt处理逻辑

技术图片
package les7.readFileTopo;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class FileBolt implements IRichBolt 

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) 
        // TODO 初始化函数

        this.collector = collector;
    

    Integer num = 0;
    String[] words;
    @Override
    public void execute(Tuple tuple) 
        // TODO 死循环,核心方法,处理业务逻辑
        String value =tuple.getString(0);
        //分词
        String[] words = value.split("\t");

        //输出
        for(String w:words)
            collector.emit(new Values(w,1));
        

        


    @Override
    public void cleanup() 
        // TODO 销毁方法,基本不用4

    

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
        // TODO 定义输出列名

        declarer.declare(new Fields("word","count"));
    

    @Override
    public Map<String, Object> getComponentConfiguration() 
        // TODO Auto-generated method stub
        return null;
    

View Code

4.bolt输出逻辑

技术图片
package les7.readFileTopo;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import static org.apache.htrace.Tracer.LOG;

public class PrintBolt implements IRichBolt 

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private Map<String, Integer> result = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) 
        // TODO Auto-generated method stub
        this.collector = collector;

    

    @Override
    public void execute(Tuple tuple) 

        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        //求和
        if(result.containsKey(word))
            //如果已经存在,累加
            int total = result.get(word);
            result.put(word, total+count);
        else
            //这是一个新单词
            result.put(word, count);
        

        //输出到屏幕
        System.out.println("统计的结果是:" + result);

        //输出给下一个组件                                               单词           总频率
        this.collector.emit(new Values(word,result.get(word)));
    

    @Override
    public void cleanup() 
        // TODO Auto-generated method stub

    

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
        // TODO Auto-generated method stub

    

    @Override
    public Map<String, Object> getComponentConfiguration() 
        // TODO Auto-generated method stub
        return null;
    

View Code

5.书写拓扑逻辑代码

技术图片
package les7.readFileTopo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

//展示信息数据

public class FileCountTopo 

    public static void main(String[] args) 
        // TODO Auto-generated method stub

        TopologyBuilder builder = new TopologyBuilder();
        
        builder.setSpout("spout", new ReadFileSpout(),1) ;
        builder.setBolt("b1", new FileBolt(),2).shuffleGrouping("spout");
        builder.setBolt("PrintBolt", new PrintBolt(),1).shuffleGrouping("b1");
        
        Config conf = new Config();
        conf.setDebug(true);

        if (args.length > 0) 
            try 
                //提交到集群
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
             catch (Exception e) 
                e.printStackTrace();
            
        else 
            //本地模式提交
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        
        
        
        
        
    

View Code

 

以上是关于storm WC 2 (根据日志)的主要内容,如果未能解决你的问题,请参考以下文章

storm1.0.2使用中遇到的问题

Linux根据时间批量删除文件

nginx日志统计教程

nginx日志统计教程

实时收集Storm日志到ELK集群

Storm实时日志分析实战