您当前的位置:首页 > 电脑百科 > 程序开发 > 语言 > C/C++/C#

线程池原理详解及如何用C语言实现线程池

时间:2020-02-29 11:29:19  来源:  作者:

线程池是一种多线程处理形式,大多用于高并发服务器上,它能合理有效的利用高并发服务器上的线程资源;线程与进程用于处理各项分支子功能,我们通常的操作是:接收消息 ==> 消息分类 ==> 线程创建 ==> 传递消息到子线程 ==> 线程分离 ==> 在子线程中执行任务 ==> 任务结束退出;

对大多数小型局域网的通信来说,上述方法足够满足需求;但当我们的通信范围扩大到广域网或大型局域网通信中时,我们将面临大量消息频繁请求服务器;在这种情况下,创建与销毁线程都已经成为一种奢侈的开销,特别对于嵌入式服务器来说更应保证内存资源的合理利用;

因此,线程池技术应运而生;线程池允许一个线程可以多次复用,且每次复用的线程内部的消息处理可以不相同,将创建与销毁的开销省去而不必来一个请求开一个线程;

结构讲解:

线程池是一个抽象的概念,其内部由任务队列,一堆线程,管理者线程组成;

线程池原理详解及如何用C语言实现线程池

 

我们将以上图为例,实现一个最基础的线程池,接下来将分部分依次讲解;讲解顺序为:1.线程池总体结构 2.线程数组 3.任务队列 4.管理者线程 5.使用线程池接口的例子

一、线程池总体结构

这里讲解线程池在逻辑上的结构体;看下方代码,该结构体threadpool_t中包含线程池状态信息,任务队列信息以及多线程操作中的互斥锁;在任务结构体中包含了一个可以放置多种不同任务函数的函数指针,一个传入该任务函数的void*类型的参数;

注意:在使用时需要将你的消息分类处理函数装入任务的(*function);然后放置到任务队列并通知空闲线程;

线程池状态信息:描述当前线程池的基本信息,如是否开启、最小线程数、最大线程数、存活线程数、忙线程数、待销毁线程数等… …

任务队列信息:描述当前任务队列基本信息,如最大任务数、队列不为满条件变量、队列不为空条件变量等… …

多线程互斥锁:保证在同一时间点上只有一个线程在任务队列中取任务并修改任务队列信息、修改线程池信息;

函数指针:在打包消息阶段,将分类后的消息处理函数放在(*function);

void*类型参数:用于传递消息处理函数需要的信息;

/*任务*/

typedef struct {

void *(*function)(void *);

void *arg;

} threadpool_task_t;

/*线程池管理*/

struct threadpool_t{

pthread_mutex_t lock; /* 锁住整个结构体 */

pthread_mutex_t thread_counter; /* 用于使用忙线程数时的锁 */

pthread_cond_t queue_not_full; /* 条件变量,任务队列不为满 */

pthread_cond_t queue_not_empty; /* 任务队列不为空 */

pthread_t *threads; /* 存放线程的tid,实际上就是管理了线 数组 */

pthread_t admin_tid; /* 管理者线程tid */

threadpool_task_t *task_queue; /* 任务队列 */

/*线程池信息*/

int min_thr_num; /* 线程池中最小线程数 */

int max_thr_num; /* 线程池中最大线程数 */

int live_thr_num; /* 线程池中存活的线程数 */

int busy_thr_num; /* 忙线程,正在工作的线程 */

int wait_exit_thr_num; /* 需要销毁的线程数 */

/*任务队列信息*/

int queue_front; /* 队头 */

int queue_rear; /* 队尾 */

int queue_size;

/* 存在的任务数 */

int queue_max_size; /* 队列能容纳的最大任务数 */

/*线程池状态*/

int shutdown; /* true为关闭 */

};

**/*创建线程池*/**

threadpool_t *

threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)

{ /* 最小线程数 最大线程数 最大任务数*/

int i;

threadpool_t *pool = NULL;

do

{

/* 线程池空间开辟 */

if ((pool=(threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)

{

printf("malloc threadpool false; n");

break;

}

/*信息初始化*/

pool->min_thr_num = min_thr_num;

pool->max_thr_num = max_thr_num;

pool->busy_thr_num = 0;

pool->live_thr_num = min_thr_num;

pool->wait_exit_thr_num = 0;

pool->queue_front = 0;

pool->queue_rear = 0;

pool->queue_size = 0;

pool->queue_max_size = queue_max_size;

pool->shutdown = false;

/* 根据最大线程数,给工作线程数组开空间,清0 */

pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);

if (pool->threads == NULL)

{

printf("malloc threads false;n");

break;

}

memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);

/* 队列开空间 */

pool->task_queue =

(threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);

if (pool->task_queue == NULL)

{

printf("malloc task queue false;n");

break;

}

/* 初始化互斥锁和条件变量 */

if ( pthread_mutex_init(&(pool->lock), NULL) != 0 ||

pthread_mutex_init(&(pool->thread_counter), NULL) !=0 ||

pthread_cond_init(&(pool->queue_not_empty), NULL) !=0 ||

pthread_cond_init(&(pool->queue_not_full), NULL) !=0)

{

printf("init lock or cond false;n");

break;

}

/* 启动min_thr_num个工作线程 */

for (i=0; i<min_thr_num; i++)

{

/* pool指向当前线程池 threadpool_thread函数在后面讲解 */

pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);

printf("start thread 0x%x... n", (unsigned int)pool->threads[i]);

}

/* 管理者线程 admin_thread函数在后面讲解 */

pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);

return pool;

} while(0);

/* 释放pool的空间 */

threadpool_free(pool);

return NULL;

}

二、线程数组

线程数组实际上是在线程池初始化时开辟的一段存放一堆线程tid的空间,在逻辑上形成一个池,里面放置着提前创建的线程;这段空间中包含了正在工作的线程,等待工作的线程(空闲线程),等待被销毁的线程,申明但没有初始化的线程空间;

线程池原理详解及如何用C语言实现线程池

 

 

/*工作线程*/

void *

threadpool_thread(void *threadpool)

{

threadpool_t *pool = (threadpool_t *)threadpool;

threadpool_task_t task;

while (true)

{

pthread_mutex_lock(&(pool->lock));

/* 无任务则阻塞在 “任务队列不为空” 上,有任务则跳出 */

while ((pool->queue_size == 0) && (!pool->shutdown))

{

printf("thread 0x%x is waiting n", (unsigned int)pthread_self());

pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

/* 判断是否需要清除线程,自杀功能 */

if (pool->wait_exit_thr_num > 0)

{

pool->wait_exit_thr_num--;

/* 判断线程池中的线程数是否大于最小线程数,是则结束当前线程 */

if (pool->live_thr_num > pool->min_thr_num)

{

printf("thread 0x%x is exiting n", (unsigned int)pthread_self());

pool->live_thr_num--;

pthread_mutex_unlock(&(pool->lock));

pthread_exit(NULL);//结束线程

}

}

}

/* 线程池开关状态 */

if (pool->shutdown) //关闭线程池

{

pthread_mutex_unlock(&(pool->lock));

printf("thread 0x%x is exiting n", (unsigned int)pthread_self());

pthread_exit(NULL); //线程自己结束自己

}

//否则该线程可以拿出任务

task.function = pool->task_queue[pool->queue_front].function; //出队操作

task.arg = pool->task_queue[pool->queue_front].arg;

pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; //环型结构

pool->queue_size--;

//通知可以添加新任务

pthread_cond_broadcast(&(pool->queue_not_full));

//释放线程锁

pthread_mutex_unlock(&(pool->lock));

//执行刚才取出的任务

printf("thread 0x%x start working n", (unsigned int)pthread_self());

pthread_mutex_lock(&(pool->thread_counter)); //锁住忙线程变量

pool->busy_thr_num++;

pthread_mutex_unlock(&(pool->thread_counter));

(*(task.function))(task.arg); //执行任务

//任务结束处理

printf("thread 0x%x end working n", (unsigned int)pthread_self());

pthread_mutex_lock(&(pool->thread_counter));

pool->busy_thr_num--;

pthread_mutex_unlock(&(pool->thread_counter));

}

pthread_exit(NULL);

}

三、任务队列

任务队列的存在形式与线程数组相似;在线程池初始化时根据传入的最大任务数开辟空间;当服务器前方后请求到来后,分类并打包消息成为任务,将任务放入任务队列并通知空闲线程来取;不同之处在于任务队列有明显的先后顺序,先进先出;而线程数组中的线程则是一个竞争关系去拿到互斥锁争取任务;

线程池原理详解及如何用C语言实现线程池

 

 

/*向线程池的任务队列中添加一个任务*/

int

threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)

{

pthread_mutex_lock(&(pool->lock));

/*如果队列满了,调用wait阻塞*/

while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))

pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));

/*如果线程池处于关闭状态*/

if (pool->shutdown)

{

pthread_mutex_unlock(&(pool->lock));

return -1;

}

/*清空工作线程的回调函数的参数arg*/

if (pool->task_queue[pool->queue_rear].arg != NULL)

{

free(pool->task_queue[pool->queue_rear].arg);

pool->task_queue[pool->queue_rear].arg = NULL;

}

/*添加任务到任务队列*/

pool->task_queue[pool->queue_rear].function = function;

pool->task_queue[pool->queue_rear].arg = arg;

pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 逻辑环 */

pool->queue_size++;

/*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/

pthread_cond_signal(&(pool->queue_not_empty));

pthread_mutex_unlock(&(pool->lock));

return 0;

}

四、管理者线程

作为线程池的管理者,该线程的主要功能包括:检查线程池内线程的存活状态,工作状态;负责根据服务器当前的请求状态去动态的增加或删除线程,保证线程池中的线程数量维持在一个合理高效的平衡上;

说到底,它就是一个单独的线程,定时的去检查,根据我们的一个维持平衡算法去增删线程;

/*管理线程*/

void *

admin_thread(void *threadpool)

{

int i;

threadpool_t *pool = (threadpool_t *)threadpool;

while (!pool->shutdown)

{

printf("admin -----------------n");

sleep(DEFAULT_TIME); /*隔一段时间再管理*/

pthread_mutex_lock(&(pool->lock)); /*加锁*/

int queue_size = pool->queue_size; /*任务数*/

int live_thr_num = pool->live_thr_num; /*存活的线程数*/

pthread_mutex_unlock(&(pool->lock)); /*解锁*/

pthread_mutex_lock(&(pool->thread_counter));

int busy_thr_num = pool->busy_thr_num; /*忙线程数*/

pthread_mutex_unlock(&(pool->thread_counter));

printf("admin busy live -%d--%d-n", busy_thr_num, live_thr_num);

/*创建新线程 实际任务数量大于 最小正在等待的任务数量,存活线程数小于最大线程数*/

if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)

{

printf("admin add-----------n");

pthread_mutex_lock(&(pool->lock));

int add=0;

/*一次增加 DEFAULT_THREAD_NUM 个线程*/

for (i=0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM

&& pool->live_thr_num < pool->max_thr_num; i++)

{

if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))

{

pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);

add++;

pool->live_thr_num++;

printf("new thread -----------------------n");

}

}

pthread_mutex_unlock(&(pool->lock));

}

/*销毁多余的线程 忙线程x2 都小于 存活线程,并且存活的大于最小线程数*/

if ((busy_thr_num*2) < live_thr_num && live_thr_num > pool->min_thr_num)

{

// printf("admin busy --%d--%d----n", busy_thr_num, live_thr_num);

/*一次销毁DEFAULT_THREAD_NUM个线程*/

pthread_mutex_lock(&(pool->lock));

pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;

pthread_mutex_unlock(&(pool->lock));

for (i=0; i<DEFAULT_THREAD_NUM; i++)

{

//通知正在处于空闲的线程,自杀

pthread_cond_signal(&(pool->queue_not_empty));

printf("admin cler --n");

}

}

}

return NULL;

/*线程是否存活*/

int

is_thread_alive(pthread_t tid)

{

int kill_rc = pthread_kill(tid, 0); //发送0号信号,测试是否存活

if (kill_rc == ESRCH) //线程不存在

{

return false;

}

return true;

}

五、释放

/*释放线程池*/

int

threadpool_free(threadpool_t *pool)

{

if (pool == NULL)

return -1;

if (pool->task_queue)

free(pool->task_queue);

if (pool->threads)

{

free(pool->threads);

pthread_mutex_lock(&(pool->lock)); /*先锁住再销毁*/

pthread_mutex_destroy(&(pool->lock));

pthread_mutex_lock(&(pool->thread_counter));

pthread_mutex_destroy(&(pool->thread_counter));

pthread_cond_destroy(&(pool->queue_not_empty));

pthread_cond_destroy(&(pool->queue_not_full));

}

free(pool);

pool = NULL;

return 0;

}

/*销毁线程池*/

int

threadpool_destroy(threadpool_t *pool)

{

int i;

if (pool == NULL)

{

return -1;

}

pool->shutdown = true;

/*销毁管理者线程*/

pthread_join(pool->admin_tid, NULL);

//通知所有线程去自杀(在自己领任务的过程中)

for (i=0; i<pool->live_thr_num; i++)

{

pthread_cond_broadcast(&(pool->queue_not_empty));

}

/*等待线程结束 先是pthread_exit 然后等待其结束*/

for (i=0; i<pool->live_thr_num; i++)

{

pthread_join(pool->threads[i], NULL);

}

threadpool_free(pool);

return 0;

}

六、接口

/* 线程池初始化,其管理者线程及工作线程都会启动 */

threadpool_t *thp = threadpool_create(10, 100, 100);

printf("threadpool init ... ... n");

/* 接收到任务后添加 */

threadpool_add_task(thp, do_work, (void *)p);

// ... ...

/* 销毁 */

threadpool_destroy(thp);

需要C/C++ linux服务器开发学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQLredis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享



Tags:线程池   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
原文链接: https://mp.weixin.qq.com/s/MTw7z6n_wk4y4CTmGkoRoA一切要从CPU说起你可能会有疑问,讲多线程为什么要从CPU说起呢?原因很简单,在这里没有那些时髦的概念,你可以更加清...【详细内容】
2021-08-13  Tags: 线程池  点击:(97)  评论:(0)  加入收藏
多线程并发是Java语言中非常重要的一块内容,同时,也是Java基础的一个难点。说它重要是因为多线程是日常开发中频繁用到的知识,说它难是因为多线程并发涉及到的知识点非常之多,想...【详细内容】
2021-07-12  Tags: 线程池  点击:(110)  评论:(0)  加入收藏
1. Dubbo简介及线程池策略Apache Dubbo 是一款高性能、轻量级的开源 Java 服务框架。提供了六大核心能力:面向接口代理的高性能RPC调用,智能容错和负载均衡,服务自动注册和发现...【详细内容】
2021-05-18  Tags: 线程池  点击:(202)  评论:(0)  加入收藏
在上一篇文章C++使用socket实现与微信小程序通信(下)中,小懵白就给大家简要地讲解了线程池的原理。 今天呢,小懵白就给大家继续讲解C++如何实现封装线程池类。第一步首先,我们需...【详细内容】
2021-05-14  Tags: 线程池  点击:(204)  评论:(0)  加入收藏
见字如面,我是威哥,一个从普通二本院校毕业,从未曾接触分布式、微服务、高并发到通过技术分享实现职场蜕变,成长为RocketMQ社区优秀布道师、大厂资深架构师,出版《RocketMQ技...【详细内容】
2021-03-31  Tags: 线程池  点击:(277)  评论:(0)  加入收藏
作者公众号:一角钱技术(org_yijiaoqian)前言线程池的具体实现有两种,分别是ThreadPoolExecutor 默认线程池和ScheduledThreadPoolExecutor 定时线程池,上一篇已经分析过ThreadPoo...【详细内容】
2020-12-22  Tags: 线程池  点击:(143)  评论:(0)  加入收藏
之前我们介绍了线程池的四种拒绝策略,了解了线程池参数的含义,那么今天我们来聊聊Java 中常见的几种线程池,以及在jdk7 加入的 ForkJoin 新型线程池 首先我们列出Java 中的...【详细内容】
2020-11-05  Tags: 线程池  点击:(91)  评论:(0)  加入收藏
前面几篇文章分析了线程的主要实现,今天来整体总结以下他们。总览图直接上总结的总览图,如下图: 如果看过前几篇文章应该基本能够看懂这张总结图,可能在单独的一篇文章里弄懂了...【详细内容】
2020-09-08  Tags: 线程池  点击:(63)  评论:(0)  加入收藏
大多数线程池实现都离不开锁的使用,如互斥量pthread_mutex*结合条件变量pthread_cond*。众所周知,锁的使用对于程序性能影响较大,虽然现有的pthread_mutex*在锁的申请与释放方...【详细内容】
2020-08-24  Tags: 线程池  点击:(87)  评论:(0)  加入收藏
作为 Java 程序员,无论是技术面试、项目研发或者是学习框架源码,不彻底掌握 Java 多线程的知识,做不到心中有数,干啥都没底气,尤其是技术深究时往往略显发憷。坐稳扶好,通过今天的...【详细内容】
2020-08-12  Tags: 线程池  点击:(48)  评论:(0)  加入收藏
▌简易百科推荐
一、简介很多时候我们都需要用到一些验证的方法,有时候需要用正则表达式校验数据时,往往需要到网上找很久,结果找到的还不是很符合自己想要的。所以我把自己整理的校验帮助类分...【详细内容】
2021-12-27  中年农码工    Tags:C#   点击:(2)  评论:(0)  加入收藏
引言在学习C语言或者其他编程语言的时候,我们编写的一个程序代码,基本都是在屏幕上打印出 hello world ,开始步入编程世(深)界(坑)的。C 语言版本的 hello world 代码:#include <std...【详细内容】
2021-12-21  一起学嵌入式    Tags:C 语言   点击:(11)  评论:(0)  加入收藏
读取SQLite数据库,就是读取一个路径\\192.168.100.**\position\db.sqlite下的文件<startup useLegacyV2RuntimeActivationPolicy="true"> <supportedRuntime version="v4.0"/...【详细内容】
2021-12-16  今朝我的奋斗    Tags:c#   点击:(21)  评论:(0)  加入收藏
什么是shellshell是c语言编写的程序,它在用户和操作系统之间架起了一座桥梁,用户可以通过这个桥梁访问操作系统内核服务。 它既是一种命令语言,同时也是一种程序设计语言,你可以...【详细内容】
2021-12-16  梦回故里归来    Tags:shell脚本   点击:(18)  评论:(0)  加入收藏
一、编程语言1.根据熟悉的语言,谈谈两种语言的区别?主要浅谈下C/C++和PHP语言的区别:1)PHP弱类型语言,一种脚本语言,对数据的类型不要求过多,较多的应用于Web应用开发,现在好多互...【详细内容】
2021-12-15  linux上的码农    Tags:c/c++   点击:(17)  评论:(0)  加入收藏
1.字符串数组+初始化char s1[]="array"; //字符数组char s2[6]="array"; //数组长度=字符串长度+1,因为字符串末尾会自动添&lsquo;\0&lsquo;printf("%s,%c\n",s1,s2[2]);...【详细内容】
2021-12-08  灯-灯灯    Tags:C语言   点击:(47)  评论:(0)  加入收藏
函数调用约定(Calling Convention),是一个重要的基础概念,用来规定调用者和被调用者是如何传递参数的,既调用者如何将参数按照什么样的规范传递给被调用者。在参数传递中,有两个很...【详细内容】
2021-11-30  小智雅汇    Tags:函数   点击:(19)  评论:(0)  加入收藏
一、问题提出问题:把m个苹果放入n个盘子中,允许有的盘子为空,共有多少种方法?注:5,1,1和1 5 1属同一种方法m,n均小于10二、算法分析设f(m,n) 为m个苹果,n个盘子的放法数目,则先对...【详细内容】
2021-11-17  C语言编程    Tags:C语言   点击:(49)  评论:(0)  加入收藏
一、为什么需要使用内存池在C/C++中我们通常使用malloc,free或new,delete来动态分配内存。一方面,因为这些函数涉及到了系统调用,所以频繁的调用必然会导致程序性能的损耗;另一...【详细内容】
2021-11-17  深度Linux    Tags:C++   点击:(38)  评论:(0)  加入收藏
OpenCV(Open Source Computer Vision Library)是一个(开源免费)发行的跨平台计算机视觉库,可以运行在Linux、Windows、Android、ios等操作系统上,它轻量级而且高效---由一系列...【详细内容】
2021-11-11  zls315    Tags:C#   点击:(50)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条