DEVEL15-rx-finer-grained-locking-20091010

LICENSE MIT

Not everything should be protected by rx_stats_mutex; funneling unrelated
updates through a single lock causes too much contention. Add four new mutexes:
rx_quota_mutex, rx_waiting_mutex, rx_pthread_mutex, and rx_packets_mutex.
Each new mutex protects an associated group of variables.
Jeffrey Altman 2009-01-11 04:52:00 +00:00 committed by Jeffrey Altman
parent 3698203264
commit 449a759be2
5 changed files with 102 additions and 53 deletions
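The pattern at the heart of the change, reduced to a stand-alone sketch (plain pthreads instead of the MUTEX_* wrappers; all names below are illustrative, not the actual Rx globals):

    #include <pthread.h>

    /* Before: one mutex guards every global counter, so a thread
     * touching the waiting-call count contends with a thread doing
     * quota accounting even though the data is unrelated. */
    static pthread_mutex_t stats_mutex = PTHREAD_MUTEX_INITIALIZER;

    /* After: each group of related variables gets its own mutex. */
    static pthread_mutex_t quota_mutex   = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t waiting_mutex = PTHREAD_MUTEX_INITIALIZER;

    static int n_calls;     /* stays under stats_mutex        */
    static int data_quota;  /* protected by quota_mutex       */
    static int n_waiting;   /* protected by waiting_mutex     */

    void record_call(void)
    {
        pthread_mutex_lock(&stats_mutex);
        n_calls++;
        pthread_mutex_unlock(&stats_mutex);
    }

    void add_quota(int extra)
    {
        pthread_mutex_lock(&quota_mutex);
        data_quota += extra;
        pthread_mutex_unlock(&quota_mutex);
    }

    void call_now_waiting(void)
    {
        /* no longer serializes behind quota or stats updates */
        pthread_mutex_lock(&waiting_mutex);
        n_waiting++;
        pthread_mutex_unlock(&waiting_mutex);
    }

Two threads updating counters from different groups no longer serialize on one lock, which is exactly the contention the commit message describes.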

src/rx/rx.c

@@ -163,6 +163,10 @@ static unsigned int rxi_rpc_process_stat_cnt;
*/
extern pthread_mutex_t rx_stats_mutex;
extern pthread_mutex_t rx_waiting_mutex;
extern pthread_mutex_t rx_quota_mutex;
extern pthread_mutex_t rx_pthread_mutex;
extern pthread_mutex_t rx_packets_mutex;
extern pthread_mutex_t des_init_mutex;
extern pthread_mutex_t des_random_mutex;
extern pthread_mutex_t rx_clock_mutex;
@@ -191,6 +195,14 @@ rxi_InitPthread(void)
== 0);
assert(pthread_mutex_init(&rx_stats_mutex, (const pthread_mutexattr_t *)0)
== 0);
assert(pthread_mutex_init(&rx_waiting_mutex, (const pthread_mutexattr_t *)0)
== 0);
assert(pthread_mutex_init(&rx_quota_mutex, (const pthread_mutexattr_t *)0)
== 0);
assert(pthread_mutex_init(&rx_pthread_mutex, (const pthread_mutexattr_t *)0)
== 0);
assert(pthread_mutex_init(&rx_packets_mutex, (const pthread_mutexattr_t *)0)
== 0);
assert(pthread_mutex_init
(&rxi_connCacheMutex, (const pthread_mutexattr_t *)0) == 0);
assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t *)0)
@@ -254,19 +266,40 @@ pthread_once_t rx_once_init = PTHREAD_ONCE_INIT;
assert(pthread_once(&rx_once_init, rxi_InitPthread)==0)
/*
* The rx_stats_mutex mutex protects the following global variables:
* rxi_dataQuota
* rxi_minDeficit
* rxi_availProcs
* rxi_totalMin
* rxi_lowConnRefCount
* rxi_lowPeerRefCount
* rxi_nCalls
* rxi_Alloccnt
* rxi_Allocsize
* rx_nFreePackets
* rx_tq_debug
* rx_stats
*/
/*
* The rx_quota_mutex mutex protects the following global variables:
* rxi_dataQuota
* rxi_minDeficit
* rxi_availProcs
* rxi_totalMin
*/
/*
* The rx_freePktQ_lock protects the following global variables:
* rx_nFreePackets
*/
/*
* The rx_packets_mutex mutex protects the following global variables:
* rx_nPackets
* rx_TSFPQLocalMax
* rx_TSFPQGlobSize
* rx_TSFPQMaxProcs
*/
/*
* The rx_pthread_mutex mutex protects the following global variables:
* rxi_pthread_hinum
*/
#else
#define INIT_PTHREAD_LOCKS
#endif
@@ -459,6 +492,10 @@ rx_InitHost(u_int host, u_int port)
rxdb_init();
#endif /* RX_LOCKS_DB */
MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_waiting_mutex, "rx_waiting_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_quota_mutex, "rx_quota_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_pthread_mutex, "rx_pthread_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_packets_mutex, "rx_packets_mutex", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0);
MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0);
MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0);
@@ -540,9 +577,9 @@ rx_InitHost(u_int host, u_int port)
rx_SetEpoch(tv.tv_sec); /* Start time of this package, rxkad
* will provide a randomer value. */
#endif
MUTEX_ENTER(&rx_stats_mutex);
rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */
MUTEX_EXIT(&rx_quota_mutex);
/* *Slightly* random start time for the cid. This is just to help
* out with the hashing function at the peer */
rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT);
@@ -604,7 +641,8 @@ QuotaOK(register struct rx_service *aservice)
/* otherwise, can use only if there are enough to allow everyone
* to go to their min quota after this guy starts.
*/
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_quota_mutex);
if ((aservice->nRequestsRunning < aservice->minProcs)
|| (rxi_availProcs > rxi_minDeficit)) {
aservice->nRequestsRunning++;
@@ -613,10 +651,10 @@ QuotaOK(register struct rx_service *aservice)
if (aservice->nRequestsRunning <= aservice->minProcs)
rxi_minDeficit--;
rxi_availProcs--;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_quota_mutex);
return 1;
}
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_quota_mutex);
return 0;
}
@@ -625,11 +663,11 @@ static void
ReturnToServerPool(register struct rx_service *aservice)
{
aservice->nRequestsRunning--;
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_quota_mutex);
if (aservice->nRequestsRunning < aservice->minProcs)
rxi_minDeficit++;
rxi_availProcs++;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_quota_mutex);
}
#else /* RX_ENABLE_LOCKS */
@@ -730,13 +768,13 @@ rx_StartServer(int donateMe)
service = rx_services[i];
if (service == (struct rx_service *)0)
break;
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_quota_mutex);
rxi_totalMin += service->minProcs;
/* below works even if a thread is running, since minDeficit would
* still have been decremented and later re-incremented.
*/
rxi_minDeficit += service->minProcs;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_quota_mutex);
}
/* Turn on reaping of idle server connections */
@@ -1631,9 +1669,9 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
if (call->flags & RX_CALL_WAIT_PROC) {
call->flags &= ~RX_CALL_WAIT_PROC;
MUTEX_ENTER(&rx_stats_mutex);
rx_nWaiting--;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_ENTER(&rx_waiting_mutex);
rx_nWaiting--;
MUTEX_EXIT(&rx_waiting_mutex);
}
if (call->state != RX_STATE_PRECALL || call->error) {
@@ -3063,7 +3101,8 @@ static int
TooLow(struct rx_packet *ap, struct rx_call *acall)
{
int rc = 0;
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_quota_mutex);
if (((ap->header.seq != 1) && (acall->flags & RX_CALL_CLEARED)
&& (acall->state == RX_STATE_PRECALL))
|| ((rx_nFreePackets < rxi_dataQuota + 2)
@@ -3071,7 +3110,7 @@ TooLow(struct rx_packet *ap, struct rx_call *acall)
&& (acall->flags & RX_CALL_READER_WAIT)))) {
rc = 1;
}
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_quota_mutex);
return rc;
}
#endif /* KERNEL */
@@ -4199,10 +4238,10 @@ rxi_AttachServerProc(register struct rx_call *call,
if (!(call->flags & RX_CALL_WAIT_PROC)) {
call->flags |= RX_CALL_WAIT_PROC;
MUTEX_ENTER(&rx_stats_mutex);
rx_nWaiting++;
rx_nWaited++;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_ENTER(&rx_waiting_mutex);
rx_nWaiting++;
rx_nWaited++;
MUTEX_EXIT(&rx_waiting_mutex);
rxi_calltrace(RX_CALL_ARRIVAL, call);
SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock);
queue_Append(&rx_incomingCallQueue, call);
@@ -4229,9 +4268,10 @@ rxi_AttachServerProc(register struct rx_call *call,
call->flags &= ~RX_CALL_WAIT_PROC;
if (queue_IsOnQueue(call)) {
queue_Remove(call);
MUTEX_ENTER(&rx_stats_mutex);
rx_nWaiting--;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_ENTER(&rx_waiting_mutex);
rx_nWaiting--;
MUTEX_EXIT(&rx_waiting_mutex);
}
}
call->state = RX_STATE_ACTIVE;
@@ -4716,9 +4756,10 @@ rxi_ResetCall(register struct rx_call *call, register int newcall)
if (queue_IsOnQueue(call)) {
queue_Remove(call);
if (flags & RX_CALL_WAIT_PROC) {
MUTEX_ENTER(&rx_stats_mutex);
rx_nWaiting--;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_ENTER(&rx_waiting_mutex);
rx_nWaiting--;
MUTEX_EXIT(&rx_waiting_mutex);
}
}
MUTEX_EXIT(call->call_queue_lock);
@@ -6981,11 +7022,10 @@ shutdown_rx(void)
rxi_FreeAllPackets();
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota = RX_MAX_QUOTA;
rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_quota_mutex);
rxinit_status = 1;
UNLOCK_RX_INIT;
}
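The rewritten comment blocks above double as a locking contract: a variable listed under rx_quota_mutex may only be read or written with that mutex held, which is what the QuotaOK and ReturnToServerPool hunks enforce. A minimal rendering of that contract, assuming simplified names and plain pthreads rather than the real Rx types:

    #include <pthread.h>

    /* Hypothetical reduction of QuotaOK / ReturnToServerPool: both
     * sides of the quota accounting take the same dedicated lock. */
    static pthread_mutex_t quota_mutex = PTHREAD_MUTEX_INITIALIZER;
    static int avail_procs;  /* threads not committed to a call      */
    static int min_deficit;  /* threads still owed to minProcs pools */

    struct service { int requests_running, min_procs; };

    int quota_ok(struct service *svc)
    {
        int ok = 0;
        pthread_mutex_lock(&quota_mutex);
        if (svc->requests_running < svc->min_procs
            || avail_procs > min_deficit) {
            svc->requests_running++;
            if (svc->requests_running <= svc->min_procs)
                min_deficit--;   /* started from the reserved pool */
            avail_procs--;
            ok = 1;
        }
        pthread_mutex_unlock(&quota_mutex);
        return ok;
    }

    void return_to_pool(struct service *svc)
    {
        pthread_mutex_lock(&quota_mutex);
        svc->requests_running--;
        if (svc->requests_running < svc->min_procs)
            min_deficit++;       /* the pool is owed a thread again */
        avail_procs++;
        pthread_mutex_unlock(&quota_mutex);
    }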

src/rx/rx_globals.h

@@ -265,6 +265,7 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
* by each call to AllocPacketBufs() will increase indefinitely without a cap on the transfer
* glob size. A cap of 64 is selected because that will produce an allocation of greater than
* three times that amount which is greater than half of ncalls * maxReceiveWindow.
* Must be called under rx_packets_mutex.
*/
#define RX_TS_FPQ_COMPUTE_LIMITS \
do { \
@@ -306,9 +307,9 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
(rx_ts_info_p)->_FPQ.ltog_ops++; \
(rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \
if ((rx_ts_info_p)->_FPQ.delta) { \
MUTEX_ENTER(&rx_stats_mutex); \
MUTEX_ENTER(&rx_packets_mutex); \
RX_TS_FPQ_COMPUTE_LIMITS; \
MUTEX_EXIT(&rx_stats_mutex); \
MUTEX_EXIT(&rx_packets_mutex); \
(rx_ts_info_p)->_FPQ.delta = 0; \
} \
} while(0)
@@ -326,9 +327,9 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to
(rx_ts_info_p)->_FPQ.ltog_ops++; \
(rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \
if ((rx_ts_info_p)->_FPQ.delta) { \
MUTEX_ENTER(&rx_stats_mutex); \
MUTEX_ENTER(&rx_packets_mutex); \
RX_TS_FPQ_COMPUTE_LIMITS; \
MUTEX_EXIT(&rx_stats_mutex); \
MUTEX_EXIT(&rx_packets_mutex); \
(rx_ts_info_p)->_FPQ.delta = 0; \
} \
} while(0)
@@ -599,14 +600,18 @@ EXT int rxi_callAbortDelay GLOBALSINIT(3000);
EXT int rxi_fcfs_thread_num GLOBALSINIT(0);
EXT pthread_key_t rx_thread_id_key;
/* keep track of pthread numbers - protected by rx_stats_mutex,
except in rx_Init() before mutex exists! */
* except in rx_Init() before mutex exists! */
EXT int rxi_pthread_hinum GLOBALSINIT(0);
#else
#define rxi_fcfs_thread_num (0)
#endif
#if defined(RX_ENABLE_LOCKS)
EXT afs_kmutex_t rx_stats_mutex; /* used to activate stats gathering */
EXT afs_kmutex_t rx_stats_mutex; /* used to protect stats gathering */
EXT afs_kmutex_t rx_waiting_mutex; /* used to protect waiting counters */
EXT afs_kmutex_t rx_quota_mutex; /* used to protect quota counters */
EXT afs_kmutex_t rx_pthread_mutex; /* used to protect pthread counters */
EXT afs_kmutex_t rx_packets_mutex; /* used to protect packet counters */
#endif
EXT2 int rx_enable_stats GLOBALSINIT(0);
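The TSFPQ hunks in this header share one shape: the per-thread delta field is tested with no lock held, and rx_packets_mutex is taken only on the slow path where RX_TS_FPQ_COMPUTE_LIMITS must run. A sketch of that shape, with hypothetical names and a made-up limit formula:

    #include <pthread.h>

    static pthread_mutex_t packets_mutex = PTHREAD_MUTEX_INITIALIZER;
    static int total_packets;   /* protected by packets_mutex */
    static int per_thread_cap;  /* protected by packets_mutex */

    struct local_queue {
        int delta;  /* thread-local: packets moved since last recompute */
    };

    /* caller must hold packets_mutex, as the macro above requires */
    static void recompute_limits(void)
    {
        per_thread_cap = total_packets / 64;  /* illustrative, not Rx's formula */
        if (per_thread_cap < 1)
            per_thread_cap = 1;
    }

    void flush_local_to_global(struct local_queue *q)
    {
        /* delta is thread-private, so the fast-path test is lock-free;
         * the global mutex is paid only when limits need recomputing */
        if (q->delta) {
            pthread_mutex_lock(&packets_mutex);
            recompute_limits();
            pthread_mutex_unlock(&packets_mutex);
            q->delta = 0;
        }
    }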

src/rx/rx_lwp.c

@@ -289,6 +289,7 @@ rx_ServerProc(void *unused)
{
int threadID;
/* jaltman - rxi_dataQuota is protected by a mutex everywhere else */
rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
/* threadID is used for making decisions in GetCall. Get it by bumping

src/rx/rx_packet.c

@@ -552,10 +552,11 @@ rxi_MorePackets(int apackets)
RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets);
/* TSFPQ patch also needs to keep track of total packets */
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_packets_mutex);
rx_nPackets += apackets;
RX_TS_FPQ_COMPUTE_LIMITS;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_packets_mutex);
for (e = p + apackets; p < e; p++) {
RX_PACKET_IOV_INIT(p);
@@ -644,10 +645,10 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local)
RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets);
/* TSFPQ patch also needs to keep track of total packets */
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_packets_mutex);
rx_nPackets += apackets;
RX_TS_FPQ_COMPUTE_LIMITS;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_packets_mutex);
for (e = p + apackets; p < e; p++) {
RX_PACKET_IOV_INIT(p);
@@ -727,10 +728,10 @@ rxi_MorePacketsNoLock(int apackets)
rx_nFreePackets += apackets;
#ifdef RX_ENABLE_TSFPQ
/* TSFPQ patch also needs to keep track of total packets */
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_packets_mutex);
rx_nPackets += apackets;
RX_TS_FPQ_COMPUTE_LIMITS;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_packets_mutex);
#endif /* RX_ENABLE_TSFPQ */
rxi_NeedMorePackets = FALSE;
rxi_PacketsUnWait();

src/rx/rx_pthread.c

@@ -287,7 +287,7 @@ rx_ServerProc(void * dummy)
struct rx_call *newcall = NULL;
rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_quota_mutex);
rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
/* threadID is used for making decisions in GetCall. Get it by bumping
* number of threads handling incoming calls */
@@ -302,11 +302,13 @@
* So either introduce yet another counter or flag the FCFS
* thread... chose the latter.
*/
MUTEX_ENTER(&rx_pthread_mutex);
threadID = ++rxi_pthread_hinum;
MUTEX_EXIT(&rx_pthread_mutex);
if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
rxi_fcfs_thread_num = threadID;
++rxi_availProcs;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_quota_mutex);
while (1) {
sock = OSI_NULLSOCKET;
@@ -357,9 +359,9 @@ rxi_StartListener(void)
dpf(("Unable to create Rx event handling thread\n"));
exit(1);
}
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_pthread_mutex);
++rxi_pthread_hinum;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_pthread_mutex);
AFS_SIGSET_RESTORE();
assert(pthread_mutex_lock(&listener_mutex) == 0);
@@ -396,9 +398,9 @@ rxi_Listen(osi_socket sock)
dpf(("Unable to create socket listener thread\n"));
exit(1);
}
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_pthread_mutex);
++rxi_pthread_hinum;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_pthread_mutex);
AFS_SIGSET_RESTORE();
return 0;
}
@@ -451,10 +453,10 @@ struct rx_ts_info_t * rx_ts_info_init() {
#ifdef RX_ENABLE_TSFPQ
queue_Init(&rx_ts_info->_FPQ);
MUTEX_ENTER(&rx_stats_mutex);
MUTEX_ENTER(&rx_packets_mutex);
rx_TSFPQMaxProcs++;
RX_TS_FPQ_COMPUTE_LIMITS;
MUTEX_EXIT(&rx_stats_mutex);
MUTEX_EXIT(&rx_packets_mutex);
#endif /* RX_ENABLE_TSFPQ */
return rx_ts_info;
}
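Every rx_pthread_mutex hunk in this file protects the same idiom: handing a new thread a unique ID by bumping rxi_pthread_hinum under its own lock rather than overloading rx_stats_mutex. A stand-alone sketch with illustrative names:

    #include <pthread.h>

    static pthread_mutex_t thread_id_mutex = PTHREAD_MUTEX_INITIALIZER;
    static int thread_hinum;  /* highest thread number handed out so far */

    int next_thread_id(void)
    {
        int id;
        pthread_mutex_lock(&thread_id_mutex);
        id = ++thread_hinum;  /* unique even if many threads start at once */
        pthread_mutex_unlock(&thread_id_mutex);
        return id;
    }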