📄 rpp.c
字号:
} free(hname); return hp;}/*** Allocate a list of alternate address for a host and save** them in the stream structure.*/staticvoidrpp_alist(hp, sp) struct hostent *hp; struct stream *sp;{ int i, j; for (i=1; hp->h_addr_list[i]; i++); if (i == 1) return; sp->addr_array = (struct in_addr *)calloc(i, sizeof(struct in_addr)); for (j=i=0; hp->h_addr_list[i]; i++) { if (memcmp(&sp->addr.sin_addr, hp->h_addr_list[i], hp->h_length) == 0) continue; memcpy(&sp->addr_array[j++], hp->h_addr_list[i], hp->h_length); } sp->addr_array[j].s_addr = 0; return;}staticintrpp_send_ack(sp, seq) struct stream *sp; int seq;{ DOID("send_ack") char buf[RPP_PKT_HEAD]; u_long xcrc; if (sp->stream_id < 0) { /* can't send yet */ DBPRT((DBTO, "%s: STREAM NOT OPEN seq %d\n", id, seq)) return 0; } I2TOH(RPP_ACK, buf) I8TOH(sp->stream_id, &buf[2]) I8TOH(seq, &buf[10]) xcrc = crc((u_char *)buf, (u_long)RPP_PKT_CRC); I8TOH(xcrc, &buf[RPP_PKT_CRC]) DBPRT((DBTO, "%s: seq %d to %s crc %lX\n", id, seq, netaddr(&sp->addr), xcrc)) if (sendto(sp->fd, buf, RPP_PKT_HEAD, 0, (struct sockaddr *)&sp->addr, sizeof(struct sockaddr_in)) == -1) { DBPRT((DBTO, "%s: ACK error %d\n", id, errno)) if (errno != EWOULDBLOCK && errno != ENOBUFS) return -1; } return 0;}/*** Take a packet off the send queue and free it.*/staticvoiddqueue(pp) struct send_packet *pp;{ if (pp->down == NULL) bottom = pp->up; else pp->down->up = pp->up; if (pp->up == NULL) top = pp->down; else pp->up->down = pp->down; if (--pkts_sent < 0) pkts_sent = 0; free(pp->data); free(pp); return;}/*** Get rid of anything on the pend and send queue for a stream.*/staticvoidclear_send(sp) struct stream *sp;{ struct pending *ppp, *pprev; struct send_packet *spp, *sprev; for (ppp=sp->pend_head; ppp; ppp=pprev) { pprev=ppp->next; free(ppp->data); free(ppp); } sp->pend_head = NULL; sp->pend_tail = NULL; sp->pend_commit = 0; sp->pend_attempt = 0; for (spp=sp->send_head; spp; spp=sprev) { sprev=spp->next; if (sp->stream_id == -1) { /* not open yet */ struct send_packet *look; /* might not be */ /* on send queue */ for (look = top; look; look = look->down) { if (look == spp) break; } if (look == NULL) { free(spp->data); free(spp); continue; } } dqueue(spp); } sp->send_head = NULL; sp->send_tail = NULL;}/*** Remove packets from receive, pending and send queues for** a stream, free all the memory and zero the stream_array** entry.*/staticvoidclear_stream(sp) struct stream *sp;{ struct recv_packet *rpp, *rprev; DBPRT((DBTO, "CLEAR stream %ld\n", ((long)sp - (long)stream_array)/sizeof(struct stream))) for (rpp=sp->recv_head; rpp; rpp=rprev) { rprev=rpp->next; if (rpp->data) free(rpp->data); free(rpp); } sp->recv_head = NULL; sp->recv_tail = NULL; clear_send(sp); if (sp->addr_array) { free(sp->addr_array); sp->addr_array = NULL; } sp->state = RPP_DEAD;}/*** Do a recvfrom call to get a packet off of all file descriptors.** Return the index of the stream the packet belonged to** or -2 if it was not data, or -1 if there was an error.** Return -3 if there was no data to read.** MAY CAUSE STATE CHANGE!*/staticintrpp_recv_pkt(fd) int fd;{ DOID("recv_pkt") int len, flen; struct sockaddr_in addr; struct hostent *hp; int i, streamid; struct send_packet *spp, *sprev; struct recv_packet *rpp, *rprev; struct recv_packet *pkt; struct stream *sp; char *data; int type; int sequence; u_long pktcrc; data = malloc(RPP_PKT_SIZE); assert(data != NULL); flen = sizeof(struct sockaddr_in); /* ** Loop so we can avoid failing on EINTR. Thanks to ** Pete Wyckoff for finding this. */ for (;;) { len = recvfrom(fd, data, RPP_PKT_SIZE, 0, (struct sockaddr *)&addr, &flen); if (len != -1) break; if (errno == EINTR) continue; free(data); if (errno == EWOULDBLOCK || errno == EAGAIN || errno == ECONNREFUSED) { errno = 0; return -3; } return -1; } DBPRT((DBTO, "%s: addr %s len %d\n", id, netaddr(&addr), len)) if (len < RPP_PKT_HEAD) /* less than minimum size */ goto err_out; HTOI8(&data[len-RPP_CRC_LEN], pktcrc) if (pktcrc != crc((u_char *)data, (u_long)(len-RPP_CRC_LEN))) { DBPRT((DBTO, "%s: packet crc %08lX failed\n", id, pktcrc)) goto err_out; } HTOI2(&data[len-RPP_PKT_HEAD], type) HTOI8(&data[len-RPP_PKT_HEAD+RPP_HDR_SID], streamid) HTOI8(&data[len-RPP_PKT_HEAD+RPP_HDR_SEQ], sequence) switch (type) { case RPP_ACK: DBPRT((DBTO, "%s: ACK stream %d sequence %d crc %08lX\n", id, streamid, sequence, pktcrc)) free(data); if ((sp = rpp_check_pkt(streamid, &addr)) == NULL) return -2; if (sp->state == RPP_OPEN_PEND) { if (sequence != sp->open_key) { DBPRT((DBTO, "%s: WILD ACK in RPP_OPEN_PEND %d\n", id, streamid)) return -2; } spp = sp->send_head; assert(spp->type == RPP_HELLO2); assert(spp->next == NULL); sp->state = RPP_CONNECT; sp->send_head = NULL; sp->send_tail = NULL; dqueue(spp); return streamid; } else if (sp->stream_id == -1) { DBPRT((DBTO, "%s: ACK for closed stream %d\n", id, streamid)) return -2; } for (spp=sp->send_head, sprev=NULL; spp; sprev=spp, spp=spp->next) { if (spp->sequence == sequence) break; } if (spp) { DBPRT((DBTO, "%s: stream %d seq %d took %ld\n", id, streamid, sequence, (long)(time(NULL) - spp->time_sent))) if (sp->state == RPP_CLOSE_WAIT1 && spp->type == RPP_GOODBYE) sp->state = RPP_CLOSE_WAIT2; if (sprev == NULL) sp->send_head = spp->next; else sprev->next = spp->next; if (sp->send_tail == spp) sp->send_tail = sprev; dqueue(spp); if (sp->state == RPP_LAST_ACK && sp->send_head == NULL) { clear_stream(sp); return -2; } } return streamid; case RPP_GOODBYE: DBPRT((DBTO, "%s: GOODBYE stream %d sequence %d crc %08lX\n", id, streamid, sequence, pktcrc)) free(data); if ((sp = rpp_check_pkt(streamid, &addr)) == NULL) return -2; if (rpp_send_ack(sp, sequence) == -1) return -1; switch (sp->state) { case RPP_OPEN_PEND: case RPP_OPEN_WAIT: case RPP_CLOSE_PEND: case RPP_LAST_ACK: return -2; case RPP_CLOSE_WAIT1: sp->state = RPP_LAST_ACK; return -2; case RPP_CLOSE_WAIT2: clear_stream(sp); return -2; default: break; } sp->state = RPP_CLOSE_PEND; clear_send(sp); /* other side not reading now */ for (rpp=sp->recv_head, rprev=NULL; rpp; rprev=rpp, rpp=rpp->next) { if (rpp->sequence >= sequence) break; } if (rpp == NULL || rpp->sequence > sequence) { DBPRT((DBTO, "%s: GOOD seq %d\n", id, sequence)) pkt = (struct recv_packet *) malloc(sizeof(struct recv_packet)); assert(pkt != NULL); pkt->type = type; pkt->sequence = sequence; pkt->len = 0; pkt->data = NULL; if (rprev == NULL) { pkt->next = sp->recv_head; sp->recv_head = pkt; } else { pkt->next = rprev->next; rprev->next = pkt; } if (sp->recv_tail == rprev) sp->recv_tail = pkt; } else { DBPRT((DBTO, "%s: DUPLICATE seq %d MAX seen %d\n", id, sequence, rpp->sequence)) } return -2; case RPP_DATA: case RPP_EOD: DBPRT((DBTO, "%s: DATA stream %d sequence %d crc %08lX len %d\n", id, streamid, sequence, pktcrc, len)) if ((sp = rpp_check_pkt(streamid, &addr)) == NULL) goto err_out; if (rpp_send_ack(sp, sequence) == -1) { free(data); return -1; } switch (sp->state) { case RPP_OPEN_WAIT: DBPRT((DBTO, "INPUT on unconnected stream %d\n", streamid)) free(data); return -2; case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: DBPRT((DBTO, "INPUT on closed stream %d\n", streamid)) free(data); return -2; default: break; } if (sequence < sp->recv_sequence) { DBPRT((DBTO, "%s: OLD seq %d\n", id, sequence)) free(data); return -2; } for (rpp=sp->recv_head, rprev=NULL; rpp; rprev=rpp, rpp=rpp->next) { if (rpp->sequence >= sequence) break; } if (rpp == NULL || rpp->sequence > sequence) { DBPRT((DBTO, "%s: GOOD seq %d\n", id, sequence)) data = realloc(data, len); assert(data != NULL); pkt = (struct recv_packet *) malloc(sizeof(struct recv_packet)); assert(pkt != NULL); pkt->type = type; pkt->sequence = sequence; pkt->len = len-RPP_PKT_HEAD; pkt->data = (u_char *)data; if (rprev == NULL) { pkt->next = sp->recv_head; sp->recv_head = pkt; } else { pkt->next = rprev->next; rprev->next = pkt; } if (sp->recv_tail == rprev) sp->recv_tail = pkt; if (sp->state == RPP_OPEN_PEND) return -2; else return streamid; } else { DBPRT((DBTO, "%s: DUPLICATE seq %d MAX seen %d\n", id, sequence, rpp->sequence)) free(data); } break; case RPP_HELLO1: /* ** HELLO1 packets have the remote side's stream index ** in the "streamid" field and open key in the sequence. */ DBPRT((DBTO, "%s: HELLO1 stream %d sequence %d\n", id, streamid, sequence)) free(data); for (i=0; i<stream_num; i++) { sp = &stream_array[i]; if (sp->state <= RPP_FREE) continue; if (memcmp(&sp->addr, &addr, sizeof(addr))) continue; if (sp->open_key == sequence) { rpp_send_out(); return -2; } DBPRT((DBTO, "OLD STREAM state %d reopened %d %d\n", sp->state, sp->open_key, sequence)) clear_stream(sp); /* old stream */ } i = rpp_create_sp(); if (i == -1) return -1; sp = &stream_array[i]; sp->state = RPP_OPEN_PEND; sp->fd = fd; sp->retry = rpp_retry; memcpy(&sp->addr, &addr, sizeof(addr)); if ((hp = rpp_get_cname(&addr)) != NULL) rpp_alist(hp, sp); sp->stream_id = streamid; sp->open_key = sequence; open_key = MAX(open_key, sequence); rpp_form_pkt(i, RPP_HELLO2, i, NULL, 0); rpp_send_out(); break; case RPP_HELLO2: /* ** HELLO2 packet has this side's stream index in ** "streamid" as usual and the remote side's ** stream index overloaded in the "sequence" field. */ DBPRT((DBTO, "%s: HELLO2 stream %d sequence %d\n", id, streamid, sequence)) free(data); if ((sp = rpp_check_pkt(streamid, &addr)) == NULL) return -2; switch (sp->state) { case RPP_OPEN_WAIT: sp->state = RPP_CONNECT; break; case RPP_CLOSE_WAIT1: /* called close before open done */ case RPP_LAST_ACK: break; default: if (sp->stream_id == sequence) { DBPRT((DBTO, "%s: stream %d got DUP HELLO2 %d\n", id, streamid, sp->state)) if (rpp_send_ack(sp, sp->open_key) == -1) return -1; } else { DBPRT((DBTO, "%s: NON-DUP HELLO2\n", id)) } return -2; } sp->stream_id = sequence; if (rpp_send_ack(sp, sp->open_key) == -1) return -1; if ((spp = sp->send_head) == NULL) { DBPRT((DBTO, "%s: stream %d got HELLO2 but sendq NULL\n", id, streamid)) return -2; } if (spp->type != RPP_HELLO1) { DBPRT((DBTO, "%s: stream %d sendq %d rather than HELLO1\n", id, streamid, spp->type)) return -2; } sp->send_head = spp->next; /* remove HELLO1 pkt */ if (sp->send_tail == spp) sp->send_tail = NULL; dqueue(spp); /* ** Put any waitting packets onto the send queue */ for (spp=sp->send_head; spp; spp=spp->next) { int len = spp->len; DBPRT((DBTO, "%s: idx %d link %d seq %d len %d to sendq\n", id, streamid, spp->type, spp->sequence, len)) I2TOH(spp->type, (char *)&spp->data[len])
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -