ReentrantLock - lock.unlock() does not release the lock


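I am trying to create an application with a single task producer (the boss) and multiple task consumers (the employees). The boss adds more tasks from time to time, and the employees work through the task queue. The problem is that a single consumer does all the work, and the other consumers only start once it has finished.

I tried using a ReentrantLock, but it does not seem to change anything.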

package threads;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class Store {

    public static ReentrantLock lock = new ReentrantLock();


    public static void main(String[] args) {
        AtomicInteger tasksList = new AtomicInteger(7);
        Runnable boss = () -> {
            System.out.println("Boss: Good morning.");
            List<Runnable> employeeList = generateEmployeesList(5, tasksList);
            for (Runnable employee : employeeList) {
                employee.run();         
            }
            while(tasksList.get() > 0) {

                try {
                    lock.lock();
                    System.out.println("Work more!");
                    tasksList.incrementAndGet();

                } finally {
                    lock.unlock();
                    try {
                        Thread.sleep(1500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("Boss: Time to go home.");
        };

        System.out.println("= Starting =");
        boss.run();

    }

    public static List<Runnable> generateEmployeesList(int amountOfEmployees, AtomicInteger tasksList){
        List<Runnable> employeeList = new ArrayList<Runnable>();
        for (int i = 0; i < amountOfEmployees; i++) {
            employeeList.add(createEmployee(i+1, tasksList));
        }
        return employeeList;
    }

    public static Runnable createEmployee(int employeeNumber, AtomicInteger tasksList) {
        return () -> {
            System.out.println("Thread #" + (employeeNumber) + " Started. Missing tasks: " + tasksList.get());
            while(tasksList.get() > 0) {

                try {
                    lock.lock();
                    System.out.println("Thread #" + (employeeNumber) + ". Task completed: " + tasksList.decrementAndGet() +" tasks left.");
                } finally {
                    lock.unlock();
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
            System.out.println("Thread #" + (employeeNumber) +": Finished.");
        };
    }

}

Result (wrong):

= Starting =
Boss: Good morning.
Thread #1 Started. Missing tasks: 7
Thread #1. Task completed: 6 tasks left.
Thread #1. Task completed: 5 tasks left.
Thread #1. Task completed: 4 tasks left.
Thread #1. Task completed: 3 tasks left.
Thread #1. Task completed: 2 tasks left.
Thread #1. Task completed: 1 tasks left.
Thread #1. Task completed: 0 tasks left.
Thread #1: Finished.
Thread #2 Started. Missing tasks: 0
Thread #2: Finished.
Thread #3 Started. Missing tasks: 0
Thread #3: Finished.
Thread #4 Started. Missing tasks: 0
Thread #4: Finished.
Thread #5 Started. Missing tasks: 0
Thread #5: Finished.
Boss: Time to go home.

Expected result (consumers decrement the task counter in an arbitrary order):

= Starting =
Boss: Good morning.
Thread #4 Started. Missing tasks: 7
Thread #4. Task completed: 6 tasks left.
Thread #2 Started. Missing tasks: 6
Thread #2. Task completed: 5 tasks left.
Thread #4. Task completed: 4 tasks left.
Thread #1 Started. Missing tasks: 4
...
Thread #1: Finished.
Thread #2: Finished.
Thread #3: Finished.
...
Boss: Time to go home.

Any idea what I am doing wrong? What should I do to fix it?

I modified the application to use Threads instead of Runnables. However, I still have the same problem.

class Lock {
    private static final ReentrantLock lock = new ReentrantLock();
    public static final ReentrantLock getLock() {
        return lock;
    }
}

public class Store {


    public static void main(String[] args) {    
        Boss b = new Boss(5, 7);
        b.start();
    }

    public static List<Thread> generateEmployeesList(int amountOfEmployees, AtomicInteger tasksList){
        List<Thread> employeeList = new ArrayList<Thread>();
        for (int i = 0; i < amountOfEmployees; i++) {
            employeeList.add(employeeFactory(i+1, tasksList));
        }
        return employeeList;
    }


    public static Thread employeeFactory(int employeeNumber, AtomicInteger tasksList) {
        return new Employee(employeeNumber, tasksList);
    }


}

class Employee extends Thread {
    int number = 0;
    AtomicInteger tasksList;
    public Employee(int number, AtomicInteger tasksList) {
        this.number = number;
        this.tasksList = tasksList;
    }
    @Override
    public void start() {
        System.out.println("Thread #" + number + " Started. Missing tasks: " + tasksList.get());
        while(tasksList.get() > 0) {

            try {
                Lock.getLock().lock();
                System.out.println("Thread #" + number + ". Task completed: " + tasksList.decrementAndGet() +" tasks left.");
            } finally {
                Lock.getLock().unlock();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
        System.out.println("Thread #" + number +": Finished.");
    }

}

class Boss extends Thread {
    int employees = 0;
    AtomicInteger tasks;

    public Boss(int employees, int numberOfTasks) {
        this.employees = employees;
        this.tasks = new AtomicInteger(numberOfTasks);
    }

    public void start() {
        List<Thread> employeeList = Store.generateEmployeesList(employees, tasks);
        System.out.println("Boss: Good Morning!");
        for (Thread employee : employeeList) {
            employee.start();
        }
        while(tasks.get() > 0 ) {
            Lock.getLock().lock();
            try {
            System.out.println("Keep working!");
            tasks.incrementAndGet();
            } finally {
                Lock.getLock().unlock();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}
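Answer

There are several problems with this code. First, you have misunderstood how Runnables work. Runnable#run() simply executes your code on the thread that calls it. If you want parallelism, you have to create a separate thread to run each Runnable.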

// Change this
employee.run();
// to this:
Thread t = new Thread(employee);
t.start();
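
To make the difference concrete, here is a minimal sketch that records which thread actually executes a task. The class name RunVsStart, the helper executingThread, and the thread name "employee-1" are made up for this illustration and do not appear in the question.

```java
import java.util.concurrent.atomic.AtomicReference;

public class RunVsStart {

    // Hypothetical helper: returns the name of the thread that executed the task.
    public static String executingThread(boolean viaThread) {
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable employee = () -> seen.set(Thread.currentThread().getName());

        if (viaThread) {
            // A new Thread plus start() runs the task on that new thread.
            Thread t = new Thread(employee, "employee-1");
            t.start();
            try {
                t.join(); // wait until the worker has recorded its name
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } else {
            // A plain run() call is an ordinary method call on the calling thread.
            employee.run();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("run()   executed on: " + executingThread(false));
        System.out.println("start() executed on: " + executingThread(true));
    }
}
```

The same reasoning applies to the Thread-based version in the question: Employee and Boss override start() and never call super.start(), so no new thread is ever launched and the loop body runs on the caller. With a Thread subclass, the work belongs in run(), and start() should be left alone.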

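Second, as a commenter mentioned, you are not correctly guarding access to the AtomicInteger. Each individual operation on it is thread-safe, but a sequence of calls is not atomic as a whole, especially when a later call depends on the result of an earlier one. That is why, if you make only the single change above, you will see output like this: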

= Starting =
Boss: Good morning.
Work more!
Thread #2 Started. Missing tasks: 7
Thread #1 Started. Missing tasks: 8
Thread #4 Started. Missing tasks: 7
Thread #3 Started. Missing tasks: 7
Thread #1. Task completed: 7 tasks left.
Thread #5 Started. Missing tasks: 7
Thread #5. Task completed: 6 tasks left.
Thread #2. Task completed: 5 tasks left.
Thread #4. Task completed: 4 tasks left.
Thread #3. Task completed: 3 tasks left.
Thread #5. Task completed: 2 tasks left.
Thread #1. Task completed: 1 tasks left.
Thread #2. Task completed: 0 tasks left.
Thread #4. Task completed: -1 tasks left.
Thread #3. Task completed: -2 tasks left.
Boss: Time to go home.
Thread #5: Finished.
Thread #1: Finished.
Thread #4: Finished.
Thread #2: Finished.
Thread #3: Finished.

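Between observing that tasks remain

tasksList.get() > 0

and "taking" a task

tasksList.decrementAndGet();

another thread may already have taken that task. That is why you see negative numbers. Both operations must happen inside the same critical section, or you must use some other synchronization mechanism, such as Java's built-in object monitors (synchronized):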

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class Store {

    public static Object sync = new Object();

    public static void main(String[] args) {
        AtomicInteger tasksList = new AtomicInteger(7);
        Runnable boss = () -> {
            System.out.println("Boss: Good morning.");
            List<Runnable> employeeList = generateEmployeesList(5, tasksList);
            for (Runnable employee : employeeList) {
                Thread t = new Thread(employee);
                t.start();
            }

            synchronized (sync) {
                while(tasksList.get() > 0) {
                    try {                        
                        System.out.println("Work more!");
                        tasksList.incrementAndGet();
                    } finally {
                        try {
                            sync.wait(1500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }

            System.out.println("Boss: Time to go home.");
        };

        System.out.println("= Starting =");
        boss.run();

    }

    public static List<Runnable> generateEmployeesList(int amountOfEmployees, AtomicInteger tasksList){
        List<Runnable> employeeList = new ArrayList<Runnable>();
        for (int i = 0; i < amountOfEmployees; i++) {
            employeeList.add(createEmployee(i+1, tasksList));
        }
        return employeeList;
    }

    public static Runnable createEmployee(int employeeNumber, AtomicInteger tasksList) {
        return () -> {
            System.out.println("Thread #" + (employeeNumber) + " Started. Missing tasks: " + tasksList.get());
            synchronized (sync) {
                while (tasksList.get() > 0) {
                    try {
                        System.out.println("Thread #" + (employeeNumber) + ". Task completed: "
                                + tasksList.decrementAndGet() + " tasks left.");
                    } finally {
                        try {
                            sync.wait(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
            System.out.println("Thread #" + (employeeNumber) +": Finished.");
        };
    }
}

Output:

= Starting =
Boss: Good morning.
Thread #2 Started. Missing tasks: 7
Thread #3 Started. Missing tasks: 7
Work more!
Thread #4 Started. Missing tasks: 7
Thread #1 Started. Missing tasks: 7
Thread #3. Task completed: 7 tasks left.
Thread #5 Started. Missing tasks: 8
Thread #2. Task completed: 6 tasks left.
Thread #5. Task completed: 5 tasks left.
Thread #1. Task completed: 4 tasks left.
Thread #4. Task completed: 3 tasks left.
Thread #2. Task completed: 2 tasks left.
Thread #1. Task completed: 1 tasks left.
Thread #3. Task completed: 0 tasks left.
Thread #4: Finished.
Thread #5: Finished.
Boss: Time to go home.
Thread #3: Finished.
Thread #2: Finished.
Thread #1: Finished.
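
The same check-then-act fix can also be written with the ReentrantLock from the question. The following is only a sketch under some assumptions: the names LockedStore, tryTake, remaining and drain are invented for this example, the boss is left out to keep it short, and the counter is a plain int guarded by the lock rather than an AtomicInteger. The key point is that the "are there tasks left?" check and the decrement happen while the same lock is held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class LockedStore {
    private final ReentrantLock lock = new ReentrantLock();
    private int tasks; // guarded by lock

    public LockedStore(int initialTasks) {
        this.tasks = initialTasks;
    }

    // Takes one task if any remain; the check and the decrement share one critical section.
    public boolean tryTake() {
        lock.lock(); // acquire before the try, so finally only unlocks a lock we hold
        try {
            if (tasks <= 0) {
                return false;
            }
            tasks--;
            return true;
        } finally {
            lock.unlock();
        }
    }

    public int remaining() {
        lock.lock();
        try {
            return tasks;
        } finally {
            lock.unlock();
        }
    }

    // Starts the given number of worker threads and returns how many tasks they took in total.
    public static int drain(LockedStore store, int workers) {
        AtomicInteger taken = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                while (store.tryTake()) {
                    taken.incrementAndGet();
                    // Simulated work (e.g. Thread.sleep) would go here, outside the lock.
                }
            }, "employee-" + (i + 1));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return taken.get();
    }

    public static void main(String[] args) {
        LockedStore store = new LockedStore(7);
        int taken = drain(store, 5);
        System.out.println("Taken: " + taken + ", remaining: " + store.remaining());
    }
}
```

Because no thread can decrement after another thread has already seen the counter reach zero, the count never goes negative. Note that lock.unlock() was releasing the lock in the original code all along; the other employees simply never ran concurrently, because everything executed on a single thread.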
