用Java模拟多线程下的客户端与服务端数据交互

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用Java模拟多线程下的客户端与服务端数据交互相关的知识,希望对你有一定的参考价值。

 

 

目录

一、场景

二、案例分析

三、具体实现

  1)  FutureMain 类

  2)  SubThread类

  3)  RequestFuture 类

  4)  response 类

四、使用线程池管理子线程

五、使用ReentrantLock 替换掉synchronized

六、Readme


一、场景

           用99条子线程去接收客户端发送过来的请求,用1个主线程去接收99个子线程处理完后的结果,其中99条子线程相当于服务端的线程,99个请求相当于客户端发送过来的数据包。

二、案例分析

         此实例主要含有以下4个类。

         FutureMain:   主线程,创建子线程,获取子线程的处理结果。

         RequestFuture:  请求类,相当于是HttpRequest, 最核心的是get()方法和received()方法,get()方法用来同步获取数据,received()方法用来同步通知主线程获取数据。

         Response:  响应类,相当于HttpResponse, 对客户端的请求发出响应,此处保证响应id和请求id一致。

         SubThread:   子线程,用来接收RequestFuture, 处理完请求后,将结果通过通知的方式写回到主线程。

三、具体实现

  1)  FutureMain 类

package com.example.basic;

import java.util.ArrayList;
import java.util.List;

/**
 * @author bingbing
 * @date 2021/5/20 0020 22:24
 */
public class FutureMain {

    public static void main(String[] args) {

        List<RequestFuture> futureList = new ArrayList<>();
        // 模拟99个客户端发起请求
        for (int i = 1; i < 100; i++) {
            long id = i;
            RequestFuture future = new RequestFuture();
            future.setId(id);
            // 设置请求内容
            future.setRequestData("您好!");
            // 将请求加入到缓存中
            RequestFuture.addFuture(future);
            futureList.add(future);
            // 模拟发送请求
            sendMsg(future);
            // 由服务端子线程处理客户端请求
            SubThread sub = new SubThread(future);
            sub.start();
        }
        // 获取处理结果
        for (RequestFuture future : futureList) {
            Object result = future.get();
            System.out.println(result);
        }


    }

    private static void sendMsg(RequestFuture future) {
        System.out.println("客户端发数据,id=" + future.getId());
    }
}

  2)  SubThread类

package com.example.basic;

/**
 * @author bingbing
 * @date 2021/5/20 0020 22:28
 */
public class SubThread extends Thread {


    private RequestFuture future;

    public SubThread(RequestFuture future) {
        this.future = future;
    }

    @Override
    public void run() {
        // 处理请求
        Response response = new Response();
        response.setId(future.getId());
        response.setResult("服务端收到消息====当前线程id=" + Thread.currentThread().getId());
        // 将结果写回主线程
        future.receivced(response);
    }
}

  3)  RequestFuture 类

package com.example.basic;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author bingbing
 * @date 2021/5/20 0020 22:20
 */
public class RequestFuture {


    private long id;

    private Object requestData;

    private Object result;


    private long timeout = 5000;

    // 缓存请求
    private static Map<Long, Object> futures = new ConcurrentHashMap<>();

    public long getId() {
        return id;
    }


    // 添加future
    public static void addFuture(RequestFuture requestFuture) {
        futures.put(requestFuture.getId(), requestFuture);
    }


    public void setId(long id) {
        this.id = id;
    }

    public Object getRequestData() {
        return requestData;
    }

    public void setRequestData(Object requestData) {
        this.requestData = requestData;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    public static Map<Long, Object> getFutures() {
        return futures;
    }

    public static void setFutures(Map<Long, Object> futures) {
        RequestFuture.futures = futures;
    }

    /**
     * 获取请求结果
     */
    public Object get() {
        synchronized (this) {
            while (this.result == null) {
                try {
                    this.wait(timeout);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return this.result;
    }


    /**
     * 接收处理结果
     */
    public void receivced(Response response) {
        RequestFuture future = (RequestFuture) futures.remove(response.getId());
        if (future != null) {
//            future.setResult(response.getResult()) ;
            this.result = response.getResult();
            synchronized (this) {
                future.notify();
            }
        }
    }

}

  4)  response 类

package com.example.basic;

/**
 * @author bingbing
 * @date 2021/5/20 0020 22:29
 */
public class Response {

    private Long id;

    private Object result;


    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}

 打印结果:

   由结果可知,服务端按照顺序将客户端的请求进行一一处理,然后响应。

四、使用线程池管理子线程

          将99个子线程使用线程池管理。

          引入google的guava包,引用ThreadFactoryBuilder 类

<dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
</dependency>

      手动创建一个线程池

package com.example.basic;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.*;

/**
 * @author bingbing
 * @date 2021/5/21 0021 10:47
 */
public class ThreadPoolUtil {

    private static final ExecutorService threadPool;

    private static int maxPoolSize = 100;

    static {
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        long keepAliveTime = 5;
        TimeUnit keepAliveTimeUnit = TimeUnit.MINUTES;
        int queSize = 10000;

        threadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                keepAliveTime, keepAliveTimeUnit, new LinkedBlockingDeque<>(queSize)
                ,  new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build());
    }


    /**
     * 获取线程池
     *
     * @return 线程池
     */
    public static ExecutorService getThreadPool() {
        return threadPool;
    }

}

         FutureMain类添加线程池, 将99个子线程用线程池管理。

package com.example.basic;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author bingbing
 * @date 2021/5/20 0020 22:24
 */
public class FutureMain {


    private static ExecutorService executorService = ThreadPoolUtil.getThreadPool();

    public static void main(String[] args) {

        List<RequestFuture> futureList = new ArrayList<>();
        // 模拟99个客户端发起请求
        for (int i = 1; i < 100; i++) {
            long id = i;
            RequestFuture future = new RequestFuture();
            future.setId(id);
            // 设置请求内容
            future.setRequestData("您好!");
            // 将请求加入到缓存中
            RequestFuture.addFuture(future);
            futureList.add(future);
            // 模拟发送请求
            sendMsg(future);
            // 由服务端子线程处理客户端请求
            executorService.submit(() -> {
                // 处理请求
                Response response = new Response();
                response.setId(future.getId());
                response.setResult("服务端收到消息====当前客户端id:" + future.getId() + ",当前线程id:" + Thread.currentThread().getId());
                // 将结果写回主线程
                future.receivced(response);
            });
//            SubThread sub = new SubThread(future);
//            sub.start();
        }
        executorService.shutdown();
        // 获取处理结果
        for (RequestFuture future : futureList) {
            Object result = future.get();
            System.out.println(result);
        }


    }

    private static void sendMsg(RequestFuture future) {
        System.out.println("客户端发数据,id=" + future.getId());
    }
}

五、使用ReentrantLock 替换掉synchronized

          由于synchronized锁是重量级的阻塞锁,性能会非常的差,可使用ReentrantLock替换synchronized关键字。 

          定义 client 和server 两个condition, 重写get()和received方法。

package com.example.basic;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author bingbing
 * @date 2021/5/20 0020 22:20
 */
public class RequestFuture {

    //可重入锁,替换掉wait和notify()
    private Lock lock = new ReentrantLock();
    Condition client = lock.newCondition();
    Condition server = lock.newCondition();


    private long id;

    private Object requestData;

    private Object result;


    private long timeout = 5000;

    // 缓存请求
    private static Map<Long, Object> futures = new ConcurrentHashMap<>();

    public long getId() {
        return id;
    }


    // 添加future
    public static void addFuture(RequestFuture requestFuture) {
        futures.put(requestFuture.getId(), requestFuture);
    }


    public void setId(long id) {
        this.id = id;
    }

    public Object getRequestData() {
        return requestData;
    }

    public void setRequestData(Object requestData) {
        this.requestData = requestData;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }

    public static Map<Long, Object> getFutures() {
        return futures;
    }

    public static void setFutures(Map<Long, Object> futures) {
        RequestFuture.futures = futures;
    }

    /**
     * 获取请求结果
     */
    public Object get() {
        lock.lock();
        try {
            while (this.result == null) {
                try {
                    server.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            client.signal();
            return this.result;
        } catch (Exception e) {

        } finally {
            lock.unlock();
        }
        return null;
    }


    /**
     * 接收处理结果
     */
    public void receivced(Response response) {
        RequestFuture future = (RequestFuture) futures.remove(response.getId());
        lock.lock();
        try {
            if (future != null) {
                this.result = response.getResult();
                server.signal();
            } else {
                client.await(5, TimeUnit.SECONDS);
            }
        } catch (Exception e) {

        } finally {
            lock.unlock();
        }

    }

}

  注:  一定要在finally 里将锁释放掉,否则可能会产生死锁的情况。

六、Readme

模拟一个Java多线程实例。

目标: 开启一条主线程用来接服务端处理请求后的结果数据,其余99条子线程用来接收客户端的请求,99条子线程处理完请求后,然后将结果写给主线程,主线程同步阻塞接收到子线程的数据然后进行响应。

1)  主线程通过get()方法同步阻塞wait()获取处理结果。

2)  子线程处理完客户端请求后,通知notify主线程接收数据。

3)  用线程池管理99个子线程。

4)  用ReentrantLock 替换掉synchronized。

以上是关于用Java模拟多线程下的客户端与服务端数据交互的主要内容,如果未能解决你的问题,请参考以下文章

java网络编程——多线程数据收发并行

用java多线程实现服务器与客户端之间的文件传输的代码!!!急!!!!

Java Socket 通信之多线程

用java编写多线程银行ATM 模拟程序

Socket 多线程编程

Linux-TCP编程流程-Socket编程-单线程实现TCP客户端和服务端交互-多进程实现TCP客户端和服务端交互