DAFS: avoid volume lock contention during initialization

Avoid the excessive volume lock contention during startup to
improve the time to pre-attach a very large number of volumes.
The parallel attach worker threads avoid the volume lock
while scanning the partitions for volumes and send batches of
volume ids to the main thread to be preattached under the
volume lock.

FIXES 124489

Change-Id: Ieb33a3bdd5b06349abd9c3dd994c620021cd6194
Reviewed-on: http://gerrit.openafs.org/1092
Reviewed-by: Derrick Brashear <shadow@dementia.org>
Tested-by: Derrick Brashear <shadow@dementia.org>
This commit is contained in:
Michael Meffie 2010-01-11 21:16:06 -05:00 committed by Derrick Brashear
parent 523d39f020
commit 6b549bdba6

View File

@ -284,20 +284,77 @@ ffs(x)
#endif /* !AFS_HAVE_FFS */
#ifdef AFS_PTHREAD_ENV
/**
* disk partition queue element
*/
typedef struct diskpartition_queue_t {
struct rx_queue queue;
struct DiskPartition64 * diskP;
struct rx_queue queue; /**< queue header */
struct DiskPartition64 *diskP; /**< disk partition table entry */
} diskpartition_queue_t;
#ifndef AFS_DEMAND_ATTACH_FS
typedef struct vinitvolumepackage_thread_t {
struct rx_queue queue;
pthread_cond_t thread_done_cv;
int n_threads_complete;
} vinitvolumepackage_thread_t;
static void * VInitVolumePackageThread(void * args);
#else /* !AFS_DEMAND_ATTTACH_FS */
#define VINIT_BATCH_MAX_SIZE 512
/**
* disk partition work queue
*/
struct partition_queue {
struct rx_queue head; /**< diskpartition_queue_t queue */
pthread_mutex_t mutex;
pthread_cond_t cv;
};
/**
* volumes parameters for preattach
*/
struct volume_init_batch {
struct rx_queue queue; /**< queue header */
int thread; /**< posting worker thread */
int last; /**< indicates thread is done */
int size; /**< number of volume ids in batch */
Volume *batch[VINIT_BATCH_MAX_SIZE]; /**< volumes ids to preattach */
};
/**
* volume parameters work queue
*/
struct volume_init_queue {
struct rx_queue head; /**< volume_init_batch queue */
pthread_mutex_t mutex;
pthread_cond_t cv;
};
/**
* volume init worker thread parameters
*/
struct vinitvolumepackage_thread_param {
int nthreads; /**< total number of worker threads */
int thread; /**< thread number for this worker thread */
struct partition_queue *pq; /**< queue partitions to scan */
struct volume_init_queue *vq; /**< queue of volume to preattach */
};
static void *VInitVolumePackageThread(void *args);
static struct DiskPartition64 *VInitNextPartition(struct partition_queue *pq);
static VolId VInitNextVolumeId(DIR *dirp);
static int VInitPreAttachVolumes(int nthreads, struct volume_init_queue *vq);
#endif /* !AFS_DEMAND_ATTACH_FS */
#endif /* AFS_PTHREAD_ENV */
#ifndef AFS_DEMAND_ATTACH_FS
static int VAttachVolumesByPartition(struct DiskPartition64 *diskP,
int * nAttached, int * nUnattached);
#endif /* AFS_DEMAND_ATTACH_FS */
#ifdef AFS_DEMAND_ATTACH_FS
@ -572,13 +629,55 @@ VInitVolumePackage2(ProgramType pt, VolumePackageOptions * opts)
return 0;
}
#if !defined(AFS_PTHREAD_ENV)
/**
* Attach volumes in vice partitions
*
* @param[in] pt calling program type
*
* @return 0
* @note This is the original, non-threaded version of attach parititions.
*
* @post VInit state is 2
*/
int
VInitAttachVolumes(ProgramType pt)
{
assert(VInit==1);
if (pt == fileServer) {
struct DiskPartition64 *diskP;
/* Attach all the volumes in this partition */
for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
int nAttached = 0, nUnattached = 0;
assert(VAttachVolumesByPartition(diskP, &nAttached, &nUnattached) == 0);
}
}
VOL_LOCK;
VInit = 2; /* Initialized, and all volumes have been attached */
LWP_NoYieldSignal(VInitAttachVolumes);
VOL_UNLOCK;
return 0;
}
#endif /* !AFS_PTHREAD_ENV */
#if defined(AFS_PTHREAD_ENV) && !defined(AFS_DEMAND_ATTACH_FS)
/**
* Attach volumes in vice partitions
*
* @param[in] pt calling program type
*
* @return 0
* @note Threaded version of attach parititions.
*
* @post VInit state is 2
*/
int
VInitAttachVolumes(ProgramType pt)
{
assert(VInit==1);
if (pt == fileServer) {
struct DiskPartition64 *diskP;
#ifdef AFS_PTHREAD_ENV
struct vinitvolumepackage_thread_t params;
struct diskpartition_queue_t * dpq;
int i, threads, parts;
@ -605,13 +704,8 @@ VInitAttachVolumes(ProgramType pt)
assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
Log("VInitVolumePackage: beginning parallel fileserver startup\n");
#ifdef AFS_DEMAND_ATTACH_FS
Log("VInitVolumePackage: using %d threads to pre-attach volumes on %d partitions\n",
threads, parts);
#else /* AFS_DEMAND_ATTACH_FS */
Log("VInitVolumePackage: using %d threads to attach volumes on %d partitions\n",
threads, parts);
#endif /* AFS_DEMAND_ATTACH_FS */
VOL_LOCK;
for (i=0; i < threads; i++) {
@ -633,40 +727,21 @@ VInitAttachVolumes(ProgramType pt)
/* if we're only going to run one init thread, don't bother creating
* another LWP */
Log("VInitVolumePackage: beginning single-threaded fileserver startup\n");
#ifdef AFS_DEMAND_ATTACH_FS
Log("VInitVolumePackage: using 1 thread to pre-attach volumes on %d partition(s)\n",
parts);
#else /* AFS_DEMAND_ATTACH_FS */
Log("VInitVolumePackage: using 1 thread to attach volumes on %d partition(s)\n",
parts);
#endif /* AFS_DEMAND_ATTACH_FS */
VInitVolumePackageThread(&params);
}
assert(pthread_cond_destroy(&params.thread_done_cv) == 0);
#else /* AFS_PTHREAD_ENV */
/* Attach all the volumes in this partition */
for (diskP = DiskPartitionList; diskP; diskP = diskP->next) {
int nAttached = 0, nUnattached = 0;
assert(VAttachVolumesByPartition(diskP, &nAttached, &nUnattached) == 0);
}
#endif /* AFS_PTHREAD_ENV */
}
VOL_LOCK;
VInit = 2; /* Initialized, and all volumes have been attached */
#ifdef AFS_PTHREAD_ENV
assert(pthread_cond_broadcast(&vol_init_attach_cond) == 0);
#else
LWP_NoYieldSignal(VInitAttachVolumes);
#endif /* AFS_PTHREAD_ENV */
VOL_UNLOCK;
return 0;
}
#ifdef AFS_PTHREAD_ENV
static void *
VInitVolumePackageThread(void * args) {
@ -704,8 +779,285 @@ done:
VOL_UNLOCK;
return NULL;
}
#endif /* AFS_PTHREAD_ENV */
#endif /* AFS_PTHREAD_ENV && !AFS_DEMAND_ATTACH_FS */
#if defined(AFS_DEMAND_ATTACH_FS)
/**
* Attach volumes in vice partitions
*
* @param[in] pt calling program type
*
* @return 0
* @note Threaded version of attach partitions.
*
* @post VInit state is 2
*/
int
VInitAttachVolumes(ProgramType pt)
{
assert(VInit==1);
if (pt == fileServer) {
struct DiskPartition64 *diskP;
struct partition_queue pq;
struct volume_init_queue vq;
int i, threads, parts;
pthread_t tid;
pthread_attr_t attrs;
/* create partition work queue */
queue_Init(&pq);
assert(pthread_cond_init(&(pq.cv), NULL) == 0);
assert(pthread_mutex_init(&(pq.mutex), NULL) == 0);
for (parts = 0, diskP = DiskPartitionList; diskP; diskP = diskP->next, parts++) {
struct diskpartition_queue_t *dp;
dp = (struct diskpartition_queue_t*)malloc(sizeof(struct diskpartition_queue_t));
assert(dp != NULL);
dp->diskP = diskP;
queue_Append(&pq, dp);
}
/* number of worker threads; at least one, not to exceed the number of partitions */
threads = MIN(parts, vol_attach_threads);
/* create volume work queue */
queue_Init(&vq);
assert(pthread_cond_init(&(vq.cv), NULL) == 0);
assert(pthread_mutex_init(&(vq.mutex), NULL) == 0);
assert(pthread_attr_init(&attrs) == 0);
assert(pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED) == 0);
Log("VInitVolumePackage: beginning parallel fileserver startup\n");
Log("VInitVolumePackage: using %d threads to pre-attach volumes on %d partitions\n",
threads, parts);
/* create threads to scan disk partitions. */
for (i=0; i < threads; i++) {
struct vinitvolumepackage_thread_param *params;
params = (struct vinitvolumepackage_thread_param *)malloc(sizeof(struct vinitvolumepackage_thread_param));
assert(params);
params->pq = &pq;
params->vq = &vq;
params->nthreads = threads;
params->thread = i+1;
AFS_SIGSET_DECL;
AFS_SIGSET_CLEAR();
assert(pthread_create (&tid, &attrs, &VInitVolumePackageThread, (void*)params) == 0);
AFS_SIGSET_RESTORE();
}
VInitPreAttachVolumes(threads, &vq);
assert(pthread_attr_destroy(&attrs) == 0);
assert(pthread_cond_destroy(&pq.cv) == 0);
assert(pthread_mutex_destroy(&pq.mutex) == 0);
assert(pthread_cond_destroy(&vq.cv) == 0);
assert(pthread_mutex_destroy(&vq.mutex) == 0);
}
VOL_LOCK;
VInit = 2; /* Initialized, and all volumes have been attached */
assert(pthread_cond_broadcast(&vol_init_attach_cond) == 0);
VOL_UNLOCK;
return 0;
}
/**
* Volume package initialization worker thread. Scan partitions for volume
* header files. Gather batches of volume ids and dispatch them to
* the main thread to be preattached. The volume preattachement is done
* in the main thread to avoid global volume lock contention.
*/
static void *
VInitVolumePackageThread(void *args)
{
struct vinitvolumepackage_thread_param *params;
struct DiskPartition64 *partition;
struct partition_queue *pq;
struct volume_init_queue *vq;
struct volume_init_batch *vb;
assert(args);
params = (struct vinitvolumepackage_thread_param *)args;
pq = params->pq;
vq = params->vq;
assert(pq);
assert(vq);
vb = (struct volume_init_batch*)malloc(sizeof(struct volume_init_batch));
assert(vb);
vb->thread = params->thread;
vb->last = 0;
vb->size = 0;
Log("Scanning partitions on thread %d of %d\n", params->thread, params->nthreads);
while((partition = VInitNextPartition(pq))) {
DIR *dirp;
VolId vid;
Log("Partition %s: pre-attaching volumes\n", partition->name);
dirp = opendir(VPartitionPath(partition));
if (!dirp) {
Log("opendir on Partition %s failed, errno=%d!\n", partition->name, errno);
continue;
}
while ((vid = VInitNextVolumeId(dirp))) {
Volume *vp = (Volume*)malloc(sizeof(Volume));
assert(vp);
memset(vp, 0, sizeof(Volume));
vp->device = partition->device;
vp->partition = partition;
vp->hashid = vid;
queue_Init(&vp->vnode_list);
assert(pthread_cond_init(&V_attachCV(vp), NULL) == 0);
vb->batch[vb->size++] = vp;
if (vb->size == VINIT_BATCH_MAX_SIZE) {
assert(pthread_mutex_lock(&vq->mutex) == 0);
queue_Append(vq, vb);
assert(pthread_cond_broadcast(&vq->cv) == 0);
assert(pthread_mutex_unlock(&vq->mutex) == 0);
vb = (struct volume_init_batch*)malloc(sizeof(struct volume_init_batch));
assert(vb);
vb->thread = params->thread;
vb->size = 0;
vb->last = 0;
}
}
closedir(dirp);
}
vb->last = 1;
assert(pthread_mutex_lock(&vq->mutex) == 0);
queue_Append(vq, vb);
assert(pthread_cond_broadcast(&vq->cv) == 0);
assert(pthread_mutex_unlock(&vq->mutex) == 0);
Log("Partition scan thread %d of %d ended\n", params->thread, params->nthreads);
free(params);
return NULL;
}
/**
* Read next element from the pre-populated partition list.
*/
static struct DiskPartition64*
VInitNextPartition(struct partition_queue *pq)
{
struct DiskPartition64 *partition;
struct diskpartition_queue_t *dp; /* queue element */
if (vinit_attach_abort) {
Log("Aborting volume preattach thread.\n");
return NULL;
}
/* get next partition to scan */
assert(pthread_mutex_lock(&pq->mutex) == 0);
if (queue_IsEmpty(pq)) {
assert(pthread_mutex_unlock(&pq->mutex) == 0);
return NULL;
}
dp = queue_First(pq, diskpartition_queue_t);
queue_Remove(dp);
assert(pthread_mutex_unlock(&pq->mutex) == 0);
assert(dp);
assert(dp->diskP);
partition = dp->diskP;
free(dp);
return partition;
}
/**
* Find next volume id on the partition.
*/
static VolId
VInitNextVolumeId(DIR *dirp)
{
struct dirent *d;
VolId vid = 0;
char *ext;
while((d = readdir(dirp))) {
if (vinit_attach_abort) {
Log("Aborting volume preattach thread.\n");
break;
}
ext = strrchr(d->d_name, '.');
if (d->d_name[0] == 'V' && ext && strcmp(ext, VHDREXT) == 0) {
vid = VolumeNumber(d->d_name);
if (vid) {
break;
}
Log("Warning: bogus volume header file: %s\n", d->d_name);
}
}
return vid;
}
/**
* Preattach volumes in batches to avoid lock contention.
*/
static int
VInitPreAttachVolumes(int nthreads, struct volume_init_queue *vq)
{
struct volume_init_batch *vb;
int i;
while (nthreads) {
/* dequeue next volume */
pthread_mutex_lock(&vq->mutex);
if (queue_IsEmpty(vq)) {
pthread_cond_wait(&vq->cv, &vq->mutex);
}
vb = queue_First(vq, volume_init_batch);
queue_Remove(vb);
pthread_mutex_unlock(&vq->mutex);
if (vb->size) {
VOL_LOCK;
for (i = 0; i<vb->size; i++) {
Volume *vp;
Volume *dup;
Error ec = 0;
vp = vb->batch[i];
dup = VLookupVolume_r(&ec, vp->hashid, NULL);
if (ec) {
Log("Error looking up volume, code=%d\n", ec);
}
else if (dup) {
Log("Warning: Duplicate volume id %d detected.\n", vp->hashid);
}
else {
/* put pre-attached volume onto the hash table
* and bring it up to the pre-attached state */
AddVolumeToHashTable(vp, vp->hashid);
AddVolumeToVByPList_r(vp);
VLRU_Init_Node_r(vp);
VChangeState_r(vp, VOL_STATE_PREATTACHED);
}
}
VOL_UNLOCK;
}
if (vb->last) {
nthreads--;
}
free(vb);
}
return 0;
}
#endif /* AFS_DEMAND_ATTACH_FS */
#if !defined(AFS_DEMAND_ATTACH_FS)
/*
* attach all volumes on a given disk partition
*/
@ -735,12 +1087,8 @@ VAttachVolumesByPartition(struct DiskPartition64 *diskP, int * nAttached, int *
if (p != NULL && strcmp(p, VHDREXT) == 0) {
Error error;
Volume *vp;
#ifdef AFS_DEMAND_ATTACH_FS
vp = VPreAttachVolumeByName(&error, diskP->name, dp->d_name);
#else /* AFS_DEMAND_ATTACH_FS */
vp = VAttachVolumeByName(&error, diskP->name, dp->d_name,
V_VOLUPD);
#endif /* AFS_DEMAND_ATTACH_FS */
(*(vp ? nAttached : nUnattached))++;
if (error == VOFFLINE)
Log("Volume %d stays offline (/vice/offline/%s exists)\n", VolumeNumber(dp->d_name), dp->d_name);
@ -749,11 +1097,9 @@ VAttachVolumesByPartition(struct DiskPartition64 *diskP, int * nAttached, int *
diskP->name, VolumeNumber(dp->d_name),
dp->d_name);
}
#if !defined(AFS_DEMAND_ATTACH_FS)
if (vp) {
VPutVolume(vp);
}
#endif /* AFS_DEMAND_ATTACH_FS */
}
}
@ -762,7 +1108,7 @@ done:
closedir(dirp);
return ret;
}
#endif /* !AFS_DEMAND_ATTACH_FS */
/***************************************************/
/* Shutdown routines */