diff --git a/src/util/Makefile.in b/src/util/Makefile.in index 1817811ce5..4a811364e5 100644 --- a/src/util/Makefile.in +++ b/src/util/Makefile.in @@ -40,6 +40,8 @@ includes = \ ${TOP_INCDIR}/afs/softsig.h \ ${TOP_INCDIR}/afs/work_queue.h \ ${TOP_INCDIR}/afs/work_queue_types.h \ + ${TOP_INCDIR}/afs/thread_pool.h \ + ${TOP_INCDIR}/afs/thread_pool_types.h \ ${TOP_INCDIR}/potpourri.h all: ${includes} \ @@ -106,6 +108,12 @@ ${TOP_INCDIR}/afs/work_queue.h: ${srcdir}/work_queue.h ${TOP_INCDIR}/afs/work_queue_types.h: ${srcdir}/work_queue_types.h ${INSTALL_DATA} $? $@ +${TOP_INCDIR}/afs/thread_pool.h: ${srcdir}/thread_pool.h + ${INSTALL_DATA} $? $@ + +${TOP_INCDIR}/afs/thread_pool_types.h: ${srcdir}/thread_pool_types.h + ${INSTALL_DATA} $? $@ + ${TOP_INCDIR}/potpourri.h: ${srcdir}/potpourri.h ${INSTALL_DATA} $? $@ @@ -255,6 +263,8 @@ install: dirpath.h util.a sys ${INSTALL_DATA} ${srcdir}/softsig.h ${DESTDIR}${includedir}/afs/softsig.h ${INSTALL_DATA} ${srcdir}/work_queue.h ${DESTDIR}${includedir}/afs/work_queue.h ${INSTALL_DATA} ${srcdir}/work_queue_types.h ${DESTDIR}${includedir}/afs/work_queue_types.h + ${INSTALL_DATA} ${srcdir}/thread_pool.h ${DESTDIR}${includedir}/afs/thread_pool.h + ${INSTALL_DATA} ${srcdir}/thread_pool_types.h ${DESTDIR}${includedir}/afs/thread_pool_types.h ${INSTALL_DATA} ${srcdir}/potpourri.h ${DESTDIR}${includedir}/potpourri.h ${INSTALL_DATA} util.a ${DESTDIR}${libdir}/afs/util.a ${INSTALL_DATA} util.a ${DESTDIR}${libdir}/afs/libafsutil.a @@ -282,6 +292,8 @@ dest: dirpath.h util.a sys ${INSTALL_DATA} ${srcdir}/softsig.h ${DEST}/include/afs/softsig.h ${INSTALL_DATA} ${srcdir}/work_queue.h ${DEST}/include/afs/work_queue.h ${INSTALL_DATA} ${srcdir}/work_queue_types.h ${DEST}/include/afs/work_queue_types.h + ${INSTALL_DATA} ${srcdir}/thread_pool.h ${DEST}/include/afs/thread_pool.h + ${INSTALL_DATA} ${srcdir}/thread_pool_types.h ${DEST}/include/afs/thread_pool_types.h ${INSTALL_DATA} ${srcdir}/potpourri.h ${DEST}/include/potpourri.h ${INSTALL_DATA} util.a ${DEST}/lib/afs/util.a ${INSTALL_DATA} util.a ${DEST}/lib/afs/libafsutil.a diff --git a/src/util/thread_pool.c b/src/util/thread_pool.c new file mode 100644 index 0000000000..5eed86075e --- /dev/null +++ b/src/util/thread_pool.c @@ -0,0 +1,484 @@ +/* + * Copyright 2008-2010, Sine Nomine Associates and others. + * All Rights Reserved. + * + * This software has been released under the terms of the IBM Public + * License. For details, see the LICENSE file in the top-level source + * directory or online at http://www.openafs.org/dl/license10.html + */ + +#include +#include + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#if defined(AFS_SUN5_ENV) || defined(AFS_HPUX_ENV) +#include +#endif +#include +#include +#include + +#define __AFS_THREAD_POOL_IMPL 1 +#include "work_queue.h" +#include "thread_pool.h" +#include "thread_pool_impl.h" + +/** + * public interfaces for thread_pool. + */ + +/** + * allocate a thread pool object. + * + * @param[inout] pool_out address in which to store pool object pointer + * + * @return operation status + * @retval 0 success + * @retval ENOMEM out of memory + * + * @internal + */ +static int +_afs_tp_alloc(struct afs_thread_pool ** pool_out) +{ + int ret = 0; + struct afs_thread_pool * pool; + + *pool_out = pool = malloc(sizeof(*pool)); + if (pool == NULL) { + ret = ENOMEM; + goto error; + } + + error: + return ret; +} + +/** + * free a thread pool object. + * + * @param[in] pool thread pool object + * + * @return operation status + * @retval 0 success + * + * @internal + */ +static int +_afs_tp_free(struct afs_thread_pool * pool) +{ + int ret = 0; + + free(pool); + + return ret; +} + +/** + * allocate a thread worker object. + * + * @param[inout] worker_out address in which to store worker object pointer + * + * @return operation status + * @retval 0 success + * @retval ENOMEM out of memory + * + * @internal + */ +static int +_afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out) +{ + int ret = 0; + struct afs_thread_pool_worker * worker; + + *worker_out = worker = malloc(sizeof(*worker)); + if (worker == NULL) { + ret = ENOMEM; + goto error; + } + + queue_NodeInit(&worker->worker_list); + + error: + return ret; +} + +/** + * free a thread worker object. + * + * @param[in] worker thread worker object + * + * @return operation status + * @retval 0 success + * + * @internal + */ +static int +_afs_tp_worker_free(struct afs_thread_pool_worker * worker) +{ + int ret = 0; + + free(worker); + + return ret; +} + +/** + * low-level thread entry point. + * + * @param[in] rock opaque pointer to thread worker object + * + * @return opaque return pointer from pool entry function + * + * @internal + */ +static void * +_afs_tp_worker_run(void * rock) +{ + struct afs_thread_pool_worker * worker = rock; + struct afs_thread_pool * pool = worker->pool; + + /* register worker with pool */ + assert(pthread_mutex_lock(&pool->lock) == 0); + queue_Append(&pool->thread_list, worker); + pool->nthreads++; + assert(pthread_mutex_unlock(&pool->lock) == 0); + + /* call high-level entry point */ + worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock); + + /* adjust pool live thread count */ + assert(pthread_mutex_lock(&pool->lock) == 0); + assert(pool->nthreads); + queue_Remove(worker); + pool->nthreads--; + if (!pool->nthreads) { + assert(pthread_cond_broadcast(&pool->shutdown_cv) == 0); + pool->state = AFS_TP_STATE_STOPPED; + } + assert(pthread_mutex_unlock(&pool->lock) == 0); + + _afs_tp_worker_free(worker); + + return NULL; +} + +/** + * default high-level thread entry point. + * + * @internal + */ +static void * +_afs_tp_worker_default(struct afs_thread_pool *pool, + struct afs_thread_pool_worker *worker, + struct afs_work_queue *queue, + void *rock) +{ + int code = 0; + while (code == 0 && afs_tp_worker_continue(worker)) { + code = afs_wq_do(queue, NULL /* no call rock */); + } + + return NULL; +} + +/** + * start a worker thread. + * + * @param[in] pool thread pool object + * @param[inout] worker_out address in which to store worker thread object pointer + * + * @return operation status + * @retval 0 success + * @retval ENOMEM out of memory + */ +static int +_afs_tp_worker_start(struct afs_thread_pool * pool, + struct afs_thread_pool_worker ** worker_out) +{ + int ret = 0; + pthread_attr_t attrs; + struct afs_thread_pool_worker * worker; + + ret = _afs_tp_worker_alloc(worker_out); + if (ret) { + goto error; + } + worker = *worker_out; + + worker->pool = pool; + worker->req_shutdown = 0; + + assert(pthread_attr_init(&attrs) == 0); + assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0); + + ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker); + + error: + return ret; +} + +/** + * create a thread pool. + * + * @param[inout] pool_out address in which to store pool object pointer. + * @param[in] queue work queue serviced by thread pool + * + * @return operation status + * @retval 0 success + * @retval ENOMEM out of memory + */ +int +afs_tp_create(struct afs_thread_pool ** pool_out, + struct afs_work_queue * queue) +{ + int ret = 0; + struct afs_thread_pool * pool; + + ret = _afs_tp_alloc(pool_out); + if (ret) { + goto error; + } + pool = *pool_out; + + assert(pthread_mutex_init(&pool->lock, NULL) == 0); + assert(pthread_cond_init(&pool->shutdown_cv, NULL) == 0); + queue_Init(&pool->thread_list); + pool->work_queue = queue; + pool->entry = &_afs_tp_worker_default; + pool->rock = NULL; + pool->nthreads = 0; + pool->max_threads = 4; + pool->state = AFS_TP_STATE_INIT; + + error: + return ret; +} + +/** + * destroy a thread pool. + * + * @param[in] pool thread pool object to be destroyed + * + * @return operation status + * @retval 0 success + * @retval AFS_TP_ERROR pool not in a quiescent state + */ +int +afs_tp_destroy(struct afs_thread_pool * pool) +{ + int ret = 0; + + assert(pthread_mutex_lock(&pool->lock) == 0); + switch (pool->state) { + case AFS_TP_STATE_INIT: + case AFS_TP_STATE_STOPPED: + _afs_tp_free(pool); + break; + + default: + ret = AFS_TP_ERROR; + assert(pthread_mutex_unlock(&pool->lock) == 0); + } + + return ret; +} + +/** + * set the number of threads to spawn. + * + * @param[in] pool thread pool object + * @param[in] threads number of threads to spawn + * + * @return operation status + * @retval 0 success + * @retval AFS_TP_ERROR thread pool has already been started + */ +int +afs_tp_set_threads(struct afs_thread_pool *pool, + afs_uint32 threads) +{ + int ret = 0; + + assert(pthread_mutex_lock(&pool->lock) == 0); + if (pool->state != AFS_TP_STATE_INIT) { + ret = AFS_TP_ERROR; + } else { + pool->max_threads = threads; + } + assert(pthread_mutex_unlock(&pool->lock) == 0); + + return ret; +} + +/** + * set a custom thread entry point. + * + * @param[in] pool thread pool object + * @param[in] entry thread entry function pointer + * @param[in] rock opaque pointer passed to thread + * + * @return operation status + * @retval 0 success + * @retval AFS_TP_ERROR thread pool has already been started + */ +int +afs_tp_set_entry(struct afs_thread_pool * pool, + afs_tp_worker_func_t * entry, + void * rock) +{ + int ret = 0; + + assert(pthread_mutex_lock(&pool->lock) == 0); + if (pool->state != AFS_TP_STATE_INIT) { + ret = AFS_TP_ERROR; + } else { + pool->entry = entry; + pool->rock = rock; + } + assert(pthread_mutex_unlock(&pool->lock) == 0); + + return ret; +} + +/** + * start a thread pool. + * + * @param[in] pool thread pool object + * + * @return operation status + * @retval 0 success + * @retval AFS_TP_ERROR thread create failure + */ +int +afs_tp_start(struct afs_thread_pool * pool) +{ + int code, ret = 0; + struct afs_thread_pool_worker * worker; + afs_uint32 i; + + assert(pthread_mutex_lock(&pool->lock) == 0); + if (pool->state != AFS_TP_STATE_INIT) { + ret = AFS_TP_ERROR; + goto done_sync; + } + pool->state = AFS_TP_STATE_STARTING; + assert(pthread_mutex_unlock(&pool->lock) == 0); + + for (i = 0; i < pool->max_threads; i++) { + code = _afs_tp_worker_start(pool, &worker); + if (code) { + ret = code; + } + } + + assert(pthread_mutex_lock(&pool->lock) == 0); + pool->state = AFS_TP_STATE_RUNNING; + done_sync: + assert(pthread_mutex_unlock(&pool->lock) == 0); + + return ret; +} + +/** + * shut down all threads in pool. + * + * @param[in] pool thread pool object + * @param[in] block wait for all threads to terminate, if asserted + * + * @return operation status + * @retval 0 success + */ +int +afs_tp_shutdown(struct afs_thread_pool * pool, + int block) +{ + int ret = 0; + struct afs_thread_pool_worker * worker, *nn; + + assert(pthread_mutex_lock(&pool->lock) == 0); + if (pool->state == AFS_TP_STATE_STOPPED + || pool->state == AFS_TP_STATE_STOPPING) { + goto done_stopped; + } + if (pool->state != AFS_TP_STATE_RUNNING) { + ret = AFS_TP_ERROR; + goto done_sync; + } + pool->state = AFS_TP_STATE_STOPPING; + + for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) { + worker->req_shutdown = 1; + } + if (!pool->nthreads) { + pool->state = AFS_TP_STATE_STOPPED; + } + /* need to drop lock to get a membar here */ + assert(pthread_mutex_unlock(&pool->lock) == 0); + + ret = afs_wq_shutdown(pool->work_queue); + if (ret) { + goto error; + } + + assert(pthread_mutex_lock(&pool->lock) == 0); + done_stopped: + if (block) { + while (pool->nthreads) { + assert(pthread_cond_wait(&pool->shutdown_cv, + &pool->lock) == 0); + } + } + done_sync: + assert(pthread_mutex_unlock(&pool->lock) == 0); + + error: + return ret; +} + +/** + * check whether thread pool is online. + * + * @param[in] pool thread pool object + * + * @return whether pool is online + * @retval 1 pool is online + * @retval 0 pool is not online + */ +int +afs_tp_is_online(struct afs_thread_pool * pool) +{ + int ret; + + assert(pthread_mutex_lock(&pool->lock) == 0); + ret = (pool->state == AFS_TP_STATE_RUNNING); + assert(pthread_mutex_unlock(&pool->lock) == 0); + + return ret; +} + +/** + * check whether a given worker thread can continue to run. + * + * @param[in] worker worker thread object pointer + * + * @return whether thread can continue to execute + * @retval 1 execution can continue + * @retval 0 shutdown has been requested + */ +int +afs_tp_worker_continue(struct afs_thread_pool_worker * worker) +{ + return !worker->req_shutdown; +} diff --git a/src/util/thread_pool.h b/src/util/thread_pool.h new file mode 100644 index 0000000000..dd25fd1d26 --- /dev/null +++ b/src/util/thread_pool.h @@ -0,0 +1,38 @@ +/* + * Copyright 2008-2010, Sine Nomine Associates and others. + * All Rights Reserved. + * + * This software has been released under the terms of the IBM Public + * License. For details, see the LICENSE file in the top-level source + * directory or online at http://www.openafs.org/dl/license10.html + */ + +#ifndef AFS_UTIL_THREAD_POOL_H +#define AFS_UTIL_THREAD_POOL_H 1 + +#include "thread_pool_types.h" + +/** + * public interfaces for thread_pool. + */ + +/* XXX move these into an et */ +#define AFS_TP_ERROR -1 /**< fatal error in thread_pool package */ + +extern int afs_tp_create(struct afs_thread_pool **, + struct afs_work_queue *); +extern int afs_tp_destroy(struct afs_thread_pool *); + +extern int afs_tp_set_threads(struct afs_thread_pool *, afs_uint32 threads); +extern int afs_tp_set_entry(struct afs_thread_pool *, + afs_tp_worker_func_t *, + void * rock); + +extern int afs_tp_start(struct afs_thread_pool *); +extern int afs_tp_shutdown(struct afs_thread_pool *, + int block); + +extern int afs_tp_is_online(struct afs_thread_pool *); +extern int afs_tp_worker_continue(struct afs_thread_pool_worker *); + +#endif /* AFS_UTIL_THREAD_POOL_H */ diff --git a/src/util/thread_pool_impl.h b/src/util/thread_pool_impl.h new file mode 100644 index 0000000000..dce8ee6dc4 --- /dev/null +++ b/src/util/thread_pool_impl.h @@ -0,0 +1,20 @@ +/* + * Copyright 2008-2010, Sine Nomine Associates and others. + * All Rights Reserved. + * + * This software has been released under the terms of the IBM Public + * License. For details, see the LICENSE file in the top-level source + * directory or online at http://www.openafs.org/dl/license10.html + */ + +#ifndef AFS_UTIL_THREAD_POOL_IMPL_H +#define AFS_UTIL_THREAD_POOL_IMPL_H 1 + +#include "thread_pool.h" +#include "thread_pool_impl_types.h" + +/** + * implementation-private interfaces for thread_pool. + */ + +#endif /* AFS_UTIL_THREAD_POOL_IMPL_H */ diff --git a/src/util/thread_pool_impl_types.h b/src/util/thread_pool_impl_types.h new file mode 100644 index 0000000000..2467c0ddb6 --- /dev/null +++ b/src/util/thread_pool_impl_types.h @@ -0,0 +1,64 @@ +/* + * Copyright 2008-2010, Sine Nomine Associates and others. + * All Rights Reserved. + * + * This software has been released under the terms of the IBM Public + * License. For details, see the LICENSE file in the top-level source + * directory or online at http://www.openafs.org/dl/license10.html + */ + +#ifndef AFS_UTIL_THREAD_POOL_IMPL_TYPES_H +#define AFS_UTIL_THREAD_POOL_IMPL_TYPES_H 1 + +#ifndef __AFS_THREAD_POOL_IMPL +#error "do not include this file outside of the thread pool implementation" +#endif + +#include "thread_pool_types.h" +#include + +/** + * + * implementation-private type definitions for thread_pool. + */ + +/** + * thread_pool worker state. + */ +typedef enum { + AFS_TP_STATE_INIT, /**< initial state */ + AFS_TP_STATE_STARTING, /**< pool is starting up */ + AFS_TP_STATE_RUNNING, /**< pool is running normally */ + AFS_TP_STATE_STOPPING, /**< stop requested */ + AFS_TP_STATE_STOPPED, /**< pool is shut down */ + /* add new states above this line */ + AFS_TP_STATE_TERMINAL +} afs_tp_state_t; + +/** + * thread_pool worker. + */ +struct afs_thread_pool_worker { + struct rx_queue worker_list; /**< linked list of thread workers. */ + struct afs_thread_pool * pool; /**< associated thread pool */ + void * ret; /**< return value from worker thread entry point */ + pthread_t tid; /**< thread id */ + int req_shutdown; /**< request shutdown of this thread */ +}; + +/** + * thread pool. + */ +struct afs_thread_pool { + struct rx_queue thread_list; /**< linked list of threads */ + struct afs_work_queue * work_queue; /**< work queue serviced by this thread pool. */ + afs_tp_worker_func_t * entry; /**< worker thread entry point */ + void * rock; /**< opaque pointer passed to worker thread entry point */ + afs_uint32 nthreads; /**< current pool size */ + afs_tp_state_t state; /**< pool state */ + afs_uint32 max_threads; /**< pool options */ + pthread_mutex_t lock; /**< pool global state lock */ + pthread_cond_t shutdown_cv; /**< thread shutdown cv */ +}; + +#endif /* AFS_UTIL_THREAD_POOL_IMPL_TYPES_H */ diff --git a/src/util/thread_pool_types.h b/src/util/thread_pool_types.h new file mode 100644 index 0000000000..4767624824 --- /dev/null +++ b/src/util/thread_pool_types.h @@ -0,0 +1,37 @@ +/* + * Copyright 2008-2010, Sine Nomine Associates and others. + * All Rights Reserved. + * + * This software has been released under the terms of the IBM Public + * License. For details, see the LICENSE file in the top-level source + * directory or online at http://www.openafs.org/dl/license10.html + */ + +#ifndef AFS_UTIL_THREAD_POOL_TYPES_H +#define AFS_UTIL_THREAD_POOL_TYPES_H 1 + +/** + * public type definitions for thread_pool. + */ + +/* forward declare opaque types */ +struct afs_thread_pool_worker; +struct afs_thread_pool; +struct afs_work_queue; + +/** + * thread_pool worker thread entry function. + * + * @param[in] pool thread pool object pointer + * @param[in] worker worker thread object pointer + * @param[in] queue work queue object pointer + * @param[in] rock opaque pointer + * + * @return opaque pointer + */ +typedef void * afs_tp_worker_func_t(struct afs_thread_pool * pool, + struct afs_thread_pool_worker * worker, + struct afs_work_queue * queue, + void * rock); + +#endif /* AFS_UTIL_THREAD_POOL_TYPES_H */