线程池代码

Posted zqlmianshi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池代码相关的知识,希望对你有一定的参考价值。

缓存线程池

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class CachedThreadPoolExample

  public static void main(String[] args)

  ExecutorService executor = Executors.newCachedThreadPool();

  for (int i = 0; i < 10; i++)

    Runnable task = new Task(i);

    executor.execute(task);

  

  executor.shutdown();

static class Task implements Runnable

  private int taskId;

  public Task(int taskId)

    this.taskId = taskId;

  

  public void run()

  System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());

  try

    Thread.sleep(1000);

   catch (InterruptedException e)

    e.printStackTrace();

System.out.println("Task " + taskId + " is completed");

 

配合countdownlatch使用:

CountDownLatch配合线程池使用
CountDownLatch的作用就是等待其他线程都执行完任务,然后主线程才继续往下执行。

CountDownLatch适合于对任务进行拆分,使其并行执行,比如某个任务执行10s,其对数据的请求可以分为五个部分,那么就可以将这个任务拆分为5个子任务,分别交由五个线程执行,执行完成之后再由主线程进行汇总,此时,总的执行时间将决定于执行最慢的任务,平均来看,还是大大减少了总的执行时间。

CountDownLatch主要有两个方法:countDown()和await()。
countDown()方法用于使计数器减一,await()方法则使调用该方法的线程处于等待状态,一般为主线程。当倒数到0时主线程才执行。

CountDownLatch使用主要步骤:在构造CountDownLatch的时候需要传入一个整数n,每一个线程执行完一个任务倒数一次。在这个整数倒数到0之前,主线程需要进行等待。倒数到0主线程才执行。

构造CountDownLatch
final CountDownLatch latch = new CountDownLatch(60);//初始化,计数器容量为60
1
计数器减一
latch.countDown();//计数器容量减一
1
使调用该方法的线程处于等待状态,当倒数到0时主线程才执行。
latch.await();
1
示例:使用线程池的方式,计算1000名学生的学习数据
Main类,用于初始化线程池大小、线程提交任务
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author : [WangWei]
* @version : [v1.0]
* @className : Main
* @description : [初始化线程池,线程提交任务]
* @createTime : [2022/12/28 9:20]
* @updateUser : [WangWei]
* @updateTime : [2022/12/28 9:20]
* @updateRemark : [描述说明本次修改内容]
*/
public class Main
public static void main(String[] args)
//实例化一个固定大小为5个线程的newFixedThreadPool线程池
ExecutorService excutorService = Executors.newFixedThreadPool(10);
//构造CountDownLatch传入数量为1000,初始化的计数器大小为1000,与学生数量对应。
final CountDownLatch latch = new CountDownLatch(1000);
//计算1000个学生的学习数据
for (int i = 0; i <1000 ; i++)
//线程提交任务
excutorService.submit(new CalaulationStudentsData(i,latch));

try
//使调用该方法的主线程处于等待状态,当倒数到0时主线程才执行。
latch.await();
catch (InterruptedException e)
throw new RuntimeException("学生学习数据普通版多线程处理异常",e);

//关闭线程池
excutorService.shutdown();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
CalculationStudentsData类:用于执行线程,并执行具体的业务
import java.util.concurrent.CountDownLatch;

/**
* @author : [WangWei]
* @version : [v1.0]
* @className : CalaulationStudentsData
* @description : [计算学生学习数据]
* @createTime : [2022/12/28 9:24]
* @updateUser : [WangWei]
* @updateTime : [2022/12/28 9:24]
* @updateRemark : [描述说明本次修改内容]
*/
public class CalculationStudentsData implements Runnable
int studentNumber;
CountDownLatch latch;

/*
* @version V1.0
* Title: CalculationStudentsData
* @author Wangwei
* @description 构造函数中传入学生编号
* @createTime 2022/12/28 9:29
* @param [studentNumber]
* @return
*/
public CalculationStudentsData(int studentNumber, CountDownLatch latch)
this.studentNumber=studentNumber;
this.latch=latch;

/*
* @version V1.0
* Title: run
* @author Wangwei
* @description 执行线程
* @createTime 2022/12/28 9:30
* @param []
* @return void
*/
@Override
public void run()
try
//计算学生学习数据的方法
this.CalculationStudentData();
//计数器减一
latch.countDown();
catch (Exception e)
//计数器减一,避免死循环
latch.countDown();
throw new RuntimeException("学生学习数据子线程执行数据异常"+e);


/*
* @version V1.0
* Title: CalaulationStudentData
* @author Wangwei
* @description 计算学生学习数据的方法
* @createTime 2022/12/28 9:32
* @param []
* @return void
*/
private void CalculationStudentData()
//输出学生编号和执行该任务的线程名称
System.out.println("完成学生"+studentNumber+"的学习成绩计算"+Thread.currentThread().getName());


简单线程池原理和代码

线程池就是,预先创建一定数量的线程,然后当需要异步任务时,只要把任务放入队列中,线程池自动在队列中取任务,每执行完一个任务就自动取下一个任务

本文提供的是一个简单的线程池,所以并不提供线程的自动增减的功能,以比较简单的代码来理解其原理

代码只有一个文件,算上注释才勉强200行,由于代码较长就不全部贴在这里了。

线程池代码见Github【点击】

由于代码使用了一些c++11的东西,所以先需要复习一下以下几个东西:(不要被吓怕,就算不会其实也能懂下面的讲解,具体语法所表达的意思我会说明)

  • std::thread
  • std::mutex
  • std::condition_variable
  • std::move
  • std::lock_guard
  • std::unique_lock
  • lambda表达式

下面开始代码讲解:

先从入口说起:构造函数

template <unsigned _TCount>
FixedThreadPool<_TCount>::FixedThreadPool()
: m_jobsleft(0), m_isDone(false), m_isFinished(false) {
    for (int i = 0; i < _TCount; ++i) {
        m_threads[i] = std::move(std::thread([this, i]() {
            this->DoTask();
        }));
    }
}

在构造函数中,根据模板参数_TCount创建一定数量的线程,将所有线程存在了数组(std::array)中。

然后你会注意到,每个线程都会运行DoTask方法,注意:DoTask是运行于子线程中的

template <unsigned _TCount>
void FixedThreadPool<_TCount>::DoTask() {
    // Run in subthreads.
    // Take the next job in the queue and run it. Notify the main thread that a job has completed.
    while (!m_isDone) {
        this->NextJob()();
        -- m_jobsleft;
        // Notify the main thread that a job has completed.
        m_conditionWait.notify_one();
    }
}

不去看那些烦人的标记变量,先从大的方面理解其原理:

在循环中每次去一个任务(我猜是在队列里取,若队列为空则会block),取到任务后执行任务(即执行lambda表达式),jobsleft减少,然后通知给主线程“我又执行完一个任务”

这里有两个关注点:NextJob如何取任务?m_conditionWait都有谁在阻塞?
先看NextJob如何取任务?

template <unsigned _TCount>
typename FixedThreadPool<_TCount>::JobHandler FixedThreadPool<_TCount>::NextJob() {
    // Run in subthreads.
    // Get the next job; pop the first item in the queue, otherwise wait for a signal from the main thread.
    JobHandler handler;
    std::unique_lock<std::mutex> qlock(m_mutexQueue);
    
    // Wait for a job if we don‘t have any.
    m_conditionJob.wait(qlock, [this]()->bool {
        return m_queue.size() || m_isDone;
    });
    
    // Get job from the queue
    if (!m_isDone) {
        handler = m_queue.front();
        m_queue.pop_front();
    }
    else { // If we‘re bailing out, ‘inject‘ a job into the queue to keep jobsleft accurate.
        handler = []{};
        ++m_jobsleft;
    }
    return handler;
}

注意:这个函数也是运行在子线程中

希望你已经学会使用std::condition_variable了,简单来说m_conditionJob.wait就是在判断是否队列为空(先不要关心烦人的m_isDone)。

如果队列为空则会阻塞,然后就会一直等待,等待到啥时候呢?(我猜测当有新任务时一定会有通知notify_one()),通知来了检测满足条件就继续向下执行。

会看到从队列中取出一个任务,然后返回。

这里有个关注点:啥时候会有m_conditionJob的notify_xxx()?

在这里:

template <unsigned _TCount>
void FixedThreadPool<_TCount>::AddJob(JobHandler job) {
    // Add a new job to the pool. If there are no jobs in the queue, a thread is woken up to take the job. If all threads are busy, the job is added to the end of the queue.
    std::lock_guard<std::mutex> guard(m_mutexQueue);
    m_queue.emplace_back(job);
    ++ m_jobsleft;
    m_conditionJob.notify_one();
}

注意:这是主线程中由用户调用的方法

当然还有一处在JoinAll中,不过这对理解线程池运行流程关系不大。下面讨论另一个问题时在看。

现在你脑子中是否有线程池的运行流程了。

主线程:【创建子线程】->【AddJob】

子线程:【DoTask】->【NextJob】->【NextJob】...->【NextJob】

描述:子线程在DoTask中循环通过【NextJob】取任务,当没有任务时,会block在NextJob中,一直等待到主线程的【AddJob】调用后,会wakeup一个(只会唤醒一个线程)已经阻塞的NextJob,然后NextJob返回队列中的一个任务,交给DoTask执行,DoTask执行完成后通知又执行完一个任务(可用于判断所有任务是否都执行完成)。

 

到这里还比较简单一些,下面考虑退出的问题:

退出的问题在于让所有可能被阻塞住的子线程全部唤醒,然后顺利的走向销毁。

先看析构函数:

template <unsigned _TCount>
FixedThreadPool<_TCount>::~FixedThreadPool() {
    this->JoinAll();
}

JoinAll,听着就像thread的join嘛,看看:

template <unsigned _TCount>
void FixedThreadPool<_TCount>::JoinAll(bool wait) {
    if (m_isFinished) {
        return;
    }
    
    if (wait) {
        this->WaitAll();
    }
    
    // note that we‘re done, and wake up any thread that‘s
    // waiting for a new job
    m_isDone = true;
    m_conditionJob.notify_all();
    
    for(auto &x : m_threads) {
        if(x.joinable()) {
            x.join();
        }
    }
    m_isFinished = true;
}

注意:JoinAll会在主线程执行

奥,m_isFinished用来保证JoinAll只执行一次的。

wait嘛,WaitAll看名字就像等待所有任务执行完毕嘛,而且必须要阻塞住调用WaitAll的线程,否则怎么能叫Wait呢!

下面看看m_isDone=true,然后通知所有(notify_all())的m_conditionJob.wait,那就是通知所有线程中的m_conditionJob.wait呀,先不管继续往下看。

下面就是遍历所有的子线程,然后全部join掉,这可是会阻塞主线程的!主线程会等待所有join的子线程执行完才能回到主线程,不过若所有任务执行完了,join之后子线程不就over了嘛

 

    // Wait for a job if we don‘t have any.
    m_conditionJob.wait(qlock, [this]()->bool {
        return m_queue.size() || m_isDone;
    });

还记得这里吧,NextJob方法,运行于子线程中。

当JoinAll中notify_all时,这里就会被唤醒,由于m_isDone为true,不管你队列是否为空都会继续执行下去。子线程要退出,那么就不能被阻塞住,所以这里就是用来唤醒子线程,让子线程顺利退出的。

 

    // Get job from the queue
    if (!m_isDone) {
        handler = m_queue.front();
        m_queue.pop_front();
    }
    else { // If we‘re bailing out, ‘inject‘ a job into the queue to keep jobsleft accurate.
        handler = []{};
        ++m_jobsleft;
    }

所以就到了下面这个语句块,返回一个空的handler。 都要退出了,为了处理一致,返回空的也无可厚非。

 

下面再看看WaitAll是什么鬼:

template <unsigned _TCount>
void FixedThreadPool<_TCount>::WaitAll() {
    // Waits until all jobs have finshed executing.
    if (m_jobsleft > 0) {
        std::unique_lock<std::mutex> lock(m_mutexWait);
        m_conditionWait.wait(lock, [this]()->bool {
            return this->m_jobsleft == 0;
        });
        lock.unlock();
    }
}

奥,原来如此,WaitAll果然就是阻塞住你,然后等待剩余的任务数为0时,才会被唤醒(结合DoTask中的notify_one)。

 

这么看来,在JoinAll中:

如果wait=true,那么就会等待所有任务自然的执行完成后join所有线程退出。

如果wait=false,那么就会让所有阻塞在等待任务上的线程直接执行一个空任务,然后退出。或者让正在执行任务的线程执行完任务后退出。

到这里你明白了吗?

好好看看代码,碰到了不会的地方在来找找灵感吧。

 

 

 

 

 

 

 

 

以上是关于线程池代码的主要内容,如果未能解决你的问题,请参考以下文章

可扩/减容线程池C语言原理讲解及代码实现

可扩/减容线程池C语言原理讲解及代码实现

异步代码、共享变量、线程池线程和线程安全

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

线程池代码(通用版)

03 | 线程池:业务代码最常用也最容易犯错的组件