大数据学习之Storm实时统计网站访问量案例35

Posted hidamowang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据学习之Storm实时统计网站访问量案例35相关的知识,希望对你有一定的参考价值。

案例一:统计网站访问量(实时统计)

 

实时流式计算框架:storm

 

1)spout

数据源,接入数据源

本地文件如下

技术图片

编写spout程序:

package pvcount;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * @author Dawn
 * @date 2019年6月7日10:19:39
 * @version 1.0
 * 编写spout。接入本地数据源
 */
public class PvCountSpout implements IRichSpout
	
	private SpoutOutputCollector collector;
	private BufferedReader br;
	private String line;
	
	@Override
	public void nextTuple() 
		//发送读取的数据的每一行
		try 
			while((line=br.readLine()) != null) 
				//发送数据到splitbolt
				collector.emit(new Values(line));
				//设置延迟
				Thread.sleep(500);
			
		 catch (IOException e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		 catch (InterruptedException e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		
		
	

	@Override
	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) 
		this.collector=collector;
		
		//读取文件
		try 
			br=new BufferedReader(new InputStreamReader(new FileInputStream("f:/temp/storm实时统计访问量/weblog.log")));
		 catch (FileNotFoundException e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		
		//别关流!!!!
//		finally 
//			if(br!=null) 
//				try 
//					br.close();
//				 catch (IOException e) 
//					// TODO Auto-generated catch block
//					e.printStackTrace();
//				
//			
//		
		
	

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) 
		//声明
		declarer.declare(new Fields("logs"));
	

	//处理tuple成功 回调的方法。就像kafka的那个callback回调函数,还有zookeeper中的回调函数 process
	@Override
	public void ack(Object arg0) 
		// TODO Auto-generated method stub
		
	
	
	//如果spout在失效的模式中 调用此方法来激活,和在Linux中那个命令 storm activate [拓扑名称] 一样的效果
	@Override
	public void activate() 
		// TODO Auto-generated method stub
		
	

	//在spout程序关闭前执行 不能保证一定被执行 kill -9 是不执行 storm kill 是不执行
	@Override
	public void close() 
		// TODO Auto-generated method stub
		
	

	//在spout失效期间,nextTuple不会被调用 和在Linux中那个命令 storm deactivate [拓扑名称] 一样的效果
	@Override
	public void deactivate() 
		// TODO Auto-generated method stub
		
	

	//处理tuple失败回调的方法
	@Override
	public void fail(Object arg0) 
		// TODO Auto-generated method stub
		
	

	//配置
	@Override
	public Map<String, Object> getComponentConfiguration() 
		// TODO Auto-generated method stub
		return null;
	


  

2)splitbolt

业务逻辑处理

切分数据

拿到网址

package pvcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * @author Dawn
 * @date 2019年6月7日10:30:38
 * @version 1.0
 * 切分数据,拿到网址
 */
public class PvCountSplitBolt implements IRichBolt
	
	private  OutputCollector collector;
	private int pvnum = 0;
	
	//业务逻辑  分布式 集群 并发度 线程(接收tuple然后进行处理)
	@Override
	public void execute(Tuple input) 
		//1.获取数据
		String line = input.getStringByField("logs");
		
		//2.切分数据
		String[] fields = line.split("\\t");
		String session_id=fields[1];
		
		//3.局部累加
		if(session_id != null) 
			pvnum++;
			//输出
			collector.emit(new Values(Thread.currentThread().getId(),pvnum));
		
	

	//初始化调用
	@Override
	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) 
		this.collector=collector;
	

	//声明
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) 
		//声明输出
		declarer.declare(new Fields("threadid","pvnum"));
	

	//一个bolt即将关闭时调用 不能保证一定被调用 资源清理
	@Override
	public void cleanup() 
		// TODO Auto-generated method stub
		
	

	//配置
	@Override
	public Map<String, Object> getComponentConfiguration() 
		// TODO Auto-generated method stub
		return null;
	
	

  

3)bolt

累加次数求和

package pvcount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * @author Dawn
 * @date 2019年6月7日10:39:52
 * @version 1.0
 * 累加次数求和
 */
public class PvCountSumBolt implements IRichBolt

	private OutputCollector collector;
	private HashMap<Long, Integer> hashmap=new HashMap<>();
	
	@Override
	public void cleanup() 
		
	

	@Override
	public void execute(Tuple input) 
		//1.获取数据
		Long threadId = input.getLongByField("threadid");
		Integer pvnum = input.getIntegerByField("pvnum");
		
		//2.创建集合 存储 (threadid,pvnum) 
		hashmap.put(threadId, pvnum);
		
		//3.累加求和(拿到集合中所有value值)
		Iterator<Integer> iterator = hashmap.values().iterator();
		
		//4.清空之前的数据
		int sum=0;
		while(iterator.hasNext()) 
			sum+=iterator.next();
		
		
		System.err.println(Thread.currentThread().getName() + "总访问量为->" + sum);
	

	@Override
	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) 
		
	

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) 
		
	

	@Override
	public Map<String, Object> getComponentConfiguration() 
		// TODO Auto-generated method stub
		return null;
	


  

4)Driver

使用字段分组

package pvcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author Dawn
 * @date 2019年6月7日10:45:53
 * @version 1.0 统计网站访问量(实时统计)
 */
public class PvCountDriver 
	public static void main(String[] args) 
		// 1.创建拓扑
		TopologyBuilder builder = new TopologyBuilder();

		// 2.指定设置
		builder.setSpout("pvcountspout", new PvCountSpout(), 1);
		builder.setBolt("pvsplitbolt", new PvCountSplitBolt(), 6).setNumTasks(4).fieldsGrouping("pvcountspout",
				new Fields("logs"));
		builder.setBolt("pvcountbolt", new PvCountSumBolt(), 1).fieldsGrouping("pvsplitbolt",
				new Fields("threadid", "pvnum"));

		// 3.创建配置信息
		Config conf = new Config();
		conf.setNumWorkers(2);
		
		// 4.提交任务
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());
	


  

运行结果如下:

技术图片

 

总共190条数据。统计完成之后再进行添加数据。程序会继续统计

技术图片

 

以上是关于大数据学习之Storm实时统计网站访问量案例35的主要内容,如果未能解决你的问题,请参考以下文章

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PVUV+展示

大数据学习之(Storm)-原理详解!

网站访问量实时统计

Elasticsearch学习之深入聚合分析四---案例实战

Storm实时处理架构

storm 流式计算框架