From 449a759be2540a7961f03b5321af1f6b3ac318c6 Mon Sep 17 00:00:00 2001 From: Jeffrey Altman Date: Sun, 11 Jan 2009 04:52:00 +0000 Subject: [PATCH] DEVEL15-rx-finer-grained-locking-20091010 LICENSE MIT not everything should be under the rx_stats_mutex. doing so results in too much lock contention. add new mutexes: rx_quota_mutex, rx_waiting_mutex, rx_pthread_mutex, and rx_packets_mutex. Each new mutex protects an associated group of variables. --- src/rx/rx.c | 106 ++++++++++++++++++++++++++++++-------------- src/rx/rx_globals.h | 17 ++++--- src/rx/rx_kcommon.c | 1 + src/rx/rx_packet.c | 13 +++--- src/rx/rx_pthread.c | 18 ++++---- 5 files changed, 102 insertions(+), 53 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index 9957585657..d77ba0e9d7 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -163,6 +163,10 @@ static unsigned int rxi_rpc_process_stat_cnt; */ extern pthread_mutex_t rx_stats_mutex; +extern pthread_mutex_t rx_waiting_mutex; +extern pthread_mutex_t rx_quota_mutex; +extern pthread_mutex_t rx_pthread_mutex; +extern pthread_mutex_t rx_packets_mutex; extern pthread_mutex_t des_init_mutex; extern pthread_mutex_t des_random_mutex; extern pthread_mutex_t rx_clock_mutex; @@ -191,6 +195,14 @@ rxi_InitPthread(void) == 0); assert(pthread_mutex_init(&rx_stats_mutex, (const pthread_mutexattr_t *)0) == 0); + assert(pthread_mutex_init(&rx_waiting_mutex, (const pthread_mutexattr_t *)0) + == 0); + assert(pthread_mutex_init(&rx_quota_mutex, (const pthread_mutexattr_t *)0) + == 0); + assert(pthread_mutex_init(&rx_pthread_mutex, (const pthread_mutexattr_t *)0) + == 0); + assert(pthread_mutex_init(&rx_packets_mutex, (const pthread_mutexattr_t *)0) + == 0); assert(pthread_mutex_init (&rxi_connCacheMutex, (const pthread_mutexattr_t *)0) == 0); assert(pthread_mutex_init(&rx_init_mutex, (const pthread_mutexattr_t *)0) @@ -254,19 +266,40 @@ pthread_once_t rx_once_init = PTHREAD_ONCE_INIT; assert(pthread_once(&rx_once_init, rxi_InitPthread)==0) /* * The rx_stats_mutex mutex 
protects the following global variables: - * rxi_dataQuota - * rxi_minDeficit - * rxi_availProcs - * rxi_totalMin * rxi_lowConnRefCount * rxi_lowPeerRefCount * rxi_nCalls * rxi_Alloccnt * rxi_Allocsize - * rx_nFreePackets * rx_tq_debug * rx_stats */ + +/* + * The rx_quota_mutex mutex protects the following global variables: + * rxi_dataQuota + * rxi_minDeficit + * rxi_availProcs + * rxi_totalMin + */ + +/* + * The rx_freePktQ_lock protects the following global variables: + * rx_nFreePackets + */ + +/* + * The rx_packets_mutex mutex protects the following global variables: + * rx_nPackets + * rx_TSFPQLocalMax + * rx_TSFPQGlobSize + * rx_TSFPQMaxProcs + */ + +/* + * The rx_pthread_mutex mutex protects the following global variables: + * rxi_pthread_hinum + */ #else #define INIT_PTHREAD_LOCKS #endif @@ -459,6 +492,10 @@ rx_InitHost(u_int host, u_int port) rxdb_init(); #endif /* RX_LOCKS_DB */ MUTEX_INIT(&rx_stats_mutex, "rx_stats_mutex", MUTEX_DEFAULT, 0); + MUTEX_INIT(&rx_waiting_mutex, "rx_waiting_mutex", MUTEX_DEFAULT, 0); + MUTEX_INIT(&rx_quota_mutex, "rx_quota_mutex", MUTEX_DEFAULT, 0); + MUTEX_INIT(&rx_pthread_mutex, "rx_pthread_mutex", MUTEX_DEFAULT, 0); + MUTEX_INIT(&rx_packets_mutex, "rx_packets_mutex", MUTEX_DEFAULT, 0); MUTEX_INIT(&rx_rpc_stats, "rx_rpc_stats", MUTEX_DEFAULT, 0); MUTEX_INIT(&rx_freePktQ_lock, "rx_freePktQ_lock", MUTEX_DEFAULT, 0); MUTEX_INIT(&freeSQEList_lock, "freeSQEList lock", MUTEX_DEFAULT, 0); @@ -540,9 +577,9 @@ rx_InitHost(u_int host, u_int port) rx_SetEpoch(tv.tv_sec); /* Start time of this package, rxkad * will provide a randomer value. */ #endif - MUTEX_ENTER(&rx_stats_mutex); - rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */ - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); + rxi_dataQuota += rx_extraQuota; /* + extra pkts caller asked to rsrv */ + MUTEX_EXIT(&rx_quota_mutex); /* *Slightly* random start time for the cid. 
This is just to help * out with the hashing function at the peer */ rx_nextCid = ((tv.tv_sec ^ tv.tv_usec) << RX_CIDSHIFT); @@ -604,7 +641,8 @@ QuotaOK(register struct rx_service *aservice) /* otherwise, can use only if there are enough to allow everyone * to go to their min quota after this guy starts. */ - MUTEX_ENTER(&rx_stats_mutex); + + MUTEX_ENTER(&rx_quota_mutex); if ((aservice->nRequestsRunning < aservice->minProcs) || (rxi_availProcs > rxi_minDeficit)) { aservice->nRequestsRunning++; @@ -613,10 +651,10 @@ QuotaOK(register struct rx_service *aservice) if (aservice->nRequestsRunning <= aservice->minProcs) rxi_minDeficit--; rxi_availProcs--; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); return 1; } - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); return 0; } @@ -625,11 +663,11 @@ static void ReturnToServerPool(register struct rx_service *aservice) { aservice->nRequestsRunning--; - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); if (aservice->nRequestsRunning < aservice->minProcs) rxi_minDeficit++; rxi_availProcs++; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); } #else /* RX_ENABLE_LOCKS */ @@ -730,13 +768,13 @@ rx_StartServer(int donateMe) service = rx_services[i]; if (service == (struct rx_service *)0) break; - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); rxi_totalMin += service->minProcs; /* below works even if a thread is running, since minDeficit would * still have been decremented and later re-incremented. 
*/ rxi_minDeficit += service->minProcs; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); } /* Turn on reaping of idle server connections */ @@ -1631,9 +1669,9 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp) if (call->flags & RX_CALL_WAIT_PROC) { call->flags &= ~RX_CALL_WAIT_PROC; - MUTEX_ENTER(&rx_stats_mutex); - rx_nWaiting--; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_ENTER(&rx_waiting_mutex); + rx_nWaiting--; + MUTEX_EXIT(&rx_waiting_mutex); } if (call->state != RX_STATE_PRECALL || call->error) { @@ -3063,7 +3101,8 @@ static int TooLow(struct rx_packet *ap, struct rx_call *acall) { int rc = 0; - MUTEX_ENTER(&rx_stats_mutex); + + MUTEX_ENTER(&rx_quota_mutex); if (((ap->header.seq != 1) && (acall->flags & RX_CALL_CLEARED) && (acall->state == RX_STATE_PRECALL)) || ((rx_nFreePackets < rxi_dataQuota + 2) @@ -3071,7 +3110,7 @@ TooLow(struct rx_packet *ap, struct rx_call *acall) && (acall->flags & RX_CALL_READER_WAIT)))) { rc = 1; } - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); return rc; } #endif /* KERNEL */ @@ -4199,10 +4238,10 @@ rxi_AttachServerProc(register struct rx_call *call, if (!(call->flags & RX_CALL_WAIT_PROC)) { call->flags |= RX_CALL_WAIT_PROC; - MUTEX_ENTER(&rx_stats_mutex); - rx_nWaiting++; - rx_nWaited++; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_ENTER(&rx_waiting_mutex); + rx_nWaiting++; + rx_nWaited++; + MUTEX_EXIT(&rx_waiting_mutex); rxi_calltrace(RX_CALL_ARRIVAL, call); SET_CALL_QUEUE_LOCK(call, &rx_serverPool_lock); queue_Append(&rx_incomingCallQueue, call); @@ -4229,9 +4268,10 @@ rxi_AttachServerProc(register struct rx_call *call, call->flags &= ~RX_CALL_WAIT_PROC; if (queue_IsOnQueue(call)) { queue_Remove(call); - MUTEX_ENTER(&rx_stats_mutex); - rx_nWaiting--; - MUTEX_EXIT(&rx_stats_mutex); + + MUTEX_ENTER(&rx_waiting_mutex); + rx_nWaiting--; + MUTEX_EXIT(&rx_waiting_mutex); } } call->state = RX_STATE_ACTIVE; @@ -4716,9 +4756,10 @@ rxi_ResetCall(register struct rx_call *call, register int 
newcall) if (queue_IsOnQueue(call)) { queue_Remove(call); if (flags & RX_CALL_WAIT_PROC) { - MUTEX_ENTER(&rx_stats_mutex); - rx_nWaiting--; - MUTEX_EXIT(&rx_stats_mutex); + + MUTEX_ENTER(&rx_waiting_mutex); + rx_nWaiting--; + MUTEX_EXIT(&rx_waiting_mutex); } } MUTEX_EXIT(call->call_queue_lock); @@ -6981,11 +7022,10 @@ shutdown_rx(void) rxi_FreeAllPackets(); - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); rxi_dataQuota = RX_MAX_QUOTA; rxi_availProcs = rxi_totalMin = rxi_minDeficit = 0; - MUTEX_EXIT(&rx_stats_mutex); - + MUTEX_EXIT(&rx_quota_mutex); rxinit_status = 1; UNLOCK_RX_INIT; } diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h index 9a90688d33..df27dd7d88 100644 --- a/src/rx/rx_globals.h +++ b/src/rx/rx_globals.h @@ -265,6 +265,7 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to * by each call to AllocPacketBufs() will increase indefinitely without a cap on the transfer * glob size. A cap of 64 is selected because that will produce an allocation of greater than * three times that amount which is greater than half of ncalls * maxReceiveWindow. + * Must be called under rx_packets_mutex. 
*/ #define RX_TS_FPQ_COMPUTE_LIMITS \ do { \ @@ -306,9 +307,9 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to (rx_ts_info_p)->_FPQ.ltog_ops++; \ (rx_ts_info_p)->_FPQ.ltog_xfer += tsize; \ if ((rx_ts_info_p)->_FPQ.delta) { \ - MUTEX_ENTER(&rx_stats_mutex); \ + MUTEX_ENTER(&rx_packets_mutex); \ RX_TS_FPQ_COMPUTE_LIMITS; \ - MUTEX_EXIT(&rx_stats_mutex); \ + MUTEX_EXIT(&rx_packets_mutex); \ (rx_ts_info_p)->_FPQ.delta = 0; \ } \ } while(0) @@ -326,9 +327,9 @@ EXT void rxi_FlushLocalPacketsTSFPQ(void); /* flush all thread-local packets to (rx_ts_info_p)->_FPQ.ltog_ops++; \ (rx_ts_info_p)->_FPQ.ltog_xfer += (num_transfer); \ if ((rx_ts_info_p)->_FPQ.delta) { \ - MUTEX_ENTER(&rx_stats_mutex); \ + MUTEX_ENTER(&rx_packets_mutex); \ RX_TS_FPQ_COMPUTE_LIMITS; \ - MUTEX_EXIT(&rx_stats_mutex); \ + MUTEX_EXIT(&rx_packets_mutex); \ (rx_ts_info_p)->_FPQ.delta = 0; \ } \ } while(0) @@ -599,14 +600,18 @@ EXT int rxi_callAbortDelay GLOBALSINIT(3000); EXT int rxi_fcfs_thread_num GLOBALSINIT(0); EXT pthread_key_t rx_thread_id_key; -/* keep track of pthread numbers - protected by rx_stats_mutex, - except in rx_Init() before mutex exists! */ +/* keep track of pthread numbers - protected by rx_pthread_mutex, + * except in rx_Init() before mutex exists! 
*/ EXT int rxi_pthread_hinum GLOBALSINIT(0); #else #define rxi_fcfs_thread_num (0) #endif #if defined(RX_ENABLE_LOCKS) -EXT afs_kmutex_t rx_stats_mutex; /* used to activate stats gathering */ +EXT afs_kmutex_t rx_stats_mutex; /* used to protect stats gathering */ +EXT afs_kmutex_t rx_waiting_mutex; /* used to protect waiting counters */ +EXT afs_kmutex_t rx_quota_mutex; /* used to protect quota counters */ +EXT afs_kmutex_t rx_pthread_mutex; /* used to protect pthread counters */ +EXT afs_kmutex_t rx_packets_mutex; /* used to protect packet counters */ #endif EXT2 int rx_enable_stats GLOBALSINIT(0); diff --git a/src/rx/rx_kcommon.c b/src/rx/rx_kcommon.c index 72c8dbacb2..ea5d284371 100644 --- a/src/rx/rx_kcommon.c +++ b/src/rx/rx_kcommon.c @@ -289,6 +289,7 @@ rx_ServerProc(void *unused) { int threadID; +/* jaltman - rxi_dataQuota is protected by a mutex everywhere else */ rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */ rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */ /* threadID is used for making decisions in GetCall. 
Get it by bumping diff --git a/src/rx/rx_packet.c b/src/rx/rx_packet.c index 1113ba9e8c..536f95c377 100644 --- a/src/rx/rx_packet.c +++ b/src/rx/rx_packet.c @@ -552,10 +552,11 @@ rxi_MorePackets(int apackets) RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets); /* TSFPQ patch also needs to keep track of total packets */ - MUTEX_ENTER(&rx_stats_mutex); + + MUTEX_ENTER(&rx_packets_mutex); rx_nPackets += apackets; RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_packets_mutex); for (e = p + apackets; p < e; p++) { RX_PACKET_IOV_INIT(p); @@ -644,10 +645,10 @@ rxi_MorePacketsTSFPQ(int apackets, int flush_global, int num_keep_local) RX_TS_FPQ_LOCAL_ALLOC(rx_ts_info,apackets); /* TSFPQ patch also needs to keep track of total packets */ - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_packets_mutex); rx_nPackets += apackets; RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_packets_mutex); for (e = p + apackets; p < e; p++) { RX_PACKET_IOV_INIT(p); @@ -727,10 +728,10 @@ rxi_MorePacketsNoLock(int apackets) rx_nFreePackets += apackets; #ifdef RX_ENABLE_TSFPQ /* TSFPQ patch also needs to keep track of total packets */ - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_packets_mutex); rx_nPackets += apackets; RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_packets_mutex); #endif /* RX_ENABLE_TSFPQ */ rxi_NeedMorePackets = FALSE; rxi_PacketsUnWait(); diff --git a/src/rx/rx_pthread.c b/src/rx/rx_pthread.c index 69857eac8e..cdb19576f4 100644 --- a/src/rx/rx_pthread.c +++ b/src/rx/rx_pthread.c @@ -287,7 +287,7 @@ rx_ServerProc(void * dummy) struct rx_call *newcall = NULL; rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */ - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_quota_mutex); rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */ /* threadID is used for making decisions in GetCall. 
Get it by bumping * number of threads handling incoming calls */ @@ -302,11 +302,13 @@ rx_ServerProc(void * dummy) * So either introduce yet another counter or flag the FCFS * thread... chose the latter. */ + MUTEX_ENTER(&rx_pthread_mutex); threadID = ++rxi_pthread_hinum; + MUTEX_EXIT(&rx_pthread_mutex); if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID) rxi_fcfs_thread_num = threadID; ++rxi_availProcs; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_quota_mutex); while (1) { sock = OSI_NULLSOCKET; @@ -357,9 +359,9 @@ rxi_StartListener(void) dpf(("Unable to create Rx event handling thread\n")); exit(1); } - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_pthread_mutex); ++rxi_pthread_hinum; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_pthread_mutex); AFS_SIGSET_RESTORE(); assert(pthread_mutex_lock(&listener_mutex) == 0); @@ -396,9 +398,9 @@ rxi_Listen(osi_socket sock) dpf(("Unable to create socket listener thread\n")); exit(1); } - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_pthread_mutex); ++rxi_pthread_hinum; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_pthread_mutex); AFS_SIGSET_RESTORE(); return 0; } @@ -451,10 +453,10 @@ struct rx_ts_info_t * rx_ts_info_init() { #ifdef RX_ENABLE_TSFPQ queue_Init(&rx_ts_info->_FPQ); - MUTEX_ENTER(&rx_stats_mutex); + MUTEX_ENTER(&rx_packets_mutex); rx_TSFPQMaxProcs++; RX_TS_FPQ_COMPUTE_LIMITS; - MUTEX_EXIT(&rx_stats_mutex); + MUTEX_EXIT(&rx_packets_mutex); #endif /* RX_ENABLE_TSFPQ */ return rx_ts_info; }