用Java模拟多线程下的客户端与服务端数据交互
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用Java模拟多线程下的客户端与服务端数据交互相关的知识,希望对你有一定的参考价值。
目录
五、使用ReentrantLock 替换掉synchronized
一、场景
用99条子线程去接收客户端发送过来的请求,用1个主线程去接收99个子线程处理完后的结果,其中99条子线程相当于服务端的线程,99个请求相当于客户端发送过来的数据包。
二、案例分析
此实例主要含有以下4个类。
FutureMain: 主线程,创建子线程,获取子线程的处理结果。
RequestFuture: 请求类,相当于是HttpRequest, 最核心的是get()方法和received()方法,get()方法用来同步获取数据,received()方法用来同步通知主线程获取数据。
Response: 响应类,相当于HttpResponse, 对客户端的请求发出响应,此处保证响应id和请求id一致。
SubThread: 子线程,用来接收RequestFuture, 处理完请求后,将结果通过通知的方式写回到主线程。
三、具体实现
1) FutureMain 类
package com.example.basic;
import java.util.ArrayList;
import java.util.List;
/**
* @author bingbing
* @date 2021/5/20 0020 22:24
*/
public class FutureMain {
public static void main(String[] args) {
List<RequestFuture> futureList = new ArrayList<>();
// 模拟99个客户端发起请求
for (int i = 1; i < 100; i++) {
long id = i;
RequestFuture future = new RequestFuture();
future.setId(id);
// 设置请求内容
future.setRequestData("您好!");
// 将请求加入到缓存中
RequestFuture.addFuture(future);
futureList.add(future);
// 模拟发送请求
sendMsg(future);
// 由服务端子线程处理客户端请求
SubThread sub = new SubThread(future);
sub.start();
}
// 获取处理结果
for (RequestFuture future : futureList) {
Object result = future.get();
System.out.println(result);
}
}
private static void sendMsg(RequestFuture future) {
System.out.println("客户端发数据,id=" + future.getId());
}
}
2) SubThread类
package com.example.basic;
/**
* @author bingbing
* @date 2021/5/20 0020 22:28
*/
public class SubThread extends Thread {
private RequestFuture future;
public SubThread(RequestFuture future) {
this.future = future;
}
@Override
public void run() {
// 处理请求
Response response = new Response();
response.setId(future.getId());
response.setResult("服务端收到消息====当前线程id=" + Thread.currentThread().getId());
// 将结果写回主线程
future.receivced(response);
}
}
3) RequestFuture 类
package com.example.basic;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author bingbing
* @date 2021/5/20 0020 22:20
*/
public class RequestFuture {
private long id;
private Object requestData;
private Object result;
private long timeout = 5000;
// 缓存请求
private static Map<Long, Object> futures = new ConcurrentHashMap<>();
public long getId() {
return id;
}
// 添加future
public static void addFuture(RequestFuture requestFuture) {
futures.put(requestFuture.getId(), requestFuture);
}
public void setId(long id) {
this.id = id;
}
public Object getRequestData() {
return requestData;
}
public void setRequestData(Object requestData) {
this.requestData = requestData;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public static Map<Long, Object> getFutures() {
return futures;
}
public static void setFutures(Map<Long, Object> futures) {
RequestFuture.futures = futures;
}
/**
* 获取请求结果
*/
public Object get() {
synchronized (this) {
while (this.result == null) {
try {
this.wait(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return this.result;
}
/**
* 接收处理结果
*/
public void receivced(Response response) {
RequestFuture future = (RequestFuture) futures.remove(response.getId());
if (future != null) {
// future.setResult(response.getResult()) ;
this.result = response.getResult();
synchronized (this) {
future.notify();
}
}
}
}
4) response 类
package com.example.basic;
/**
* @author bingbing
* @date 2021/5/20 0020 22:29
*/
public class Response {
private Long id;
private Object result;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
打印结果:
由结果可知,服务端按照顺序将客户端的请求进行一一处理,然后响应。
四、使用线程池管理子线程
将99个子线程使用线程池管理。
引入google的guava包,引用ThreadFactoryBuilder 类
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
手动创建一个线程池
package com.example.basic;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
/**
* @author bingbing
* @date 2021/5/21 0021 10:47
*/
public class ThreadPoolUtil {
private static final ExecutorService threadPool;
private static int maxPoolSize = 100;
static {
int corePoolSize = Runtime.getRuntime().availableProcessors();
long keepAliveTime = 5;
TimeUnit keepAliveTimeUnit = TimeUnit.MINUTES;
int queSize = 10000;
threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveTime, keepAliveTimeUnit, new LinkedBlockingDeque<>(queSize)
, new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build());
}
/**
* 获取线程池
*
* @return 线程池
*/
public static ExecutorService getThreadPool() {
return threadPool;
}
}
FutureMain类添加线程池, 将99个子线程用线程池管理。
package com.example.basic;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author bingbing
* @date 2021/5/20 0020 22:24
*/
public class FutureMain {
private static ExecutorService executorService = ThreadPoolUtil.getThreadPool();
public static void main(String[] args) {
List<RequestFuture> futureList = new ArrayList<>();
// 模拟99个客户端发起请求
for (int i = 1; i < 100; i++) {
long id = i;
RequestFuture future = new RequestFuture();
future.setId(id);
// 设置请求内容
future.setRequestData("您好!");
// 将请求加入到缓存中
RequestFuture.addFuture(future);
futureList.add(future);
// 模拟发送请求
sendMsg(future);
// 由服务端子线程处理客户端请求
executorService.submit(() -> {
// 处理请求
Response response = new Response();
response.setId(future.getId());
response.setResult("服务端收到消息====当前客户端id:" + future.getId() + ",当前线程id:" + Thread.currentThread().getId());
// 将结果写回主线程
future.receivced(response);
});
// SubThread sub = new SubThread(future);
// sub.start();
}
executorService.shutdown();
// 获取处理结果
for (RequestFuture future : futureList) {
Object result = future.get();
System.out.println(result);
}
}
private static void sendMsg(RequestFuture future) {
System.out.println("客户端发数据,id=" + future.getId());
}
}
五、使用ReentrantLock 替换掉synchronized
由于synchronized锁是重量级的阻塞锁,性能会非常的差,可使用ReentrantLock替换synchronized关键字。
定义 client 和server 两个condition, 重写get()和received方法。
package com.example.basic;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author bingbing
* @date 2021/5/20 0020 22:20
*/
public class RequestFuture {
//可重入锁,替换掉wait和notify()
private Lock lock = new ReentrantLock();
Condition client = lock.newCondition();
Condition server = lock.newCondition();
private long id;
private Object requestData;
private Object result;
private long timeout = 5000;
// 缓存请求
private static Map<Long, Object> futures = new ConcurrentHashMap<>();
public long getId() {
return id;
}
// 添加future
public static void addFuture(RequestFuture requestFuture) {
futures.put(requestFuture.getId(), requestFuture);
}
public void setId(long id) {
this.id = id;
}
public Object getRequestData() {
return requestData;
}
public void setRequestData(Object requestData) {
this.requestData = requestData;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public static Map<Long, Object> getFutures() {
return futures;
}
public static void setFutures(Map<Long, Object> futures) {
RequestFuture.futures = futures;
}
/**
* 获取请求结果
*/
public Object get() {
lock.lock();
try {
while (this.result == null) {
try {
server.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
client.signal();
return this.result;
} catch (Exception e) {
} finally {
lock.unlock();
}
return null;
}
/**
* 接收处理结果
*/
public void receivced(Response response) {
RequestFuture future = (RequestFuture) futures.remove(response.getId());
lock.lock();
try {
if (future != null) {
this.result = response.getResult();
server.signal();
} else {
client.await(5, TimeUnit.SECONDS);
}
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
注: 一定要在finally 里将锁释放掉,否则可能会产生死锁的情况。
六、Readme
模拟一个Java多线程实例。
目标: 开启一条主线程用来接服务端处理请求后的结果数据,其余99条子线程用来接收客户端的请求,99条子线程处理完请求后,然后将结果写给主线程,主线程同步阻塞接收到子线程的数据然后进行响应。
1) 主线程通过get()方法同步阻塞wait()获取处理结果。
2) 子线程处理完客户端请求后,通知notify主线程接收数据。
3) 用线程池管理99个子线程。
4) 用ReentrantLock 替换掉synchronized。
以上是关于用Java模拟多线程下的客户端与服务端数据交互的主要内容,如果未能解决你的问题,请参考以下文章