Trident中的解析包含的函数操作与投影操作
Posted juncaoit
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Trident中的解析包含的函数操作与投影操作相关的知识,希望对你有一定的参考价值。
一:函数操作
1.介绍
Tuple本身是不可变的
Function不会修改或删除已有字段,只是在原有tuple的基础上追加新的字段
2.说明
如果原来的字段是log,flag
经过Function之后的tuple可以访问这些字段:log,flag,orderId,orderTime,orderAmtStr,memberId
3.先写驱动类
增加了解析
然后再将解析的日志进行打印
1 package com.jun.trident; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.generated.AlreadyAliveException; 7 import backtype.storm.generated.InvalidTopologyException; 8 import backtype.storm.tuple.Fields; 9 import backtype.storm.tuple.Values; 10 import storm.trident.TridentTopology; 11 import storm.trident.operation.Function; 12 import storm.trident.operation.TridentCollector; 13 import storm.trident.operation.TridentOperationContext; 14 import storm.trident.testing.FixedBatchSpout; 15 import storm.trident.tuple.TridentTuple; 16 17 import java.util.Map; 18 19 public class TridentDemo { 20 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 21 TridentTopology tridentTopology=new TridentTopology(); 22 //模拟数据 23 Fields field=new Fields("log","flag"); 24 FixedBatchSpout spout=new FixedBatchSpout(field,5, 25 new Values("168.214.187.214 - - [1481953616092] "GET /view.php HTTP/1.1" 200 0 "http://cn.bing.com/search?q=spark mllib" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","A"), 26 new Values("168.187.202.202 - - [1481953537038] "GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)" "-"","A"), 27 new Values("61.30.167.187 - - [1481953539039] "GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (Khtml, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","A"), 28 new Values("30.29.132.190 - - [1481953544042] "GET 
/IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53" "-"","B"), 29 new Values("222.190.187.201 - - [1481953578068] "GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","B"), 30 new Values("72.202.43.53 - - [1481953579069] "GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","B") 31 ); 32 //多次循环 33 spout.setCycle(true); 34 //提交 35 Config config=new Config(); 36 tridentTopology.newStream("orderAnalyse",spout) 37 //过滤 38 .each(new Fields("log"),new ValidLogFilter()) 39 //解析 40 .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId")) 41 //不添加log了,日志太长,不方便看控制台的现象 42 .each(new Fields("flag","orderId","orderTime","orderAmtStr","memberId"),new PrintFilter()); 43 if(args==null || args.length<=0){ 44 LocalCluster localCluster=new LocalCluster(); 45 localCluster.submitTopology("tridentDemo",config,tridentTopology.build()); 46 }else { 47 config.setNumWorkers(2); 48 StormSubmitter.submitTopology(args[0],config,tridentTopology.build()); 49 } 50 } 51 }
4.解析方法类
1 package com.jun.trident; 2 3 import backtype.storm.tuple.Values; 4 import storm.trident.operation.Function; 5 import storm.trident.operation.TridentCollector; 6 import storm.trident.operation.TridentOperationContext; 7 import storm.trident.tuple.TridentTuple; 8 9 import java.util.HashMap; 10 import java.util.Map; 11 12 public class LogParserFunction implements Function { 13 @Override 14 public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) { 15 //function对传进来的tuple子集 16 String log=tridentTuple.getStringByField("log"); 17 //解析 18 int splitIndex=log.indexOf("IBEIfeng.gif")+13; 19 String orderInfo=log.substring(splitIndex).split(" ")[0]; 20 String[] kvs=orderInfo.split("\\&"); 21 Map<String,String> orderInfos=new HashMap<>(); 22 for(String kv:kvs){ 23 String[] keyValues=kv.split("="); 24 if(keyValues.length==2){ 25 orderInfos.put(keyValues[0],keyValues[1]); 26 } 27 } 28 String orderId=getValue(orderInfos,"order_id","");//注意这个在日志中是这个字段 29 String orderTime=getValue(orderInfos,"orderTime",""); 30 String orderAmtStr=getValue(orderInfos,"orderAmt",""); 31 String memberId=getValue(orderInfos,"memberId",""); 32 tridentCollector.emit(new Values(orderId,orderTime,orderAmtStr,memberId)); 33 } 34 35 @Override 36 public void prepare(Map map, TridentOperationContext tridentOperationContext) { 37 38 } 39 40 @Override 41 public void cleanup() { 42 43 } 44 45 public String getValue(Map<String,String> map,String key,String defaultValue){ 46 if(map.containsKey(key)){ 47 return map.get(key); 48 }else{ 49 return defaultValue; 50 } 51 } 52 }
5.效果
二:投影操作
1.说明
可以对tuple进行裁剪操作:project之后的tuple只保留指定的字段,其余字段被丢弃。
2.驱动类
先投影
然后打印
1 package com.jun.trident; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.generated.AlreadyAliveException; 7 import backtype.storm.generated.InvalidTopologyException; 8 import backtype.storm.tuple.Fields; 9 import backtype.storm.tuple.Values; 10 import storm.trident.TridentTopology; 11 import storm.trident.operation.Function; 12 import storm.trident.operation.TridentCollector; 13 import storm.trident.operation.TridentOperationContext; 14 import storm.trident.testing.FixedBatchSpout; 15 import storm.trident.tuple.TridentTuple; 16 17 import java.util.Map; 18 19 public class TridentDemo { 20 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 21 TridentTopology tridentTopology=new TridentTopology(); 22 //模拟数据 23 Fields field=new Fields("log","flag"); 24 FixedBatchSpout spout=new FixedBatchSpout(field,5, 25 new Values("168.214.187.214 - - [1481953616092] "GET /view.php HTTP/1.1" 200 0 "http://cn.bing.com/search?q=spark mllib" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","A"), 26 new Values("168.187.202.202 - - [1481953537038] "GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)" "-"","A"), 27 new Values("61.30.167.187 - - [1481953539039] "GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","A"), 28 new Values("30.29.132.190 - - [1481953544042] "GET 
/IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53" "-"","B"), 29 new Values("222.190.187.201 - - [1481953578068] "GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","B"), 30 new Values("72.202.43.53 - - [1481953579069] "GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","B") 31 ); 32 //多次循环 33 spout.setCycle(true); 34 //提交 35 Config config=new Config(); 36 tridentTopology.newStream("orderAnalyse",spout) 37 //过滤 38 .each(new Fields("log"),new ValidLogFilter()) 39 //解析 40 .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId")) 41 //投影 42 .project(new Fields("orderId","orderTime","orderAmtStr","memberId")) 43 .each(new Fields("orderId","orderTime","orderAmtStr","memberId"),new PrintFilter()) 44 45 ; 46 if(args==null || args.length<=0){ 47 LocalCluster localCluster=new LocalCluster(); 48 localCluster.submitTopology("tridentDemo",config,tridentTopology.build()); 49 }else { 50 config.setNumWorkers(2); 51 StormSubmitter.submitTopology(args[0],config,tridentTopology.build()); 52 } 53 } 54 }
3.效果
三:解析
1.说明
这个部分在解析之后,进入分流阶段。涉及到全局分流与局部分流,现在先使用全局分流,进行打印验证。
2.聚合操作
部分删除,部分增加
3.驱动类
1 package com.jun.trident; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.StormSubmitter; 6 import backtype.storm.generated.AlreadyAliveException; 7 import backtype.storm.generated.InvalidTopologyException; 8 import backtype.storm.tuple.Fields; 9 import backtype.storm.tuple.Values; 10 import storm.trident.Stream; 11 import storm.trident.TridentState; 12 import storm.trident.TridentTopology; 13 import storm.trident.operation.Function; 14 import storm.trident.operation.TridentCollector; 15 import storm.trident.operation.TridentOperationContext; 16 import storm.trident.operation.builtin.Count; 17 import storm.trident.testing.FixedBatchSpout; 18 import storm.trident.testing.MemoryMapState; 19 import storm.trident.tuple.TridentTuple; 20 21 import java.util.Map; 22 23 public class TridentDemo { 24 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 25 TridentTopology tridentTopology=new TridentTopology(); 26 //模拟数据 27 Fields field=new Fields("log","flag"); 28 FixedBatchSpout spout=new FixedBatchSpout(field,5, 29 new Values("168.214.187.214 - - [1481953616092] "GET /view.php HTTP/1.1" 200 0 "http://cn.bing.com/search?q=spark mllib" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","A"), 30 new Values("168.187.202.202 - - [1481953537038] "GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)" "-"","A"), 31 new Values("61.30.167.187 - - [1481953539039] "GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) 
Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","A"), 32 new Values("30.29.132.190 - - [1481953544042] "GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53" "-"","B"), 33 new Values("222.190.187.201 - - [1481953578068] "GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","B"), 34 new Values("72.202.43.53 - - [1481953579069] "GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","B") 35 ); 36 //多次循环 37 spout.setCycle(true); 38 //流处理 39 Stream stream=tridentTopology.newStream("orderAnalyse",spout) 40 //过滤 41 .each(new Fields("log"),new ValidLogFilter()) 42 //解析 43 .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId")) 44 //投影 45 .project(new Fields("orderId","orderTime","orderAmtStr","memberId")) 46 //时间解析 47 .each(new Fields("orderTime"),new DateTransFormerFunction(),new Fields("day","hour","minter")) 48 ; 49 //分流 50 //1.基于minter统计订单数量,分组统计 51 TridentState state=stream.groupBy(new Fields("minter")) 52 //全局聚合,使用内存存储状态信息 53 .persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("orderNumByMinter")); 54 state.newValuesStream().each(new Fields("minter","orderNumByMinter"),new PrintFilter()); 55 56 //提交 57 Config config=new Config(); 58 if(args==null || args.length<=0){ 59 LocalCluster 
localCluster=new LocalCluster(); 60 localCluster.submitTopology("tridentDemo",config,tridentTopology.build()); 61 }else { 62 config.setNumWorkers(2); 63 StormSubmitter.submitTopology(args[0],config,tridentTopology.build()); 64 } 65 } 66 }
4.时间解析方法
1 package com.jun.trident; 2 3 4 import backtype.storm.tuple.Values; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 8 import storm.trident.operation.Function; 9 import storm.trident.operation.TridentCollector; 10 import storm.trident.operation.TridentOperationContext; 11 import storm.trident.tuple.TridentTuple; 12 13 import java.text.DateFormat; 14 import java.text.SimpleDateFormat; 15 import java.util.Date; 16 import java.util.Map; 17 18 public class DateTransFormerFunction implements Function { 19 private static final Logger logger = LoggerFactory.getLogger(DateTransFormerFunction.class); 20 @Override 21 public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) { 22 String orderTime=tridentTuple.getStringByField("orderTime"); 23 // 处理时间 24 try { 25 long timestamp = Long.parseLong(orderTime); 26 Date date = new Date(); 27 date.setTime(timestamp); 28 DateFormat df = new SimpleDateFormat("yyyyMMddHHmm"); 29 String dateStr = df.format(date); 30 String day = dateStr.substring(0,8); 31 String hour = dateStr.substring(0,10); 32 String minute = dateStr ; 33 //发射 34 tridentCollector.emit(new Values(day,hour,minute)); 35 }catch (Exception e){ 36 logger.error("日期解析出错"+orderTime); 37 } 38 } 39 40 @Override 41 public void prepare(Map map, TridentOperationContext tridentOperationContext) { 42 43 } 44 45 @Override 46 public void cleanup() { 47 48 } 49 }
5.效果
以上是关于Trident中的解析包含的函数操作与投影操作的主要内容,如果未能解决你的问题,请参考以下文章