A Comprehensive Storm Application

Posted by 皓洲


Experiment

Suppose a search application needs to count, in real time, which keywords are searched most often, and to output the 20 hottest keywords with their search counts, ranked by the live totals. User search logs are collected by Flume and written to Kafka; Storm reads the data from Kafka in real time, performs the statistics, and prints the results.


Hints:

(1) The Sogou search log dataset can be used as the search log; each line represents one user search.

(2) The Flume agent's source can be configured as the syslogudp type (port 5640) to receive the search log.

(3) The output format is up to you.

Approach

Use Flume to read the log file and replay it as a simulated data stream, acting as the producer that sends the data into the Kafka cluster.

Storm acts as the consumer, continuously pulling data from the Kafka cluster, performing the real-time statistics, and printing the results.

Flume agent configuration

# Name the agent's three components; a1 is the agent's name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source configuration
a1.sources.r1.type = syslogudp
a1.sources.r1.host = localhost
a1.sources.r1.port = 5640

# Sink configuration (writes to Kafka)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topictest
a1.sinks.k1.kafka.bootstrap.servers = centos01:9092,centos02:9092,centos03:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Storm configuration

conf/storm.yaml

storm.zookeeper.servers:
    - "centos01"
    - "centos02"
    - "centos03"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
storm.zookeeper.port: 2181
storm.local.dir: "/opt/modules/storm-2.1.0/data"
nimbus.seeds: ["centos01","centos02"]
ui.port: 8099

The Topology class

package storm.demo.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
/**
 * @author zhz
 * @date 2021/5/11 15:43
 * Note: the Topology class; builds the topology and submits it to Storm.
 */
public class KafkaTopology {
    public static void main(String[] args) throws Exception {

        //1. Create the KafkaSpout
        KafkaSpoutConfig.Builder<String,String> kafkaBuilder = KafkaSpoutConfig.builder("centos01:9092,centos02:9092,centos03:9092","topictest");
        //Set the Kafka consumer group id
        kafkaBuilder.setGroupId("testgroup");
        //Build the KafkaSpoutConfig
        KafkaSpoutConfig<String,String> kafkaSpoutConfig = kafkaBuilder.build();
        //Create the KafkaSpout from the config
        KafkaSpout<String,String> kafkaSpout = new KafkaSpout<String, String>(kafkaSpoutConfig);

        SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
        WordCountBolt wordCountBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        //2. Build the topology
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //Set the spout, named "kafka-spout", with parallelism 2 (executor threads) and 4 tasks (instances).
        //The defaults are 1 thread and 1 task; if the task count is not set, each thread runs exactly one task.
        topologyBuilder.setSpout("kafka-spout", kafkaSpout,2).setNumTasks(4);
        //Set the bolt named "print-bolt"; its input is the spout named "kafka-spout".
        //shuffleGrouping: each tuple is sent to a randomly chosen task, spreading load evenly.
        topologyBuilder.setBolt("print-bolt", splitSentenceBolt,2).setNumTasks(4).shuffleGrouping("kafka-spout");
        //fieldsGrouping: tuples are hashed on the given fields, so tuples with the same "word" always reach the same task.
        topologyBuilder.setBolt("count-bolt", wordCountBolt,2).setNumTasks(4).fieldsGrouping("print-bolt", new Fields("word"));
        //globalGrouping: every tuple is sent to the single task with the lowest id, no matter how many tasks exist.
        topologyBuilder.setBolt("report-bolt", reportBolt,2).setNumTasks(4).globalGrouping("count-bolt");

        Config config = new Config();

        //Local mode (for IDE testing); the first argument is the topology name.
        //LocalCluster cluster = new LocalCluster();
        //cluster.submitTopology("word-count-topology", config, topologyBuilder.createTopology());
        //Utils.sleep(5000);
        //cluster.killTopology("word-count-topology");
        //cluster.shutdown();

        //Cluster mode: package the jar, upload it to the cluster, then submit with:
        //  storm jar 09_storm_demo.jar storm.demo.wordcount.KafkaTopology
        config.setNumWorkers(2);  //Number of worker processes
        config.setNumAckers(0);   //Number of acker executors; 0 disables acking. Ackers default to one executor per worker, so this 2-worker topology would otherwise run 2 acker tasks.
        StormSubmitter.submitTopology("kafka-topology",config,topologyBuilder.createTopology());
    }
}
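
Before submitting to the cluster, the topology can be exercised in local mode. Below is a minimal local-mode runner sketch, assuming Storm 2.x (where LocalCluster implements AutoCloseable); the LocalRunner class name and the one-minute run time are my own additions, not part of the original project:

package storm.demo.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

public class LocalRunner {
    public static void main(String[] args) throws Exception {
        //Same Kafka spout as the cluster topology (assumption: defaults suffice for a local test)
        KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("centos01:9092,centos02:9092,centos03:9092", "topictest").build();

        //Wire the same components, but with the default parallelism of 1
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<String, String>(spoutConfig));
        builder.setBolt("print-bolt", new SplitSentenceBolt()).shuffleGrouping("kafka-spout");
        builder.setBolt("count-bolt", new WordCountBolt()).fieldsGrouping("print-bolt", new Fields("word"));
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");

        //LocalCluster is AutoCloseable in Storm 2.x; close() shuts the embedded cluster down
        try (LocalCluster cluster = new LocalCluster()) {
            cluster.submitTopology("kafka-topology-local", new Config(), builder.createTopology());
            Utils.sleep(60_000); //let the topology consume and print for a minute
        }
    }
}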

The SplitSentenceBolt class

package storm.demo.wordcount;
import java.util.Map;  
  
import org.apache.storm.task.OutputCollector;  
import org.apache.storm.task.TopologyContext;  
import org.apache.storm.topology.OutputFieldsDeclarer;  
import org.apache.storm.topology.base.BaseRichBolt;  
import org.apache.storm.tuple.Fields;  
import org.apache.storm.tuple.Tuple;  
import org.apache.storm.tuple.Values;  
  
public class SplitSentenceBolt extends BaseRichBolt {
	private static final long serialVersionUID = 1L;
	private OutputCollector outputcollector;

	/**
	 * Bolt initialization, analogous to a spout's open() method
	 */
	public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
		this.outputcollector = outputcollector;
	}

	/**
	 * Receive and process a tuple
	 */
	public void execute(Tuple tuple) {
		//KafkaSpout's default record translator emits the fields
		//"topic", "partition", "offset", "key", "value"; the log line is in "value".
		String sentence = tuple.getStringByField("value");
		//Split the line into fields on tabs (the Sogou log is tab-separated)
		String[] words = sentence.split("\t");
		//Emit the search keyword (the third field)
		this.outputcollector.emit(new Values(words[2]));
	}

	/**
	 * Declare the output field
	 */
	public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
		outputfieldsdeclarer.declare(new Fields("word"));
	}

}
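
For reference, here is a tiny standalone sketch of the record layout that words[2] relies on. The sample line below is made up, but it follows the assumed tab-separated Sogou structure (time, user id, [keyword], rank and click order, URL):

package storm.demo.wordcount;

public class SplitDemo {
    public static void main(String[] args) {
        //Hypothetical sample line in the assumed Sogou format:
        //time \t user-id \t [keyword] \t rank click-order \t URL
        String line = "00:00:01\t9975666857142764\t[大数据]\t1 1\thttp://news.example.com/item1";
        String[] fields = line.split("\t");
        System.out.println(fields[2]); //prints: [大数据]
    }
}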

The WordCountBolt class

package storm.demo.wordcount;
import java.util.HashMap;  
import java.util.Map;  

import org.apache.storm.task.OutputCollector;  
import org.apache.storm.task.TopologyContext;  
import org.apache.storm.topology.OutputFieldsDeclarer;  
import org.apache.storm.topology.base.BaseRichBolt;  
import org.apache.storm.tuple.Fields;  
import org.apache.storm.tuple.Tuple;  
import org.apache.storm.tuple.Values;  
  
/**
 * Counts words and emits the running (word, count) pair for each incoming word
 */
public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 2374950653902413273L;
    private OutputCollector outputcollector;
    //Map holding each word and its running count
    private HashMap<String, Integer> counts = null;

    /**
     * Bolt initialization, analogous to a spout's open() method
     */
    public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
        this.outputcollector = outputcollector;
        this.counts = new HashMap<String, Integer>();
    }

    /**
     * Receive a tuple and update the word count
     */
    public void execute(Tuple tuple) {
        //Get the incoming word
        String word = tuple.getStringByField("word");
        //Uncomment to verify that equal words are always handled by the same task instance
        //System.out.println(this + "====" + word);
        //Increment the word's count
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        //Send the word and its count to the next bolt as fields "word" and "count"
        this.outputcollector.emit(new Values(word, count));
    }

    /**
     * Declare the field names matching emit(new Values(word, count))
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }
}
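
Each WordCountBolt task keeps its own private HashMap, so correctness depends on the fieldsGrouping("print-bolt", new Fields("word")) set in the topology: every occurrence of a given word must land on the same task. Conceptually the routing works like the hypothetical sketch below (Storm's real implementation hashes the grouped fields internally; this only illustrates the idea):

package storm.demo.wordcount;

public class FieldsGroupingDemo {
    //Hypothetical illustration only: equal words always map to the same task index.
    static int route(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        System.out.println(route("storm", 4)); //same word -> same task index, every time
        System.out.println(route("storm", 4));
    }
}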

The ReportBolt class

package storm.demo.wordcount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;  
import java.util.List;
import java.util.Map;  
import java.util.Map.Entry;

import org.apache.storm.task.OutputCollector;  
import org.apache.storm.task.TopologyContext;  
import org.apache.storm.topology.OutputFieldsDeclarer;  
import org.apache.storm.topology.base.BaseRichBolt;  
import org.apache.storm.tuple.Tuple;  
  
public class ReportBolt extends BaseRichBolt {

	private static final long serialVersionUID = -1512537746316594950L;
	private HashMap<String, Integer> counts = null;

	public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
		this.counts = new HashMap<String, Integer>();
	}

	public void execute(Tuple tuple) {
		String word = tuple.getStringByField("word");
		int count = tuple.getIntegerByField("count");
		counts.put(word, count);
		//Sort the words in counts by descending count
		List<Entry<String, Integer>> list = new ArrayList<Entry<String, Integer>>(counts.entrySet());
		Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
			public int compare(Map.Entry<String, Integer> o1,
					Map.Entry<String, Integer> o2) {
				return o2.getValue() - o1.getValue();
			}
		});

		//Take the top 20 words (the task asks for the 20 hottest keywords)
		int n = list.size() <= 20 ? list.size() : 20;
		String resultStr = "";
		for (int i = 0; i < n; i++) {
			Entry<String, Integer> entry = list.get(i);
			String sortWord = entry.getKey();
			Integer sortCount = entry.getValue();
			resultStr += sortWord + "----" + sortCount + "\n";
		}
		System.out.println("------------ Current ranking ----------------");
		//Uncomment to verify this bolt runs as a single instance (globalGrouping)
		//System.out.println(this + "====" + word);
		System.out.println(resultStr);
	}

	public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {

	}

	/**
	 * Called when the bolt is shut down; it should release any open resources.
	 * The cluster does not guarantee this method runs: if the machine executing
	 * the task goes down, there is no way to invoke it.
	 */
	public void cleanup() {
		System.out.println("---------- FINAL COUNTS -----------");
		for (String key : counts.keySet()) {
			System.out.println(key + " " + counts.get(key));
		}
		System.out.println("----------------------------");
	}
}
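
Note that ReportBolt re-sorts the entire map on every tuple, which is O(V log V) per update. For large vocabularies, a bounded min-heap that keeps only the current top N is cheaper. A standalone sketch of that alternative technique (class name and sample data are mine, not part of the original code):

package storm.demo.wordcount;

import java.util.AbstractMap.SimpleEntry;
import java.util.Comparator;
import java.util.Map.Entry;
import java.util.PriorityQueue;

public class TopNDemo {
    public static void main(String[] args) {
        int n = 20;
        //Min-heap ordered by count; once it exceeds n entries, evict the smallest
        PriorityQueue<Entry<String, Integer>> heap =
                new PriorityQueue<Entry<String, Integer>>(Comparator.comparingInt(Entry::getValue));
        String[][] samples = { { "storm", "7" }, { "kafka", "9" }, { "flume", "3" } };
        for (String[] s : samples) {
            heap.offer(new SimpleEntry<String, Integer>(s[0], Integer.parseInt(s[1])));
            if (heap.size() > n) {
                heap.poll(); //drop the entry with the smallest count
            }
        }
        //The heap now holds the current top-n entries; poll() yields ascending counts
        while (!heap.isEmpty()) {
            System.out.println(heap.poll());
        }
    }
}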

Starting the services

Start Flume:

flume-ng agent --conf conf --conf-file $FLUME_HOME/conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

Start the ZooKeeper cluster; run on all three nodes:

cd /opt/modules/zookeeper-3.5.6-bin/
bin/zkServer.sh start

Start the Kafka cluster; run on all three nodes:

cd /opt/modules/kafka_2.12-2.4.0
bin/kafka-server-start.sh -daemon config/server.properties

Create the topic:

cd /opt/modules/kafka_2.12-2.4.0
bin/kafka-topics.sh --create --zookeeper centos01:2181,centos02:2181,centos03:2181 --replication-factor 2 --partitions 2 --topic topictest

Start a console consumer (to verify that data arrives in the topic):

bin/kafka-console-consumer.sh --bootstrap-server centos01:9092,centos02:9092,centos03:9092 --topic topictest

Start Storm. On centos01, start the Nimbus and UI services:

storm nimbus >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &

On centos02 and centos03, go to the $STORM_HOME/bin directory and start the Supervisor service:

./storm supervisor >/dev/null 2>&1 &


Submit the topology:

storm jar 09_storm_demo.jar storm.demo.wordcount.KafkaTopology

Send the data stream:

java -jar Client.jar sogou.500w.utf8 
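
The source of Client.jar is not shown in this post. A plausible minimal sketch of such a replay client is below, assuming it simply streams each line of the Sogou file as a UDP datagram to the Flume syslogudp source on localhost:5640 (and that the source accepts plain lines as event bodies); the class and its behavior are hypothetical:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class Client {
    public static void main(String[] args) throws Exception {
        String file = args[0]; //e.g. sogou.500w.utf8
        InetAddress target = InetAddress.getByName("localhost");
        try (DatagramSocket socket = new DatagramSocket();
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                byte[] data = line.getBytes(StandardCharsets.UTF_8);
                //One log line per datagram, aimed at the Flume syslogudp source (port 5640)
                socket.send(new DatagramPacket(data, data.length, target, 5640));
                Thread.sleep(1); //throttle slightly to simulate a live stream
            }
        }
    }
}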


Result

(Screenshot of the top-20 keyword ranking output.)
