mirror of
https://git.openafs.org/openafs.git
synced 2025-01-20 07:51:00 +00:00
ubik: add interface for reading during write locks
Add ubik_BeginTransReadAnyWrite, which allows for reading from the database, even while there is a conflicting ubik write lock. Reads are still blocked while the local database is updating due to a write transaction commit. Change-Id: I025e595ad699d5a969a0676691530d90c65f1920 Reviewed-on: http://gerrit.openafs.org/2592 Reviewed-by: Derrick Brashear <shadow@dementia.org> Tested-by: Derrick Brashear <shadow@dementia.org>
This commit is contained in:
parent
570236dd55
commit
fd7f7469bd
@ -80,6 +80,10 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await)
|
||||
if (atrans->flags & TRDONE)
|
||||
return UDONE;
|
||||
|
||||
if (atype != LOCKREAD && (atrans->flags & TRREADWRITE)) {
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
if (atrans->locktype != 0) {
|
||||
ubik_print("Ubik: Internal Error: attempted to take lock twice\n");
|
||||
abort();
|
||||
@ -91,7 +95,7 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await)
|
||||
*/
|
||||
|
||||
/* Check if the lock would would block */
|
||||
if (!await) {
|
||||
if (!await && !(atrans->flags & TRREADWRITE)) {
|
||||
if (atype == LOCKREAD) {
|
||||
if (WouldReadBlock(&rwlock))
|
||||
return EAGAIN;
|
||||
@ -120,7 +124,9 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await)
|
||||
atrans->locktype = LOCKWAIT;
|
||||
#endif /* UBIK_PAUSE */
|
||||
DBRELE(dbase);
|
||||
if (atype == LOCKREAD) {
|
||||
if (atrans->flags & TRREADWRITE) {
|
||||
/* noop; don't actually lock anything for TRREADWRITE */
|
||||
} else if (atype == LOCKREAD) {
|
||||
ObtainReadLock(&rwlock);
|
||||
} else {
|
||||
ObtainWriteLock(&rwlock);
|
||||
@ -156,7 +162,15 @@ ulock_relLock(struct ubik_trans *atrans)
|
||||
if (rwlockinit)
|
||||
return;
|
||||
|
||||
if (atrans->locktype == LOCKREAD) {
|
||||
if (atrans->locktype == LOCKWRITE && (atrans->flags & TRREADWRITE)) {
|
||||
ubik_print("Ubik: Internal Error: unlocking write lock with "
|
||||
"TRREADWRITE?\n");
|
||||
abort();
|
||||
}
|
||||
|
||||
if (atrans->flags & TRREADWRITE) {
|
||||
/* noop, TRREADWRITE means we don't actually lock anything */
|
||||
} else if (atrans->locktype == LOCKREAD) {
|
||||
ReleaseReadLock(&rwlock);
|
||||
} else if (atrans->locktype == LOCKWRITE) {
|
||||
ReleaseWriteLock(&rwlock);
|
||||
|
@ -114,10 +114,15 @@ SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
|
||||
}
|
||||
|
||||
dbase = ubik_currentTrans->dbase;
|
||||
|
||||
ObtainWriteLock(&dbase->cache_lock);
|
||||
|
||||
DBHOLD(dbase);
|
||||
|
||||
urecovery_CheckTid(atid);
|
||||
if (!ubik_currentTrans) {
|
||||
DBRELE(dbase);
|
||||
ReleaseWriteLock(&dbase->cache_lock);
|
||||
return USYNC;
|
||||
}
|
||||
|
||||
@ -127,6 +132,7 @@ SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid)
|
||||
ubik_dbVersion = ubik_dbase->version;
|
||||
}
|
||||
DBRELE(dbase);
|
||||
ReleaseWriteLock(&dbase->cache_lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -83,6 +83,7 @@ afs_int32 ubik_epochTime = 0;
|
||||
afs_int32 urecovery_state = 0;
|
||||
int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *);
|
||||
void *ubik_SRXSecurityRock;
|
||||
int (*ubik_SyncWriterCacheProc) (void);
|
||||
struct ubik_server *ubik_servers;
|
||||
short ubik_callPortal;
|
||||
|
||||
@ -602,8 +603,9 @@ ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[],
|
||||
* An open mode of ubik_READTRANS identifies this as a read transaction,
|
||||
* while a mode of ubik_WRITETRANS identifies this as a write transaction.
|
||||
* transPtr is set to the returned transaction control block.
|
||||
* The readAny flag is set to 0 or 1 by the wrapper functions ubik_BeginTrans() or
|
||||
* ubik_BeginTransReadAny() below.
|
||||
* The readAny flag is set to 0 or 1 or 2 by the wrapper functions
|
||||
* ubik_BeginTrans() or ubik_BeginTransReadAny() or
|
||||
* ubik_BeginTransReadAnyWrite() below.
|
||||
*
|
||||
* \note We can only begin transaction when we have an up-to-date database.
|
||||
*/
|
||||
@ -618,6 +620,16 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
|
||||
int count;
|
||||
#endif /* UBIK_PAUSE */
|
||||
|
||||
if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) {
|
||||
/* it's not safe to use ubik_BeginTransReadAnyWrite without a
|
||||
* cache-syncing function; fall back to ubik_BeginTransReadAny,
|
||||
* which is safe but slower */
|
||||
ubik_print("ubik_BeginTransReadAnyWrite called, but "
|
||||
"ubik_SyncWriterCacheProc not set; pretending "
|
||||
"ubik_BeginTransReadAny was called instead\n");
|
||||
readAny = 1;
|
||||
}
|
||||
|
||||
if ((transMode != UBIK_READTRANS) && readAny)
|
||||
return UBADTYPE;
|
||||
DBHOLD(dbase);
|
||||
@ -687,8 +699,12 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode,
|
||||
DBRELE(dbase);
|
||||
return code;
|
||||
}
|
||||
if (readAny)
|
||||
if (readAny) {
|
||||
tt->flags |= TRREADANY;
|
||||
if (readAny > 1) {
|
||||
tt->flags |= TRREADWRITE;
|
||||
}
|
||||
}
|
||||
/* label trans and dbase with new tid */
|
||||
tt->tid.epoch = ubik_epochTime;
|
||||
/* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */
|
||||
@ -739,6 +755,16 @@ ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode,
|
||||
return BeginTrans(dbase, transMode, transPtr, 1);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \see BeginTrans()
|
||||
*/
|
||||
int
|
||||
ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode,
|
||||
struct ubik_trans **transPtr)
|
||||
{
|
||||
return BeginTrans(dbase, transMode, transPtr, 2);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief This routine ends a read or write transaction by aborting it.
|
||||
*/
|
||||
@ -794,6 +820,23 @@ ubik_AbortTrans(struct ubik_trans *transPtr)
|
||||
return (code ? code : code2);
|
||||
}
|
||||
|
||||
static void
|
||||
WritebackApplicationCache(struct ubik_dbase *dbase)
|
||||
{
|
||||
int code = 0;
|
||||
if (ubik_SyncWriterCacheProc) {
|
||||
code = ubik_SyncWriterCacheProc();
|
||||
}
|
||||
if (code) {
|
||||
/* we failed to sync the local cache, so just invalidate the cache;
|
||||
* we'll try to read the cache in again on the next read */
|
||||
memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion));
|
||||
} else {
|
||||
memcpy(&dbase->cachedVersion, &dbase->version,
|
||||
sizeof(dbase->cachedVersion));
|
||||
}
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief This routine ends a read or write transaction on the open transaction identified by transPtr.
|
||||
* \return an error code.
|
||||
@ -806,6 +849,7 @@ ubik_EndTrans(struct ubik_trans *transPtr)
|
||||
afs_int32 realStart;
|
||||
struct ubik_server *ts;
|
||||
afs_int32 now;
|
||||
int cachelocked = 0;
|
||||
struct ubik_dbase *dbase;
|
||||
|
||||
if (transPtr->type == UBIK_WRITETRANS) {
|
||||
@ -822,6 +866,13 @@ ubik_EndTrans(struct ubik_trans *transPtr)
|
||||
ReleaseReadLock(&dbase->cache_lock);
|
||||
transPtr->flags &= ~TRCACHELOCKED;
|
||||
}
|
||||
|
||||
if (transPtr->type != UBIK_READTRANS) {
|
||||
/* must hold cache_lock before DBHOLD'ing */
|
||||
ObtainWriteLock(&dbase->cache_lock);
|
||||
cachelocked = 1;
|
||||
}
|
||||
|
||||
DBHOLD(dbase);
|
||||
|
||||
/* give up if no longer current */
|
||||
@ -852,8 +903,20 @@ ubik_EndTrans(struct ubik_trans *transPtr)
|
||||
|
||||
/* now it is safe to do commit */
|
||||
code = udisk_commit(transPtr);
|
||||
if (code == 0)
|
||||
if (code == 0) {
|
||||
/* db data has been committed locally; update the local cache so
|
||||
* readers can get at it */
|
||||
WritebackApplicationCache(dbase);
|
||||
|
||||
ReleaseWriteLock(&dbase->cache_lock);
|
||||
|
||||
code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion);
|
||||
|
||||
} else {
|
||||
memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
|
||||
ReleaseWriteLock(&dbase->cache_lock);
|
||||
}
|
||||
cachelocked = 0;
|
||||
if (code) {
|
||||
/* failed to commit, so must return failure. Try to clear locks first, just for fun
|
||||
* Note that we don't know if this transaction will eventually commit at this point.
|
||||
@ -900,12 +963,6 @@ ubik_EndTrans(struct ubik_trans *transPtr)
|
||||
break; /* no down ones still pseudo-active */
|
||||
}
|
||||
|
||||
/* the commit bumped the dbase version, and since the write was local
|
||||
* our cache should still be up to date, so make sure to update
|
||||
* cachedVersion, too */
|
||||
memcpy(&dbase->cachedVersion, &dbase->version,
|
||||
sizeof(dbase->cachedVersion));
|
||||
|
||||
/* finally, unlock all the dudes. We can return success independent of the number of servers
|
||||
* that really unlock the dbase; the others will do it if/when they elect a new sync site.
|
||||
* The transaction is committed anyway, since we succeeded in contacting a quorum
|
||||
@ -918,10 +975,15 @@ ubik_EndTrans(struct ubik_trans *transPtr)
|
||||
/* don't update cachedVersion here; it should have been updated way back
|
||||
* in ubik_CheckCache, and earlier in this function for writes */
|
||||
DBRELE(dbase);
|
||||
if (cachelocked) {
|
||||
ReleaseWriteLock(&dbase->cache_lock);
|
||||
}
|
||||
return 0;
|
||||
|
||||
error:
|
||||
ObtainWriteLock(&dbase->cache_lock);
|
||||
if (!cachelocked) {
|
||||
ObtainWriteLock(&dbase->cache_lock);
|
||||
}
|
||||
memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version));
|
||||
ReleaseWriteLock(&dbase->cache_lock);
|
||||
return code;
|
||||
|
@ -223,6 +223,22 @@ extern int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *);
|
||||
extern void *ubik_CheckRXSecurityRock;
|
||||
/*\}*/
|
||||
|
||||
/*
|
||||
* For applications that make use of ubik_BeginTransReadAnyWrite, writing
|
||||
* processes must not update the application-level cache as they write,
|
||||
* or else readers can read the new cache before the data is committed to
|
||||
* the db. So, when a commit occurs, the cache must be updated right then.
|
||||
* If set, this function will be called during commits of write transactions,
|
||||
* to update the application-level cache after a write. This will be called
|
||||
* immediately after the local disk commit succeeds, and it will be called
|
||||
* with a lock held that prevents other threads from reading from the cache
|
||||
* or the db in general.
|
||||
*
|
||||
* Note that this function MUST be set in order to make use of
|
||||
* ubik_BeginTransReadAnyWrite.
|
||||
*/
|
||||
extern int (*ubik_SyncWriterCacheProc) (void);
|
||||
|
||||
/****************INTERNALS BELOW ****************/
|
||||
|
||||
#ifdef UBIK_INTERNALS
|
||||
@ -251,6 +267,8 @@ extern void *ubik_CheckRXSecurityRock;
|
||||
#define TRCACHELOCKED 32 /*!< this trans has locked dbase->cache_lock
|
||||
* (meaning, this trans has called
|
||||
* ubik_CheckCache at some point */
|
||||
#define TRREADWRITE 64 /*!< read even if there's a conflicting ubik-
|
||||
* level write lock */
|
||||
/*\}*/
|
||||
|
||||
/*! \name ubik_lock flags */
|
||||
@ -493,6 +511,9 @@ extern int ubik_BeginTrans(struct ubik_dbase *dbase,
|
||||
extern int ubik_BeginTransReadAny(struct ubik_dbase *dbase,
|
||||
afs_int32 transMode,
|
||||
struct ubik_trans **transPtr);
|
||||
extern int ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase,
|
||||
afs_int32 transMode,
|
||||
struct ubik_trans **transPtr);
|
||||
extern int ubik_AbortTrans(struct ubik_trans *transPtr);
|
||||
|
||||
extern int ubik_EndTrans(struct ubik_trans *transPtr);
|
||||
|
Loading…
Reference in New Issue
Block a user