小知识:nginx线程池源码分析

周末看了nginx线程池部分的代码,顺手照抄了一遍,写成了自己的版本。实现上某些地方还是有差异的,不过基本结构全部摘抄。

  在这里分享一下。如果你看懂了我的版本,也就证明你看懂了nginx线程池。

  本文只列出了关键数据结构和API,重在理解nginx线程池设计思路。完整代码在最后的链接里。

  1.任务节点

?
1
2
3
4
5
6
7
8
9
typedef void (*CB_FUN)(void *);
//任务结构体
typedef struct task
{
void    *argv; //任务函数的参数(任务执行结束前,要保证参数地址有效)
CB_FUN    handler; //任务函数(返回值必须为0  非0值用作增加线程,和销毁线程池)
struct task *next; //任务链指针
}zoey_task_t;

  handler为函数指针,是实际的任务函数,argv为该函数的参数,next指向下一个任务。

  2.任务队列

?
1
2
3
4
5
6
7
typedef struct task_queue
{
zoey_task_t *head; //队列头
zoey_task_t **tail;  //队列尾
unsigned int maxtasknum; //最大任务限制
unsigned int curtasknum; //当前任务数
}zoey_task_queue_t;

  head为任务队列头指针,tail为任务队列尾指针,maxtasknum为队列最大任务数限制,curtasknum为队列当前任务数。

  3.线程池

?
1
2
3
4
5
6
7
8
9
10
typedef struct threadpool
{
pthread_mutex_t  mutex; //互斥锁
pthread_cond_t   cond;  //条件锁
zoey_task_queue_t    tasks;//任务队列
unsigned int    threadnum; //线程数
unsigned int    thread_stack_size; //线程堆栈大小
}zoey_threadpool_t;

  mutex为互斥锁 cond为条件锁。mutex和cond共同保证线程池任务的互斥领取或者添加。

  tasks指向任务队列。

  threadnum为线程池的线程数

  thread_stack_size为线程堆栈大小 

  4.启动配置

?
1
2
3
4
5
6
7
//配置参数
typedef struct threadpool_conf
{
unsigned int threadnum;  //线程数
unsigned int thread_stack_size;//线程堆栈大小
unsigned int maxtasknum;//最大任务限制
}zoey_threadpool_conf_t;

  启动配置结构体是初始化线程池时的一些参数。

  5.初始化线程池

  首先检查参数是否合法,然后初始化mutex,cond,key(pthread_key_t)。key用来读写线程全局变量,此全局变量控制线程是否退出。

  最后创建线程。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
zoey_threadpool_t* zoey_threadpool_init(zoey_threadpool_conf_t *conf)
{
zoey_threadpool_t *pool = NULL;
int error_flag_mutex = 0;
int error_flag_cond = 0;
pthread_attr_t attr;
do{
if (z_conf_check(conf) == -1){ //检查参数是否合法
break;
}
pool = (zoey_threadpool_t *)malloc(sizeof(zoey_threadpool_t));//申请线程池句柄
if (pool == NULL){
break;
}
//初始化线程池基本参数
pool->threadnum = conf->threadnum;
pool->thread_stack_size = conf->thread_stack_size;
pool->tasks.maxtasknum = conf->maxtasknum;
pool->tasks.curtasknum = 0;
z_task_queue_init(&pool->tasks);
if (z_thread_key_create() != 0){//创建一个pthread_key_t,用以访问线程全局变量。
free(pool);
break;
}
if (z_thread_mutex_create(&pool->mutex) != 0){ //初始化互斥锁
z_thread_key_destroy();
free(pool);
break;
}
if (z_thread_cond_create(&pool->cond) != 0){ //初始化条件锁
z_thread_key_destroy();
z_thread_mutex_destroy(&pool->mutex);
free(pool);
break;
}
if (z_threadpool_create(pool) != 0){    //创建线程池
z_thread_key_destroy();
z_thread_mutex_destroy(&pool->mutex);
z_thread_cond_destroy(&pool->cond);
free(pool);
break;
}
return pool;
}while(0);
return NULL;
}

 6.添加任务

  首先申请一个任务节点,实例化后将节点加入任务队列,并将当前任务队列数++并通知其他进程有新任务。整个过程加锁。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
int zoey_threadpool_add_task(zoey_threadpool_t *pool, CB_FUN handler, void* argv)
{
zoey_task_t *task = NULL;
//申请一个任务节点并赋值
task = (zoey_task_t *)malloc(sizeof(zoey_task_t));
if (task == NULL){
return -1;
}
task->handler = handler;
task->argv = argv;
task->next = NULL;
if (pthread_mutex_lock(&pool->mutex) != 0){ //加锁
free(task);
return -1;
}
do{
if (pool->tasks.curtasknum >= pool->tasks.maxtasknum){//判断工作队列中的任务数是否达到限制
break;
}
//将任务节点尾插到任务队列
*(pool->tasks.tail) = task;
pool->tasks.tail = &task->next;
pool->tasks.curtasknum++;
//通知阻塞的线程
if (pthread_cond_signal(&pool->cond) != 0){
break;
}
//解锁
pthread_mutex_unlock(&pool->mutex);
return 0;
}while(0);
pthread_mutex_unlock(&pool->mutex);
free(task);
return -1;
}

 7.销毁线程池

  销毁线程池其实也是向任务队列添加任务,只不过添加的任务是让线程退出。z_threadpool_exit_cb函数会将lock置0后退出线程,lock为0表示此线程

  已经退出,接着退出下一个线程。退出完线程释放所有资源。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void zoey_threadpool_destroy(zoey_threadpool_t *pool)
{
unsigned int n = 0;
volatile unsigned int lock;
//z_threadpool_exit_cb函数会使对应线程退出
for (; n < pool->threadnum; n++){
lock = 1;
if (zoey_threadpool_add_task(pool, z_threadpool_exit_cb, &lock) != 0){
return;
}
while (lock){
usleep(1);
}
}
z_thread_mutex_destroy(&pool->mutex);
z_thread_cond_destroy(&pool->cond);
z_thread_key_destroy();
free(pool);
}

 8.增加一个线程

  很简单,再生成一个线程以及线程数++即可。加锁。

?
1
2
3
4
5
6
7
8
9
10
int zoey_thread_add(zoey_threadpool_t *pool)
{
int ret = 0;
if (pthread_mutex_lock(&pool->mutex) != 0){
return -1;
}
ret = z_thread_add(pool);
pthread_mutex_unlock(&pool->mutex);
return ret;
}

 9.改变任务队列最大任务限制

  当num=0时设置线程数为无限大。

?
1
2
3
4
5
6
7
8
void zoey_set_max_tasknum(zoey_threadpool_t *pool,unsigned int num)
{
if (pthread_mutex_lock(&pool->mutex) != 0){
return -1;
}
z_change_maxtask_num(pool, num); //改变最大任务限制
pthread_mutex_unlock(&pool->mutex);
}

  10.使用示例

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int main()
{
int array[10000] = {0};
int i = 0;
zoey_threadpool_conf_t conf = {5,0,5}; //实例化启动参数
zoey_threadpool_t *pool = zoey_threadpool_init(&conf);//初始化线程池
if (pool == NULL){
return 0;
}
for (; i < 10000; i++){
array[i] = i;
if (i == 80){
zoey_thread_add(pool); //增加线程
zoey_thread_add(pool);
}
if (i == 100){
zoey_set_max_tasknum(pool, 0); //改变最大任务数  0为不做上限
}
while(1){
if (zoey_threadpool_add_task(pool, testfun, &array[i]) == 0){
break;
}
printf(“error in i = %d\n”,i);
}
}
zoey_threadpool_destroy(pool);
while(1){
sleep(5);
}
return 0;
}

  11.源码

https://github.com/unlikewashface/zoey_threadpool.git

线程池可以发挥更多作用,比如可以把连接放到线程池里。nginx的异步加lua的协程是个非常好的组合,现在有了线程池后,线程池加协程将是另一个选择。总而言之,如果在保证性能的情况下,让nginx开发变得非常简单,这是非常利好的消息。

声明: 猿站网有关资源均来自网络搜集与网友提供,任何涉及商业盈利目的的均不得使用,否则产生的一切后果将由您自己承担! 本平台资源仅供个人学习交流、测试使用 所有内容请在下载后24小时内删除,制止非法恶意传播,不对任何下载或转载者造成的危害负任何法律责任!也请大家支持、购置正版! 。本站一律禁止以任何方式发布或转载任何违法的相关信息访客发现请向站长举报,会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。本网站的资源部分来源于网络,如有侵权烦请发送邮件至:2697268773@qq.com进行处理。
建站知识

小知识:Nginx服务器中的GZip配置参数详解

2023-5-4 1:41:35

建站知识

小知识:网络/命令行抓包工具Tcpdump详解

2023-5-4 1:54:53

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索