大数据电信客服-数据生产

Posted lambda-小张

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据电信客服-数据生产相关的知识,希望对你有一定的参考价值。

目录

一、项目背景

二、项目架构

三、项目实现

1.数据生产

2.数据结构

3.编写代码

在ct.common.bean下

在ct.common.constant类

在ct.producer下

在ct.common.util下

在ct.producer.bean下

在ct.producer.io

四、测试结果


一、项目背景

通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等+。我们以此为背景,寻找一个切入点,学习其中的方法论。当前我们的需求是:统计每天、每月以及每年的每个人的通话次数及时长。

二、项目架构

 

三、项目实现

1.数据生产

此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。

2.数据结构

我们将在HBase中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个flag作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。

列名

解释

举例

call1

第一个手机号码

15369468720

call1_name

第一个手机号码人姓名(非必须)

李雁

call2

第二个手机号码

19920860202

call2_name

第二个手机号码人姓名(非必须)

卫艺

date_time

建立通话的时间

20171017081520

date_time_ts

建立通话的时间(时间戳形式)

duration

通话持续时间(秒)

0600

3.编写代码

思路:

a) 创建Java集合类存放模拟的电话号码和联系人;

b) 随机选取两个手机号码当作“主叫”与“被叫”(注意判断两个手机号不能重复),产出call1call2字段数据;

c) 创建随机生成通话建立时间的方法,可指定随机范围,最后生成通话建立时间,产出date_time字段数据;

d) 随机一个通话时长,单位:秒,产出duration字段数据;

e)将产出的一条数据拼接封装到一个字符串中;

f)使用IO操作将产出的一条通话数据写入到本地文件中;

 新建module项目:

pom.xml文件配置:

project-ct

<modules>
        <module>ct-common</module>
        <module>ct-producer</module>
        <module>ct-consumer</module>
    </modules>
ct-producer
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>lenovo-project-ct</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>ct-producer</artifactId>



    <dependencies>
        <dependency>
            <groupId>org.example</groupId>
            <artifactId>ct-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

在ct.common.bean下

创建Val

package ct.common.bean;
/**
* 值对象接口
* */
public interface Val 
    public void setValue(Object val);
    public Object getValue();

创建Producer

package .ct.common.bean;

import java.io.Closeable;

/**
 * @program: IntelliJ IDEA
 * @description: 生产者接口
 * 
 * @create: 2022-10-18 14:52
 */

public interface Producer extends Closeable 
    public void setIn(DataIn in);
    public void setOut(DataOut out);
    /**
    * 生产数据
    * */
    public void produce();

创建DataIn

package ct.common.bean;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;


/**
 * @program: IntelliJ IDEA
 * @description: ming
 * 
 * @create: 2022-10-18 14:54
 */
public interface DataIn extends Closeable 
    public void setPath(String path);

    public Object read() throws IOException;
    public <T extends Data> List<T> read(Class<T> clazz) throws IOException;

创建DataOut

package ct.common.bean;

import java.io.Closeable;

/**
 * @program: IntelliJ IDEA
 * @description: ming
 * 
 * @create: 2022-10-18 14:54
 */
public interface DataOut extends Closeable 
    public void setPath(String path);

    public void write(Object data) throws Exception;
    public void write(String data) throws Exception;

创建Data

package ct.common.bean;

/**
 * @program: IntelliJ IDEA
 * @description: 数据对象
 * 
 * @create: 2022-10-18 14:57
 */
public abstract class Data implements Val

    public String content;



    public void setValue(Object val) 
        content = (String)val;
    


    public String getValue() 
        return content;
    


在ct.common.constant类

创建Names

import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;

/*
* 名称常量枚举类
* */
public enum Names implements Val 
    NAMESPACE("ct");

    private String name;

    private Names(String name)
        this.name = name;
    




    @Override
    public String Value() 
        return name;
    

在ct.producer下

创建Bootstrap

import java.io.IOException;

/**
 * @program: IntelliJ IDEA
 * @description: 启动对象
 * 
 * @create: 2022-10-18 15:20
 */
public class Bootstrap 
    public static void main(String[] args) throws Exception 

        if(args.length <2 )
            System.out.println("系统参数不正确,请按照指定格式传递:java -jar Produce.jar path1 path2");
            System.out.println(1);
        

        //构建生产者对象
        Producer producer = new LocalFileProducer();

/*        producer.setIn(new LocalFileDataIn("D:\\\\contact.txt"));
        producer.setOut(new LocalFileDataOut("D:\\\\call.log"));*/

        producer.setIn(new LocalFileDataIn(args[0]));
        producer.setOut(new LocalFileDataOut(args[1]));
        //生产数据
        producer.produce();

        //关闭生产对象
        producer.close();

    

contact.txt文件数据自己创建,格式:手机号+tab符号+姓名,call.log是空文件,用于存储通话记录

 

 

在ct.common.util下

创建NumberUtil



import java.text.DecimalFormat;

/**
 * @program: IntelliJ IDEA
 * @description: 数字工具类
 * @
 * @create: 2022-10-19 11:13
 */
public class NumberUtil 
    /**
    * 将数字格式化字符串
    * */
    public static String foramt(int num, int length )
        StringBuilder stringBuilder = new StringBuilder();
        for(int i= 1; i<= length;i++)
            stringBuilder.append("0");
        

        DecimalFormat df = new DecimalFormat(stringBuilder.toString());
        return df.format(num);
    

    public static void main(String[] args) 
        System.out.println(foramt(10,10));
    


创建 DateUtil



import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @program: IntelliJ IDEA
 * @description: 日期工具类
 * @
 * @create: 2022-10-19 11:27
 */
public class DateUtil 
    /**
     * 将指定的日期按照指定的格式,格式化为字符串
     * @param date
     * @param format
     * @return
     */
    public static String format (Date date,String format)
        SimpleDateFormat sdf = new SimpleDateFormat(format);
        return sdf.format(date);
    

    /**
     * 将日期字符串按照指定的格式解析为日期对象
     * @param dateString
     * @param format
     * @return
     */
    public static Date parse(String dateString,String format)
        SimpleDateFormat sdf = new SimpleDateFormat(format);
        Date date = null;
        try 
            date = sdf.parse(dateString);
         catch (ParseException e) 
            e.printStackTrace();
        
        return date;
    

在ct.producer.bean下

创建LocalFileProducer



import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 * @program: IntelliJ IDEA
 * @description: 本地数据文件生产者
 * @author: 
 * @create: 2022-10-18 15:24
 */
public class LocalFileProducer implements Producer 
    private DataIn in;
    private DataOut out;
    private volatile boolean flg = true;

    public void setIn(DataIn in) 
        this.in = in;
    
    public void setOut(DataOut out) 
        this.out = out;
    

    /**
     * 生产数据
     */
    public void produce() 


        try 
            //读取通讯录数据
            List<Contact> contacts = in.read(Contact.class);
            for(Contact contact : contacts)
                System.out.println(contact);
            
            while (flg)

                //从通讯录中随机查找2个电话号码(主叫,被叫)
                /**区别
                 * Math.random()类型是boolean,没有规律
                 * Random类型是int,有规律
                 * */
                int call1Index = new Random().nextInt(contacts.size());
                int call2Index;
                while (true)
                    call2Index = new Random().nextInt(contacts.size());
                    if(call1Index != call2Index)
                        break;
                    
                
                Contact call1 = contacts.get(call1Index);
                Contact call2 = contacts.get(call2Index);

                //生产随机的通话时间
                String startDate = "20210101000000";
                String endDate = "20220101000000";

                long startTime = DateUtil.parse(startDate,"yyyyMMddHHmmss").getTime();
                long endTime = DateUtil.parse(endDate,"yyyyMMddHHmmss").getTime();

                //通话时间
                long calltime = startTime + (long)((endTime - startTime) * Math.random());
                //通话时间字符串
                String callTimeString = DateUtil.format(new Date(calltime),"yyyyMMddHHmmss");

                //生产随机的通话时长
                String duration = NumberUtil.foramt(new Random().nextInt(3000), 4);


                //生产随机的通话记录
                Calllog log = new Calllog(call1.getTel(),call2.getTel(),callTimeString,duration);

                System.out.println(log);
                //将通话记录刷写到数据文件中
                out.write(log);

                //500ms=0.5s
                Thread.sleep(500);
            
         catch (Exception e) 
            e.printStackTrace();
        



    
    /*
    * 关闭生产者
    * */
    public void close() throws IOException 
        if(in != null)
            in.close();
        
        if(out != null)
            out.close();
        
    

创建Contact



/**
 * @program: IntelliJ IDEA
 * @description: 联系人
 * 
 * @create: 2022-10-18 21:13
 */
public class Contact extends Data 
    private String tel;
    private String name;

    public String getTel() 
        return tel;
    

    public void setTel(String tel) 
        this.tel = tel;
    

    public String getName() 
        return name;
    

    public void setName(String name) 
        this.name = name;
    
    public void setValue(Object val)
        content = (String)val;
        String[] values = content.split("\\t");
        setName(values[1]);
        setTel(values[0]);
    
    public String toString()
        return "Contact["+tel+","+name+"]";
    

创建Calllog



/**
 * @program: IntelliJ IDEA
 * @description: ming
 * 
 * @create: 2022-10-19 11:46
 */
public class Calllog 
    private String call1;
    private String call2;
    private String calltime;
    private String duration;

    @Override
    public String toString() 
        return call1+"\\t"+call2+"\\t"+calltime+"\\t"+duration;
    

    public Calllog(String call1, String call2, String calltime, String duration) 
        this.call1 = call1;
        this.call2 = call2;
        this.calltime = calltime;
        this.duration = duration;
    

    public String getCall1() 
        return call1;
    

    public void setCall1(String call1) 
        this.call1 = call1;
    

    public String getCall2() 
        return call2;
    

    public void setCall2(String call2) 
        this.call2 = call2;
    

    public String getCalltime() 
        return calltime;
    

    public void setCalltime(String calltime) 
        this.calltime = calltime;
    

    public String getDuration() 
        return duration;
    

    public void setDuration(String duration) 
        this.duration = duration;
    

 

在ct.producer.io

创建LocalFileDataIn



import java.io.*;
import java.util.ArrayList;
import java.util.List;

/**
 * @program: IntelliJ IDEA
 * @description: 本地文件数据输入
 * 
 * @create: 2022-10-18 20:17
 */
public class LocalFileDataIn implements DataIn 
    private BufferedReader reader = null;

    public LocalFileDataIn(String path)
        setPath(path);
    

    public void setPath(String path) 
        try 
            reader = new BufferedReader(new InputStreamReader(new FileInputStream(path),"gbk"));
         catch (UnsupportedEncodingException e) 
            e.printStackTrace();
         catch (FileNotFoundException e) 
            e.printStackTrace();
        
    

    public Object read() throws IOException 
        return null;
    
    /**
    * 读取数据,返回数据集合
    * */
    public <T extends Data> List<T> read(Class<T> clazz) throws IOException 

        List<T> ts = new ArrayList<T>();
        try 
            //从数据文件中读取所有的数据
            String line = null;
            while ((line = reader.readLine()) != null)
                //将数据转换为指定类型的对象,封装为集合返回
                T t = clazz.newInstance();
                t.setValue(line);
                ts.add(t);
            
        catch (Exception e)
            e.printStackTrace();
        




        return ts;
    

    /*
    * 关闭资源*/
    public void close() throws IOException 
        if(reader != null)
            reader.close();
        
    

创建LocalFileDataOut



import java.io.*;

/**
 * @program: IntelliJ IDEA
 * @description: 本地文件数据输出
 * 
 * @create: 2022-10-18 20:18
 */
public class LocalFileDataOut implements DataOut 
    private PrintWriter writer = null;

    public LocalFileDataOut(String path)
        setPath(path);
    

    public void setPath(String path) 
        try 
            writer = new PrintWriter(new OutputStreamWriter(new FileOutputStream(path),"utf-8"));
         catch (UnsupportedEncodingException e) 
            e.printStackTrace();
         catch (FileNotFoundException e) 
            e.printStackTrace();
        
    

    public void write(Object data) throws Exception 
        write(data.toString());
    

    /***
     * 将数据字符串生成到文件中
     * @param data
     * @throws Exception
     */
    public void write(String data) throws Exception 
        writer.println(data);
        writer.flush();
    

    /**
     * 释放资源
     * @throws IOException
     */
    public void close() throws IOException 
        if(writer != null)
            writer.close();
        
    

四、测试结果

1.控制台输出

 2.文件中查看

3. 打包jar

 

然后在项目里面查看jar包

 

 并且把jar包和contact.txt通讯录导入到cd /opt/module/data/

[root@hadoop01 data]# ll
total 320
-rw-r--r--. 1 root root   1992 Sep 22 22:31 contact.txt
-rw-r--r--. 1 root root  14870 Sep 22 22:41 ct-producer.jar

4.在Linux中进行测试

[root@hadoop01 data]# java -jar ct-producer.jar /opt/module/data/contact.txt /opt/module/data/call.log 

以上是关于大数据电信客服-数据生产的主要内容,如果未能解决你的问题,请参考以下文章

大数据开发实战系列之电信客服

大数据视频又来啦!Scala全套+电信客服案例

答疑分享097:数据可视化-生产情况一览

[项目] 电信数据运营

图像检索在高德地图POI数据生产中的应用

单表千亿电信大数据场景,使用Spark+CarbonData替换Impala案例