rx-protect-servers-from-half-reachable-clients-20020119

This patch should protect Rx-based servers from half-reachable clients,
which issue requests but don't acknowledge the server's response, thereby
eventually tying up all of the server threads, and denying service to all
other clients.  Such clients can arise in case of uni-directional routing
failures, whereby all packets from the server to client are lost but not
the other way around.

The idea it to ping clients (using Rx ack ping) before attaching them to
a thread, if (a) we're running low on threads, and (b) the client hasn't
responsed to a ping recently.
This commit is contained in:
Nickolai Zeldovich 2002-01-19 16:28:33 +00:00 committed by Derrick Brashear
parent 7827ac9af1
commit c8f461dcb0
2 changed files with 155 additions and 20 deletions

View File

@ -957,9 +957,10 @@ static void rxi_DestroyConnectionNoLock(conn)
/* Make sure the connection is completely reset before deleting it. */
/* get rid of pending events that could zap us later */
if (conn->challengeEvent) {
if (conn->challengeEvent)
rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
}
if (conn->checkReachEvent)
rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
/* Add the connection to the list of destroyed connections that
* need to be cleaned up. This is necessary to avoid deadlocks
@ -2555,7 +2556,9 @@ struct rx_packet *rxi_ReceivePacket(np, socket, host, port, tnop, newcallp)
return np;
}
if (!call) {
MUTEX_ENTER(&conn->conn_call_lock);
call = rxi_NewCall(conn, channel);
MUTEX_EXIT(&conn->conn_call_lock);
*call->callNumber = np->header.callNumber;
call->state = RX_STATE_PRECALL;
clock_GetTime(&call->queueTime);
@ -2865,20 +2868,102 @@ static TooLow(ap, acall)
}
#endif /* KERNEL */
static void rxi_CheckReachEvent(event, conn, acall)
struct rxevent *event;
struct rx_connection *conn;
struct rx_call *acall;
{
struct rx_call *call = acall;
struct clock when;
int i, waiting;
MUTEX_ENTER(&conn->conn_call_lock);
MUTEX_ENTER(&conn->conn_data_lock);
conn->checkReachEvent = (struct rxevent *) 0;
waiting = conn->flags & RX_CONN_ATTACHWAIT;
if (event) conn->refCount--;
MUTEX_EXIT(&conn->conn_data_lock);
if (waiting) {
if (!call)
for (i=0; i<RX_MAXCALLS; i++) {
struct rx_call *tc = conn->call[i];
if (tc && tc->state == RX_STATE_PRECALL) {
call = tc;
break;
}
}
if (call) {
if (call != acall) MUTEX_ENTER(&call->lock);
rxi_SendAck(call, NULL, 0, 0, 0, RX_ACK_PING, 0);
if (call != acall) MUTEX_EXIT(&call->lock);
MUTEX_ENTER(&conn->conn_data_lock);
conn->refCount++;
MUTEX_EXIT(&conn->conn_data_lock);
clock_GetTime(&when);
when.sec += RX_CHECKREACH_TIMEOUT;
conn->checkReachEvent =
rxevent_Post(&when, rxi_CheckReachEvent, conn, NULL);
}
}
MUTEX_EXIT(&conn->conn_call_lock);
}
static int rxi_CheckConnReach(conn, call)
struct rx_connection *conn;
struct rx_call *call;
{
struct rx_service *service = conn->service;
struct rx_peer *peer = conn->peer;
afs_uint32 now, lastReach;
MUTEX_ENTER(&rx_serverPool_lock);
if (service->nRequestsRunning <= service->maxProcs/2) {
MUTEX_EXIT(&rx_serverPool_lock);
return 0;
}
MUTEX_EXIT(&rx_serverPool_lock);
now = clock_Sec();
MUTEX_ENTER(&peer->peer_lock);
lastReach = peer->lastReachTime;
MUTEX_EXIT(&peer->peer_lock);
if (now - lastReach < RX_CHECKREACH_TTL)
return 0;
MUTEX_ENTER(&conn->conn_data_lock);
if (conn->flags & RX_CONN_ATTACHWAIT) {
MUTEX_EXIT(&conn->conn_data_lock);
return 1;
}
conn->flags |= RX_CONN_ATTACHWAIT;
MUTEX_EXIT(&conn->conn_data_lock);
if (!conn->checkReachEvent)
rxi_CheckReachEvent((struct rxevent *)0, conn, call);
return 1;
}
/* try to attach call, if authentication is complete */
static void TryAttach(acall, socket, tnop, newcallp)
register struct rx_call *acall;
register osi_socket socket;
register int *tnop;
register struct rx_call **newcallp; {
register struct rx_connection *conn;
conn = acall->conn;
if ((conn->type==RX_SERVER_CONNECTION) && (acall->state == RX_STATE_PRECALL)) {
static void TryAttach(acall, socket, tnop, newcallp, reachOverride)
register struct rx_call *acall;
register osi_socket socket;
register int *tnop;
register struct rx_call **newcallp;
int reachOverride;
{
struct rx_connection *conn = acall->conn;
if (conn->type==RX_SERVER_CONNECTION && acall->state==RX_STATE_PRECALL) {
/* Don't attach until we have any req'd. authentication. */
if (RXS_CheckAuthentication(conn->securityObject, conn) == 0) {
if (reachOverride || rxi_CheckConnReach(conn, acall) == 0)
rxi_AttachServerProc(acall, socket, tnop, newcallp);
/* Note: this does not necessarily succeed; there
may not any proc available */
* may not any proc available
*/
}
else {
rxi_ChallengeOn(acall->conn);
@ -3053,7 +3138,7 @@ struct rx_packet *rxi_ReceiveDataPacket(call, np, istack, socket, host,
* server thread is available, this thread becomes a server
* thread and the server thread becomes a listener thread. */
if (isFirst) {
TryAttach(call, socket, tnop, newcallp);
TryAttach(call, socket, tnop, newcallp, 0);
}
}
/* This is not the expected next packet. */
@ -3241,6 +3326,37 @@ nextloop:;
static void rxi_ComputeRate();
#endif
static void rxi_UpdatePeerReach(conn, acall)
struct rx_connection *conn;
struct rx_call *acall;
{
struct rx_peer *peer = conn->peer;
MUTEX_ENTER(&peer->peer_lock);
peer->lastReachTime = clock_Sec();
MUTEX_EXIT(&peer->peer_lock);
MUTEX_ENTER(&conn->conn_call_lock);
MUTEX_ENTER(&conn->conn_data_lock);
if (conn->flags & RX_CONN_ATTACHWAIT) {
int i;
conn->flags &= ~RX_CONN_ATTACHWAIT;
MUTEX_EXIT(&conn->conn_data_lock);
for (i=0; i<RX_MAXCALLS; i++) {
struct rx_call *call = conn->call[i];
if (call) {
if (call != acall) MUTEX_ENTER(&call->lock);
TryAttach(call, -1, NULL, NULL, 1);
if (call != acall) MUTEX_EXIT(&call->lock);
}
}
} else
MUTEX_EXIT(&conn->conn_data_lock);
MUTEX_EXIT(&conn->conn_call_lock);
}
/* The real smarts of the whole thing. */
struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
register struct rx_call *call;
@ -3289,6 +3405,9 @@ struct rx_packet *rxi_ReceiveAckPacket(call, np, istack)
call->flags |= RX_CALL_SLOW_START_OK;
}
if (ap->reason == RX_ACK_PING_RESPONSE)
rxi_UpdatePeerReach(conn, call);
#ifdef RXDEBUG
if (rx_Log) {
fprintf( rx_Log,
@ -3745,6 +3864,7 @@ struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
/* If the response is valid, any calls waiting to attach
* servers can now do so */
int i;
for (i=0; i<RX_MAXCALLS; i++) {
struct rx_call *call = conn->call[i];
if (call) {
@ -3754,6 +3874,12 @@ struct rx_packet *rxi_ReceiveResponsePacket(conn, np, istack)
MUTEX_EXIT(&call->lock);
}
}
/* Update the peer reachability information, just in case
* some calls went into attach-wait while we were waiting
* for authentication..
*/
rxi_UpdatePeerReach(conn, NULL);
}
return np;
}
@ -3804,10 +3930,10 @@ rxi_ReceiveChallengePacket(conn, np, istack)
* call so it eventually gets one */
void
rxi_AttachServerProc(call, socket, tnop, newcallp)
register struct rx_call *call;
register osi_socket socket;
register int *tnop;
register struct rx_call **newcallp;
register struct rx_call *call;
register osi_socket socket;
register int *tnop;
register struct rx_call **newcallp;
{
register struct rx_serverQueueEntry *sq;
register struct rx_service *service = call->conn->service;
@ -4161,6 +4287,10 @@ void rxi_ConnectionError(conn, error)
register int i;
if (conn->challengeEvent)
rxevent_Cancel(conn->challengeEvent, (struct rx_call*)0, 0);
if (conn->checkReachEvent) {
rxevent_Cancel(conn->checkReachEvent, (struct rx_call*)0, 0);
conn->checkReachEvent = 0;
}
for (i=0; i<RX_MAXCALLS; i++) {
struct rx_call *call = conn->call[i];
if (call) {

View File

@ -452,6 +452,7 @@ struct rx_peer {
afs_hyper_t bytesSent; /* Number of bytes sent to this peer */
afs_hyper_t bytesReceived; /* Number of bytes received from this peer */
struct rx_queue rpcStats; /* rpc statistic list */
int lastReachTime; /* Last time we verified reachability */
};
/* A connection is an authenticated communication path, allowing
@ -486,6 +487,7 @@ struct rx_connection {
/* peer process could be restarted on us. Includes RX Header. */
struct rxevent *challengeEvent; /* Scheduled when the server is challenging a */
struct rxevent *delayedAbortEvent; /* Scheduled to throttle looping client */
struct rxevent *checkReachEvent; /* Scheduled when checking reachability */
int abortCount; /* count of abort messages sent */
/* client-- to retransmit the challenge */
struct rx_service *service; /* used by servers only */
@ -519,6 +521,7 @@ struct rx_connection {
#define RX_CONN_KNOW_WINDOW 8 /* window size negotiation works */
#define RX_CONN_RESET 16 /* connection is reset, remove */
#define RX_CONN_BUSY 32 /* connection is busy; don't delete */
#define RX_CONN_ATTACHWAIT 64 /* attach waiting for peer->lastReach */
/* Type of connection, client or server */
#define RX_CLIENT_CONNECTION 0
@ -740,6 +743,8 @@ struct rx_ackPacket {
#define RX_CHALLENGE_TIMEOUT 2 /* Number of seconds before another authentication request packet is generated */
#define RX_CHALLENGE_MAXTRIES 50 /* Max # of times we resend challenge */
#define RX_CHECKREACH_TIMEOUT 2 /* Number of seconds before another ping is generated */
#define RX_CHECKREACH_TTL 60 /* Re-check reachability this often */
/* RX error codes. RX uses error codes from -1 to -64. Rxgen may use other error codes < -64; user programs are expected to return positive error codes */