著录数据打包

Posted Livon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了著录数据打包相关的知识,希望对你有一定的参考价值。

使用 MyBatis 连接 Oracle ,然后向 MongoDB 中写入。

数据量 1.3 亿,每分钟 7 - 10 万。

SyncPacker_201603.java

package syncPacker;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.http.message.BasicNameValuePair;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;
import com.pro.framework.action.BaseController;

import logCenter.SendLog;
import net.sf.json.JSONObject;
import syncPacker.bean.PatentBibliographicChangeBean;
import syncPacker.bean.SyncDataPackageBean;
import utils.DateUtils;
import utils.DatetimeUtils;
import utils.HttpUtils;

/**
 * 增量数据分块打包 全处理
 * 
 * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action
 * 
 * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean.
 * tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000
 */
@Service
public class SyncPacker_201603 extends BaseController {

    private static final long serialVersionUID = 1L;

    // 初始化:数据库接口
    @Autowired
    private SyncPackerDao dao;

    // 初始化:发送端地址
    @Value("${url_syncSender}")
    private String url_syncSender;

    // 初始化:本地文件存储路径
    @Value("${path_syncPacker_package}")
    private String path_syncPacker_package;

    // 初始化:读取最大数据包名称的地址
    @Value("${url_selectMaxPackageNumber}")
    private String url_selectMaxPackageNumber;

    // 初始化:存储最大数据包名称的地址
    @Value("${url_insertSyncDataPackage}")
    private String url_insertSyncDataPackage;

    // 初始化:查询条件Bean
    private SyncDataPackageBean bean = new SyncDataPackageBean();
    // 初始化:查询结果List
    private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>();
    // 初始化:形成的数据包名称
    private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>();
    // 初始化:已打包的增量数据ID表单
    private List<String> pdaIdList = new ArrayList<String>();
    // 初始化:用于删除数据的临时ID List
    private List<String> idPartList = new ArrayList<String>();
    // 初始化:传输协议
    private HttpUtils httpUtils = new HttpUtils();
    // 初始化:键值串
    private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>();
    // POST 结果
    private String str_postResult;
    // 初始化:发送url返回的信息
    private JSONObject json_postResult;
    // 初始化:历史最大包编号
    private String maxPackageNumber;
    // 初始化:记录打包完成后的数据版本
    private Integer centerNodeDataVersion;
    // 发送远程日志
    private SendLog sendLog = new SendLog();
    // 记录本地日志
    private Logger logger = Logger.getLogger(SyncPacker_201603.class);
    // 初始化:判断程序是否正在运行
    public static boolean isRunning = false;

    // 本次处理完成后的最大包编号
    private String packedPackageNumber;
    // 用于返回json的成功信息
    private String success = "success";
    

    /** 文件补发用:指定数据重新打包 */
    // http://localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102
    public String packByPackageNumber() throws Exception {

        logMemory("本次请求处理开始。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() );

        String job_start = DateUtils.getCurrentTimeString(0);
        
        try {
            

            for ( int i = Integer.valueOf(bean.getPackageNumberStart()); 
                    i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) {                

                // 开始时间
                String package_start = DateUtils.getCurrentTimeString(0);
                
                // 包编号
                String packageNumber = String.format( "%06d", i );
                
                logMemory("开始,包编号:", packageNumber );
                
                //(1)读历史表
                pbcList = selectList_changeHistory( packageNumber ); 
                
                if( null == pbcList ) {
                    logMemory("<span style=\"color:red;\">数据为空 !!!</span>", "");
                    
                } else{
                    logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() ));

                    //(2)插入MongoDB
                    insertMongoDB( pbcList );
                    pbcList.clear();
                };
                
                logMemory("传输结束,包编号:" + packageNumber ," 用时: "
                        + DatetimeUtils.getDistanceTimes_string(package_start, DateUtils.getCurrentTimeString(0)) );
            }
        } catch (Exception e) {
            // 日志输出
            logMemory("系统发生异常", e.getMessage());
            e.printStackTrace();
        }
        
        logMemory("本次请求处理完成,程序结束。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd()
                + " 用时: " + DatetimeUtils.getDistanceTimes_string( job_start, DateUtils.getCurrentTimeString(0)) );
        
        return SUCCESS;
    }
    

    /**
     * 读历史表
     * @param packageNumber
     * @return
     */
    private List<PatentBibliographicChangeBean> selectList_changeHistory( String packageNumber ){
        
        for ( int i = 0; i < 100; i++ ) {
            
            try {
                return dao.selectList_changeHistory( packageNumber );                
            } catch (Exception e) {
                // TODO Auto-generated catch block
//                e.printStackTrace();
                logMemory("系统发生异常", e.getMessage());
                try {
                    Thread.sleep(500);
                    logMemory("暂停 0.5 秒", "" );
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            } 
        }// loop end
        return null ;
        
    }
    
    
    

    /**
     * 插入 MongoDB
     */
    public void insertMongoDB( List<PatentBibliographicChangeBean> pbcList ) {    

        //# MongoDB(数据加载目标)
        String syncLoadIntoMongoDbService = "10.78.2.23:27017";
        String syncLoadIntoMongoDbName = "patent_search_extend";
        String syncLoadIntoMongoTable = "patent_bibliographic_20160319";

        // 加载开始
//        logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService );

        Mongo m = null;
        try {
            m = new Mongo( syncLoadIntoMongoDbService );
        } catch (UnknownHostException e) {
            e.printStackTrace();
            logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage());
        }

        // 库名
        DB db = m.getDB( syncLoadIntoMongoDbName );
//        logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName );

        // 表名
        DBCollection collection = db.getCollection( syncLoadIntoMongoTable );
//        logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable );
        
        // 循环列表,将每个元素插入数据库
        for( PatentBibliographicChangeBean pbcBean : pbcList ){        
            
            
            
            //(1)读取一条著录数据
//            JSONObject json = packByPackageNumber_readBibl( pbcBean.getId() );
            
//            if( null == json ) return ;

            // 列,值
            BasicDBObject insDoc = new BasicDBObject();
            
            insDoc.put("abstract_No"    , pbcBean.getAbstract_No()    );
            insDoc.put("app_Addr"       , pbcBean.getApp_Addr()       );
            insDoc.put("app_Cn"         , pbcBean.getApp_Cn()         );
            insDoc.put("app_Country"    , pbcBean.getApp_Country()    );
            insDoc.put("app_Date"       , pbcBean.getApp_Date()       );
            insDoc.put("app_Name"       , pbcBean.getApp_Name()       );
            insDoc.put("app_Sn"         , pbcBean.getApp_Sn()         );
            insDoc.put("app_Type"       , pbcBean.getApp_Type()       );
            insDoc.put("app_Zip"        , pbcBean.getApp_Zip()        );
            insDoc.put("ecla"           , pbcBean.getEcla()           );
            insDoc.put("fi"             , pbcBean.getFi()             );
            insDoc.put("ft"             , pbcBean.getFt()             );
            insDoc.put("id"             , pbcBean.getId()             );
            insDoc.put("inv_Title"      , pbcBean.getInv_Title()      );
            insDoc.put("invent_Type"    , pbcBean.getInvent_Type()    );
            insDoc.put("inventor"       , pbcBean.getInventor()       );
            insDoc.put("ipc_Standard"   , pbcBean.getIpc_Standard()   );
            insDoc.put("locarno"        , pbcBean.getLocarno()        );
            insDoc.put("operation_Time" , pbcBean.getOperation_Time() );
            insDoc.put("operation_Type" , pbcBean.getOperation_Type() );
            insDoc.put("package_Name"   , pbcBean.getPackage_Name()   );
            insDoc.put("pct_App_Cn"     , pbcBean.getPct_App_Cn()     );
            insDoc.put("pct_App_Date"   , pbcBean.getPct_App_Date()   );
            insDoc.put("pct_App_Sn"     , pbcBean.getPct_App_Sn()     );
            insDoc.put("pct_Date"       , pbcBean.getPct_Date()       );
            insDoc.put("pct_Pub_Cn"     , pbcBean.getPct_Pub_Cn()     );
            insDoc.put("pct_Pub_Date"   , pbcBean.getPct_Pub_Date()   );
            insDoc.put("pct_Pub_Lang"   , pbcBean.getPct_Pub_Lang()   );
            insDoc.put("pct_Pub_Sn"     , pbcBean.getPct_Pub_Sn()     );
            insDoc.put("prn"            , pbcBean.getPrn()            );
            insDoc.put("prn_Cn"         , pbcBean.getPrn_Cn()         );
            insDoc.put("prn_Date"       , pbcBean.getPrn_Date()       );
            insDoc.put("prn_Sn"         , pbcBean.getPrn_Sn()         );
            insDoc.put("prn_Type"       , pbcBean.getPrn_Type()       );
            insDoc.put("pub_Cn"         , pbcBean.getPub_Cn()         );
            insDoc.put("pub_Date"       , pbcBean.getPub_Date()       );
            insDoc.put("pub_Sn"         , pbcBean.getPub_Sn()         );
            insDoc.put("pub_Type"       , pbcBean.getPub_Type()       );
            insDoc.put("uc"             , pbcBean.getUc()             );
            
            collection.insert(insDoc);
            insDoc.clear();
            
        }

        // 循环遍历pdaBeanList
//        System.out.println("loading ...");
//        logger.info(DateUtils.getNow() + " rows:" + pdaBeanList.size());

        // 当前记录编号
        // int currentRowNumber = 0;

        if ( m != null) m.close();
        
//        System.out.println("Load finished.");
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    

    /** 记录日志 */
    private void logMemory(String behavior, String content) {
        // 向服务器发送日志
//        sendLog.send("syncPacker", behavior, content);
        // 记录本地日志
        logger.info(DateUtils.getNow() + " " + behavior + " :" + content);
        // 控制台输出日志
//        System.out.println("syncPacker : " + DateUtils.getNow() + " " + behavior + " :" + content);
    }

    @Override
    public String insert() throws Exception {
        return null;
    }

    @Override
    public String update() throws Exception {
        return null;
    }

    @Override
    public String selectList() throws Exception {
        return null;
    }

    @Override
    public String delete() throws Exception {
        return null;
    }

    public static boolean isRunning() {
        return isRunning;
    }

    public static void setRunning(boolean isRunning) {
        SyncPacker_201603.isRunning = isRunning;
    }

    public Integer getCenterNodeDataVersion() {
        return centerNodeDataVersion;
    }

    public void setCenterNodeDataVersion(Integer centerNodeDataVersion) {
        this.centerNodeDataVersion = centerNodeDataVersion;
    }

    public String getSuccess() {
        return success;
    }

    public void setSuccess(String success) {
        this.success = success;
    }

    public String getMaxPackageNumber() {
        return maxPackageNumber;
    }

    public void setMaxPackageNumber(String maxPackageNumber) {
        this.maxPackageNumber = maxPackageNumber;
    }

    public String getPackedPackageNumber() {
        return packedPackageNumber;
    }

    public void setPackedPackageNumber(String packedPackageNumber) {
        this.packedPackageNumber = packedPackageNumber;
    }

    public SyncDataPackageBean getBean() {
        return bean;
    }

    public void setBean(SyncDataPackageBean bean) {
        this.bean = bean;
    }
}

 

一个备份,记一下吧:

  1 package syncPacker;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.File;
  5 import java.io.FileReader;
  6 import java.io.FileWriter;
  7 import java.io.IOException;
  8 import java.net.UnknownHostException;
  9 import java.util.ArrayList;
 10 import java.util.Date;
 11 import java.util.HashMap;
 12 import java.util.List;
 13 import java.util.Map;
 14 
 15 import org.apache.http.message.BasicNameValuePair;
 16 import org.apache.log4j.Logger;
 17 import org.springframework.beans.factory.annotation.Autowired;
 18 import org.springframework.beans.factory.annotation.Value;
 19 import org.springframework.stereotype.Service;
 20 
 21 import com.mongodb.BasicDBObject;
 22 import com.mongodb.DB;
 23 import com.mongodb.DBCollection;
 24 import com.mongodb.Mongo;
 25 import com.pro.framework.action.BaseController;
 26 
 27 import logCenter.SendLog;
 28 import net.sf.json.JSONObject;
 29 import syncPacker.bean.PatentBibliographicChangeBean;
 30 import syncPacker.bean.SyncDataPackageBean;
 31 import utils.DateUtils;
 32 import utils.DatetimeUtils;
 33 import utils.HttpUtils;
 34 
 35 /**
 36  * 增量数据分块打包 全处理
 37  * 
 38  * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action
 39  * 
 40  * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean.
 41  * tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000
 42  */
 43 @Service
 44 public class SyncPacker_201603 extends BaseController {
 45 
 46     private static final long serialVersionUID = 1L;
 47 
 48     // 初始化:数据库接口
 49     @Autowired
 50     private SyncPackerDao dao;
 51 
 52     // 初始化:发送端地址
 53     @Value("${url_syncSender}")
 54     private String url_syncSender;
 55 
 56     // 初始化:本地文件存储路径
 57     @Value("${path_syncPacker_package}")
 58     private String path_syncPacker_package;
 59 
 60     // 初始化:读取最大数据包名称的地址
 61     @Value("${url_selectMaxPackageNumber}")
 62     private String url_selectMaxPackageNumber;
 63 
 64     // 初始化:存储最大数据包名称的地址
 65     @Value("${url_insertSyncDataPackage}")
 66     private String url_insertSyncDataPackage;
 67 
 68     // 初始化:查询条件Bean
 69     private SyncDataPackageBean bean = new SyncDataPackageBean();
 70     // 初始化:查询结果List
 71     private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>();
 72     // 初始化:形成的数据包名称
 73     private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>();
 74     // 初始化:已打包的增量数据ID表单
 75     private List<String> pdaIdList = new ArrayList<String>();
 76     // 初始化:用于删除数据的临时ID List
 77     private List<String> idPartList = new ArrayList<String>();
 78     // 初始化:传输协议
 79     private HttpUtils httpUtils = new HttpUtils();
 80     // 初始化:键值串
 81     private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>();
 82     // POST 结果
 83     private String str_postResult;
 84     // 初始化:发送url返回的信息
 85     private JSONObject json_postResult;
 86     // 初始化:历史最大包编号
 87     private String maxPackageNumber;
 88     // 初始化:记录打包完成后的数据版本
 89     private Integer centerNodeDataVersion;
 90     // 发送远程日志
 91     private SendLog sendLog = new SendLog();
 92     // 记录本地日志
 93     private Logger logger = Logger.getLogger(SyncPacker_201603.class);
 94     // 初始化:判断程序是否正在运行
 95     public static boolean isRunning = false;
 96 
 97     // 本次处理完成后的最大包编号
 98     private String packedPackageNumber;
 99     // 用于返回json的成功信息
100     private String success = "success";
101 
102     /** 主处理:数据同步_增量数据分块打包 */
103     // @Scheduled(cron = "${scheduled_syncPacker}")
104     public String pack() {
105 
106         logMemory("scheduled_syncPacker start", "");
107         if (isRunning) {
108             logger.info(DateUtils.getNow() + "正在运行,退出");
109             return SUCCESS;
110         }
111         isRunning = true;
112         // 数据包数量
113         int packagesAmount = 0;
114         Date startTime = new Date();
115 
116         for (int i = 1; i <= 1; i++) {
117             logMemory("循环内部(单个数据包)开始:内部编号", "" + i);
118 
119             try {
120                 // 1.读取定量数据
121                 selectPbcList();
122                 if (pbcList.size() <= 0) {
123                     logMemory("无数据", "退出");
124                     break;
125                 }
126 
127                 // 2.形成文件
128                 if (save2disk()) {
129                     packagesAmount++;
130                     // 3.转移数据
131                     transferFromChange2History();
132                 }
133             } catch (Exception e) {
134                 // 日志输出
135                 logMemory("系统发生异常", e.getMessage());
136                 e.printStackTrace();
137                 break;
138             } finally {
139                 // 清空临时List
140                 paramList.clear();
141                 pbcList.clear();
142                 pbcList_withPackageName.clear();
143                 pdaIdList.clear();
144                 System.gc();
145             }
146             logMemory("循环内部(单个数据包)结束,内部编号", "" + i);
147         }
148         // loop end
149 
150         // 清空临时List
151         paramList.clear();
152         pbcList.clear();
153         pbcList_withPackageName.clear();
154         pdaIdList.clear();
155         System.gc();
156 
157         isRunning = false;
158         logMemory("本次打包结束,新增数据包数量:" + packagesAmount + ",历时:"
159                 + DatetimeUtils.getDistanceTimes_string(startTime, new Date()), "");
160         logMemory("finished", "scheduled_syncPacker end");
161 
162         return SUCCESS;
163     }
164 
165     /** selectPbcList() */
166     // 读取 专利著录变化 数据 数据库有连接失败的时候,尝试 10 次
167     private void selectPbcList() {
168 
169         // 日志输出
170         logMemory("增量数据分块打包开始执行", "");
171 
172         for (int i = 0; i < 10; i++) {
173             try {
174                 // -------------- 查询分块著录信息增量数据 --------------
175                 logMemory("准备读取数据", "");
176                 // 每次取10000
177                 pbcList = dao.selectList(bean);
178                 logMemory("读取完毕", "数据量" + pbcList.size());
179                 return;
180             } catch (Exception e) {
181                 // 日志输出
182                 logMemory("系统发生异常", e.getMessage());
183                 e.printStackTrace();
184             }
185         }
186         // loop end
187         return;
188     }
189 
190     /**
191      * save2disk()
192      * 
193      * 将打包文件存盘
194      */
195     private boolean save2disk() throws Exception {
196 
197         // -------------- 分块打包增量数据 --------------
198 
199         // 查询上一次处理最后生成的数据包名称
200         logMemory("准备查询最后一个数据包名称", "");
201 
202         // 发送url,请求对应服务器返回最大数据包名称
203         // syncDataPackageBean = dao.lastDataPackageInfo(0);
204         str_postResult = httpUtils.post(url_selectMaxPackageNumber, null);
205         logMemory("取最大包编号:" + url_selectMaxPackageNumber, " 返回 " + str_postResult);
206 
207         // 判断是否成功发送信息
208         if (str_postResult == null || "".equals(str_postResult)) {
209             // 发送失败,跳出循环,结束程序
210             logMemory("失败!", " 返回为空");
211             isRunning = false;
212             return false;
213         }
214 
215         // 解析Post返回的Json
216         json_postResult = JSONObject.fromObject(str_postResult);
217 
218         // 判断是否成功获取到最大包编号
219         if ("success".equalsIgnoreCase(json_postResult.getString("success"))) {
220             // 发送成功,进行下一步操作
221             maxPackageNumber = json_postResult.get("maxPackageNumber").toString();
222         } else {
223             // 发送失败,跳出循环,结束程序
224             logMemory("失败!", "没有返回成功标志。");
225             isRunning = false;
226             return false;
227         }
228 
229         logMemory("上一次最后一个数据包名称", maxPackageNumber);
230         logMemory("准备将数据写入文件", " 路径:" + path_syncPacker_package);
231 
232         // 得到新数据包名,同时分块打包增量数据
233         pbcList_withPackageName = DataPacking_2.CompressPdaData(pbcList, maxPackageNumber, path_syncPacker_package);
234 
235         // 日志输出
236         logMemory("新数据包名称", pbcList_withPackageName.get(0).getPackage_Name() + ".txt");
237         logMemory("转换完成,数量", String.valueOf(pbcList_withPackageName.size()));
238 
239         // 记录打包完成的数据版本信息(流水号)
240         centerNodeDataVersion = Integer.parseInt(pbcList_withPackageName.get(0).getPackage_Name());
241 
242         // -------------- 记录已完成的数据包名称 --------------
243         // 发送打包完成的数据包名称(流水号)
244         // dao.insertPacName(syncDataPackageBean);
245         paramList = new ArrayList<BasicNameValuePair>();
246         paramList.add(0,
247                 new BasicNameValuePair("packedPackageNumber", pbcList_withPackageName.get(0).getPackage_Name()));
248 
249         str_postResult = httpUtils.post(url_insertSyncDataPackage, paramList);
250         logMemory("新增数据包:" + url_insertSyncDataPackage, " 返回 " + str_postResult);
251 
252         // 判断是否成功发送信息
253         if (str_postResult == null || "".equals(str_postResult)) {
254             // 发送失败,跳出循环,结束程序
255         

以上是关于著录数据打包的主要内容,如果未能解决你的问题,请参考以下文章

ChatGPT 辅助专利写作

索引-mysql

EasyClick 运行代码片段出Null

EasyClick 运行代码片段出Null

Android:将数据(附加)传递给片段

EndNote 输出样式模板(根据国家标准制订)