rxtcp-cleanup-20060731

Various cleanup to make rxtcp work after conversion to new-world Rx.
This commit is contained in:
Ken Hornstein 2006-07-31 23:21:24 +00:00
parent e8e47f7f5c
commit 9d78ae05d3
5 changed files with 164 additions and 117 deletions

View File

@ -141,6 +141,15 @@ static unsigned int rxi_rpc_peer_stat_cnt;
static unsigned int rxi_rpc_process_stat_cnt;
/*
* A list of addresses that we established during rx_InitAddrs()
*/
static int n_addresses;
static struct sockaddr_storage *init_addresses;
static int *init_socklens;
static int *init_socktypes;
#if !defined(offsetof)
#include <stddef.h> /* for definition of offsetof() */
#endif
@ -457,7 +466,22 @@ int rx_InitAddrs(struct sockaddr_storage *saddrs, int *types, int *salens,
rx_socket = -1;
rx_port = 0;
/*
* Initialize the global variables holding the addresses (we'll use them
* later). Also create the necessary UDP socket(s) if they're specified.
*/
n_addresses = nelem;
init_addresses = (struct sockaddr_storage *)
malloc(sizeof(struct sockaddr_storage) * nelem);
init_socklens = (int *) malloc(sizeof(int) * nelem);
init_socktypes = (int *) malloc(sizeof(int) * nelem);
for (i = 0; i < nelem; i++) {
memcpy((void *) &init_addresses[i], &saddrs[i], salens[i]);
init_socklens[i] = salens[i];
init_socktypes[i] = types[i];
switch (types[i]) {
case SOCK_DGRAM:
rx_socket = rxi_GetHostUDPSocket(&saddrs[i], salens[i]);
@ -467,8 +491,15 @@ int rx_InitAddrs(struct sockaddr_storage *saddrs, int *types, int *salens,
}
rx_port = rx_ss2pn(&saddrs[i]);
break;
#ifdef AFS_PTHREAD_ENV
case SOCK_STREAM:
/*
* We don't create the socket until we get to rx_NewService
*/
break;
#endif /* AFS_PTHREAD_ENV */
default:
return RX_INVALID_OPERATION;
return RX_INVALID_OPERATION;
}
}
@ -536,7 +567,7 @@ int rx_InitAddrs(struct sockaddr_storage *saddrs, int *types, int *salens,
osi_GetTime(&tv);
#endif
if (! rx_port) {
if (! rx_port && types[0] != SOCK_STREAM) {
#if defined(KERNEL) && !defined(UKERNEL)
/* Really, this should never happen in a real kernel */
rx_port = 0;
@ -858,14 +889,12 @@ rx_NewConnectionAddrs(struct sockaddr_storage *saddr, int *type, int *slen,
conn->epoch = rx_epoch;
/*
* Right now we're going to just call rxi_FindPeer for UDP connections
* We're only going to support one.
* We're only going to support the first one now; later, we can
* support multiple. XXX
*/
for (i = 0; i < nelem; i++) {
if (type[i] == SOCK_DGRAM) {
conn->peer = rxi_FindPeer(&saddr[i], slen[i], type[i], 0, 1);
break;
}
}
conn->peer = rxi_FindPeer(&saddr[0], slen[0], type[0], 0, 1);
conn->serviceId = sservice;
conn->securityObject = securityObject;
/* This doesn't work in all compilers with void (they're buggy), so fake it
@ -897,12 +926,15 @@ rx_NewConnectionAddrs(struct sockaddr_storage *saddr, int *type, int *slen,
USERPRI;
/*
* Try to open a new TCP connection
* Try to open a new TCP connection, if we have a TCP connection
* requested. For now, just check the first one.
*/
#ifdef AFS_PTHREAD_ENV
rxi_TcpNewConnection(conn);
#endif
if (type[0] == SOCK_STREAM) {
rxi_TcpNewConnection(conn);
}
#endif /* AFS_PTHREAD_ENV */
return conn;
}
@ -1177,7 +1209,7 @@ rx_NewCall(register struct rx_connection *conn)
*/
#ifdef AFS_PTHREAD_ENV
if (conn->tcpDescriptor >= 0) {
if (rx_PeerOf(conn)->socktype == SOCK_STREAM) {
return rxi_TcpNewCall(conn);
}
#endif
@ -1379,9 +1411,8 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
afs_int32(*serviceProc) (struct rx_call * acall))
{
osi_socket socket = OSI_NULLSOCKET;
osi_socket tcpsocket = OSI_NULLSOCKET;
register struct rx_service *tservice;
register int i;
register struct rx_service *tservice, *retservice = NULL;
register int i, j;
SPLVAR;
clock_NewTime();
@ -1393,7 +1424,7 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
return 0;
}
if (port == 0) {
if (rx_port == 0) {
if (rx_ss2pn(&init_addresses[0]) == 0) {
(osi_Msg
"rx_NewService: A non-zero port must be specified on this call if a non-zero port was not provided at Rx initialization (service %s).\n",
serviceName);
@ -1405,78 +1436,78 @@ rx_NewService(u_short port, u_short serviceId, char *serviceName,
tservice = rxi_AllocService();
NETPRI;
for (i = 0; i < RX_MAX_SERVICES; i++) {
register struct rx_service *service = rx_services[i];
if (service) {
if (port == service->servicePort) {
if (service->serviceId == serviceId) {
/* The identical service has already been
* installed; if the caller was intending to
* change the security classes used by this
* service, he/she loses. */
(osi_Msg
"rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n",
serviceName, serviceId, service->serviceName);
USERPRI;
rxi_FreeService(tservice);
return service;
for (i = 0; i < n_addresses; i++) {
for (j = 0; j < RX_MAX_SERVICES; j++) {
register struct rx_service *service = rx_services[j];
if (service) {
if (init_socklens[i] == service->serviceAddrLen &&
memcmp((void *) &init_addresses[i], &service->serviceAddr,
init_socklens[i]) == 0) {
if (service->serviceId == serviceId) {
/* The identical service has already been
* installed; if the caller was intending to
* change the security classes used by this
* service, he/she loses. */
(osi_Msg
"rx_NewService: tried to install service %s with service id %d, which is already in use for service %s\n",
serviceName, serviceId, service->serviceName);
rxi_FreeService(tservice);
retservice = service;
}
/* Different service, same port: re-use the socket
* which is bound to the same port */
socket = service->socket;
}
/* Different service, same port: re-use the socket
* which is bound to the same port */
socket = service->socket;
tcpsocket = service->tcpSocket;
}
} else {
if (socket == OSI_NULLSOCKET) {
} else {
if (socket == OSI_NULLSOCKET) {
/* If we don't already have a socket (from another
* service on same port) get a new one */
struct sockaddr_in sin;
memset((void *) &sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
sin.sin_port = port;
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
sin.sin_len = sizeof(sin);
#endif
socket = rxi_GetHostUDPSocket((struct sockaddr_storage *) &sin,
sizeof(sin));
if (socket == OSI_NULLSOCKET) {
USERPRI;
rxi_FreeService(tservice);
return 0;
}
}
switch (init_socktypes[i]) {
case SOCK_DGRAM:
socket = rxi_GetHostUDPSocket(&init_addresses[i],
init_socklens[i]);
break;
#ifdef AFS_PTHREAD_ENV
if (tcpsocket == OSI_NULLSOCKET) {
tcpsocket = rxi_GetHostTCPSocket(htonl(INADDR_ANY), port);
if (tcpsocket == OSI_NULLSOCKET) {
USERPRI;
rxi_FreeService(tservice);
return 0;
}
case SOCK_STREAM:
socket = rxi_GetHostTCPSocket(&init_addresses[i],
init_socklens[i]);
break;
#endif /* AFS_PTHREAD_ENV */
}
if (socket == OSI_NULLSOCKET) {
USERPRI;
rxi_FreeService(tservice);
return 0;
}
}
service = tservice;
service->socket = socket;
memcpy((void *) &service->serviceAddr,
(void *) &init_addresses[i], init_socklens[i]);
service->serviceAddrLen = init_socklens[i];
service->socketType = init_socktypes[i];
service->serviceId = serviceId;
service->serviceName = serviceName;
service->nSecurityObjects = nSecurityObjects;
service->securityObjects = securityObjects;
service->minProcs = 0;
service->maxProcs = 1;
service->idleDeadTime = 60;
service->connDeadTime = rx_connDeadTime;
service->executeRequestProc = serviceProc;
service->checkReach = 0;
rx_services[i] = service; /* not visible until now */
retservice = service;
}
#endif
service = tservice;
service->socket = socket;
service->tcpSocket = tcpsocket;
service->servicePort = port;
service->serviceId = serviceId;
service->serviceName = serviceName;
service->nSecurityObjects = nSecurityObjects;
service->securityObjects = securityObjects;
service->minProcs = 0;
service->maxProcs = 1;
service->idleDeadTime = 60;
service->connDeadTime = rx_connDeadTime;
service->executeRequestProc = serviceProc;
service->checkReach = 0;
rx_services[i] = service; /* not visible until now */
USERPRI;
return service;
}
}
}
USERPRI;
if (retservice)
return retservice;
rxi_FreeService(tservice);
(osi_Msg "rx_NewService: cannot support > %d services\n",
RX_MAX_SERVICES);
@ -1778,8 +1809,8 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
rxi_calltrace(RX_CALL_START, call);
dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
call->conn->service->servicePort, call->conn->service->serviceId,
call));
rx_ss2pn(&call->conn->service->serviceAddr),
call->conn->service->serviceId, call));
CALL_HOLD(call, RX_CALL_REFCOUNT_BEGIN);
MUTEX_EXIT(&call->lock);
@ -1927,8 +1958,8 @@ rx_GetCall(int tno, struct rx_service *cur_service, osi_socket * socketp)
rxi_calltrace(RX_CALL_START, call);
dpf(("rx_GetCall(port=%d, service=%d) ==> call %x\n",
call->conn->service->servicePort, call->conn->service->serviceId,
call));
rx_ss2pn(&call->conn->service->serviceAddr),
call->conn->service->serviceId, call));
} else {
dpf(("rx_GetCall(socketp=0x%x, *socketp=0x%x)\n", socketp, *socketp));
}

View File

@ -100,7 +100,7 @@ typedef void (*rx_destructor_t) (void *);
int rx_KeyCreate(rx_destructor_t);
osi_socket rxi_GetHostUDPSocket(struct sockaddr_storage *saddr, int salen);
osi_socket rxi_GetUDPSocket(u_short port);
osi_socket rxi_GetHostTCPSocket(u_int host, u_short port);
osi_socket rxi_GetHostTCPSocket(struct sockaddr_storage *saddr, int salen);
#endif /* KERNEL */
@ -324,10 +324,11 @@ struct rx_connection {
struct rx_service {
u_short serviceId; /* Service number */
u_short servicePort; /* UDP port for this service */
struct sockaddr_storage serviceAddr; /* Service address */
int serviceAddrLen; /* Service address length */
int socketType; /* Socket type (UDP or TCP) */
char *serviceName; /* Name of the service */
osi_socket socket; /* socket structure or file descriptor */
osi_socket tcpSocket; /* TCP socket for this service */
u_short nRequestsRunning; /* Number of requests currently in progress */
u_short nSecurityObjects; /* Number of entries in security objects array */
struct rx_securityClass **securityObjects; /* Array of security class objects */

View File

@ -561,6 +561,7 @@ extern void rx_FlushWrite(struct rx_call *call);
/* rx_tcp.c */
extern void rxi_TcpNewConnection(struct rx_connection *);
extern void *rxi_TcpNewServerConnection(void *);
extern struct rx_call *rxi_TcpNewCall(struct rx_connection *);
extern void *rxi_TcpServerProc(void *);

View File

@ -162,6 +162,7 @@ int rxi_tcp_data_packets_received = 0;
int rxi_tcp_ack_packets_received = 0;
int rxi_tcp_transmit_window_closed = 0;
int rxi_tcp_last_packets_sent = 0;
int rxi_tcp_short_circuit_read = 0;
/*
* Setup variables needed for the RxTCP package.
@ -256,6 +257,7 @@ rxi_TcpNewConnection(struct rx_connection *conn)
if (err != EINPROGRESS) {
conn->tcpDescriptor = -1;
MUTEX_EXIT(&conn->conn_call_lock);
return;
}
@ -364,6 +366,14 @@ struct rx_call *rxi_TcpNewCall(struct rx_connection *conn)
afs_uint32 *ibuf = (afs_uint32 *) buf;
struct rx_call *call;
/*
* If the connection failed (no descriptor), return NULL right away
*/
if (conn->tcpDescriptor == -1) {
return NULL;
}
/*
* First off, allocate the call structure. Right now rxi_NewCall()
* still uses "channels", which we don't. So every call deals
@ -862,6 +872,7 @@ rxi_TcpReadPacket(struct rx_connection *conn, unsigned char *type,
(cc - data_needed);
conn->tcpPacketLen = cc - data_needed;
data_needed = 0;
rxi_tcp_short_circuit_read++;
} else {
iov[0].iov_base = (caddr_t) iov[0].iov_base + cc;
data_needed -= cc;
@ -1298,10 +1309,10 @@ rxi_TcpNewServerConnection(void *arg)
u_short port;
struct rx_connection *conn;
struct rx_call *call;
struct sockaddr_in sin;
socklen_t namelen = sizeof(sin);
struct sockaddr_storage ss;
socklen_t namelen = sizeof(ss);
if (getpeername(s, (struct sockaddr *) &sin, &namelen) < 0) {
if (getpeername(s, (struct sockaddr *) &ss, &namelen) < 0) {
close(s);
return NULL;
}
@ -1358,8 +1369,8 @@ rxi_TcpNewServerConnection(void *arg)
printf("Epoch = %d, cid = %d, service = %d\n", epoch, cid, service);
if ((conn = rxi_FindConnection(OSI_NULLSOCKET, sin.sin_addr.s_addr,
sin.sin_port, service, cid, epoch,
if ((conn = rxi_FindConnection(OSI_NULLSOCKET, &ss, namelen,
SOCK_STREAM, service, cid, epoch,
RX_SERVER_CONNECTION, 0)) == NULL) {
close(s);
return NULL;
@ -1460,13 +1471,12 @@ rxi_TcpNewServerConnection(void *arg)
}
osi_socket
rxi_GetHostTCPSocket(u_int host, u_short port)
rxi_GetHostTCPSocket(struct sockaddr_storage *saddr, int salen)
{
osi_socket socketFd = OSI_NULLSOCKET;
struct sockaddr_in taddr;
int code;
socketFd = socket(AF_INET, SOCK_STREAM, 0);
socketFd = socket(saddr->ss_family, SOCK_STREAM, 0);
if (socketFd < 0) {
perror("socket");
@ -1474,14 +1484,7 @@ rxi_GetHostTCPSocket(u_int host, u_short port)
goto error;
}
taddr.sin_addr.s_addr = host;
taddr.sin_family = AF_INET;
taddr.sin_port = (u_short) port;
#ifdef STRUCT_SOCKADDR_HAS_SA_LEN
taddr.sin_len = sizeof(struct sockaddr_in);
#endif
code = bind(socketFd, (struct sockaddr *) &taddr, sizeof(taddr));
code = bind(socketFd, (struct sockaddr *) saddr, salen);
if (code) {
perror("bind");

View File

@ -34,9 +34,9 @@
#include <afs/afsutil.h>
static void usage(char *);
static void do_client(struct sockaddr *);
static void do_client(struct sockaddr *, socklen_t);
static void *client_thread_send(void *);
static void do_server(unsigned short);
static void do_server(struct sockaddr_storage *, int);
static int32_t rxtest_ExecuteRequest(struct rx_call *call);
static int get_key(char *, int, struct ktc_encryptionKey *);
@ -177,7 +177,7 @@ main(int argc, char *argv[])
exit(1);
}
do_client(res->ai_addr);
do_client(res->ai_addr, res->ai_addrlen);
freeaddrinfo(res);
} else if (strcmp(argv[optind], "server") == 0) {
@ -200,7 +200,8 @@ main(int argc, char *argv[])
exit(1);
}
do_server(((struct sockaddr_in *) res->ai_addr)->sin_port);
do_server((struct sockaddr_storage *) res->ai_addr,
res->ai_addrlen);
freeaddrinfo(res);
} else {
@ -239,13 +240,12 @@ usage(char *argv0)
}
static void
do_client(struct sockaddr *s)
do_client(struct sockaddr *s, socklen_t slen)
{
struct sockaddr_in *sin = (struct sockaddr_in *) s;
struct rx_connection *conn;
struct rx_call *call;
struct rx_securityClass *secureobj;
int ret, secureindex = 0, i, totalbytes, code;
int ret, secureindex = 0, i, totalbytes, code, socktype = SOCK_STREAM;
struct timeval stp, etp;
double seconds;
unsigned char c = 0;
@ -341,8 +341,9 @@ do_client(struct sockaddr *s)
if (rx_window_size)
rx_TcpSetWindowSize(rx_window_size);
conn = rx_NewConnection(sin->sin_addr.s_addr, ntohs(sin->sin_port),
RX_SERVER_ID, secureobj, secureindex);
conn = rx_NewConnectionAddrs((struct sockaddr_storage *) s, &socktype,
(int *) &slen, 1, RX_SERVER_ID, secureobj,
secureindex);
if (!conn) {
fprintf(stderr, "Failed to contact server\n");
@ -397,6 +398,11 @@ do_client(struct sockaddr *s)
call = rx_NewCall(conn);
if (call == NULL) {
fprintf(stderr, "rx_NewCall() failed!\n");
exit(1);
}
if (gettimeofday(&stp, NULL) < 0) {
fprintf(stderr, "gettimeofday failed: %s\n",
strerror(errno));
@ -470,6 +476,11 @@ client_thread_send(void *arg)
call = rx_NewCall(ti->conn);
if (call == NULL) {
fprintf(stderr, "rx_NewCall() failed!\n");
exit(1);
}
for (i = 0, ti->totalbytes = 0;
ti->totalbytes + write_size < data_size; i++) {
@ -506,16 +517,16 @@ client_thread_send(void *arg)
}
static void
do_server(unsigned short port)
do_server(struct sockaddr_storage *saddr, int slen)
{
struct rx_service *service;
struct rx_securityClass *secureobj = rxnull_NewServerSecurityObject();
int ret, secureindex = 1;
int ret, secureindex = 1, socktype = SOCK_STREAM;
ret = rx_Init(port);
ret = rx_InitAddrs(saddr, &socktype, &slen, 1);
if (ret) {
fprintf(stderr, "rx_Init failed\n");
fprintf(stderr, "rx_InitAddrs failed\n");
exit(1);
}