📄 rpp.c
字号:
int len;{ DOID("write") struct stream *sp; struct pending *pp; int hold, residual, more; DBPRT((DBTO, "%s: entered index %d size %d\n", id, index, len)) if (index < 0 || index >= stream_num || len < 0) { errno = EINVAL; return -1; } if (len == 0) return 0; sp = &stream_array[index]; rpp_stale(sp); /* check freshness */ switch (sp->state) { case RPP_STALE: errno = ETIMEDOUT; return -1; case RPP_CLOSE_PEND: errno = EPIPE; return -1; case RPP_OPEN_PEND: /* shouldn't happen */ case RPP_DEAD: case RPP_FREE: case RPP_CLOSE_WAIT1: /* stream closed */ case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: errno = ENOTCONN; return -1; default: break; } residual = 0; while (residual < len) { hold = sp->pend_attempt % RPP_PKT_DATA; if ((pp = sp->pend_tail) == NULL || hold == 0) { pp = (struct pending *)malloc(sizeof(struct pending)); if (sp->pend_tail == NULL) sp->pend_head = pp; else sp->pend_tail->next = pp; sp->pend_tail = pp; pp->data = (u_char *)malloc(RPP_PKT_SIZE); assert(pp->data != NULL); pp->next = NULL; } more = MIN(len - residual, RPP_PKT_DATA - hold); memcpy(&pp->data[hold], (char *)buf + residual, more); residual += more; sp->pend_attempt += more; } if (rpp_recv_all() == -1) return -1; rpp_send_out(); return residual;}/*** Check a stream to see if it needs attention.*/staticintrpp_attention(index) int index;{ DOID("attention") int mesg, count; int seq; struct stream *sp; struct recv_packet *pp; sp = &stream_array[index]; DBPRT((DBTO, "%s: stream %d in state %d addr %s\n", id, index, sp->state, netaddr(&sp->addr))) rpp_stale(sp); switch (sp->state) { case RPP_STALE: /* need to report error */ return TRUE; case RPP_CLOSE_PEND: /* we haven't closed yet */ case RPP_CONNECT: /* check for message */ break; default: return FALSE; } if (sp->msg_cnt > 0 && sp->recv_attempt <= sp->msg_cnt) return TRUE; /* message to read */ mesg = FALSE; count = 0; for (pp=sp->recv_head, seq=sp->recv_sequence; pp; pp=pp->next, seq++) { count += pp->len; if (pp->sequence != seq) break; if (pp->type != RPP_DATA) { /* end of message */ mesg = TRUE; break; } } if (mesg) sp->msg_cnt = count; return mesg;}/*** Check some state before reading or skipping. If it is** okay to continue, return 1. Otherwise, return <= 0.*/staticintrpp_okay(index) int index;{ struct stream *sp; fd_set fdset; struct timeval tv; FD_ZERO(&fdset); while (rpp_attention(index) == FALSE) { int i; tv.tv_sec = RPP_TIMEOUT; tv.tv_usec = 0; for (i=0; i<rpp_fd_num; i++) FD_SET(rpp_fd_array[i], &fdset); i = select(FD_SETSIZE, &fdset, NULL, NULL, &tv); if (i == -1) return -1; if (rpp_recv_all() == -1) return -1; rpp_send_out(); } sp = &stream_array[index]; if (sp->state == RPP_STALE) { /* stale output */ errno = ETIMEDOUT; return -1; } if (sp->recv_attempt == sp->msg_cnt) { /* end of message */ if (sp->state == RPP_CLOSE_PEND) return -2; else return 0; } return 1;}/*** Read a message. Return data up to the end of a message** or the end of the provided buffer.** Return -1 on error, -2 if other side has closed, otherwise** number of bytes read.*/intrpp_read(index, buf, len) int index; void *buf; int len;{ DOID("read") int hiwater, cpylen, hold, ret, xlen; struct recv_packet *pp; struct stream *sp; DBPRT((DBTO, "%s: entered index %d\n", id, index)) if (index < 0 || index >= stream_num || len < 0) { errno = EINVAL; return -1; } if (len == 0) return 0; sp = &stream_array[index]; switch (sp->state) { case RPP_DEAD: case RPP_FREE: case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: errno = ENOTCONN; return -1; /* stream closed */ default: break; } if ((ret = rpp_okay(index)) <= 0) return ret; sp = &stream_array[index]; cpylen = 0; /* find packet to copy from */ for (pp=sp->recv_head; pp; pp=pp->next) { int bump = cpylen + pp->len; if (sp->recv_attempt < bump) break; cpylen = bump; } hiwater = 0; xlen = MIN(len, sp->msg_cnt); hold = sp->recv_attempt - cpylen; /* start point in pkt data */ while (pp && xlen > hiwater) { /* got room */ cpylen = MIN(pp->len-hold, xlen-hiwater); memcpy((char *)buf + hiwater, &pp->data[hold], cpylen); hiwater += cpylen; sp->recv_attempt += cpylen; hold = 0; pp = pp->next; } return hiwater;}/*** Commit data which has been read up to recv_attempt if flag** is TRUE. Otherwise, set recv_attempt back to the previous** commit point recv_commit.** Return -1 on error, FALSE on decommit or if end-of-message has** not been reached, TRUE if end-of-message has been reached.*/intrpp_rcommit(index, flag) int index; int flag;{ DOID("rcommit") struct stream *sp; DBPRT((DBTO, "%s: entered index %d\n", id, index)) if (index < 0 || index >= stream_num) { errno = EINVAL; return -1; } sp = &stream_array[index]; switch (sp->state) { case RPP_CLOSE_WAIT1: /* stream closed */ case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: case RPP_OPEN_PEND: /* shouldn't happen */ case RPP_FREE: case RPP_DEAD: errno = ENOTCONN; return -1; default: break; } if (flag == FALSE) { /* no commit */ sp->recv_attempt = sp->recv_commit; return 0; } sp->recv_commit = sp->recv_attempt; return (sp->recv_commit == sp->msg_cnt);}/*** Reset end-of-message condition on a stream. Any packets** on the receive queue are freed.** Return -1 on error, 0 otherwise.*/intrpp_eom(index) int index;{ DOID("eom") struct stream *sp; struct recv_packet *pp; DBPRT((DBTO, "%s: entered index %d\n", id, index)) if (index < 0 || index >= stream_num) { errno = EINVAL; return -1; } sp = &stream_array[index]; switch (sp->state) { case RPP_CLOSE_WAIT1: /* stream closed */ case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: case RPP_OPEN_PEND: /* shouldn't happen */ case RPP_FREE: case RPP_DEAD: errno = ENOTCONN; return -1; default: break; }/*** work though recv packets*/ for (pp=sp->recv_head; pp; pp=sp->recv_head) { if (pp->type == RPP_GOODBYE) /* stream finished */ break; if (sp->msg_cnt < pp->len) break; sp->recv_sequence++; sp->msg_cnt -= pp->len; if (pp->data) free(pp->data); sp->recv_head = pp->next; free(pp); } if (sp->recv_head == NULL) sp->recv_tail = NULL; sp->recv_attempt = 0; sp->recv_commit = 0; return 0;}/*** Commit data which has been written up to pend_attempt if flag** is TRUE. Otherwise, set pend_attempt back to the previous** commit point pend_commit.** Return -1 on error, 0 otherwise.*/intrpp_wcommit(index, flag) int index; int flag;{ DOID("wcommit") struct pending *pp, *next; struct stream *sp; DBPRT((DBTO, "%s: entered index %d\n", id, index)) if (index < 0 || index >= stream_num) { errno = EINVAL; return -1; } sp = &stream_array[index]; switch (sp->state) { case RPP_CLOSE_PEND: errno = EPIPE; return -1; case RPP_STALE: errno = ETIMEDOUT; return -1; case RPP_CLOSE_WAIT1: /* stream closed */ case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: case RPP_OPEN_PEND: /* shouldn't happen */ case RPP_FREE: case RPP_DEAD: errno = ENOTCONN; return -1; default: break; } if (flag) { /* commit */ if (rpp_dopending(index, FALSE)) return -1; if (rpp_recv_all() == -1) return -1; rpp_send_out(); return 0; } sp->pend_attempt = sp->pend_commit; if (sp->pend_head == NULL) return 0; for (pp=sp->pend_head->next; pp; pp=next) { free(pp->data); next = pp->next; free(pp); } sp->pend_head->next = NULL; sp->pend_tail = sp->pend_head; return 0;}/*** Skip len characters of a message.*/intrpp_skip(index, len) int index; int len;{ DOID("skip") struct stream *sp; int ret, hiwater; DBPRT((DBTO, "%s: entered index %d\n", id, index)) if (index < 0 || index >= stream_num) { errno = EINVAL; return -1; } sp = &stream_array[index]; switch (sp->state) { case RPP_DEAD: case RPP_FREE: case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: errno = ENOTCONN; return -1; /* stream closed */ default: break; } if ((ret = rpp_okay(index)) <= 0) return ret; sp = &stream_array[index]; hiwater = MIN(sp->msg_cnt - sp->recv_attempt, len); sp->recv_attempt += hiwater; return hiwater;}/*** Check for any stream with a message waiting and** return the stream number or a -1 if there are none.*/intrpp_poll(){ DOID("poll") int i; DBPRT((DBTO, "%s: entered streams %d\n", id, stream_num)) /* ** Read socket to get any packets */ for (;;) { i = rpp_recv_all(); if (i == -1 || i == -3) break; } if (i == -1) return -1; /* ** See if any stream has a message waiting. */ for (i=0; i<stream_num; i++) { if (rpp_attention(i)) break; } if (i < stream_num) /* found one */ return i; rpp_send_out(); return -2;}/*** Process any stream i/o.** Return 0 or a -1 if there was an error.*/intrpp_io(){ DOID("io") int i; DBPRT((DBTO, "%s: entered streams %d\n", id, stream_num)) /* ** Read socket to get any packets */ for (;;) { i = rpp_recv_all(); if (i == -1 || i == -3) break; } if (i == -1) return -1; rpp_send_out(); return 0;}/*** Read a character.** Returns >=0 the char read** -1 error or EOD** -2 EOF*/intrpp_getc(index) int index;{ int ret; u_char c; if ((ret = rpp_read(index, &c, 1)) == 1) return ((int)c); return ((ret == -2) ? -2 : -1);}/*** Write a character.*/intrpp_putc(index, c) int index; int c;{ u_char x = (u_char)c; if (rpp_write(index, &x, 1) != 1) return -1; return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -