电信采集子项目2(具体实现)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了电信采集子项目2(具体实现)相关的知识,希望对你有一定的参考价值。

1.项目所用到jar包:

  woss-interface.jar:规范项目的各个模块

    技术分享技术分享技术分享技术分享

      各个接口的功能在我的上一篇文章中有介绍,感兴趣的去找一下。

  demo4j-1.6.1.jar:解析xml文件

  log4j-1.2.15:提供日志记录

  ojdbc14.jar:jdbc的jar包

 

3.流程图

   技术分享  

  画的不好别见怪,嘿嘿。

 

 

2.项目的各个模块的具体实现

   2.1客户端采集模块

      假如我们需要采集的数据的结构如下:

        #briup1660|037:wKgB1660A|7|1239110900|44.211.221.247

        #briup4418|037:wKgB4418A|7|1239138480|251.196.223.191

        #|037:wKgB1660A|8|1239203860|44.211.221.247

      

      问题:
        1.怎么获得文件路径
        2.了解每一行数据的意思
          2.1 数据中使用的是|进行分割
          2.2 数据一共分为俩种
              遵循7上8下的规则
              包含 7 的数据 表示用户上线
              包含 8 的数据 表示用户下线
          2.3 包含7的数据
              数据分为五个部分
                第一部分:登录的用户名字(去掉#号)
                第二部分:NAS服务器的名称
                第三部分:一定是7
                第四部分:上线的时间(单位是秒)
                第五部分:登录时的IP
          2.4 包含8的数据
              数据分为五个部分
                第一部分:一定是个符合 #
                第二部分:NAS服务器的名称
                第三部分:一定是8
                第四部分:下线的时间(单位是秒)
                第五部分:登录时的IP

         3.如何读写数据
         4.如何封装数据
         5.采集的数据分俩种情况
           第一种情况数据:
              用户上线了同时也下线了
           第二种情况数据:
              用户上线了但是还没有下线
         6.俩种数据情况怎么处理
           上面描述的第一种数据:
              封装好对象之后就准备传给服务器端
           上面描述的第二种数据:
              进行数据的备份,在一下次采集中,需要把这个备份的数据重新读出来使用,因为用户可能在一下次采集中下线了
         7.第二次读取数据的时候,如何从第一次读完数据的下一行开始读
              可以记录一下本次总共读取了多少个字节,下一次可以直接跳过这么多个字节,接着读就可以了
         8.在读取过程中或者处理过程中,如果出现了异常,需要把数据进行备份
         9.注意重要信息的日志记录

         以下是实现代码:

          

 1 package com.briup.woss.client;
 2 
 3 import java.io.BufferedReader;
 4 import java.io.FileInputStream;
 5 import java.io.InputStream;
 6 import java.io.InputStreamReader;
 7 import java.sql.Timestamp;
 8 import java.util.ArrayList;
 9 import java.util.Collection;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Properties;
14 
15 import org.junit.Test;
16 
17 import com.briup.util.BIDR;
18 import com.briup.util.Configuration;
19 import com.briup.woss.ConfigurationAWare;
20 
21 public class GatherImpl implements Gather{
22 
23     private static String file;
24     private InputStream fis;
25     private InputStreamReader isr;
26     private BufferedReader br;
27     public Map<String, BIDR> map1;
28     private List<BIDR> date;
29 
30     // 读取数据/解析/封装成BIDR对象/对象存放在集合
31     // #briup4418|037:wKgB4418A|7|1239138480|251.196.223.191
32     // #|037:wKgB1660A|8|1239203860|44.211.221.247
33     @Override
34     public void init(Properties p) {
35         file = p.getProperty("filePath");//将配置文件注入到该模块中
36     }
37     
38         //接收数据,将数据进行处理,并保存到collection集合中,并返回。
39     @Override
40     @Test
41     public Collection<BIDR> gather() throws Exception {
42         // 获取文件内容
43         BIDR bidr = null;
44         map1 = new HashMap<String, BIDR>();
45         date = new ArrayList<>();
46         fis = new FileInputStream(file);
47         isr = new InputStreamReader(fis);
48         br = new BufferedReader(isr);
49         String str = null;
50         while ((str = br.readLine()) != null) {
51             bidr = new BIDR();
52                         //通过|分割字符串,由于|号比较特殊,要加[]号
53             String[] arrStr = str.split("[|]");
54             if ("7".equals(arrStr[2])) {
55                 bidr.setAAA_login_name(arrStr[0]);
56                 bidr.setNAS_ip(arrStr[1]);
57                                 //由于bidr中时间是Timestamp类型的所以这里转换的时候要注意下
58                 bidr.setLogin_date(new Timestamp(
59                         Long.parseLong(arrStr[3]) * 1000));
60                 bidr.setLogin_ip(arrStr[4]);
61                 map1.put(arrStr[4], bidr);
62             } else {
63 
64                 BIDR b = map1.remove(arrStr[4]);
65                 b.setLogin_ip(arrStr[1]);
66                 b.setLogout_date(new Timestamp(Long.parseLong(arrStr[3]) * 1000));
67                 long l = (b.getLogout_date().getTime() - b.getLogin_date()
68                         .getTime());
69                 b.setTime_deration((int) l);
70                 date.add(b);
71             }
72         }
73 
74         return date;
75     }
76 
77 }                  

 

 

  2.2备份模块

     负责备份一些没有处理完的数据

     需要实现的方法:
      void store(String filePath, Object obj,
      boolean append)
      Object load(String filePath, boolean del)
    问题:
      1.如何获得备份文件存放的目录信息
      2.如何把数据备份到文件
      3.如何读取文件中的备份数据
      4.如何实现备份数据时候的追加或者是覆盖
      5.如何控制读取备份数据后文件是否需要删除

    以下是实现代码:

    

package com.briup.woss.util;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Properties;

import com.briup.util.BackUP;

public class BackUpImpl implements BackUP{

    static String fName;
    @Override
    public void init(Properties p) {
        fName = p.getProperty("bakFilePath");
    }
        //通过对象输出流将需要备份的数据进行备份
    @Override
    public void store(String fileName, Object obj,
            boolean paramBoolean) throws Exception {
        File file = new File(fName+fileName);
        ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file,paramBoolean));
        out.writeObject(obj);
    }

    @Override
    public Object load(String fileName, boolean paramBoolean)
            throws Exception {
        File file = new File(fName+fileName);
        if(!file.exists()){
            return null;
        }
        ObjectInputStream in = new ObjectInputStream(new FileInputStream(file));
        Object object = in.readObject();
        return object;
    }

}

 

  

  2.3网络模块

    负责把采集好的数据发给服务器

    需要实现的方法:
      void send(Collection<BIDR> c)
      问题:
        1.如何得到连接服务器的相关信息
        2.如何得到采集好的数据
        3.如何把数据发送给服务器
        4.如果发送数据失败怎么处理
        5.注意重要信息的日志记录

    以下是实现代码

      

package com.briup.woss.client;

import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Collection;
import java.util.Properties;

import com.briup.util.BIDR;

public class ClientImpl implements Client{

    static String ip;
    static int port;
    @Override
    public void init(Properties p) {    //通过依赖注入的方式将配置信息注入到该模块中
        ip = p.getProperty("ip");
        port = Integer.parseInt(p.getProperty("port"));
    }
        //通过socket以及对象流将数据传输到客户端
    @Override
    public void send(Collection<BIDR> c) throws Exception {
        Socket s = new Socket(ip, port);
        ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
        oos.writeObject(c);
    }
}

 

  

  2.4服务器端网络模块

    负责接收客户端传过来的数据

    需要实现的方法:
      Collection<BIDR> revicer();
      void shutdown();
    问题:
      1.如何获得服务器启动时候用的相关信息
      2.如何关闭关闭服务器
      3.如何接收客户端传过来的信息
      4.如何处理客户端并发的问题
      5.接收到数据之后一下步怎么做
      6.数据的接收或者处理过程备份的问题
      7.注意重要信息的日志记录

    以下是实现代码

     

package com.briup.woss.server;

import java.io.ObjectInputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collection;
import java.util.Properties;

import com.briup.util.BIDR;
import com.briup.util.Configuration;
import com.briup.woss.ConfigurationAWare;

public class ServerImpl implements Server,ConfigurationAWare{

    static int port;
    private ObjectInputStream ois;
    private Object o;
    private static DBStoreImpl dbStore;
    @Override
    public void init(Properties p) {
        port = Integer.parseInt(p.getProperty("server_port"));
    }
        //这里通过多线程的方式接受数据,通过调用入库模块进行入库
        //因为服务器端可能只有一个,但是客户端应该有多个,同时接收来自多个客户端发来的信息时便需要实现多线程
    @Override
    public Collection<BIDR> revicer() throws Exception {
        
        new Thread(new Runnable() {
            @Override
            public void run() {
                ServerSocket ss;
                try {
                    ss = new ServerSocket(port);
                    Socket s = ss.accept();
                    ois = new ObjectInputStream(s.getInputStream());
                    Collection<BIDR> c = (Collection)ois.readObject();
                    dbStore.saveToDB(c);
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        }).start();
        
        return null;
    }

    @Override
    public void shutdown() {
        
    }

    @Override
    public void setConfiguration(Configuration p) {
        try {
            dbStore = (DBStoreImpl) p.getDBStore();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

  

  2.5服务器端入库模块

    负责接收到的数据插入到数据库中

    需要实现的方法:
      void saveToDB(Collection<BIDR> c)
    问题:
      1.如何获得连接数据库的相关信息
      2.怎么把数据插入到数据库中
      3.插入数据时候的效率问题
      4.什么样的数据对应哪一种表

    

package com.briup.woss.server;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Properties;


import javax.swing.JTextArea;

import com.briup.util.BIDR;
import com.briup.woss.GUI.DisData;
import com.briup.woss.GUI.MyAppender;

public class DBStoreImpl implements DBStore{
    
    private static String driver;
    private static String url;
    private static String user;
    private static String password;
    private String aaa_login_name;
    private String login_ip;
    private java.sql.Date login_date;
    private java.sql.Date logout_date;
    private String nas_ip;
    private String time_duration;
    private Connection conn;
    private int i_date;
    private Properties p;
    
    public DBStoreImpl() {
        p = new Properties();
        init(p);
    }
    
    @Override
    public void init(Properties p) {
        driver = p.getProperty("driver");
        url = p.getProperty("url");
        user = p.getProperty("username");
        password = p.getProperty("password");
    }
    
    @Override
    public void saveToDB(Collection<BIDR> p) throws Exception {
                //通过jdbc与数据库建立链接
        Class.forName(driver);
        conn = DriverManager.getConnection(url, user, password);
        for (BIDR bidr : p) {
            Timestamp login_d = bidr.getLogin_date();
            String s_date = login_d.toString();
            String[] str1 = s_date.split(" ");
            String[] str2 = str1[0].split("-");
            i_date = Integer.parseInt(str2[2]);
            
            aaa_login_name = bidr.getAAA_login_name();
            login_ip = bidr.getLogin_ip();
            login_date = new java.sql.Date(bidr.getLogin_date().getTime());
            logout_date = new java.sql.Date(bidr.getLogout_date().getTime());
            String sql = "insert into t_detail_"+i_date+"(aaa_login_name,login_ip,login_date,logout_date,nas_ip,time_duration) "
                    + "values(?,?,?,?,?,?)";
            //通过PreparedStatement将信息存储到数据库中
            PreparedStatement ps = conn.prepareStatement(sql);
            ps.setString(1, aaa_login_name);
            ps.setString(2, login_ip);
            ps.setDate(3, login_date);
            ps.setDate(4, logout_date);
            ps.setString(5, nas_ip);
            ps.setString(6, time_duration);
            ps.execute();
                        //将数据传送到DisData这个类中,此类是用来将入库信息显示在GUI中的。
            new DisData(p);
        }
    }

    
}
    

  

  2.6公共配置模块

    该模块相当于一个工厂

    负责
    1.产生各个模块对象
    2.读取各个模块所需的信息,并且把信息注入到每个模块中
      注:这时候需要每个模块都实现接口WossModule
    3.如果某个模块A中需要用到配置模块,那么就需要把自己(因为自己就是配置模块)注入到这个模块A中
      注:这时候需要模块A实现接口ConfigurationAWare

    需要实现的方法:
      Logger getLogger();
      BackUP getBackup();
      Gather getGather();
      Client getClient();
      Server getServer();
      DBStore getDBStore();

    问题:
      1.怎么获得每个模块的相关信息
      2.如何创建每个模块的对象
      3.怎么把每个模块需要的数据注入到模块中

      4.什么时候可以把自己(配置模块本身)注入到需要的模块中

    以下是实现代码:

    

package com.briup.woss.util;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.briup.util.BackUP;
import com.briup.util.Configuration;
import com.briup.util.Logger;
import com.briup.woss.ConfigurationAWare;
import com.briup.woss.WossModule;
import com.briup.woss.client.Client;
import com.briup.woss.client.Gather;
import com.briup.woss.server.DBStore;
import com.briup.woss.server.Server;

public class ConfigurationImpl implements Configuration{

    Map<String, Object> objMap;
    
    Properties p1;
    Properties p2;
    
    public ConfigurationImpl() {
        this("src/conf/conf.xml");
    }
    
    public ConfigurationImpl(String filePath) {
        init(filePath);
    }

    public void init(String filePath){
        
        objMap = new HashMap<String, Object>();
        ParseXML p = new ParseXML();
        Map<String, Properties> map = p.parseXML(filePath);
        p1 = map.get("p1");
        p2 = map.get("p2");
        
        
        for(Object obj:p1.values()){
            try {
                Object instance = Class.forName((String)obj).newInstance();
                objMap.put((String)obj, instance);
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        for(Object obj:objMap.values()){
            if(obj instanceof WossModule){
                ((WossModule)obj).init(p2);
            }
            if(obj instanceof ConfigurationAWare){
                ((ConfigurationAWare) obj).setConfiguration(this);
            }
        }
        
    }
    @Override
    public Logger getLogger() throws Exception {
        return (Logger) objMap.get("com.briup.woss.util.LoggerImpl");
    }

    @Override
    public BackUP getBackup() throws Exception {
        return (BackUP) objMap.get("com.briup.woss.util.BackUpImpl");
    }

    @Override
    public Gather getGather() throws Exception {
        return (Gather) objMap.get("com.briup.woss.client.GatherImpl");
    }

    @Override
    public Client getClient() throws Exception {
        return (Client) objMap.get("com.briup.woss.client.ClientImpl");
    }

    @Override
    public Server getServer() throws Exception {
        return (Server) objMap.get("com.briup.woss.server.ServerImpl");
    }

    @Override
    public DBStore getDBStore() throws Exception {
        return (DBStore) objMap.get("com.briup.woss.server.DBStoreImpl");
    }

}

 

  

  2.7公共日志模块

     负责记录系统运行过程的一些重要信息

     需要实现的方法:
        void debug(String msg);
        void info(String msg);
        void warn(String msg);
        void error(String msg);
        void fatal(String msg);
    问题:
      1.怎么来实现日记记录
      2.了解日志级别
      3.怎么设置日志的级别
      4.怎么获得日志对象
      5.怎么控制日志的格式
      6.怎么控制日志输出到控制台和指定文件中(重难点)

    一下是实现代码:

    

package com.briup.woss.util;

import java.util.Properties;



import org.apache.log4j.PropertyConfigurator;

import com.briup.util.Logger;

public class LoggerImpl implements Logger{

    private static org.apache.log4j.Logger logger;
    
    @Override
    public void init(Properties p) {
        String configFilePath = p.getProperty("configFilePath");
        PropertyConfigurator.configure(configFilePath);
        logger = org.apache.log4j.Logger.getRootLogger();
    }

    @Override
    public void debug(String paramString) {
        logger.debug(paramString);
    }

    @Override
    public void info(String paramString) {
        logger.info(paramString);
    }
    
    @Override
    public void warn(String paramString) {
        logger.warn(paramString);
    }

    @Override
    public void error(String paramString) {
        logger.error(paramString);
    }

    @Override
    public void fatal(String paramString) {
        logger.fatal(paramString);
    }

}

 

 

3一些辅助类,以及GUI页面实现

  解析XML文件类

    目的:所有的jdbc配置信息,以及woss-interface.jar中各个接口的信息都配置在xml文件中,所以要通过sax解析将配置信息获取到。

    然后存入map集合中,在公共配置模块获取到。

    

package com.briup.woss.util;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.xml.sax.EntityResolver;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;


@SuppressWarnings("all")
public class ParseXML {
    
    Map<String, Properties> map;
    Properties p1;
    Properties p2;
    public Map<String, Properties> parseXML(String filePath){
        
        try {
            
            map = new HashMap<String, Properties>();
            
            p1 = new Properties();
            
            p2 = new Properties();
            
            SAXReader reader = new SAXReader();
            
            reader.setEntityResolver(new EntityResolver() {
                
                @Override
                public InputSource resolveEntity(String publicId, String systemId)
                        throws SAXException, IOException {
            
                    byte[] bytes = "".getBytes();
                    
                    return new InputSource(new ByteArrayInputStream(bytes));
                }
            });
            
            Document document = reader.read(new FileInputStream(filePath));
            
            Element rootElement = document.getRootElement();
            
            List<Element> elements = rootElement.elements();
            
            for(Element e:elements){
                
                String name = e.getName();
                String className = e.attributeValue("class");
                List<Element> elements2 = e.elements();
                p1.setProperty(name, className);
                
                for(Element e1:elements2){
                    String tagName = e1.getName();
                    String name1 = e1.getText();
                    p2.setProperty(tagName, name1);
                }
                
                map.put("p1",p1);
                map.put("p2",p2);
                
                
            //    System.out.println(className);
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        return map; 
        
    }
}

 

 

  GUI类

  目的:通过图形界面化的方式将功能显示在界面上,有利于用户交互。

  实现代码:

    

package com.briup.woss.GUI;

import java.awt.BorderLayout;
import java.awt.Color;
import java.awt.Container;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;

import org.apache.log4j.Appender;
import org.junit.Test;

import com.briup.woss.server.DBStoreImpl;
import com.briup.woss.util.ConfigurationImpl;

@SuppressWarnings("all")
public class GUI extends JFrame implements ActionListener{
    
    private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getRootLogger();
    private JPanel northJPanel;
    private JPanel centerJPanel;
    private JButton start_client;
    private JButton start_server;
    private JButton dis_data;
    private static JTextArea area;
    private static JScrollPane scrollPane;
    private static Appender appender;
    
    public static void main(String[] args) {
        new GUI();
    }
    
    public GUI() {
        try {
            this.setSize(600, 400);
            this.setBackground(Color.BLACK);
            this.setLocationRelativeTo(null);
            this.setResizable(false);
            init();
            initLog1();
            this.setDefaultCloseOperation(EXIT_ON_CLOSE);
            this.setVisible(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Test
    public void init(){
        Container container = getContentPane();
        container.setLayout(new BorderLayout());
        
        //设置北面的JPanel
        northJPanel = new JPanel();
        northJPanel.setBackground(Color.white);
        start_client = new JButton("启动客户端");
        start_server = new JButton("启动服务器");
        dis_data = new JButton("显示数据");
        dis_data.setBackground(Color.white);
        start_client.setBackground(Color.white);
        start_server.setBackground(Color.white);
        northJPanel.add(start_client);
        northJPanel.add(start_server);
        northJPanel.add(dis_data);
        
        //设置中间的JPanel
        centerJPanel = new JPanel();
        centerJPanel.setBackground(Color.white);
        centerJPanel.setLayout(new BorderLayout());
        area = new JTextArea();
        area.setLineWrap(true);
        area.setBackground(Color.white);
        area.setEditable(false);
        scrollPane = new JScrollPane(area);
        centerJPanel.add(scrollPane,BorderLayout.CENTER);
        
        container.add(northJPanel,BorderLayout.NORTH);
        container.add(centerJPanel,BorderLayout.CENTER);
        
        start_client.addActionListener(this);
        start_server.addActionListener(this);
        dis_data.addActionListener(this);
    }
        
        public static void initLog1(){
        
        try {
            appender = logger.getAppender("textArea");
            ((MyAppender)appender).setArea(area);
            ((MyAppender)appender).setPane(scrollPane);
        } catch (Exception e) {
            e.printStackTrace();
            }
        }
        
    @Override
    public void actionPerformed(ActionEvent e) {
        Object obj = e.getSource();
        if(obj == start_client){
            new Thread(new ClientThread()).start();
            logger.debug("客户端启动成功");
        }
        if(obj == start_server){
            new Thread(new ServerThread()).start();
            logger.info("服务器启动成功");
        }
        
        if(obj == dis_data){
            DisData d = new DisData();
            d.setArea(area);
            d.dis_Data();
        }
    }
    
}

 

 

以上则是这次电信采集项目的大部分代码,还有的没贴出来,因为我要赶着吃饭了。有兴趣的朋友们可以看一看,共同进步,共勉,加油!

 

































































































以上是关于电信采集子项目2(具体实现)的主要内容,如果未能解决你的问题,请参考以下文章

电信采集子项目1(大体架构)

电信NB-IOT的温湿度采集器开发记录

如何采集电信的电视信号---搭建酒店OTT-TV的重要环节

如何采集电信的电视信号---搭建酒店OTT-TV/IPTV网络电视系统的重要环节

电信平台对接CTWing具体实现流程

电信采集之数据处理