📄 rpp.c
字号:
I8TOH(sp->stream_id, (char *)&spp->data[len+RPP_HDR_SID]) I8TOH(spp->sequence, (char *)&spp->data[len+RPP_HDR_SEQ]) I8TOH(crc(spp->data, (u_long)(len+RPP_PKT_CRC)), (char *)&spp->data[len+RPP_PKT_CRC]) if (bottom) bottom->down = spp; spp->up = bottom; spp->down = NULL; if (top == NULL) /* first one */ top = spp; bottom = spp; } break; default: DBPRT((DBTO, "%s: UNKNOWN packet type %d stream %d sequence %d\n", id, type, streamid, sequence)) free(data); break; } return -2;err_out: free(data); return -2;}/*** Do recv calls until there is one that shows data.*/staticintrpp_recv_all(){ int i, ret; int rc = -3; for (i=0; i<rpp_fd_num; i++) { ret = rpp_recv_pkt(rpp_fd_array[i]); rc = MAX(ret, rc); if (ret == -1) break; } return rc;}/*** Check to see if any packet being sent out on a stream has** been sent more than a reasonable number of times.*/staticvoidrpp_stale(sp) struct stream *sp;{ struct send_packet *pp; if (sp->state <= RPP_FREE || sp->state == RPP_STALE) return; for (pp = sp->send_head; pp; pp = pp->next) { if (pp->sent_out >= sp->retry) break; } if (pp) { DBPRT((DBTO, "STALE PACKET seq %d sent %d of %d\n", pp->sequence, pp->sent_out, sp->retry)) switch (sp->state) { case RPP_OPEN_PEND: case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: clear_stream(sp); break; default: sp->state = RPP_STALE; break; } }}/*** Form data packets for any pending data. If flag is true,** create an EOD packet too.*/staticintrpp_dopending(index, flag) int index; int flag;{ DOID("dopending") struct stream *sp; struct pending *pp; DBPRT((DBTO, "%s: entered index %d\n", id, index)) sp = &stream_array[index]; for (pp=sp->pend_head; pp != sp->pend_tail; pp=sp->pend_head) { rpp_form_pkt(index, RPP_DATA, sp->send_sequence, pp->data, RPP_PKT_DATA); sp->pend_head = pp->next; free(pp); sp->pend_attempt -= RPP_PKT_DATA; if (next_seq(&sp->send_sequence) == -1) return -1; } if (flag) { rpp_form_pkt(index, RPP_EOD, sp->send_sequence, pp ? pp->data : NULL, sp->pend_attempt); if (pp) { free(pp); sp->pend_head = NULL; sp->pend_tail = NULL; } sp->pend_attempt = 0; if (next_seq(&sp->send_sequence) == -1) return -1; } sp->pend_commit = sp->pend_attempt; return 0;}/*** Flush all data out of a stream -- do an end of message.** Return 0 if it all went well, -1 on error.*/intrpp_flush(index) int index;{ DOID("flush") struct stream *sp; DBPRT((DBTO, "%s: entered index %d\n", id, index)) if (index < 0 || index >= stream_num) { errno = EINVAL; return -1; } sp = &stream_array[index]; switch (sp->state) { case RPP_CLOSE_PEND: errno = EPIPE; return -1; case RPP_DEAD: case RPP_FREE: case RPP_OPEN_PEND: case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: errno = ENOTCONN; return -1; default: break; }/*** if something is pending or we need to return a zero len EOM,** call rpp_dopending().*/ if (sp->pend_head != NULL || sp->send_head == NULL) { if (rpp_dopending(index, TRUE)) return -1; } if (rpp_recv_all() == -1) return -1; rpp_send_out(); return 0;}/*** Create a new socket if needed and bind a local port.** If port is 0, pick a free port number.*/intrpp_bind(port) uint port;{ struct sockaddr_in from; int flags; if (rpp_fd == -1) { if ((rpp_fd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) return -1; /* set close on exec */ if ((flags = fcntl(rpp_fd, F_GETFD)) == -1) { close(rpp_fd); rpp_fd = -1; return -1; } flags |= FD_CLOEXEC; if (fcntl(rpp_fd, F_SETFD, flags) == -1) { close(rpp_fd); rpp_fd = -1; return -1; } /* set no delay */ if ((flags = fcntl(rpp_fd, F_GETFL)) == -1) { close(rpp_fd); rpp_fd = -1; return -1; }#if defined(FNDELAY) && !defined(__hpux) flags |= FNDELAY;#else flags |= O_NONBLOCK;#endif if (fcntl(rpp_fd, F_SETFL, flags) == -1) { close(rpp_fd); rpp_fd = -1; return -1; } } if (rpp_fd_array != NULL) { int i; for (i=0; i<rpp_fd_num; i++) { if (rpp_fd_array[i] == rpp_fd) return rpp_fd; } } memset(&from, '\0', sizeof(from)); from.sin_family = AF_INET; from.sin_addr.s_addr = htonl(INADDR_ANY); from.sin_port = htons((u_short)port); if (bind(rpp_fd, (struct sockaddr *)&from, sizeof(from)) == -1) return -1; DBPRT((DBTO, "bind to port %d\n", ntohs(from.sin_port))) if (rpp_fd_array == NULL) { rpp_fd_array = (int *)malloc(sizeof(int)); rpp_fd_num = 1;#if defined(HAVE_ATEXIT) (void)atexit(rpp_shutdown);#elif defined(HAVE_ON_EXIT) (void)atexit(rpp_shutdown, 0);#else /* atexit() or on_exit() must be defined */ abort compile#endif } else { rpp_fd_num++; rpp_fd_array = (int *)realloc(rpp_fd_array, sizeof(int)*rpp_fd_num); } assert(rpp_fd_array); rpp_fd_array[rpp_fd_num-1] = rpp_fd; return rpp_fd;}/*** Allocate a communication stream.*/intrpp_open(name, port) char *name; uint port;{ DOID("rpp_open") int i, stream; struct hostent *hp; struct stream *sp; DBPRT((DBTO, "%s: entered %s:%d\n", id, name, port)) if (rpp_bind(0) == -1) /* bind if we need to */ return -1; /* ** First, we look up the IP address for this name. */ if ((hp = gethostbyname(name)) == NULL) { DBPRT((DBTO, "%s: host %s not found\n", id, name)) errno = ENOENT; return -1; } /* ** Look for previously existant stream to the given ** host. If one is found in an open state, just ** return it. */ for (i=0; i<stream_num; i++) { sp = &stream_array[i]; if (sp->state <= RPP_FREE) continue; if (memcmp(&sp->addr.sin_addr, hp->h_addr, hp->h_length)) continue; if (sp->addr.sin_port != htons((unsigned short)port)) continue; if (sp->addr.sin_family != hp->h_addrtype) continue; if (sp->state > RPP_CLOSE_PEND) { DBPRT((DBTO, "%s: OLD STREAM state %d reopened %d\n", id, sp->state, sp->open_key)) clear_stream(sp); /* old stream */ } else { DBPRT((DBTO, "%s: reopen of %s, sp->retry %d, global %d\n", id, netaddr(&sp->addr), sp->retry, rpp_retry)) sp->retry = rpp_retry; return i; } } stream = rpp_create_sp(); if (stream == -1) return -1; sp = &stream_array[stream]; if (open_key == 0) open_key = (int)time(0) & 0x0fff; /* ** We save the address returned for the name given so we ** can send out on the perfered interface. */ memcpy(&sp->addr.sin_addr, hp->h_addr, hp->h_length); sp->addr.sin_port = htons((unsigned short)port); sp->addr.sin_family = hp->h_addrtype; sp->fd = rpp_fd; sp->retry = rpp_retry; if (hp->h_addr_list[1] == NULL) { if ((hp = rpp_get_cname(&sp->addr)) == NULL) { errno = ENOENT; return -1; } } rpp_alist(hp, sp); sp->stream_id = stream; /* use my streamid for HELLO1 */ sp->state = RPP_OPEN_WAIT; sp->open_key = open_key++; rpp_form_pkt(stream, RPP_HELLO1, sp->open_key, NULL, 0); sp->stream_id = -1; /* don't know his stream id yet */ if (rpp_recv_all() == -1) return -1; rpp_send_out(); return stream;}/*** Return the network address for a stream.*/struct sockaddr_in*rpp_getaddr(index) int index;{ DOID("getaddr") struct stream *sp; DBPRT((DBTO, "%s: entered index %d\n", id, index)) if (index < 0 || index >= stream_num) { errno = EINVAL; return NULL; } sp = &stream_array[index]; if (sp->state <= RPP_FREE) { errno = ENOTCONN; return NULL; } return &sp->addr;}/*** Free all memory and close the socket.*/voidrpp_terminate(){ struct stream *sp; struct send_packet *spp; struct pending *ppp; struct recv_packet *rpp; int i; for (i=0; i<rpp_fd_num; i++) (void)close(rpp_fd_array[i]); if (rpp_fd_array) { free(rpp_fd_array); rpp_fd_array = NULL; rpp_fd_num = 0; } for (i=0; i<stream_num; i++) { sp = &stream_array[i]; if (sp->state == RPP_DEAD) continue; for (ppp = sp->pend_head; ppp; ppp = sp->pend_head) { free(ppp->data); sp->pend_head = ppp->next; free(ppp); } for (rpp = sp->recv_head; rpp; rpp = sp->recv_head) { if (rpp->data) free(rpp->data); sp->recv_head = rpp->next; free(rpp); } for (spp = sp->send_head; spp; spp = sp->send_head) { free(spp->data); sp->send_head = spp->next; free(spp); } } top = NULL; bottom = NULL; if (stream_array) free(stream_array); stream_num = 0; stream_array = NULL; rpp_fd = -1;}/*** Shutdown the library. Flush and close all open streams** and call rpp_terminate().*/voidrpp_shutdown(){ int timeouts, num, i; fd_set fdset; struct timeval tv; FD_ZERO(&fdset); for (i=0; i<stream_num; i++) (void)rpp_close(i); for (timeouts = 0; timeouts < 3;) { for (i=0; i<stream_num; i++) { if (stream_array[i].state > RPP_FREE) break; } if (i == stream_num) break; DBPRT((DBTO, "shutdown: stream %d state %d\n", i, stream_array[i].state)) if ((num = rpp_recv_all()) == -1) break; rpp_send_out(); if (num == -3) { /* got nothing -- wait a bit */ tv.tv_sec = RPP_TIMEOUT; tv.tv_usec = 0; for (i=0; i<rpp_fd_num; i++) FD_SET(rpp_fd_array[i], &fdset); i = select(FD_SETSIZE, &fdset, NULL, NULL, &tv); if (i == 0) timeouts++; if (i == -1) break; } } rpp_terminate();}/*** Terminate a connection stream.** Return 0 if it all went well, -1 on error.*/intrpp_close(index) int index;{ DOID("close") struct stream *sp; DBPRT((DBTO, "%s: entered index %d\n", id, index)) if (index < 0 || index >= stream_num) { errno = EINVAL; return -1; } sp = &stream_array[index]; switch (sp->state) { case RPP_STALE: clear_stream(sp); return 0; case RPP_CLOSE_PEND: sp->state = RPP_LAST_ACK; break; case RPP_OPEN_WAIT: case RPP_CONNECT: if (sp->pend_head != NULL) { if (rpp_dopending(index, TRUE)) return -1; } sp->state = RPP_CLOSE_WAIT1; break; default: errno = ENOTCONN; return -1; /* stream closed */ } rpp_form_pkt(index, RPP_GOODBYE, sp->send_sequence, NULL, 0); if (rpp_recv_all() == -1) return -1; rpp_send_out(); return 0;}/*** Add information to the stream given by index.** Return -1 on error, otherwise number of bytes written.*/intrpp_write(index, buf, len) int index; void *buf;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -