How to notify a specific thread in a multithreaded application

Posted: 2017-05-15 13:40:21

Question:

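I am developing a server application that receives RESTful requests from clients and forwards each one, from a new thread, to a specific device as a UDP packet. In addition, at startup a servlet listener launches another thread that listens for the UDP packets sent by all the devices in the system.

When a client makes a request for a specific device, the REST service has to start a thread that sends a UDP packet to the device and waits for the response. When the UDP server finally receives a packet from that device (identified by checking the ip in the packet), it has to notify the blocked thread so that it can continue its execution and finish.

I have thought about using the wait(), notify() and notifyAll() methods, but since many threads can be blocked waiting for responses from several devices, I don't see how to unblock only the desired thread (the one that made the request to the responding device). Is there a way to do this with these methods? Is there any other way? Here is some code (simplified):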

SocketServletListener:

public class SocketServletListener implements ServletContextListener {

    private UDPServer server;
    private ServletContext context;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        context = sce.getServletContext();
        server = new UDPServer();
        server.start();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        context = sce.getServletContext();
        server.interrupt();
    }
}

UDPServer:

public class UDPServer extends Thread {

    private SocketUDPCommunication comm;

    public UDPServer() {
        comm = new SocketUDPCommunication();
    }

    @Override
    public void run() {

        DatagramPacket response;
        try {
            comm.setPort(Utils.UDP_SERVER_PORT);
            comm.createSocket();

            while (!Thread.currentThread().isInterrupted()) {
                try {
                    response = comm.receiveResponse();
                } catch (SocketTimeoutException e) {
                    continue;
                }
                InetAddress ip = response.getAddress();
                int port = response.getPort();

                byte[] byteSend = comm.discardOffset(response);

                //TODO notify thread which made the request for the responding device (identified by ip)

            }
        } catch (IOException e) {
            System.err.println("Unable to process client request: " + e.getMessage());
        } catch (IllegalArgumentException ex) {
            System.err.println("Illegal Argument: " + ex.getMessage());
        } finally {
            comm.closeConnection();
        }
    }

    @Override
    public void interrupt() {
        super.interrupt();
        comm.closeConnection();
    }
}

DataSend.java:

@Path("dataSend")
public class DataSend {

    @Context
    private UriInfo context;

    public DataSend() {
    }

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    public Response postJson(ForceStatus status) {

        new TestExecution(status).start();

        return Response.status(Response.Status.OK).build();
    }
}

TestExecution:

public class TestExecution extends Thread {
    private ForceStatus status;

    //constructor name must match the class name
    public TestExecution(ForceStatus status) {
        this.status = status;
    }

    @Override
    public void run() {
        ProtocolStatus p = new ProtocolStatus();
        byte[] s = p.createResponseFrame(status.getForce());

        List<Integer> executedTest = new ArrayList<>();

        //Simple UDP client
        UDPClient client = new UDPClient();
        .
        .
        .
        //t is a pojo which contains the data from a battery of tests
        while(!executedTest.contains(t.getTestId())) {

            client.send(status.getIp(), status.getPort(), s);
            //TODO wait until UDPServer thread gets the response from the device

            executedTest.add(t.getTestId());

            nextTest = t.getNextTestId();

            t = getEntity(nextTest);
        }
    }
}

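Comments:

***.com/questions/43289395/… ?

You should take a look at this link, it has your answer.

Possible duplicate of "How to make a Java thread wait for another thread's output?"

I would suggest using something like Netty (netty.io) instead of trying to build this kind of thing yourself.

My case is different, because I only have to unblock the thread that is waiting on the request whose response the server has just received, not all the threads.

Answer 1: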

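This is how I solved it:

First I created a singleton class to manage the list of pending requests, which is shared between the different threads.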

public class SharedWaitingThreads {
    private ArrayList<ResponseToWait> queue;
    private static SharedWaitingThreads mySharedWaitingThreads;

    private SharedWaitingThreads() {
        queue = new ArrayList<>();
    }

    //synchronized so that two threads cannot create two instances
    public static synchronized SharedWaitingThreads getInstance() {
        if(mySharedWaitingThreads == null)
            mySharedWaitingThreads = new SharedWaitingThreads();

        return mySharedWaitingThreads;
    }

    public ArrayList<ResponseToWait> getQueue() {
        return queue;
    }

    public void setQueue(ArrayList<ResponseToWait> queue) {
        this.queue = queue;
    }

    public void waitForAnswer(ResponseToWait r) throws InterruptedException {
        System.out.println("Request registered " + r.toString());
        synchronized(mySharedWaitingThreads) {
            mySharedWaitingThreads.getQueue().add(r);
            //keep waiting while this request is still pending; the loop also
            //covers wake-ups caused by answers to other requests
            while(mySharedWaitingThreads.getQueue().contains(r)) {
                mySharedWaitingThreads.wait();
            }
        }
    }

    public ResponseToWait answerWaitingThread(ResponseToWait r, boolean compareSeqNum) {
        ResponseToWait rw = null;
        synchronized(mySharedWaitingThreads) {
            for(ResponseToWait rwAux : mySharedWaitingThreads.getQueue()) {
                if(rwAux.equals(r)) {
                    rw = rwAux;
                    mySharedWaitingThreads.getQueue().remove(rwAux);
                    //wake every waiting thread; each one re-checks whether
                    //its own request is still in the queue
                    mySharedWaitingThreads.notifyAll();
                    break;
                }
            }
        }
        return rw;
    }
}

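This singleton instance is created by the main thread (in contextInitialized) and is shared by all the threads that need to wait for a response before continuing their work. ResponseToWait contains all the information needed for each request/thread. Its equals method is overridden to fit the required behaviour (in my case, I compare by ip and request type).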
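ResponseToWait itself is not shown in the answer. The following is only a minimal sketch of what such a class might look like, assuming the request type is an int and that equality ignores the port and the frame, as described above. The field names are guesses based on the getters used elsewhere in the code (getIp, getPort, getResponseType, getFrame).

```java
import java.util.Objects;

// Hypothetical sketch: the real ResponseToWait is not shown in the answer.
final class ResponseToWait {
    private final String ip;
    private final int port;
    private final int responseType; // assumption: the type constants are ints
    private final byte[] frame;     // frame to resend; may be null

    ResponseToWait(String ip, int port, int responseType, byte[] frame) {
        this.ip = ip;
        this.port = port;
        this.responseType = responseType;
        this.frame = (frame == null) ? null : frame.clone();
    }

    String getIp() { return ip; }
    int getPort() { return port; }
    int getResponseType() { return responseType; }
    byte[] getFrame() { return (frame == null) ? null : frame.clone(); }

    // Two requests match when they target the same device and have the
    // same request type; port and frame are deliberately ignored.
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof ResponseToWait)) return false;
        ResponseToWait that = (ResponseToWait) other;
        return responseType == that.responseType && Objects.equals(ip, that.ip);
    }

    // hashCode must use exactly the fields that equals compares.
    @Override
    public int hashCode() {
        return Objects.hash(ip, responseType);
    }

    @Override
    public String toString() {
        return "ResponseToWait[ip=" + ip + ", port=" + port + ", type=" + responseType + "]";
    }
}
```

Overriding hashCode together with equals is not needed for the ArrayList used here, but it keeps the class safe to use in a HashMap or HashSet later.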

public class ExamExecution extends Thread {

    private SharedWaitingThreads waitingThreads;
    private static volatile Thread myThread;

    public ExamExecution(SharedWaitingThreads waitingThreads) {
        this.waitingThreads = waitingThreads;
    }

    @Override
    public void start() {
        myThread = new Thread(this);
        myThread.start();
    }

    @Override
    public void run() {
        Thread thisThread = Thread.currentThread();
        ProtocolStatus p = new ProtocolStatus();
        UDPClient client = new UDPClient();
        if(status.getWorkingMode() == WorkingMode.CONTINUE_EXECUTION) {
            byte[] frameRestart = p.createResponseFrame(status.getWorkingMode());
            client.send(getIp(), getPort(), frameRestart);
            //send the frame and block the thread until the server gets the proper response
            try {
                waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_STATUS, frameRestart));
            } catch (InterruptedException ex) {
                Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
            }
        } else if(status.getForce() == ForceStatus.START) {
            //get data from database and initialize variables
            .
            .
            .

            while(!executedTest.contains(testInExam.getTestId()) && myThread != null) {
                int attempts = 0;
                res = false;
                seqNumber = this.seqNumber.getValue();
                while(!res && (attempts < 3)) {
                    TestMemoryMap map = new TestMemoryMap(testInExam.getTestId());
                    byte[] frameConfig = pc.createConfigFrame(Utils.ID_RTU, (byte)1, (byte)0,
                        Utils.MEM_MAP_VERSION, (byte)0, map.getMemoryMap().length, seqNumber, map.getMemoryMap());

                    res = client.send(getIp(), getPort(), frameConfig);

                    if(res) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " blocked waiting config answer");
                            waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_CONFIG, frameConfig));
                        } catch (InterruptedException ex) {
                            Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
                        }
                    }
                    attempts++;
                }
                System.out.println("Config frame received:" + res);

                if(res) {
                    byte[] frame = p.createResponseFrame(status.getWorkingMode());
                    client.send(getIp(), getPort(), frame);

                    try {
                        System.out.println(Thread.currentThread().getName() + " blocked waiting end execution answer");
                        waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_STATUS, frame));
                    } catch (InterruptedException ex) {
                        Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
                    }
                }
                //add test to list of executed tests
                executedTest.add(testInExam.getTestId());
                nextTest = testInExam.getNextTestInExamId();
                if(nextTest != 0) {
                    testInExam = daot.getEntity(nextTest);
                    testId = testInExam.getTestId();
                }
            }
        } else if(status.getForce() == ForceStatus.END) {
            System.out.println("Stopping...");
            //abort the execution of the thread
            this.endExecution();

        }
    }

    private void endExecution() {
        //lock on the class rather than on myThread, which may already be null
        synchronized(ExamExecution.class) {
            myThread = null;
        }
    }
}

The UDP server thread has to answer the specific waiting thread, depending on the response received:

public class UDPServer extends Thread {

    private SocketUDPCommunication comm;
    private UDPClient udpClient;
    private SharedWaitingThreads waitingThreads;

    public UDPServer(SharedWaitingThreads waitingThreads) {
        comm = new SocketUDPCommunication();
        udpClient = new UDPClient();
        this.waitingThreads = waitingThreads;
    }

    @Override
    public void run() {
        DatagramPacket response;
        try {
            comm.setPort(Utils.UDP_SERVER_PORT);
            comm.createSocket();

            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("Waiting for clients to connect on port:" + comm.getSocket().getLocalPort());
                try {
                    response = comm.receiveResponse();
                } catch (SocketTimeoutException e) {
                    continue;
                }
                InetAddress ip = response.getAddress();
                int port = response.getPort();

                byte[] byteSend = comm.discardOffset(response);

                byte[] header = new byte[Utils.STD_HEADER_SIZE];
                Utils.getCleanHeader(byteSend, header);
                byte type = header[12];

                ResponseToWait r1;
                if(type == Utils.TYPE_CONFIG_REPORT) {
                    ProtocolConfig pc = new ProtocolConfig();
                    pc.parseFrame(byteSend);
                    int mapType = pc.getPayload()[0];
                    int idElement = pc.getPayload()[1];
                    r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_CONFIG, null);
                    if(checkPendingRequests(r1, null)) {
                        System.out.println("Resending config");
                        continue;
                    }
                    waitingThreads.answerWaitingThread(r1, true);
                } else if(type == Utils.TYPE_STATUS_REPORT) {
                    ProtocolStatus protocol = new ProtocolStatus();

                    r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_STATUS);
                    if(checkPendingRequests(r1, statusTest)) continue;
                    byte[] frame;
                    if(statusTest.equals(StatusTest.FINALIZED)) {
                        System.out.println("Test finalized. Waking threads");
                        r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_STATUS, null);
                        //Free possible waiting threads
                        ResponseToWait res1 = waitingThreads.answerWaitingThread(r1, false);

                    }
                }
            }
        } catch (IOException e) {
            System.err.println("Unable to process client request: " + e.getMessage());
        } catch (IllegalArgumentException ex) {
            System.err.println("Illegal Argument: " + ex.getMessage());
        } catch (InterruptedException ex) {
            Logger.getLogger(UDPServer.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            comm.closeConnection();
        }
    }

    private boolean checkPendingRequests(ResponseToWait rw, StatusTest status) {
        boolean resend = false;
        System.out.println("Status: " + status);
        synchronized(waitingThreads) {
            for(ResponseToWait r : waitingThreads.getQueue()) {
                if(r.getResponseType() == Utils.TYPE_CONFIG && r.getIp().equals(rw.getIp())) {
                    udpClient.send(r.getIp(), r.getPort(), r.getFrame());
                    resend = true;
                }
                if(r.getResponseType() == Utils.TYPE_STATUS && r.getIp().equals(rw.getIp())) {
                    udpClient.send(r.getIp(), r.getPort(), r.getFrame());
                    resend = true;
                }
            }
        }
        return resend;
    }

    @Override
    public void interrupt() {
        super.interrupt();
        comm.closeConnection();
    }
}


Note that I simplified the code to make it shorter and more instructive; the real case is much more complex.
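With notifyAll(), every waiting thread wakes up and re-checks the queue, even though only one request was answered. If you want to wake only the thread whose device responded, one possible alternative (not what the code above does) is to give each pending request its own CompletableFuture, keyed by device ip. This is just a sketch, assuming at most one outstanding request per device. PendingResponses and its method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical registry: one pending future per device ip.
final class PendingResponses {
    private final ConcurrentMap<String, CompletableFuture<byte[]>> pending =
            new ConcurrentHashMap<>();

    // Requesting thread: call this BEFORE sending the UDP packet, so a
    // fast response cannot arrive before the request is registered.
    CompletableFuture<byte[]> register(String ip) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        pending.put(ip, future);
        return future;
    }

    // UDP server thread: completes only the future registered for this ip,
    // so only the thread waiting on that device is released.
    boolean complete(String ip, byte[] payload) {
        CompletableFuture<byte[]> future = pending.remove(ip);
        return future != null && future.complete(payload);
    }

    // Requesting thread: blocks until the response arrives or the timeout
    // elapses. Returns null on timeout, since a UDP packet may be lost.
    byte[] await(String ip, CompletableFuture<byte[]> future, long timeout, TimeUnit unit)
            throws InterruptedException {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } finally {
            // Drop the entry if it is still ours (e.g. after a timeout).
            pending.remove(ip, future);
        }
    }
}
```

The requesting thread would call register(ip), send the packet, then call await(...). The UDP server would call complete(ip, data) where the TODO was in the question. The timeout also gives the caller a natural place to retry, instead of blocking forever when a packet is lost.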
