rx: Don't treat calls specially in event package

Many different structures can be passed to the rxevent package as
data. Don't give calls special treatment by making rxevent aware of
how to release their reference counts when an event is cancelled.

Update all of the callers of rxevent_Cancel to use the new arguments,
and where they were cancelling functions with calls as parameters add
the appropriate CALL_RELE directives. In many cases, this has led to
new helper functions to cancel particular call-based events.

Change-Id: Ic02778e48fd950e8850b77bd3c076c235453274d
Reviewed-on: http://gerrit.openafs.org/8538
Reviewed-by: Derrick Brashear <shadow@your-file-system.com>
Tested-by: BuildBot <buildbot@rampaginggeek.com>
Reviewed-by: Jeffrey Altman <jaltman@your-file-system.com>
This commit is contained in:
Simon Wilkinson 2012-11-03 23:15:50 +00:00 committed by Jeffrey Altman
parent 4abcf4a7d1
commit 20034a8157
8 changed files with 89 additions and 71 deletions

View File

@ -153,6 +153,9 @@ static void rxi_GrowMTUOn(struct rx_call *call);
static void rxi_ChallengeOn(struct rx_connection *conn);
static int rxi_CheckCall(struct rx_call *call, int haveCTLock);
static void rxi_AckAllInTransmitQueue(struct rx_call *call);
static void rxi_CancelKeepAliveEvent(struct rx_call *call);
static void rxi_CancelDelayedAbortEvent(struct rx_call *call);
static void rxi_CancelGrowMTUEvent(struct rx_call *call);
#ifdef RX_ENABLE_LOCKS
struct rx_tq_debug {
@ -727,7 +730,8 @@ rxi_rto_startTimer(struct rx_call *call, int lastPacket, int istack)
static_inline void
rxi_rto_cancel(struct rx_call *call)
{
rxevent_Cancel(&call->resendEvent, call, RX_CALL_REFCOUNT_RESEND);
rxevent_Cancel(&call->resendEvent);
CALL_RELE(call, RX_CALL_REFCOUNT_RESEND);
}
/*!
@ -829,13 +833,16 @@ rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset)
when = now;
clock_Add(&when, offset);
if (!call->delayedAckEvent
|| clock_Gt(&call->delayedAckTime, &when)) {
if (call->delayedAckEvent && clock_Gt(&call->delayedAckTime, &when)) {
/* The event we're cancelling already has a reference, so we don't
* need a new one */
rxevent_Cancel(&call->delayedAckEvent);
call->delayedAckEvent = rxevent_Post(&when, &now, rxi_SendDelayedAck,
call, NULL, 0);
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
call->delayedAckTime = when;
} else if (!call->delayedAckEvent) {
CALL_HOLD(call, RX_CALL_REFCOUNT_DELAY);
call->delayedAckEvent = rxevent_Post(&when, &now,
rxi_SendDelayedAck,
call, NULL, 0);
@ -843,6 +850,15 @@ rxi_PostDelayedAckEvent(struct rx_call *call, struct clock *offset)
}
}
void
rxi_CancelDelayedAckEvent(struct rx_call *call)
{
if (call->delayedAckEvent) {
rxevent_Cancel(&call->delayedAckEvent);
CALL_RELE(call, RX_CALL_REFCOUNT_DELAY);
}
}
/* called with unincremented nRequestsRunning to see if it is OK to start
* a new thread in this service. Could be "no" for two reasons: over the
* max quota, or would prevent others from reaching their min quota.
@ -1302,8 +1318,7 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn)
/* Push the final acknowledgment out now--there
* won't be a subsequent call to acknowledge the
* last reply packets */
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
if (call->state == RX_STATE_PRECALL
|| call->state == RX_STATE_ACTIVE) {
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
@ -1343,7 +1358,7 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn)
}
if (conn->delayedAbortEvent) {
rxevent_Cancel(&conn->delayedAbortEvent, NULL, 0);
rxevent_Cancel(&conn->delayedAbortEvent);
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
if (packet) {
MUTEX_ENTER(&conn->conn_data_lock);
@ -1371,9 +1386,9 @@ rxi_DestroyConnectionNoLock(struct rx_connection *conn)
/* Make sure the connection is completely reset before deleting it. */
/* get rid of pending events that could zap us later */
rxevent_Cancel(&conn->challengeEvent, NULL, 0);
rxevent_Cancel(&conn->checkReachEvent, NULL, 0);
rxevent_Cancel(&conn->natKeepAliveEvent, NULL, 0);
rxevent_Cancel(&conn->challengeEvent);
rxevent_Cancel(&conn->checkReachEvent);
rxevent_Cancel(&conn->natKeepAliveEvent);
/* Add the connection to the list of destroyed connections that
* need to be cleaned up. This is necessary to avoid deadlocks
@ -2426,8 +2441,7 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
rxi_rto_cancel(call);
rxevent_Cancel(&call->keepAliveEvent, call,
RX_CALL_REFCOUNT_ALIVE);
rxi_CancelKeepAliveEvent(call);
}
} else { /* Client connection */
char dummy;
@ -2445,8 +2459,7 @@ rx_EndCall(struct rx_call *call, afs_int32 rc)
* and force-send it now.
*/
if (call->delayedAckEvent) {
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
rxi_SendDelayedAck(NULL, call, NULL, 0);
}
@ -3898,8 +3911,7 @@ rxi_ReceiveDataPacket(struct rx_call *call,
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
dpf(("packet %"AFS_PTR_FMT" dropped on receipt - duplicate\n", np));
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
ackNeeded = 0;
call->rprev = seq;
@ -3989,8 +4001,7 @@ rxi_ReceiveDataPacket(struct rx_call *call,
if (seq < call->rnext) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE, istack);
ackNeeded = 0;
call->rprev = seq;
@ -4001,8 +4012,7 @@ rxi_ReceiveDataPacket(struct rx_call *call,
* accomodated by the current window, then send a negative
* acknowledge and drop the packet */
if ((call->rnext + call->rwind) <= seq) {
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_EXCEEDS_WINDOW,
istack);
ackNeeded = 0;
@ -4021,8 +4031,7 @@ rxi_ReceiveDataPacket(struct rx_call *call,
if (seq == tp->header.seq) {
if (rx_stats_active)
rx_atomic_inc(&rx_stats.dupPacketsRead);
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_DUPLICATE,
istack);
ackNeeded = 0;
@ -4133,10 +4142,10 @@ rxi_ReceiveDataPacket(struct rx_call *call,
* received. Always send a soft ack for the last packet in
* the server's reply. */
if (ackNeeded) {
rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, ackNeeded, istack);
} else if (call->nSoftAcks > (u_short) rxi_SoftAckRate) {
rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
np = rxi_SendAck(call, np, serial, RX_ACK_IDLE, istack);
} else if (call->nSoftAcks) {
if (haveLast && !(flags & RX_CLIENT_INITIATED))
@ -4144,7 +4153,7 @@ rxi_ReceiveDataPacket(struct rx_call *call,
else
rxi_PostDelayedAckEvent(call, &rx_softAckDelay);
} else if (call->flags & RX_CALL_RECEIVE_DONE) {
rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
}
return np;
@ -4703,7 +4712,7 @@ rxi_ReceiveAckPacket(struct rx_call *call, struct rx_packet *np,
&& call->tfirst + call->nSoftAcked >= call->tnext) {
call->state = RX_STATE_DALLY;
rxi_ClearTransmitQueue(call, 0);
rxevent_Cancel(&call->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE);
rxi_CancelKeepAliveEvent(call);
} else if (!opr_queue_IsEmpty(&call->tq)) {
rxi_Start(call, istack);
}
@ -5093,10 +5102,7 @@ rxi_SendCallAbort(struct rx_call *call, struct rx_packet *packet,
if (force || rxi_callAbortThreshhold == 0
|| call->abortCount < rxi_callAbortThreshhold) {
if (call->delayedAbortEvent) {
rxevent_Cancel(&call->delayedAbortEvent, call,
RX_CALL_REFCOUNT_ABORT);
}
rxi_CancelDelayedAbortEvent(call);
error = htonl(cerror);
call->abortCount++;
packet =
@ -5113,6 +5119,15 @@ rxi_SendCallAbort(struct rx_call *call, struct rx_packet *packet,
return packet;
}
static void
rxi_CancelDelayedAbortEvent(struct rx_call *call)
{
if (call->delayedAbortEvent) {
rxevent_Cancel(&call->delayedAbortEvent);
CALL_RELE(call, RX_CALL_REFCOUNT_ABORT);
}
}
/* Send an abort packet for the specified connection. Packet is an
* optional pointer to a packet that can be used to send the abort.
* Once the number of abort messages reaches the threshhold, an
@ -5139,7 +5154,7 @@ rxi_SendConnectionAbort(struct rx_connection *conn,
if (force || rxi_connAbortThreshhold == 0
|| conn->abortCount < rxi_connAbortThreshhold) {
rxevent_Cancel(&conn->delayedAbortEvent, NULL, 0);
rxevent_Cancel(&conn->delayedAbortEvent);
error = htonl(conn->error);
conn->abortCount++;
MUTEX_EXIT(&conn->conn_data_lock);
@ -5173,10 +5188,10 @@ rxi_ConnectionError(struct rx_connection *conn,
dpf(("rxi_ConnectionError conn %"AFS_PTR_FMT" error %d\n", conn, error));
MUTEX_ENTER(&conn->conn_data_lock);
rxevent_Cancel(&conn->challengeEvent, NULL, 0);
rxevent_Cancel(&conn->natKeepAliveEvent, NULL, 0);
rxevent_Cancel(&conn->challengeEvent);
rxevent_Cancel(&conn->natKeepAliveEvent);
if (conn->checkReachEvent) {
rxevent_Cancel(&conn->checkReachEvent, NULL, 0);
rxevent_Cancel(&conn->checkReachEvent);
conn->flags &= ~(RX_CONN_ATTACHWAIT|RX_CONN_NAT_PING);
putConnection(conn);
}
@ -5253,10 +5268,10 @@ rxi_ResetCall(struct rx_call *call, int newcall)
}
rxevent_Cancel(&call->growMTUEvent, call, RX_CALL_REFCOUNT_MTU);
rxi_CancelGrowMTUEvent(call);
if (call->delayedAbortEvent) {
rxevent_Cancel(&call->delayedAbortEvent, call, RX_CALL_REFCOUNT_ABORT);
rxi_CancelDelayedAbortEvent(call);
packet = rxi_AllocPacket(RX_PACKET_CLASS_SPECIAL);
if (packet) {
rxi_SendCallAbort(call, packet, 0, 1);
@ -5385,8 +5400,8 @@ rxi_ResetCall(struct rx_call *call, int newcall)
}
#endif /* RX_ENABLE_LOCKS */
rxi_KeepAliveOff(call);
rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
rxi_CancelKeepAliveEvent(call);
rxi_CancelDelayedAckEvent(call);
}
/* Send an acknowledge for the indicated packet (seq,serial) of the
@ -5753,7 +5768,7 @@ rxi_SendList(struct rx_call *call, struct xmitlist *xmit,
/* Since we're about to send a data packet to the peer, it's
* safe to nuke any scheduled end-of-packets ack */
rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
MUTEX_EXIT(&call->lock);
CALL_HOLD(call, RX_CALL_REFCOUNT_SEND);
@ -6195,7 +6210,7 @@ rxi_Send(struct rx_call *call, struct rx_packet *p,
/* Since we're about to send SOME sort of packet to the peer, it's
* safe to nuke any scheduled end-of-packets ack */
rxevent_Cancel(&call->delayedAckEvent, call, RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
/* Actually send the packet, filling in more connection-specific fields */
MUTEX_EXIT(&call->lock);
@ -6318,13 +6333,10 @@ rxi_CheckCall(struct rx_call *call, int haveCTLock)
} else {
#ifdef RX_ENABLE_LOCKS
/* Cancel pending events */
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
rxi_rto_cancel(call);
rxevent_Cancel(&call->keepAliveEvent, call,
RX_CALL_REFCOUNT_ALIVE);
rxevent_Cancel(&call->growMTUEvent, call,
RX_CALL_REFCOUNT_MTU);
rxi_CancelKeepAliveEvent(call);
rxi_CancelGrowMTUEvent(call);
MUTEX_ENTER(&rx_refcnt_mutex);
/* if rxi_FreeCall returns 1 it has freed the call */
if (call->refCount == 0 &&
@ -6602,6 +6614,14 @@ rxi_ScheduleKeepAliveEvent(struct rx_call *call)
}
}
static void
rxi_CancelKeepAliveEvent(struct rx_call *call) {
if (call->keepAliveEvent) {
rxevent_Cancel(&call->keepAliveEvent);
CALL_RELE(call, RX_CALL_REFCOUNT_ALIVE);
}
}
static void
rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs)
{
@ -6625,7 +6645,15 @@ rxi_ScheduleGrowMTUEvent(struct rx_call *call, int secs)
}
}
/* N.B. rxi_KeepAliveOff: is defined earlier as a macro */
static void
rxi_CancelGrowMTUEvent(struct rx_call *call)
{
if (call->growMTUEvent) {
rxevent_Cancel(&call->growMTUEvent);
CALL_RELE(call, RX_CALL_REFCOUNT_MTU);
}
}
static void
rxi_KeepAliveOn(struct rx_call *call)
{
@ -6638,16 +6666,14 @@ rxi_KeepAliveOn(struct rx_call *call)
rxi_ScheduleKeepAliveEvent(call);
}
/*
* Solely in order that callers not need to include rx_call.h
*/
void
rx_KeepAliveOff(struct rx_call *call)
{
MUTEX_ENTER(&call->lock);
rxi_KeepAliveOff(call);
rxi_CancelKeepAliveEvent(call);
MUTEX_EXIT(&call->lock);
}
void
rx_KeepAliveOn(struct rx_call *call)
{

View File

@ -314,7 +314,7 @@ resetFirst(struct rxevent *ev)
}
void
rxevent_Cancel(struct rxevent **evp, struct rx_call *call, int type)
rxevent_Cancel(struct rxevent **evp)
{
struct rxevent *event;
@ -368,9 +368,6 @@ rxevent_Cancel(struct rxevent **evp, struct rx_call *call, int type)
*evp = NULL;
rxevent_put(event); /* Dispose of caller's reference */
if (call)
CALL_RELE(call, type);
}
/* Process all events which have expired. If events remain, then the relative

View File

@ -30,8 +30,7 @@ extern struct rxevent *rxevent_Post(struct clock *when, struct clock *now,
/* Remove the indicated event from the event queue. The event must be
* pending. Note that a currently executing event may not cancel itself.
*/
struct rx_call;
extern void rxevent_Cancel(struct rxevent **, struct rx_call *, int type);
extern void rxevent_Cancel(struct rxevent **);
/* The actions specified for each event that has reached the current clock
* time will be taken. The current time returned by GetTime is used

View File

@ -528,11 +528,9 @@ EXT afs_kmutex_t rx_connHashTable_lock;
/* Forward definitions of internal procedures */
#define rxi_ChallengeOff(conn) \
rxevent_Cancel(&(conn)->challengeEvent, NULL, 0)
#define rxi_KeepAliveOff(call) \
rxevent_Cancel(&(call)->keepAliveEvent, call, RX_CALL_REFCOUNT_ALIVE)
rxevent_Cancel(&(conn)->challengeEvent)
#define rxi_NatKeepAliveOff(conn) \
rxevent_Cancel(&(conn)->natKeepAliveEvent, NULL, 0)
rxevent_Cancel(&(conn)->natKeepAliveEvent)
#define rxi_AllocSecurityObject() rxi_Alloc(sizeof(struct rx_securityClass))
#define rxi_FreeSecurityObject(obj) rxi_Free(obj, sizeof(struct rx_securityClass))

View File

@ -20,6 +20,7 @@ extern rx_atomic_t rx_nWaited;
/* Prototypes for internal functions */
/* rx.c */
extern void rxi_CancelDelayedAckEvent(struct rx_call *);
extern void rxi_PacketsUnWait(void);
extern void rxi_SetPeerMtu(struct rx_peer *peer, afs_uint32 host,
afs_uint32 port, int mtu);

View File

@ -220,8 +220,7 @@ extern struct rxevent *rxevent_Post(struct clock *when, struct clock *now,
extern void shutdown_rxevent(void);
extern struct rxepoch *rxepoch_Allocate(struct clock *when);
extern void rxevent_Init(int nEvents, void (*scheduler) (void));
extern void rxevent_Cancel(struct rxevent **ev, struct rx_call *call,
int type);
extern void rxevent_Cancel(struct rxevent **ev);
extern int rxevent_RaiseEvents(struct clock *next);

View File

@ -203,8 +203,7 @@ rxi_ReadProc(struct rx_call *call, char *buf,
if (call->app.currentPacket) {
if (!(call->flags & RX_CALL_RECEIVE_DONE)) {
if (call->nHardAcks > (u_short) rxi_HardAckRate) {
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
rxi_SendAck(call, 0, 0, RX_ACK_DELAY, 0);
} else {
/* Delay to consolidate ack packets */
@ -486,8 +485,7 @@ rxi_FillReadVec(struct rx_call *call, afs_uint32 serial)
* send a hard ack. */
if (didConsume && (!(call->flags & RX_CALL_RECEIVE_DONE))) {
if (call->nHardAcks > (u_short) rxi_HardAckRate) {
rxevent_Cancel(&call->delayedAckEvent, call,
RX_CALL_REFCOUNT_DELAY);
rxi_CancelDelayedAckEvent(call);
rxi_SendAck(call, 0, serial, RX_ACK_DELAY, 0);
didHardAck = 1;
} else {

View File

@ -116,7 +116,7 @@ main(void)
/* Test for a problem when there is only a single event in the tree */
event = rxevent_Post(&now, &now, reportSub, NULL, NULL, 0);
ok(event != NULL, "Created a single event");
rxevent_Cancel(&event, NULL, 0);
rxevent_Cancel(&event);
ok(1, "Cancelled a single event");
rxevent_RaiseEvents(&now);
ok(1, "RaiseEvents happened without error");
@ -148,7 +148,7 @@ main(void)
int victim = random() % counter;
if (events[victim].event != NULL) {
rxevent_Cancel(&events[victim].event, NULL, 0);
rxevent_Cancel(&events[victim].event);
events[victim].cancelled = 1;
}
}