storm 整合 kafka之保存MySQL数据库

Posted 初见微凉i

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm 整合 kafka之保存MySQL数据库相关的知识,希望对你有一定的参考价值。

整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理。实际上在 apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置。

1、配置Maven依赖包
[html] view plain copy
 
  1. <dependency>  
  2.      <groupId>org.apache.kafka</groupId>  
  3.      <artifactId>kafka_2.10</artifactId>  
  4.      <version>0.8.2.0</version>  
  5.      <exclusions>  
  6.                <exclusion>  
  7.                     <groupId>org.slf4j</groupId>  
  8.                     <artifactId>slf4j-log4j12</artifactId>  
  9.                </exclusion>  
  10.      </exclusions>  
  11. </dependency>  
  12.   
  13. <!-- kafka整合storm -->  
  14. <dependency>  
  15.      <groupId>org.apache.storm</groupId>  
  16.      <artifactId>storm-core</artifactId>  
  17.      <version>0.9.3</version>  
  18.      <scope>provided</scope>  
  19.      <exclusions>  
  20.           <exclusion>  
  21.                <groupId>org.slf4j</groupId>  
  22.                <artifactId>log4j-over-slf4j</artifactId>  
  23.           </exclusion>  
  24.           <exclusion>  
  25.                <groupId>org.slf4j</groupId>  
  26.                <artifactId>slf4j-api</artifactId>  
  27.           </exclusion>  
  28.      </exclusions>  
  29. </dependency>  
  30.   
  31. <dependency>  
  32.      <groupId>org.apache.storm</groupId>  
  33.      <artifactId>storm-kafka</artifactId>  
  34.      <version>0.9.3</version>  
  35. </dependency>  
storm程序能接收到数据,并进行处理,但是会发现数据被重复处理这是因为在bolt中没有对数据进行确认,需要调用ack或者fail方法, 修改完成之后即可。
2、编写Storm程序
[java] view plain copy
 
  1. package com.yun.storm;  
  2. import java.util.UUID;  
  3.   
  4. import storm.kafka.KafkaSpout;  
  5. import storm.kafka.SpoutConfig;  
  6. import storm.kafka.ZkHosts;  
  7. import backtype.storm.Config;  
  8. import backtype.storm.LocalCluster;  
  9. import backtype.storm.topology.TopologyBuilder;  
  10.   
  11. /** 
  12. * Storm读取Kafka消息中间件数据 
  13. * @author shenfl 
  14. */  
  15. public class KafkaLogProcess {  
  16.   
  17.   
  18.      private static final String BOLT_ID = LogFilterBolt.class.getName();  
  19.      private static final String SPOUT_ID = KafkaSpout.class.getName();  
  20.   
  21.      public static void main(String[] args) {  
  22.            
  23.           TopologyBuilder builder = new TopologyBuilder();  
  24.           //表示kafka使用的zookeeper的地址  
  25.           String brokerZkStr = "192.168.2.20:2181";  
  26.           ZkHosts zkHosts = new ZkHosts(brokerZkStr);  
  27.           //表示的是kafak中存储数据的主题名称  
  28.           String topic = "mytopic";  
  29.           //指定zookeeper中的一个根目录,里面存储kafkaspout读取数据的位置等信息  
  30.           String zkRoot = "/kafkaspout";  
  31.           String id = UUID.randomUUID().toString();  
  32.           SpoutConfig spoutconf  = new SpoutConfig(zkHosts, topic, zkRoot, id);  
  33.           builder.setSpout(SPOUT_ID , new KafkaSpout(spoutconf));  
  34.           builder.setBolt(BOLT_ID,new  LogFilterBolt()).shuffleGrouping(SPOUT_ID);  
  35.            
  36.           LocalCluster localCluster = new LocalCluster();  
  37.           localCluster.submitTopology(KafkaLogProcess.class.getSimpleName(), new Config(),builder.createTopology() );  
  38.      }  
  39. }  
[java] view plain copy
 
  1. package com.yun.storm;  
  2.   
  3. import java.util.Map;  
  4. import java.util.regex.Matcher;  
  5. import java.util.regex.Pattern;  
  6.   
  7. import backtype.storm.task.OutputCollector;  
  8. import backtype.storm.task.TopologyContext;  
  9. import backtype.storm.topology.OutputFieldsDeclarer;  
  10. import backtype.storm.topology.base.BaseRichBolt;  
  11. import backtype.storm.tuple.Tuple;  
  12.   
  13. /** 
  14. * 处理来自KafkaSpout的tuple,并保存到数据库中 
  15. * @author shenfl 
  16. */  
  17. public class LogFilterBolt extends BaseRichBolt {  
  18.   
  19.      private OutputCollector collector;  
  20.      /** 
  21.      * 
  22.      */  
  23.      private static final long serialVersionUID = 1L;  
  24.   
  25.      Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");  
  26.   
  27.      /** 
  28.      * 每个LogFilterBolt实例仅初始化一次 
  29.      */  
  30.      @Override  
  31.      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {  
  32.           this.collector = collector;  
  33.      }  
  34.   
  35.      @Override  
  36.      public void execute(Tuple input) {  
  37.           try {  
  38.                // 接收KafkaSpout的数据  
  39.                byte[] bytes = input.getBinaryByField("bytes");  
  40.                String value = new String(bytes).replaceAll("[\n\r]", "");  
  41.                // 解析数据并入库  
  42.                Matcher m = p.matcher(value);  
  43.                if (m.find()) {  
  44.                     String url = m.group(1);  
  45.                     String usetime = m.group(2);  
  46.                     String currentTime = m.group(3);  
  47.                     System.out.println(url + "->" + usetime + "->" + currentTime);  
  48.                }  
  49.                this.collector.ack(input);  
  50.           } catch (Exception e) {  
  51.                this.collector.fail(input);  
  52.           }  
  53.      }  
  54.   
  55.      @Override  
  56.      public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  57.      }  
  58. }  

 

 
3、解析日志入库
3.1 引入Maven依赖包
[java] view plain copy
 
  1. <!-- mysql maven相关依赖 -->  
  2. <dependency>  
  3.      <groupId>commons-dbutils</groupId>  
  4.      <artifactId>commons-dbutils</artifactId>  
  5.      <version>1.6</version>  
  6. </dependency>  
  7. <dependency>  
  8.      <groupId>mysql</groupId>  
  9.      <artifactId>mysql-connector-java</artifactId>  
  10.      <version>5.1.29</version>  
  11. </dependency>  
 
3.2 编写MyDbUtils工具类
(1)创建数据表
[sql] view plain copy
 
  1. create database jfyun;  
  2.   
  3. CREATE TABLE `log_info` (  
  4.    `id` int(10) NOT NULL AUTO_INCREMENT,  
  5.    `topdomain` varchar(100) COLLATE latin1_german1_ci DEFAULT NULL,  
  6.    `usetime` varchar(10) COLLATE latin1_german1_ci DEFAULT NULL,  
  7.    `time` datetime DEFAULT NULL,  
  8.    PRIMARY KEY (`id`)  
  9. ) ENGINE=InnoDB AUTO_INCREMENT=1803 DEFAULT CHARSET=latin1 COLLATE=latin1_german1_ci  
(2)MyDbUtils的程序
[java] view plain copy
 
  1. package com.yun.storm.util;  
  2.   
  3. import java.sql.Connection;  
  4. import java.sql.DriverManager;  
  5. import java.sql.ResultSet;  
  6. import java.sql.SQLException;  
  7. import java.util.ArrayList;  
  8. import java.util.Date;  
  9. import java.util.List;  
  10.   
  11. import org.apache.commons.dbutils.BasicRowProcessor;  
  12. import org.apache.commons.dbutils.QueryRunner;  
  13. import org.apache.commons.dbutils.handlers.ArrayListHandler;  
  14.   
  15. public class MyDbUtils {  
  16.       
  17.      private static String className = "com.mysql.jdbc.Driver";  
  18.      private static String url = "jdbc:mysql://192.168.2.20:3306/jfyun?useUnicode=true&characterEncoding=utf-8";  
  19.      private static String user = "root";  
  20.      private static String password = "123";  
  21.      private static QueryRunner queryRunner = new QueryRunner();  
  22.   
  23.      public static final String INSERT_LOG = "insert into log_info(topdomain,usetime,time) values(?,?,?)";  
  24.   
  25.      static{  
  26.           try {  
  27.                Class.forName(className);  
  28.           } catch (ClassNotFoundException e) {  
  29.                e.printStackTrace();  
  30.           }  
  31.      }  
  32.      public static void main(String[] args) throws Exception {  
  33.           String topdomain = "taobao.com";  
  34.           String usetime = "100";  
  35.           String currentTime="1444218216106";  
  36.           MyDbUtils.update(MyDbUtils.INSERT_LOG, topdomain,usetime,currentTime);  
  37.           update(INSERT_LOG,topdomain,usetime,MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));  
  38.      }  
  39.      /** 
  40.      * @param conn 
  41.      * @throws SQLException 
  42.      */  
  43.      public static void update(String sql,Object... params) throws SQLException {  
  44.           Connection connection = getConnection();  
  45.           //更新数据  
  46.           queryRunner.update(connection,sql, params);  
  47.           connection.close();  
  48.      }  
  49.       
  50.      public static List<String> executeQuerySql(String sql) {  
  51.            
  52.           List<String> result = new ArrayList<String>();  
  53.           try {  
  54.                List<Object[]> requstList = queryRunner.query(getConnection(), sql,  
  55.                          new ArrayListHandler(new BasicRowProcessor() {  
  56.                               @Override  
  57.                               public <Object> List<Object> toBeanList(ResultSet rs,  
  58.                                         Class<Object> type) throws SQLException {  
  59.                                    return super.toBeanList(rs, type);  
  60.                               }  
  61.                          }));  
  62.                for (Object[] objects : requstList) {  
  63.                     result.add(objects[0].toString());  
  64.                }  
  65.           } catch (SQLException e) {  
  66.                e.printStackTrace();  
  67.           }  
  68.           return result;  
  69.      }  
  70.      /** 
  71.      * @throws SQLException 
  72.      * 
  73.      */  
  74.      public static Connection getConnection() throws SQLException {  
  75.           //获取mysql连接  
  76.           return DriverManager.getConnection(url, user, password);  
  77.      }  
  78. }  
(3)修改storm程序
[java] view plain copy
 
  1. if (m.find()) {  
  2.      url = m.group(1);  
  3.      usetime = m.group(2);  
  4.      currentTime = m.group(3);  
  5.      System.out.println(url + "->" + usetime + "->" + currentTime);  
  6.   
  7.      MyDbUtils.update(MyDbUtils.INSERT_LOG, url, usetime,  
  8.                MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));  
  9. }  
 
(4)统计指标
[sql] view plain copy
 
  1. --统计每个url平均响应时间  
  2. SELECT  
  3.   topdomain,  
  4.   ROUND(AVG(usetime) / 1000, 2) avg_use_time  
  5. FROM  
  6.   log_info  
  7. GROUP BY topdomain;  
技术分享图片
 
原文链接:http://blog.csdn.net/shenfuli/article/details/48982687

以上是关于storm 整合 kafka之保存MySQL数据库的主要内容,如果未能解决你的问题,请参考以下文章

Kafka+Storm+HDFS 整合示例

SpringBoot整合Kafka和Storm

SpringBoot整合Kafka和Storm

storm和kafka整合

Kafka+Storm+HDFS整合实践

Flume+Kafka+Storm+Redis 大数据在线实时分析