DEVEL15-windows-bkgdaemon-retry-failed-requests-20070203

background request functions such as BkgFetch and BkgStore do not
return success or failure.  the bkgDaemon thread simply executes
the request and discards it regardless of whether or not the request
was completed.  this can result in background store events failing
to be written to the file server.

modify the background request functions to return success or failure.

modify the background daemon to put failed requests back onto the queue
if the reason for failure was transient.

modify the background daemon to process requests only when the servers
for the volume are marked up.


(cherry picked from commit 37d16dd70b)
This commit is contained in:
Jeffrey Altman 2007-02-04 02:46:25 +00:00
parent fdccb816f9
commit 9cdf31e816
9 changed files with 138 additions and 37 deletions

View File

@ -844,11 +844,57 @@ long cm_ConnByServer(cm_server_t *serverp, cm_user_t *userp, cm_conn_t **connpp)
return 0;
}
long cm_ServerAvailable(struct cm_fid *fidp, struct cm_user *userp)
{
long code;
cm_req_t req;
cm_serverRef_t **serverspp;
cm_serverRef_t *tsrp;
cm_server_t *tsp;
int someBusy = 0, someOffline = 0, allOffline = 1, allBusy = 1, allDown = 1;
cm_InitReq(&req);
code = cm_GetServerList(fidp, userp, &req, &serverspp);
if (code)
return 0;
lock_ObtainWrite(&cm_serverLock);
for (tsrp = *serverspp; tsrp; tsrp=tsrp->next) {
tsp = tsrp->server;
cm_GetServerNoLock(tsp);
if (!(tsp->flags & CM_SERVERFLAG_DOWN)) {
allDown = 0;
if (tsrp->status == busy) {
allOffline = 0;
someBusy = 1;
} else if (tsrp->status == offline) {
allBusy = 0;
someOffline = 1;
} else {
allOffline = 0;
allBusy = 0;
}
}
cm_PutServerNoLock(tsp);
}
lock_ReleaseWrite(&cm_serverLock);
cm_FreeServerList(serverspp);
if (allDown)
return 0;
else if (allBusy)
return 0;
else if (allOffline || (someBusy && someOffline))
return 0;
else
return 1;
}
long cm_Conn(struct cm_fid *fidp, struct cm_user *userp, cm_req_t *reqp,
cm_conn_t **connpp)
{
long code;
cm_serverRef_t **serverspp;
code = cm_GetServerList(fidp, userp, reqp, &serverspp);

View File

@ -123,4 +123,6 @@ extern struct rx_connection * cm_GetRxConn(cm_conn_t *connp);
extern void cm_ForceNewConnections(cm_server_t *serverp);
extern long cm_ServerAvailable(struct cm_fid *fidp, struct cm_user *userp);
#endif /* __CM_CONN_H_ENV__ */

View File

@ -71,6 +71,7 @@ void cm_IpAddrDaemon(long parm)
/*
 * Background daemon thread body.  Picks a queued cm_bkgRequest_t whose
 * servers are reported available by cm_ServerAvailable(), runs the
 * request's procp callback, and puts the request back on the queue when
 * the callback fails with one of the transient error codes listed in the
 * switch below.
 *
 * NOTE(review): this listing looks like a diff with the +/- markers
 * stripped, and part of the loop is elided at the "@ ..." hunk line.  The
 * unconditional dequeue of cm_bkgListEndp, the procp call whose result is
 * ignored, and the unconditional release/free appear to be the lines the
 * change removed -- confirm against the real file before relying on the
 * statement order shown here.
 */
void cm_BkgDaemon(long parm)
{
cm_bkgRequest_t *rp;
afs_int32 code;
rx_StartClientThread();
@ -83,24 +84,55 @@ void cm_BkgDaemon(long parm)
}
/* we found a request */
rp = cm_bkgListEndp;
cm_bkgListEndp = (cm_bkgRequest_t *) osi_QPrev(&rp->q);
osi_QRemove((osi_queue_t **) &cm_bkgListp, &rp->q);
/* walk from cm_bkgListEndp via osi_QPrev() and stop at the first request
 * for which cm_ServerAvailable() returns non-zero.
 * NOTE(review): cm_daemonLock appears to still be held across these
 * cm_ServerAvailable() calls -- confirm that is acceptable. */
for (rp = cm_bkgListEndp; rp; rp = (cm_bkgRequest_t *) osi_QPrev(&rp->q))
{
if (cm_ServerAvailable(&rp->scp->fid, rp->userp))
break;
}
if (rp == NULL) {
/* we couldn't find a request that we could process at the current time */
/* drop the lock, wait one second, retake the lock and rescan */
lock_ReleaseWrite(&cm_daemonLock);
Sleep(1000);
lock_ObtainWrite(&cm_daemonLock);
continue;
}
osi_QRemoveHT((osi_queue_t **) &cm_bkgListp, (osi_queue_t **) &cm_bkgListEndp, &rp->q);
/* NOTE(review): the queue-count decrement is a side effect inside
 * osi_assert(); if osi_assert can ever be compiled out the count would no
 * longer be maintained -- confirm. */
osi_assert(cm_bkgQueueCount-- > 0);
lock_ReleaseWrite(&cm_daemonLock);
#ifdef DEBUG_REFCOUNT
osi_Log2(afsd_logp,"cm_BkgDaemon (before) scp 0x%x ref %d",rp->scp, rp->scp->refCount);
#endif
(*rp->procp)(rp->scp, rp->p1, rp->p2, rp->p3, rp->p4, rp->userp);
/* run the request outside cm_daemonLock and keep its result code */
code = (*rp->procp)(rp->scp, rp->p1, rp->p2, rp->p3, rp->p4, rp->userp);
#ifdef DEBUG_REFCOUNT
osi_Log2(afsd_logp,"cm_BkgDaemon (after) scp 0x%x ref %d",rp->scp, rp->scp->refCount);
#endif
cm_ReleaseUser(rp->userp);
cm_ReleaseSCache(rp->scp);
free(rp);
/* success: release the request's user and scache and free the request */
if (code == 0) {
cm_ReleaseUser(rp->userp);
cm_ReleaseSCache(rp->scp);
free(rp);
}
lock_ObtainWrite(&cm_daemonLock);
/* the listed codes are treated as transient: the same request is counted
 * again and re-added with osi_QAddT() so it will be retried.
 * NOTE(review): code == 0 also reaches `default`, so a request that
 * succeeded (and was freed above) is logged as "failed request dropped".
 * NOTE(review): a non-transient failure reaches `default` without
 * cm_ReleaseUser / cm_ReleaseSCache / free(rp) being called on the
 * conditional path above, which looks like a leak of the request and its
 * references -- confirm. */
switch ( code ) {
case CM_ERROR_TIMEDOUT:
case CM_ERROR_RETRY:
case CM_ERROR_WOULDBLOCK:
case CM_ERROR_ALLBUSY:
case CM_ERROR_ALLDOWN:
case CM_ERROR_ALLOFFLINE:
case CM_ERROR_PARTIALWRITE:
osi_Log2(afsd_logp,"cm_BkgDaemon re-queueing failed request 0x%p code 0x%x",
rp, code);
cm_bkgQueueCount++;
osi_QAddT((osi_queue_t **) &cm_bkgListp, (osi_queue_t **)&cm_bkgListEndp, &rp->q);
break;
default:
osi_Log2(afsd_logp,"cm_BkgDaemon failed request dropped 0x%p code 0x%x",
rp, code);
}
}
lock_ReleaseWrite(&cm_daemonLock);
}
@ -343,7 +375,7 @@ void cm_Daemon(long parm)
if (now > lastVolCheck + cm_daemonCheckVolInterval) {
lastVolCheck = now;
cm_CheckVolumes();
cm_RefreshVolumes();
now = osi_Time();
}

View File

@ -24,7 +24,7 @@ void cm_DaemonShutdown(void);
void cm_InitDaemon(int nDaemons);
typedef void (cm_bkgProc_t)(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3,
typedef afs_int32 (cm_bkgProc_t)(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3,
afs_uint32 p4, struct cm_user *up);
typedef struct cm_bkgRequest {

View File

@ -160,32 +160,43 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
#ifdef AFS_LARGEFILES
if (SERVERHAS64BIT(connp)) {
osi_Log4(afsd_logp, "CALL StoreData64 scp 0x%p, offset 0x%x:%08x, length 0x%x",
osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData64 scp 0x%p, offset 0x%x:%08x, length 0x%x",
scp, biod.offset.HighPart, biod.offset.LowPart, nbytes);
code = StartRXAFS_StoreData64(callp, &tfid, &inStatus,
biod.offset.QuadPart,
nbytes,
truncPos.QuadPart);
if (code)
osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData64 FAILURE, code 0x%x", code);
else
osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData64 SUCCESS");
} else {
if (require_64bit_ops) {
osi_Log0(afsd_logp, "Skipping StoreData. The operation requires StoreData64");
osi_Log0(afsd_logp, "Skipping StartRXAFS_StoreData. The operation requires large file support in the server.");
code = CM_ERROR_TOOBIG;
} else {
osi_Log4(afsd_logp, "CALL StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
scp, biod.offset.HighPart, biod.offset.LowPart, nbytes);
code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
biod.offset.LowPart, nbytes, truncPos.LowPart);
if (code)
osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData FAILURE, code 0x%x", code);
else
osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData SUCCESS");
}
}
#else
osi_Log4(afsd_logp, "CALL StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
osi_Log4(afsd_logp, "CALL StartRXAFS_StoreData scp 0x%p, offset 0x%x:%08x, length 0x%x",
scp, biod.offset.HighPart, biod.offset.LowPart, nbytes);
code = StartRXAFS_StoreData(callp, &tfid, &inStatus,
biod.offset.LowPart, nbytes, truncPos.LowPart);
if (code)
osi_Log1(afsd_logp, "CALL StartRXAFS_StoreData FAILURE, code 0x%x", code);
else
osi_Log0(afsd_logp, "CALL StartRXAFS_StoreData SUCCESS");
#endif
if (code == 0) {
@ -214,20 +225,21 @@ long cm_BufWrite(void *vscp, osi_hyper_t *offsetp, long length, long flags,
}
nbytes -= wbytes;
} /* while more bytes to write */
} /* if RPC started successfully */
else {
osi_Log2(afsd_logp, "StartRXAFS_StoreData?? scp 0x%p failed (%lX)",scp,code);
}
} /* if RPC started successfully */
if (code == 0) {
if (SERVERHAS64BIT(connp)) {
code = EndRXAFS_StoreData64(callp, &outStatus, &volSync);
if (code)
osi_Log2(afsd_logp, "EndRXAFS_StoreData64 scp 0x%p failed (%lX)", scp, code);
osi_Log2(afsd_logp, "EndRXAFS_StoreData64 FAILURE scp 0x%p code %lX", scp, code);
else
osi_Log0(afsd_logp, "EndRXAFS_StoreData64 SUCCESS");
} else {
code = EndRXAFS_StoreData(callp, &outStatus, &volSync);
if (code)
osi_Log2(afsd_logp, "EndRXAFS_StoreData scp 0x%p failed (%lX)",scp,code);
osi_Log2(afsd_logp, "EndRXAFS_StoreData FAILURE scp 0x%p code %lX",scp,code);
else
osi_Log0(afsd_logp, "EndRXAFS_StoreData SUCCESS");
}
}
@ -591,21 +603,22 @@ long cm_CheckFetchRange(cm_scache_t *scp, osi_hyper_t *startBasep, long length,
return code;
}
/*
 * Background store request.  Unless scp is flagged CM_SCACHEFLAG_DELETED,
 * writes data for scp starting at the 64-bit offset p2:p1 (high:low) with
 * cm_BufWrite() and returns that call's result code.  For a deleted scp
 * the write is skipped and 0 is returned.  In both cases the
 * CM_SCACHESYNC_ASYNCSTORE sync operation is marked done under scp->mx
 * before returning.
 *
 * NOTE(review): this listing looks like a diff with the +/- markers
 * stripped; the `void` signature and the uninitialized `long code;` appear
 * to be the removed lines, and some statements are elided at the "@ ..."
 * hunk line (presumably including where `length` is set -- the log calls
 * report p3 as the length).
 * NOTE(review): cm_SyncOpDone() runs even when a non-zero code is
 * returned; if the caller retries the request after such a failure, the
 * ASYNCSTORE sync op has already been marked done -- confirm that is
 * intended.
 */
void cm_BkgStore(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
cm_user_t *userp)
afs_int32
cm_BkgStore(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
cm_user_t *userp)
{
osi_hyper_t toffset;
long length;
cm_req_t req;
long code;
long code = 0;
/* a deleted scache entry is not written back; code stays 0 */
if (scp->flags & CM_SCACHEFLAG_DELETED) {
osi_Log4(afsd_logp, "Skipping BKG store - Deleted scp 0x%p, offset 0x%x:%08x, length 0x%x", scp, p2, p1, p3);
} else {
cm_InitReq(&req);
#ifdef NO_BKG_RETRIES
/* Retries will be performed by the BkgDaemon thread if appropriate */
req.flags |= CM_REQ_NORETRY;
#endif
/* p1 carries the low 32 bits of the offset, p2 the high 32 bits */
toffset.LowPart = p1;
toffset.HighPart = p2;
@ -614,11 +627,15 @@ void cm_BkgStore(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3,
osi_Log4(afsd_logp, "Starting BKG store scp 0x%p, offset 0x%x:%08x, length 0x%x", scp, p2, p1, p3);
code = cm_BufWrite(scp, &toffset, length, /* flags */ 0, userp, &req);
osi_Log4(afsd_logp, "Finished BKG store scp 0x%p, offset 0x%x:%08x, code 0x%x", scp, p2, p1, code);
}
/* mark the async store sync operation done, whether or not a write ran */
lock_ObtainMutex(&scp->mx);
cm_SyncOpDone(scp, NULL, CM_SCACHESYNC_ASYNCSTORE);
lock_ReleaseMutex(&scp->mx);
return code;
}
/* Called with scp locked */
@ -639,9 +656,11 @@ void cm_ClearPrefetchFlag(long code, cm_scache_t *scp, osi_hyper_t *base)
scp->flags &= ~CM_SCACHEFLAG_PREFETCHING;
}
/* do the prefetch */
void cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
cm_user_t *userp)
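/* do the prefetch.  if the prefetch fails, return 0 (success)
 * because there is no harm done.
 * NOTE(review): the final return of this function appears to pass the
 * cm_GetBuffer() result code through unchanged, so a failed prefetch is
 * not reported as 0 -- confirm whether the code or this comment reflects
 * the intended behavior. */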
afs_int32
cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
cm_user_t *userp)
{
long length;
osi_hyper_t base;
@ -651,6 +670,8 @@ void cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p
cm_req_t req;
cm_InitReq(&req);
/* Retries will be performed by the BkgDaemon thread if appropriate */
req.flags |= CM_REQ_NORETRY;
base.LowPart = p1;
@ -668,7 +689,7 @@ void cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p
lock_ReleaseMutex(&scp->mx);
if (bp)
buf_Release(bp);
return;
return 0;
}
code = cm_GetBuffer(scp, bp, &cpff, userp, &req);
@ -676,7 +697,7 @@ void cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p
cm_ClearPrefetchFlag(code, scp, &base);
lock_ReleaseMutex(&scp->mx);
buf_Release(bp);
return;
return code;
}
/* a read was issued to offsetp, and we have to determine whether we should

View File

@ -42,10 +42,10 @@ extern void cm_ReleaseBIOD(cm_bulkIO_t *biop, int isStore);
extern long cm_SetupStoreBIOD(cm_scache_t *scp, osi_hyper_t *inOffsetp,
long inSize, cm_bulkIO_t *biop, cm_user_t *userp, cm_req_t *reqp);
extern void cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
extern afs_int32 cm_BkgPrefetch(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
struct cm_user *userp);
extern void cm_BkgStore(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
extern afs_int32 cm_BkgStore(cm_scache_t *scp, afs_uint32 p1, afs_uint32 p2, afs_uint32 p3, afs_uint32 p4,
struct cm_user *userp);
extern void cm_ConsiderPrefetch(cm_scache_t *scp, osi_hyper_t *offsetp,

View File

@ -1155,7 +1155,7 @@ long cm_IoctlGag(struct smb_ioctl *ioctlp, struct cm_user *userp)
/*
 * Ioctl handler that triggers the volume check/refresh routine and always
 * returns 0.  Neither ioctlp nor userp is used by the body.
 *
 * NOTE(review): this listing looks like a diff with the +/- markers
 * stripped; cm_CheckVolumes() appears to be the call that was replaced by
 * cm_RefreshVolumes() as part of a rename -- confirm that only one of the
 * two calls exists in the real file.
 */
long cm_IoctlCheckVolumes(struct smb_ioctl *ioctlp, struct cm_user *userp)
{
cm_CheckVolumes();
cm_RefreshVolumes();
return 0;
}

View File

@ -587,7 +587,7 @@ long cm_GetROVolumeID(cm_volume_t *volp)
return id;
}
void cm_CheckVolumes(void)
void cm_RefreshVolumes(void)
{
cm_volume_t *volp;
@ -613,7 +613,7 @@ void cm_CheckVolumes(void)
** Finds all volumes that reside on this server and reorders their
** RO list according to the changed rank of server.
*/
void cm_ChangeRankVolume(cm_server_t *tsp)
void cm_ChangeRankVolume(cm_server_t *tsp)
{
int code;
cm_volume_t* volp;

View File

@ -54,7 +54,7 @@ extern cm_serverRef_t **cm_GetVolServers(cm_volume_t *volp, unsigned long volume
extern void cm_ChangeRankVolume(cm_server_t *tsp);
extern void cm_CheckVolumes(void);
extern void cm_RefreshVolumes(void);
extern long cm_ValidateVolume(void);