什么是线程池?

传统多线程方案中采用的服务器模型是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。

线程池则是一种采用预创建技术的多线程处理形式,在程序启动之初就创建一定数量线程,运行过程中将任务添加到队列,然后再将任务分配给已创建好的线程中自动启动这些任务。因为程序边运行边创建线程是比较耗时的,所以我们通过池化的思想,减少创建线程和销毁线程对程序资源的消耗。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

线程池原理

线程池采用预创建的技术,在程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞状态,不消耗 CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中执行。当 N1 个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。

使用线程完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

线程池技术正是关注如何缩短或调整 T1,T3 时间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在程序的启动和结束的时间段或者一些空闲的时间段,这样在程序处理多个任务时,就不会有 T1,T3 的开销了。同时,线程池不仅调整 T1 和 T3 产生的时间段,而且它还显著减少了创建线程的数目。

线程池适合场景

事实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略。

总之线程池通常适合下面的几个场合:

  1. 单位时间内处理任务频繁而且任务处理时间短。
  2. 对实时性要求较高。如果接受到任务后在创建线程,可能满足不了实时要求,因此必须采用线程池进行预创建。

线程池的组成部分

  • 线程池管理器(ThreadPoolManager):用于创建并管理线程
  • 工作线程(WorkThread):线程池中的线程
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行
  • 任务队列(TaskQueue):用于存放没有处理的任务

线程池实现原理

线程池管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复。而线程池一般要复用线程,所以如果是取一个 task 分配给某一个 thread,执行完之后再重新分配,在语言层面上基本都是不支持的:一般语言的 thread 都是执行一个固定的 task 函数,执行完毕线程也就结束了,因此要如何实现 task 和 thread 的分配呢?

思路就是:让每一个 thread 都去执行调度函数:循环获取一个 task,然后执行之。保证了 thread 函数的唯一性,而且复用线程执行 task。

线程池实现

线程池代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>

class ThreadPool {
public:
// constructor
ThreadPool(size_t threadnum = 1);
// deconstructor
~ThreadPool();
// add task
bool EnQueue(std::function<void()>);

private:
// the work threads
std::vector<std::thread> workers_;
// the task queue
std::queue<std::function<void()>> tasks_;
// the task queue lock
std::mutex mutex_;
// condition variable
std::condition_variable condition_;
// constrol thread pool
bool stop_;

};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threadnum) : stop_(false) {
for (size_t i = 0; i < threadnum; ++i) {
workers_.emplace_back( [this]{ // 为每个线程构造一个调度函数
while (1) {
std::function<void()> task;
{ // 此处花括号使得在执行 task 前即释放互斥锁
std::unique_lock<std::mutex> lock(this->mutex_);
// stop_ 为 false 且 task queue 为空时阻塞线程,其他情况都不阻塞
this->condition_.wait(lock, [this]{
return (this->stop_ || !this->tasks_.empty()); });
// 线程结束,用于销毁线程
if (this->stop_ && this->tasks_.empty())
return;
if (this->tasks_.empty() == false) {
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
}
task();
}
});
}
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
// 对于所有线程共享的变量操作前都需要加锁
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
}
// 唤醒所有阻塞的线程
condition_.notify_all();
for (std::thread &worker : workers_) {
// 等待所有线程都执行完
worker.join();
}
}

// add new task to the task queue
bool ThreadPool::EnQueue(std::function<void()> task) {
{
// 对于 task queue 的操作都需要加锁,因为 task queue 是所有线程共有的
std::unique_lock<std::mutex> lock(mutex_);
tasks_.emplace(task);
}
condition_.notify_one();
return true;
}

#endif /* _THREADPOOL_H_ */

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include "threadpool.h"
using std::cout;
using std::endl;

class Test {
public:
void process_to_test(const int i, const int j) {
int count = 0;
for (int x = 0; x < i; ++x) {
for (int y = 0; y < j; ++y) {
++count;
}
}
cout << "Thread num: " << -(i - 8) << " " << count << endl;
}
};

int main() {
Test test;
ThreadPool pool(8);
for (unsigned i = 0; i < 8; ++i) {
pool.EnQueue(std::bind(&Test::process_to_test, &test, 8 - i, 1000000));
}
return 0;
}

参考