日志收集系统-多线程消息队列
Posted 左侧码工
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了日志收集系统-多线程消息队列相关的知识,希望对你有一定的参考价值。
1.接入系统采用监听器方式
package com.sf.log.listener; import java.util.Date; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.apache.commons.lang.StringUtils; import org.apache.http.Consts; import org.apache.http.entity.StringEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sf.log.model.Log; import com.sf.log.util.LogHttpUtils; import com.sf.log.util.PropertiesUtil; /** * * @author * @create 2016年12月8日 */ public class LogListener implements ServletContextListener{ private static Logger logger = LoggerFactory.getLogger(LogListener.class); private static ExecutorService service = null; public static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>( Integer.valueOf(PropertiesUtil.getValue("logQueueSize"))); @Override public void contextDestroyed(ServletContextEvent sce) { // TODO Auto-generated method stub } // @Override public void contextInitialized(ServletContextEvent sce) { sendQueueTread(); } private static void sendQueueTread(){ int sendTreadPool = Integer.valueOf(PropertiesUtil.getValue("logThreadPool")); String logServerUrl = PropertiesUtil.getValue("logServerUrl"); if (service == null) { // 创建一个固定大小的线程池 service = Executors.newFixedThreadPool(sendTreadPool); for (int i = 0; i < sendTreadPool; i++) { Runnable run = new Runnable() { @Override public void run() { while (true) { try { String json = queue.take(); if(StringUtils.isNotEmpty(logServerUrl)){ StringEntity stringEntity = new StringEntity(json, Consts.UTF_8); LogHttpUtils.httpPost(logServerUrl, stringEntity); } } catch (Exception e) { //e.printStackTrace(); logger.error(e.getMessage()); } } } }; service.execute(run); } } } public static void main(String [] args){ sendQueueTread(); Log log=new Log(); log.setAccount("1"); log.setContent("2"); log.setCreatedate(new Date()); log.setDescription("3"); log.setExceptionCode("4"); //LogService.addLog(log); } }
model对象
package com.sf.log.model; import java.util.Date; public class Log { /** * <pre> * * 表字段 : zy_log.id * </pre> */ private String id; /** * <pre> * * 表字段 : zy_log.account * </pre> */ private String account; /** * <pre> * * 表字段 : zy_log.description * </pre> */ private String description; /** * <pre> * * 表字段 : zy_log.method * </pre> */ private String method; /** * <pre> * * 表字段 : zy_log.requestIp * </pre> */ private String requestIp; /** * <pre> * * 表字段 : zy_log.exceptioncode * </pre> */ private String exceptionCode; /** * <pre> * * 表字段 : zy_log.exceptionDetail * </pre> */ private String exceptionDetail; /** * <pre> * * 表字段 : zy_log.params * </pre> */ private String params; /** * <pre> * 创建日期 * 表字段 : zy_log.createDate * </pre> */ private Date createdate; /** * <pre> * 日志内容 * 表字段 : zy_log.content * </pre> */ private String content; /** * <pre> * 用户所做的操作 * 表字段 : zy_log.operation * </pre> */ private String operation; /** * <pre> * * 表字段 : zy_log.logType * </pre> */ private String logtype; public String getId() { return id; } public void setId(String id) { this.id = id == null ? null : id.trim(); } public String getAccount() { return account; } public void setAccount(String account) { this.account = account == null ? null : account.trim(); } public String getDescription() { return description; } public void setDescription(String description) { this.description = description == null ? null : description.trim(); } public String getMethod() { return method; } public void setMethod(String method) { this.method = method == null ? null : method.trim(); } public String getParams() { return params; } public void setParams(String params) { this.params = params == null ? null : params.trim(); } public Date getCreatedate() { return createdate; } public void setCreatedate(Date createdate) { this.createdate = createdate; } public String getContent() { return content; } public void setContent(String content) { this.content = content == null ? null : content.trim(); } public String getOperation() { return operation; } public void setOperation(String operation) { this.operation = operation == null ? null : operation.trim(); } public String getLogtype() { return logtype; } public void setLogtype(String logtype) { this.logtype = logtype == null ? null : logtype.trim(); } public String getRequestIp() { return requestIp; } public void setRequestIp(String requestIp) { this.requestIp = requestIp; } public String getExceptionCode() { return exceptionCode; } public void setExceptionCode(String exceptionCode) { this.exceptionCode = exceptionCode; } public String getExceptionDetail() { return exceptionDetail; } public void setExceptionDetail(String exceptionDetail) { this.exceptionDetail = exceptionDetail; } }
service 操作日志
package com.sf.log.service; import com.sf.log.listener.LogListener; import com.sf.log.model.Log; import com.sf.log.util.JsonUtil; public class LogService { public static String addLog(Log log) { String json = JsonUtil.fastModelToJson(log); try { LogListener.queue.put(json); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return null; } public static String addExceptionLog(Log log) { // TODO Auto-generated method stub return null; } }
属性文件工具类
package com.sf.log.util; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; public class PropertiesUtil { private static ResourceLoader resourceLoader = new DefaultResourceLoader(); private static Map<String, String> ctxPropertiesMap = new HashMap<>(); public static String getValue(String key) { String value = ctxPropertiesMap.get(key); if (StringUtils.isEmpty(value)) { Properties prop = new Properties(); InputStream is = null; InputStreamReader reader = null; try { Resource resource = resourceLoader.getResource("log.properties"); is = resource.getInputStream(); reader = new InputStreamReader(is,"utf-8"); prop.load(reader); value = prop.getProperty(key); ctxPropertiesMap.put(key, prop.getProperty(key)); } catch (IOException ex) { } finally { IOUtils.closeQuietly(is); IOUtils.closeQuietly(reader); } } return value; } public static void main(String[] args) { //PropertiesUtil ip = new PropertiesUtil(); //ip.init(); String queueSize=PropertiesUtil.getValue("logQueueSize"); System.out.println("queueSize:"+queueSize); } }
远程接口请求http工具类
package com.sf.log.util; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.InterruptedIOException; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; import java.net.URL; import java.net.URLConnection; import java.net.URLEncoder; import java.net.UnknownHostException; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; import javax.net.ssl.SSLHandshakeException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpEntityEnclosingRequest; import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.NoHttpResponseException; import org.apache.http.StatusLine; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.LayeredConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.SSLContextBuilder; import org.apache.http.conn.ssl.TrustStrategy; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicNameValuePair; import org.apache.http.protocol.HttpContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; @SuppressWarnings("deprecation") public class LogHttpUtils { private static final Logger log = LoggerFactory.getLogger(LogHttpUtils.class); private static String DEFAULTE_NCODING = "utf-8"; private static int REP_CODE = 200; static final int timeOut = 30 * 1000; /**http连接池**/ private static ConnectionSocketFactory plainsf = PlainConnectionSocketFactory .getSocketFactory(); private static LayeredConnectionSocketFactory sslsf = SSLConnectionSocketFactory .getSocketFactory(); private static Registry<ConnectionSocketFactory> registry = RegistryBuilder .<ConnectionSocketFactory> create().register("http", plainsf) .register("https", sslsf).build(); private static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager( registry); static{ // 将最大连接数增加到200 cm.setMaxTotal(200); // 将每个路由基础的连接增加到20 cm.setDefaultMaxPerRoute(20); } // 请求重试处理 private static HttpRequestRetryHandler httpRequestRetryHandler = new HttpRequestRetryHandler() { public boolean retryRequest(IOException exception, int executionCount, HttpContext context) { if (executionCount >= 3) {// 如果已经重试了3次,就放弃 return false; } if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试 return true; } if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常 return false; } if (exception instanceof InterruptedIOException) {// 超时 return false; } if (exception instanceof UnknownHostException) {// 目标服务器不可达 return false; } if (exception instanceof ConnectTimeoutException) {// 连接被拒绝 return false; } if (exception instanceof SSLException) {// ssl握手异常 return false; } HttpClientContext clientContext = HttpClientContext .adapt(context); HttpRequest request = clientContext.getRequest(); // 如果请求是幂等的,就再次尝试 if (!(request instanceof HttpEntityEnclosingRequest)) { return true; } return false; } }; /**---end http连接池---**/ private static CloseableHttpClient client = HttpClients.custom() .setConnectionManager(cm) .setRetryHandler(httpRequestRetryHandler).build(); private static CloseableHttpClient sslClient = HttpClients.custom() .setConnectionManager(cm) .setRetryHandler(httpRequestRetryHandler).setSSLSocketFactory(new SSLConnectionSocketFactory(getSSLContext())).build(); private static RequestConfig getRequestConfig() { return RequestConfig.custom().setConnectionRequestTimeout(timeOut) .setConnectTimeout(timeOut).setSocketTimeout(timeOut).build(); } private static String parseURL(String url) { if (StringUtils.isNotBlank(url)) { if (url.startsWith("http")) { return url; } else { return "http://" + url; } } return null; } private static String encodeParams(Map<String, Object> params) { StringBuilder sb = new StringBuilder(); if (null != params) { Set<String> keys = params.keySet(); int first = 0; for (String key : keys) { Object value = params.get(key); if (first > 0) { sb.append("&"); } first++; sb.append(key); sb.append("="); String v = String.valueOf(value); try { String encodeValue = URLEncoder.encode(v, DEFAULTE_NCODING); sb.append(encodeValue); } catch (UnsupportedEncodingException e) { log.error("UnsupportedEncoding:" + DEFAULTE_NCODING); } } } return sb.toString(); } private static void setHeaders(HttpRequestBase request, Map<String, Object> headers) { if (null != request && null != headers) { for (Entry<String, Object> entry : headers.entrySet()) { request.setHeader(entry.getKey(), (String)entry.getValue()); } } } private static String getEncoding(String contentType) { if (contentType != null) { String[] strs = contentType.split(";"); if (strs != null && strs.length > 1) { String charSet = strs[1].trim(); String[] charSetKeyValues = charSet.split("="); if (charSetKeyValues.length == 2 && charSetKeyValues[0].equalsIgnoreCase("charset")) { return charSetKeyValues[1]; } } } return DEFAULTE_NCODING; } private static HttpRespMeta getResponse(HttpResponse response) { InputStream inputStream = null; if (response != null) { StatusLine line = response.getStatusLine(); if (line != null) { HttpRespMeta responseMeta = new HttpRespMeta(); int code = line.getStatusCode(); log.info(">>>>>>>>code:"+code); responseMeta.setCode(code); if (code == REP_CODE) { try { inputStream = response.getEntity().getContent(); if (inputStream != null) { byte[] bs = IOUtils.toByteArray(inputStream); responseMeta.setResponse(bs); Header contentType = response.getEntity().getContentType(); if(null != contentType){ responseMeta.setContentType(contentType.getValue()); responseMeta.setEncode(getEncoding(contentType.getValue())); } } } catch (IllegalStateException e) { log.error("getResponse IllegalStateException:"+ e.getLocalizedMessage()); } catch (IOException e) { log.error("getResponse IOException:"+ e.getLocalizedMessage()); } finally { IOUtils.closeQuietly(inputStream); } } return responseMeta; } } return null; } public static HttpRespMeta httpGet(String url, Map<String, Object> headers,Map<String, Object> params) { String newUrl = parseURL(url); if (newUrl == null) { return null; } if (params != null) { newUrl = newUrl + "?" + encodeParams(params); } HttpGet httpGet = new HttpGet(newUrl); setHeaders(httpGet, headers); return httpGet(newUrl, httpGet); } private static HttpRespMeta httpGet(String url, HttpGet httpGet) { try { CloseableHttpClient client = getInstance(url) ; HttpResponse response = client.execute(httpGet); return getResponse(response); } catch (ClientProtocolException e) { log.error("httpPost ClientProtocolException:" + e.getMessage()); } catch (IOException e) { log.error("httpPost IOException:" + e.getMessage()); } finally { httpGet.releaseConnection(); } return null; } public static HttpRespMeta httpPost(String url,Map<String, Object> headers, Map<String, Object> params) { log.info("[NimServer post] url=" + url + ",params=" + JSON.toJSONString(params)); String newUrl = parseURL(url); HttpPost httpPost = new HttpPost(newUrl); setHeaders(httpPost, headers); try { if (params != null) { List<NameValuePair> list = new LinkedList<NameValuePair>(); for (Map.Entry<String, Object> entry : params.entrySet()) { if(null !=entry.getValue()){ list.add(new BasicNameValuePair(entry.getKey(), entry.getValue().toString())); } } HttpEntity entity = new UrlEncodedFormEntity(list, "utf-8"); httpPost.setEntity(entity); } return httpPost(newUrl, httpPost); } catch (UnsupportedEncodingException e) { log.error("httpPost UnsupportedEncodingException:"+ e.getMessage()); } return null; } public static HttpRespMeta httpPost(String url, StringEntity entity,Map<String, Object> headers, Map<String, Object> params) { String newUrl = parseURL(url); if (newUrl == null) { return null; } if (params != null) { newUrl = newUrl + "?" + encodeParams(params); } HttpPost httpPost = new HttpPost(newUrl); if (null != entity) { httpPost.setEntity(entity); } setHeaders(httpPost, headers); return httpPost(newUrl, httpPost); } public static HttpRespMeta httpPost(String url, StringEntity entity) { String newUrl = parseURL(url); if (newUrl == null) { return null; } HttpPost httpPost = new HttpPost(newUrl); if (null != entity) { httpPost.setEntity(entity); } return httpPost(newUrl, httpPost); } public static HttpRespMeta httpPost(String url, Map<String, Object> params) { String newUrl = parseURL(url); HttpPost httpPost = new HttpPost(newUrl); try { if (params != null) { List<NameValuePair> list = new LinkedList<NameValuePair>(); Set<String> keys = params.keySet(); for (String key : keys) { list.add(new BasicNameValuePair(key, params.get(key).toString())); } HttpEntity entity = new UrlEncodedFormEntity(list, "utf-8"); httpPost.setEntity(entity); } return httpPost(newUrl, httpPost); } catch (UnsupportedEncodingException e) { log.error("httpPost UnsupportedEncodingException:"+ e.getMessage()); } return null; } public static String sendPost(String url, String param) { PrintWriter out = null; BufferedReader in = null; String result = ""; try { URL realUrl = new URL(url); // 打开和URL之间的连接 URLConnection conn = realUrl.openConnection(); // 设置通用的请求属性 conn.setRequestProperty("accept", "*/*"); conn.setRequestProperty("connection", "Keep-Alive"); conn.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)"); conn.setRequestProperty("Content-Type", "application/json; charset=utf-8"); // 发送POST请求必须设置如下两行 conn.setDoOutput(true); conn.setDoInput(true); // 获取URLConnection对象对应的输出流 out = new PrintWriter(conn.getOutputStream()); // 发送请求参数 out.print(param); // flush输出流的缓冲 out.flush(); // 定义BufferedReader输入流来读取URL的响应 in = new BufferedReader( new InputStreamReader(conn.getInputStream())); String line; while ((line = in.readLine()) != null) { result += line; } } catch (Exception e) { // System.out.println("发送 POST 请求出现异常!"+e); //e.printStackTrace(); } //使用finally块来关闭输出流、输入流 finally{ try{ if(out!=null){ out.close(); } if(in!=null){ in.close(); } } catch(IOException ex){ ex.printStackTrace(); } } return result; } private static HttpRespMeta httpPost(String url, HttpPost httpPost) { CloseableHttpClient client = getInstance(url) ; try { httpPost.setConfig(getRequestConfig()); HttpResponse response = client.execute(httpPost); return getResponse(response); } catch (ClientProtocolException e) { log.error("httpPost ClientProtocolException:" + e.getMessage()); } catch (IOException e) { log.error("httpPost IOException:" + e.getMessage()); } finally { httpPost.releaseConnection(); } return null; } private static CloseableHttpClient getInstance(String url) { if (StringUtils.isNotBlank(url) && url.startsWith("https")) { return sslClient ; } return client; } private static SSLContext getSSLContext() { try { return new SSLContextBuilder().loadTrustMaterial(null,new TrustStrategy() { public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException { return true; } }).build(); } catch (KeyManagementException e) { log.error("HttpClient KeyManagementException:" + e.getMessage()); } catch (NoSuchAlgorithmException e) { log.error("HttpClient NoSuchAlgorithmException:"+ e.getMessage()); } catch (KeyStoreException e) { log.error("HttpClient KeyStoreException:" + e.getMessage()); } return null; } public static class HttpRespMeta { private String encode; private int code; private String contentType; private byte[] response; public int getCode() { return code; } public void setCode(int code) { this.code = code; } public ByteArrayInputStream getResponseAsInputStream() { return new ByteArrayInputStream(response); } public long getResponseLength() { return response.length; } public String getResponseAsString() throws UnsupportedEncodingException { return new String(response, encode); } public String getContentType() { return contentType; } public void setContentType(String contentType) { this.contentType = contentType; } public byte[] getResponseAsBytes() { return response; } public String getEncode() { return encode; } public void setEncode(String encode) { this.encode = encode; } public void setResponse(byte[] response) { this.response = response; } } /** * 发送get请求 * @param url * @param headers * @param params * @return */ public static String sendGet(String url, Map<String, Object> headers,Map<String, Object> params){ HttpRespMeta meta = LogHttpUtils.httpGet(url, headers, params); String result = null; if (null != meta ) { try { if(null != meta.getResponseAsBytes()){ result = meta.getResponseAsString(); } } catch (UnsupportedEncodingException e) { } } return result; } /** * 发送get请求 * @param url * @param headers * @param params * @return */ public static String sendPost(String url, Map<String, Object> headers,Map<String, Object> params){ HttpRespMeta meta = LogHttpUtils.httpPost(url, headers, params); String result = null; if (null != meta ) { try { if(null != meta.getResponseAsBytes()){ if(StringUtils.isEmpty(meta.getEncode())){ meta.setEncode("utf-8"); } result = meta.getResponseAsString(); } } catch (UnsupportedEncodingException e) { } } return result; } public static void main(String[] args) throws FileNotFoundException, IOException { Map<String, Object> headers = null ; Map<String, Object> params =null; HttpRespMeta resp = LogHttpUtils.httpGet("http://nos-yx.netease.com/yixinpublic/pr_zc4e0zwrir_rvq7mbmo75q==_1395712361_170501",headers, params); if (resp != null && resp.getCode() == 200) { File file = new File("D:\\Tools\\weibow.jpg"); IOUtils.copy(resp.getResponseAsInputStream(), new FileOutputStream(file)); System.out.println("len:" + resp.getResponseLength()); System.out.println("filelen:" + file.length()); } } }
阿里json封装类
package com.sf.log.util; import java.util.HashMap; import java.util.Map; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.fastjson.serializer.ValueFilter; import net.sf.json.JSONObject; public class JsonUtil { /** * 实体数据转成json字符串 * @param object * @return */ public static String modelToJson(Object object){ String jsonString = ""; try { jsonString = JSON.toJSONString(object); } catch (Exception e) { return null; } return jsonString ; } /** * 阿里的json解析 对象转字符串 * @param object * @return */ public static String fastModelToJson(Object object){ String jsonString = ""; try { jsonString = JSON.toJSONString(object, SerializerFeature.WriteMapNullValue); } catch (Exception e) { return null; } return jsonString ; } /** * null值转为"" * @param object * @return */ public static String fastModelToJsoneEmpty(Object object){ String jsonString = ""; ValueFilter valueFilter = new ValueFilter() { @Override public Object process(Object object, String name, Object value) { if (value == null) return ""; return value; } }; try { jsonString = JSON.toJSONString(object, valueFilter); } catch (Exception e) { return null; } return jsonString ; } /** * 保留空值 实体数据转成json字符串 * @param object * @return */ public static String netModelToJson(Object object){ JSONObject jobject = JSONObject.fromObject(object); return jobject.toString(); } /** * json字符串转类型 * @param json * @param clazz * @return */ public static <T> T jsonToModel(String json,Class<T> clazz){ try { return JSON.parseObject(json, clazz); } catch (Exception e) { return null; } } public static void main(String[] args) { Map<String,String> map = new HashMap<String,String>(); map.put("1", null); map.put("2", "fdsfd"); //String s = netModelToJson(map); String s1 = fastModelToJson(map); System.out.println(jsonToModel(s1,Map.class)); System.out.println(s1); } }
属性文件配置log.properties
logQueueSize=10000 logThreadPool=10 logServerUrl=http://localhost:8080/logserver/addLog
web.xml中配置
logQueueSize=10000 logThreadPool=10 logServerUrl=日志系统url
2.服务器端就是简单的单表操作.不做记录
以上是关于日志收集系统-多线程消息队列的主要内容,如果未能解决你的问题,请参考以下文章