STABLE14-rx-tq-waiter-rework-20050914

heavily reworked by jaltman@secure-endpoints.com
and then a little further editing by me


see if we can avoid going to sleep forever waiting on the tq to flush

====================
This delta was composed from multiple commits as part of the CVS->Git migration.
The checkin message with each commit was inconsistent.
The following are the additional commit messages.
====================

do not decrement tqWaiters in the while evaluation. Doing so would
result in an invalid count if the value was zero to begin with.


(cherry picked from commit b08017e91f5b4d175b5c9b1f6d68adb9effa3360)
This commit is contained in:
Derrick Brashear 2005-09-14 09:53:12 +00:00
parent 5ce32b2e0e
commit 05c6baad5e
4 changed files with 90 additions and 38 deletions

View File

@ -72,7 +72,7 @@ afs_mutex_exit(afs_kmutex_t * l)
int int
afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok) afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
{ {
int isAFSGlocked = ISAFS_GLOCK(); int seq, isAFSGlocked = ISAFS_GLOCK();
sigset_t saved_set; sigset_t saved_set;
#ifdef DECLARE_WAITQUEUE #ifdef DECLARE_WAITQUEUE
DECLARE_WAITQUEUE(wait, current); DECLARE_WAITQUEUE(wait, current);
@ -80,8 +80,10 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
struct wait_queue wait = { current, NULL }; struct wait_queue wait = { current, NULL };
#endif #endif
add_wait_queue(cv, &wait); seq = cv->seq;
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
add_wait_queue(&cv->waitq, &wait);
if (isAFSGlocked) if (isAFSGlocked)
AFS_GUNLOCK(); AFS_GUNLOCK();
@ -95,8 +97,13 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
SIG_UNLOCK(current); SIG_UNLOCK(current);
} }
schedule(); while(seq == cv->seq) {
remove_wait_queue(cv, &wait); schedule();
/* should we refrigerate? */
}
remove_wait_queue(&cv->waitq, &wait);
set_current_state(TASK_RUNNING);
if (!sigok) { if (!sigok) {
SIG_LOCK(current); SIG_LOCK(current);
@ -115,23 +122,30 @@ afs_cv_wait(afs_kcondvar_t * cv, afs_kmutex_t * l, int sigok)
void void
afs_cv_timedwait(afs_kcondvar_t * cv, afs_kmutex_t * l, int waittime) afs_cv_timedwait(afs_kcondvar_t * cv, afs_kmutex_t * l, int waittime)
{ {
int isAFSGlocked = ISAFS_GLOCK(); int seq, isAFSGlocked = ISAFS_GLOCK();
long t = waittime * HZ / 1000; long t = waittime * HZ / 1000;
#ifdef DECLARE_WAITQUEUE #ifdef DECLARE_WAITQUEUE
DECLARE_WAITQUEUE(wait, current); DECLARE_WAITQUEUE(wait, current);
#else #else
struct wait_queue wait = { current, NULL }; struct wait_queue wait = { current, NULL };
#endif #endif
seq = cv->seq;
add_wait_queue(cv, &wait);
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
add_wait_queue(&cv->waitq, &wait);
if (isAFSGlocked) if (isAFSGlocked)
AFS_GUNLOCK(); AFS_GUNLOCK();
MUTEX_EXIT(l); MUTEX_EXIT(l);
t = schedule_timeout(t); while(seq == cv->seq) {
remove_wait_queue(cv, &wait); t = schedule_timeout(t);
if (!t) /* timeout */
break;
}
remove_wait_queue(&cv->waitq, &wait);
set_current_state(TASK_RUNNING);
if (isAFSGlocked) if (isAFSGlocked)
AFS_GLOCK(); AFS_GLOCK();

View File

@ -43,11 +43,14 @@ typedef struct afs_kmutex {
#define set_current_state(X) current->state=X #define set_current_state(X) current->state=X
#endif #endif
typedef struct afs_kcondvar {
int seq;
#if defined(AFS_LINUX24_ENV) #if defined(AFS_LINUX24_ENV)
typedef wait_queue_head_t afs_kcondvar_t; wait_queue_head_t waitq;
#else #else
typedef struct wait_queue *afs_kcondvar_t; struct wait_queue *waitq;
#endif #endif
} afs_kcondvar_t;
static inline int static inline int
MUTEX_ISMINE(afs_kmutex_t * l) MUTEX_ISMINE(afs_kmutex_t * l)
@ -62,7 +65,7 @@ MUTEX_ISMINE(afs_kmutex_t * l)
#define MUTEX_EXIT afs_mutex_exit #define MUTEX_EXIT afs_mutex_exit
#if defined(AFS_LINUX24_ENV) #if defined(AFS_LINUX24_ENV)
#define CV_INIT(cv,b,c,d) init_waitqueue_head((wait_queue_head_t *)(cv)) #define CV_INIT(cv,b,c,d) do { (cv)->seq = 0; init_waitqueue_head(&(cv)->waitq); } while (0)
#else #else
#define CV_INIT(cv,b,c,d) init_waitqueue((struct wait_queue**)(cv)) #define CV_INIT(cv,b,c,d) init_waitqueue((struct wait_queue**)(cv))
#endif #endif
@ -71,12 +74,11 @@ MUTEX_ISMINE(afs_kmutex_t * l)
#define CV_WAIT(cv, m) afs_cv_wait(cv, m, 0) #define CV_WAIT(cv, m) afs_cv_wait(cv, m, 0)
#define CV_TIMEDWAIT afs_cv_timedwait #define CV_TIMEDWAIT afs_cv_timedwait
#define CV_SIGNAL(cv) do { ++(cv)->seq; wake_up(&(cv)->waitq); } while (0)
#if defined(AFS_LINUX24_ENV) #if defined(AFS_LINUX24_ENV)
#define CV_SIGNAL(cv) wake_up((wait_queue_head_t *)cv) #define CV_BROADCAST(cv) do { ++(cv)->seq; wake_up_all(&(cv)->waitq); } while (0)
#define CV_BROADCAST(cv) wake_up((wait_queue_head_t *)cv)
#else #else
#define CV_SIGNAL(cv) wake_up((struct wait_queue**)cv) #define CV_BROADCAST(cv) do { ++(cv)->seq; wake_up(&(cv)->waitq); } while (0)
#define CV_BROADCAST(cv) wake_up((struct wait_queue**)cv)
#endif #endif
#endif /* RX_KMUTEX_H_ */ #endif /* RX_KMUTEX_H_ */

View File

@ -1170,11 +1170,17 @@ rx_NewCall(register struct rx_connection *conn)
MUTEX_ENTER(&call->lock); MUTEX_ENTER(&call->lock);
while (call->flags & RX_CALL_TQ_BUSY) { while (call->flags & RX_CALL_TQ_BUSY) {
call->flags |= RX_CALL_TQ_WAIT; call->flags |= RX_CALL_TQ_WAIT;
call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS #ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start lock4");
CV_WAIT(&call->cv_tq, &call->lock); CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */ #else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq); osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */ #endif /* RX_ENABLE_LOCKS */
call->tqWaiters--;
if (call->tqWaiters == 0) {
call->flags &= ~RX_CALL_TQ_WAIT;
}
} }
if (call->flags & RX_CALL_TQ_CLEARME) { if (call->flags & RX_CALL_TQ_CLEARME) {
rxi_ClearTransmitQueue(call, 0); rxi_ClearTransmitQueue(call, 0);
@ -2638,11 +2644,16 @@ rxi_ReceivePacket(register struct rx_packet *np, osi_socket socket,
while ((call->state == RX_STATE_ACTIVE) while ((call->state == RX_STATE_ACTIVE)
&& (call->flags & RX_CALL_TQ_BUSY)) { && (call->flags & RX_CALL_TQ_BUSY)) {
call->flags |= RX_CALL_TQ_WAIT; call->flags |= RX_CALL_TQ_WAIT;
call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS #ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start lock3");
CV_WAIT(&call->cv_tq, &call->lock); CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */ #else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq); osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */ #endif /* RX_ENABLE_LOCKS */
call->tqWaiters--;
if (call->tqWaiters == 0)
call->flags &= ~RX_CALL_TQ_WAIT;
} }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */ #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
/* If the new call cannot be taken right now send a busy and set /* If the new call cannot be taken right now send a busy and set
@ -3802,11 +3813,16 @@ rxi_ReceiveAckPacket(register struct rx_call *call, struct rx_packet *np,
call->flags |= RX_CALL_FAST_RECOVER_WAIT; call->flags |= RX_CALL_FAST_RECOVER_WAIT;
while (call->flags & RX_CALL_TQ_BUSY) { while (call->flags & RX_CALL_TQ_BUSY) {
call->flags |= RX_CALL_TQ_WAIT; call->flags |= RX_CALL_TQ_WAIT;
call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS #ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start lock2");
CV_WAIT(&call->cv_tq, &call->lock); CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */ #else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq); osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */ #endif /* RX_ENABLE_LOCKS */
call->tqWaiters--;
if (call->tqWaiters == 0)
call->flags &= ~RX_CALL_TQ_WAIT;
} }
MUTEX_ENTER(&peer->peer_lock); MUTEX_ENTER(&peer->peer_lock);
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */ #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
@ -4340,7 +4356,7 @@ rxi_CallError(register struct rx_call *call, afs_int32 error)
if (call->error) if (call->error)
error = call->error; error = call->error;
#ifdef RX_GLOBAL_RXLOCK_KERNEL #ifdef RX_GLOBAL_RXLOCK_KERNEL
if (!(call->flags & RX_CALL_TQ_BUSY)) { if (!((call->flags & RX_CALL_TQ_BUSY) || (call->tqWaiters > 0))) {
rxi_ResetCall(call, 0); rxi_ResetCall(call, 0);
} }
#else #else
@ -4416,7 +4432,7 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
flags = call->flags; flags = call->flags;
rxi_ClearReceiveQueue(call); rxi_ClearReceiveQueue(call);
#ifdef AFS_GLOBAL_RXLOCK_KERNEL #ifdef AFS_GLOBAL_RXLOCK_KERNEL
if (call->flags & RX_CALL_TQ_BUSY) { if (flags & RX_CALL_TQ_BUSY) {
call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY; call->flags = RX_CALL_TQ_CLEARME | RX_CALL_TQ_BUSY;
call->flags |= (flags & RX_CALL_TQ_WAIT); call->flags |= (flags & RX_CALL_TQ_WAIT);
} else } else
@ -4424,7 +4440,18 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
{ {
rxi_ClearTransmitQueue(call, 0); rxi_ClearTransmitQueue(call, 0);
queue_Init(&call->tq); queue_Init(&call->tq);
if (call->tqWaiters || (flags & RX_CALL_TQ_WAIT)) {
dpf(("rcall %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
}
call->flags = 0; call->flags = 0;
while (call->tqWaiters) {
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
call->tqWaiters--;
}
} }
queue_Init(&call->rq); queue_Init(&call->rq);
call->error = 0; call->error = 0;
@ -4977,11 +5004,16 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
call->flags |= RX_CALL_FAST_RECOVER_WAIT; call->flags |= RX_CALL_FAST_RECOVER_WAIT;
while (call->flags & RX_CALL_TQ_BUSY) { while (call->flags & RX_CALL_TQ_BUSY) {
call->flags |= RX_CALL_TQ_WAIT; call->flags |= RX_CALL_TQ_WAIT;
call->tqWaiters++;
#ifdef RX_ENABLE_LOCKS #ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start lock1");
CV_WAIT(&call->cv_tq, &call->lock); CV_WAIT(&call->cv_tq, &call->lock);
#else /* RX_ENABLE_LOCKS */ #else /* RX_ENABLE_LOCKS */
osi_rxSleep(&call->tq); osi_rxSleep(&call->tq);
#endif /* RX_ENABLE_LOCKS */ #endif /* RX_ENABLE_LOCKS */
call->tqWaiters--;
if (call->tqWaiters == 0)
call->flags &= ~RX_CALL_TQ_WAIT;
} }
#endif /* AFS_GLOBAL_RXLOCK_KERNEL */ #endif /* AFS_GLOBAL_RXLOCK_KERNEL */
call->flags &= ~RX_CALL_FAST_RECOVER_WAIT; call->flags &= ~RX_CALL_FAST_RECOVER_WAIT;
@ -5137,14 +5169,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
*/ */
if (call->flags & RX_CALL_FAST_RECOVER_WAIT) { if (call->flags & RX_CALL_FAST_RECOVER_WAIT) {
call->flags &= ~RX_CALL_TQ_BUSY; call->flags &= ~RX_CALL_TQ_BUSY;
if (call->flags & RX_CALL_TQ_WAIT) { if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
call->flags &= ~RX_CALL_TQ_WAIT; dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
} }
#ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start start");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
return; return;
} }
if (call->error) { if (call->error) {
@ -5156,14 +5189,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
rx_tq_debug.rxi_start_aborted++; rx_tq_debug.rxi_start_aborted++;
MUTEX_EXIT(&rx_stats_mutex); MUTEX_EXIT(&rx_stats_mutex);
call->flags &= ~RX_CALL_TQ_BUSY; call->flags &= ~RX_CALL_TQ_BUSY;
if (call->flags & RX_CALL_TQ_WAIT) { if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
call->flags &= ~RX_CALL_TQ_WAIT; dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
} }
#ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start middle");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
rxi_CallError(call, call->error); rxi_CallError(call, call->error);
return; return;
} }
@ -5243,14 +5277,15 @@ rxi_Start(struct rxevent *event, register struct rx_call *call,
* protected by the global lock. * protected by the global lock.
*/ */
call->flags &= ~RX_CALL_TQ_BUSY; call->flags &= ~RX_CALL_TQ_BUSY;
if (call->flags & RX_CALL_TQ_WAIT) { if (call->tqWaiters || (call->flags & RX_CALL_TQ_WAIT)) {
call->flags &= ~RX_CALL_TQ_WAIT; dpf(("call %x has %d waiters and flags %d\n", call, call->tqWaiters, call->flags));
#ifdef RX_ENABLE_LOCKS
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
} }
#ifdef RX_ENABLE_LOCKS
osirx_AssertMine(&call->lock, "rxi_Start end");
CV_BROADCAST(&call->cv_tq);
#else /* RX_ENABLE_LOCKS */
osi_rxWakeup(&call->tq);
#endif /* RX_ENABLE_LOCKS */
} else { } else {
call->flags |= RX_CALL_NEED_START; call->flags |= RX_CALL_NEED_START;
} }

View File

@ -540,6 +540,7 @@ struct rx_call {
struct clock startTime; /* time call was started */ struct clock startTime; /* time call was started */
afs_hyper_t bytesSent; /* Number bytes sent */ afs_hyper_t bytesSent; /* Number bytes sent */
afs_hyper_t bytesRcvd; /* Number bytes received */ afs_hyper_t bytesRcvd; /* Number bytes received */
u_short tqWaiters;
}; };
#ifndef KDUMP_RX_LOCK #ifndef KDUMP_RX_LOCK