封装自己的ThreadPool
Posted 姩澕
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了封装自己的ThreadPool相关的知识,希望对你有一定的参考价值。
/** * Created by lzd on 2016年6月6日 下午5:06:56 */ public class JdPriceUtils{ private static final Logger log = Logger.getLogger(JdPriceUtils.class); private int threads = 1; private ThreadPool threadPool; private DbStore store; private IRedis redis = RedisFactory.getShardedRedis(); private ReentrantLock LOCK = new ReentrantLock(); private Condition condition = this.LOCK.newCondition(); private final LinkedList<JdBookInfo> list = new LinkedList<>(); private final Object LIST_LOCK = new Object(); // private static final CloseableHttpClient client = HttpClients.createDefault(); public interface DbStore{ List<JdBookInfo> get(); void save(JdBookInfo book); } public JdPriceUtils(DbStore dbStore){ this.store = dbStore; } public void init(){ threadPool = new ThreadPool(this.getThreads()); while (true) { synchronized (LIST_LOCK) { list.addAll(product()); log.info("list container init size = " + list.size()); } if (list.size() == 0){ this.await(); }else { consume(); this.signal(); } } } protected List<JdBookInfo> product(){ return store.get(); } protected void consume(){ while (list.size() > 0) { final JdBookInfo jdBookInfo; synchronized (LIST_LOCK) { jdBookInfo = list.removeFirst(); } threadPool.execute(new Runnable() { @Override public void run() { try { handlePrice(jdBookInfo); } catch (Exception e) { log.error("handle price is error",e); } } }); } } protected void handlePrice(JdBookInfo jdBookInfo){ log.info(Thread.currentThread().getName() + "--处理--" + jdBookInfo.getJsin() + "--的价格"); if( jdBookInfo == null || jdBookInfo.getJsin() == null) { new IllegalArgumentException("param is not valid"); } String code = jdBookInfo.getJsin(); String res = null; while (StringUtils.isEmpty(res)) { res = getPrice(code); } Map map = Collections.EMPTY_MAP;; try { String json = res.substring(res.indexOf("{"), res.lastIndexOf("}")+1); map = JSON.parseObject(json,Map.class); log.info("request return json = [" + json + "]"); } catch (Exception e) { addList(jdBookInfo); log.warn("json parse is error , again back list"); return; } String error = String.valueOf(map.get("error")); if(!"null".equals(error)){ addList(jdBookInfo); log.warn("get pdos_captcha,list add book = [jsin = "+jdBookInfo.getJsin()+";price = "+jdBookInfo.getPreface() + " or " + jdBookInfo.getSalePrice()); return; } String price = String.valueOf(map.get("m")); String salePrice = String.valueOf(map.get("p")); //如果价格为null时候,设默认值为0; if ("null".equals(price)) { price = "0"; } if ("null".equals(salePrice)) { salePrice = "0"; } JdBookInfo book = new JdBookInfo(); book.setJsin(jdBookInfo.getJsin()); book.setPrice(price); book.setSalePrice(salePrice); book.setId(jdBookInfo.getId()); store.save(book); } private void addList(JdBookInfo jdBookInfo){ synchronized (LIST_LOCK) { list.addLast(jdBookInfo); } } protected String getPrice(String code){ String url = "http://p.3.cn/prices/get?skuid=J_"+code+"&type=1&area=1_72_2840&callback=cnp"; String res = null; try { String sRandMember = null; String[] split = null; while (sRandMember == null) { sRandMember = redis.sRandMember("proxies"); if(sRandMember == null) continue; split = sRandMember.split(":"); } HttpHost httpHost = new HttpHost(split[0],Integer.parseInt(split[1])); RequestConfig reqConfig = RequestConfig.custom().setProxy(httpHost) .setConnectionRequestTimeout(10000) .setConnectTimeout(10000) .setSocketTimeout(10000).build(); log.info("request httpHost = " + split[0]+":"+split[1]); RequestBuilder requestBuilder = RequestBuilder.get(url).setConfig(reqConfig); CloseableHttpResponse response = HttpClients.createDefault().execute(requestBuilder.build()); InputStream in = response.getEntity().getContent(); byte[] byteArray = IOUtils.toByteArray(in); res = new String(byteArray,"utf-8"); } catch (Exception e) { log.error("get conntion price url is error"); } return res; } private void await(){ this.LOCK.lock(); try { log.info("main thrad is await ..."); this.condition.await(30000L,TimeUnit.MILLISECONDS); } catch (Exception e) { } finally { this.LOCK.unlock(); } } private void signal(){ this.LOCK.lock(); try { this.condition.signal(); } catch (Exception e) { } finally { this.LOCK.unlock(); } } public int getThreads() { return threads; } public JdPriceUtils setThreads(int threads) { this.threads = threads; return this; } public void start(){ this.init(); } public class ThreadPool{ private int threads; private ExecutorService executorService; private AtomicInteger threadActive = new AtomicInteger(0); private ReentrantLock poolLock = new ReentrantLock(); private Condition poolCondition = this.poolLock.newCondition(); public ThreadPool(int threads){ this.threads = threads; executorService = Executors.newFixedThreadPool(threads); } public void execute(final Runnable runnable){ try { poolLock.lock(); if (threadActive.get() >= threads ) { while (threadActive.get() >= threads ){ try { poolCondition.await(); } catch (Exception e) { e.printStackTrace(); } } } } catch (Exception e) { } finally { poolLock.unlock(); } threadActive.incrementAndGet(); executorService.execute(new Runnable() { @Override public void run() { runnable.run(); try { poolLock.lock(); poolCondition.signal(); threadActive.decrementAndGet(); } catch (Exception e) { } finally { poolLock.unlock(); } } }); } } }
以上是关于封装自己的ThreadPool的主要内容,如果未能解决你的问题,请参考以下文章
VSCode自定义代码片段14——Vue的axios网络请求封装