From 043c31bf8d81f5aba968f69c613b639e1d0e3ee7 Mon Sep 17 00:00:00 2001 From: Simon Wilkinson Date: Sun, 23 Oct 2011 21:21:39 +0100 Subject: [PATCH] rx: Use a red black tree for the event stack Instead of the current event stack, which uses a sorted linked list, use a red/black tree to maintain the timer stack. This dramatically improves event insertion times, at the expense of some additional implementation complexity. This change also adds reference counting to the rxevent structure. We've always had a race between an event being fired, and that event being simultaneously cancelled by the user thread. Reference counting avoids that race resulting in the structure appearing twice in the free list. Change-Id: Icbef6e04e01f3eef5b888bc3cb77b7a3d1be26ae Reviewed-on: http://gerrit.openafs.org/5841 Tested-by: BuildBot Tested-by: Jeffrey Altman Reviewed-by: Jeffrey Altman --- Makefile.in | 1 + configure.ac | 1 + src/afsmonitor/Makefile.in | 1 + src/aklog/Makefile.in | 1 + src/libadmin/samples/Makefile.in | 1 + src/libadmin/test/Makefile.in | 1 + src/libuafs/Makefile.common.in | 8 +- src/rx/rx.c | 51 +- src/rx/rx.h | 1 + src/rx/rx_clock.h | 5 + src/rx/rx_event.c | 781 +++++++++++++++---------------- src/rx/rx_event.h | 93 +--- src/rx/rx_packet.h | 3 + src/rxdebug/Makefile.in | 8 +- src/shlibafsrpc/Makefile.in | 6 +- src/tbutc/Makefile.in | 1 + src/xstat/Makefile.in | 1 + tests/Makefile.in | 2 +- tests/TESTS | 1 + tests/auth/Makefile.in | 1 + tests/rx/Makefile.in | 24 + tests/rx/event-t.c | 178 +++++++ tests/volser/Makefile.in | 1 + 23 files changed, 675 insertions(+), 496 deletions(-) create mode 100644 tests/rx/Makefile.in create mode 100644 tests/rx/event-t.c diff --git a/Makefile.in b/Makefile.in index 626666b960..535baf2e3a 100644 --- a/Makefile.in +++ b/Makefile.in @@ -941,6 +941,7 @@ distclean: clean tests/auth/Makefile \ tests/cmd/Makefile \ tests/common/Makefile \ + tests/rx/Makefile \ tests/opr/Makefile \ tests/util/Makefile \ tests/volser/Makefile \ diff --git a/configure.ac b/configure.ac index 5c539062dd..588b50a1ae 100644 --- a/configure.ac +++ b/configure.ac @@ -257,6 +257,7 @@ tests/cmd/Makefile \ tests/common/Makefile \ tests/opr/Makefile \ tests/rpctestlib/Makefile \ +tests/rx/Makefile \ tests/tap/Makefile \ tests/util/Makefile \ tests/volser/Makefile, diff --git a/src/afsmonitor/Makefile.in b/src/afsmonitor/Makefile.in index 129b5dea6b..b8cd117884 100644 --- a/src/afsmonitor/Makefile.in +++ b/src/afsmonitor/Makefile.in @@ -34,6 +34,7 @@ LIBS=${TOP_LIBDIR}/libxstat_fs.a \ ${TOP_LIBDIR}/librx.a \ ${TOP_LIBDIR}/liblwp.a \ ${TOP_LIBDIR}/libsys.a \ + $(TOP_LIBDIR)/libopr.a \ ${TOP_LIBDIR}/util.a EXTRA_LIBS=${LIB_curses} ${XLIBS} diff --git a/src/aklog/Makefile.in b/src/aklog/Makefile.in index a8566707cd..01eb1df0ec 100644 --- a/src/aklog/Makefile.in +++ b/src/aklog/Makefile.in @@ -14,6 +14,7 @@ AFSLIBS= ${TOP_LIBDIR}/libafsauthent.a \ ${TOP_LIBDIR}/libafsrpc.a \ ${TOP_LIBDIR}/libafshcrypto.a \ ${TOP_LIBDIR}/libcmd.a \ + ${TOP_LIBDIR}/libopr.a \ ${TOP_LIBDIR}/util.a SRCS= aklog.c krb_util.c linked_list.c diff --git a/src/libadmin/samples/Makefile.in b/src/libadmin/samples/Makefile.in index b44b3e0bac..a31ece0ba0 100644 --- a/src/libadmin/samples/Makefile.in +++ b/src/libadmin/samples/Makefile.in @@ -41,6 +41,7 @@ SAMPLELIBS =\ ${TOP_LIBDIR}/libafsauthent.a \ ${TOP_LIBDIR}/libafsrpc.a \ $(TOP_LIBDIR)/libafsutil.a \ + $(TOP_LIBDIR)/libopr.a \ ${TOP_LIBDIR}/libafshcrypto_lwp.a all test tests: $(SAMPLEPROGS) diff --git a/src/libadmin/test/Makefile.in b/src/libadmin/test/Makefile.in index 234a2e5a83..241d952722 100644 --- a/src/libadmin/test/Makefile.in +++ b/src/libadmin/test/Makefile.in @@ -23,6 +23,7 @@ AFSCPLIBS =\ $(TOP_LIBDIR)/libafsauthent.a \ $(TOP_LIBDIR)/libafsrpc.a \ $(TOP_LIBDIR)/libcmd.a \ + $(TOP_LIBDIR)/libopr.a \ $(TOP_LIBDIR)/libafsutil.a \ ${TOP_LIBDIR}/libafshcrypto_lwp.a diff --git a/src/libuafs/Makefile.common.in b/src/libuafs/Makefile.common.in index d2e11181de..4de08f427f 100644 --- a/src/libuafs/Makefile.common.in +++ b/src/libuafs/Makefile.common.in @@ -101,7 +101,7 @@ linktest: UAFS/$(LIBUAFS) $(LDFLAGS_roken) $(LDFLAGS_hcrypto) -o linktest \ ${srcdir}/linktest.c $(COMMON_INCLUDE) -DUKERNEL \ UAFS/$(LIBUAFS) ${TOP_LIBDIR}/libcmd.a \ - ${TOP_LIBDIR}/libafsutil.a \ + ${TOP_LIBDIR}/libafsutil.a $(TOP_LIBDIR)/libopr.a \ $(LIB_hcrypto) $(LIB_roken) $(LIB_crypt) $(TEST_LIBS) $(XLIBS) CRULE1= $(CC) $(COMMON_INCLUDE) $(CPPFLAGS_roken) $(OPTF) -DKERNEL $(LIBJUAFS_FLAGS) $(CFLAGS) -c $? @@ -2033,8 +2033,10 @@ $(PERLUAFS)/ukernel.so: $(PERLUAFS)/ukernel_swig_perl.o UAFS.pic/libuafs_pic.a $(SWIG_PERL_LDFLAGS) $(LDFLAGS) \ $(PERLUAFS)/ukernel_swig_perl.o \ UAFS.pic/libuafs_pic.a ${TOP_LIBDIR}/libcmd_pic.a \ - ${TOP_LIBDIR}/libafsutil_pic.a $(LDFLAGS_roken) \ - $(LDFLAGS_hcrypto) $(LIB_hcrypto) $(LIB_roken) $(LIB_crypt) \ + ${TOP_LIBDIR}/libafsutil_pic.a \ + $(TOP_LIBDIR)/libopr.a \ + $(LDFLAGS_roken) $(LDFLAGS_hcrypto) $(LIB_hcrypto) \ + $(LIB_roken) $(LIB_crypt) \ $(XLIBS) clean: diff --git a/src/rx/rx.c b/src/rx/rx.c index cd1aa75f96..1c5622e0b2 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -78,6 +78,7 @@ extern afs_int32 afs_termState; #include "rx_trace.h" #include "rx_internal.h" #include "rx_stats.h" +#include "rx_event.h" #include @@ -196,7 +197,6 @@ extern afs_kmutex_t des_init_mutex; extern afs_kmutex_t des_random_mutex; extern afs_kmutex_t rx_clock_mutex; extern afs_kmutex_t rxi_connCacheMutex; -extern afs_kmutex_t rx_event_mutex; extern afs_kmutex_t event_handler_mutex; extern afs_kmutex_t listener_mutex; extern afs_kmutex_t rx_if_init_mutex; @@ -222,7 +222,6 @@ rxi_InitPthread(void) MUTEX_INIT(&rx_refcnt_mutex, "refcnts", MUTEX_DEFAULT, 0); MUTEX_INIT(&epoch_mutex, "epoch", MUTEX_DEFAULT, 0); MUTEX_INIT(&rx_init_mutex, "init", MUTEX_DEFAULT, 0); - MUTEX_INIT(&rx_event_mutex, "event", MUTEX_DEFAULT, 0); MUTEX_INIT(&event_handler_mutex, "event handler", MUTEX_DEFAULT, 0); MUTEX_INIT(&rxi_connCacheMutex, "conn cache", MUTEX_DEFAULT, 0); MUTEX_INIT(&listener_mutex, "listener", MUTEX_DEFAULT, 0); @@ -776,7 +775,7 @@ rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset) clock_Add(&when, offset); if (!call->delayedAckEvent - || clock_Gt(&call->delayedAckEvent->eventTime, &when)) { + || clock_Gt(&call->delayedAckTime, &when)) { rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); @@ -787,11 +786,10 @@ rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset) call->delayedAckEvent = rxevent_Post(&when, &now, rxi_SendDelayedAck, call, NULL, 0); + call->delayedAckTime = when; } } - - /* called with unincremented nRequestsRunning to see if it is OK to start * a new thread in this service. Could be "no" for two reasons: over the * max quota, or would prevent others from reaching their min quota. @@ -3698,7 +3696,12 @@ rxi_CheckReachEvent(struct rxevent *event, void *arg1, void *arg2, int dummy) int i, waiting; MUTEX_ENTER(&conn->conn_data_lock); - conn->checkReachEvent = NULL; + + if (event) { + rxevent_Put(conn->checkReachEvent); + conn->checkReachEvent = NULL; + } + waiting = conn->flags & RX_CONN_ATTACHWAIT; if (event) { MUTEX_ENTER(&rx_refcnt_mutex); @@ -4904,6 +4907,7 @@ rxi_AckAll(struct rxevent *event, struct rx_call *call, char *dummy) #ifdef RX_ENABLE_LOCKS if (event) { MUTEX_ENTER(&call->lock); + rxevent_Put(call->delayedAckEvent); call->delayedAckEvent = NULL; MUTEX_ENTER(&rx_refcnt_mutex); CALL_RELE(call, RX_CALL_REFCOUNT_ACKALL); @@ -4915,8 +4919,10 @@ rxi_AckAll(struct rxevent *event, struct rx_call *call, char *dummy) if (event) MUTEX_EXIT(&call->lock); #else /* RX_ENABLE_LOCKS */ - if (event) + if (event) { + rxevent_Put(call->delayedAckEvent); call->delayedAckEvent = NULL; + } rxi_SendSpecial(call, call->conn, (struct rx_packet *)0, RX_PACKET_TYPE_ACKALL, NULL, 0, 0); call->flags |= RX_CALL_ACKALL_SENT; @@ -4931,8 +4937,10 @@ rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused1, #ifdef RX_ENABLE_LOCKS if (event) { MUTEX_ENTER(&call->lock); - if (event == call->delayedAckEvent) + if (event == call->delayedAckEvent) { + rxevent_Put(call->delayedAckEvent); call->delayedAckEvent = NULL; + } MUTEX_ENTER(&rx_refcnt_mutex); CALL_RELE(call, RX_CALL_REFCOUNT_DELAY); MUTEX_EXIT(&rx_refcnt_mutex); @@ -4941,8 +4949,10 @@ rxi_SendDelayedAck(struct rxevent *event, void *arg1, void *unused1, if (event) MUTEX_EXIT(&call->lock); #else /* RX_ENABLE_LOCKS */ - if (event) + if (event) { + rxevent_Put(call->delayedAckEvent); call->delayedAckEvent = NULL; + } (void)rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0); #endif /* RX_ENABLE_LOCKS */ } @@ -5895,6 +5905,7 @@ rxi_Resend(struct rxevent *event, void *arg0, void *arg1, int istack) MUTEX_ENTER(&rx_refcnt_mutex); CALL_RELE(call, RX_CALL_REFCOUNT_RESEND); MUTEX_EXIT(&rx_refcnt_mutex); + rxevent_Put(call->resendEvent); call->resendEvent = NULL; } @@ -6369,6 +6380,7 @@ rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1, MUTEX_ENTER(&rx_refcnt_mutex); /* Only reschedule ourselves if the connection would not be destroyed */ if (conn->refCount <= 1) { + rxevent_Put(conn->natKeepAliveEvent); conn->natKeepAliveEvent = NULL; MUTEX_EXIT(&rx_refcnt_mutex); MUTEX_EXIT(&conn->conn_data_lock); @@ -6376,6 +6388,7 @@ rxi_NatKeepAliveEvent(struct rxevent *event, void *arg1, } else { conn->refCount--; /* drop the reference for this */ MUTEX_EXIT(&rx_refcnt_mutex); + rxevent_Put(conn->natKeepAliveEvent); conn->natKeepAliveEvent = NULL; rxi_ScheduleNatKeepAliveEvent(conn); MUTEX_EXIT(&conn->conn_data_lock); @@ -6442,8 +6455,12 @@ rxi_KeepAliveEvent(struct rxevent *event, void *arg1, void *dummy, CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE); MUTEX_EXIT(&rx_refcnt_mutex); MUTEX_ENTER(&call->lock); - if (event == call->keepAliveEvent) + + if (event == call->keepAliveEvent) { + rxevent_Put(call->keepAliveEvent); call->keepAliveEvent = NULL; + } + now = clock_Sec(); #ifdef RX_ENABLE_LOCKS @@ -6485,8 +6502,10 @@ rxi_GrowMTUEvent(struct rxevent *event, void *arg1, void *dummy, int dummy2) MUTEX_EXIT(&rx_refcnt_mutex); MUTEX_ENTER(&call->lock); - if (event == call->growMTUEvent) + if (event == call->growMTUEvent) { + rxevent_Put(call->growMTUEvent); call->growMTUEvent = NULL; + } #ifdef RX_ENABLE_LOCKS if (rxi_CheckCall(call, 0)) { @@ -6594,6 +6613,7 @@ rxi_SendDelayedConnAbort(struct rxevent *event, void *arg1, void *unused, struct rx_packet *packet; MUTEX_ENTER(&conn->conn_data_lock); + rxevent_Put(conn->delayedAbortEvent); conn->delayedAbortEvent = NULL; error = htonl(conn->error); conn->abortCount++; @@ -6620,6 +6640,7 @@ rxi_SendDelayedCallAbort(struct rxevent *event, void *arg1, void *dummy, struct rx_packet *packet; MUTEX_ENTER(&call->lock); + rxevent_Put(call->delayedAbortEvent); call->delayedAbortEvent = NULL; error = htonl(call->error); call->abortCount++; @@ -6646,7 +6667,11 @@ rxi_ChallengeEvent(struct rxevent *event, { struct rx_connection *conn = arg0; - conn->challengeEvent = NULL; + if (event) { + rxevent_Put(conn->challengeEvent); + conn->challengeEvent = NULL; + } + if (RXS_CheckAuthentication(conn->securityObject, conn) != 0) { struct rx_packet *packet; struct clock when, now; @@ -7074,7 +7099,7 @@ rxi_ReapConnections(struct rxevent *unused, void *unused1, void *unused2, when = now; when.sec += RX_REAP_TIME; /* Check every RX_REAP_TIME seconds */ - rxevent_Post(&when, &now, rxi_ReapConnections, 0, NULL, 0); + rxevent_Put(rxevent_Post(&when, &now, rxi_ReapConnections, 0, NULL, 0)); } diff --git a/src/rx/rx.h b/src/rx/rx.h index 42fb3bd441..50d2bf28ba 100644 --- a/src/rx/rx.h +++ b/src/rx/rx.h @@ -529,6 +529,7 @@ struct rx_call { struct rxevent *keepAliveEvent; /* Scheduled periodically in active calls to keep call alive */ struct rxevent *growMTUEvent; /* Scheduled periodically in active calls to discover true maximum MTU */ struct rxevent *delayedAckEvent; /* Scheduled after all packets are received to send an ack if a reply or new call is not generated soon */ + struct clock delayedAckTime; /* Time that next delayed ack was scheduled for */ struct rxevent *delayedAbortEvent; /* Scheduled to throttle looping client */ int abortCode; /* error code from last RPC */ int abortCount; /* number of times last error was sent */ diff --git a/src/rx/rx_clock.h b/src/rx/rx_clock.h index 2bdc0b81e4..648d57132d 100644 --- a/src/rx/rx_clock.h +++ b/src/rx/rx_clock.h @@ -86,6 +86,11 @@ extern int clock_nUpdates; /* Current clock time, truncated to seconds */ #define clock_Sec() ((!clock_haveCurrentTime)? clock_UpdateTime(), clock_now.sec:clock_now.sec) + +extern void clock_Init(void); +extern int clock_UnInit(void); +extern void clock_UpdateTime(void); + #endif /* AFS_USE_GETTIMEOFDAY || AFS_PTHREAD_ENV */ #else /* KERNEL */ #define clock_Init() diff --git a/src/rx/rx_event.c b/src/rx/rx_event.c index 39798cf2c1..6baf8a8e62 100644 --- a/src/rx/rx_event.c +++ b/src/rx/rx_event.c @@ -1,484 +1,453 @@ /* - * Copyright 2000, International Business Machines Corporation and others. - * All Rights Reserved. + * Copyright (c) 2011 Your File System Inc. All rights reserved. * - * This software has been released under the terms of the IBM Public - * License. For details, see the LICENSE file in the top-level source - * directory or online at http://www.openafs.org/dl/license10.html + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR `AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* A reimplementation of the rx_event handler using red/black trees + * + * The first rx_event implementation used a simple sorted queue of all + * events, which lead to O(n^2) performance, where n is the number of + * outstanding events. This was found to scale poorly, so was replaced. + * + * The second implementation used a set of per-second buckets to store + * events. Each bucket (referred to as an epoch in the code) stored all + * of the events which expired in that second. However, on modern networks + * where RTT times are in the millisecond, most connections will have events + * expiring within the next second, so the problem reoccurs. + * + * This new implementation uses Red-Black trees to store a sorted list of + * events. Red Black trees are guaranteed to have no worse than O(log N) + * insertion, and are commonly used in timer applications */ #include #include -#ifdef AFS_SUN59_ENV -# include +#ifdef KERNEL +# include "afs/sysincludes.h" +# include "afsincludes.h" +#else +# include #endif -#ifdef KERNEL -# ifndef UKERNEL -# include "afs/afs_osi.h" -# else /* !UKERNEL */ -# include "afs/sysincludes.h" -# include "afsincludes.h" -# endif /* !UKERNEL */ -# include "rx_kernel.h" -# include "rx_kmutex.h" -# if defined(AFS_SGI_ENV) -# include "sys/debug.h" -/* These are necessary to get curproc (used by GLOCK asserts) to work. */ -# include "h/proc.h" -# if !defined(AFS_SGI64_ENV) && !defined(UKERNEL) -# include "h/user.h" -# endif -extern void *osi_Alloc(); -# endif -# if defined(AFS_OBSD_ENV) -# if defined(AFS_OBSD48_ENV) -# include "h/systm.h" -# else -# include "h/proc.h" -# endif -# endif -#else /* KERNEL */ -# include -# include "rx_user.h" -# ifdef AFS_PTHREAD_ENV -# include "rx_pthread.h" -# else -# include "rx_lwp.h" -# endif -#endif /* KERNEL */ +#include +#include +#include #include "rx.h" -#include "rx_clock.h" -#include "rx_queue.h" -#include "rx_event.h" -#include "rx_globals.h" +#include "rx_atomic.h" -/* All event processing is relative to the apparent current time given by clock_GetTime */ +struct rxevent { + struct opr_queue q; + struct opr_rbtree_node node; + struct clock eventTime; + struct rxevent *next; + rx_atomic_t refcnt; + int handled; + void (*func)(struct rxevent *, void *, void *, int); + void *arg; + void *arg1; + int arg2; +}; -/* This should be static, but event_test wants to look at the free list... */ -struct rx_queue rxevent_free; /* It's somewhat bogus to use a doubly-linked queue for the free list */ -struct rx_queue rxepoch_free; /* It's somewhat bogus to use a doubly-linked queue for the free list */ -static struct rx_queue rxepoch_queue; /* list of waiting epochs */ -static int rxevent_allocUnit = 10; /* Allocation unit (number of event records to allocate at one time) */ -static int rxepoch_allocUnit = 10; /* Allocation unit (number of epoch records to allocate at one time) */ -int rxevent_nFree; /* Number of free event records */ -int rxevent_nPosted; /* Current number of posted events */ -int rxepoch_nFree; /* Number of free epoch records */ -static void (*rxevent_ScheduledEarlierEvent) (void); /* Proc to call when an event is scheduled that is earlier than all other events */ -struct xfreelist { +struct malloclist { void *mem; int size; - struct xfreelist *next; + struct malloclist *next; }; -static struct xfreelist *xfreemallocs = 0, *xsp = 0; -struct clock rxevent_nextRaiseEvents; /* Time of next call to raise events */ -struct clock rxevent_lastEvent; /* backwards time detection */ -int rxevent_raiseScheduled; /* true if raise events is scheduled */ +static struct { + afs_kmutex_t lock; + struct opr_queue list; + struct malloclist *mallocs; +} freeEvents; -#ifdef RX_ENABLE_LOCKS -#ifdef RX_LOCKS_DB -/* rxdb_fileID is used to identify the lock location, along with line#. */ -static int rxdb_fileID = RXDB_FILE_RX_EVENT; -#endif /* RX_LOCKS_DB */ -#define RX_ENABLE_LOCKS 1 -afs_kmutex_t rxevent_lock; -#endif /* RX_ENABLE_LOCKS */ +static struct { + afs_kmutex_t lock; + struct opr_rbtree head; + struct rxevent *first; +} eventTree; -#ifdef AFS_PTHREAD_ENV -/* - * This mutex protects the following global variables: - * rxevent_initialized - */ +static struct { + afs_kmutex_t lock; + struct clock last; + struct clock next; + void (*func)(void); + int raised; +} eventSchedule; -afs_kmutex_t rx_event_mutex; -#define LOCK_EV_INIT MUTEX_ENTER(&rx_event_mutex) -#define UNLOCK_EV_INIT MUTEX_EXIT(&rx_event_mutex) +static int allocUnit = 10; + +static struct rxevent * +rxevent_alloc(void) { + struct rxevent *evlist; + struct rxevent *ev; + struct malloclist *mrec; + int i; + + MUTEX_ENTER(&freeEvents.lock); + if (opr_queue_IsEmpty(&freeEvents.list)) { + MUTEX_EXIT(&freeEvents.lock); + +#if defined(AFS_AIX32_ENV) && defined(KERNEL) + ev = rxi_Alloc(sizeof(struct rxevent)); #else -#define LOCK_EV_INIT -#define UNLOCK_EV_INIT -#endif /* AFS_PTHREAD_ENV */ + evlist = osi_Alloc(sizeof(struct rxevent) * allocUnit); + mrec = osi_Alloc(sizeof(struct malloclist)); + mrec->mem = evlist; + mrec->size = sizeof(struct rxevent) * allocUnit; -int -rxevent_adjTimes(struct clock *adjTime) -{ - /* backwards clock correction */ - int nAdjusted = 0; - struct rxepoch *qep, *nqep; - struct rxevent *qev, *nqev; - - for (queue_Scan(&rxepoch_queue, qep, nqep, rxepoch)) { - for (queue_Scan(&qep->events, qev, nqev, rxevent)) { - if (clock_Gt(&qev->eventTime, adjTime)) { - clock_Sub(&qev->eventTime, adjTime); - nAdjusted++; - } - } - if (qep->epochSec > adjTime->sec) { - qep->epochSec -= adjTime->sec; + MUTEX_ENTER(&freeEvents.lock); + for (i = 1; i < allocUnit; i++) { + opr_queue_Append(&freeEvents.list, &evlist[i].q); } + mrec->next = freeEvents.mallocs; + freeEvents.mallocs = mrec; + MUTEX_EXIT(&freeEvents.lock); +#endif + ev = &evlist[0]; + } else { + ev = opr_queue_First(&freeEvents.list, struct rxevent, q); + opr_queue_Remove(&ev->q); + MUTEX_EXIT(&freeEvents.lock); + } + + memset(ev, 0, sizeof(struct rxevent)); + rx_atomic_set(&ev->refcnt, 1); + + return ev; +} + +static void +rxevent_free(struct rxevent *ev) { + MUTEX_ENTER(&freeEvents.lock); + opr_queue_Prepend(&freeEvents.list, &ev->q); + MUTEX_EXIT(&freeEvents.lock); +} + +static_inline void +rxevent_put(struct rxevent *ev) { + if (rx_atomic_dec_and_read(&ev->refcnt) == 0) { + rxevent_free(ev); } - return nAdjusted; } -/* Pass in the number of events to allocate at a time */ -int rxevent_initialized = 0; void -rxevent_Init(int nEvents, void (*scheduler) (void)) +rxevent_Put(struct rxevent *ev) { + rxevent_put(ev); +} + +static_inline struct rxevent * +rxevent_get(struct rxevent *ev) { + rx_atomic_inc(&ev->refcnt); + return ev; +} + +struct rxevent * +rxevent_Get(struct rxevent *ev) { + return rxevent_get(ev); +} + +/* Called if the time now is older than the last time we recorded running an + * event. This test catches machines where the system time has been set + * backwards, and avoids RX completely stalling when timers fail to fire. + * + * Take the different between now and the last event time, and subtract that + * from the timing of every event on the system. This does a relatively slow + * walk of the completely eventTree, but time-travel will hopefully be a pretty + * rare occurrence. + * + * This can only safely be called from the event thread, as it plays with the + * schedule directly. + * + */ +static void +adjustTimes(void) { - LOCK_EV_INIT; - if (rxevent_initialized) { - UNLOCK_EV_INIT; + struct opr_rbtree_node *node; + struct clock adjTime, now; + + MUTEX_ENTER(&eventTree.lock); + /* Time adjustment is expensive, make absolutely certain that we have + * to do it, by getting an up to date time to base our decision on + * once we've acquired the relevant locks. + */ + clock_GetTime(&now); + if (!clock_Lt(&now, &eventSchedule.last)) + goto out; + + adjTime = eventSchedule.last; + clock_Zero(&eventSchedule.last); + + clock_Sub(&adjTime, &now); + + node = opr_rbtree_first(&eventTree.head); + while(node) { + struct rxevent *event = opr_containerof(node, struct rxevent, node); + + clock_Sub(&event->eventTime, &adjTime); + node = opr_rbtree_next(node); + } + eventSchedule.next = eventTree.first->eventTime; + +out: + MUTEX_EXIT(&eventTree.lock); +} + +static int initialised = 0; +void +rxevent_Init(int nEvents, void (*scheduler)(void)) +{ + if (initialised) return; - } - MUTEX_INIT(&rxevent_lock, "rxevent_lock", MUTEX_DEFAULT, 0); + + initialised = 1; + clock_Init(); + MUTEX_INIT(&eventTree.lock, "event tree lock", MUTEX_DEFAULT, 0); + opr_rbtree_init(&eventTree.head); + + MUTEX_INIT(&freeEvents.lock, "free events lock", MUTEX_DEFAULT, 0); + opr_queue_Init(&freeEvents.list); + freeEvents.mallocs = NULL; + if (nEvents) - rxevent_allocUnit = nEvents; - queue_Init(&rxevent_free); - queue_Init(&rxepoch_free); - queue_Init(&rxepoch_queue); - rxevent_nFree = rxevent_nPosted = 0; - rxepoch_nFree = 0; - rxevent_ScheduledEarlierEvent = scheduler; - rxevent_initialized = 1; - clock_Zero(&rxevent_nextRaiseEvents); - clock_Zero(&rxevent_lastEvent); - rxevent_raiseScheduled = 0; - UNLOCK_EV_INIT; + allocUnit = nEvents; + + clock_Zero(&eventSchedule.next); + clock_Zero(&eventSchedule.last); + eventSchedule.raised = 0; + eventSchedule.func = scheduler; } -/* Create and initialize new epoch structure */ -struct rxepoch * -rxepoch_Allocate(struct clock *when) -{ - struct rxepoch *ep; - int i; - - /* If we are short on free epoch entries, create a block of new oned - * and add them to the free queue */ - if (queue_IsEmpty(&rxepoch_free)) { -#if defined(AFS_AIX32_ENV) && defined(KERNEL) - ep = rxi_Alloc(sizeof(struct rxepoch)); - queue_Append(&rxepoch_free, &ep[0]), rxepoch_nFree++; -#else -#if defined(KERNEL) && !defined(UKERNEL) && defined(AFS_FBSD80_ENV) - ep = (struct rxepoch *) - afs_osi_Alloc_NoSleep(sizeof(struct rxepoch) * rxepoch_allocUnit); - xsp = xfreemallocs; - xfreemallocs = - (struct xfreelist *)afs_osi_Alloc_NoSleep(sizeof(struct xfreelist)); -#else - ep = (struct rxepoch *) - osi_Alloc(sizeof(struct rxepoch) * rxepoch_allocUnit); - xsp = xfreemallocs; - xfreemallocs = - (struct xfreelist *)osi_Alloc(sizeof(struct xfreelist)); -#endif - xfreemallocs->mem = (void *)ep; - xfreemallocs->size = sizeof(struct rxepoch) * rxepoch_allocUnit; - xfreemallocs->next = xsp; - for (i = 0; i < rxepoch_allocUnit; i++) - queue_Append(&rxepoch_free, &ep[i]), rxepoch_nFree++; -#endif - } - ep = queue_First(&rxepoch_free, rxepoch); - queue_Remove(ep); - rxepoch_nFree--; - ep->epochSec = when->sec; - queue_Init(&ep->events); - return ep; -} - -/* Add the indicated event (function, arg) at the specified clock time. The - * "when" argument specifies when "func" should be called, in clock (clock.h) - * units. */ - struct rxevent * rxevent_Post(struct clock *when, struct clock *now, - void (*func) (struct rxevent *, void *, void *, int), - void *arg, void *arg1, int arg2) + void (*func) (struct rxevent *, void *, void *, int), + void *arg, void *arg1, int arg2) { - struct rxevent *ev, *evqe, *evqpr; - struct rxepoch *ep, *epqe, *epqpr; - int isEarliest = 0; + struct rxevent *ev, *event; + struct opr_rbtree_node **childptr, *parent = NULL; - MUTEX_ENTER(&rxevent_lock); -#ifdef RXDEBUG - if (rx_Log_event) { - struct clock now1; - clock_GetTime(&now1); - fprintf(rx_Log_event, "%ld.%ld: rxevent_Post(%ld.%ld, " - "%"AFS_PTR_FMT", %"AFS_PTR_FMT", " - "%"AFS_PTR_FMT", %d)\n", - afs_printable_int32_ld(now1.sec), - afs_printable_int32_ld(now1.usec), - afs_printable_int32_ld(when->sec), - afs_printable_int32_ld(when->usec), - func, arg, - arg1, arg2); - } -#endif - /* If a time was provided, check for consistency */ - if (now->sec) { - if (clock_Gt(&rxevent_lastEvent, now)) { - struct clock adjTime = rxevent_lastEvent; - clock_Sub(&adjTime, now); - rxevent_adjTimes(&adjTime); - } - rxevent_lastEvent = *now; - } - /* Get a pointer to the epoch for this event, if none is found then - * create a new epoch and insert it into the sorted list */ - for (ep = NULL, queue_ScanBackwards(&rxepoch_queue, epqe, epqpr, rxepoch)) { - if (when->sec == epqe->epochSec) { - /* already have an structure for this epoch */ - ep = epqe; - if (ep == queue_First(&rxepoch_queue, rxepoch)) - isEarliest = 1; - break; - } else if (when->sec > epqe->epochSec) { - /* Create a new epoch and insert after qe */ - ep = rxepoch_Allocate(when); - queue_InsertAfter(epqe, ep); - break; - } - } - if (ep == NULL) { - /* Create a new epoch and place it at the head of the list */ - ep = rxepoch_Allocate(when); - queue_Prepend(&rxepoch_queue, ep); - isEarliest = 1; - } - - /* If we're short on free event entries, create a block of new ones and add - * them to the free queue */ - if (queue_IsEmpty(&rxevent_free)) { - int i; -#if defined(AFS_AIX32_ENV) && defined(KERNEL) - ev = rxi_Alloc(sizeof(struct rxevent)); - queue_Append(&rxevent_free, &ev[0]), rxevent_nFree++; -#else - -#if defined(KERNEL) && !defined(UKERNEL) && defined(AFS_FBSD80_ENV) - ev = (struct rxevent *)afs_osi_Alloc_NoSleep(sizeof(struct rxevent) * - rxevent_allocUnit); - xsp = xfreemallocs; - xfreemallocs = - (struct xfreelist *)afs_osi_Alloc_NoSleep(sizeof(struct xfreelist)); -#else - ev = (struct rxevent *)osi_Alloc(sizeof(struct rxevent) * - rxevent_allocUnit); - xsp = xfreemallocs; - xfreemallocs = - (struct xfreelist *)osi_Alloc(sizeof(struct xfreelist)); -#endif - xfreemallocs->mem = (void *)ev; - xfreemallocs->size = sizeof(struct rxevent) * rxevent_allocUnit; - xfreemallocs->next = xsp; - for (i = 0; i < rxevent_allocUnit; i++) - queue_Append(&rxevent_free, &ev[i]), rxevent_nFree++; -#endif - } - - /* Grab and initialize a new rxevent structure */ - ev = queue_First(&rxevent_free, rxevent); - queue_Remove(ev); - rxevent_nFree--; - - /* Record user defined event state */ + ev = rxevent_alloc(); ev->eventTime = *when; ev->func = func; ev->arg = arg; ev->arg1 = arg1; ev->arg2 = arg2; - rxevent_nPosted += 1; /* Rather than ++, to shut high-C up - * regarding never-set variables - */ - /* Insert the event into the sorted list of events for this epoch */ - for (queue_ScanBackwards(&ep->events, evqe, evqpr, rxevent)) { - if (when->usec >= evqe->eventTime.usec) { - /* Insert event after evqe */ - queue_InsertAfter(evqe, ev); - MUTEX_EXIT(&rxevent_lock); - return ev; + if (clock_Lt(now, &eventSchedule.last)) + adjustTimes(); + + MUTEX_ENTER(&eventTree.lock); + + /* Work out where in the tree we'll be storing this */ + childptr = &eventTree.head.root; + + while(*childptr) { + event = opr_containerof((*childptr), struct rxevent, node); + + parent = *childptr; + if (clock_Lt(when, &event->eventTime)) + childptr = &(*childptr)->left; + else if (clock_Gt(when, &event->eventTime)) + childptr = &(*childptr)->right; + else { + opr_queue_Append(&event->q, &ev->q); + goto out; } } - /* Insert event at head of current epoch */ - queue_Prepend(&ep->events, ev); - if (isEarliest && rxevent_ScheduledEarlierEvent - && (!rxevent_raiseScheduled - || clock_Lt(&ev->eventTime, &rxevent_nextRaiseEvents))) { - rxevent_raiseScheduled = 1; - clock_Zero(&rxevent_nextRaiseEvents); - MUTEX_EXIT(&rxevent_lock); - /* Notify our external scheduler */ - (*rxevent_ScheduledEarlierEvent) (); - MUTEX_ENTER(&rxevent_lock); + opr_queue_Init(&ev->q); + opr_rbtree_insert(&eventTree.head, parent, childptr, &ev->node); + + if (eventTree.first == NULL || + clock_Lt(when, &(eventTree.first->eventTime))) { + eventTree.first = ev; + eventSchedule.raised = 1; + clock_Zero(&eventSchedule.next); + MUTEX_EXIT(&eventTree.lock); + (*eventSchedule.func)(); + return rxevent_get(ev); } - MUTEX_EXIT(&rxevent_lock); - return ev; + +out: + MUTEX_EXIT(&eventTree.lock); + return rxevent_get(ev); } -/* Cancel an event by moving it from the event queue to the free list. - * Warning, the event must be on the event queue! If not, this should core - * dump (reference through 0). This routine should be called using the macro - * event_Cancel, which checks for a null event and also nulls the caller's - * event pointer after cancelling the event. - */ -#ifdef RX_ENABLE_LOCKS -#ifdef RX_REFCOUNT_CHECK -int rxevent_Cancel_type = 0; -#endif -#endif +/* We're going to remove ev from the tree, so set the first pointer to the + * next event after it */ +static_inline void +resetFirst(struct rxevent *ev) +{ + struct opr_rbtree_node *next = opr_rbtree_next(&ev->node); + if (next) + eventTree.first = opr_containerof(next, struct rxevent, node); + else + eventTree.first = NULL; +} void rxevent_Cancel(struct rxevent **evp, struct rx_call *call, int type) { - struct rxevent *ev = *evp; + struct rxevent *event; -#ifdef RXDEBUG - if (rx_Log_event) { - struct clock now; - clock_GetTime(&now); - fprintf(rx_Log_event, "%d.%d: rxevent_Cancel_1(%d.%d, %" - AFS_PTR_FMT ", %p" AFS_PTR_FMT ")\n", - (int)now.sec, (int)now.usec, (int)ev->eventTime.sec, - (int)ev->eventTime.usec, ev->func, - ev->arg); - } -#endif - /* Append it to the free list (rather than prepending) to keep the free - * list hot so nothing pages out - */ - MUTEX_ENTER(&rxevent_lock); - if (!ev) { - MUTEX_EXIT(&rxevent_lock); + if (!evp || !*evp) return; + + event = *evp; + + MUTEX_ENTER(&eventTree.lock); + + if (!event->handled) { + /* We're a node on the red/black tree. If our list is non-empty, + * then swap the first element in the list in in our place, + * promoting it to the list head */ + if (event->node.parent == NULL + && eventTree.head.root != &event->node) { + /* Not in the rbtree, therefore must be a list element */ + opr_queue_Remove(&event->q); + } else { + if (!opr_queue_IsEmpty(&event->q)) { + struct rxevent *next; + + next = opr_queue_First(&event->q, struct rxevent, q); + opr_queue_Remove(&next->q); /* Remove ourselves from list */ + if (event->q.prev == &event->q) { + next->q.prev = next->q.next = &next->q; + } else { + next->q = event->q; + next->q.prev->next = &next->q; + next->q.next->prev = &next->q; + } + + opr_rbtree_replace(&eventTree.head, &event->node, + &next->node); + + if (eventTree.first == event) + eventTree.first = next; + + } else { + if (eventTree.first == event) + resetFirst(event); + + opr_rbtree_remove(&eventTree.head, &event->node); + } + } + event->handled = 1; + rxevent_put(event); /* Dispose of eventTree reference */ } + MUTEX_EXIT(&eventTree.lock); + *evp = NULL; + rxevent_put(event); /* Dispose of caller's reference */ -#ifdef RX_ENABLE_LOCKS - /* It's possible we're currently processing this event. */ - if (queue_IsOnQueue(ev)) { - queue_MoveAppend(&rxevent_free, ev); - rxevent_nPosted--; - rxevent_nFree++; - if (call) { - call->refCount--; -#ifdef RX_REFCOUNT_CHECK - call->refCDebug[type]--; - if (call->refCDebug[type] < 0) { - rxevent_Cancel_type = type; - osi_Panic("rxevent_Cancel: call refCount < 0"); - } -#endif /* RX_REFCOUNT_CHECK */ - } - } -#else /* RX_ENABLE_LOCKS */ - queue_MoveAppend(&rxevent_free, ev); - rxevent_nPosted--; - rxevent_nFree++; -#endif /* RX_ENABLE_LOCKS */ - MUTEX_EXIT(&rxevent_lock); + if (call) + CALL_RELE(call, type); } -/* Process all epochs that have expired relative to the current clock time - * (which is not re-evaluated unless clock_NewTime has been called). The - * relative time to the next epoch is returned in the output parameter next - * and the function returns 1. If there are is no next epoch, the function - * returns 0. +/* Process all events which have expired. If events remain, then the relative + * time until the next event is returned in the parameter 'wait', and the + * function returns 1. If no events currently remain, the function returns 0 + * + * If the current time is older than that of the last event processed, then we + * assume that time has gone backwards (for example, due to a system time reset) + * When this happens, all events in the current queue are rescheduled, using + * the difference between the current time and the last event time as a delta */ + int -rxevent_RaiseEvents(struct clock *next) +rxevent_RaiseEvents(struct clock *wait) { - struct rxepoch *ep; - struct rxevent *ev; - volatile struct clock now; - MUTEX_ENTER(&rxevent_lock); + struct clock now; + struct rxevent *event; + int ret; - /* Events are sorted by time, so only scan until an event is found that has - * not yet timed out */ + clock_GetTime(&now); - clock_Zero(&now); - while (queue_IsNotEmpty(&rxepoch_queue)) { - ep = queue_First(&rxepoch_queue, rxepoch); - if (queue_IsEmpty(&ep->events)) { - queue_Remove(ep); - queue_Append(&rxepoch_free, ep); - rxepoch_nFree++; - continue; + /* Check for time going backwards */ + if (clock_Lt(&now, &eventSchedule.last)) + adjustTimes(); + eventSchedule.last = now; + + MUTEX_ENTER(&eventTree.lock); + /* Lock our event tree */ + while (eventTree.first != NULL + && clock_Lt(&eventTree.first->eventTime, &now)) { + + /* Grab the next node, either in the event's list, or in the tree node + * itself, and remove it from the event tree */ + event = eventTree.first; + if (!opr_queue_IsEmpty(&event->q)) { + event = opr_queue_Last(&event->q, struct rxevent, q); + opr_queue_Remove(&event->q); + } else { + resetFirst(event); + opr_rbtree_remove(&eventTree.head, &event->node); } - do { - reraise: - ev = queue_First(&ep->events, rxevent); - if (clock_Lt(&now, &ev->eventTime)) { - clock_GetTime(&now); - if (clock_Gt(&rxevent_lastEvent, &now)) { - struct clock adjTime = rxevent_lastEvent; - int adjusted; - clock_Sub(&adjTime, &now); - adjusted = rxevent_adjTimes(&adjTime); - rxevent_lastEvent = now; - if (adjusted > 0) - goto reraise; - } - if (clock_Lt(&now, &ev->eventTime)) { - *next = rxevent_nextRaiseEvents = ev->eventTime; - rxevent_raiseScheduled = 1; - clock_Sub(next, &now); - MUTEX_EXIT(&rxevent_lock); - return 1; - } - } - queue_Remove(ev); - rxevent_nPosted--; - MUTEX_EXIT(&rxevent_lock); - ev->func(ev, ev->arg, ev->arg1, ev->arg2); - MUTEX_ENTER(&rxevent_lock); - queue_Append(&rxevent_free, ev); - rxevent_nFree++; - } while (queue_IsNotEmpty(&ep->events)); + event->handled = 1; + MUTEX_EXIT(&eventTree.lock); + + /* Fire the event, then free the structure */ + event->func(event, event->arg, event->arg1, event->arg2); + rxevent_put(event); + + MUTEX_ENTER(&eventTree.lock); } -#ifdef RXDEBUG - if (rx_Log_event) - fprintf(rx_Log_event, "rxevent_RaiseEvents(%d.%d)\n", (int)now.sec, - (int)now.usec); -#endif - rxevent_raiseScheduled = 0; - MUTEX_EXIT(&rxevent_lock); - return 0; + + /* Figure out when we next need to be scheduled */ + if (eventTree.first != NULL) { + *wait = eventSchedule.next = eventTree.first->eventTime; + ret = eventSchedule.raised = 1; + clock_Sub(wait, &now); + } else { + ret = eventSchedule.raised = 0; + } + + MUTEX_EXIT(&eventTree.lock); + + return ret; } void shutdown_rxevent(void) { - struct xfreelist *xp, *nxp; + struct malloclist *mrec, *nmrec; - LOCK_EV_INIT; - if (!rxevent_initialized) { - UNLOCK_EV_INIT; + if (!initialised) { return; } - rxevent_initialized = 0; - UNLOCK_EV_INIT; - MUTEX_DESTROY(&rxevent_lock); -#if defined(AFS_AIX32_ENV) && defined(KERNEL) - /* Everything is freed in afs_osinet.c */ -#else - xp = xfreemallocs; - while (xp) { - nxp = xp->next; - osi_Free((char *)xp->mem, xp->size); - osi_Free((char *)xp, sizeof(struct xfreelist)); - xp = nxp; - } - xfreemallocs = NULL; -#endif + MUTEX_DESTROY(&eventTree.lock); +#if !defined(AFS_AIX32_ENV) || !defined(KERNEL) + MUTEX_DESTROY(&freeEvents.lock); + mrec = freeEvents.mallocs; + while (mrec) { + nmrec = mrec->next; + osi_Free(mrec->mem, mrec->size); + osi_Free(mrec, sizeof(struct malloclist)); + mrec = nmrec; + } + mrec = NULL; +#endif } diff --git a/src/rx/rx_event.h b/src/rx/rx_event.h index e1921a2058..df9b426d35 100644 --- a/src/rx/rx_event.h +++ b/src/rx/rx_event.h @@ -9,91 +9,44 @@ /* Event package */ -#ifndef _EVENT_ -#define _EVENT_ - -#ifdef KERNEL -#include "rx/rx_queue.h" -#include "rx/rx_clock.h" -#else /* KERNEL */ -#include "rx_queue.h" -#include "rx_clock.h" -#endif /* KERNEL */ - -/* An event is something that will happen at (or after) a specified clock - * time, unless cancelled prematurely. The user routine (*func)() is called - * with arguments (event, arg, arg1) when the event occurs. - * Warnings: - * (1) The user supplied routine should NOT cause process preemption. - * (2) The event passed to the user is still on the event queue at that - * time. The user must not remove (event_Cancel) it explicitly, but - * the user may remove or schedule any OTHER event at this time. - */ - -struct rxevent { - struct rx_queue junk; /* Events are queued */ - struct clock eventTime; /* When this event times out (in clock.c units) */ - void (*func) (struct rxevent *, void *, void *, int); - void *arg; /* Argument to the function */ - void *arg1; /* Another argument */ - int arg2; /* An integer argument */ -}; - -/* We used to maintain a sorted list of events, but the amount of CPU - * required to maintain the list grew with the square of the number of - * connections. Now we keep a list of epochs, each epoch contains the - * events scheduled for a particular second. Each epoch contains a sorted - * list of the events scheduled for that epoch. */ -struct rxepoch { - struct rx_queue junk; /* Epochs are queued */ - int epochSec; /* each epoch spans one second */ - struct rx_queue events; /* list of events for this epoch */ -}; - -/* Some macros to make macros more reasonable (this allows a block to be - * used within a macro which does not cause if statements to screw up). - * That is, you can use "if (...) macro_name(); else ...;" without - * having things blow up on the semi-colon. */ - -#ifndef BEGIN -#define BEGIN do { -#define END } while(0) -#endif +#ifndef OPENAFS_RX_EVENT_H +#define OPENAFS_RX_EVENT_H /* This routine must be called to initialize the event package. * nEvents is the number of events to allocate in a batch whenever * more are needed. If this is 0, a default number (10) will be * allocated. */ -#if 0 -extern void rxevent_Init( /* nEvents, scheduler */ ); -#endif +extern void rxevent_Init( int nEvents, void (*scheduler)(void) ); -/* Get the expiration time for the next event */ -#if 0 -extern void exevent_NextEvent( /* when */ ); -#endif - -/* Arrange for the indicated event at the appointed time. When is a +/* Arrange for the indicated event at the appointed time. when is a * "struct clock", in the clock.c time base */ -#if 0 -extern struct rxevent *rxevent_Post( /* when, func, arg, arg1 */ ); -#endif +struct clock; +struct rxevent; +extern struct rxevent *rxevent_Post(struct clock *when, struct clock *now, + void (*func) (struct rxevent *, void *, + void *, int), + void *arg, void *arg1, int arg2); /* Remove the indicated event from the event queue. The event must be - * pending. Also see the warning, above. The event pointer supplied - * is zeroed. + * pending. Note that a currently executing event may not cancel itself. */ -#if 0 -extern struct rxevent *rxevent_Cancel(struct rxevent *, struct rx_call *, int) -#endif +struct rx_call; +extern void rxevent_Cancel(struct rxevent **, struct rx_call *, int type); /* The actions specified for each event that has reached the current clock * time will be taken. The current time returned by GetTime is used * (warning: this may be an old time if the user has not called * clock_NewTime) */ -#if 0 -extern int rxevent_RaiseEvents(); -#endif +extern int rxevent_RaiseEvents(struct clock *wait); + +/* Acquire a reference to an event */ +extern struct rxevent *rxevent_Get(struct rxevent *event); + +/* Release a reference to an event */ +extern void rxevent_Put(struct rxevent *event); + +/* Shutdown the event package */ +extern void shutdown_rxevent(void); #endif /* _EVENT_ */ diff --git a/src/rx/rx_packet.h b/src/rx/rx_packet.h index c60b27d50d..b7cbd0ab5c 100644 --- a/src/rx/rx_packet.h +++ b/src/rx/rx_packet.h @@ -9,6 +9,9 @@ #ifndef _RX_PACKET_ #define _RX_PACKET_ + +#include "rx_queue.h" + #if defined(AFS_NT40_ENV) #include "rx_xmit_nt.h" #endif diff --git a/src/rxdebug/Makefile.in b/src/rxdebug/Makefile.in index a9e470ca7b..df83359ff7 100644 --- a/src/rxdebug/Makefile.in +++ b/src/rxdebug/Makefile.in @@ -10,8 +10,12 @@ include @TOP_OBJDIR@/src/config/Makefile.config include @TOP_OBJDIR@/src/config/Makefile.lwp -LIBS=${TOP_LIBDIR}/librx.a ${TOP_LIBDIR}/liblwp.a ${TOP_LIBDIR}/libcmd.a \ - ${TOP_LIBDIR}/libsys.a ${TOP_LIBDIR}/libafsutil.a +LIBS=${TOP_LIBDIR}/librx.a \ + ${TOP_LIBDIR}/liblwp.a \ + ${TOP_LIBDIR}/libcmd.a \ + ${TOP_LIBDIR}/libsys.a \ + ${TOP_LIBDIR}/libopr.a \ + ${TOP_LIBDIR}/libafsutil.a all: rxdebug rxdumptrace diff --git a/src/shlibafsrpc/Makefile.in b/src/shlibafsrpc/Makefile.in index e55b679d6f..30ff565a1d 100644 --- a/src/shlibafsrpc/Makefile.in +++ b/src/shlibafsrpc/Makefile.in @@ -39,7 +39,8 @@ SYSOBJS =\ UTILOBJS =\ assert.o \ casestrcpy.o \ - base64.o + base64.o \ + rbtree.o COMERROBJS =\ error_msg.o \ @@ -277,6 +278,9 @@ casestrcpy.o: ${OPR}/casestrcpy.c assert.o: ${OPR}/assert.c $(AFS_CCRULE) $(OPR)/assert.c +rbtree.o: ${OPR}/rbtree.c + $(AFS_CCRULE) $(OPR)/rbtree.c + base64.o: ${UTIL}/base64.c $(AFS_CCRULE) $(UTIL)/base64.c diff --git a/src/tbutc/Makefile.in b/src/tbutc/Makefile.in index 14e829b58d..de86dd16a2 100644 --- a/src/tbutc/Makefile.in +++ b/src/tbutc/Makefile.in @@ -57,6 +57,7 @@ BUTCLIBS=${TOP_LIBDIR}/libbudb.a \ ${TOP_LIBDIR}/libafsrpc.a \ ${TOP_LIBDIR}/libcmd.a \ ${TOP_LIBDIR}/util.a \ + $(TOP_LIBDIR)/libopr.a \ ${TOP_LIBDIR}/libusd.a \ ${TOP_LIBDIR}/libprocmgmt.a diff --git a/src/xstat/Makefile.in b/src/xstat/Makefile.in index a27ba89847..f91b67e271 100644 --- a/src/xstat/Makefile.in +++ b/src/xstat/Makefile.in @@ -20,6 +20,7 @@ LIBS=${TOP_LIBDIR}/libafsint.a \ ${TOP_LIBDIR}/librx.a \ ${TOP_LIBDIR}/liblwp.a \ ${TOP_LIBDIR}/libsys.a \ + ${TOP_LIBDIR}/libopr.a \ ${TOP_LIBDIR}/util.a all: \ diff --git a/tests/Makefile.in b/tests/Makefile.in index cfbef1ba57..f62f2483e9 100644 --- a/tests/Makefile.in +++ b/tests/Makefile.in @@ -9,7 +9,7 @@ include @TOP_OBJDIR@/src/config/Makefile.pthread MODULE_CFLAGS = -DSOURCE='"$(abs_top_srcdir)/tests"' \ -DBUILD='"$(abs_top_builddir)/tests"' -SUBDIRS = tap common auth util cmd volser opr +SUBDIRS = tap common auth util cmd volser opr rx all: runtests @for A in $(SUBDIRS); do cd $$A && $(MAKE) $@ && cd .. || exit 1; done diff --git a/tests/TESTS b/tests/TESTS index 217546b7b2..f9cb17e5e7 100644 --- a/tests/TESTS +++ b/tests/TESTS @@ -9,6 +9,7 @@ opr/queues opr/rbtree ptserver/pt_util ptserver/pts-man +rx/event volser/vos-man bucoord/backup-man kauth/kas-man diff --git a/tests/auth/Makefile.in b/tests/auth/Makefile.in index b6c84be39e..d37368750e 100644 --- a/tests/auth/Makefile.in +++ b/tests/auth/Makefile.in @@ -14,6 +14,7 @@ MODULE_LIBS = ../tap/libtap.a \ $(abs_top_builddir)/lib/libafsauthent.a \ $(abs_top_builddir)/lib/libafsrpc.a \ $(abs_top_builddir)/lib/libafshcrypto.a \ + $(abs_top_builddir)/lib/libopr.a \ $(LIB_rfc3961) $(LIB_roken) -lafsutil\ $(XLIBS) diff --git a/tests/rx/Makefile.in b/tests/rx/Makefile.in new file mode 100644 index 0000000000..2ff8a76d86 --- /dev/null +++ b/tests/rx/Makefile.in @@ -0,0 +1,24 @@ +# Build rules for the OpenAFS RX test suite. + +srcdir=@srcdir@ +abs_top_builddir=@abs_top_builddir@ +include @TOP_OBJDIR@/src/config/Makefile.config +include @TOP_OBJDIR@/src/config/Makefile.pthread + +MODULE_CFLAGS = -I$(srcdir)/.. + +LIBS = ../tap/libtap.a \ + $(abs_top_builddir)/lib/libafsrpc.a \ + $(abs_top_builddir)/lib/libopr.a + +tests = event-t + +all check test tests: $(tests) + +event-t: event-t.o $(LIBS) + $(AFS_LDRULE) event-t.o $(LIBS) $(XLIBS) + +install: + +clean distclean: + $(RM) -f $(tests) *.o core diff --git a/tests/rx/event-t.c b/tests/rx/event-t.c new file mode 100644 index 0000000000..c66ef711d0 --- /dev/null +++ b/tests/rx/event-t.c @@ -0,0 +1,178 @@ +/* A simple test of the rx event layer */ + +#include +#include + +#include +#include + +#include + +#include "rx/rx_event.h" +#include "rx/rx_clock.h" + +#define NUMEVENTS 10000 + +/* Mutexes and condvars for the scheduler */ +static int rescheduled = 0; +static pthread_mutex_t eventMutex; +static pthread_cond_t eventCond; + +/* Mutexes and condvars for the event list */ + +static pthread_mutex_t eventListMutex; +struct testEvent { + struct rxevent *event; + int fired; + int cancelled; +}; + +static struct testEvent events[NUMEVENTS]; + +static void +reschedule(void) +{ + pthread_mutex_lock(&eventMutex); + pthread_cond_signal(&eventCond); + rescheduled = 1; + pthread_mutex_unlock(&eventMutex); + return; +} + +static void +eventSub(struct rxevent *event, void *arg, void *arg1, int arg2) +{ + struct testEvent *evrecord = arg; + + pthread_mutex_lock(&eventListMutex); + rxevent_Put(evrecord->event); + evrecord->event = NULL; + evrecord->fired = 1; + pthread_mutex_unlock(&eventListMutex); + return; +} + +static void +reportSub(struct rxevent *event, void *arg, void *arg1, int arg2) +{ + printf("Event fired\n"); +} + +static void * +eventHandler(void *dummy) { + struct timespec nextEvent; + struct clock cv; + struct clock next; + + pthread_mutex_lock(&eventMutex); + while (1) { + pthread_mutex_unlock(&eventMutex); + + next.sec = 30; + next.usec = 0; + clock_GetTime(&cv); + rxevent_RaiseEvents(&next); + + pthread_mutex_lock(&eventMutex); + + /* If we were rescheduled whilst running the event queue, + * process the queue again */ + if (rescheduled) { + rescheduled = 0; + continue; + } + + clock_Add(&cv, &next); + nextEvent.tv_sec = cv.sec; + nextEvent.tv_nsec = cv.usec * 1000; + pthread_cond_timedwait(&eventCond, &eventMutex, &nextEvent); + } + pthread_mutex_unlock(&eventMutex); + + return NULL; +} + +int +main(void) +{ + int when, counter, fail, fired, cancelled; + struct clock now, eventTime; + struct rxevent *event; + pthread_t handler; + + plan(8); + + pthread_mutex_init(&eventMutex, NULL); + pthread_cond_init(&eventCond, NULL); + + memset(events, sizeof(events), 0); + pthread_mutex_init(&eventListMutex, NULL); + + /* Start up the event system */ + rxevent_Init(20, reschedule); + ok(1, "Started event subsystem"); + + clock_GetTime(&now); + /* Test for a problem when there is only a single event in the tree */ + event = rxevent_Post(&now, &now, reportSub, NULL, NULL, 0); + ok(event != NULL, "Created a single event"); + rxevent_Cancel(&event, NULL, 0); + ok(1, "Cancelled a single event"); + rxevent_RaiseEvents(&now); + ok(1, "RaiseEvents happened without error"); + + ok(pthread_create(&handler, NULL, eventHandler, NULL) == 0, + "Created handler thread"); + + /* Add 1000 random events to fire over the next 3 seconds */ + + for (counter = 0; counter < NUMEVENTS; counter++) { + when = random() % 3000; + clock_GetTime(&now); + eventTime = now; + clock_Addmsec(&eventTime, when); + pthread_mutex_lock(&eventListMutex); + events[counter].event + = rxevent_Post(&eventTime, &now, eventSub, &events[counter], NULL, 0); + + /* A 10% chance that we will schedule another event at the same time */ + if (counter!=999 && random() % 10 == 0) { + counter++; + events[counter].event + = rxevent_Post(&eventTime, &now, eventSub, &events[counter], + NULL, 0); + } + + /* A 25% chance that we will cancel a random event */ + if (random() % 4 == 0) { + int victim = random() % counter; + + if (events[victim].event != NULL) { + rxevent_Cancel(&events[victim].event, NULL, 0); + events[victim].cancelled = 1; + } + } + pthread_mutex_unlock(&eventListMutex); + } + + ok(1, "Added %d events", NUMEVENTS); + + sleep(3); + + fired = 0; + cancelled = 0; + fail = 0; + for (counter = 0; counter < NUMEVENTS; counter++) { + if (events[counter].fired) + fired++; + if (events[counter].cancelled) + cancelled++; + if (events[counter].cancelled && events[counter].fired) + fail = 1; + } + ok(!fail, "Didn't fire any cancelled events"); + ok(fired+cancelled == NUMEVENTS, + "Number of fired and cancelled events sum to correct total"); + + return 0; +} diff --git a/tests/volser/Makefile.in b/tests/volser/Makefile.in index db64cbd759..c859f39d25 100644 --- a/tests/volser/Makefile.in +++ b/tests/volser/Makefile.in @@ -20,6 +20,7 @@ MODULE_LIBS = ../tap/libtap.a \ $(abs_top_builddir)/lib/libafsauthent.a \ $(abs_top_builddir)/lib/libafsrpc.a \ $(abs_top_builddir)/lib/libafshcrypto.a \ + $(abs_top_builddir)/lib/libopr.a \ $(LIB_rfc3961) $(LIB_roken) -lafsutil\ $(XLIBS)