diff --git a/src/rx/rx.c b/src/rx/rx.c index 21d456ba34..0e5df40465 100644 --- a/src/rx/rx.c +++ b/src/rx/rx.c @@ -957,9 +957,10 @@ static void rxi_DestroyConnectionNoLock(conn) /* Make sure the connection is completely reset before deleting it. */ /* get rid of pending events that could zap us later */ - if (conn->challengeEvent) { + if (conn->challengeEvent) rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0); - } + if (conn->checkReachEvent) + rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0); /* Add the connection to the list of destroyed connections that * need to be cleaned up. This is necessary to avoid deadlocks @@ -2438,7 +2439,7 @@ struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp) MUTEX_ENTER(&conn->conn_data_lock); if (conn->maxSerial < np->header.serial) - conn->maxSerial = np->header.serial; + conn->maxSerial = np->header.serial; MUTEX_EXIT(&conn->conn_data_lock); /* If the connection is in an error state, send an abort packet and ignore @@ -2555,7 +2556,9 @@ struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp) return np; } if (!call) { + MUTEX_ENTER(&conn->conn_call_lock); call = rxi_NewCall(conn, channel); + MUTEX_EXIT(&conn->conn_call_lock); *call->callNumber = np->header.callNumber; call->state = RX_STATE_PRECALL; clock_GetTime(&call->queueTime); @@ -2865,20 +2868,102 @@ static TooLow(ap, acall) } #endif /* KERNEL */ +static void rxi_CheckReachEvent(event, conn, acall) + struct rxevent *event; + struct rx_connection *conn; + struct rx_call *acall; +{ + struct rx_call *call = acall; + struct clock when; + int i, waiting; + + MUTEX_ENTER(&conn->conn_call_lock); + MUTEX_ENTER(&conn->conn_data_lock); + conn->checkReachEvent = (struct rxevent *) 0; + waiting = conn->flags & RX_CONN_ATTACHWAIT; + if (event) conn->refCount--; + MUTEX_EXIT(&conn->conn_data_lock); + + if (waiting) { + if (!call) + for (i=0; icall[i]; + if (tc && tc->state == RX_STATE_PRECALL) { + call = tc; + break; + } + } + + if (call) { + if (call != acall) MUTEX_ENTER(&call->lock); + rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0); + if (call != acall) MUTEX_EXIT(&call->lock); + + MUTEX_ENTER(&conn->conn_data_lock); + conn->refCount++; + MUTEX_EXIT(&conn->conn_data_lock); + clock_GetTime(&when); + when.sec += RX_CHECKREACH_TIMEOUT; + conn->checkReachEvent = + rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL); + } + } + MUTEX_EXIT(&conn->conn_call_lock); +} + +static int rxi_CheckConnReach(conn, call) + struct rx_connection *conn; + struct rx_call *call; +{ + struct rx_service *service = conn->service; + struct rx_peer *peer = conn->peer; + afs_uint32 now, lastReach; + + MUTEX_ENTER(&rx_serverPool_lock); + if (service->nRequestsRunning <= service->maxProcs/2) { + MUTEX_EXIT(&rx_serverPool_lock); + return 0; + } + MUTEX_EXIT(&rx_serverPool_lock); + + now = clock_Sec(); + MUTEX_ENTER(&peer->peer_lock); + lastReach = peer->lastReachTime; + MUTEX_EXIT(&peer->peer_lock); + if (now - lastReach < RX_CHECKREACH_TTL) + return 0; + + MUTEX_ENTER(&conn->conn_data_lock); + if (conn->flags & RX_CONN_ATTACHWAIT) { + MUTEX_EXIT(&conn->conn_data_lock); + return 1; + } + conn->flags |= RX_CONN_ATTACHWAIT; + MUTEX_EXIT(&conn->conn_data_lock); + if (!conn->checkReachEvent) + rxi_CheckReachEvent((struct rxevent *)0, conn, call); + + return 1; +} + /* try to attach call, if authentication is complete */ -static void TryAttach(acall, socket, tnop, newcallp) -register struct rx_call *acall; -register osi_socket socket; -register int *tnop; -register struct rx_call **newcallp; { - register struct rx_connection *conn; - conn = acall->conn; - if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) { +static void TryAttach(acall, socket, tnop, newcallp, reachOverride) + register struct rx_call *acall; + register osi_socket socket; + register int *tnop; + register struct rx_call **newcallp; + int reachOverride; +{ + struct rx_connection *conn = acall->conn; + + if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) { /* Don't attach until we have any req'd. authentication. */ if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) { - rxi_AttachServerProc(acall, socket, tnop, newcallp); - /* Note: this does not necessarily succeed; there - may not any proc available */ + if (reachOverride || rxi_CheckConnReach(conn, acall) == 0) + rxi_AttachServerProc(acall, socket, tnop, newcallp); + /* Note: this does not necessarily succeed; there + * may not any proc available + */ } else { rxi_ChallengeOn(acall->conn); @@ -3053,7 +3138,7 @@ struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host, * server thread is available, this thread becomes a server * thread and the server thread becomes a listener thread. */ if (isFirst) { - TryAttach(call, socket, tnop, newcallp); + TryAttach(call, socket, tnop, newcallp, 0); } } /* This is not the expected next packet. */ @@ -3241,6 +3326,37 @@ nextloop:; static void rxi_ComputeRate(); #endif +static void rxi_UpdatePeerReach(conn, acall) + struct rx_connection *conn; + struct rx_call *acall; +{ + struct rx_peer *peer = conn->peer; + + MUTEX_ENTER(&peer->peer_lock); + peer->lastReachTime = clock_Sec(); + MUTEX_EXIT(&peer->peer_lock); + + MUTEX_ENTER(&conn->conn_call_lock); + MUTEX_ENTER(&conn->conn_data_lock); + if (conn->flags & RX_CONN_ATTACHWAIT) { + int i; + + conn->flags &= ~RX_CONN_ATTACHWAIT; + MUTEX_EXIT(&conn->conn_data_lock); + + for (i=0; icall[i]; + if (call) { + if (call != acall) MUTEX_ENTER(&call->lock); + TryAttach(call, -1, NULL, NULL, 1); + if (call != acall) MUTEX_EXIT(&call->lock); + } + } + } else + MUTEX_EXIT(&conn->conn_data_lock); + MUTEX_EXIT(&conn->conn_call_lock); +} + /* The real smarts of the whole thing. */ struct rx_packet *rxi_ReceiveAckPacket(call, np, istack) register struct rx_call *call; @@ -3288,6 +3404,9 @@ struct rx_packet *rxi_ReceiveAckPacket(call, np, istack) if (np->header.flags & RX_SLOW_START_OK) { call->flags |= RX_CALL_SLOW_START_OK; } + + if (ap->reason == RX_ACK_PING_RESPONSE) + rxi_UpdatePeerReach(conn, call); #ifdef RXDEBUG if (rx_Log) { @@ -3743,8 +3862,9 @@ struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack) } else { /* If the response is valid, any calls waiting to attach - * servers can now do so */ + * servers can now do so */ int i; + for (i=0; icall[i]; if (call) { @@ -3754,6 +3874,12 @@ struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack) MUTEX_EXIT(&call->lock); } } + + /* Update the peer reachability information, just in case + * some calls went into attach-wait while we were waiting + * for authentication.. + */ + rxi_UpdatePeerReach(conn, NULL); } return np; } @@ -3804,10 +3930,10 @@ rxi_ReceiveChallengePacket(conn, np, istack) * call so it eventually gets one */ void rxi_AttachServerProc(call, socket, tnop, newcallp) -register struct rx_call *call; -register osi_socket socket; -register int *tnop; -register struct rx_call **newcallp; + register struct rx_call *call; + register osi_socket socket; + register int *tnop; + register struct rx_call **newcallp; { register struct rx_serverQueueEntry *sq; register struct rx_service *service = call->conn->service; @@ -4161,6 +4287,10 @@ void rxi_ConnectionError(conn, error) register int i; if (conn->challengeEvent) rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0); + if (conn->checkReachEvent) { + rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0); + conn->checkReachEvent = 0; + } for (i=0; icall[i]; if (call) { diff --git a/src/rx/rx.h b/src/rx/rx.h index 6242ec3d42..1f51b57b3a 100644 --- a/src/rx/rx.h +++ b/src/rx/rx.h @@ -452,6 +452,7 @@ struct rx_peer { afs_hyper_t bytesSent; /* Number of bytes sent to this peer */ afs_hyper_t bytesReceived; /* Number of bytes received from this peer */ struct rx_queue rpcStats; /* rpc statistic list */ + int lastReachTime; /* Last time we verified reachability */ }; /* A connection is an authenticated communication path, allowing @@ -486,6 +487,7 @@ struct rx_connection { /* peer process could be restarted on us. Includes RX Header. */ struct rxevent *challengeEvent; /* Scheduled when the server is challenging a */ struct rxevent *delayedAbortEvent; /* Scheduled to throttle looping client */ + struct rxevent *checkReachEvent; /* Scheduled when checking reachability */ int abortCount; /* count of abort messages sent */ /* client-- to retransmit the challenge */ struct rx_service *service; /* used by servers only */ @@ -519,6 +521,7 @@ struct rx_connection { #define RX_CONN_KNOW_WINDOW 8 /* window size negotiation works */ #define RX_CONN_RESET 16 /* connection is reset, remove */ #define RX_CONN_BUSY 32 /* connection is busy; don't delete */ +#define RX_CONN_ATTACHWAIT 64 /* attach waiting for peer->lastReach */ /* Type of connection, client or server */ #define RX_CLIENT_CONNECTION 0 @@ -740,6 +743,8 @@ struct rx_ackPacket { #define RX_CHALLENGE_TIMEOUT 2 /* Number of seconds before another authentication request packet is generated */ #define RX_CHALLENGE_MAXTRIES 50 /* Max # of times we resend challenge */ +#define RX_CHECKREACH_TIMEOUT 2 /* Number of seconds before another ping is generated */ +#define RX_CHECKREACH_TTL 60 /* Re-check reachability this often */ /* RX error codes. RX uses error codes from -1 to -64. Rxgen may use other error codes < -64; user programs are expected to return positive error codes */