From 20034a815750beff262d49b37fba225c72dd0ab1 Mon Sep 17 00:00:00 2001 From: Simon Wilkinson Date: Sat, 3 Nov 2012 23:15:50 +0000 Subject: [PATCH] rx: Don't treat calls specially in event package Many different structures can be passed to the rxevent package as data. Don't give calls special treatment by making rxevent aware of how to release their reference counts when an event is cancelled. Update all of the callers of rxevent_Cancel to use the new arguments, and where they were cancelling functions with calls as parameters add the appropriate CALL_RELE directives. In many cases, this has led to new helper functions to cancel particular call-based events. Change-Id: Ic02778e48fd950e8850b77bd3c076c235453274d Reviewed-on: http://gerrit.openafs.org/8538 Reviewed-by: Derrick Brashear Tested-by: BuildBot Reviewed-by: Jeffrey Altman --- src/rx/rx.c | 132 ++++++++++++++++++++++++----------------- src/rx/rx_event.c | 5 +- src/rx/rx_event.h | 3 +- src/rx/rx_globals.h | 6 +- src/rx/rx_internal.h | 1 + src/rx/rx_prototypes.h | 3 +- src/rx/rx_rdwr.c | 6 +- tests/rx/event-t.c | 4 +- 8 files changed, 89 insertions(+), 71 deletions(-) diff --git a/src/rx/rx.c b/src/rx/rx.c index 77b99e9c71..e95fc0e1a8 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -153,6 +153,9 @@ static void rxi_GrowMTUOn(struct rx_call *call); static void rxi_ChallengeOn(struct rx_connection *conn); static int rxi_CheckCall(struct rx_call *call, int haveCTLock); static void rxi_AckAllInTransmitQueue(struct rx_call *call); +static void rxi_CancelKeepAliveEvent(struct rx_call *call); +static void rxi_CancelDelayedAbortEvent(struct rx_call *call); +static void rxi_CancelGrowMTUEvent(struct rx_call *call); #ifdef RX_ENABLE_LOCKS struct rx_tq_debug { @@ -727,7 +730,8 @@ rxi_rto_startTimer(struct rx_call *call, int lastPacket, int istack) static_inline void rxi_rto_cancel(struct rx_call *call) { - rxevent_Cancel(&call->resendEvent, call, RX_CALL_REFCOUNT_RESEND); + rxevent_Cancel(&call->resendEvent); + CALL_RELE(call, RX_CALL_REFCOUNT_RESEND); } /*! @@ -829,13 +833,16 @@ rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset) when = now; clock_Add(&when, offset); - if (!call->delayedAckEvent - || clock_Gt(&call->delayedAckTime, &when)) { + if (call->delayedAckEvent && clock_Gt(&call->delayedAckTime, &when)) { + /* The event we're cancelling already has a reference, so we don't + * need a new one */ + rxevent_Cancel(&call->delayedAckEvent); + call->delayedAckEvent = rxevent_Post(&when, &now, rxi_SendDelayedAck, + call, NULL, 0); - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + call->delayedAckTime = when; + } else if (!call->delayedAckEvent) { CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY); - call->delayedAckEvent = rxevent_Post(&when, &now, rxi_SendDelayedAck, call, NULL, 0); @@ -843,6 +850,15 @@ rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset) } } +void +rxi_CancelDelayedAckEvent(struct rx_call *call) +{ + if (call->delayedAckEvent) { + rxevent_Cancel(&call->delayedAckEvent); + CALL_RELE(call, RX_CALL_REFCOUNT_DELAY); + } +} + /* called with unincremented nRequestsRunning to see if it is OK to start * a new thread in this service. Could be "no" for two reasons: over the * max quota, or would prevent others from reaching their min quota. @@ -1302,8 +1318,7 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn) /* Push the final acknowledgment out now--there * won't be a subsequent call to acknowledge the * last reply packets */ - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); if (call->state == RX_STATE_PRECALL || call->state == RX_STATE_ACTIVE) { rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0); @@ -1343,7 +1358,7 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn) } if (conn->delayedAbortEvent) { - rxevent_Cancel(&conn->delayedAbortEvent, NULL, 0); + rxevent_Cancel(&conn->delayedAbortEvent); packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL); if (packet) { MUTEX_ENTER(&conn->conn_data_lock); @@ -1371,9 +1386,9 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn) /* Make sure the connection is completely reset before deleting it. */ /* get rid of pending events that could zap us later */ - rxevent_Cancel(&conn->challengeEvent, NULL, 0); - rxevent_Cancel(&conn->checkReachEvent, NULL, 0); - rxevent_Cancel(&conn->natKeepAliveEvent, NULL, 0); + rxevent_Cancel(&conn->challengeEvent); + rxevent_Cancel(&conn->checkReachEvent); + rxevent_Cancel(&conn->natKeepAliveEvent); /* Add the connection to the list of destroyed connections that * need to be cleaned up. This is necessary to avoid deadlocks @@ -2426,8 +2441,7 @@ rx_EndCall(struct rx_call *call, afs_int32 rc) call->state = RX_STATE_DALLY; rxi_ClearTransmitQueue(call, 0); rxi_rto_cancel(call); - rxevent_Cancel(&call->keepAliveEvent, call, - RX_CALL_REFCOUNT_ALIVE); + rxi_CancelKeepAliveEvent(call); } } else { /* Client connection */ char dummy; @@ -2445,8 +2459,7 @@ rx_EndCall(struct rx_call *call, afs_int32 rc) * and force-send it now. */ if (call->delayedAckEvent) { - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); rxi_SendDelayedAck(NULL, call, NULL, 0); } @@ -3898,8 +3911,7 @@ rxi_ReceiveDataPacket(struct rx_call *call, if (rx_stats_active) rx_atomic_inc(&rx_stats.dupPacketsRead); dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate\n", np)); - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack); ackNeeded = 0; call->rprev = seq; @@ -3989,8 +4001,7 @@ rxi_ReceiveDataPacket(struct rx_call *call, if (seq < call->rnext) { if (rx_stats_active) rx_atomic_inc(&rx_stats.dupPacketsRead); - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack); ackNeeded = 0; call->rprev = seq; @@ -4001,8 +4012,7 @@ rxi_ReceiveDataPacket(struct rx_call *call, * accomodated by the current window, then send a negative * acknowledge and drop the packet */ if ((call->rnext + call->rwind) <= seq) { - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); np = rxi_SendAck(call, np, serial, RX_ACK_EXCEEDS_WINDOW, istack); ackNeeded = 0; @@ -4021,8 +4031,7 @@ rxi_ReceiveDataPacket(struct rx_call *call, if (seq == tp->header.seq) { if (rx_stats_active) rx_atomic_inc(&rx_stats.dupPacketsRead); - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack); ackNeeded = 0; @@ -4133,10 +4142,10 @@ rxi_ReceiveDataPacket(struct rx_call *call, * received. Always send a soft ack for the last packet in * the server's reply. */ if (ackNeeded) { - rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); np = rxi_SendAck(call, np, serial, ackNeeded, istack); } else if (call->nSoftAcks > (u_short) rxi_SoftAckRate) { - rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack); } else if (call->nSoftAcks) { if (haveLast && !(flags & RX_CLIENT_INITIATED)) @@ -4144,7 +4153,7 @@ rxi_ReceiveDataPacket(struct rx_call *call, else rxi_PostDelayedAckEvent(call, &rx_softAckDelay); } else if (call->flags & RX_CALL_RECEIVE_DONE) { - rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); } return np; @@ -4703,7 +4712,7 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np, && call->tfirst + call->nSoftAcked >= call->tnext) { call->state = RX_STATE_DALLY; rxi_ClearTransmitQueue(call, 0); - rxevent_Cancel(&call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE); + rxi_CancelKeepAliveEvent(call); } else if (!opr_queue_IsEmpty(&call->tq)) { rxi_Start(call, istack); } @@ -5093,10 +5102,7 @@ rxi_SendCallAbort(struct rx_call *call, struct rx_packet *packet, if (force || rxi_callAbortThreshhold == 0 || call->abortCount < rxi_callAbortThreshhold) { - if (call->delayedAbortEvent) { - rxevent_Cancel(&call->delayedAbortEvent, call, - RX_CALL_REFCOUNT_ABORT); - } + rxi_CancelDelayedAbortEvent(call); error = htonl(cerror); call->abortCount++; packet = @@ -5113,6 +5119,15 @@ rxi_SendCallAbort(struct rx_call *call, struct rx_packet *packet, return packet; } +static void +rxi_CancelDelayedAbortEvent(struct rx_call *call) +{ + if (call->delayedAbortEvent) { + rxevent_Cancel(&call->delayedAbortEvent); + CALL_RELE(call, RX_CALL_REFCOUNT_ABORT); + } +} + /* Send an abort packet for the specified connection. Packet is an * optional pointer to a packet that can be used to send the abort. * Once the number of abort messages reaches the threshhold, an @@ -5139,7 +5154,7 @@ rxi_SendConnectionAbort(struct rx_connection *conn, if (force || rxi_connAbortThreshhold == 0 || conn->abortCount < rxi_connAbortThreshhold) { - rxevent_Cancel(&conn->delayedAbortEvent, NULL, 0); + rxevent_Cancel(&conn->delayedAbortEvent); error = htonl(conn->error); conn->abortCount++; MUTEX_EXIT(&conn->conn_data_lock); @@ -5173,10 +5188,10 @@ rxi_ConnectionError(struct rx_connection *conn, dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d\n", conn, error)); MUTEX_ENTER(&conn->conn_data_lock); - rxevent_Cancel(&conn->challengeEvent, NULL, 0); - rxevent_Cancel(&conn->natKeepAliveEvent, NULL, 0); + rxevent_Cancel(&conn->challengeEvent); + rxevent_Cancel(&conn->natKeepAliveEvent); if (conn->checkReachEvent) { - rxevent_Cancel(&conn->checkReachEvent, NULL, 0); + rxevent_Cancel(&conn->checkReachEvent); conn->flags &= ~(RX_CONN_ATTACHWAIT|RX_CONN_NAT_PING); putConnection(conn); } @@ -5253,10 +5268,10 @@ rxi_ResetCall(struct rx_call *call, int newcall) } - rxevent_Cancel(&call->growMTUEvent, call, RX_CALL_REFCOUNT_MTU); + rxi_CancelGrowMTUEvent(call); if (call->delayedAbortEvent) { - rxevent_Cancel(&call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT); + rxi_CancelDelayedAbortEvent(call); packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL); if (packet) { rxi_SendCallAbort(call, packet, 0, 1); @@ -5385,8 +5400,8 @@ rxi_ResetCall(struct rx_call *call, int newcall) } #endif /* RX_ENABLE_LOCKS */ - rxi_KeepAliveOff(call); - rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); + rxi_CancelKeepAliveEvent(call); + rxi_CancelDelayedAckEvent(call); } /* Send an acknowledge for the indicated packet (seq,serial) of the @@ -5753,7 +5768,7 @@ rxi_SendList(struct rx_call *call, struct xmitlist *xmit, /* Since we're about to send a data packet to the peer, it's * safe to nuke any scheduled end-of-packets ack */ - rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); MUTEX_EXIT(&call->lock); CALL_HOLD(call, RX_CALL_REFCOUNT_SEND); @@ -6195,7 +6210,7 @@ rxi_Send(struct rx_call *call, struct rx_packet *p, /* Since we're about to send SOME sort of packet to the peer, it's * safe to nuke any scheduled end-of-packets ack */ - rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); /* Actually send the packet, filling in more connection-specific fields */ MUTEX_EXIT(&call->lock); @@ -6318,13 +6333,10 @@ rxi_CheckCall(struct rx_call *call, int haveCTLock) } else { #ifdef RX_ENABLE_LOCKS /* Cancel pending events */ - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); rxi_rto_cancel(call); - rxevent_Cancel(&call->keepAliveEvent, call, - RX_CALL_REFCOUNT_ALIVE); - rxevent_Cancel(&call->growMTUEvent, call, - RX_CALL_REFCOUNT_MTU); + rxi_CancelKeepAliveEvent(call); + rxi_CancelGrowMTUEvent(call); MUTEX_ENTER(&rx_refcnt_mutex); /* if rxi_FreeCall returns 1 it has freed the call */ if (call->refCount == 0 && @@ -6602,6 +6614,14 @@ rxi_ScheduleKeepAliveEvent(struct rx_call *call) } } +static void +rxi_CancelKeepAliveEvent(struct rx_call *call) { + if (call->keepAliveEvent) { + rxevent_Cancel(&call->keepAliveEvent); + CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE); + } +} + static void rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs) { @@ -6625,7 +6645,15 @@ rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs) } } -/* N.B. rxi_KeepAliveOff: is defined earlier as a macro */ +static void +rxi_CancelGrowMTUEvent(struct rx_call *call) +{ + if (call->growMTUEvent) { + rxevent_Cancel(&call->growMTUEvent); + CALL_RELE(call, RX_CALL_REFCOUNT_MTU); + } +} + static void rxi_KeepAliveOn(struct rx_call *call) { @@ -6638,16 +6666,14 @@ rxi_KeepAliveOn(struct rx_call *call) rxi_ScheduleKeepAliveEvent(call); } -/* - * Solely in order that callers not need to include rx_call.h - */ void rx_KeepAliveOff(struct rx_call *call) { MUTEX_ENTER(&call->lock); - rxi_KeepAliveOff(call); + rxi_CancelKeepAliveEvent(call); MUTEX_EXIT(&call->lock); } + void rx_KeepAliveOn(struct rx_call *call) { diff --git a/src/rx/rx_event.c b/src/rx/rx_event.c index 0053b286f5..0807dc0f84 100644 --- a/src/rx/rx_event.c +++ b/src/rx/rx_event.c @@ -314,7 +314,7 @@ resetFirst(struct rxevent *ev) } void -rxevent_Cancel(struct rxevent **evp, struct rx_call *call, int type) +rxevent_Cancel(struct rxevent **evp) { struct rxevent *event; @@ -368,9 +368,6 @@ rxevent_Cancel(struct rxevent **evp, struct rx_call *call, int type) *evp = NULL; rxevent_put(event); /* Dispose of caller's reference */ - - if (call) - CALL_RELE(call, type); } /* Process all events which have expired. If events remain, then the relative diff --git a/src/rx/rx_event.h b/src/rx/rx_event.h index df9b426d35..23c06c9a68 100644 --- a/src/rx/rx_event.h +++ b/src/rx/rx_event.h @@ -30,8 +30,7 @@ extern struct rxevent *rxevent_Post(struct clock *when, struct clock *now, /* Remove the indicated event from the event queue. The event must be * pending. Note that a currently executing event may not cancel itself. */ -struct rx_call; -extern void rxevent_Cancel(struct rxevent **, struct rx_call *, int type); +extern void rxevent_Cancel(struct rxevent **); /* The actions specified for each event that has reached the current clock * time will be taken. The current time returned by GetTime is used diff --git a/src/rx/rx_globals.h b/src/rx/rx_globals.h index b25578b1b0..602a7e5c66 100644 --- a/src/rx/rx_globals.h +++ b/src/rx/rx_globals.h @@ -528,11 +528,9 @@ EXT afs_kmutex_t rx_connHashTable_lock; /* Forward definitions of internal procedures */ #define rxi_ChallengeOff(conn) \ - rxevent_Cancel(&(conn)->challengeEvent, NULL, 0) -#define rxi_KeepAliveOff(call) \ - rxevent_Cancel(&(call)->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE) + rxevent_Cancel(&(conn)->challengeEvent) #define rxi_NatKeepAliveOff(conn) \ - rxevent_Cancel(&(conn)->natKeepAliveEvent, NULL, 0) + rxevent_Cancel(&(conn)->natKeepAliveEvent) #define rxi_AllocSecurityObject() rxi_Alloc(sizeof(struct rx_securityClass)) #define rxi_FreeSecurityObject(obj) rxi_Free(obj, sizeof(struct rx_securityClass)) diff --git a/src/rx/rx_internal.h b/src/rx/rx_internal.h index d081cf5ca7..800732e50f 100644 --- a/src/rx/rx_internal.h +++ b/src/rx/rx_internal.h @@ -20,6 +20,7 @@ extern rx_atomic_t rx_nWaited; /* Prototypes for internal functions */ /* rx.c */ +extern void rxi_CancelDelayedAckEvent(struct rx_call *); extern void rxi_PacketsUnWait(void); extern void rxi_SetPeerMtu(struct rx_peer *peer, afs_uint32 host, afs_uint32 port, int mtu); diff --git a/src/rx/rx_prototypes.h b/src/rx/rx_prototypes.h index f83f181d12..763a95e9db 100644 --- a/src/rx/rx_prototypes.h +++ b/src/rx/rx_prototypes.h @@ -220,8 +220,7 @@ extern struct rxevent *rxevent_Post(struct clock *when, struct clock *now, extern void shutdown_rxevent(void); extern struct rxepoch *rxepoch_Allocate(struct clock *when); extern void rxevent_Init(int nEvents, void (*scheduler) (void)); -extern void rxevent_Cancel(struct rxevent **ev, struct rx_call *call, - int type); +extern void rxevent_Cancel(struct rxevent **ev); extern int rxevent_RaiseEvents(struct clock *next); diff --git a/src/rx/rx_rdwr.c b/src/rx/rx_rdwr.c index 4af9ab7a08..4b33a14de6 100644 --- a/src/rx/rx_rdwr.c +++ b/src/rx/rx_rdwr.c @@ -203,8 +203,7 @@ rxi_ReadProc(struct rx_call *call, char *buf, if (call->app.currentPacket) { if (!(call->flags & RX_CALL_RECEIVE_DONE)) { if (call->nHardAcks > (u_short) rxi_HardAckRate) { - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0); } else { /* Delay to consolidate ack packets */ @@ -486,8 +485,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial) * send a hard ack. */ if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) { if (call->nHardAcks > (u_short) rxi_HardAckRate) { - rxevent_Cancel(&call->delayedAckEvent, call, - RX_CALL_REFCOUNT_DELAY); + rxi_CancelDelayedAckEvent(call); rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0); didHardAck = 1; } else { diff --git a/tests/rx/event-t.c b/tests/rx/event-t.c index 885ad99512..f4ef105802 100644 --- a/tests/rx/event-t.c +++ b/tests/rx/event-t.c @@ -116,7 +116,7 @@ main(void) /* Test for a problem when there is only a single event in the tree */ event = rxevent_Post(&now, &now, reportSub, NULL, NULL, 0); ok(event != NULL, "Created a single event"); - rxevent_Cancel(&event, NULL, 0); + rxevent_Cancel(&event); ok(1, "Cancelled a single event"); rxevent_RaiseEvents(&now); ok(1, "RaiseEvents happened without error"); @@ -148,7 +148,7 @@ main(void) int victim = random() % counter; if (events[victim].event != NULL) { - rxevent_Cancel(&events[victim].event, NULL, 0); + rxevent_Cancel(&events[victim].event); events[victim].cancelled = 1; } }