ubik-pause-collapsing-20020624

I'm told that after we introduce ptserver nested groups we can expect to
see periodic pauses in ubik operations, and this fixes the problem. if it
happens, we can start with the UBIK_PAUSE code and go from there
This commit is contained in:
Marcus Watts 2002-06-24 17:30:34 +00:00 committed by Derrick Brashear
parent fe1648ca02
commit 2b4d2224ba
7 changed files with 184 additions and 4 deletions

View File

@ -364,6 +364,9 @@ ubeacon_Interact() {
}
else
ttid.counter = ubik_dbase->tidCounter+1;
#if defined(UBIK_PAUSE)
ubik_dbase->flags |= DBVOTING;
#endif /* UBIK_PAUSE */
/* now analyze return codes, counting up our votes */
yesVotes = 0; /* count how many to ensure we have quorum */
@ -417,6 +420,9 @@ ubeacon_Interact() {
if (amIMagic) yesVotes++; /* extra epsilon */
if (i < oldestYesVote) oldestYesVote = i;
}
#if defined(UBIK_PAUSE)
ubik_dbase->flags &= ~DBVOTING;
#endif /* UBIK_PAUSE */
/* now decide if we have enough votes to become sync site.
Note that we can still get enough votes even if we didn't for ourself. */

View File

@ -820,6 +820,19 @@ udisk_end(atrans)
struct ubik_trans *atrans; {
struct ubik_dbase *dbase;
#if defined(UBIK_PAUSE)
/* Another thread is trying to lock this transaction.
* That can only be an RPC doing SDISK_Lock.
* Unlock the transaction, 'cause otherwise the other
* thread will never wake up. Don't free it because
* the caller will do that already.
*/
if (atrans->flags & TRSETLOCK) {
atrans->flags |= TRSTALE;
ulock_relLock(atrans);
return;
}
#endif /* UBIK_PAUSE */
if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
dbase = atrans->dbase;

View File

@ -96,10 +96,23 @@ ulock_getLock(atrans, atype, await)
}
/* Create new lock record and add to spec'd transaction:
#if defined(UBIK_PAUSE)
* locktype. Before doing that, set TRSETLOCK,
* to tell udisk_end that another thread (us) is waiting.
#else
* locktype. This field also tells us if the thread is
* waiting for a lock: It will be equal to LOCKWAIT.
#endif
*/
#if defined(UBIK_PAUSE)
if (atrans->flag & TRSETLOCK) {
printf ("Ubik: Internal Error: TRSETLOCK already set?\n");
return EBUSY;
}
atrans->flag |= TRSETLOCK;
#else
atrans->locktype = LOCKWAIT;
#endif /* UBIK_PAUSE */
DBRELE(dbase);
if (atype == LOCKREAD) {
ObtainReadLock(&rwlock);
@ -108,6 +121,18 @@ ulock_getLock(atrans, atype, await)
}
DBHOLD(dbase);
atrans->locktype = atype;
#if defined(UBIK_PAUSE)
atrans->flag &= ~TRSETLOCK;
#if 0
/* We don't do this here, because this can only happen in SDISK_Lock,
* and there's already code there to catch this condition.
*/
if (atrans->flag & TRSTALE) {
udisk_end(atrans);
return UINTERNAL;
}
#endif
#endif /* UBIK_PAUSE */
/*
*ubik_print("Ubik: DEBUG: Thread 0x%x took %s lock\n", lwp_cpptr,
@ -150,3 +175,40 @@ ulock_Debug(aparm)
}
}
#if defined(UBIK_PAUSE)
/* Find the TID of the current write lock (or the best approximation thereof) */
ulock_FindWLock(struct ubik_dbase *dbase, struct ubik_tid *atid)
{
register struct ubik_lock *tl;
register struct ubik_trans *tt, *best;
best = 0;
for(tt=dbase->activeTrans; tt; tt=tt->next) {
if (tt->type != UBIK_WRITETRANS) continue;
if (!best || best->tid.counter > tt->tid.counter) {
best = tt;
}
for(tl=tt->activeLocks; tl; tl=tl->next) {
if (tl->type == LOCKWRITE) {
*atid = tt->tid;
#ifdef GRAND_PAUSE_DEBUGGING
ubik_print ("Found real write lock tid %d.%d\n",
atid->epoch, atid->counter);
#endif
return 0;
}
}
}
/* if we get here, no locks pending, return the best guess */
if (best) {
*atid = best->tid;
#ifdef GRAND_PAUSE_DEBUGGING
ubik_print ("Found possible write transaction tid %d.%d\n",
atid->epoch, atid->counter);
#endif
return 0;
}
return EINVAL;
}
#endif /* UBIK_PAUSE */

View File

@ -147,9 +147,13 @@ urecovery_CheckTid(atid)
if (atid->epoch != ubik_currentTrans->tid.epoch || atid->counter > ubik_currentTrans->tid.counter) {
/* don't match, abort it */
/* If the thread is not waiting for lock - ok to end it */
#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
#endif /* UBIK_PAUSE */
udisk_end(ubik_currentTrans);
#if !defined(UBIK_PAUSE)
}
#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *) 0;
}
}
@ -466,7 +470,15 @@ urecovery_Interact() {
urecovery_state |= UBIK_RECFOUNDDB;
urecovery_state &= ~UBIK_RECSENTDB;
}
#if defined(UBIK_PAUSE)
/* it's not possible for UBIK_RECFOUNDDB not to be set here.
* However, we might have lost UBIK_RECSYNCSITE, and that
* IS important.
*/
if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
#else
if (!(urecovery_state & UBIK_RECFOUNDDB)) continue; /* not ready */
#endif /* UBIK_PAUSE */
/* If we, the sync site, do not have the best db version, then
* go and get it from the server that does.
@ -475,7 +487,11 @@ urecovery_Interact() {
urecovery_state |= UBIK_RECHAVEDB;
} else {
/* we don't have the best version; we should fetch it. */
#if defined(UBIK_PAUSE)
DBHOLD(ubik_dbase);
#else
ObtainWriteLock(&ubik_dbase->versionLock);
#endif /* UBIK_PAUSE */
urecovery_AbortAll(ubik_dbase);
/* Rx code to do the Bulk fetch */
@ -551,8 +567,15 @@ FetchEndCall:
}
udisk_Invalidate(ubik_dbase, 0); /* data has changed */
LWP_NoYieldSignal(&ubik_dbase->version);
#if defined(UBIK_PAUSE)
DBRELE(ubik_dbase);
#else
ReleaseWriteLock(&ubik_dbase->versionLock);
#endif /* UBIK_PAUSE */
}
#if defined(UBIK_PAUSE)
if (!(urecovery_state & UBIK_RECSYNCSITE)) continue; /* lost sync */
#endif /* UBIK_PAUSE */
if (!(urecovery_state & UBIK_RECHAVEDB)) continue; /* not ready */
/* If the database was newly initialized, then when we establish quorum, write
@ -561,7 +584,11 @@ FetchEndCall:
* database and overwrite this one.
*/
if (ubik_dbase->version.epoch == 1) {
#if defined(UBIK_PAUSE)
DBHOLD(ubik_dbase);
#else
ObtainWriteLock(&ubik_dbase->versionLock);
#endif /* UBIK_PAUSE */
urecovery_AbortAll(ubik_dbase);
ubik_epochTime = 2;
ubik_dbase->version.epoch = ubik_epochTime;
@ -569,7 +596,11 @@ FetchEndCall:
code = (*ubik_dbase->setlabel) (ubik_dbase, 0, &ubik_dbase->version);
udisk_Invalidate(ubik_dbase, 0); /* data may have changed */
LWP_NoYieldSignal(&ubik_dbase->version);
#if defined(UBIK_PAUSE)
DBRELE(ubik_dbase);
#else
ReleaseWriteLock(&ubik_dbase->versionLock);
#endif /* UBIK_PAUSE */
}
/* Check the other sites and send the database to them if they
@ -579,7 +610,11 @@ FetchEndCall:
/* now propagate out new version to everyone else */
dbok = 1; /* start off assuming they all worked */
#if defined(UBIK_PAUSE)
DBHOLD(ubik_dbase);
#else
ObtainWriteLock(&ubik_dbase->versionLock);
#endif /* UBIK_PAUSE */
/*
* Check if a write transaction is in progress. We can't send the
* db when a write is in progress here because the db would be
@ -595,11 +630,19 @@ FetchEndCall:
tv.tv_sec = 0;
tv.tv_usec = 50000;
while ((ubik_dbase->flags & DBWRITING) && (safety < 500)) {
#if defined(UBIK_PAUSE)
DBRELE(ubik_dbase);
#else
ReleaseWriteLock(&ubik_dbase->versionLock);
#endif /* UBIK_PAUSE */
/* sleep for a little while */
IOMGR_Select(0, 0, 0, 0, &tv);
tv.tv_usec += 10000; safety++;
#if defined(UBIK_PAUSE)
DBHOLD(ubik_dbase);
#else
ObtainWriteLock(&ubik_dbase->versionLock);
#endif /* UBIK_PAUSE */
}
}
@ -658,7 +701,11 @@ StoreEndCall:
ts->currentDB = 1;
}
}
#if defined(UBIK_PAUSE)
DBRELE(ubik_dbase);
#else
ReleaseWriteLock(&ubik_dbase->versionLock);
#endif /* UBIK_PAUSE */
if (dbok) urecovery_state |= UBIK_RECSENTDB;
}
}

View File

@ -72,9 +72,13 @@ afs_int32 SDISK_Begin(rxcall, atid)
urecovery_CheckTid(atid);
if (ubik_currentTrans) {
/* If the thread is not waiting for lock - ok to end it */
#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
#endif /* UBIK_PAUSE */
udisk_end(ubik_currentTrans);
#if !defined(UBIK_PAUSE)
}
#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *) 0;
}
code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
@ -153,9 +157,13 @@ afs_int32 SDISK_ReleaseLocks(rxcall, atid)
}
/* If the thread is not waiting for lock - ok to end it */
#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
udisk_end(ubik_currentTrans);
}
#endif /* UBIK_PAUSE */
udisk_end(ubik_currentTrans);
#if !defined(UBIK_PAUSE)
}
#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *) 0;
DBRELE(dbase);
return 0;
@ -190,9 +198,13 @@ afs_int32 SDISK_Abort(rxcall, atid)
code = udisk_abort(ubik_currentTrans);
/* If the thread is not waiting for lock - ok to end it */
#if !defined(UBIK_PAUSE)
if (ubik_currentTrans->locktype != LOCKWAIT) {
udisk_end(ubik_currentTrans);
}
#endif /* UBIK_PAUSE */
udisk_end(ubik_currentTrans);
#if !defined(UBIK_PAUSE)
}
#endif /* UBIK_PAUSE */
ubik_currentTrans = (struct ubik_trans *) 0;
DBRELE(dbase);
return code;
@ -606,6 +618,7 @@ UbikInterfaceAddr *inAddr, *outAddr;
for ( i=0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
ubik_print("\n");
fflush(stdout); fflush(stderr);
printServerInfo();
return UBADHOST;
}

View File

@ -323,9 +323,35 @@ static int BeginTrans(dbase, transMode, transPtr, readAny)
struct ubik_trans *jt;
register struct ubik_trans *tt;
register afs_int32 code;
#if defined(UBIK_PAUSE)
int count;
#endif /* UBIK_PAUSE */
if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE;
DBHOLD(dbase);
#if defined(UBIK_PAUSE)
/* if we're polling the slave sites, wait until the returns
* are all in. Otherwise, the urecovery_CheckTid call may
* glitch us.
*/
if (transMode == UBIK_WRITETRANS)
for (count = 75; dbase->flags & DBVOTING; --count) {
DBRELE(dbase);
#ifdef GRAND_PAUSE_DEBUGGING
if (count==75)
fprintf (stderr,"%ld: myport=%d: BeginTrans is waiting 'cause of voting conflict\n", time(0), ntohs(ubik_callPortal));
else
#endif
if (count <= 0) {
#if 1
fprintf (stderr,"%ld: myport=%d: BeginTrans failed because of voting conflict\n", time(0), ntohs(ubik_callPortal));
#endif
return UNOQUORUM; /* a white lie */
}
IOMGR_Sleep(2);
DBHOLD(dbase);
}
#endif /* UBIK_PAUSE */
if (urecovery_AllBetter(dbase, readAny)==0) {
DBRELE(dbase);
return UNOQUORUM;
@ -364,7 +390,11 @@ static int BeginTrans(dbase, transMode, transPtr, readAny)
if (transMode == UBIK_WRITETRANS) {
/* for a write trans, we have to keep track of the write tid counter too */
#if defined(UBIK_PAUSE)
dbase->writeTidCounter = tt->tid.counter;
#else
dbase->writeTidCounter += 2;
#endif /* UBIK_PAUSE */
/* next try to start transaction on appropriate number of machines */
code = ContactQuorum(DISK_Begin, tt, 0);

View File

@ -47,7 +47,9 @@
/* ubik_lock types */
#define LOCKREAD 1
#define LOCKWRITE 2
#if !defined(UBIK_PAUSE)
#define LOCKWAIT 3
#endif /* UBIK_PAUSE */
/* ubik client flags */
#define UPUBIKONLY 1 /* only check servers presumed functional */
@ -187,11 +189,18 @@ extern char *ubik_CheckRXSecurityRock;
/* ubik_dbase flags */
#define DBWRITING 1 /* are any write trans. in progress */
#if defined(UBIK_PAUSE)
#define DBVOTING 2 /* the beacon task is polling */
#endif /* UBIK_PAUSE */
/* ubik trans flags */
#define TRDONE 1 /* commit or abort done */
#define TRABORT 2 /* if TRDONE, tells if aborted */
#define TRREADANY 4 /* read any data available in trans */
#if defined(UBIK_PAUSE)
#define TRSETLOCK 8 /* SetLock is using trans */
#define TRSTALE 16 /* udisk_end during getLock */
#endif /* UBIK_PAUSE */
/* ubik_lock flags */
#define LWANT 1