高并发下的批量处理与单个处理(利用jdk8新特性处理,提高性能)
Posted Sicozwl
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发下的批量处理与单个处理(利用jdk8新特性处理,提高性能)相关的知识,希望对你有一定的参考价值。
1、技术选型:
SpringBoot
2、案例:
实体类:
package com.zhangwl.complicatedemo.pojo;
import java.sql.Timestamp;
/**
 * Plain mutable bean describing one goods item: name, date, goods code and
 * serial number.
 *
 * <p>A no-argument constructor plus getters/setters are provided so the bean
 * can be populated field by field and read by property-based tools.
 *
 * @author zhangwl
 * @version 1.0 (2019/10/3)
 */
public class Goods {

    private String goodsName;
    private Timestamp goodsDate;
    private String goodsCode;
    private String serialNumber;

    /** Creates an empty instance; every field starts out as {@code null}. */
    public Goods() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param goodsName    display name of the goods
     * @param goodsDate    timestamp associated with the goods
     * @param goodsCode    code identifying the goods
     * @param serialNumber serial number of the goods
     */
    public Goods(String goodsName, Timestamp goodsDate, String goodsCode, String serialNumber) {
        this.goodsName = goodsName;
        this.goodsDate = goodsDate;
        this.goodsCode = goodsCode;
        this.serialNumber = serialNumber;
    }

    /** @return the goods name, possibly {@code null} */
    public String getGoodsName() {
        return goodsName;
    }

    /** @param goodsName the goods name to set */
    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    /** @return the goods timestamp, possibly {@code null} */
    public Timestamp getGoodsDate() {
        return goodsDate;
    }

    /** @param goodsDate the goods timestamp to set */
    public void setGoodsDate(Timestamp goodsDate) {
        this.goodsDate = goodsDate;
    }

    /** @return the goods code, possibly {@code null} */
    public String getGoodsCode() {
        return goodsCode;
    }

    /** @param goodsCode the goods code to set */
    public void setGoodsCode(String goodsCode) {
        this.goodsCode = goodsCode;
    }

    /** @return the serial number, possibly {@code null} */
    public String getSerialNumber() {
        return serialNumber;
    }

    /** @param serialNumber the serial number to set */
    public void setSerialNumber(String serialNumber) {
        this.serialNumber = serialNumber;
    }

    /**
     * Renders all four fields in the form
     * {@code Goods{goodsName='..', goodsDate=.., goodsCode='..', serialNumber='..'}}.
     */
    @Override
    public String toString() {
        // Plain ASCII apostrophes: the previous text used typographic quotes,
        // which are not valid char-literal delimiters in Java.
        return "Goods{" +
                "goodsName='" + goodsName + '\'' +
                ", goodsDate=" + goodsDate +
                ", goodsCode='" + goodsCode + '\'' +
                ", serialNumber='" + serialNumber + '\'' +
                '}';
    }
}
package com.zhangwl.complicatedemo.pojo;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
 * One pending lookup: the goods code being asked for, paired with the
 * {@link CompletableFuture} through which the answer is handed back.
 *
 * <p>Wrapping each call in such an object is what allows several individual
 * lookups to be collected and served together as one batch.
 *
 * @author zhangwl
 * @version 1.0 (2019/10/5)
 */
public class Request {

    private String goodsCode;

    // Left package-private, as before, so any same-package code that reads the
    // field directly keeps compiling.
    CompletableFuture<Map<String, Object>> future;

    /** Creates an empty request; both the code and the future are {@code null}. */
    public Request() {
    }

    /**
     * Creates a request for the given goods code.
     *
     * @param goodsCode code of the goods being looked up
     * @param future    future to complete once the result is available
     */
    public Request(String goodsCode, CompletableFuture<Map<String, Object>> future) {
        this.goodsCode = goodsCode;
        this.future = future;
    }

    /** @return the future that carries this request's result, possibly {@code null} */
    public CompletableFuture<Map<String, Object>> getFuture() {
        return future;
    }

    /** @param future the future that will carry this request's result */
    public void setFuture(CompletableFuture<Map<String, Object>> future) {
        this.future = future;
    }

    /** @return the requested goods code, possibly {@code null} */
    public String getGoodsCode() {
        return goodsCode;
    }

    /** @param goodsCode the goods code to look up */
    public void setGoodsCode(String goodsCode) {
        this.goodsCode = goodsCode;
    }

    /** Renders only the goods code, in the form {@code Request{goodsCode='..'}}. */
    @Override
    public String toString() {
        // Plain ASCII apostrophes: the previous text used typographic quotes,
        // which are not valid char-literal delimiters in Java.
        return "Request{" +
                "goodsCode='" + goodsCode + '\'' +
                '}';
    }
}
******************************************************************************************************************************************
业务层:
package com.zhangwl.complicatedemo.service;
import com.zhangwl.complicatedemo.pojo.Goods;
import java.util.Map;
import java.util.concurrent.ExecutionException;
/**
* Service-layer contract for looking up goods by goods code. It exposes two
* lookup styles so they can be compared: a direct per-call lookup and a lookup
* whose result is delivered asynchronously and awaited by the caller.
*
* @author zhangwl
* @version 1.0 (2019/10/3)
**/
public interface GoodsService {
/**
* Looks up a single goods item.
* In GoodsServiceImpl this call is forwarded straight to RemoteGoodsService.
*
* @param goodsCode code of the goods to look up
* @return map holding the matching Goods (RemoteGoodsServiceImpl keys it by
*         the requested goods code)
*/
Map<String, Goods> findGoodsByGoodsCode(String goodsCode);
/**
* Looks up a single goods item, blocking until the result is available.
* In GoodsServiceImpl the call is queued, served as part of a batch, and the
* caller waits on a CompletableFuture for its own result.
*
* @param goodsCode code of the goods to look up
* @return map holding the result for the requested code
* @throws ExecutionException   if producing the result failed
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
Map<String ,Object> getGoodsByGoodsCode(String goodsCode) throws ExecutionException, InterruptedException;
}
package com.zhangwl.complicatedemo.service;
import com.zhangwl.complicatedemo.pojo.Goods;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* Remote-call interface for goods lookups, offering a single-code variant and
* a batch variant.
*
* @author zhangwl
* @version 1.0 (2019/10/6)
**/
public interface RemoteGoodsService {
/**
* Looks up one goods item by its code.
*
* @param goodsCode code of the goods to look up
* @return map holding the matching Goods (RemoteGoodsServiceImpl keys it by
*         the requested goods code)
*/
Map<String, Goods> findGoodsByGoodsCode(String goodsCode);
/**
* Looks up several goods items in one call.
*
* @param goodsCodes codes of the goods to look up
* @return list of result maps (RemoteGoodsServiceImpl returns one single-entry
*         map per requested code, keyed by that code)
*/
List<Map<String, Object>> findGoodsByBatchGoodsCode(List<String> goodsCodes);
}
package com.zhangwl.complicatedemo.service.impl;
import com.zhangwl.complicatedemo.pojo.Goods;
import com.zhangwl.complicatedemo.pojo.Request;
import com.zhangwl.complicatedemo.service.GoodsService;
import com.zhangwl.complicatedemo.service.RemoteGoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.*;
/**
 * {@link GoodsService} implementation offering a direct lookup and a
 * request-merging lookup.
 *
 * <p>{@link #getGoodsByGoodsCode(String)} does not call the remote service
 * itself. It enqueues a {@link Request} and blocks on that request's
 * {@link CompletableFuture}. A single background task, started in
 * {@link #init()}, periodically drains the queue, issues one batch call for
 * everything it collected, and completes each waiting future with its own
 * result.
 *
 * @author zhangwl
 * @version 1.0 (2019/10/3)
 */
@Service
public class GoodsServiceImpl implements GoodsService {

    /** Interval between two queue flushes, in milliseconds. */
    private static final long FLUSH_INTERVAL_MILLIS = 10;

    @Autowired
    private RemoteGoodsService remoteGoodsService;

    /** Requests waiting to be served by the next batch call. */
    private final LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<>();

    /**
     * Direct lookup: forwards the call to the remote service unchanged.
     *
     * @param goodsCode code of the goods to look up
     * @return whatever the remote service returns for this code
     */
    @Override
    public Map<String, Goods> findGoodsByGoodsCode(String goodsCode) {
        return remoteGoodsService.findGoodsByGoodsCode(goodsCode);
    }

    /**
     * Merged lookup: queues the request and blocks until the background task
     * has completed it.
     *
     * @param goodsCode code of the goods to look up
     * @return the batch result entry for this code, or {@code null} when the
     *         batch result contained no entry for it
     * @throws ExecutionException   if the batch call that served this request failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public Map<String, Object> getGoodsByGoodsCode(String goodsCode) throws ExecutionException, InterruptedException {
        // The future is the channel through which the batch worker hands this
        // particular caller its own result.
        CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
        linkedBlockingQueue.add(new Request(goodsCode, future));
        // Blocks until the worker completes the future, normally or exceptionally.
        return future.get();
    }

    /**
     * Starts the single-threaded timer that flushes the request queue every
     * {@value #FLUSH_INTERVAL_MILLIS} ms.
     */
    @PostConstruct
    public void init() {
        // NOTE(review): the executor is never shut down; consider stopping it
        // when the bean is destroyed.
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleAtFixedRate(
                this::flushPendingRequests, 0, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }

    /** Serves every request queued so far with one batch call. */
    private void flushPendingRequests() {
        // drainTo moves the queued requests out in one step, instead of
        // polling the queue from inside an iteration over that same queue.
        List<Request> requests = new ArrayList<>();
        linkedBlockingQueue.drainTo(requests);
        if (requests.isEmpty()) {
            return;
        }
        try {
            List<String> params = new ArrayList<>(requests.size());
            for (Request request : requests) {
                params.add(request.getGoodsCode());
            }
            List<Map<String, Object>> goodsMapList = remoteGoodsService.findGoodsByBatchGoodsCode(params);

            // Index every returned map under each of its keys so a request can
            // find its own result by goods code.
            Map<String, Map<String, Object>> mapResult = new HashMap<>();
            for (Map<String, Object> goodsMap : goodsMapList) {
                for (String code : goodsMap.keySet()) {
                    mapResult.put(code, goodsMap);
                }
            }
            for (Request request : requests) {
                request.getFuture().complete(mapResult.get(request.getGoodsCode()));
            }
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task cancels all
            // later runs, and the callers of this batch would wait forever.
            // Hand the failure to them instead and keep the timer alive.
            // Futures that were already completed ignore this call.
            for (Request request : requests) {
                request.getFuture().completeExceptionally(e);
            }
        }
    }
}
package com.zhangwl.complicatedemo.service.impl;
import com.zhangwl.complicatedemo.pojo.Goods;
import com.zhangwl.complicatedemo.service.RemoteGoodsService;
import com.zhangwl.complicatedemo.utils.CommonUtils;
import org.springframework.stereotype.Service;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * Stand-in {@link RemoteGoodsService} that fabricates a {@link Goods} for every
 * requested code instead of calling a real remote system.
 *
 * @author zhangwl
 * @version 1.0 (2019/10/6)
 */
@Service
public class RemoteGoodsServiceImpl implements RemoteGoodsService {

    /**
     * Fabricates one goods item for the given code.
     *
     * @param goodsCode code of the goods to look up
     * @return single-entry map from {@code goodsCode} to the generated goods
     */
    @Override
    public Map<String, Goods> findGoodsByGoodsCode(String goodsCode) {
        Map<String, Goods> map = new HashMap<>();
        map.put(goodsCode, newGoods(goodsCode));
        return map;
    }

    /**
     * Fabricates one goods item per requested code.
     *
     * @param goodsCodes codes of the goods to look up
     * @return one single-entry map per code, in request order, each mapping
     *         the code to its generated goods
     */
    @Override
    public List<Map<String, Object>> findGoodsByBatchGoodsCode(List<String> goodsCodes) {
        List<Map<String, Object>> resultList = new LinkedList<>();
        for (String goodsCode : goodsCodes) {
            Map<String, Object> map = new HashMap<>();
            map.put(goodsCode, newGoods(goodsCode));
            resultList.add(map);
        }
        return resultList;
    }

    /**
     * Builds a goods item carrying the given code, a generated name, the
     * current time and a fresh serial number. Both lookups share this helper
     * so they fill in the same fields; the single lookup used to leave
     * {@code goodsCode} unset.
     */
    private static Goods newGoods(String goodsCode) {
        String uuid = CommonUtils.getUUIDString();
        Goods goods = new Goods();
        goods.setGoodsName("name" + System.currentTimeMillis() + uuid.substring(0, 3));
        goods.setGoodsDate(Timestamp.from(Instant.now()));
        goods.setGoodsCode(goodsCode);
        goods.setSerialNumber(uuid);
        return goods;
    }
}
******************************************************************************************************************************************
测试类:
package com.zhangwl.complicatedemo;
import com.alibaba.fastjson.JSON;
import com.zhangwl.complicatedemo.pojo.Goods;
import com.zhangwl.complicatedemo.service.GoodsService;
import com.zhangwl.complicatedemo.utils.CommonUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
/**
 * Load tests comparing the two lookup styles of {@link GoodsService}: each test
 * starts {@link #T_NUM} threads, releases them together, and lets every thread
 * perform one lookup and print the result as JSON.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ComplicatedemoApplicationTests {

    /** Number of concurrent worker threads per test. */
    private static final int T_NUM = 10000;

    /** Upper bound on how long a test waits for its workers to finish. */
    private static final long FINISH_TIMEOUT_SECONDS = 30;

    /* Start gates: workers block on these until all threads have been started. */
    private CountDownLatch countDownLatch1 = new CountDownLatch(T_NUM);
    private CountDownLatch countDownLatch2 = new CountDownLatch(T_NUM);

    @Autowired
    private GoodsService goodsService;

    /**
     * One direct lookup per thread.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void contextLoads_01() throws InterruptedException {
        CountDownLatch finished = new CountDownLatch(T_NUM);
        for (int i = 0; i < T_NUM; i++) {
            Thread t = new Thread(() -> {
                try {
                    countDownLatch1.await();
                    printGoods(goodsService.findGoodsByGoodsCode(CommonUtils.getUUIDString()));
                } catch (InterruptedException e) {
                    // Restore the flag so the interruption is not lost.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    finished.countDown();
                }
            });
            t.start();
            // The gate opens once the last thread has been started.
            countDownLatch1.countDown();
        }
        awaitWorkers(finished);
    }

    /**
     * One merged (batched) lookup per thread.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void contextLoads_02() throws InterruptedException {
        CountDownLatch finished = new CountDownLatch(T_NUM);
        for (int i = 0; i < T_NUM; i++) {
            Thread t = new Thread(() -> {
                try {
                    countDownLatch2.await();
                    printGoods(goodsService.getGoodsByGoodsCode(CommonUtils.getUUIDString()));
                } catch (InterruptedException e) {
                    // Restore the flag so the interruption is not lost.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } finally {
                    finished.countDown();
                }
            });
            t.start();
            // The gate opens once the last thread has been started.
            countDownLatch2.countDown();
        }
        awaitWorkers(finished);
    }

    /** Prints every value of the given result map as a JSON string. */
    private static void printGoods(Map<String, ?> goodsMap) {
        goodsMap.keySet().forEach(code -> System.out.println(JSON.toJSONString(goodsMap.get(code))));
    }

    /**
     * Waits until every worker has finished instead of sleeping for a fixed
     * time, so the test neither ends early nor waits longer than needed.
     */
    private static void awaitWorkers(CountDownLatch finished) throws InterruptedException {
        if (!finished.await(FINISH_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
            throw new AssertionError(finished.getCount() + " worker thread(s) did not finish within "
                    + FINISH_TIMEOUT_SECONDS + " seconds");
        }
    }
}
运行结果:
在测试类中,contextLoads_02 测试方法(合并请求后批量处理)的性能优于 contextLoads_01 测试方法(逐个请求单独处理)。
以上是关于高并发下的批量处理与单个处理(利用jdk8新特性处理,提高性能)的主要内容,如果未能解决你的问题,请参考以下文章: