diff --git a/src/ubik/lock.c b/src/ubik/lock.c index 6c609e5b82..b8a7a27fc9 100644 --- a/src/ubik/lock.c +++ b/src/ubik/lock.c @@ -80,6 +80,10 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await) if (atrans->flags & TRDONE) return UDONE; + if (atype != LOCKREAD && (atrans->flags & TRREADWRITE)) { + return EINVAL; + } + if (atrans->locktype != 0) { ubik_print("Ubik: Internal Error: attempted to take lock twice\n"); abort(); @@ -91,7 +95,7 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await) */ /* Check if the lock would would block */ - if (!await) { + if (!await && !(atrans->flags & TRREADWRITE)) { if (atype == LOCKREAD) { if (WouldReadBlock(&rwlock)) return EAGAIN; @@ -120,7 +124,9 @@ ulock_getLock(struct ubik_trans *atrans, int atype, int await) atrans->locktype = LOCKWAIT; #endif /* UBIK_PAUSE */ DBRELE(dbase); - if (atype == LOCKREAD) { + if (atrans->flags & TRREADWRITE) { + /* noop; don't actually lock anything for TRREADWRITE */ + } else if (atype == LOCKREAD) { ObtainReadLock(&rwlock); } else { ObtainWriteLock(&rwlock); @@ -156,7 +162,15 @@ ulock_relLock(struct ubik_trans *atrans) if (rwlockinit) return; - if (atrans->locktype == LOCKREAD) { + if (atrans->locktype == LOCKWRITE && (atrans->flags & TRREADWRITE)) { + ubik_print("Ubik: Internal Error: unlocking write lock with " + "TRREADWRITE?\n"); + abort(); + } + + if (atrans->flags & TRREADWRITE) { + /* noop, TRREADWRITE means we don't actually lock anything */ + } else if (atrans->locktype == LOCKREAD) { ReleaseReadLock(&rwlock); } else if (atrans->locktype == LOCKWRITE) { ReleaseWriteLock(&rwlock); diff --git a/src/ubik/remote.c b/src/ubik/remote.c index 3654b77422..45404ecf53 100644 --- a/src/ubik/remote.c +++ b/src/ubik/remote.c @@ -114,10 +114,15 @@ SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid) } dbase = ubik_currentTrans->dbase; + + ObtainWriteLock(&dbase->cache_lock); + DBHOLD(dbase); + urecovery_CheckTid(atid); if (!ubik_currentTrans) { DBRELE(dbase); + ReleaseWriteLock(&dbase->cache_lock); return USYNC; } @@ -127,6 +132,7 @@ SDISK_Commit(struct rx_call *rxcall, struct ubik_tid *atid) ubik_dbVersion = ubik_dbase->version; } DBRELE(dbase); + ReleaseWriteLock(&dbase->cache_lock); return code; } diff --git a/src/ubik/ubik.c b/src/ubik/ubik.c index fd8caa6c77..b8088a5686 100644 --- a/src/ubik/ubik.c +++ b/src/ubik/ubik.c @@ -83,6 +83,7 @@ afs_int32 ubik_epochTime = 0; afs_int32 urecovery_state = 0; int (*ubik_SRXSecurityProc) (void *, struct rx_securityClass **, afs_int32 *); void *ubik_SRXSecurityRock; +int (*ubik_SyncWriterCacheProc) (void); struct ubik_server *ubik_servers; short ubik_callPortal; @@ -602,8 +603,9 @@ ubik_ServerInit(afs_uint32 myHost, short myPort, afs_uint32 serverList[], * An open mode of ubik_READTRANS identifies this as a read transaction, * while a mode of ubik_WRITETRANS identifies this as a write transaction. * transPtr is set to the returned transaction control block. - * The readAny flag is set to 0 or 1 by the wrapper functions ubik_BeginTrans() or - * ubik_BeginTransReadAny() below. + * The readAny flag is set to 0 or 1 or 2 by the wrapper functions + * ubik_BeginTrans() or ubik_BeginTransReadAny() or + * ubik_BeginTransReadAnyWrite() below. * * \note We can only begin transaction when we have an up-to-date database. */ @@ -618,6 +620,16 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode, int count; #endif /* UBIK_PAUSE */ + if (readAny > 1 && ubik_SyncWriterCacheProc == NULL) { + /* it's not safe to use ubik_BeginTransReadAnyWrite without a + * cache-syncing function; fall back to ubik_BeginTransReadAny, + * which is safe but slower */ + ubik_print("ubik_BeginTransReadAnyWrite called, but " + "ubik_SyncWriterCacheProc not set; pretending " + "ubik_BeginTransReadAny was called instead\n"); + readAny = 1; + } + if ((transMode != UBIK_READTRANS) && readAny) return UBADTYPE; DBHOLD(dbase); @@ -687,8 +699,12 @@ BeginTrans(struct ubik_dbase *dbase, afs_int32 transMode, DBRELE(dbase); return code; } - if (readAny) + if (readAny) { tt->flags |= TRREADANY; + if (readAny > 1) { + tt->flags |= TRREADWRITE; + } + } /* label trans and dbase with new tid */ tt->tid.epoch = ubik_epochTime; /* bump by two, since tidCounter+1 means trans id'd by tidCounter has finished */ @@ -739,6 +755,16 @@ ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode, return BeginTrans(dbase, transMode, transPtr, 1); } +/*! + * \see BeginTrans() + */ +int +ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, afs_int32 transMode, + struct ubik_trans **transPtr) +{ + return BeginTrans(dbase, transMode, transPtr, 2); +} + /*! * \brief This routine ends a read or write transaction by aborting it. */ @@ -794,6 +820,23 @@ ubik_AbortTrans(struct ubik_trans *transPtr) return (code ? code : code2); } +static void +WritebackApplicationCache(struct ubik_dbase *dbase) +{ + int code = 0; + if (ubik_SyncWriterCacheProc) { + code = ubik_SyncWriterCacheProc(); + } + if (code) { + /* we failed to sync the local cache, so just invalidate the cache; + * we'll try to read the cache in again on the next read */ + memset(&dbase->cachedVersion, 0, sizeof(dbase->cachedVersion)); + } else { + memcpy(&dbase->cachedVersion, &dbase->version, + sizeof(dbase->cachedVersion)); + } +} + /*! * \brief This routine ends a read or write transaction on the open transaction identified by transPtr. * \return an error code. @@ -806,6 +849,7 @@ ubik_EndTrans(struct ubik_trans *transPtr) afs_int32 realStart; struct ubik_server *ts; afs_int32 now; + int cachelocked = 0; struct ubik_dbase *dbase; if (transPtr->type == UBIK_WRITETRANS) { @@ -822,6 +866,13 @@ ubik_EndTrans(struct ubik_trans *transPtr) ReleaseReadLock(&dbase->cache_lock); transPtr->flags &= ~TRCACHELOCKED; } + + if (transPtr->type != UBIK_READTRANS) { + /* must hold cache_lock before DBHOLD'ing */ + ObtainWriteLock(&dbase->cache_lock); + cachelocked = 1; + } + DBHOLD(dbase); /* give up if no longer current */ @@ -852,8 +903,20 @@ ubik_EndTrans(struct ubik_trans *transPtr) /* now it is safe to do commit */ code = udisk_commit(transPtr); - if (code == 0) + if (code == 0) { + /* db data has been committed locally; update the local cache so + * readers can get at it */ + WritebackApplicationCache(dbase); + + ReleaseWriteLock(&dbase->cache_lock); + code = ContactQuorum_NoArguments(DISK_Commit, transPtr, CStampVersion); + + } else { + memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version)); + ReleaseWriteLock(&dbase->cache_lock); + } + cachelocked = 0; if (code) { /* failed to commit, so must return failure. Try to clear locks first, just for fun * Note that we don't know if this transaction will eventually commit at this point. @@ -900,12 +963,6 @@ ubik_EndTrans(struct ubik_trans *transPtr) break; /* no down ones still pseudo-active */ } - /* the commit bumped the dbase version, and since the write was local - * our cache should still be up to date, so make sure to update - * cachedVersion, too */ - memcpy(&dbase->cachedVersion, &dbase->version, - sizeof(dbase->cachedVersion)); - /* finally, unlock all the dudes. We can return success independent of the number of servers * that really unlock the dbase; the others will do it if/when they elect a new sync site. * The transaction is committed anyway, since we succeeded in contacting a quorum @@ -918,10 +975,15 @@ ubik_EndTrans(struct ubik_trans *transPtr) /* don't update cachedVersion here; it should have been updated way back * in ubik_CheckCache, and earlier in this function for writes */ DBRELE(dbase); + if (cachelocked) { + ReleaseWriteLock(&dbase->cache_lock); + } return 0; error: - ObtainWriteLock(&dbase->cache_lock); + if (!cachelocked) { + ObtainWriteLock(&dbase->cache_lock); + } memset(&dbase->cachedVersion, 0, sizeof(struct ubik_version)); ReleaseWriteLock(&dbase->cache_lock); return code; diff --git a/src/ubik/ubik.p.h b/src/ubik/ubik.p.h index 939501f381..17622be145 100644 --- a/src/ubik/ubik.p.h +++ b/src/ubik/ubik.p.h @@ -223,6 +223,22 @@ extern int (*ubik_CheckRXSecurityProc) (void *, struct rx_call *); extern void *ubik_CheckRXSecurityRock; /*\}*/ +/* + * For applications that make use of ubik_BeginTransReadAnyWrite, writing + * processes must not update the application-level cache as they write, + * or else readers can read the new cache before the data is committed to + * the db. So, when a commit occurs, the cache must be updated right then. + * If set, this function will be called during commits of write transactions, + * to update the application-level cache after a write. This will be called + * immediately after the local disk commit succeeds, and it will be called + * with a lock held that prevents other threads from reading from the cache + * or the db in general. + * + * Note that this function MUST be set in order to make use of + * ubik_BeginTransReadAnyWrite. + */ +extern int (*ubik_SyncWriterCacheProc) (void); + /****************INTERNALS BELOW ****************/ #ifdef UBIK_INTERNALS @@ -251,6 +267,8 @@ extern void *ubik_CheckRXSecurityRock; #define TRCACHELOCKED 32 /*!< this trans has locked dbase->cache_lock * (meaning, this trans has called * ubik_CheckCache at some point */ +#define TRREADWRITE 64 /*!< read even if there's a conflicting ubik- + * level write lock */ /*\}*/ /*! \name ubik_lock flags */ @@ -493,6 +511,9 @@ extern int ubik_BeginTrans(struct ubik_dbase *dbase, extern int ubik_BeginTransReadAny(struct ubik_dbase *dbase, afs_int32 transMode, struct ubik_trans **transPtr); +extern int ubik_BeginTransReadAnyWrite(struct ubik_dbase *dbase, + afs_int32 transMode, + struct ubik_trans **transPtr); extern int ubik_AbortTrans(struct ubik_trans *transPtr); extern int ubik_EndTrans(struct ubik_trans *transPtr);