A Comprehensive Storm Application
Posted by 皓洲
Lab Requirements
Suppose a search application needs to count, in real time, the most frequently searched keywords and output the 20 hottest keywords together with their search counts, ordered by the live counts. User search logs are collected by Flume and written to Kafka; Storm reads the data from Kafka in real time, performs the counting, and outputs the results.
Hints:
(1) The search log can use Sogou search data; each line represents one user search.
(2) The Flume agent's source can be configured as the syslogudp type (port 5640) to monitor the search log.
(3) The output format is up to you.
Approach
Use Flume to read the file in batches to simulate a data stream, acting as the producer that sends the data to the Kafka cluster.
Storm acts as the consumer, continuously receiving data from the Kafka cluster, performing the real-time statistics, and producing the output.
Flume Agent Configuration
# Give each of the agent's three components an alias; a1 is the alias of the agent itself
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Source properties
a1.sources.r1.type = syslogudp
a1.sources.r1.host = localhost
a1.sources.r1.port = 5640
# Sink properties (write to Kafka)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topictest
a1.sinks.k1.kafka.bootstrap.servers = centos01:9092,centos02:9092,centos03:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Channel properties
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Storm Configuration
conf/storm.yaml
storm.zookeeper.servers:
- "centos01"
- "centos02"
- "centos03"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.zookeeper.port: 2181
storm.local.dir: "/opt/modules/storm-2.1.0/data"
nimbus.seeds: ["centos01","centos02"]
ui.port: 8099
Topology Class
package storm.demo.wordcount;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
/**
 * @author zhz
 * @date 2021/5/11 15:43
 * Note: Topology class, used to build a topology in Storm.
 */
public class kafkaTopology {
    public static void main(String[] args) throws Exception {
        // 1. Create the KafkaSpout
        KafkaSpoutConfig.Builder<String, String> kafkaBuilder = KafkaSpoutConfig.builder(
                "centos01:9092,centos02:9092,centos03:9092", "topictest");
        // Set the Kafka consumer group id
        kafkaBuilder.setGroupId("testgroup");
        // Build the KafkaSpoutConfig
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = kafkaBuilder.build();
        // Create the KafkaSpout from the KafkaSpoutConfig
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(kafkaSpoutConfig);
        SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
        WordCountBolt wordCountBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();
        // 2. Create the topology
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Set the spout, named "kafka-spout", with a parallelism of 2 (executor threads) and 4 tasks (instances).
        // The defaults are 1 thread and 1 task; if the task count is not set, each thread runs exactly one task,
        // so the number of tasks equals the number of threads.
        topologyBuilder.setSpout("kafka-spout", kafkaSpout, 2).setNumTasks(4);
        // Set a bolt named "print-bolt" that reads from the spout named "kafka-spout".
        // ShuffleGrouping: each tuple is sent to a randomly chosen task, so load is spread evenly across tasks.
        topologyBuilder.setBolt("print-bolt", splitSentenceBolt, 2).setNumTasks(4).shuffleGrouping("kafka-spout");
        // FieldsGrouping: tuples are partitioned by a consistent hash of the given fields,
        // so tuples with the same "word" value always go to the same task.
        topologyBuilder.setBolt("count-bolt", wordCountBolt, 2).setNumTasks(4).fieldsGrouping("print-bolt", new Fields("word"));
        // GlobalGrouping: every tuple is sent to the task with the lowest id in the target bolt,
        // so no matter how many tasks exist, only one task receives the stream.
        topologyBuilder.setBolt("report-bolt", reportBolt, 2).setNumTasks(4).globalGrouping("count-bolt");
        Config config = new Config();
        // Local mode (for testing only); the first argument is the topology name.
        // LocalCluster cluster = new LocalCluster();
        // cluster.submitTopology("word-count-topology", config, topologyBuilder.createTopology());
        // Utils.sleep(5000);
        // cluster.killTopology(TOPOLOGY_NAME);
        // cluster.shutdown();
        // Cluster mode: package the jar, upload it to the cluster, then submit it with a command such as:
        //   storm jar storm_demo-0.0.1-SNAPSHOT.jar com.zwy.storm.demo.wordcount.WordCountTopology
        config.setNumWorkers(2);  // number of worker processes
        // Number of ackers; 0 disables acking. By default each worker process starts one acker executor,
        // so with 2 workers there would otherwise be 2 acker executors / 2 acker tasks.
        config.setNumAckers(0);
        StormSubmitter.submitTopology("kafka-topology", config, topologyBuilder.createTopology());
    }
}
SplitSentenceBolt Class
package storm.demo.wordcount;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class SplitSentenceBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector outputcollector;
    /**
     * Bolt initialization method, similar to a spout's open() method.
     */
    public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
        this.outputcollector = outputcollector;
    }
    /**
     * Receive a tuple and process it.
     */
    public void execute(Tuple tuple) {
        // Get the Kafka message payload. The KafkaSpout's default translator emits the fields
        // "topic", "partition", "offset", "key", "value"; the log line is in the "value" field.
        String sentence = tuple.getStringByField("value");
        // Split the line into fields by tab (the Sogou log is tab-separated)
        String[] words = sentence.split("\t");
        // Emit the search keyword, which is the third tab-separated field
        this.outputcollector.emit(new Values(words[2]));
    }
    /**
     * Declare the output field.
     */
    public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
        outputfieldsdeclarer.declare(new Fields("word"));
    }
}
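For reference, the Sogou query log is tab-separated, and the search keyword is the third field (index 2), which is why the bolt emits words[2]. The exact column layout can vary between versions of the dataset, so the sample line below is only an illustrative assumption, not an excerpt from the real data:

// Illustrative only: a hypothetical line in the Sogou log format
// (time, user id, [keyword], result rank, click order, URL), tab-separated.
String sample = "00:00:00\t1234567890\t[keyword]\t1\t1\thttp://example.com/page";
String[] fields = sample.split("\t");
System.out.println(fields[2]);   // prints [keyword] -- this is what SplitSentenceBolt emits as "word"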
WordCountBolt Class
package storm.demo.wordcount;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Word-count bolt: maintains a running count for each word and emits the updated (word, count) pair in real time.
 */
public class WordCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 2374950653902413273L;
    private OutputCollector outputcollector;
    // Map holding each word and its current count
    private HashMap<String, Integer> counts = null;
    /**
     * Bolt initialization method, similar to a spout's open() method.
     */
    public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
        this.outputcollector = outputcollector;
        this.counts = new HashMap<String, Integer>();
    }
    /**
     * Receive a tuple and update the word count.
     */
    public void execute(Tuple tuple) {
        // Get the word sent by the upstream bolt
        String word = tuple.getStringByField("word");
        // Uncomment to verify that equal words are always processed by the same task instance
        // (they are, because of the fieldsGrouping on "word"):
        // System.out.println(this + "====" + word);
        // Increment the count for this word
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        // Emit the word and its count to the next bolt, as the fields "word" and "count"
        this.outputcollector.emit(new Values(word, count));
    }
    /**
     * Declare the field names, matching emit(new Values(word, count)).
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
ReportBolt Class
package storm.demo.wordcount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
public class ReportBolt extends BaseRichBolt {
    private static final long serialVersionUID = -1512537746316594950L;
    private HashMap<String, Integer> counts = null;
    public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
        this.counts = new HashMap<String, Integer>();
    }
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");
        counts.put(word, count);
        // Sort the entries in counts by count, descending
        List<Entry<String, Integer>> list = new ArrayList<Entry<String, Integer>>(counts.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
            public int compare(Map.Entry<String, Integer> o1,
                               Map.Entry<String, Integer> o2) {
                return (o2.getValue() - o1.getValue());
            }
        });
        // Take the top 20 entries (the task asks for the 20 hottest keywords)
        int n = list.size() <= 20 ? list.size() : 20;
        String resultStr = "";
        for (int i = 0; i < n; i++) {
            Entry<String, Integer> entry = list.get(i);
            String sortWord = entry.getKey();
            Integer sortCount = entry.getValue();
            resultStr += sortWord + "----" + sortCount + "\n";
        }
        System.out.println("------------ Count results ----------------");
        // Uncomment to check whether the same task instance processes every tuple:
        // System.out.println(this + "====" + word);
        System.out.println(resultStr);
    }
    public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
    }
    /**
     * Called when the bolt is shut down; it should release any open resources.
     * The cluster does not guarantee this method will run -- for example, if the machine
     * running the task goes down, there is no way to call it.
     */
    public void cleanup() {
        System.out.println("---------- FINAL COUNTS -----------");
        for (String key : counts.keySet()) {
            System.out.println(key + " " + counts.get(key));
        }
        System.out.println("----------------------------");
    }
}
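Note that ReportBolt re-sorts and prints the full ranking for every incoming tuple, which becomes expensive at high throughput. A common Storm technique (not used in the original code) is to drive the output with tick tuples so the report is printed only every few seconds. The class below is a minimal sketch of that idea; the class name PeriodicReportBolt and the 5-second interval are made up for illustration.

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

// Hypothetical variant of ReportBolt that prints only when a tick tuple arrives (every 5 seconds).
public class PeriodicReportBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private HashMap<String, Integer> counts;

    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Integer>();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to deliver a tick tuple to this bolt every 5 seconds.
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
        return conf;
    }

    public void execute(Tuple tuple) {
        if (TupleUtils.isTick(tuple)) {
            // Print only on the periodic tick; sorting into a top-20 list would reuse
            // the same logic as ReportBolt.execute() above.
            System.out.println("------------ Count results ----------------");
            System.out.println(counts);
            return;
        }
        // Regular tuple: just record the latest count for the word.
        counts.put(tuple.getStringByField("word"), tuple.getIntegerByField("count"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}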
Starting the Services
Start Flume:
flume-ng agent --conf conf --conf-file $FLUME_HOME/conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
Start the ZooKeeper cluster; run on all three nodes:
cd /opt/modules/zookeeper-3.5.6-bin/
bin/zkServer.sh start
Start the Kafka cluster; run on all three nodes:
cd /opt/modules/kafka_2.12-2.4.0/
bin/kafka-server-start.sh -daemon config/server.properties
Create the topic:
cd /opt/modules/kafka_2.12-2.4.0/
bin/kafka-topics.sh --create --zookeeper centos01:2181,centos02:2181,centos03:2181 --replication-factor 2 --partitions 2 --topic topictest
Start a console consumer:
bin/kafka-console-consumer.sh --bootstrap-server centos01:9092,centos02:9092,centos03:9092 --topic topictest
Start Storm. On centos01, start the Nimbus and UI services:
storm nimbus >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &
On the centos02 and centos03 nodes, go to the $STORM_HOME/bin directory and start the Supervisor service:
./storm supervisor >/dev/null 2>&1 &
Submit the Topology
storm jar 09_storm_demo.jar storm.demo.wordcount.kafkaTopology
Replay the search log as the data stream
java -jar Client.jar sogou.500w.utf8
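The source of Client.jar is not included in the post. Below is a minimal sketch of such a replay client, assuming it simply reads the Sogou log file line by line and sends each line as a UDP datagram to the Flume syslogudp source on localhost port 5640; the class name, target host, and pacing are assumptions, not the author's code. Flume's syslog UDP source expects syslog-formatted datagrams, so plain log lines may be flagged as invalid syslog, but they are still turned into events whose body is the raw line.

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical replay client: sends each log line to the Flume syslogudp source.
public class Client {
    public static void main(String[] args) throws Exception {
        String file = args[0];                                   // e.g. sogou.500w.utf8
        InetAddress host = InetAddress.getByName("localhost");   // assumed Flume host
        int port = 5640;                                         // matches a1.sources.r1.port
        try (DatagramSocket socket = new DatagramSocket();
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                byte[] data = line.getBytes(StandardCharsets.UTF_8);
                socket.send(new DatagramPacket(data, data.length, host, port));
                Thread.sleep(1);   // throttle slightly to simulate a continuous stream
            }
        }
    }
}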
Results
The worker running report-bolt prints the current top keywords and their search counts to its console/log.