diff --git a/src/rx/LINUX/rx_kmutex.c b/src/rx/LINUX/rx_kmutex.c index fc049811de..ea31a87824 100644 --- a/src/rx/LINUX/rx_kmutex.c +++ b/src/rx/LINUX/rx_kmutex.c @@ -72,7 +72,7 @@ afs_mutex_exit(afs_kmutex_t * l) int afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok) { - int isAFSGlocked = ISAFS_GLOCK(); + int seq, isAFSGlocked = ISAFS_GLOCK(); sigset_t saved_set; #ifdef DECLARE_WAITQUEUE DECLARE_WAITQUEUE(wait, current); @@ -80,8 +80,10 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok) struct wait_queue wait = { current, NULL }; #endif - add_wait_queue(cv, &wait); + seq = cv->seq; + set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&cv->waitq, &wait); if (isAFSGlocked) AFS_GUNLOCK(); @@ -95,8 +97,13 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok) SIG_UNLOCK(current); } - schedule(); - remove_wait_queue(cv, &wait); + while(seq == cv->seq) { + schedule(); + /* should we refrigerate? */ + } + + remove_wait_queue(&cv->waitq, &wait); + set_current_state(TASK_RUNNING); if (!sigok) { SIG_LOCK(current); @@ -115,23 +122,30 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok) void afs_cv_timedwait(afs_kcondvar_t * cv, afs_kmutex_t * l, int waittime) { - int isAFSGlocked = ISAFS_GLOCK(); + int seq, isAFSGlocked = ISAFS_GLOCK(); long t = waittime * HZ / 1000; #ifdef DECLARE_WAITQUEUE DECLARE_WAITQUEUE(wait, current); #else struct wait_queue wait = { current, NULL }; #endif + seq = cv->seq; - add_wait_queue(cv, &wait); set_current_state(TASK_INTERRUPTIBLE); + add_wait_queue(&cv->waitq, &wait); if (isAFSGlocked) AFS_GUNLOCK(); MUTEX_EXIT(l); - t = schedule_timeout(t); - remove_wait_queue(cv, &wait); + while(seq == cv->seq) { + t = schedule_timeout(t); + if (!t) /* timeout */ + break; + } + + remove_wait_queue(&cv->waitq, &wait); + set_current_state(TASK_RUNNING); if (isAFSGlocked) AFS_GLOCK(); diff --git a/src/rx/LINUX/rx_kmutex.h b/src/rx/LINUX/rx_kmutex.h index 8473e19f71..8e08f004f2 100644 --- a/src/rx/LINUX/rx_kmutex.h +++ b/src/rx/LINUX/rx_kmutex.h @@ -43,11 +43,14 @@ typedef struct afs_kmutex { #define set_current_state(X) current->state=X #endif +typedef struct afs_kcondvar { + int seq; #if defined(AFS_LINUX24_ENV) -typedef wait_queue_head_t afs_kcondvar_t; + wait_queue_head_t waitq; #else -typedef struct wait_queue *afs_kcondvar_t; + struct wait_queue *waitq; #endif +} afs_kcondvar_t; static inline int MUTEX_ISMINE(afs_kmutex_t * l) @@ -62,7 +65,7 @@ MUTEX_ISMINE(afs_kmutex_t * l) #define MUTEX_EXIT afs_mutex_exit #if defined(AFS_LINUX24_ENV) -#define CV_INIT(cv,b,c,d) init_waitqueue_head((wait_queue_head_t *)(cv)) +#define CV_INIT(cv,b,c,d) do { (cv)->seq = 0; init_waitqueue_head(&(cv)->waitq); } while (0) #else #define CV_INIT(cv,b,c,d) init_waitqueue((struct wait_queue**)(cv)) #endif @@ -71,12 +74,11 @@ MUTEX_ISMINE(afs_kmutex_t * l) #define CV_WAIT(cv, m) afs_cv_wait(cv, m, 0) #define CV_TIMEDWAIT afs_cv_timedwait +#define CV_SIGNAL(cv) do { ++(cv)->seq; wake_up(&(cv)->waitq); } while (0) #if defined(AFS_LINUX24_ENV) -#define CV_SIGNAL(cv) wake_up((wait_queue_head_t *)cv) -#define CV_BROADCAST(cv) wake_up((wait_queue_head_t *)cv) +#define CV_BROADCAST(cv) do { ++(cv)->seq; wake_up_all(&(cv)->waitq); } while (0) #else -#define CV_SIGNAL(cv) wake_up((struct wait_queue**)cv) -#define CV_BROADCAST(cv) wake_up((struct wait_queue**)cv) +#define CV_BROADCAST(cv) do { ++(cv)->seq; wake_up(&(cv)->waitq); } while (0) #endif #endif /* RX_KMUTEX_H_ */ diff --git a/src/rx/rx.c b/src/rx/rx.c index 9c24720e5d..7b44ff62ed 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -1170,11 +1170,17 @@ rx_NewCall(register struct rx_connection *conn) MUTEX_ENTER(&call->lock); while (call->flags & RX_CALL_TQ_BUSY) { call->flags |= RX_CALL_TQ_WAIT; + call->tqWaiters++; #ifdef RX_ENABLE_LOCKS + osirx_AssertMine(&call->lock, "rxi_Start lock4"); CV_WAIT(&call->cv_tq, &call->lock); #else /* RX_ENABLE_LOCKS */ osi_rxSleep(&call->tq); #endif /* RX_ENABLE_LOCKS */ + call->tqWaiters--; + if (call->tqWaiters == 0) { + call->flags &= ~RX_CALL_TQ_WAIT; + } } if (call->flags & RX_CALL_TQ_CLEARME) { rxi_ClearTransmitQueue(call, 0); @@ -2638,11 +2644,16 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket, while ((call->state == RX_STATE_ACTIVE) && (call->flags & RX_CALL_TQ_BUSY)) { call->flags |= RX_CALL_TQ_WAIT; + call->tqWaiters++; #ifdef RX_ENABLE_LOCKS + osirx_AssertMine(&call->lock, "rxi_Start lock3"); CV_WAIT(&call->cv_tq, &call->lock); #else /* RX_ENABLE_LOCKS */ osi_rxSleep(&call->tq); #endif /* RX_ENABLE_LOCKS */ + call->tqWaiters--; + if (call->tqWaiters == 0) + call->flags &= ~RX_CALL_TQ_WAIT; } #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ /* If the new call cannot be taken right now send a busy and set @@ -3802,11 +3813,16 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np, call->flags |= RX_CALL_FAST_RECOVER_WAIT; while (call->flags & RX_CALL_TQ_BUSY) { call->flags |= RX_CALL_TQ_WAIT; + call->tqWaiters++; #ifdef RX_ENABLE_LOCKS + osirx_AssertMine(&call->lock, "rxi_Start lock2"); CV_WAIT(&call->cv_tq, &call->lock); #else /* RX_ENABLE_LOCKS */ osi_rxSleep(&call->tq); #endif /* RX_ENABLE_LOCKS */ + call->tqWaiters--; + if (call->tqWaiters == 0) + call->flags &= ~RX_CALL_TQ_WAIT; } MUTEX_ENTER(&peer->peer_lock); #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ @@ -4340,7 +4356,7 @@ rxi_CallError(register struct rx_call *call, afs_int32 error) if (call->error) error = call->error; #ifdef RX_GLOBAL_RXLOCK_KERNEL - if (!(call->flags & RX_CALL_TQ_BUSY)) { + if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) { rxi_ResetCall(call, 0); } #else @@ -4416,7 +4432,7 @@ rxi_ResetCall(register struct rx_call *call, register int newcall) flags = call->flags; rxi_ClearReceiveQueue(call); #ifdef AFS_GLOBAL_RXLOCK_KERNEL - if (call->flags & RX_CALL_TQ_BUSY) { + if (flags & RX_CALL_TQ_BUSY) { call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY; call->flags |= (flags & RX_CALL_TQ_WAIT); } else @@ -4424,7 +4440,18 @@ rxi_ResetCall(register struct rx_call *call, register int newcall) { rxi_ClearTransmitQueue(call, 0); queue_Init(&call->tq); + if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) { + dpf(("rcall %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags)); + } call->flags = 0; + while (call->tqWaiters) { +#ifdef RX_ENABLE_LOCKS + CV_BROADCAST(&call->cv_tq); +#else /* RX_ENABLE_LOCKS */ + osi_rxWakeup(&call->tq); +#endif /* RX_ENABLE_LOCKS */ + call->tqWaiters--; + } } queue_Init(&call->rq); call->error = 0; @@ -4977,11 +5004,16 @@ rxi_Start(struct rxevent *event, register struct rx_call *call, call->flags |= RX_CALL_FAST_RECOVER_WAIT; while (call->flags & RX_CALL_TQ_BUSY) { call->flags |= RX_CALL_TQ_WAIT; + call->tqWaiters++; #ifdef RX_ENABLE_LOCKS + osirx_AssertMine(&call->lock, "rxi_Start lock1"); CV_WAIT(&call->cv_tq, &call->lock); #else /* RX_ENABLE_LOCKS */ osi_rxSleep(&call->tq); #endif /* RX_ENABLE_LOCKS */ + call->tqWaiters--; + if (call->tqWaiters == 0) + call->flags &= ~RX_CALL_TQ_WAIT; } #endif /* AFS_GLOBAL_RXLOCK_KERNEL */ call->flags &= ~RX_CALL_FAST_RECOVER_WAIT; @@ -5137,14 +5169,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call, */ if (call->flags & RX_CALL_FAST_RECOVER_WAIT) { call->flags &= ~RX_CALL_TQ_BUSY; - if (call->flags & RX_CALL_TQ_WAIT) { - call->flags &= ~RX_CALL_TQ_WAIT; -#ifdef RX_ENABLE_LOCKS - CV_BROADCAST(&call->cv_tq); -#else /* RX_ENABLE_LOCKS */ - osi_rxWakeup(&call->tq); -#endif /* RX_ENABLE_LOCKS */ + if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) { + dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags)); } +#ifdef RX_ENABLE_LOCKS + osirx_AssertMine(&call->lock, "rxi_Start start"); + CV_BROADCAST(&call->cv_tq); +#else /* RX_ENABLE_LOCKS */ + osi_rxWakeup(&call->tq); +#endif /* RX_ENABLE_LOCKS */ return; } if (call->error) { @@ -5156,14 +5189,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call, rx_tq_debug.rxi_start_aborted++; MUTEX_EXIT(&rx_stats_mutex); call->flags &= ~RX_CALL_TQ_BUSY; - if (call->flags & RX_CALL_TQ_WAIT) { - call->flags &= ~RX_CALL_TQ_WAIT; -#ifdef RX_ENABLE_LOCKS - CV_BROADCAST(&call->cv_tq); -#else /* RX_ENABLE_LOCKS */ - osi_rxWakeup(&call->tq); -#endif /* RX_ENABLE_LOCKS */ + if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) { + dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags)); } +#ifdef RX_ENABLE_LOCKS + osirx_AssertMine(&call->lock, "rxi_Start middle"); + CV_BROADCAST(&call->cv_tq); +#else /* RX_ENABLE_LOCKS */ + osi_rxWakeup(&call->tq); +#endif /* RX_ENABLE_LOCKS */ rxi_CallError(call, call->error); return; } @@ -5243,14 +5277,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call, * protected by the global lock. */ call->flags &= ~RX_CALL_TQ_BUSY; - if (call->flags & RX_CALL_TQ_WAIT) { - call->flags &= ~RX_CALL_TQ_WAIT; -#ifdef RX_ENABLE_LOCKS - CV_BROADCAST(&call->cv_tq); -#else /* RX_ENABLE_LOCKS */ - osi_rxWakeup(&call->tq); -#endif /* RX_ENABLE_LOCKS */ + if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) { + dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags)); } +#ifdef RX_ENABLE_LOCKS + osirx_AssertMine(&call->lock, "rxi_Start end"); + CV_BROADCAST(&call->cv_tq); +#else /* RX_ENABLE_LOCKS */ + osi_rxWakeup(&call->tq); +#endif /* RX_ENABLE_LOCKS */ } else { call->flags |= RX_CALL_NEED_START; } diff --git a/src/rx/rx.h b/src/rx/rx.h index 03c035e323..77ead1a88b 100644 --- a/src/rx/rx.h +++ b/src/rx/rx.h @@ -540,6 +540,7 @@ struct rx_call { struct clock startTime; /* time call was started */ afs_hyper_t bytesSent; /* Number bytes sent */ afs_hyper_t bytesRcvd; /* Number bytes received */ + u_short tqWaiters; }; #ifndef KDUMP_RX_LOCK