著录数据打包
Posted Livon
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了著录数据打包相关的知识,希望对你有一定的参考价值。
使用 MyBatis 连接 Oracle 读取数据,然后写入 MongoDB。
数据量约 1.3 亿条,写入速度约为每分钟 7 ~ 10 万条。
SyncPacker_201603.java
package syncPacker; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.http.message.BasicNameValuePair; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.Mongo; import com.pro.framework.action.BaseController; import logCenter.SendLog; import net.sf.json.JSONObject; import syncPacker.bean.PatentBibliographicChangeBean; import syncPacker.bean.SyncDataPackageBean; import utils.DateUtils; import utils.DatetimeUtils; import utils.HttpUtils; /** * 增量数据分块打包 全处理 * * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action * * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean. 
* tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000 */ @Service public class SyncPacker_201603 extends BaseController { private static final long serialVersionUID = 1L; // 初始化:数据库接口 @Autowired private SyncPackerDao dao; // 初始化:发送端地址 @Value("${url_syncSender}") private String url_syncSender; // 初始化:本地文件存储路径 @Value("${path_syncPacker_package}") private String path_syncPacker_package; // 初始化:读取最大数据包名称的地址 @Value("${url_selectMaxPackageNumber}") private String url_selectMaxPackageNumber; // 初始化:存储最大数据包名称的地址 @Value("${url_insertSyncDataPackage}") private String url_insertSyncDataPackage; // 初始化:查询条件Bean private SyncDataPackageBean bean = new SyncDataPackageBean(); // 初始化:查询结果List private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>(); // 初始化:形成的数据包名称 private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>(); // 初始化:已打包的增量数据ID表单 private List<String> pdaIdList = new ArrayList<String>(); // 初始化:用于删除数据的临时ID List private List<String> idPartList = new ArrayList<String>(); // 初始化:传输协议 private HttpUtils httpUtils = new HttpUtils(); // 初始化:键值串 private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>(); // POST 结果 private String str_postResult; // 初始化:发送url返回的信息 private JSONObject json_postResult; // 初始化:历史最大包编号 private String maxPackageNumber; // 初始化:记录打包完成后的数据版本 private Integer centerNodeDataVersion; // 发送远程日志 private SendLog sendLog = new SendLog(); // 记录本地日志 private Logger logger = Logger.getLogger(SyncPacker_201603.class); // 初始化:判断程序是否正在运行 public static boolean isRunning = false; // 本次处理完成后的最大包编号 private String packedPackageNumber; // 用于返回json的成功信息 private String success = "success"; /** 文件补发用:指定数据重新打包 */ // http://localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102 public String packByPackageNumber() throws Exception { logMemory("本次请求处理开始。", 
bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() ); String job_start = DateUtils.getCurrentTimeString(0); try { for ( int i = Integer.valueOf(bean.getPackageNumberStart()); i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) { // 开始时间 String package_start = DateUtils.getCurrentTimeString(0); // 包编号 String packageNumber = String.format( "%06d", i ); logMemory("开始,包编号:", packageNumber ); //(1)读历史表 pbcList = selectList_changeHistory( packageNumber ); if( null == pbcList ) { logMemory("<span style=\"color:red;\">数据为空 !!!</span>", ""); } else{ logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() )); //(2)插入MongoDB insertMongoDB( pbcList ); pbcList.clear(); }; logMemory("传输结束,包编号:" + packageNumber ," 用时: " + DatetimeUtils.getDistanceTimes_string(package_start, DateUtils.getCurrentTimeString(0)) ); } } catch (Exception e) { // 日志输出 logMemory("系统发生异常", e.getMessage()); e.printStackTrace(); } logMemory("本次请求处理完成,程序结束。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() + " 用时: " + DatetimeUtils.getDistanceTimes_string( job_start, DateUtils.getCurrentTimeString(0)) ); return SUCCESS; } /** * 读历史表 * @param packageNumber * @return */ private List<PatentBibliographicChangeBean> selectList_changeHistory( String packageNumber ){ for ( int i = 0; i < 100; i++ ) { try { return dao.selectList_changeHistory( packageNumber ); } catch (Exception e) { // TODO Auto-generated catch block // e.printStackTrace(); logMemory("系统发生异常", e.getMessage()); try { Thread.sleep(500); logMemory("暂停 0.5 秒", "" ); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } }// loop end return null ; } /** * 插入 MongoDB */ public void insertMongoDB( List<PatentBibliographicChangeBean> pbcList ) { //# MongoDB(数据加载目标) String syncLoadIntoMongoDbService = "10.78.2.23:27017"; String syncLoadIntoMongoDbName = "patent_search_extend"; String syncLoadIntoMongoTable = "patent_bibliographic_20160319"; // 加载开始 // 
logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService ); Mongo m = null; try { m = new Mongo( syncLoadIntoMongoDbService ); } catch (UnknownHostException e) { e.printStackTrace(); logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage()); } // 库名 DB db = m.getDB( syncLoadIntoMongoDbName ); // logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName ); // 表名 DBCollection collection = db.getCollection( syncLoadIntoMongoTable ); // logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable ); // 循环列表,将每个元素插入数据库 for( PatentBibliographicChangeBean pbcBean : pbcList ){ //(1)读取一条著录数据 // JSONObject json = packByPackageNumber_readBibl( pbcBean.getId() ); // if( null == json ) return ; // 列,值 BasicDBObject insDoc = new BasicDBObject(); insDoc.put("abstract_No" , pbcBean.getAbstract_No() ); insDoc.put("app_Addr" , pbcBean.getApp_Addr() ); insDoc.put("app_Cn" , pbcBean.getApp_Cn() ); insDoc.put("app_Country" , pbcBean.getApp_Country() ); insDoc.put("app_Date" , pbcBean.getApp_Date() ); insDoc.put("app_Name" , pbcBean.getApp_Name() ); insDoc.put("app_Sn" , pbcBean.getApp_Sn() ); insDoc.put("app_Type" , pbcBean.getApp_Type() ); insDoc.put("app_Zip" , pbcBean.getApp_Zip() ); insDoc.put("ecla" , pbcBean.getEcla() ); insDoc.put("fi" , pbcBean.getFi() ); insDoc.put("ft" , pbcBean.getFt() ); insDoc.put("id" , pbcBean.getId() ); insDoc.put("inv_Title" , pbcBean.getInv_Title() ); insDoc.put("invent_Type" , pbcBean.getInvent_Type() ); insDoc.put("inventor" , pbcBean.getInventor() ); insDoc.put("ipc_Standard" , pbcBean.getIpc_Standard() ); insDoc.put("locarno" , pbcBean.getLocarno() ); insDoc.put("operation_Time" , pbcBean.getOperation_Time() ); insDoc.put("operation_Type" , pbcBean.getOperation_Type() ); insDoc.put("package_Name" , pbcBean.getPackage_Name() ); insDoc.put("pct_App_Cn" , pbcBean.getPct_App_Cn() ); insDoc.put("pct_App_Date" , pbcBean.getPct_App_Date() ); insDoc.put("pct_App_Sn" , pbcBean.getPct_App_Sn() ); 
insDoc.put("pct_Date" , pbcBean.getPct_Date() ); insDoc.put("pct_Pub_Cn" , pbcBean.getPct_Pub_Cn() ); insDoc.put("pct_Pub_Date" , pbcBean.getPct_Pub_Date() ); insDoc.put("pct_Pub_Lang" , pbcBean.getPct_Pub_Lang() ); insDoc.put("pct_Pub_Sn" , pbcBean.getPct_Pub_Sn() ); insDoc.put("prn" , pbcBean.getPrn() ); insDoc.put("prn_Cn" , pbcBean.getPrn_Cn() ); insDoc.put("prn_Date" , pbcBean.getPrn_Date() ); insDoc.put("prn_Sn" , pbcBean.getPrn_Sn() ); insDoc.put("prn_Type" , pbcBean.getPrn_Type() ); insDoc.put("pub_Cn" , pbcBean.getPub_Cn() ); insDoc.put("pub_Date" , pbcBean.getPub_Date() ); insDoc.put("pub_Sn" , pbcBean.getPub_Sn() ); insDoc.put("pub_Type" , pbcBean.getPub_Type() ); insDoc.put("uc" , pbcBean.getUc() ); collection.insert(insDoc); insDoc.clear(); } // 循环遍历pdaBeanList // System.out.println("loading ..."); // logger.info(DateUtils.getNow() + " rows:" + pdaBeanList.size()); // 当前记录编号 // int currentRowNumber = 0; if ( m != null) m.close(); // System.out.println("Load finished."); } /** 记录日志 */ private void logMemory(String behavior, String content) { // 向服务器发送日志 // sendLog.send("syncPacker", behavior, content); // 记录本地日志 logger.info(DateUtils.getNow() + " " + behavior + " :" + content); // 控制台输出日志 // System.out.println("syncPacker : " + DateUtils.getNow() + " " + behavior + " :" + content); } @Override public String insert() throws Exception { return null; } @Override public String update() throws Exception { return null; } @Override public String selectList() throws Exception { return null; } @Override public String delete() throws Exception { return null; } public static boolean isRunning() { return isRunning; } public static void setRunning(boolean isRunning) { SyncPacker_201603.isRunning = isRunning; } public Integer getCenterNodeDataVersion() { return centerNodeDataVersion; } public void setCenterNodeDataVersion(Integer centerNodeDataVersion) { this.centerNodeDataVersion = centerNodeDataVersion; } public String getSuccess() { return success; } public void 
setSuccess(String success) { this.success = success; } public String getMaxPackageNumber() { return maxPackageNumber; } public void setMaxPackageNumber(String maxPackageNumber) { this.maxPackageNumber = maxPackageNumber; } public String getPackedPackageNumber() { return packedPackageNumber; } public void setPackedPackageNumber(String packedPackageNumber) { this.packedPackageNumber = packedPackageNumber; } public SyncDataPackageBean getBean() { return bean; } public void setBean(SyncDataPackageBean bean) { this.bean = bean; } }
一个备份,记一下吧:
1 package syncPacker; 2 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileReader; 6 import java.io.FileWriter; 7 import java.io.IOException; 8 import java.net.UnknownHostException; 9 import java.util.ArrayList; 10 import java.util.Date; 11 import java.util.HashMap; 12 import java.util.List; 13 import java.util.Map; 14 15 import org.apache.http.message.BasicNameValuePair; 16 import org.apache.log4j.Logger; 17 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.beans.factory.annotation.Value; 19 import org.springframework.stereotype.Service; 20 21 import com.mongodb.BasicDBObject; 22 import com.mongodb.DB; 23 import com.mongodb.DBCollection; 24 import com.mongodb.Mongo; 25 import com.pro.framework.action.BaseController; 26 27 import logCenter.SendLog; 28 import net.sf.json.JSONObject; 29 import syncPacker.bean.PatentBibliographicChangeBean; 30 import syncPacker.bean.SyncDataPackageBean; 31 import utils.DateUtils; 32 import utils.DatetimeUtils; 33 import utils.HttpUtils; 34 35 /** 36 * 增量数据分块打包 全处理 37 * 38 * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action 39 * 40 * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean. 
41 * tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000 42 */ 43 @Service 44 public class SyncPacker_201603 extends BaseController { 45 46 private static final long serialVersionUID = 1L; 47 48 // 初始化:数据库接口 49 @Autowired 50 private SyncPackerDao dao; 51 52 // 初始化:发送端地址 53 @Value("${url_syncSender}") 54 private String url_syncSender; 55 56 // 初始化:本地文件存储路径 57 @Value("${path_syncPacker_package}") 58 private String path_syncPacker_package; 59 60 // 初始化:读取最大数据包名称的地址 61 @Value("${url_selectMaxPackageNumber}") 62 private String url_selectMaxPackageNumber; 63 64 // 初始化:存储最大数据包名称的地址 65 @Value("${url_insertSyncDataPackage}") 66 private String url_insertSyncDataPackage; 67 68 // 初始化:查询条件Bean 69 private SyncDataPackageBean bean = new SyncDataPackageBean(); 70 // 初始化:查询结果List 71 private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>(); 72 // 初始化:形成的数据包名称 73 private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>(); 74 // 初始化:已打包的增量数据ID表单 75 private List<String> pdaIdList = new ArrayList<String>(); 76 // 初始化:用于删除数据的临时ID List 77 private List<String> idPartList = new ArrayList<String>(); 78 // 初始化:传输协议 79 private HttpUtils httpUtils = new HttpUtils(); 80 // 初始化:键值串 81 private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>(); 82 // POST 结果 83 private String str_postResult; 84 // 初始化:发送url返回的信息 85 private JSONObject json_postResult; 86 // 初始化:历史最大包编号 87 private String maxPackageNumber; 88 // 初始化:记录打包完成后的数据版本 89 private Integer centerNodeDataVersion; 90 // 发送远程日志 91 private SendLog sendLog = new SendLog(); 92 // 记录本地日志 93 private Logger logger = Logger.getLogger(SyncPacker_201603.class); 94 // 初始化:判断程序是否正在运行 95 public static boolean isRunning = false; 96 97 // 本次处理完成后的最大包编号 98 private String packedPackageNumber; 99 // 用于返回json的成功信息 100 private String success = "success"; 101 102 /** 主处理:数据同步_增量数据分块打包 */ 103 // @Scheduled(cron = 
"${scheduled_syncPacker}") 104 public String pack() { 105 106 logMemory("scheduled_syncPacker start", ""); 107 if (isRunning) { 108 logger.info(DateUtils.getNow() + "正在运行,退出"); 109 return SUCCESS; 110 } 111 isRunning = true; 112 // 数据包数量 113 int packagesAmount = 0; 114 Date startTime = new Date(); 115 116 for (int i = 1; i <= 1; i++) { 117 logMemory("循环内部(单个数据包)开始:内部编号", "" + i); 118 119 try { 120 // 1.读取定量数据 121 selectPbcList(); 122 if (pbcList.size() <= 0) { 123 logMemory("无数据", "退出"); 124 break; 125 } 126 127 // 2.形成文件 128 if (save2disk()) { 129 packagesAmount++; 130 // 3.转移数据 131 transferFromChange2History(); 132 } 133 } catch (Exception e) { 134 // 日志输出 135 logMemory("系统发生异常", e.getMessage()); 136 e.printStackTrace(); 137 break; 138 } finally { 139 // 清空临时List 140 paramList.clear(); 141 pbcList.clear(); 142 pbcList_withPackageName.clear(); 143 pdaIdList.clear(); 144 System.gc(); 145 } 146 logMemory("循环内部(单个数据包)结束,内部编号", "" + i); 147 } 148 // loop end 149 150 // 清空临时List 151 paramList.clear(); 152 pbcList.clear(); 153 pbcList_withPackageName.clear(); 154 pdaIdList.clear(); 155 System.gc(); 156 157 isRunning = false; 158 logMemory("本次打包结束,新增数据包数量:" + packagesAmount + ",历时:" 159 + DatetimeUtils.getDistanceTimes_string(startTime, new Date()), ""); 160 logMemory("finished", "scheduled_syncPacker end"); 161 162 return SUCCESS; 163 } 164 165 /** selectPbcList() */ 166 // 读取 专利著录变化 数据 数据库有连接失败的时候,尝试 10 次 167 private void selectPbcList() { 168 169 // 日志输出 170 logMemory("增量数据分块打包开始执行", ""); 171 172 for (int i = 0; i < 10; i++) { 173 try { 174 // -------------- 查询分块著录信息增量数据 -------------- 175 logMemory("准备读取数据", ""); 176 // 每次取10000 177 pbcList = dao.selectList(bean); 178 logMemory("读取完毕", "数据量" + pbcList.size()); 179 return; 180 } catch (Exception e) { 181 // 日志输出 182 logMemory("系统发生异常", e.getMessage()); 183 e.printStackTrace(); 184 } 185 } 186 // loop end 187 return; 188 } 189 190 /** 191 * save2disk() 192 * 193 * 将打包文件存盘 194 */ 195 private boolean save2disk() throws 
Exception { 196 197 // -------------- 分块打包增量数据 -------------- 198 199 // 查询上一次处理最后生成的数据包名称 200 logMemory("准备查询最后一个数据包名称", ""); 201 202 // 发送url,请求对应服务器返回最大数据包名称 203 // syncDataPackageBean = dao.lastDataPackageInfo(0); 204 str_postResult = httpUtils.post(url_selectMaxPackageNumber, null); 205 logMemory("取最大包编号:" + url_selectMaxPackageNumber, " 返回 " + str_postResult); 206 207 // 判断是否成功发送信息 208 if (str_postResult == null || "".equals(str_postResult)) { 209 // 发送失败,跳出循环,结束程序 210 logMemory("失败!", " 返回为空"); 211 isRunning = false; 212 return false; 213 } 214 215 // 解析Post返回的Json 216 json_postResult = JSONObject.fromObject(str_postResult); 217 218 // 判断是否成功获取到最大包编号 219 if ("success".equalsIgnoreCase(json_postResult.getString("success"))) { 220 // 发送成功,进行下一步操作 221 maxPackageNumber = json_postResult.get("maxPackageNumber").toString(); 222 } else { 223 // 发送失败,跳出循环,结束程序 224 logMemory("失败!", "没有返回成功标志。"); 225 isRunning = false; 226 return false; 227 } 228 229 logMemory("上一次最后一个数据包名称", maxPackageNumber); 230 logMemory("准备将数据写入文件", " 路径:" + path_syncPacker_package); 231 232 // 得到新数据包名,同时分块打包增量数据 233 pbcList_withPackageName = DataPacking_2.CompressPdaData(pbcList, maxPackageNumber, path_syncPacker_package); 234 235 // 日志输出 236 logMemory("新数据包名称", pbcList_withPackageName.get(0).getPackage_Name() + ".txt"); 237 logMemory("转换完成,数量", String.valueOf(pbcList_withPackageName.size())); 238 239 // 记录打包完成的数据版本信息(流水号) 240 centerNodeDataVersion = Integer.parseInt(pbcList_withPackageName.get(0).getPackage_Name()); 241 242 // -------------- 记录已完成的数据包名称 -------------- 243 // 发送打包完成的数据包名称(流水号) 244 // dao.insertPacName(syncDataPackageBean); 245 paramList = new ArrayList<BasicNameValuePair>(); 246 paramList.add(0, 247 new BasicNameValuePair("packedPackageNumber", pbcList_withPackageName.get(0).getPackage_Name())); 248 249 str_postResult = httpUtils.post(url_insertSyncDataPackage, paramList); 250 logMemory("新增数据包:" + url_insertSyncDataPackage, " 返回 " + str_postResult); 251 252 // 判断是否成功发送信息 253 if 
(str_postResult == null || "".equals(str_postResult)) { 254 // 发送失败,跳出循环,结束程序 255以上是关于著录数据打包的主要内容,如果未能解决你的问题,请参考以下文章