日志收集系统-多线程消息队列

Posted 左侧码工

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了日志收集系统-多线程消息队列相关的知识,希望对你有一定的参考价值。

1.接入系统采用监听器方式

package com.sf.log.listener;

import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.apache.commons.lang.StringUtils;
import org.apache.http.Consts;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sf.log.model.Log;
import com.sf.log.util.LogHttpUtils;
import com.sf.log.util.PropertiesUtil;


/**
 * 
 * @author 
 * @create 2016年12月8日
 */
public class LogListener implements ServletContextListener{

	private static Logger logger = LoggerFactory.getLogger(LogListener.class);
	
	private static ExecutorService service = null;

	public static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
			Integer.valueOf(PropertiesUtil.getValue("logQueueSize")));
	 
	
	@Override
	public void contextDestroyed(ServletContextEvent sce) {
		// TODO Auto-generated method stub
	}

	//
	@Override
	public void contextInitialized(ServletContextEvent sce) {
		sendQueueTread();
	}
	
	private static void sendQueueTread(){
		int sendTreadPool = Integer.valueOf(PropertiesUtil.getValue("logThreadPool"));
		String logServerUrl = PropertiesUtil.getValue("logServerUrl");
		if (service == null) {
			// 创建一个固定大小的线程池
			service = Executors.newFixedThreadPool(sendTreadPool);
			for (int i = 0; i < sendTreadPool; i++) {	 
				Runnable run = new Runnable() {
					@Override
					public void run() {
						while (true) {
							try {
								
								String json = queue.take();
							
								if(StringUtils.isNotEmpty(logServerUrl)){
									StringEntity stringEntity = new StringEntity(json, Consts.UTF_8);
								    LogHttpUtils.httpPost(logServerUrl, stringEntity);
								}
							} catch (Exception e) {
								//e.printStackTrace();
								logger.error(e.getMessage());
							}
						}

					}
				};
				service.execute(run);
			}
		}
	}
 
	
	public static void main(String [] args){

		sendQueueTread();
	
		Log log=new Log();
		log.setAccount("1");
		log.setContent("2");
		log.setCreatedate(new Date());
		log.setDescription("3");
		log.setExceptionCode("4");
		//LogService.addLog(log);
	}

}

 

model对象

package com.sf.log.model;

import java.util.Date;
public class Log {
	 /**
    * <pre>
    * 
    * 表字段 : zy_log.id
    * </pre>
    */
   private String id;

   /**
    * <pre>
    * 
    * 表字段 : zy_log.account
    * </pre>
    */
   private String account;

   /**
    * <pre>
    * 
    * 表字段 : zy_log.description
    * </pre>
    */
   private String description;

   /**
    * <pre>
    * 
    * 表字段 : zy_log.method
    * </pre>
    */
   private String method;

   /**
    * <pre>
    * 
    * 表字段 : zy_log.requestIp
    * </pre>
    */
   private String requestIp;

 

	/**
    * <pre>
    * 
    * 表字段 : zy_log.exceptioncode
    * </pre>
    */
   private String exceptionCode;

   /**
    * <pre>
    * 
    * 表字段 : zy_log.exceptionDetail
    * </pre>
    */
   private String exceptionDetail;

   /**
    * <pre>
    * 
    * 表字段 : zy_log.params
    * </pre>
    */
   private String params;

   /**
    * <pre>
    * 创建日期
    * 表字段 : zy_log.createDate
    * </pre>
    */
   private Date createdate;

   /**
    * <pre>
    * 日志内容
    * 表字段 : zy_log.content
    * </pre>
    */
   private String content;

   /**
    * <pre>
    * 用户所做的操作
    * 表字段 : zy_log.operation
    * </pre>
    */
   private String operation;

   /**
    * <pre>
    * 
    * 表字段 : zy_log.logType
    * </pre>
    */
   private String logtype;

   public String getId() {
       return id;
   }

   public void setId(String id) {
       this.id = id == null ? null : id.trim();
   }

   public String getAccount() {
       return account;
   }

   public void setAccount(String account) {
       this.account = account == null ? null : account.trim();
   }

   public String getDescription() {
       return description;
   }

   public void setDescription(String description) {
       this.description = description == null ? null : description.trim();
   }

   public String getMethod() {
       return method;
   }

   public void setMethod(String method) {
       this.method = method == null ? null : method.trim();
   }
  
   public String getParams() {
       return params;
   }

   public void setParams(String params) {
       this.params = params == null ? null : params.trim();
   }

   public Date getCreatedate() {
       return createdate;
   }

   public void setCreatedate(Date createdate) {
       this.createdate = createdate;
   }

   public String getContent() {
       return content;
   }

   public void setContent(String content) {
       this.content = content == null ? null : content.trim();
   }

   public String getOperation() {
       return operation;
   }

   public void setOperation(String operation) {
       this.operation = operation == null ? null : operation.trim();
   }

   

   public String getLogtype() {
       return logtype;
   }

   public void setLogtype(String logtype) {
       this.logtype = logtype == null ? null : logtype.trim();
   }
   
   public String getRequestIp() {
 		return requestIp;
 	}

 	public void setRequestIp(String requestIp) {
 		this.requestIp = requestIp;
 	}

 	public String getExceptionCode() {
 		return exceptionCode;
 	}

 	public void setExceptionCode(String exceptionCode) {
 		this.exceptionCode = exceptionCode;
 	}

 	public String getExceptionDetail() {
 		return exceptionDetail;
 	}

 	public void setExceptionDetail(String exceptionDetail) {
 		this.exceptionDetail = exceptionDetail;
 	}
}

  service 操作日志

package com.sf.log.service;

import com.sf.log.listener.LogListener;
import com.sf.log.model.Log;
import com.sf.log.util.JsonUtil;

public class LogService {

	public static String addLog(Log log) {
		String json = JsonUtil.fastModelToJson(log);
		try {
			LogListener.queue.put(json);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return null;
	}

	public static String addExceptionLog(Log log) {
		// TODO Auto-generated method stub
		return null;
	}

}

  属性文件工具类

package com.sf.log.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;


public class PropertiesUtil {
	
	private static ResourceLoader resourceLoader = new DefaultResourceLoader();
	private static Map<String, String> ctxPropertiesMap = new HashMap<>();

	public static String getValue(String key) {
		String value = ctxPropertiesMap.get(key);
		if (StringUtils.isEmpty(value)) {
			Properties prop = new Properties();
			InputStream is = null;
			InputStreamReader reader = null;
			try {
				Resource resource = resourceLoader.getResource("log.properties");
				is = resource.getInputStream();
				reader = new InputStreamReader(is,"utf-8");
				prop.load(reader);
				value = prop.getProperty(key);
				ctxPropertiesMap.put(key, prop.getProperty(key));
			} catch (IOException ex) {

			} finally {
				IOUtils.closeQuietly(is);
				IOUtils.closeQuietly(reader);
			}
		}
		return value;
	}

	 

	public static void main(String[] args) {
		//PropertiesUtil ip = new PropertiesUtil();
		//ip.init();
		String queueSize=PropertiesUtil.getValue("logQueueSize");
		System.out.println("queueSize:"+queueSize);
	}
}

 远程接口请求http工具类

package com.sf.log.util;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.NoHttpResponseException;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContextBuilder;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
 
@SuppressWarnings("deprecation")
public class LogHttpUtils {
	
	private static final Logger log = LoggerFactory.getLogger(LogHttpUtils.class);
	private static String DEFAULTE_NCODING = "utf-8";
	private static int REP_CODE = 200;
	static final int timeOut = 30 * 1000;
	/**http连接池**/
	private static ConnectionSocketFactory plainsf = PlainConnectionSocketFactory
			.getSocketFactory();
	private static LayeredConnectionSocketFactory sslsf = SSLConnectionSocketFactory
			.getSocketFactory();
	private static Registry<ConnectionSocketFactory> registry = RegistryBuilder
			.<ConnectionSocketFactory> create().register("http", plainsf)
			.register("https", sslsf).build();
	
	private static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(
			registry);
	static{
	// 将最大连接数增加到200
	cm.setMaxTotal(200); 
	// 将每个路由基础的连接增加到20
	cm.setDefaultMaxPerRoute(20);
	}
	// 请求重试处理
	private static HttpRequestRetryHandler httpRequestRetryHandler = new HttpRequestRetryHandler() {
		public boolean retryRequest(IOException exception,
				int executionCount, HttpContext context) {
			if (executionCount >= 3) {// 如果已经重试了3次,就放弃
				return false;
			}
			if (exception instanceof NoHttpResponseException) {// 如果服务器丢掉了连接,那么就重试
				return true;
			}
			if (exception instanceof SSLHandshakeException) {// 不要重试SSL握手异常
				return false;
			}
			if (exception instanceof InterruptedIOException) {// 超时
				return false;
			}
			if (exception instanceof UnknownHostException) {// 目标服务器不可达
				return false;
			}
			if (exception instanceof ConnectTimeoutException) {// 连接被拒绝
				return false;
			}
			if (exception instanceof SSLException) {// ssl握手异常
				return false;
			}

			HttpClientContext clientContext = HttpClientContext
					.adapt(context);
			HttpRequest request = clientContext.getRequest();
			// 如果请求是幂等的,就再次尝试
			if (!(request instanceof HttpEntityEnclosingRequest)) {
				return true;
			}
			return false;
		}
	};
	/**---end http连接池---**/
	private static CloseableHttpClient client = HttpClients.custom()
			.setConnectionManager(cm)
			.setRetryHandler(httpRequestRetryHandler).build();
	private static CloseableHttpClient sslClient = HttpClients.custom()
			.setConnectionManager(cm)
			.setRetryHandler(httpRequestRetryHandler).setSSLSocketFactory(new SSLConnectionSocketFactory(getSSLContext())).build();
	

	private static RequestConfig getRequestConfig() {
		return RequestConfig.custom().setConnectionRequestTimeout(timeOut)
				.setConnectTimeout(timeOut).setSocketTimeout(timeOut).build();
	}

	private static String parseURL(String url) {
		if (StringUtils.isNotBlank(url)) {
			if (url.startsWith("http")) {
				return url;
			} else {
				return "http://" + url;
			}
		}
		return null;

	}

	private static String encodeParams(Map<String, Object> params) {
		StringBuilder sb = new StringBuilder();
		if (null != params) {
			Set<String> keys = params.keySet();
			int first = 0;
			for (String key : keys) {
				Object value = params.get(key);
				if (first > 0) {
					sb.append("&");
				}
				first++;
				sb.append(key);
				sb.append("=");
				String v = String.valueOf(value);
				try {
					String encodeValue = URLEncoder.encode(v, DEFAULTE_NCODING);
					sb.append(encodeValue);
				} catch (UnsupportedEncodingException e) {
					log.error("UnsupportedEncoding:" + DEFAULTE_NCODING);
				}
			}
		}
		return sb.toString();
	}

	private static void setHeaders(HttpRequestBase request,
			Map<String, Object> headers) {
		if (null != request && null != headers) {
			for (Entry<String, Object> entry : headers.entrySet()) {
				request.setHeader(entry.getKey(), (String)entry.getValue());
			}
		}
	}

	private static String getEncoding(String contentType) {
		if (contentType != null) {
			String[] strs = contentType.split(";");
			if (strs != null && strs.length > 1) {
				String charSet = strs[1].trim();
				String[] charSetKeyValues = charSet.split("=");
				if (charSetKeyValues.length == 2
						&& charSetKeyValues[0].equalsIgnoreCase("charset")) {
					return charSetKeyValues[1];
				}
			}
		}
		return DEFAULTE_NCODING;
	}

	private static HttpRespMeta getResponse(HttpResponse response) {
		InputStream inputStream = null;
		if (response != null) {
			StatusLine line = response.getStatusLine();
			if (line != null) {
				HttpRespMeta responseMeta = new HttpRespMeta();
				int code = line.getStatusCode();
				log.info(">>>>>>>>code:"+code);
				responseMeta.setCode(code);
				if (code == REP_CODE) {
					try {
						inputStream = response.getEntity().getContent();
						if (inputStream != null) {
							byte[] bs = IOUtils.toByteArray(inputStream);
							responseMeta.setResponse(bs);
							Header contentType = response.getEntity().getContentType();
							if(null != contentType){
								responseMeta.setContentType(contentType.getValue());
								responseMeta.setEncode(getEncoding(contentType.getValue()));
							}
						}
					} catch (IllegalStateException e) {
						log.error("getResponse IllegalStateException:"+ e.getLocalizedMessage());
					} catch (IOException e) {
						log.error("getResponse IOException:"+ e.getLocalizedMessage());
					} finally {
						IOUtils.closeQuietly(inputStream);
					}
				}
				return responseMeta;
			}
		}
		return null;
	}


	public static HttpRespMeta httpGet(String url, Map<String, Object> headers,Map<String, Object> params) {
		String newUrl = parseURL(url);
		if (newUrl == null) {
			return null;
		}
		if (params != null) {
			newUrl = newUrl + "?" + encodeParams(params);
		}
		HttpGet httpGet = new HttpGet(newUrl);
		setHeaders(httpGet, headers);
		return httpGet(newUrl, httpGet);
	}

	private static HttpRespMeta httpGet(String url, HttpGet httpGet) {
		try {
			CloseableHttpClient client = getInstance(url) ;
			HttpResponse response = client.execute(httpGet);
			return getResponse(response);
		} catch (ClientProtocolException e) {
			log.error("httpPost ClientProtocolException:" + e.getMessage());
		} catch (IOException e) {
			log.error("httpPost IOException:" + e.getMessage());
		} finally {
			httpGet.releaseConnection();
		}
		return null;
	}
	
	public static HttpRespMeta httpPost(String url,Map<String, Object> headers, Map<String, Object> params) {
		log.info("[NimServer post] url=" + url + ",params=" + JSON.toJSONString(params));
		String newUrl = parseURL(url);
		HttpPost httpPost = new HttpPost(newUrl);
		setHeaders(httpPost, headers);
		try {
			if (params != null) {
				List<NameValuePair> list = new LinkedList<NameValuePair>();
				for (Map.Entry<String, Object> entry : params.entrySet()) {
					if(null !=entry.getValue()){
						list.add(new BasicNameValuePair(entry.getKey(), entry.getValue().toString()));
					}
				}
				HttpEntity entity = new UrlEncodedFormEntity(list, "utf-8");
				httpPost.setEntity(entity);
			}
			return httpPost(newUrl, httpPost);
		} catch (UnsupportedEncodingException e) {
			log.error("httpPost UnsupportedEncodingException:"+ e.getMessage());
		}
		return null;
	}

	public static HttpRespMeta httpPost(String url, StringEntity entity,Map<String, Object> headers, Map<String, Object> params) {
		String newUrl = parseURL(url);
		if (newUrl == null) {
			return null;
		}
		if (params != null) {
			newUrl = newUrl + "?" + encodeParams(params);
		}
		HttpPost httpPost = new HttpPost(newUrl);
		if (null != entity) {
			httpPost.setEntity(entity);
		}
		setHeaders(httpPost, headers);
		return httpPost(newUrl, httpPost);
	}

	public static HttpRespMeta httpPost(String url, StringEntity entity) {
		String newUrl = parseURL(url);
		if (newUrl == null) {
			return null;
		}
		HttpPost httpPost = new HttpPost(newUrl);
		if (null != entity) {
			httpPost.setEntity(entity);
		}
		return httpPost(newUrl, httpPost);
	}

	public static HttpRespMeta httpPost(String url, Map<String, Object> params) {
		String newUrl = parseURL(url);
		HttpPost httpPost = new HttpPost(newUrl);
		try {
			if (params != null) {
				List<NameValuePair> list = new LinkedList<NameValuePair>();
				Set<String> keys = params.keySet();
				for (String key : keys) {
					list.add(new BasicNameValuePair(key, params.get(key).toString()));
				}
				HttpEntity entity = new UrlEncodedFormEntity(list, "utf-8");
				httpPost.setEntity(entity);
			}
			return httpPost(newUrl, httpPost);
		} catch (UnsupportedEncodingException e) {
			log.error("httpPost UnsupportedEncodingException:"+ e.getMessage());
		}
		return null;
	}
	
	public static String sendPost(String url, String param) {
        PrintWriter out = null;
        BufferedReader in = null;
        String result = "";
        try {
            URL realUrl = new URL(url);
            // 打开和URL之间的连接
            URLConnection conn = realUrl.openConnection();
            // 设置通用的请求属性
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setRequestProperty("user-agent",
                    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
            // 发送POST请求必须设置如下两行
            conn.setDoOutput(true);
            conn.setDoInput(true);
            // 获取URLConnection对象对应的输出流
            out = new PrintWriter(conn.getOutputStream());
            // 发送请求参数
            out.print(param);
            // flush输出流的缓冲
            out.flush();
            // 定义BufferedReader输入流来读取URL的响应
            in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (Exception e) {
           // System.out.println("发送 POST 请求出现异常!"+e);
            //e.printStackTrace();
        }
        //使用finally块来关闭输出流、输入流
        finally{
            try{
                if(out!=null){
                    out.close();
                }
                if(in!=null){
                    in.close();
                }
            }
            catch(IOException ex){
                ex.printStackTrace();
            }
        }
        return result;
    }    

	private static HttpRespMeta httpPost(String url, HttpPost httpPost) {
		CloseableHttpClient client = getInstance(url) ;
		try {
			
			httpPost.setConfig(getRequestConfig());
			HttpResponse response = client.execute(httpPost);
			return getResponse(response);
		} catch (ClientProtocolException e) {
			log.error("httpPost ClientProtocolException:" + e.getMessage());
		} catch (IOException e) {
			log.error("httpPost IOException:" + e.getMessage());
		} finally {
			httpPost.releaseConnection();
		}
		return null;
	}
	
	private static CloseableHttpClient getInstance(String url) {
		if (StringUtils.isNotBlank(url) && url.startsWith("https")) {
			return sslClient ;
		}
		return client;
	}

	private static SSLContext getSSLContext() {
		try {
			return new SSLContextBuilder().loadTrustMaterial(null,new TrustStrategy() {
						public boolean isTrusted(X509Certificate[] chain,
								String authType) throws CertificateException {
							return true;
						}
					}).build();
		} catch (KeyManagementException e) {
			log.error("HttpClient KeyManagementException:" + e.getMessage());
		} catch (NoSuchAlgorithmException e) {
			log.error("HttpClient NoSuchAlgorithmException:"+ e.getMessage());
		} catch (KeyStoreException e) {
			log.error("HttpClient KeyStoreException:" + e.getMessage());
		}
		return null;
	}

	public static class HttpRespMeta {
		private String encode;
		private int code;
		private String contentType;
		private byte[] response;

		public int getCode() {
			return code;
		}

		public void setCode(int code) {
			this.code = code;
		}

		public ByteArrayInputStream getResponseAsInputStream() {
			return new ByteArrayInputStream(response);
		}

		public long getResponseLength() {
			return response.length;
		}

		public String getResponseAsString() throws UnsupportedEncodingException {
			return new String(response, encode);
		}

		public String getContentType() {
			return contentType;
		}

		public void setContentType(String contentType) {
			this.contentType = contentType;
		}

		public byte[] getResponseAsBytes() {
			return response;
		}

		public String getEncode() {
			return encode;
		}

		public void setEncode(String encode) {
			this.encode = encode;
		}

		public void setResponse(byte[] response) {
			this.response = response;
		}
	}

	/**
	 * 发送get请求
	 * @param url
	 * @param headers
	 * @param params
	 * @return
	 */
	public static String sendGet(String url, Map<String, Object> headers,Map<String, Object> params){
		HttpRespMeta meta = LogHttpUtils.httpGet(url, headers, params);
		String result = null;
		if (null != meta ) {
			try {
				if(null != meta.getResponseAsBytes()){
					result = meta.getResponseAsString();
				}
			} catch (UnsupportedEncodingException e) {
			}
		}
		return result;
	}
	
	/**
	 * 发送get请求
	 * @param url
	 * @param headers
	 * @param params
	 * @return
	 */
	public static String sendPost(String url, Map<String, Object> headers,Map<String, Object> params){
		HttpRespMeta meta = LogHttpUtils.httpPost(url, headers, params);
		String result = null;
		if (null != meta ) {
			try {
				if(null != meta.getResponseAsBytes()){
					if(StringUtils.isEmpty(meta.getEncode())){
						meta.setEncode("utf-8");
					}
					result = meta.getResponseAsString();
				}
			} catch (UnsupportedEncodingException e) {
			}
		}
		return result;
	}
	
	
	public static void main(String[] args) throws FileNotFoundException,
			IOException {
		Map<String, Object> headers = null ;
	    Map<String, Object> params =null;
		HttpRespMeta resp = LogHttpUtils.httpGet("http://nos-yx.netease.com/yixinpublic/pr_zc4e0zwrir_rvq7mbmo75q==_1395712361_170501",headers, params);
		if (resp != null && resp.getCode() == 200) {
			File file = new File("D:\\Tools\\weibow.jpg");
			IOUtils.copy(resp.getResponseAsInputStream(), new FileOutputStream(file));
			System.out.println("len:" + resp.getResponseLength());
			System.out.println("filelen:" + file.length());
		}
	}

}

  阿里json封装类

package com.sf.log.util;

import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.fastjson.serializer.ValueFilter;

import net.sf.json.JSONObject;

public class JsonUtil {
	
	/**
	 * 实体数据转成json字符串
	 * @param object
	 * @return
	 */
	public static String modelToJson(Object object){
    	String jsonString = "";
		try {
			jsonString = JSON.toJSONString(object);
		} catch (Exception e) {
			return null;
		} 
    	return jsonString ;
    }
	
	/**
	 * 阿里的json解析 对象转字符串
	 * @param object
	 * @return
	 */
	public static String fastModelToJson(Object object){
		String jsonString = "";
		try {
			jsonString = JSON.toJSONString(object, SerializerFeature.WriteMapNullValue);
		} catch (Exception e) {
			return null;
		} 
    	return jsonString ;
	}
	
	/**
	 * null值转为""
	 * @param object
	 * @return
	 */
	public static String fastModelToJsoneEmpty(Object object){
		String jsonString = "";
		ValueFilter valueFilter = new ValueFilter() {
			@Override
			public Object process(Object object, String name, Object value) {
				if (value == null)
					return "";
				return value;
			}
		};
		try {
			jsonString = JSON.toJSONString(object, valueFilter);
		} catch (Exception e) {
			return null;
		} 
    	return jsonString ;
	}
	
	
	/**
	 * 保留空值 实体数据转成json字符串
	 * @param object
	 * @return
	 */
	public static String netModelToJson(Object object){
		 JSONObject jobject = JSONObject.fromObject(object);
		 return jobject.toString();
	}
	
	/**
	 * json字符串转类型
	 * @param json
	 * @param clazz
	 * @return
	 */
	public static <T> T jsonToModel(String json,Class<T> clazz){
		try {
			return JSON.parseObject(json, clazz);
		} catch (Exception e) {
			return null;
		}
	}
	
	public static void main(String[] args) {
		Map<String,String> map = new HashMap<String,String>();
		map.put("1", null);
		map.put("2", "fdsfd");
		//String s = netModelToJson(map);
		String s1 = fastModelToJson(map);
		System.out.println(jsonToModel(s1,Map.class));
		System.out.println(s1);
	}
}

  

属性文件配置log.properties

logQueueSize=10000
logThreadPool=10
logServerUrl=http://localhost:8080/logserver/addLog

 

web.xml中配置

logQueueSize=10000
logThreadPool=10
logServerUrl=日志系统url

  

 

2.服务器端就是简单的单表操作.不做记录

以上是关于日志收集系统-多线程消息队列的主要内容,如果未能解决你的问题,请参考以下文章

ELK+kafka构建日志收集系统

ELK日志收集分析系统配置

消息队列,大数据时代的神器

多线程批处理队列

TWEN-ASR ONE 语音识别系列教程---多线程与消息队列使用

TWEN-ASR ONE 语音识别系列教程---多线程与消息队列使用