From 2b4d2224bac4e656d7504ce2783450117b74dd47 Mon Sep 17 00:00:00 2001 From: Marcus Watts Date: Mon, 24 Jun 2002 17:30:34 +0000 Subject: [PATCH] ubik-pause-collapsing-20020624 I'm told that after we introduce ptserver nested groups we can expect to see periodic pauses in ubik operations, and this fixes the problem. if it happens, we can start with the UBIK_PAUSE code and go from there --- src/ubik/beacon.c | 6 +++++ src/ubik/disk.c | 13 ++++++++++ src/ubik/lock.c | 62 +++++++++++++++++++++++++++++++++++++++++++++ src/ubik/recovery.c | 47 ++++++++++++++++++++++++++++++++++ src/ubik/remote.c | 21 ++++++++++++--- src/ubik/ubik.c | 30 ++++++++++++++++++++++ src/ubik/ubik.p.h | 9 +++++++ 7 files changed, 184 insertions(+), 4 deletions(-) diff --git a/src/ubik/beacon.c b/src/ubik/beacon.c index 598f5d390b..140927fdbf 100644 --- a/src/ubik/beacon.c +++ b/src/ubik/beacon.c @@ -364,6 +364,9 @@ ubeacon_Interact() { } else ttid.counter = ubik_dbase->tidCounter+1; +#if defined(UBIK_PAUSE) + ubik_dbase->flags |= DBVOTING; +#endif /* UBIK_PAUSE */ /* now analyze return codes, counting up our votes */ yesVotes = 0; /* count how many to ensure we have quorum */ @@ -417,6 +420,9 @@ ubeacon_Interact() { if (amIMagic) yesVotes++; /* extra epsilon */ if (i < oldestYesVote) oldestYesVote = i; } +#if defined(UBIK_PAUSE) + ubik_dbase->flags &= ~DBVOTING; +#endif /* UBIK_PAUSE */ /* now decide if we have enough votes to become sync site. Note that we can still get enough votes even if we didn't for ourself. */ diff --git a/src/ubik/disk.c b/src/ubik/disk.c index a41aef44bb..15fb858fd2 100644 --- a/src/ubik/disk.c +++ b/src/ubik/disk.c @@ -820,6 +820,19 @@ udisk_end(atrans) struct ubik_trans *atrans; { struct ubik_dbase *dbase; +#if defined(UBIK_PAUSE) + /* Another thread is trying to lock this transaction. + * That can only be an RPC doing SDISK_Lock. + * Unlock the transaction, 'cause otherwise the other + * thread will never wake up. Don't free it because + * the caller will do that already. + */ + if (atrans->flags & TRSETLOCK) { + atrans->flags |= TRSTALE; + ulock_relLock(atrans); + return; + } +#endif /* UBIK_PAUSE */ if (!(atrans->flags & TRDONE)) udisk_abort(atrans); dbase = atrans->dbase; diff --git a/src/ubik/lock.c b/src/ubik/lock.c index e5d8a5b251..b764bc039d 100644 --- a/src/ubik/lock.c +++ b/src/ubik/lock.c @@ -96,10 +96,23 @@ ulock_getLock(atrans, atype, await) } /* Create new lock record and add to spec'd transaction: +#if defined(UBIK_PAUSE) + * locktype. Before doing that, set TRSETLOCK, + * to tell udisk_end that another thread (us) is waiting. +#else * locktype. This field also tells us if the thread is * waiting for a lock: It will be equal to LOCKWAIT. +#endif */ +#if defined(UBIK_PAUSE) + if (atrans->flag & TRSETLOCK) { + printf ("Ubik: Internal Error: TRSETLOCK already set?\n"); + return EBUSY; + } + atrans->flag |= TRSETLOCK; +#else atrans->locktype = LOCKWAIT; +#endif /* UBIK_PAUSE */ DBRELE(dbase); if (atype == LOCKREAD) { ObtainReadLock(&rwlock); @@ -108,6 +121,18 @@ ulock_getLock(atrans, atype, await) } DBHOLD(dbase); atrans->locktype = atype; +#if defined(UBIK_PAUSE) + atrans->flag &= ~TRSETLOCK; +#if 0 + /* We don't do this here, because this can only happen in SDISK_Lock, + * and there's already code there to catch this condition. + */ + if (atrans->flag & TRSTALE) { + udisk_end(atrans); + return UINTERNAL; + } +#endif +#endif /* UBIK_PAUSE */ /* *ubik_print("Ubik: DEBUG: Thread 0x%x took %s lock\n", lwp_cpptr, @@ -150,3 +175,40 @@ ulock_Debug(aparm) } } +#if defined(UBIK_PAUSE) +/* Find the TID of the current write lock (or the best approximation thereof) */ +ulock_FindWLock(struct ubik_dbase *dbase, struct ubik_tid *atid) +{ + register struct ubik_lock *tl; + register struct ubik_trans *tt, *best; + + best = 0; + for(tt=dbase->activeTrans; tt; tt=tt->next) { + if (tt->type != UBIK_WRITETRANS) continue; + if (!best || best->tid.counter > tt->tid.counter) { + best = tt; + } + for(tl=tt->activeLocks; tl; tl=tl->next) { + if (tl->type == LOCKWRITE) { + *atid = tt->tid; +#ifdef GRAND_PAUSE_DEBUGGING + ubik_print ("Found real write lock tid %d.%d\n", + atid->epoch, atid->counter); +#endif + return 0; + } + } + } + /* if we get here, no locks pending, return the best guess */ + if (best) { + *atid = best->tid; +#ifdef GRAND_PAUSE_DEBUGGING + ubik_print ("Found possible write transaction tid %d.%d\n", + atid->epoch, atid->counter); +#endif + return 0; + } + return EINVAL; +} +#endif /* UBIK_PAUSE */ + diff --git a/src/ubik/recovery.c b/src/ubik/recovery.c index f4b0a40f27..6878534c73 100644 --- a/src/ubik/recovery.c +++ b/src/ubik/recovery.c @@ -147,9 +147,13 @@ urecovery_CheckTid(atid) if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) { /* don't match, abort it */ /* If the thread is not waiting for lock - ok to end it */ +#if !defined(UBIK_PAUSE) if (ubik_currentTrans->locktype != LOCKWAIT) { +#endif /* UBIK_PAUSE */ udisk_end(ubik_currentTrans); +#if !defined(UBIK_PAUSE) } +#endif /* UBIK_PAUSE */ ubik_currentTrans = (struct ubik_trans *) 0; } } @@ -466,7 +470,15 @@ urecovery_Interact() { urecovery_state |= UBIK_RECFOUNDDB; urecovery_state &= ~UBIK_RECSENTDB; } +#if defined(UBIK_PAUSE) + /* it's not possible for UBIK_RECFOUNDDB not to be set here. + * However, we might have lost UBIK_RECSYNCSITE, and that + * IS important. + */ + if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */ +#else if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */ +#endif /* UBIK_PAUSE */ /* If we, the sync site, do not have the best db version, then * go and get it from the server that does. @@ -475,7 +487,11 @@ urecovery_Interact() { urecovery_state |= UBIK_RECHAVEDB; } else { /* we don't have the best version; we should fetch it. */ +#if defined(UBIK_PAUSE) + DBHOLD(ubik_dbase); +#else ObtainWriteLock(&ubik_dbase->versionLock); +#endif /* UBIK_PAUSE */ urecovery_AbortAll(ubik_dbase); /* Rx code to do the Bulk fetch */ @@ -551,8 +567,15 @@ FetchEndCall: } udisk_Invalidate(ubik_dbase, 0); /* data has changed */ LWP_NoYieldSignal(&ubik_dbase->version); +#if defined(UBIK_PAUSE) + DBRELE(ubik_dbase); +#else ReleaseWriteLock(&ubik_dbase->versionLock); +#endif /* UBIK_PAUSE */ } +#if defined(UBIK_PAUSE) + if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */ +#endif /* UBIK_PAUSE */ if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */ /* If the database was newly initialized, then when we establish quorum, write @@ -561,7 +584,11 @@ FetchEndCall: * database and overwrite this one. */ if (ubik_dbase->version.epoch == 1) { +#if defined(UBIK_PAUSE) + DBHOLD(ubik_dbase); +#else ObtainWriteLock(&ubik_dbase->versionLock); +#endif /* UBIK_PAUSE */ urecovery_AbortAll(ubik_dbase); ubik_epochTime = 2; ubik_dbase->version.epoch = ubik_epochTime; @@ -569,7 +596,11 @@ FetchEndCall: code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version); udisk_Invalidate(ubik_dbase, 0); /* data may have changed */ LWP_NoYieldSignal(&ubik_dbase->version); +#if defined(UBIK_PAUSE) + DBRELE(ubik_dbase); +#else ReleaseWriteLock(&ubik_dbase->versionLock); +#endif /* UBIK_PAUSE */ } /* Check the other sites and send the database to them if they @@ -579,7 +610,11 @@ FetchEndCall: /* now propagate out new version to everyone else */ dbok = 1; /* start off assuming they all worked */ +#if defined(UBIK_PAUSE) + DBHOLD(ubik_dbase); +#else ObtainWriteLock(&ubik_dbase->versionLock); +#endif /* UBIK_PAUSE */ /* * Check if a write transaction is in progress. We can't send the * db when a write is in progress here because the db would be @@ -595,11 +630,19 @@ FetchEndCall: tv.tv_sec = 0; tv.tv_usec = 50000; while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) { +#if defined(UBIK_PAUSE) + DBRELE(ubik_dbase); +#else ReleaseWriteLock(&ubik_dbase->versionLock); +#endif /* UBIK_PAUSE */ /* sleep for a little while */ IOMGR_Select(0, 0, 0, 0, &tv); tv.tv_usec += 10000; safety++; +#if defined(UBIK_PAUSE) + DBHOLD(ubik_dbase); +#else ObtainWriteLock(&ubik_dbase->versionLock); +#endif /* UBIK_PAUSE */ } } @@ -658,7 +701,11 @@ StoreEndCall: ts->currentDB = 1; } } +#if defined(UBIK_PAUSE) + DBRELE(ubik_dbase); +#else ReleaseWriteLock(&ubik_dbase->versionLock); +#endif /* UBIK_PAUSE */ if (dbok) urecovery_state |= UBIK_RECSENTDB; } } diff --git a/src/ubik/remote.c b/src/ubik/remote.c index 513b4fb6e5..35f22a1cf2 100644 --- a/src/ubik/remote.c +++ b/src/ubik/remote.c @@ -72,9 +72,13 @@ afs_int32 SDISK_Begin(rxcall, atid) urecovery_CheckTid(atid); if (ubik_currentTrans) { /* If the thread is not waiting for lock - ok to end it */ +#if !defined(UBIK_PAUSE) if (ubik_currentTrans->locktype != LOCKWAIT) { +#endif /* UBIK_PAUSE */ udisk_end(ubik_currentTrans); +#if !defined(UBIK_PAUSE) } +#endif /* UBIK_PAUSE */ ubik_currentTrans = (struct ubik_trans *) 0; } code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans); @@ -153,9 +157,13 @@ afs_int32 SDISK_ReleaseLocks(rxcall, atid) } /* If the thread is not waiting for lock - ok to end it */ +#if !defined(UBIK_PAUSE) if (ubik_currentTrans->locktype != LOCKWAIT) { - udisk_end(ubik_currentTrans); - } +#endif /* UBIK_PAUSE */ + udisk_end(ubik_currentTrans); +#if !defined(UBIK_PAUSE) + } +#endif /* UBIK_PAUSE */ ubik_currentTrans = (struct ubik_trans *) 0; DBRELE(dbase); return 0; @@ -190,9 +198,13 @@ afs_int32 SDISK_Abort(rxcall, atid) code = udisk_abort(ubik_currentTrans); /* If the thread is not waiting for lock - ok to end it */ +#if !defined(UBIK_PAUSE) if (ubik_currentTrans->locktype != LOCKWAIT) { - udisk_end(ubik_currentTrans); - } +#endif /* UBIK_PAUSE */ + udisk_end(ubik_currentTrans); +#if !defined(UBIK_PAUSE) + } +#endif /* UBIK_PAUSE */ ubik_currentTrans = (struct ubik_trans *) 0; DBRELE(dbase); return code; @@ -606,6 +618,7 @@ UbikInterfaceAddr *inAddr, *outAddr; for ( i=0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++) ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i]))); ubik_print("\n"); + fflush(stdout); fflush(stderr); printServerInfo(); return UBADHOST; } diff --git a/src/ubik/ubik.c b/src/ubik/ubik.c index 12772caa22..f4854be22b 100644 --- a/src/ubik/ubik.c +++ b/src/ubik/ubik.c @@ -323,9 +323,35 @@ static int BeginTrans(dbase, transMode, transPtr, readAny) struct ubik_trans *jt; register struct ubik_trans *tt; register afs_int32 code; +#if defined(UBIK_PAUSE) + int count; +#endif /* UBIK_PAUSE */ if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE; DBHOLD(dbase); +#if defined(UBIK_PAUSE) + /* if we're polling the slave sites, wait until the returns + * are all in. Otherwise, the urecovery_CheckTid call may + * glitch us. + */ + if (transMode == UBIK_WRITETRANS) + for (count = 75; dbase->flags & DBVOTING; --count) { + DBRELE(dbase); +#ifdef GRAND_PAUSE_DEBUGGING + if (count==75) + fprintf (stderr,"%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n", time(0), ntohs(ubik_callPortal)); + else +#endif + if (count <= 0) { +#if 1 + fprintf (stderr,"%ld: myport=%d: BeginTrans failed because of voting conflict\n", time(0), ntohs(ubik_callPortal)); +#endif + return UNOQUORUM; /* a white lie */ + } + IOMGR_Sleep(2); + DBHOLD(dbase); + } +#endif /* UBIK_PAUSE */ if (urecovery_AllBetter(dbase, readAny)==0) { DBRELE(dbase); return UNOQUORUM; @@ -364,7 +390,11 @@ static int BeginTrans(dbase, transMode, transPtr, readAny) if (transMode == UBIK_WRITETRANS) { /* for a write trans, we have to keep track of the write tid counter too */ +#if defined(UBIK_PAUSE) + dbase->writeTidCounter = tt->tid.counter; +#else dbase->writeTidCounter += 2; +#endif /* UBIK_PAUSE */ /* next try to start transaction on appropriate number of machines */ code = ContactQuorum(DISK_Begin, tt, 0); diff --git a/src/ubik/ubik.p.h b/src/ubik/ubik.p.h index 9f0d1a4fd9..5d6cb36a7a 100644 --- a/src/ubik/ubik.p.h +++ b/src/ubik/ubik.p.h @@ -47,7 +47,9 @@ /* ubik_lock types */ #define LOCKREAD 1 #define LOCKWRITE 2 +#if !defined(UBIK_PAUSE) #define LOCKWAIT 3 +#endif /* UBIK_PAUSE */ /* ubik client flags */ #define UPUBIKONLY 1 /* only check servers presumed functional */ @@ -187,11 +189,18 @@ extern char *ubik_CheckRXSecurityRock; /* ubik_dbase flags */ #define DBWRITING 1 /* are any write trans. in progress */ +#if defined(UBIK_PAUSE) +#define DBVOTING 2 /* the beacon task is polling */ +#endif /* UBIK_PAUSE */ /* ubik trans flags */ #define TRDONE 1 /* commit or abort done */ #define TRABORT 2 /* if TRDONE, tells if aborted */ #define TRREADANY 4 /* read any data available in trans */ +#if defined(UBIK_PAUSE) +#define TRSETLOCK 8 /* SetLock is using trans */ +#define TRSTALE 16 /* udisk_end during getLock */ +#endif /* UBIK_PAUSE */ /* ubik_lock flags */ #define LWANT 1