Provide an abstract thread pool object

Add some routines to maintain a pool of threads, for working through a
Vwork_queue.

This adds the afs_tp* series of functions. Originally written by
Tom Keiser.

Change-Id: I8735aa14ca6622ae0eca7a7589e69b0f3b3daf08
Reviewed-on: http://gerrit.openafs.org/1863
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Derrick Brashear <shadow@dementia.org>
This commit is contained in:
Andrew Deason 2010-03-11 10:43:54 -06:00 committed by Derrick Brashear
parent bfbc65676d
commit 4ca57f3fd3
6 changed files with 655 additions and 0 deletions

View File

@ -40,6 +40,8 @@ includes = \
${TOP_INCDIR}/afs/softsig.h \
${TOP_INCDIR}/afs/work_queue.h \
${TOP_INCDIR}/afs/work_queue_types.h \
${TOP_INCDIR}/afs/thread_pool.h \
${TOP_INCDIR}/afs/thread_pool_types.h \
${TOP_INCDIR}/potpourri.h
all: ${includes} \
@ -106,6 +108,12 @@ ${TOP_INCDIR}/afs/work_queue.h: ${srcdir}/work_queue.h
${TOP_INCDIR}/afs/work_queue_types.h: ${srcdir}/work_queue_types.h
${INSTALL_DATA} $? $@
${TOP_INCDIR}/afs/thread_pool.h: ${srcdir}/thread_pool.h
${INSTALL_DATA} $? $@
${TOP_INCDIR}/afs/thread_pool_types.h: ${srcdir}/thread_pool_types.h
${INSTALL_DATA} $? $@
${TOP_INCDIR}/potpourri.h: ${srcdir}/potpourri.h
${INSTALL_DATA} $? $@
@ -255,6 +263,8 @@ install: dirpath.h util.a sys
${INSTALL_DATA} ${srcdir}/softsig.h ${DESTDIR}${includedir}/afs/softsig.h
${INSTALL_DATA} ${srcdir}/work_queue.h ${DESTDIR}${includedir}/afs/work_queue.h
${INSTALL_DATA} ${srcdir}/work_queue_types.h ${DESTDIR}${includedir}/afs/work_queue_types.h
${INSTALL_DATA} ${srcdir}/thread_pool.h ${DESTDIR}${includedir}/afs/thread_pool.h
${INSTALL_DATA} ${srcdir}/thread_pool_types.h ${DESTDIR}${includedir}/afs/thread_pool_types.h
${INSTALL_DATA} ${srcdir}/potpourri.h ${DESTDIR}${includedir}/potpourri.h
${INSTALL_DATA} util.a ${DESTDIR}${libdir}/afs/util.a
${INSTALL_DATA} util.a ${DESTDIR}${libdir}/afs/libafsutil.a
@ -282,6 +292,8 @@ dest: dirpath.h util.a sys
${INSTALL_DATA} ${srcdir}/softsig.h ${DEST}/include/afs/softsig.h
${INSTALL_DATA} ${srcdir}/work_queue.h ${DEST}/include/afs/work_queue.h
${INSTALL_DATA} ${srcdir}/work_queue_types.h ${DEST}/include/afs/work_queue_types.h
${INSTALL_DATA} ${srcdir}/thread_pool.h ${DEST}/include/afs/thread_pool.h
${INSTALL_DATA} ${srcdir}/thread_pool_types.h ${DEST}/include/afs/thread_pool_types.h
${INSTALL_DATA} ${srcdir}/potpourri.h ${DEST}/include/potpourri.h
${INSTALL_DATA} util.a ${DEST}/lib/afs/util.a
${INSTALL_DATA} util.a ${DEST}/lib/afs/libafsutil.a

484
src/util/thread_pool.c Normal file
View File

@ -0,0 +1,484 @@
/*
* Copyright 2008-2010, Sine Nomine Associates and others.
* All Rights Reserved.
*
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
*/
#include <afsconfig.h>
#include <afs/param.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <dirent.h>
#include <afs/assert.h>
#include <string.h>
#include <sys/file.h>
#include <sys/param.h>
#include <lock.h>
#if defined(AFS_SUN5_ENV) || defined(AFS_HPUX_ENV)
#include <unistd.h>
#endif
#include <afs/afsutil.h>
#include <lwp.h>
#include <afs/afsint.h>
#define __AFS_THREAD_POOL_IMPL 1
#include "work_queue.h"
#include "thread_pool.h"
#include "thread_pool_impl.h"
/**
* public interfaces for thread_pool.
*/
/**
* allocate a thread pool object.
*
* @param[inout] pool_out address in which to store pool object pointer
*
* @return operation status
* @retval 0 success
* @retval ENOMEM out of memory
*
* @internal
*/
static int
_afs_tp_alloc(struct afs_thread_pool ** pool_out)
{
int ret = 0;
struct afs_thread_pool * pool;
*pool_out = pool = malloc(sizeof(*pool));
if (pool == NULL) {
ret = ENOMEM;
goto error;
}
error:
return ret;
}
/**
* free a thread pool object.
*
* @param[in] pool thread pool object
*
* @return operation status
* @retval 0 success
*
* @internal
*/
static int
_afs_tp_free(struct afs_thread_pool * pool)
{
int ret = 0;
free(pool);
return ret;
}
/**
* allocate a thread worker object.
*
* @param[inout] worker_out address in which to store worker object pointer
*
* @return operation status
* @retval 0 success
* @retval ENOMEM out of memory
*
* @internal
*/
static int
_afs_tp_worker_alloc(struct afs_thread_pool_worker ** worker_out)
{
int ret = 0;
struct afs_thread_pool_worker * worker;
*worker_out = worker = malloc(sizeof(*worker));
if (worker == NULL) {
ret = ENOMEM;
goto error;
}
queue_NodeInit(&worker->worker_list);
error:
return ret;
}
/**
* free a thread worker object.
*
* @param[in] worker thread worker object
*
* @return operation status
* @retval 0 success
*
* @internal
*/
static int
_afs_tp_worker_free(struct afs_thread_pool_worker * worker)
{
int ret = 0;
free(worker);
return ret;
}
/**
* low-level thread entry point.
*
* @param[in] rock opaque pointer to thread worker object
*
* @return opaque return pointer from pool entry function
*
* @internal
*/
static void *
_afs_tp_worker_run(void * rock)
{
struct afs_thread_pool_worker * worker = rock;
struct afs_thread_pool * pool = worker->pool;
/* register worker with pool */
assert(pthread_mutex_lock(&pool->lock) == 0);
queue_Append(&pool->thread_list, worker);
pool->nthreads++;
assert(pthread_mutex_unlock(&pool->lock) == 0);
/* call high-level entry point */
worker->ret = (*pool->entry)(pool, worker, pool->work_queue, pool->rock);
/* adjust pool live thread count */
assert(pthread_mutex_lock(&pool->lock) == 0);
assert(pool->nthreads);
queue_Remove(worker);
pool->nthreads--;
if (!pool->nthreads) {
assert(pthread_cond_broadcast(&pool->shutdown_cv) == 0);
pool->state = AFS_TP_STATE_STOPPED;
}
assert(pthread_mutex_unlock(&pool->lock) == 0);
_afs_tp_worker_free(worker);
return NULL;
}
/**
* default high-level thread entry point.
*
* @internal
*/
static void *
_afs_tp_worker_default(struct afs_thread_pool *pool,
struct afs_thread_pool_worker *worker,
struct afs_work_queue *queue,
void *rock)
{
int code = 0;
while (code == 0 && afs_tp_worker_continue(worker)) {
code = afs_wq_do(queue, NULL /* no call rock */);
}
return NULL;
}
/**
* start a worker thread.
*
* @param[in] pool thread pool object
* @param[inout] worker_out address in which to store worker thread object pointer
*
* @return operation status
* @retval 0 success
* @retval ENOMEM out of memory
*/
static int
_afs_tp_worker_start(struct afs_thread_pool * pool,
struct afs_thread_pool_worker ** worker_out)
{
int ret = 0;
pthread_attr_t attrs;
struct afs_thread_pool_worker * worker;
ret = _afs_tp_worker_alloc(worker_out);
if (ret) {
goto error;
}
worker = *worker_out;
worker->pool = pool;
worker->req_shutdown = 0;
assert(pthread_attr_init(&attrs) == 0);
assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
ret = pthread_create(&worker->tid, &attrs, &_afs_tp_worker_run, worker);
error:
return ret;
}
/**
* create a thread pool.
*
* @param[inout] pool_out address in which to store pool object pointer.
* @param[in] queue work queue serviced by thread pool
*
* @return operation status
* @retval 0 success
* @retval ENOMEM out of memory
*/
int
afs_tp_create(struct afs_thread_pool ** pool_out,
struct afs_work_queue * queue)
{
int ret = 0;
struct afs_thread_pool * pool;
ret = _afs_tp_alloc(pool_out);
if (ret) {
goto error;
}
pool = *pool_out;
assert(pthread_mutex_init(&pool->lock, NULL) == 0);
assert(pthread_cond_init(&pool->shutdown_cv, NULL) == 0);
queue_Init(&pool->thread_list);
pool->work_queue = queue;
pool->entry = &_afs_tp_worker_default;
pool->rock = NULL;
pool->nthreads = 0;
pool->max_threads = 4;
pool->state = AFS_TP_STATE_INIT;
error:
return ret;
}
/**
* destroy a thread pool.
*
* @param[in] pool thread pool object to be destroyed
*
* @return operation status
* @retval 0 success
* @retval AFS_TP_ERROR pool not in a quiescent state
*/
int
afs_tp_destroy(struct afs_thread_pool * pool)
{
int ret = 0;
assert(pthread_mutex_lock(&pool->lock) == 0);
switch (pool->state) {
case AFS_TP_STATE_INIT:
case AFS_TP_STATE_STOPPED:
_afs_tp_free(pool);
break;
default:
ret = AFS_TP_ERROR;
assert(pthread_mutex_unlock(&pool->lock) == 0);
}
return ret;
}
/**
* set the number of threads to spawn.
*
* @param[in] pool thread pool object
* @param[in] threads number of threads to spawn
*
* @return operation status
* @retval 0 success
* @retval AFS_TP_ERROR thread pool has already been started
*/
int
afs_tp_set_threads(struct afs_thread_pool *pool,
afs_uint32 threads)
{
int ret = 0;
assert(pthread_mutex_lock(&pool->lock) == 0);
if (pool->state != AFS_TP_STATE_INIT) {
ret = AFS_TP_ERROR;
} else {
pool->max_threads = threads;
}
assert(pthread_mutex_unlock(&pool->lock) == 0);
return ret;
}
/**
* set a custom thread entry point.
*
* @param[in] pool thread pool object
* @param[in] entry thread entry function pointer
* @param[in] rock opaque pointer passed to thread
*
* @return operation status
* @retval 0 success
* @retval AFS_TP_ERROR thread pool has already been started
*/
int
afs_tp_set_entry(struct afs_thread_pool * pool,
afs_tp_worker_func_t * entry,
void * rock)
{
int ret = 0;
assert(pthread_mutex_lock(&pool->lock) == 0);
if (pool->state != AFS_TP_STATE_INIT) {
ret = AFS_TP_ERROR;
} else {
pool->entry = entry;
pool->rock = rock;
}
assert(pthread_mutex_unlock(&pool->lock) == 0);
return ret;
}
/**
* start a thread pool.
*
* @param[in] pool thread pool object
*
* @return operation status
* @retval 0 success
* @retval AFS_TP_ERROR thread create failure
*/
int
afs_tp_start(struct afs_thread_pool * pool)
{
int code, ret = 0;
struct afs_thread_pool_worker * worker;
afs_uint32 i;
assert(pthread_mutex_lock(&pool->lock) == 0);
if (pool->state != AFS_TP_STATE_INIT) {
ret = AFS_TP_ERROR;
goto done_sync;
}
pool->state = AFS_TP_STATE_STARTING;
assert(pthread_mutex_unlock(&pool->lock) == 0);
for (i = 0; i < pool->max_threads; i++) {
code = _afs_tp_worker_start(pool, &worker);
if (code) {
ret = code;
}
}
assert(pthread_mutex_lock(&pool->lock) == 0);
pool->state = AFS_TP_STATE_RUNNING;
done_sync:
assert(pthread_mutex_unlock(&pool->lock) == 0);
return ret;
}
/**
* shut down all threads in pool.
*
* @param[in] pool thread pool object
* @param[in] block wait for all threads to terminate, if asserted
*
* @return operation status
* @retval 0 success
*/
int
afs_tp_shutdown(struct afs_thread_pool * pool,
int block)
{
int ret = 0;
struct afs_thread_pool_worker * worker, *nn;
assert(pthread_mutex_lock(&pool->lock) == 0);
if (pool->state == AFS_TP_STATE_STOPPED
|| pool->state == AFS_TP_STATE_STOPPING) {
goto done_stopped;
}
if (pool->state != AFS_TP_STATE_RUNNING) {
ret = AFS_TP_ERROR;
goto done_sync;
}
pool->state = AFS_TP_STATE_STOPPING;
for (queue_Scan(&pool->thread_list, worker, nn, afs_thread_pool_worker)) {
worker->req_shutdown = 1;
}
if (!pool->nthreads) {
pool->state = AFS_TP_STATE_STOPPED;
}
/* need to drop lock to get a membar here */
assert(pthread_mutex_unlock(&pool->lock) == 0);
ret = afs_wq_shutdown(pool->work_queue);
if (ret) {
goto error;
}
assert(pthread_mutex_lock(&pool->lock) == 0);
done_stopped:
if (block) {
while (pool->nthreads) {
assert(pthread_cond_wait(&pool->shutdown_cv,
&pool->lock) == 0);
}
}
done_sync:
assert(pthread_mutex_unlock(&pool->lock) == 0);
error:
return ret;
}
/**
* check whether thread pool is online.
*
* @param[in] pool thread pool object
*
* @return whether pool is online
* @retval 1 pool is online
* @retval 0 pool is not online
*/
int
afs_tp_is_online(struct afs_thread_pool * pool)
{
int ret;
assert(pthread_mutex_lock(&pool->lock) == 0);
ret = (pool->state == AFS_TP_STATE_RUNNING);
assert(pthread_mutex_unlock(&pool->lock) == 0);
return ret;
}
/**
* check whether a given worker thread can continue to run.
*
* @param[in] worker worker thread object pointer
*
* @return whether thread can continue to execute
* @retval 1 execution can continue
* @retval 0 shutdown has been requested
*/
int
afs_tp_worker_continue(struct afs_thread_pool_worker * worker)
{
return !worker->req_shutdown;
}

38
src/util/thread_pool.h Normal file
View File

@ -0,0 +1,38 @@
/*
* Copyright 2008-2010, Sine Nomine Associates and others.
* All Rights Reserved.
*
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
*/
#ifndef AFS_UTIL_THREAD_POOL_H
#define AFS_UTIL_THREAD_POOL_H 1
#include "thread_pool_types.h"
/**
* public interfaces for thread_pool.
*/
/* XXX move these into an et */
#define AFS_TP_ERROR -1 /**< fatal error in thread_pool package */
extern int afs_tp_create(struct afs_thread_pool **,
struct afs_work_queue *);
extern int afs_tp_destroy(struct afs_thread_pool *);
extern int afs_tp_set_threads(struct afs_thread_pool *, afs_uint32 threads);
extern int afs_tp_set_entry(struct afs_thread_pool *,
afs_tp_worker_func_t *,
void * rock);
extern int afs_tp_start(struct afs_thread_pool *);
extern int afs_tp_shutdown(struct afs_thread_pool *,
int block);
extern int afs_tp_is_online(struct afs_thread_pool *);
extern int afs_tp_worker_continue(struct afs_thread_pool_worker *);
#endif /* AFS_UTIL_THREAD_POOL_H */

View File

@ -0,0 +1,20 @@
/*
* Copyright 2008-2010, Sine Nomine Associates and others.
* All Rights Reserved.
*
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
*/
#ifndef AFS_UTIL_THREAD_POOL_IMPL_H
#define AFS_UTIL_THREAD_POOL_IMPL_H 1
#include "thread_pool.h"
#include "thread_pool_impl_types.h"
/**
* implementation-private interfaces for thread_pool.
*/
#endif /* AFS_UTIL_THREAD_POOL_IMPL_H */

View File

@ -0,0 +1,64 @@
/*
* Copyright 2008-2010, Sine Nomine Associates and others.
* All Rights Reserved.
*
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
*/
#ifndef AFS_UTIL_THREAD_POOL_IMPL_TYPES_H
#define AFS_UTIL_THREAD_POOL_IMPL_TYPES_H 1
#ifndef __AFS_THREAD_POOL_IMPL
#error "do not include this file outside of the thread pool implementation"
#endif
#include "thread_pool_types.h"
#include <rx/rx_queue.h>
/**
*
* implementation-private type definitions for thread_pool.
*/
/**
* thread_pool worker state.
*/
typedef enum {
AFS_TP_STATE_INIT, /**< initial state */
AFS_TP_STATE_STARTING, /**< pool is starting up */
AFS_TP_STATE_RUNNING, /**< pool is running normally */
AFS_TP_STATE_STOPPING, /**< stop requested */
AFS_TP_STATE_STOPPED, /**< pool is shut down */
/* add new states above this line */
AFS_TP_STATE_TERMINAL
} afs_tp_state_t;
/**
* thread_pool worker.
*/
struct afs_thread_pool_worker {
struct rx_queue worker_list; /**< linked list of thread workers. */
struct afs_thread_pool * pool; /**< associated thread pool */
void * ret; /**< return value from worker thread entry point */
pthread_t tid; /**< thread id */
int req_shutdown; /**< request shutdown of this thread */
};
/**
* thread pool.
*/
struct afs_thread_pool {
struct rx_queue thread_list; /**< linked list of threads */
struct afs_work_queue * work_queue; /**< work queue serviced by this thread pool. */
afs_tp_worker_func_t * entry; /**< worker thread entry point */
void * rock; /**< opaque pointer passed to worker thread entry point */
afs_uint32 nthreads; /**< current pool size */
afs_tp_state_t state; /**< pool state */
afs_uint32 max_threads; /**< pool options */
pthread_mutex_t lock; /**< pool global state lock */
pthread_cond_t shutdown_cv; /**< thread shutdown cv */
};
#endif /* AFS_UTIL_THREAD_POOL_IMPL_TYPES_H */

View File

@ -0,0 +1,37 @@
/*
* Copyright 2008-2010, Sine Nomine Associates and others.
* All Rights Reserved.
*
* This software has been released under the terms of the IBM Public
* License. For details, see the LICENSE file in the top-level source
* directory or online at http://www.openafs.org/dl/license10.html
*/
#ifndef AFS_UTIL_THREAD_POOL_TYPES_H
#define AFS_UTIL_THREAD_POOL_TYPES_H 1
/**
* public type definitions for thread_pool.
*/
/* forward declare opaque types */
struct afs_thread_pool_worker;
struct afs_thread_pool;
struct afs_work_queue;
/**
* thread_pool worker thread entry function.
*
* @param[in] pool thread pool object pointer
* @param[in] worker worker thread object pointer
* @param[in] queue work queue object pointer
* @param[in] rock opaque pointer
*
* @return opaque pointer
*/
typedef void * afs_tp_worker_func_t(struct afs_thread_pool * pool,
struct afs_thread_pool_worker * worker,
struct afs_work_queue * queue,
void * rock);
#endif /* AFS_UTIL_THREAD_POOL_TYPES_H */