多线程编程—线程池的实现

 

执行与任务分离的组件— 线程池

 

多线程技术主要解决了处理器单元内多个线程执行的问题,它可以显著的减少处理器单元的闲置时间,增加处理器单元的吞吐能力。线程池是多线程编程的一个必要组件,并且对于很多编程人员都是透明的,更是神秘的。有幸能为大家解析其中缘由,尚有不妥之处,欢迎大家抛砖。

 

线程池的概念,是一个用来管理一组执行任务线程的工具。既然是管理工具,那么该工具管理是用来管理任务与执行的。如图一线程池组件拓扑图,执行队列(Workers),任务队列(Jobs)和池管理(Pool Manager)三部分组成。

执行队列(Workers)是用来存放运行线程的队列。

任务队列(Jobs)是用来存放需要被执行的任务队列。

池管理(Pool Manager)主要是管理执行队列的执行顺序,执行任务的时间长短,对长时间没有使用的执行单元进行释放,执行单元满负荷运行的时及时添加执行单元;记录未执行的任务数量,对新任务入队,即将执行的任务出队等等。

图一 线程池组件拓扑图

执行队列(Workers)中的每一个执行单元(Worker)由哪些元素组成?线程ID,退出标志。

 

任务队列(Jobs)中的每一个任务(Jobs)的组成元素?执行每一个任务的具体执行函数,每一个任务的执行参数。

 

池管理(Pool Manager)由哪些元素组成?每一个新任务添加与执行时的移除用的互斥锁,每一个线程挂起的时所等待的条件变量。

 

根据分析如图二线程池的类图。

图二线程池的类图

到这里一个简单的线程池就已经可以呼之欲出了。以下为实现代码

/* * Author: WangBoJing * email: 1989wangbojing@gmail.com  * github: https://github.com/wangbojing */#include 
#include 
#include 
#include 
#define LL_ADD(item, list) do { \ item->prev = NULL;   \ item->next = list;   \ list = item;    \} while(0)#define LL_REMOVE(item, list) do {        \ if (item->prev != NULL) item->prev->next = item->next; \ if (item->next != NULL) item->next->prev = item->prev; \ if (list == item) list = item->next;     \ item->prev = item->next = NULL;       \} while(0)typedef void (*JOB_CALLBACK)(void *);struct NTHREADPOOL;typedef struct NWORKER { pthread_t thread; int terminate; struct NTHREADPOOL *pool; struct NWORKER *next; struct NWORKER *prev;} nWorker;typedef struct NJOB { JOB_CALLBACK job_func; void *arg; struct NJOB *next; struct NJOB *prev;} nJob;typedef struct NTHREADPOOL { struct NWORKER *workers; struct NJOB *jobs; pthread_mutex_t jobs_mtx; pthread_cond_t jobs_cond;} nThreadPool;void *ntyWorkerThread(void *arg) { nWorker *worker = (nWorker*)arg;  while (1) {  pthread_mutex_lock(&worker->pool->jobs_mtx);  while (worker->pool->jobs == NULL) {   if (worker->terminate) break;   pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);  }    if (worker->terminate) {   pthread_mutex_unlock(&worker->pool->jobs_mtx);   break;  }   nJob *job = worker->pool->jobs;  if (job != NULL) {   LL_REMOVE(job, worker->pool->jobs);  }    pthread_mutex_unlock(&worker->pool->jobs_mtx);  if (job == NULL) continue;  job->job_func(job);    usleep(1); }  free(worker); pthread_exit(NULL); }int ntyThreadPoolCreate(nThreadPool *pool, int numWorkers) { if (pool == NULL) return 1; if (numWorkers < 1) numWorkers = 1;  memset(pool, 0, sizeof(nThreadPool));  pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&pool->jobs_cond, &blank_cond, sizeof(pool->jobs_cond));  pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pool->jobs_mtx));  int i = 0; for (i = 0;i < numWorkers;i ++) {   nWorker *worker = (nWorker*)malloc(sizeof(nWorker));  if (worker == NULL) {   perror("malloc");   return 1;  }    memset(worker, 0, sizeof(nWorker));  worker->pool = pool;    int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void*)worker);  if (ret) {   perror("pthread_create");   free(worker);   return 1;  }    LL_ADD(worker, worker->pool->workers); }}void ntyThreadPoolShutdown(nThreadPool *pool) { nWorker *worker = NULL; for (worker = pool->workers;worker != NULL;worker = worker->next) {  worker->terminate = 1; }  pthread_mutex_lock(&pool->jobs_mtx);  pool->workers = NULL; pool->jobs = NULL; pthread_cond_broadcast(&pool->jobs_cond);  pthread_mutex_unlock(&pool->jobs_mtx); }void ntyThreadPoolPush(nThreadPool *pool, nJob *job) { pthread_mutex_lock(&pool->jobs_mtx); LL_ADD(job, pool->jobs);  pthread_cond_signal(&pool->jobs_cond); pthread_mutex_unlock(&pool->jobs_mtx); }/********************************* debug thread pool *********************************/#define KING_MAX_THREADS  80#define KING_COUNTER_SIZE 1000void king_counter(void *arg) { nJob *job = (nJob*)arg;  int index = *(int *)job->arg; printf("index: %d, selfid:%lu\n", index, pthread_self()); free(job->arg); free(job);}int main(int argc, char *argv[]) { nThreadPool pool; ntyThreadPoolCreate(&pool, KING_MAX_THREADS);  int i = 0; for (i = 0;i < KING_COUNTER_SIZE;i ++) {   nJob *job = (nJob*)malloc(sizeof(nJob));  if (job == NULL) {   perror("malloc");   exit(1);  }    job->job_func = king_counter;  job->arg = malloc(sizeof(int));    *(int*)job->arg = i;  ntyThreadPoolPush(&pool, job);   } getchar(); printf("You are very good !!!!\n"); }

这样的线程池还是只是一个Demo,原因有如下几点需要我们值得改进的。

  1. 线程池的线程数量是确定的,不能随着系统任务请求数量放缩线程池的大小。

  2. 任务数量的统计,并没有对任务队列进行统计

  3. 执行任务中的线程数量,等待执行的任务数量进行统计

  4. 每一个执行任务的时间没有做限制,

  5. IO密集型与计算密集型区分,线程池非常常用,但是根据不同的业务场景需要设置不同配置

  6. 在用户任务执行函数里,用户主动的调用了pthread_exit退出线程的保护机制

针对于以上几点问题,改进了一版线程池

/* * Author: WangBoJing * email: 1989wangbojing@gmail.com  * github: https://github.com/wangbojing */  #include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
typedef void (*JOB_CALLBACK)(void *);typedef struct NJOB { struct NJOB *next; JOB_CALLBACK func; void *arg;} nJob;typedef struct NWORKER { struct NWORKER *active_next; pthread_t active_tid;} nWorker;typedef struct NTHREADPOOL { struct NTHREADPOOL *forw; struct NTHREADPOOL *back; pthread_mutex_t mtx;  pthread_cond_t busycv; pthread_cond_t workcv; pthread_cond_t waitcv;  nWorker *active; nJob *head; nJob *tail; pthread_attr_t attr;  int flags; unsigned int linger; int minimum; int maximum; int nthreads; int idle; } nThreadPool;static void* ntyWorkerThread(void *arg);#define NTY_POOL_WAIT   0x01#define NTY_POOL_DESTROY  0x02static pthread_mutex_t nty_pool_lock = PTHREAD_MUTEX_INITIALIZER;static sigset_t fillset;nThreadPool *thread_pool = NULL;static int ntyWorkerCreate(nThreadPool *pool) { sigset_t oset; pthread_t thread_id; pthread_sigmask(SIG_SETMASK, &fillset, &oset); int error = pthread_create(&thread_id, &pool->attr, ntyWorkerThread, pool); pthread_sigmask(SIG_SETMASK, &oset, NULL); return error;}static void ntyWorkerCleanup(nThreadPool * pool) { --pool->nthreads; if (pool->flags & NTY_POOL_DESTROY) {  if (pool->nthreads == 0) {   pthread_cond_broadcast(&pool->busycv);  } } else if (pool->head != NULL && pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {  pool->nthreads ++; } pthread_mutex_unlock(&pool->mtx); }static void ntyNotifyWaiters(nThreadPool *pool) {  if (pool->head == NULL && pool->active == NULL) {  pool->flags &= ~NTY_POOL_WAIT;  pthread_cond_broadcast(&pool->waitcv); } }static void ntyJobCleanup(nThreadPool *pool) {  pthread_t tid = pthread_self(); nWorker *activep; nWorker **activepp;  pthread_mutex_lock(&pool->mtx); for (activepp = &pool->active;(activep = *activepp) != NULL;activepp = &activep->active_next) {  *activepp = activep->active_next;  break; } if (pool->flags & NTY_POOL_WAIT) ntyNotifyWaiters(pool); }static void* ntyWorkerThread(void *arg) { nThreadPool *pool = (nThreadPool*)arg; nWorker active;  int timeout; struct timespec ts; JOB_CALLBACK func;  pthread_mutex_lock(&pool->mtx); pthread_cleanup_push(ntyWorkerCleanup, pool); active.active_tid = pthread_self();  while (1) {   pthread_sigmask(SIG_SETMASK, &fillset, NULL);  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);    timeout = 0;  pool->idle ++;    if (pool->flags & NTY_POOL_WAIT) {   ntyNotifyWaiters(pool);  }    while (pool->head == NULL && !(pool->flags & NTY_POOL_DESTROY)) {   if (pool->nthreads <= pool->minimum) {        pthread_cond_wait(&pool->workcv, &pool->mtx);       } else {       clock_gettime(CLOCK_REALTIME, &ts);    ts.tv_sec += pool->linger;    if (pool->linger == 0 || pthread_cond_timedwait(&pool->workcv, &pool->mtx, &ts) == ETIMEDOUT) {         timeout = 1;     break;    }   }  }  pool->idle --;    if (pool->flags & NTY_POOL_DESTROY) break;    nJob *job = pool->head;    if (job != NULL) {     timeout = 0;   func = job->func;      void *job_arg = job->arg;   pool->head = job->next;      if (job == pool->tail) {    pool->tail == NULL;   }      active.active_next = pool->active;   pool->active = &active;      pthread_mutex_unlock(&pool->mtx);   pthread_cleanup_push(ntyJobCleanup, pool);      free(job);   func(job_arg);      pthread_cleanup_pop(1);  }    if (timeout && (pool->nthreads > pool->minimum)) {   break;  } } pthread_cleanup_pop(1);  return NULL; }static void ntyCloneAttributes(pthread_attr_t *new_attr, pthread_attr_t *old_attr) { struct sched_param param; void *addr; size_t size; int value;  pthread_attr_init(new_attr);  if (old_attr != NULL) {   pthread_attr_getstack(old_attr, &addr, &size);  pthread_attr_setstack(new_attr, NULL, size);    pthread_attr_getscope(old_attr, &value);  pthread_attr_setscope(new_attr, value);    pthread_attr_getinheritsched(old_attr, &value);  pthread_attr_setinheritsched(new_attr, value);    pthread_attr_getschedpolicy(old_attr, &value);  pthread_attr_setschedpolicy(new_attr, value);    pthread_attr_getschedparam(old_attr, &param);  pthread_attr_setschedparam(new_attr, &param);    pthread_attr_getguardsize(old_attr, &size);  pthread_attr_setguardsize(new_attr, size);   } pthread_attr_setdetachstate(new_attr, PTHREAD_CREATE_DETACHED); }nThreadPool *ntyThreadPoolCreate(int min_threads, int max_threads, int linger, pthread_attr_t *attr) { sigfillset(&fillset); if (min_threads > max_threads || max_threads < 1) {  errno = EINVAL;  return NULL; }  nThreadPool *pool = (nThreadPool*)malloc(sizeof(nThreadPool)); if (pool == NULL) {  errno = ENOMEM;  return NULL; }  pthread_mutex_init(&pool->mtx, NULL); pthread_cond_init(&pool->busycv, NULL); pthread_cond_init(&pool->workcv, NULL); pthread_cond_init(&pool->waitcv, NULL);  pool->active = NULL; pool->head = NULL; pool->tail = NULL; pool->flags = 0; pool->linger = linger; pool->minimum = min_threads; pool->maximum = max_threads; pool->nthreads = 0; pool->idle = 0;  ntyCloneAttributes(&pool->attr, attr); pthread_mutex_lock(&nty_pool_lock);  if (thread_pool == NULL) {  pool->forw = pool;  pool->back = pool;    thread_pool = pool;   } else {   thread_pool->back->forw = pool;  pool->forw = thread_pool;  pool->back = pool->back;  thread_pool->back = pool;   }  pthread_mutex_unlock(&nty_pool_lock); return pool; }int ntyThreadPoolQueue(nThreadPool *pool, JOB_CALLBACK func, void *arg) { nJob *job = (nJob*)malloc(sizeof(nJob)); if (job == NULL) {  errno = ENOMEM;  return -1; } job->next = NULL; job->func = func; job->arg = arg;  pthread_mutex_lock(&pool->mtx); if (pool->head == NULL) {  pool->head = job; } else {  pool->tail->next = job; } pool->tail = job;  if (pool->idle > 0) {  pthread_cond_signal(&pool->workcv); } else if (pool->nthreads < pool->maximum && ntyWorkerCreate(pool) == 0) {  pool->nthreads ++; }  pthread_mutex_unlock(&pool->mtx);}void nThreadPoolWait(nThreadPool *pool) { pthread_mutex_lock(&pool->mtx); pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);  while (pool->head != NULL || pool->active != NULL) {  pool->flags |= NTY_POOL_WAIT;  pthread_cond_wait(&pool->waitcv, &pool->mtx); }  pthread_cleanup_pop(1);}void nThreadPoolDestroy(nThreadPool *pool) { nWorker *activep; nJob *job;  pthread_mutex_lock(&pool->mtx); pthread_cleanup_push(pthread_mutex_unlock, &pool->mtx);  pool->flags |= NTY_POOL_DESTROY; pthread_cond_broadcast(&pool->workcv);  for (activep = pool->active;activep != NULL;activep = activep->active_next) {  pthread_cancel(activep->active_tid); }  while (pool->nthreads != 0) {  pthread_cond_wait(&pool->busycv, &pool->mtx); }  pthread_cleanup_pop(1); pthread_mutex_lock(&nty_pool_lock);  if (thread_pool == pool) {  thread_pool = pool->forw; }  if (thread_pool == pool) {  thread_pool = NULL; } else {  pool->back->forw = pool->forw;  pool->forw->back = pool->back; }  pthread_mutex_unlock(&nty_pool_lock);  for (job = pool->head;job != NULL;job = pool->head) {  pool->head = job->next;  free(job); } pthread_attr_destroy(&pool->attr); free(pool); }/********************************* debug thread pool *********************************/void king_counter(void *arg) { int index = *(int*)arg; printf("index : %d, selfid : %lu\n", index, pthread_self());  free(arg); usleep(1);}#define KING_COUNTER_SIZE 1000int main(int argc, char *argv[]) { nThreadPool *pool = ntyThreadPoolCreate(10, 20, 15, NULL);  int i = 0; for (i = 0;i < KING_COUNTER_SIZE;i ++) {   int *index = (int*)malloc(sizeof(int));    memset(index, 0, sizeof(int));  memcpy(index, &i, sizeof(int));    ntyThreadPoolQueue(pool, king_counter, index);   }   getchar(); printf("You are very good !!!!\n");}

QQ图片20180315212334.png