diff --git a/src/ubik/recovery.c b/src/ubik/recovery.c index c48a1746a9..b0b8e9c83c 100644 --- a/src/ubik/recovery.c +++ b/src/ubik/recovery.c @@ -432,6 +432,7 @@ urecovery_Interact(void) #ifndef OLD_URECOVERY char pbuffer[1028]; int flen, fd = -1; + afs_int32 epoch, pass; #endif /* otherwise, begin interaction */ @@ -566,16 +567,16 @@ urecovery_Interact(void) ubik_dprint("truncate io error=%d\n", code); goto FetchEndCall; } - + tversion.counter = 0; +#endif /* give invalid label during file transit */ tversion.epoch = 0; - tversion.counter = 0; code = (*ubik_dbase->setlabel) (ubik_dbase, file, &tversion); if (code) { ubik_dprint("setlabel io error=%d\n", code); goto FetchEndCall; } -#else +#ifndef OLD_URECOVERY flen = length; afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName); fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600); @@ -592,6 +593,10 @@ urecovery_Interact(void) while (length > 0) { tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length); +#ifndef AFS_PTHREAD_ENV + if (pass % 4 == 0) + IOMGR_Poll(); +#endif nbytes = rx_Read(rxcall, tbuffer, tlen); if (nbytes != tlen) { ubik_dprint("Rx-read bulk error=%d\n", code = BULK_ERROR); @@ -605,6 +610,7 @@ urecovery_Interact(void) tlen); #else nbytes = write(fd, tbuffer, tlen); + pass++; #endif if (nbytes != tlen) { code = UIOERROR; @@ -658,6 +664,10 @@ urecovery_Interact(void) if (code) { #ifndef OLD_URECOVERY unlink(pbuffer); + /* + * We will effectively invalidate the old data forever now. + * Unclear if we *should* but we do. + */ #endif ubik_dbase->version.epoch = 0; ubik_dbase->version.counter = 0; @@ -665,6 +675,7 @@ urecovery_Interact(void) code); } else { ubik_print("Ubik: Synchronize database completed\n"); + urecovery_state |= UBIK_RECHAVEDB; } udisk_Invalidate(ubik_dbase, 0); /* data has changed */ LWP_NoYieldSignal(&ubik_dbase->version); diff --git a/src/ubik/remote.c b/src/ubik/remote.c index e80875ac29..c81cfb2b9e 100644 --- a/src/ubik/remote.c +++ b/src/ubik/remote.c @@ -500,6 +500,7 @@ SDISK_SendFile(rxcall, file, length, avers) #ifndef OLD_URECOVERY char pbuffer[1028]; int flen, fd = -1; + afs_int32 epoch, pass; #endif /* send the file back to the requester */ @@ -540,10 +541,13 @@ SDISK_SendFile(rxcall, file, length, avers) offset = 0; #ifdef OLD_URECOVERY (*dbase->truncate) (dbase, file, 0); /* truncate first */ - tversion.epoch = 0; /* start off by labelling in-transit db as invalid */ tversion.counter = 0; - (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */ #else + epoch = +#endif + tversion.epoch = 0; /* start off by labelling in-transit db as invalid */ + (*dbase->setlabel) (dbase, file, &tversion); /* setlabel does sync */ +#ifndef OLD_URECOVERY flen = length; afs_snprintf(pbuffer, sizeof(pbuffer), "%s.DB0.TMP", ubik_dbase->pathName); fd = open(pbuffer, O_CREAT | O_RDWR | O_TRUNC, 0600); @@ -556,10 +560,16 @@ SDISK_SendFile(rxcall, file, length, avers) close(fd); goto failed; } +#else + pass = 0; #endif memcpy(&ubik_dbase->version, &tversion, sizeof(struct ubik_version)); while (length > 0) { tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length); +#if !defined(OLD_URECOVERY) && defined(AFS_PTHREAD_ENV) + if (pass % 4 == 0) + IOMGR_Poll(); +#endif code = rx_Read(rxcall, tbuffer, tlen); if (code != tlen) { DBRELE(dbase); @@ -572,6 +582,7 @@ SDISK_SendFile(rxcall, file, length, avers) code = (*dbase->write) (dbase, file, tbuffer, offset, tlen); #else code = write(fd, tbuffer, tlen); + pass++; #endif if (code != tlen) { DBRELE(dbase); @@ -621,6 +632,9 @@ SDISK_SendFile(rxcall, file, length, avers) if (code) { #ifndef OLD_URECOVERY unlink(pbuffer); + /* Failed to sync. Allow reads again for now. */ + tversion.epoch = epoch; + (*dbase->setlabel) (dbase, file, &tversion); #endif ubik_print ("Ubik: Synchronize database with server %s failed (error = %d)\n",