[项目] 电信数据运营

Posted cxc1357

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[项目] 电信数据运营相关的知识,希望对你有一定的参考价值。

 数据生产

技术图片
  1 import java.io.*;
  2 import java.text.DecimalFormat;
  3 import java.text.ParseException;
  4 import java.text.SimpleDateFormat;
  5 import java.util.*;
  6 
  7 public class ProductLog {
  8     // 存放生产的电话号码
  9     private List<String> phoneList = new ArrayList<String>();
 10     private Map<String, String> phoneNameMap = new HashMap<>();
 11     String startTime = "2020-01-01";
 12     String endTime = "2020-12-31";
 13 
 14     public void initPhone() {
 15         //20个随机电话
 16         phoneList.add("17078388295");
 17         phoneList.add("13980337439");
 18         phoneList.add("14575535933");
 19         phoneList.add("19902496992");
 20         phoneList.add("18549641558");
 21         phoneList.add("17005930322");
 22         phoneList.add("18468618874");
 23         phoneList.add("18576581848");
 24         phoneList.add("15978226424");
 25         phoneList.add("15542823911");
 26         phoneList.add("17526304161");
 27         phoneList.add("15422018558");
 28         phoneList.add("17269452013");
 29         phoneList.add("17764278604");
 30         phoneList.add("15711910344");
 31         phoneList.add("15714728273");
 32         phoneList.add("16061028454");
 33         phoneList.add("16264433631");
 34         phoneList.add("17601615878");
 35         phoneList.add("15897468949");
 36 
 37         //随机电话对应的姓名
 38         phoneNameMap.put("17078388295", "李雁");
 39         phoneNameMap.put("13980337439", "卫艺");
 40         phoneNameMap.put("14575535933", "仰莉");
 41         phoneNameMap.put("19902496992", "陶欣悦");
 42         phoneNameMap.put("18549641558", "施梅梅");
 43         phoneNameMap.put("17005930322", "金虹霖");
 44         phoneNameMap.put("18468618874", "魏明艳");
 45         phoneNameMap.put("18576581848", "华贞");
 46         phoneNameMap.put("15978226424", "华啟倩");
 47         phoneNameMap.put("15542823911", "仲采绿");
 48         phoneNameMap.put("17526304161", "卫丹");
 49         phoneNameMap.put("15422018558", "戚丽红");
 50         phoneNameMap.put("17269452013", "何翠柔");
 51         phoneNameMap.put("17764278604", "钱溶艳");
 52         phoneNameMap.put("15711910344", "钱琳");
 53         phoneNameMap.put("15714728273", "缪静欣");
 54         phoneNameMap.put("16061028454", "焦秋菊");
 55         phoneNameMap.put("16264433631", "吕访琴");
 56         phoneNameMap.put("17601615878", "沈丹");
 57         phoneNameMap.put("15897468949", "褚美丽");
 58     }
 59 
 60     // 生产数据
 61     // caller,callee,buildTime,duration
 62     // 主叫,被叫,通话建立时间,通话持续时间
 63     public String product() {
 64         String caller;
 65         String callee;
 66 
 67         // 生成主叫的随机索引
 68         int callerIndex = (int) (Math.random() * phoneList.size());
 69         // 通过随机索引获得主叫电话号码
 70         caller = phoneList.get(callerIndex);
 71 
 72         while (true) {
 73             int calleeIndex = (int) (Math.random() * phoneList.size());
 74             callee = phoneList.get(calleeIndex);
 75             // 去重判断
 76             if (!caller.equals(callee)) break;
 77         }
 78 
 79         // 随机产生通话建立时间
 80         String buildTime = randomBuildTime(startTime,endTime);
 81 
 82         // 随机产生通话持续时间
 83         DecimalFormat df = new DecimalFormat("0000");
 84         String duration = df.format((int) (30 * 60 * Math.random()));
 85 
 86         StringBuilder sb = new StringBuilder();
 87         sb.append(caller + ",").append(callee + ",").append(buildTime + ",").append(duration);
 88 
 89         return sb.toString();
 90     }
 91 
 92     // 随机生成时间
 93     private String randomBuildTime(String startTime, String endTime) {
 94         try {
 95             SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
 96             Date startDate = sdf1.parse(startTime);
 97             Date endDate = sdf1.parse(endTime);
 98 
 99             // 生成时间字符串
100             if(endDate.getTime() <= startDate.getTime()){return null;}
101 
102             // (结束 - 起始) * 随机[0,1) + 起始
103             long randomTS = startDate.getTime() + (long)((endDate.getTime() - startDate.getTime())*Math.random());
104             Date resultDate = new Date(randomTS);
105             SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
106             String resultTimeString = sdf2.format(resultDate);
107 
108             return resultTimeString;
109         } catch (ParseException e) {
110             e.printStackTrace();
111         }
112         return null;
113     }
114 
115     public void writeLog(String filePath){
116         try {
117             OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream(filePath,true), "UTF-8");
118             while(true){
119                 try {
120                     Thread.sleep(500);
121                     String log = product();
122                     System.out.println(log);
123                     osw.write(log+"
");
124                     osw.flush();
125                 } catch (InterruptedException e) {
126                     e.printStackTrace();
127                 } catch (IOException e) {
128                     e.printStackTrace();
129                 }
130             }
131         } catch (UnsupportedEncodingException e) {
132             e.printStackTrace();
133         } catch (FileNotFoundException e) {
134             e.printStackTrace();
135         }
136     }
137 
138     public static void main(String[] args) {
139         args = new String[]{"F:\\idea-workspace\\CT_BD\\data\\calllog.csv"};
140         ProductLog productLog = new ProductLog();
141         productLog.initPhone();
142         productLog.product();
143         productLog.writeLog(args[0]);
144     }
145 }
View Code

producer.sh

#!/bin/bash
java -cp /root/temp/CT_producer-1.0-SNAPSHOT.jar ProductLog /root/temp/calllog.csv

数据消费

  • Flume用于监控目标文件的变化,并把信息传递到Kafka

Flume配置

技术图片
 1 #定义agent名, source、channel、sink的名称
 2 a1.sources = r1
 3 a1.channels = c1
 4 a1.sinks = k1
 5 
 6 #具体定义source
 7 a1.sources.r1.type = exec
 8 a1.sources.r1.command = tail -F -c +0 /root/temp/calllog.csv
 9 a1.sources.r1.shell = /bin/bash -c
10 
11 #具体定义channel
12 a1.channels.c1.type = memory
13 a1.channels.c1.capacity = 1000
14 a1.channels.c1.transactionCapacity = 100
15 
16 #具体定义sink
17 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
18 a1.sinks.k1.brokerList = bigdata111:9092
19 a1.sinks.k1.topic = call
20 a1.sinks.k1.batchSize = 20
View Code
  • 启动kafka生产者:bin/kafka-server-start.sh config/server.properties &
  • 创建主题:bin/kafka-topics.sh --create --zookeeper bigdata111:2181 -replication-factor 1 --partitions 3 --topic calllog
  • 启动kafka消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mydemo1 --from-beginning
  • 启动flume:bin/flume-ng agent -c conf/ -n a1 -f /root/temp/flume-kafka.conf
  • 生产数据:sh producer.sh

技术图片

数据存储

  • 将产生的数据实时存储在HBase中
  • 编写调用HBaseAPI的相关方法,将从Kafka中读取出来的数据写入HBase中
  • HBase的描述器:命名空间描述器、表描述器、列族描述器
  • 协处理器:主叫插入f1后,被叫插入f2。修改程序和虚拟机中的 hbase-site.xml  

数据分析

  • 逻辑简单,代码量大
  • 按照时间范围(年月日),统计出所属时间范围内所有手机号码的通话次数总和及通话时长总和
    • 维度:某个视角,如按时间维度,统计2018年全年的通话记录,表示为2018年*月*日
    • 通过Mapper将数据按照不同维度聚合给Reducer
    • 通过Reducer拿到按各个维度聚合过来的数据,汇总输出
    • 将Reducer输出通过outputformat输出到mysql
  • 表结构设计
    • contacts:存放手机号,联系人姓名
    • call:存放某个时间维度下通话次数及通话时长总和
    • dimension_data:存放时间
  • 数据形式:联系人维度,时间维度
电话号码:123456 Chen
年:2020
月:12
日:31
  • HBase-->Mysql
    • Sqoop
    • 自定义输出:map,reducer,outputformat,runner
  • 下载 lombok 插件,并在maven中添加依赖

数据展示

 

 

参考

flume

https://www.freesion.com/article/4812259552/

电信项目

https://blog.csdn.net/liu16659/article/details/81133090?

以上是关于[项目] 电信数据运营的主要内容,如果未能解决你的问题,请参考以下文章

大数据电信客服-数据生产

看看全球十大电信巨头的大数据玩法

电信宽带运营支撑系统

全国超10亿用户!AntDB数据库的电信核心交易替换之路

大数据 电信客服项目

移动联通电信云网获客运营商大数据