Java sharing objects by multiple threads - design pattern needed

[Posted]: 2015-05-27 12:15:53

[Question]:

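I would like some advice on a simple multithreaded system I am designing.

The idea: The application captures frames and displays them in the first image view. The captured frames are also processed (by MyHandDetectionThread) and then displayed in the second image view.

My solution: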

public class VideoManager {
    private volatile BufferLinkedList<InputFrame> mInputFrames;
    private volatile BufferLinkedList<ProcessedFrame> mProcessedFrames;

    private static VideoManager mVideoManagerInstance = new VideoManager();

    private Timer captureTimer;
    private MyVideoCaptureThread myVideoCaptureThread;
    private MyFrameDisplayThread myFrameDisplayThread;
    private MyHandDetectionThread myHandDetectionThread;
    private MyProcessedFrameDisplayThread myProcessedFrameDisplayThread;

    private enum ThreadMessages {
        PROCESS_INPUT_FRAME,
        NEW_INPUT_FRAME,
        NEW_PROCESSED_FRAME_ARRIVED,
        GET_NEW_FRAME
    }

    public static VideoManager getInstance() {
        if (mVideoManagerInstance == null) {
            mVideoManagerInstance = new VideoManager();
        }
        return mVideoManagerInstance;
    }

    // not visible constructor - for singleton purposes
    private VideoManager() {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
    }

    public void startDetectionAndRecognition(ImageView camIV, ImageView handIV) {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);

        captureTimer = new Timer();

        myVideoCaptureThread = new MyVideoCaptureThread();
        myFrameDisplayThread = new MyFrameDisplayThread(camIV, handIV);
        myHandDetectionThread = new MyHandDetectionThread();
        myProcessedFrameDisplayThread = new MyProcessedFrameDisplayThread();

        captureTimer.schedule(new TimerTask() {
            public void run() {
                if (myVideoCaptureThread != null && myVideoCaptureThread.threadMessages != null)
                    myVideoCaptureThread.threadMessages.offer(ThreadMessages.GET_NEW_FRAME);
            }
        }, 0, 1000 / Config.fps);
        myFrameDisplayThread.start();
        myVideoCaptureThread.start();
        myHandDetectionThread.start();
        myProcessedFrameDisplayThread.start();
    }

    public void stop() {
        captureTimer.cancel();
        myVideoCaptureThread.interrupt();
        myHandDetectionThread.interrupt();
        myFrameDisplayThread.interrupt();
        myGestureRecogitionThread.interrupt();

        mInputFrames.removeAll(mInputFrames);
        mProcessedFrames.removeAll(mProcessedFrames);

        isActive = false;
    }

    public boolean isActive() {
        return isActive;
    }

    ////////////////////////
    // Thread clases
    ////////////////////////
    private class MyVideoCaptureThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);

        @Override
        public void run() {
            WebCamVideoCapture vc = new WebCamVideoCapture();
            while (!isInterrupted()) {
                if (threadMessages != null && threadMessages.poll() == ThreadMessages.GET_NEW_FRAME) {
                    Mat mat = vc.getNextMatFrame();
                    if (mat != null && mInputFrames != null) {
                        mInputFrames.offerFirst(new InputFrame(mat));

                        if (myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null)
                            myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_INPUT_FRAME);

                        if (myHandDetectionThread != null && myHandDetectionThread.threadMessages != null)
                            myHandDetectionThread.threadMessages.offer(ThreadMessages.PROCESS_INPUT_FRAME);
                    }
                }
            }
            vc.close();
        }
    }

    private class MyFrameDisplayThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);

        ImageView mCamImageView;

        long lastUpdatedCamImageViewMillis;
        long lastUpdatedHandImageViewMillis;

        public MyFrameDisplayThread(ImageView mImageView) {
            this.mCamImageView = mImageView;
        }

        private synchronized void updateImageViews() {
            if (threadMessages.poll() == ThreadMessages.NEW_INPUT_FRAME && mInputFrames != null && !mInputFrames.isEmpty() && mInputFrames.peek() != null && mInputFrames.peek().getFrame() != null) {
                if(Config.IS_DEBUG) System.out.println("Updating frame image view");
                mCamImageView.setImage(Utils.cvMatToImage(mInputFrames.peekFirst().getFrame()));
            }
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                updateImageViews();
            }
        }
    }

    private class MyHandDetectionThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128); //TODO if multiple threads, define it out of class
        HandDetector hd = new HandDetector();

        @Override
        public void run() {
            while (!isInterrupted()) {
                if (threadMessages.poll() == ThreadMessages.PROCESS_INPUT_FRAME && mInputFrames != null && mInputFrames.size() > 0 && mInputFrames.peek() != null) {
                    if(Config.IS_DEBUG) System.out.println("Detecting hand...");

                    mProcessedFrames.offerFirst(new ProcessedFrame(hd.detectHand(mInputFrames.peek()), null, null, null));

                    if (myGestureRecogitionThread != null && myGestureRecogitionThread.threadMessages != null)
                        myGestureRecogitionThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);

                    if(myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null)
                        myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                }
            }
        }
    }

    private class MyProcessedFrameDisplayThread extends Thread {
        LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(128);
        ImageView mHandImageView;
        public MyProcessedFrameDisplayThread(ImageView mHandImageView) {
            mHandImageView = mHandImageView;
        }

        private synchronized void updateImageViews() {
            if(threadMessages.poll() == ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED && mProcessedFrames != null && !mProcessedFrames.isEmpty() && mProcessedFrames.peek() != null && mProcessedFrames.peek().getmHandMask() != null) {
                if(Config.IS_DEBUG) System.out.println("Updating hand image view");
                mHandImageView.setImage(Utils.cvMatToImage(mProcessedFrames.peekFirst().getmHandMask()));
            }
        }

        @Override
        public void run() {
            while (!isInterrupted())
                if (threadMessages.poll() == ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED)
                    updateImageViews();
        }
    }
}

public class BufferLinkedList<E> extends LinkedList<E> {
    private int counter = 0;
    private int sizeLimit = 48;

    public BufferLinkedList(int sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    @Override
    public synchronized boolean offerFirst(E e) {
        while(size() > sizeLimit) {
            removeLast();
        }

        return super.offerFirst(e);
    }

    @Override
    public synchronized E peekFirst() {
        return super.peekFirst();
    }

    @Override
    public synchronized E peekLast() {
        return super.peekLast();
    }

    @Override
    public synchronized E pollFirst() {
        return super.pollFirst();
    }

    @Override
    public synchronized E pollLast() {
        return super.pollLast();
    }
}

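My problem: Frames are not displayed smoothly. There are irregular gaps of 1-5 seconds between calls to the methods that update the image views. However, MyHandDetectionThread's task runs quite fast, and the message queues of the display threads grow quickly. Maybe this is because of some locks on the lists that store the frames?

Question: Is my solution correct? Are there design patterns that describe this scenario? Do you have any suggestions for improvement?

Edit: I have added waiting and notifying to the thread loops. The results are satisfactory. CPU usage is now about 30%, down from about 80% before. Everything runs more stably and smoothly. However, I am not familiar with the wait and notify approach, so please let me know if you find something stupid in my code.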

public class VideoManager {
    private volatile BufferLinkedList<InputFrame> mInputFrames;
    private volatile BufferLinkedList<ProcessedFrame> mProcessedFrames;

    private static VideoManager mVideoManagerInstance = new VideoManager();

    private Timer captureTimer;
    private MyVideoCaptureThread myVideoCaptureThread;
    private MyFrameDisplayThread myFrameDisplayThread;
    private MyHandDetectionThread myHandDetectionThread;
    private MyGestureRecogitionThread myGestureRecogitionThread;
    private MySkinDisplayThread mySkinDisplayThread;

    private final static int THREAD_MESSAGES_LIMIT = 10000;
    private final static int TIMER_INTERVAL = 1000 / Config.fps;
    private final static int WAITING_TIMEOUT = 2000;

    private enum ThreadMessages {
        PROCESS_INPUT_FRAME,
        NEW_INPUT_FRAME,
        NEW_PROCESSED_FRAME_ARRIVED,
        GET_NEW_FRAME
    }

    public static VideoManager getInstance() {
        if (mVideoManagerInstance == null) {
            mVideoManagerInstance = new VideoManager();
        }
        return mVideoManagerInstance;
    }

    // not visible constructor - for singleton purposes
    private VideoManager() {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
    }

    public void startDetectionAndRecognition(ImageView camIV, ImageView handIV) {
        mInputFrames = new BufferLinkedList<>(Config.inputFramesListLimit);
        mProcessedFrames = new BufferLinkedList<>(Config.inputFramesListLimit);

        captureTimer = new Timer();
        myFrameDisplayThread = new MyFrameDisplayThread(camIV);
        myVideoCaptureThread = new MyVideoCaptureThread();
        myHandDetectionThread = new MyHandDetectionThread();
        myGestureRecogitionThread = new MyGestureRecogitionThread();
        mySkinDisplayThread = new MySkinDisplayThread(handIV);

        myVideoCaptureThread.start();
        captureTimer.schedule(new TimerTask() {
            public void run() {
                if (myVideoCaptureThread != null && myVideoCaptureThread.threadMessages != null) {
                    myVideoCaptureThread.threadMessages.offer(ThreadMessages.GET_NEW_FRAME);
                    System.out.println("Timer get frame request sent");
                    myVideoCaptureThread.wakeUp();
                }
            }
        }, 0, TIMER_INTERVAL);
        myFrameDisplayThread.start();
        mySkinDisplayThread.start();
        myHandDetectionThread.start();
        myGestureRecogitionThread.start();
    }

    public void stop() {
        captureTimer.cancel();
        myVideoCaptureThread.interrupt();
        myHandDetectionThread.interrupt();
        mySkinDisplayThread.interrupt();
        myFrameDisplayThread.interrupt();
        myGestureRecogitionThread.interrupt();

        mInputFrames.removeAll(mInputFrames);
        mProcessedFrames.removeAll(mProcessedFrames);
    }

    ////////////////////////
    // Lock class
    ////////////////////////
    private static final class Lock { }

    ////////////////////////
    // Thread clases
    ////////////////////////
    private class MyVideoCaptureThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        WebCamVideoCapture vc = new WebCamVideoCapture();
        Lock lock = new Lock();

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.GET_NEW_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                            System.out.println("WideoCaptureThread waiting");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Mat mat = vc.getNextMatFrame();
                    System.out.println("getting next frame from webcam");
                    if (mat != null && mInputFrames != null) {
                        mInputFrames.offerFirst(new InputFrame(vc.getNextMatFrame()));

                        if (myHandDetectionThread != null && myHandDetectionThread.threadMessages != null) {
                            myHandDetectionThread.wakeUp();
                            myHandDetectionThread.threadMessages.offer(ThreadMessages.PROCESS_INPUT_FRAME);
                        }

                        if (myFrameDisplayThread != null && myFrameDisplayThread.threadMessages != null) {
                            myFrameDisplayThread.wakeUp();
                            myFrameDisplayThread.threadMessages.offer(ThreadMessages.NEW_INPUT_FRAME);
                        }
                    }
                }
            }
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up WideoCapture");
            }
        }

        @Override
        public void interrupt() {
            vc.close();
            super.interrupt();
        }
    }

    private class MyFrameDisplayThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        Lock lock = new Lock();

        ImageView mCamImageView;

        public MyFrameDisplayThread(ImageView mImageView) {
            this.mCamImageView = mImageView;
        }

        private void updateImageViews() {
            if (shouldUpdateCamImageView() && mInputFrames != null && !mInputFrames.isEmpty() && mInputFrames.peek() != null && mInputFrames.peek().getFrame() != null) {
                System.out.println("Updating frame image view");
                mCamImageView.setImage(Utils.cvMatToImage(mInputFrames.peekFirst().getFrame()));
                threadMessages.poll();
            }
        }

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.peek() != ThreadMessages.NEW_INPUT_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    updateImageViews();
                }
            }
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up FrameDisplay");
            }
        }

        private boolean shouldUpdateCamImageView() {
            if (!Config.CAPTURE_PREVIEW_MODE) return false;
            return true;
        }
    }

    private class MySkinDisplayThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        ImageView mHandImageView;
        Object lock = new Lock();

        public MySkinDisplayThread(ImageView mHandImageView) {
            this.mHandImageView = mHandImageView;
        }

        private synchronized void updateHandImageView() {
            if (shouldUpdateHandImageView() && mProcessedFrames != null && !mProcessedFrames.isEmpty() && mProcessedFrames.peek() != null && mProcessedFrames.peek().getmHandMask() != null) {
                System.out.println("Updating skin image view");
                mHandImageView.setImage(Utils.cvMatToImage(mProcessedFrames.peekFirst().getmHandMask()));
                threadMessages.poll();
            }
        }

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.peek() != ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    updateHandImageView();
                }
            }
        }

        private boolean shouldUpdateHandImageView() {
            if (!Config.SKIN_MASK_PREVIEW_MODE) return false;
            return true;
//            long now = System.currentTimeMillis();
//            boolean should = now - lastUpdatedHandImageViewMillis > TIMER_INTERVAL;
//            lastUpdatedHandImageViewMillis = now;
//            return should;
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up FrameDisplay");
            }
        }
    }

    private class MyHandDetectionThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT); //TODO if multiple threads, define it out of class
        HandDetector hd = new HandDetector();
        Object lock = new Lock();

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.PROCESS_INPUT_FRAME) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if (mInputFrames != null /*&& mInputFrames.size() > 0 && mInputFrames.peek() != null && !mInputFrames.peek().getIsProcessed()*/) {
                        System.out.println("Detecting hand...");
//                    Mat handMask = hd.detectHand(mInputFrames.peek());
//                    int[][] fingerCoordinates = new int[5][2];
//                    int[] convDefects = new int[5];
//                    int[] handCenterCoordinates = new int[2];
                        mProcessedFrames.offerFirst(new ProcessedFrame(hd.detectHand(mInputFrames.peek()), null, null, null));
                        if (myGestureRecogitionThread != null && myGestureRecogitionThread.threadMessages != null) {
                            myGestureRecogitionThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                            mySkinDisplayThread.wakeUp();
                        }

                        if (mySkinDisplayThread != null && mySkinDisplayThread.threadMessages != null) {
                            mySkinDisplayThread.threadMessages.offer(ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED);
                            mySkinDisplayThread.wakeUp();
                        }
                    }
                }
            }
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up hand Detection");
            }
        }
    }

    private class MyGestureRecogitionThread extends Thread {
        volatile LinkedBlockingQueue<ThreadMessages> threadMessages = new LinkedBlockingQueue<>(THREAD_MESSAGES_LIMIT);
        GestureRecognizer r = new GestureRecognizer();
        Lock lock = new Lock();

        @Override
        public void run() {
            synchronized (lock) {
                while (!isInterrupted()) {
                    if (threadMessages.poll() != ThreadMessages.NEW_PROCESSED_FRAME_ARRIVED) {
                        try {
                            lock.wait(WAITING_TIMEOUT);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        r.lookForGestures(mProcessedFrames);
                    }
                }
            }
        }

        public void wakeUp() {
            synchronized (lock) {
                lock.notifyAll();
                System.out.println("Waking up hand Detection");
            }
        }
    }
}

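[Comments]:

This calls for a good implementation of Future; with it you could make this run smoothly.

Probably not the root cause of the problem, but VideoManager is not thread-safe!!!!

Which operations are not thread-safe?

Ah, OK, I can see it now.

[Answer 1]: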

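Both threads appear to use polling in their run() methods; i.e. they continuously loop over a statement that checks a boolean condition. This can be bad for CPU usage, because a single thread can lock up the CPU without giving other threads any cycles; it can end up hogging the CPU even though it isn't doing anything particularly useful, just failing some boolean condition.

You should use an asynchronous approach to communicate with your threads; rather than using a polling mechanism, you should put threads to sleep when they have no processing to do and wake them up when they are needed. This allows the threads to yield the CPU, meaning they willingly give up their active context so that other threads can execute.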
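One possible way to get this sleep-until-needed behaviour, sketched under assumptions rather than taken from the answer: the question's message queues are already `LinkedBlockingQueue`s, and `BlockingQueue.take()` blocks the calling thread until a message arrives, whereas `poll()` returns `null` immediately and forces a busy loop. The class name `BlockingWorkerSketch`, the `STOP` sentinel, and the use of plain strings in place of the `ThreadMessages` enum are hypothetical, chosen only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for one of the question's message-driven worker threads.
class BlockingWorkerSketch {
    // Sentinel that tells the worker to finish (an assumption of this sketch).
    static final String STOP = "STOP";

    /** Consumes messages until STOP arrives or the thread is interrupted. */
    static List<String> runWorker(BlockingQueue<String> inbox) {
        List<String> processed = new ArrayList<>();
        try {
            while (true) {
                // take() parks the thread until a message is available,
                // so an idle worker burns no CPU (unlike a poll() loop).
                String message = inbox.take();
                if (STOP.equals(message)) {
                    break;
                }
                processed.add(message); // real code would handle the frame here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>(128);
        List<String> result = new ArrayList<>();
        Thread worker = new Thread(() -> result.addAll(runWorker(inbox)));
        worker.start();

        inbox.put("NEW_INPUT_FRAME");
        inbox.put("NEW_INPUT_FRAME");
        inbox.put(STOP);

        worker.join(); // join() makes the worker's writes to result visible here
        System.out.println(result.size() + " messages processed");
    }
}
```

With this shape no separate lock object or wakeUp() call is needed, because the queue itself does the waiting and the waking. A stop request can be delivered either as a sentinel message or by interrupting the thread, which makes `take()` throw `InterruptedException`.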

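[Comments]:

I'll try sleeping. Thanks.

Good stuff. It can get a little hairy; it's hard to get something working at first, because concurrency can be really difficult. I suggest you get a simple example working first, then build your video code into it.

@AlexT. I'm not entirely sure how this answers the question. Could you elaborate on how putting threads to sleep would solve the latency problem?

This sounds like resource contention, given the intermittent behaviour even though one thread appears to execute very quickly. I think it's plausible that one thread could be hogging all of the CPU. Since every thread uses polling to check whether it has been interrupted before carrying on with its task, I think they are likely to get locked in a spin cycle. You can confirm whether this is the case by re-running the application with each thread printing to the console and seeing whether the console fills up with the output of only a single thread.

I've added waits. The results are satisfactory. I don't fully understand this approach yet, so any critical comments on my updated code are appreciated.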
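For readers who are also new to wait and notify, here is a minimal sketch of the usual guarded-wait idiom. It is an illustration, not a rewrite of the question's code: `FrameMailbox` and its methods are hypothetical names. The two points it shows are that the condition is changed and checked while holding the same lock that is waited on, and that `wait()` sits inside a `while` loop, as the `Object.wait` documentation recommends, so a spurious or early wake-up simply re-checks the condition.

```java
// Hypothetical one-slot mailbox; the names are illustrative, not from the question.
class FrameMailbox<T> {
    private final Object lock = new Object();
    private T latest; // guarded by lock; null means "no frame waiting"

    /** Stores a frame and wakes any consumer blocked in awaitFrame(). */
    void publish(T frame) {
        synchronized (lock) {
            latest = frame;   // change the condition while holding the lock,
            lock.notifyAll(); // then notify, so the wake-up cannot be missed
        }
    }

    /** Blocks until a frame is available, then removes and returns it. */
    T awaitFrame() throws InterruptedException {
        synchronized (lock) {
            while (latest == null) { // loop, not if: re-check after every wake-up
                lock.wait();
            }
            T frame = latest;
            latest = null;
            return frame;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FrameMailbox<String> box = new FrameMailbox<>();
        Thread producer = new Thread(() -> box.publish("frame-1"));
        producer.start();
        System.out.println(box.awaitFrame()); // works whichever thread runs first
        producer.join();
    }
}
```

Because the consumer checks the condition under the lock before waiting, it does not matter whether `publish` runs before or after `awaitFrame` is called, and no timeout is needed to recover from a missed notification.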
