Integrating Storm with Kafka and Saving to a MySQL Database
Posted by 初见微凉i
In a Kafka + Storm integration, messages reach the Kafka message broker through various channels, for example log data collected by Flume. Kafka buffers them in topics until the real-time computation engine, Storm, picks them up for analysis: a Spout reads the messages out of Kafka and hands them to specific Bolt components for processing. As of apache-storm-0.9.3, Storm ships with an external Kafka integration module, storm-kafka, that can be used directly. The Maven dependency configuration I use is shown below.
1. Configure the Maven dependencies
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Kafka integration for Storm -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.3</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.3</version>
</dependency>
```
2. Write the Storm program
```java
package com.yun.storm;

import java.util.UUID;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * Reads data from the Kafka message broker with Storm.
 *
 * @author shenfl
 */
public class KafkaLogProcess {

    private static final String BOLT_ID = LogFilterBolt.class.getName();
    private static final String SPOUT_ID = KafkaSpout.class.getName();

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // Address of the ZooKeeper ensemble used by Kafka
        String brokerZkStr = "192.168.2.20:2181";
        ZkHosts zkHosts = new ZkHosts(brokerZkStr);
        // Name of the Kafka topic the data is stored in
        String topic = "mytopic";
        // Root path in ZooKeeper under which the KafkaSpout keeps its read offsets
        String zkRoot = "/kafkaspout";
        String id = UUID.randomUUID().toString();
        SpoutConfig spoutconf = new SpoutConfig(zkHosts, topic, zkRoot, id);

        builder.setSpout(SPOUT_ID, new KafkaSpout(spoutconf));
        builder.setBolt(BOLT_ID, new LogFilterBolt()).shuffleGrouping(SPOUT_ID);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(KafkaLogProcess.class.getSimpleName(), new Config(), builder.createTopology());
    }
}
```
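LocalCluster runs the topology inside the current JVM, which suits development. On a real Storm cluster, the last two lines of main would instead go through StormSubmitter. A sketch of that variant, where the topology name and worker count are arbitrary choices rather than values from the original article:

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;

public class ClusterSubmit {
    // Sketch: submit the topology built above to a real cluster instead of a LocalCluster.
    public static void submit(StormTopology topology) throws Exception {
        Config config = new Config();
        config.setNumWorkers(2); // arbitrary worker count
        StormSubmitter.submitTopology("kafka-log-process", config, topology);
    }
}
```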
```java
package com.yun.storm;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * Processes tuples coming from the KafkaSpout and saves them to the database.
 *
 * @author shenfl
 */
public class LogFilterBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    // Extracts the URL, response time, and timestamp from the auth-interface log line
    Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");

    /**
     * Called only once per LogFilterBolt instance.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // The KafkaSpout emits the raw message under the "bytes" field
            byte[] bytes = input.getBinaryByField("bytes");
            String value = new String(bytes).replaceAll("[\n\r]", "");
            // Parse the line and extract the fields to persist
            Matcher m = p.matcher(value);
            if (m.find()) {
                String url = m.group(1);
                String usetime = m.group(2);
                String currentTime = m.group(3);
                System.out.println(url + "->" + usetime + "->" + currentTime);
            }
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // End of the topology; nothing is emitted downstream
    }
}
```
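The regular expression is the easiest part to get wrong, and checking it inside a running topology is slow. A quick standalone check saves the round-trip; the sample line below is made up to match the expected format, not taken from real logs:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PatternCheck {
    public static void main(String[] args) {
        // Same pattern as LogFilterBolt
        Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");
        // Sample line is an assumption, constructed to match the format the bolt expects
        String line = "省公司鉴权接口url[http://example.com/auth],响应时间[120],当前时间[1444218216106]";
        Matcher m = p.matcher(line);
        if (m.find()) {
            System.out.println(m.group(1) + " -> " + m.group(2) + "ms -> " + m.group(3));
        } else {
            System.out.println("no match: check the pattern against your log format");
        }
    }
}
```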
3. Parse the logs and write them to the database
3.1 Add the Maven dependencies
```xml
<!-- MySQL-related dependencies -->
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.29</version>
</dependency>
```
3.2 Write the MyDbUtils helper class
(1) Create the database and table
```sql
CREATE DATABASE jfyun;

CREATE TABLE `log_info` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `topdomain` varchar(100) COLLATE latin1_german1_ci DEFAULT NULL,
  `usetime` varchar(10) COLLATE latin1_german1_ci DEFAULT NULL,
  `time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_german1_ci;
```
(2) The MyDbUtils class
```java
package com.yun.storm.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;

public class MyDbUtils {

    private static String className = "com.mysql.jdbc.Driver";
    private static String url = "jdbc:mysql://192.168.2.20:3306/jfyun?useUnicode=true&characterEncoding=utf-8";
    private static String user = "root";
    private static String password = "123";
    private static QueryRunner queryRunner = new QueryRunner();

    public static final String INSERT_LOG = "insert into log_info(topdomain,usetime,time) values(?,?,?)";

    static {
        try {
            Class.forName(className);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        String topdomain = "taobao.com";
        String usetime = "100";
        String currentTime = "1444218216106";
        // Convert the epoch-millisecond timestamp to a datetime string before inserting
        // (MyDateUtils.formatDate2 is the author's date-formatting helper, not shown here)
        update(INSERT_LOG, topdomain, usetime, MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
    }

    /**
     * Executes an insert/update statement with the given parameters.
     *
     * @throws SQLException
     */
    public static void update(String sql, Object... params) throws SQLException {
        Connection connection = getConnection();
        queryRunner.update(connection, sql, params);
        connection.close();
    }

    /**
     * Executes a query and returns the first column of each row as a string.
     */
    public static List<String> executeQuerySql(String sql) {
        List<String> result = new ArrayList<String>();
        try {
            Connection connection = getConnection();
            try {
                List<Object[]> rows = queryRunner.query(connection, sql, new ArrayListHandler());
                for (Object[] row : rows) {
                    result.add(row[0].toString());
                }
            } finally {
                connection.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Obtains a new MySQL connection.
     *
     * @throws SQLException
     */
    public static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(url, user, password);
    }
}
```
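MyDbUtils opens and closes a fresh connection for every statement, which is fine for a demo but wasteful once tuples stream in continuously. commons-dbutils can also run off a DataSource, so pooling is a small change. A sketch of that variant, assuming you add a connection pool such as commons-dbcp 1.x (an extra dependency not listed in this article):

```java
import org.apache.commons.dbcp.BasicDataSource; // extra dependency: commons-dbcp
import org.apache.commons.dbutils.QueryRunner;

public class PooledDbUtils {

    private static final BasicDataSource ds = new BasicDataSource();
    static {
        ds.setDriverClassName("com.mysql.jdbc.Driver");
        ds.setUrl("jdbc:mysql://192.168.2.20:3306/jfyun?useUnicode=true&characterEncoding=utf-8");
        ds.setUsername("root");
        ds.setPassword("123");
        ds.setMaxActive(8); // modest pool size for a single bolt executor
    }

    // A QueryRunner built on a DataSource borrows and returns pooled connections itself
    private static final QueryRunner runner = new QueryRunner(ds);

    public static void update(String sql, Object... params) throws java.sql.SQLException {
        runner.update(sql, params);
    }
}
```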
(3) Update the Storm program
```java
if (m.find()) {
    url = m.group(1);
    usetime = m.group(2);
    currentTime = m.group(3);
    System.out.println(url + "->" + usetime + "->" + currentTime);
    // Persist the parsed fields, converting the epoch-millisecond timestamp to a datetime string
    MyDbUtils.update(MyDbUtils.INSERT_LOG, url, usetime,
            MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
}
```
(4) Reporting metrics
```sql
-- Average response time per URL (usetime is in milliseconds, so this yields seconds)
SELECT
  topdomain,
  ROUND(AVG(usetime) / 1000, 2) AS avg_use_time
FROM
  log_info
GROUP BY topdomain;
```
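To run the same report from Java, note that executeQuerySql above keeps only the first column of each row; concatenating the two values in SQL works around that. A small sketch (the separator string is an arbitrary choice):

```java
public class AvgUseTimeReport {
    public static void main(String[] args) {
        // executeQuerySql returns only column 1, so fold both values into one string
        java.util.List<String> report = MyDbUtils.executeQuerySql(
                "SELECT CONCAT(topdomain, ' -> ', ROUND(AVG(usetime) / 1000, 2))"
                + " FROM log_info GROUP BY topdomain");
        for (String row : report) {
            System.out.println(row);
        }
    }
}
```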
Original article: http://blog.csdn.net/shenfuli/article/details/48982687