电信采集之数据处理

Posted shuilifang815

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了电信采集之数据处理相关的知识,希望对你有一定的参考价值。

该项目的实质就是就是将3A服务器记录下来的日志封装成对象保存在数据库中

一、需求分析
1.1需求概述
所谓电信采集指的是3A服务器通过中心处理系统对一个记录了用户上线、下线等用户信息的文件的一个采集,将获取到的信息传送到数据库中进行持久化保存,并将为传输到数据库中的信息进行备份。
1.2需求分析
将一段时间内采集到的数据记录为数据清单,记作t_detail_x,通过中心处理系统对数据清单中的数据进行分析,根据七上八下的原则将数据分为完全信息和不完全信息(只包含7的即只有上线记录的为不完全信息,包含7又包含8的即有上线记录又有下线记录的为完全信息),将处理过的信息传送到3A服务器,即客户端将用户用网情况发送到服务器端,并将信息进行相应的备份。
以下 是我根据自己的理解所画的流程图:
技术分享图片
 
1.3项目中所使用的jar包
项目中所使用的jar包,以下jar包的功能分别是:解析xml文件、提供日志记录、jdbc的jar包、为项目提供接口
技术分享图片
二、项目各模块的具体实现
2.1数据采集模块
需要采集的数据格式如下,将1000条数据封装在temp.txt中,在进行数据采集的时候读取temp.txt文件。
#briup1660|037:wKgB1660A|7|1239110900|44.211.221.247
#briup4418|037:wKgB4418A|7|1239138480|251.196.223.191
#|037:wKgB1660A|8|1239203860|44.211.221.247
#briup1247|037:wKgB1247A|7|1239106770|22.7.202.75
#briup3288|037:wKgB3288A|7|1239127180|240.144.42.68
#|037:wKgB1247A|8|1239176602|22.7.202.75
#briup8258|037:wKgB8258A|7|1239176880|90.203.198.194
#briup2391|037:wKgB2391A|7|1239118210|161.43.86.232
进行数据采集时所遇到的问题:
      1.如何读写数据
        使用缓存流对数据进行读写
      2.如何封装数据
         将完全信息保存在list集合中,将不完全信息保存在map集合中,以BIDR对象的形式进行保存
      3.所采集到的数据分为两种情况
         第一种:用户上线但是用户还没有下线
         第二种:用户上下线信息都有
        针对这两种情况的解决方式:
         第一种:进行数据的备份,在下一次采集的时候将数据重新读出来,因为用户在下一次采集的时候可能会下线
         第二种:直接封装成对象传给服务器端
package com.briup.woss.client;

import java.io.*;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.briup.util.BIDR;
import com.briup.util.BackUP;
import com.briup.util.Configuration;
import com.briup.woss.ConfigurationAWare;

public class GatherImpl implements Gather,ConfigurationAWare{
    String pathName;
    String pathName1="src/com/briup/woss/File/map.txt";
    //list存入完整数据
    static List<BIDR> list=new ArrayList<BIDR>();
   //存储不完整数据Map<IP,BIDR>
    static Map<String,BIDR> map=new HashMap();
    Configuration conf=null;
    @Override
    public void init(Properties p) {
        // TODO Auto-generated method stub
        pathName=(String) p.get("src-file");
    }

    @SuppressWarnings("unchecked")
    @Override
    public Collection<BIDR> gather() throws Exception {
        // TODO Auto-generated method stub
        BackUP bi=conf.getBackup();
        //加载备份文件
        Map<String, BIDR> newMap=(Map<String,BIDR>)bi.load(pathName1, BackUP.LOAD_REMOVE);
        if(newMap!=null){
            map.putAll(newMap);
        }
        //1.读temp.txt,成功的标志是打印到控制台
        BufferedReader br=new BufferedReader(new FileReader(new File(pathName)));
        BIDR bidr=new BIDR();
        String str="";
        while((str=br.readLine())!=null){
            //System.out.println(str);
            String[] line=str.split("[|]");
                //System.out.println(line.length);
                if(line[2].equals("7")){
                    bidr=new BIDR();
                    bidr.setAAA_login_name(line[0].substring(1));
                    bidr.setNAS_ip(line[1]);
                    Long login_date=Long.parseLong(line[3]);
                    Timestamp login_time=new Timestamp(login_date*1000);
                    bidr.setLogin_date(login_time);
                    bidr.setLogin_ip(line[4]);    
                    //保存不完整信息
                    map.put(line[4],bidr);
                    }else if(line[2].equals("8")){
                        BIDR bidr1 = map.get(line[4]);
                        if(bidr1!=null){
                            //设置用户下线时长 
                            Long logout_date=Long.parseLong(line[3]);
                            Timestamp logout_time=new Timestamp(logout_date*1000);
                            bidr1.setLogout_date(logout_time);
                            //计算出用户在线时长
                            Integer time_deration=(int) (logout_date - (bidr1.getLogin_date().getTime())/1000);
                            bidr1.setTime_deration(time_deration);
                            //完整数据存入list
                            list.add(bidr1);
                            map.remove(line[4]);
                        }    
                        
                    }
        }
        bi.store(pathName1, map, bi.STORE_OVERRIDE);
        System.out.println("备份的不完整数据为:"+map.size());
        br.close();
        System.out.println("list="+list.size());
        return list;
        
        
    }
    /*public static void main(String[] args) throws Exception {
        new GatherImpl().gather();
        for(BIDR bidr:list){
            System.out.println(bidr.getAAA_login_name()+","+bidr.getLogin_ip()+" "+bidr.getNAS_ip()+" "+bidr.getTime_deration()+","+bidr.getLogin_date()+","+bidr.getLogout_date());
        }
        System.out.println("完整的信息为"+list.size());
        System.out.println("不完整信息:"+map.size());
    }
*/
    @Override
    public void setConfiguration(Configuration co) {
        // TODO Auto-generated method stub
        this.conf=co;
    }

}
2.2备份模块

负责备份一些没有处理完的数据,所谓没有处理完的数据,我的理解是没有下线记录的

不完全信息以及尚未保存到数据库中的完全信息

进行数据的备份需要实现的方法:

Object load(String key, boolean flag)

store(String key, Object date, boolean flag)

在进行数据备份时所遇到的问题:

1.如何实现数据的备份

2.如何读取文件中的备份文件

3.在使用load()和store()这两个方法的时候,是追加还是覆盖之前的数据 

   因为数据是隔一段时间进会进行更新的,某一个用户当前没有下线,可能再过一段时间就下线了,因此是覆盖之前的数据

package com.briup.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Properties;

public class BackUPImpl implements BackUP {

    String filePath;
    Object object = null;
    private static List<BIDR> list;

    @Override
    public void init(Properties p) {
        // TODO Auto-generated method stub

    }

    // 通过键名获取已经备份的数据 key备份数据的键
    @Override
    public Object load(String key, boolean flag) throws Exception {
        // TODO Auto-generated method stub
        File file = new File(filePath, key);
        // file是否为空
        if (file.exists() && file.length() != 0) {
            ObjectInputStream ois = new ObjectInputStream(new FileInputStream(file));
            object = ois.readObject();
        }
        return object;

    }

    // 通过键名存储数据,key-备份数据的键,date-需要备份的数据,flag如果键值已经存在数据,追加还是覆盖之前的数据
    // 文件路径key
    @Override
    public void store(String key, Object date, boolean flag) throws Exception {
        // TODO Auto-generated method stub
        // 通过key找备份文件,键值就是文件名
        // 接收到完整数据
        File file = new File(key);
        if(file.exists()){
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file, flag));
        oos.writeObject(date);
        oos.flush();
        oos.close();
        
    }
    }

}
2.3网络模块

负责将采集好的数据发给服务器

需要实现的方法是send(Collection<BIDR> collection)

在此过程中遇到的问题是

1、如何建立连接

使用socket建立连接 

2、如何保存数据

使用对象流的形式读写数据

3、所保存的数据是完全信息还是不完全信息

客户端向服务器端进行保存的数据是完全信息

package com.briup.woss.client;

import java.io.*;
import java.net.Socket;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;

import com.briup.util.BIDR;
import com.briup.util.Configuration;
import com.briup.woss.ConfigurationAWare;

public class ClientImpl implements Client{
   private static String id;
   private static int port;
  
    
    private static ObjectOutputStream oo;

    @Override
    public void init(Properties p) {
        // TODO Auto-generated method stub
        id=p.getProperty("id");//通过将依赖注入的方式将配置信息注入到该模块中
        port=Integer.parseInt(p.getProperty("port"));    
    }
    
     
    @Override
    public void send(Collection<BIDR> collection) throws Exception {
        //创建Socket指定ip地址端口号
        Socket socket=new Socket(id,port);
        //数据流
        OutputStream os=socket.getOutputStream();
        oo=new ObjectOutputStream(os);//将数据保存在对象流中,保存的是完全信息
        oo.writeObject(collection);
        System.out.println(collection);
        System.out.println(collection.size());
        oo.flush();
        oo.close();
        socket.close();
        
        
    // TODO Auto-generated method stub
    //1.id+port
    //2.数据流
        

    }


    
    }

2.4服务器端模块

作用:负责接收客户端传过来的数据

需要实现的方法:Collection<BIDR> revicer()

 

package com.briup.woss.Server;
import java.io.*;
import java.net.*;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import com.briup.util.BIDR;
import com.briup.woss.server.Server;

public class ServrImpl implements Server{
    private ServerSocket ss;
    private int port;
    private ObjectInputStream oi;
    

    @Override
    public void init(Properties p) {
        // TODO Auto-generated method stub
        System.out.println(p.get("port"));
         port=Integer.parseInt((String) p.get("port"));
         System.out.println(port);
    
    }
    
    @SuppressWarnings("unchecked")
    @Override
    public Collection<BIDR> revicer() throws Exception {
        
        // TODO Auto-generated method stub
        //1.port
        //数据流
        //1)创建ServerSocket
        ss=new ServerSocket(port);
        System.out.println("服务器已启动,请连接·····");
        while(true){
        Socket s=ss.accept();
        oi=new ObjectInputStream(s.getInputStream());
        List<BIDR> c=(List<BIDR>)oi.readObject();
        System.out.println("连接成功");
        return c;
        }
        
        
    }

    @Override
    public void shutdown() {
        // TODO Auto-generated method stub
        
    }
    

    

}

 

2.5数据的持久化保存
所谓数据的持久化保存,就是将完全信息进行入库
进行数据入库的时候遇到的问题
1.将信息以什么样的形式进行保存
将信息封装成BIDR对象,保存至集合中去
2.如何将完全信息和不完全信息区分保存
将完全信息保存在list集合中,对完全信息进行持久化保存,对不完全信息进行一个数据的备份,为了将数据的备份直观的表现出来,在进行数据的入库时设置一个异常,将未进行入库的完全信息进行一个备份,在该程序中,当第200条数据进行入库的时候,不能继续入库,将未进行入库的完全信息的备份到list.txt
3.是用什么样的方式来实现数据的持久化保存
因为需要实现的仅是电信采集这个大项目的一个小功能,加上mybatis用的不是很熟练,我用的是jdbc来实现数据的持久化保存。在使用jdbc进行数据的持久化保存的时候,因为Connection最多常见300个prepareStatement,将插入sql语句ps = connection.prepareStatement(sql),放在下面的for (BIDR bidr : collction)里面遍历,由于遍历次数过多,会出现超出游标最大值异常,因为读取的是一个月电信采集的信息,因此我将ps=connection.prepareStatement(sql)放在上面的for循环里面遍历,通过遍历天数,将sql语句插入,与for (BIDR bidr : collction)获取到的天数相对应,进而将信息存储到数据库中
package com.briup.woss.Server;


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import com.briup.util.BIDR;
import com.briup.util.BackUP;
import com.briup.util.Configuration;
import com.briup.util.Logger;
import com.briup.woss.ConfigurationAWare;
import com.briup.woss.server.DBStore;

public class DBStoreImpl implements DBStore,ConfigurationAWare{
    /*
     * driver=oracle.jdbc.driver.OracleDriver
     * url=jdbc:oracle:thin:@localhost:1521:XE username=oracle password=orcle
     */
    private static String driver;
    private static String url;
    private static String username;
    private static String password;
    private static Connection connection;
    private String aaa_login_name;
    private String login_ip;
    private java.sql.Date login_date;
    private java.sql.Date logout_date;
    private String nas_ip;
    private String time_duration;
    private int i_date;
    private static PreparedStatement[] ps = new PreparedStatement[31];;
    String sql;
    static int i = 0;
    private static String pathName = "src/com/briup/woss/File/list.txt";
    Configuration conf=null;
    // 存放完整数据
    static List<BIDR> list = new ArrayList<>();
    @Override
    public void init(Properties p) {
        driver=p.getProperty("driver");
        url=p.getProperty("url");
        username=p.getProperty("username");
        password=p.getProperty("password");
    }
    @Override
    public void saveToDB(Collection<BIDR> collction) throws Exception {
        BackUP bi=conf.getBackup();
        Logger log=conf.getLogger();
        try {
            // 注册驱动
            Class.forName(driver);
            // 创建连接
            connection = DriverManager.getConnection(url,username,password);
            System.out.println(connection);
            for (int i = 0; i < 31; i++) {

                sql = "insert into t_detail_" + i
                        + "(aaa_login_name,login_ip,login_date,logout_date,nas_ip,time_duration) "
                        + "values(?,?,?,?,?,?)";
                ps[i] = connection.prepareStatement(sql);
            }
            //connection最多创建300个prepareStatement
            //将ps = connection.prepareStatement(sql);放在下面的for (BIDR bidr : collction)里面遍历,由于遍历次数过多,会出现超出游标最大值异常
            //遍历31次,与下面遍历bidr对象时,存储信息时的ps相对应
            for (BIDR bidr : collction) {
                Timestamp login_d = bidr.getLogin_date();
                String s_date = login_d.toString();
                String[] str1 = s_date.split(" ");
                String[] str2 = str1[0].split("-");
                i_date = Integer.parseInt(str2[2]);
                aaa_login_name = bidr.getAAA_login_name();
                login_ip = bidr.getLogin_ip();
                login_date = new java.sql.Date(bidr.getLogin_date().getTime());
                logout_date = new java.sql.Date(bidr.getLogout_date().getTime());
                // 通过PreparedStatement将信息存储到数据库中
                ps[i_date].setString(1, aaa_login_name);
                ps[i_date].setString(2, login_ip);
                ps[i_date].setDate(3, login_date);
                ps[i_date].setDate(4, logout_date);
                ps[i_date].setString(5, nas_ip);
                ps[i_date].setString(6, time_duration);
                // 执行sql
                ps[i_date].executeUpdate();
                i++;
                if (i == 200) {
                    i = 1 / 0;
                }
                list.add(bidr);
                System.out.println("插入数据成功!");
            }

            log.info("入库数据的个数" + list.size());
            collction.removeAll(list);
            log.info("未入库数据的个数" + collction.size());

        } catch (Exception e) {
             connection.rollback();
            bi.store(pathName, list, BackUP.STORE_OVERRIDE);//对象.字段会出现黄色波浪线,可以改成类名.字段
            log.debug("备份数据为"+list.size());
            log.debug("未入库数据的个数" + collction.size());
            log.debug("插入个数为:"+i);
            
        }

    }

    /*public static void main(String[] args) throws Exception {
        GatherImpl ga = new GatherImpl();
        new DBStoreImpl().saveToDB(ga.gather());
        System.out.println("备份数据的个数为:" + list.size());
        System.out.println(i);
    }*/
    @Override
    public void setConfiguration(Configuration co) {
        // TODO Auto-generated method stub
        this.conf=co;
        
    }

}

3.6公共配置模块

需要实现的方法

BackUP getBackup()

Client getClient()

DBStore getDBStore()

Gather getGather()

Logger getLogger()

Server getServer()

1、如何实现日志记录

2、了解日志级别

日志级别一共有五种,级别由高到低依次是:fatal、error、warn、info、debug

3、如何将日志输出到控制台和指定文件中去

在项目中,不能通过new来创建对象,要通过Configurtion来获取

package com.briup.util;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.dom4j.*;
import org.dom4j.io.SAXReader;

import com.briup.woss.ConfigurationAWare;
import com.briup.woss.WossModule;
import com.briup.woss.client.Client;
import com.briup.woss.client.Gather;
import com.briup.woss.server.DBStore;
import com.briup.woss.server.Server;

public class ConfigurationImpl implements Configuration{
    String filePath="src/com/briup/woss/File/conf.xml";
    //存入woss模块对象
    Map<String,WossModule> wossMap=new HashMap<>();
    //存放配置信息
    Properties pro=new Properties();
    @Override
    public BackUP getBackup() throws Exception {
        // TODO Auto-generated method stub
        return (BackUP) wossMap.get("backup");
    }

    @Override
    public Client getClient() throws Exception {
        // TODO Auto-generated method stub
        return (Client) wossMap.get("client");
    }

    @Override
    public DBStore getDBStore() throws Exception {
        // TODO Auto-generated method stub
        return (DBStore) wossMap.get("dbstore");
    }
    

    public static void main(String[] args) throws Exception {
        new ConfigurationImpl().getDBStore();
    }
    
    @Override
    public Gather getGather() throws Exception {
        // TODO Auto-generated method stub
        return (Gather) wossMap.get("gather");
    }
    

    @Override
    public Logger getLogger() throws Exception {
        // TODO Auto-generated method stub
        return (Logger) wossMap.get("logger");
    }


    @Override
    public Server getServer() throws Exception {
        return (Server) wossMap.get("server");
    }
    

    public ConfigurationImpl() {
        try {
        //1.获取解析器。读取conf.xml
           //创建SAXReader读取器,专门用于读取xml
        SAXReader saxReader=new SAXReader();
        //2.获取根节
            Document document=saxReader.read(filePath);
            
            Element rootElement=document.getRootElement();

            //3.获取子节点--属性值
            List elements=rootElement.elements();
            for(Object object:elements){
                Element e=(Element)object;
                String name=e.getName();
                //System.out.println("子节点"+name);
                //class
                String attValue=e.attributeValue("class");
//                System.out.println(attValue);
                //通过反射获取对象
                    WossModule woss;
                    try {
                        woss = (WossModule)Class.forName(attValue).newInstance();
//                        System.out.println(woss);
                        wossMap.put(name, woss);
//                        System.out.println(name);
                        for(String key:wossMap.keySet()){
//                            System.out.println(key+":"+wossMap.get(key));
                            
                            //4.固定值-->Properties
                            List ee=e.elements();
                            for(Object obj:ee){
                                Element el=(Element)obj;
                                String key1=el.getName();
                                String value=el.getText();
//                                System.out.println(key1);
//                                System.out.println(el.getName()+"*:*"+el.getText());
                                pro.put(key1, value);
                                String po=(String)pro.get("po");
                             //   System.out.println("po:**************");
                            //    System.out.println(po);   
                                
                            }
                            //配置信息依赖注入
                            for(Object obj:wossMap.values()){
                                //调用init()方法,注入配置信息
                                if(obj instanceof WossModule){
                                    ((WossModule) obj).init(pro);
                                }
                                if(obj instanceof ConfigurationAWare){
                                    ((ConfigurationAWare)obj).setConfiguration(this);
                                }
                            }
                                
                        }

                    
                    } catch (ClassNotFoundException e1) {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }
                    //class--->实例化对象放入集合
               
                    
                    }
            
            
        } catch (Exception e2) {
            // TODO Auto-generated catch block
            e2.printStackTrace();
        }
        
            
            
        
        
        //固定值
        //class实例化对象放入集合
        
    }

}

详细的代码已上传至GitHub:https://github.com/shuilifang815/-815

如果您有更好的看法或理解,欢迎留言!

 

 
 
 
 
 
 
 
 
 
 
 
 
 
 
java电信采集项目
如何向GitHub上面上传项目步骤介绍:http://blog.csdn.net/ymfwj/article/details/52491194
就是将3A服务器记录下来的日志封装成对象保存在数据库中
第一步:根据7上8下的原则将读取到的文件分为不完全信息和完全信息
             将完全信息和不完全信息分别存储在list、map集合中
第二步:建立客户端和服务器,实现数据的传输
 
第三步:实现数据的持久化保存(保存的是完全信息)
             使用jdbc的六步进行数据的保存
第四步:数据的备份
             其中两个方法加载load(),保存save()
             建立map.txt,list.map两个txt文件分别保存不完全信息和完全信息
             在GatherImpl里保存不完全信息
             在数据的持久化保存中保存的是未进入数据库的完全信息
             
 
所谓完全信息:指的是既有上线记录,又有离线记录,可以计算出在线时间
电信采集
需求分析:
就是通过客户端将用户用网情况发送到服务器端,万一服务器荡机就可以通过读取已经存储到到数据库以及为存储到数据保存在备份文件中的信息,将费用收回取,避免受到较大损失
 
所谓电信采集指的是对一个记录了用户上线、下线等用户信息的文件的一个采集,备份
记录信息的文件temp.txt
将不完全信息,完全信息分别存储在map,list集合中
然后建立服务器端、客户端,通过客户端将读取到的信息发送到服务器端
 
一、需求分析
1.1需求概述
所谓电信采集指的是3A服务器通过中心处理系统对一个记录了用户上线、下线等用户信息的文件的一个采集,将获取到的信息传送到数据库中进行持久化保存,并将为传输到数据库中的信息进行备份。
1.2需求分析
将一段时间内采集到的数据记录为数据清单,记作t_detail_x,通过中心处理系统对数据清单中的数据进行分析,根据七上八下的原则将数据分为完全信息和不完全信息(只包含7的即只有上线记录的为不完全信息,包含7又包含8的即有上线记录又有下线记录的为完全信息),
将处理过的信息传送到3A服务器,即客户端将用户用网情况发送到服务器端,并将信息进行相应的备份。
 
 
 
 
 
技术分享图片
 
 
二、项目概述
项目所用到的jar包
技术分享图片
这些jar包的功能为:
技术分享图片
解析xml文件
技术分享图片
提供日志记录
技术分享图片
jdbc的jar包
技术分享图片
为项目提供接口
三、项目各模块的具体实现
3.1数据采集模块
        需要采集的数据格式如下,将1000条数据封装在temp.txt中,在进行数据采集的时候读取temp.txt文件。
 
#briup1660|037:wKgB1660A|7|1239110900|44.211.221.247
#briup4418|037:wKgB4418A|7|1239138480|251.196.223.191
#|037:wKgB1660A|8|1239203860|44.211.221.247
#briup1247|037:wKgB1247A|7|1239106770|22.7.202.75
#briup3288|037:wKgB3288A|7|1239127180|240.144.42.68
#|037:wKgB1247A|8|1239176602|22.7.202.75
#briup8258|037:wKgB8258A|7|1239176880|90.203.198.194
#briup2391|037:wKgB2391A|7|1239118210|161.43.86.232
      进行数据采集时所遇到的问题:
      1.如何读写数据
        使用缓存流对数据进行读写
      2.如何封装数据
         将完全信息保存在list集合中,将不完全信息保存在map集合中,以BIDR对象的形式进行保存
      3.所采集到的数据分为两种情况
         第一种:用户上线但是用户还没有下线
         第二种:用户上下线信息都有
        针对这两种情况的解决方式:
         第一种:进行数据的备份,在下一次采集的时候将数据重新读出来,因为用户在下一次采集的时候可能会下线
         第二种:直接封装成对象传给服务器端
       
package com.briup.woss.client;
import java.io.*;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.briup.util.BIDR;
import com.briup.util.BackUP;
import com.briup.util.Configuration;
import com.briup.woss.ConfigurationAWare;
public class GatherImpl implements Gather,ConfigurationAWare{
    String pathName;
    String pathName1="src/com/briup/woss/File/map.txt";
    //list存入完整数据
    static List<BIDR> list=new ArrayList<BIDR>();
   //存储不完整数据Map<IP,BIDR>
    static Map<String,BIDR> map=new HashMap();
    Configuration conf=null;
    @Override
    public void init(Properties p) {
        // TODO Auto-generated method stub
        pathName=(String) p.get("src-file");
    }
    @Override
    public Collection<BIDR> gather() throws Exception {
        // TODO Auto-generated method stub
        BackUP bi=conf.getBackup();
        //加载备份文件
        Map<String, BIDR> newMap=(Map<String,BIDR>)bi.load(pathName1, bi.LOAD_REMOVE);
        if(newMap!=null){
            map.putAll(newMap);
        }
        //1.读temp.txt,成功的标志是打印到控制台
        BufferedReader br=new BufferedReader(new FileReader(new File(pathName)));
        BIDR bidr=new BIDR();
        String str="";
        while((str=br.readLine())!=null){
            //System.out.println(str);
            String[] line=str.split("[|]");
                //System.out.println(line.length);
                if(line[2].equals("7")){
                    bidr=new BIDR();
                    bidr.setAAA_login_name(line[0].substring(1));
                    bidr.setNAS_ip(line[1]);
                    Long login_date=Long.parseLong(line[3]);
                    Timestamp login_time=new Timestamp(login_date*1000);
                    bidr.setLogin_date(login_time);
                    bidr.setLogin_ip(line[4]);    
                    //保存不完整信息
                    map.put(line[4],bidr);
                    }else if(line[2].equals("8")){
                        BIDR bidr1 = map.get(line[4]);
                        if(bidr1!=null){
                            //设置用户下线时长
                            Long logout_date=Long.parseLong(line[3]);
                            Timestamp logout_time=new Timestamp(logout_date*1000);
                            bidr1.setLogout_date(logout_time);
                            //计算出用户在线时长
                            Integer time_deration=(int) (logout_date - (bidr1.getLogin_date().getTime())/1000);
                            bidr1.setTime_deration(time_deration);
                            //完整数据存入list
                            list.add(bidr1);
                            map.remove(line[4]);
                        }    
                        
                    }
        }
        bi.store(pathName1, map, bi.STORE_OVERRIDE);
        System.out.println("备份的不完整数据为:"+map.size());
        br.close();
        System.out.println("list="+list.size());
        return list;
        
        
    }
    /*public static void main(String[] args) throws Exception {
        new GatherImpl().gather();
        for(BIDR bidr:list){
            System.out.println(bidr.getAAA_login_name()+","+bidr.getLogin_ip()+" "+bidr.getNAS_ip()+" "+bidr.getTime_deration()+","+bidr.getLogin_date()+","+bidr.getLogout_date());
        }
        System.out.println("完整的信息为"+list.size());
        System.out.println("不完整信息:"+map.size());
    }
*/
    @Override
    public void setConfiguration(Configuration co) {
        // TODO Auto-generated method stub
        this.conf=co;
    }
}
 
            
 
 
 
 
 
 
3.2备份模块
负责备份一些没有处理完的数据,所谓没有处理完的数据,我的理解是没有下线记录的不完全信息以及尚未保存到数据库中的完全信息
进行数据的备份需要实现的方法:
Object load(String key, boolean flag)
store(String key, Object date, boolean flag)
 
在进行数据备份时所遇到的问题:
1.如何实现数据的备份
2.如何读取文件中的备份文件
3.在使用load()和store()这两个方法的时候,是追加还是覆盖之前的数据 
   因为数据是隔一段时间进会进行更新的,某一个用户当前没有下线,可能再过一段时间就下线了,因此是覆盖之前的数据
  
 
 
package com.briup.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Properties;
 
public class BackUPImpl implements BackUP {
 
     String filePath;
     Object object = null;
     private static List<BIDR> list;
 
     @Override
     public void init(Properties p) {
         // TODO Auto-generated method stub
 
     }
 
     // 通过键名获取已经备份的数据 key备份数据的键
     @Override
     public Object load(String key, boolean flag) throws Exception {
         // TODO Auto-generated method stub
         File file = new File(filePath, key);
         // file是否为空
         if (file.exists() && file.length() != 0) {
              ObjectInputStream ois = new ObjectInputStream(new FileInputStream(file));
              object = ois.readObject();
         }
         return object;
 
     }
 
     // 通过键名存储数据,key-备份数据的键,date-需要备份的数据,flag如果键值已经存在数据,追加还是覆盖之前的数据
     // 文件路径key
     @Override
     public void store(String key, Object date, boolean flag) throws Exception {
         // TODO Auto-generated method stub
         // 通过key找备份文件,键值就是文件名
         // 接收到完整数据
         File file = new File(key);
         if(file.exists()){
         ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(file, flag));
         oos.writeObject(date);
         oos.flush();
         oos.close();
         
     }
     }
 
}
 
 
 
3.3网络模块
负责将采集好的数据发给服务器
需要实现的方法是send(Collection<BIDR> collection)
在此过程中遇到的问题是
1、如何建立连接
使用socket建立连接 
2、如何保存数据
使用对象流的形式读写数据
3、所保存的数据是完全信息还是不完全信息
客户端向服务器端进行保存的数据是完全信息
package com.briup.woss.client;
import java.io.*;
import java.net.Socket;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import com.briup.util.BIDR;
import com.briup.util.Configuration;
import com.briup.woss.ConfigurationAWare;
public class ClientImpl implements Client{
   private static String id;
   private static int port;
  
    
    private static ObjectOutputStream oo;
    @Override
    public void init(Properties p) {
        // TODO Auto-generated method stub
        id=p.getProperty("id");//通过将依赖注入的方式将配置信息注入到该模块中
        port=Integer.parseInt(p.getProperty("port"));    
    }
    
     
    @Override
    public void send(Collection<BIDR> collection) throws Exception {
        //创建Socket指定ip地址端口号
        Socket socket=new Socket(id,port);
        //数据流
        OutputStream os=socket.getOutputStream();
        oo=new ObjectOutputStream(os);//将数据保存在对象流中,保存的是完全信息
        oo.writeObject(collection);
        System.out.println(collection);
        System.out.println(collection.size());
        oo.flush();
        oo.close();
        socket.close();
        
        
    // TODO Auto-generated method stub
    //1.id+port
    //2.数据流
        
    }
    
    }
 
 
3.4服务器端模块
作用:负责接收客户端传过来的数据
需要实现的方法:Collection<BIDR> revicer()
 
package com.briup.woss.Server;
import java.io.*;
import java.net.*;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import com.briup.util.BIDR;
import com.briup.woss.server.Server;
public class ServrImpl implements Server{
    private ServerSocket ss;
    private int port;
    private ObjectInputStream oi;
    
    @Override
    public void init(Properties p) {
        // TODO Auto-generated method stub
        System.out.println(p.get("port"));
         port=Integer.parseInt((String) p.get("port"));
         System.out.println(port);
    
    }
    
    @SuppressWarnings("unchecked")
    @Override
    public Collection<BIDR> revicer() throws Exception {
        
        // TODO Auto-generated method stub
        //1.port
        //数据流
        //1)创建ServerSocket
        ss=new ServerSocket(port);
        System.out.println("服务器已启动,请连接·····");
        while(true){
        Socket s=ss.accept();
        oi=new ObjectInputStream(s.getInputStream());
        List<BIDR> c=(List<BIDR>)oi.readObject();
        System.out.println("连接成功");
        return c;
        }
        
        
    }
    @Override
    public void shutdown() {
        // TODO Auto-generated method stub
        
    }
    
    
}
 
 
3.5数据的持久化保存
所谓数据的持久化保存,就是将完全信息进行入库
进行数据入库的时候遇到的问题
1.将信息以什么样的形式进行保存
将信息封装成BIDR对象,保存至集合中去
2.如何将完全信息和不完全信息区分保存
将完全信息保存在list集合中,对完全信息进行持久化保存,对不完全信息进行一个数据的备份,为了将数据的备份直观的表现出来,在进行数据的入库时设置一个异常,将未进行入库的完全信息进行一个备份,在该程序中,当第200条数据进行入库的时候,不能继续入库,将未进行入库的完全信息的备份到list.txt
3.是用什么样的方式来实现数据的持久化保存
因为需要实现的仅是电信采集这个大项目的一个小功能,加上mybatis用的不是很熟练,我用的是jdbc来实现数据的持久化保存。在使用jdbc进行数据的持久化保存的时候,因为Connection最多常见300个prepareStatement,将插入sql语句ps = connection.prepareStatement(sql),放在下面的for (BIDR bidr : collction)里面遍历,由于遍历次数过多,会出现超出游标最大值异常,因为读取的是一个月电信采集的信息,因此我将ps=connection.prepareStatement(sql)放在上面的for循环里面遍历,通过遍历天数,将sql语句插入,与for (BIDR bidr : collction)获取到的天数相对应,进而将信息存储到数据库中
package com.briup.woss.Server;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import com.briup.util.BIDR;
import com.briup.util.BackUP;
import com.briup.util.Configuration;
import com.briup.util.Logger;
import com.briup.woss.ConfigurationAWare;
import com.briup.woss.server.DBStore;
public class DBStoreImpl implements DBStore,ConfigurationAWare{
    /*
     * driver=oracle.jdbc.driver.OracleDriver
     * url=jdbc:oracle:thin:@localhost:1521:XE username=oracle password=orcle
     */
    private static String driver;
    private static String url;
    private static String username;
    private static String password;
    private static Connection connection;
    private String aaa_login_name;
    private String login_ip;
    private java.sql.Date login_date;
    private java.sql.Date logout_date;
    private String nas_ip;
    private String time_duration;
    private int i_date;
    private static PreparedStatement[] ps = new PreparedStatement[31];;
    String sql;
    static int i = 0;
    private static String pathName = "src/com/briup/woss/File/list.txt";
    Configuration conf=null;
    // 存放完整数据
    static List<BIDR> list = new ArrayList<>();
    @Override
    public void init(Properties p) {
        driver=p.getProperty("driver");
        url=p.getProperty("url");
        username=p.getProperty("username");
        password=p.getProperty("password");
    }
    @Override
    public void saveToDB(Collection<BIDR> collction) throws Exception {
        BackUP bi=conf.getBackup();
        Logger log=conf.getLogger();
        try {
            // 注册驱动
            Class.forName(driver);
            // 创建连接
            connection = DriverManager.getConnection(url,username,password);
            System.out.println(connection);
            for (int i = 0; i < 31; i++) {
                sql = "insert into t_detail_" + i
                        + "(aaa_login_name,login_ip,login_date,logout_date,nas_ip,time_duration) "
                        + "values(?,?,?,?,?,?)";
                ps[i] = connection.prepareStatement(sql);
            }
            //connection最多创建300个prepareStatement
            //将ps = connection.prepareStatement(sql);放在下面的for (BIDR bidr : collction)里面遍历,由于遍历次数过多,会出现超出游标最大值异常
            //遍历31次,与下面遍历bidr对象时,存储信息时的ps相对应
            for (BIDR bidr : collction) {
                Timestamp login_d = bidr.getLogin_date();
                String s_date = login_d.toString();
                String[] str1 = s_date.split(" ");
                String[] str2 = str1[0].split("-");
                i_date = Integer.parseInt(str2[2]);
                aaa_login_name = bidr.getAAA_login_name();
                login_ip = bidr.getLogin_ip();
                login_date = new java.sql.Date(bidr.getLogin_date().getTime());
                logout_date = new java.sql.Date(bidr.getLogout_date().getTime());
                // 通过PreparedStatement将信息存储到数据库中
                ps[i_date].setString(1, aaa_login_name);
                ps[i_date].setString(2, login_ip);
                ps[i_date].setDate(3, login_date);
                ps[i_date].setDate(4, logout_date);
                ps[i_date].setString(5, nas_ip);
                ps[i_date].setString(6, time_duration);
                // 执行sql
                ps[i_date].executeUpdate();
                i++;
                if (i == 200) {
                    i = 1 / 0;
                }
                list.add(bidr);
                System.out.println("插入数据成功!");
            }
            log.info("入库数据的个数" + list.size());
            collction.removeAll(list);
            log.info("未入库数据的个数" + collction.size());
        } catch (Exception e) {
             connection.rollback();
            bi.store(pathName, list, BackUP.STORE_OVERRIDE);//对象.字段会出现黄色波浪线,可以改成类名.字段
            log.debug("备份数据为"+list.size());
            log.debug("未入库数据的个数" + collction.size());
            log.debug("插入个数为:"+i);
            
        }
    }
    /*public static void main(String[] args) throws Exception {
        GatherImpl ga = new GatherImpl();
        new DBStoreImpl().saveToDB(ga.gather());
        System.out.println("备份数据的个数为:" + list.size());
        System.out.println(i);
    }*/
    @Override
    public void setConfiguration(Configuration co) {
        // TODO A
uto-generated method stub
        this.conf=co;
        
    }
}
 
 
3.6公共配置模块
 
需要实现的方法
BackUP getBackup()
Client getClient()
DBStore getDBStore()
Gather getGather()
Logger getLogger()
Server getServer()
1、如何实现日志记录
2、了解日志级别
日志级别一共有五种,级别由高到低依次是:fatal、error、warn、info、debug
  public void debug(Object message);          //输出debug级别的日志信息;
       public void info(Object message);           //输出info级别的日志信息;
       public void warn(Object message);           //输出warn级别的日志信息;
       public void error(Object message);          //输出error级别的日志信息;
       public void fatal(Object message);          //输出fatal级别的日志信息;
       public void log(Priority p, Object message);//输出参数Priority指定级别的日志信息;
3、如何将日志输出到控制台和指定文件中去
在项目中,不能通过new来创建对象,要通过Configurtion来获取
 
 
 
package com.briup.util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.dom4j.*;
import org.dom4j.io.SAXReader;
import com.briup.woss.ConfigurationAWare;
import com.briup.woss.WossModule;
import com.briup.woss.client.Client;
import com.briup.woss.client.Gather;
import com.briup.woss.server.DBStore;
import com.briup.woss.server.Server;
public class ConfigurationImpl implements Configuration{
    String filePath="src/com/briup/woss/File/conf.xml";
    //存入woss模块对象
    Map<String,WossModule> wossMap=new HashMap<>();
    //存放配置信息
    Properties pro=new Properties();
    @Override
    public BackUP getBackup() throws Exception {
        // TODO Auto-generated method stub
        return (BackUP) wossMap.get("backup");
    }
    @Override
    public Client getClient() throws Exception {
        // TODO Auto-generated method stub
        return (Client) wossMap.get("client");
    }
    @Override
    public DBStore getDBStore() throws Exception {
        // TODO Auto-generated method stub
        return (DBStore) wossMap.get("dbstore");
    }
    
    public static void main(String[] args) throws Exception {
        new ConfigurationImpl().getDBStore();
    }
    
    @Override
    public Gather getGather() throws Exception {
        // TODO Auto-generated method stub
        return (Gather) wossMap.get("gather");
    }
    
    @Override
    public Logger getLogger() throws Exception {
        // TODO Auto-generated method stub
        return (Logger) wossMap.get("logger");
    }
    @Override
    public Server getServer() throws Exception {
        return (Server) wossMap.get("server");
    }
    
    public ConfigurationImpl() {
        try {
        //1.获取解析器。读取conf.xml
           //创建SAXReader读取器,专门用于读取xml
        SAXReader saxReader=new SAXReader();
        //2.获取根节
            Document document=saxReader.read(filePath);
            
            Element rootElement=document.getRootElement();
            //3.获取子节点--属性值
            List elements=rootElement.elements();
            for(Object object:elements){
                Element e=(Element)object;
                String name=e.getName();
                //System.out.println("子节点"+name);
                //class
                String attValue=e.attributeValue("class");
//                System.out.println(attValue);
                //通过反射获取对象
                    WossModule woss;
                    try {
                        woss = (WossModule)Class.forName(attValue).newInstance();
//                        System.out.println(woss);
                        wossMap.put(name, woss);
//                        System.out.println(name);
                        for(String key:wossMap.keySet()){
//                            System.out.println(key+":"+wossMap.get(key));
                            
                            //4.固定值-->Properties
                            List ee=e.elements();
                            for(Object obj:ee){
                                Element el=(Element)obj;
                                String key1=el.getName();
                                String value=el.getText();
//                                System.out.println(key1);
//                                System.out.println(el.getName()+"*:*"+el.getText());
                                pro.put(key1, value);
                                String po=(String)pro.get("po");
                             //   System.out.println("po:**************");
                            //    System.out.println(po);   
                                
                            }
                            //配置信息依赖注入
                            for(Object obj:wossMap.values()){
                                //调用init()方法,注入配置信息
                                if(obj instanceof WossModule){
                                    ((WossModule) obj).init(pro);
                                }
                                if(obj instanceof ConfigurationAWare){
                                    ((ConfigurationAWare)obj).setConfiguration(this);
                                }
                            }
                                
                        }
                    
                    } catch (ClassNotFoundException e1) {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }
                    //class--->实例化对象放入集合
               
                    
                    }
            
            
        } catch (Exception e2) {
            // TODO Auto-generated catch block
            e2.printStackTrace();
        }
        
            
            
        
        
        //固定值
        //class实例化对象放入集合
        
    }
}
 
 
项目的详细代码已上传至GitHub:

以上是关于电信采集之数据处理的主要内容,如果未能解决你的问题,请参考以下文章

电信宽带运营支撑系统

电信采集子项目1(大体架构)

直播流程

电信NB-IOT的温湿度采集器开发记录

如何采集电信的电视信号---搭建酒店OTT-TV的重要环节

Spark+TDengine 在中国电信电力测功系统监控平台上的应用实践