⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpp.c

📁 OpenPBS
💻 C
📖 第 1 页 / 共 5 页
字号:
    int		len;{	DOID("write")	struct	stream	*sp;	struct	pending	*pp;	int		hold, residual, more;	DBPRT((DBTO, "%s: entered index %d size %d\n", id, index, len))	if (index < 0 || index >= stream_num || len < 0) {		errno = EINVAL;		return -1;	}	if (len == 0)		return 0;	sp = &stream_array[index];	rpp_stale(sp);			/* check freshness */	switch (sp->state) {	case RPP_STALE:		errno = ETIMEDOUT;		return -1;	case RPP_CLOSE_PEND:		errno = EPIPE;		return -1;	case RPP_OPEN_PEND:			/* shouldn't happen */	case RPP_DEAD:	case RPP_FREE:	case RPP_CLOSE_WAIT1:			/* stream closed */	case RPP_CLOSE_WAIT2:	case RPP_LAST_ACK:		errno = ENOTCONN;		return -1;	default:		break;	}	residual = 0;	while (residual < len) {		hold = sp->pend_attempt % RPP_PKT_DATA;		if ((pp = sp->pend_tail) == NULL || hold == 0) {			pp = (struct pending *)malloc(sizeof(struct pending));			if (sp->pend_tail == NULL)				sp->pend_head = pp;			else				sp->pend_tail->next = pp;			sp->pend_tail = pp;			pp->data = (u_char *)malloc(RPP_PKT_SIZE);			assert(pp->data != NULL);			pp->next = NULL;		}		more = MIN(len - residual, RPP_PKT_DATA - hold);		memcpy(&pp->data[hold], (char *)buf + residual, more);		residual += more;		sp->pend_attempt += more;	}	if (rpp_recv_all() == -1)		return -1;	rpp_send_out();	return residual;}/***	Check a stream to see if it needs attention.*/staticintrpp_attention(index)    int		index;{	DOID("attention")	int			mesg, count;	int			seq;	struct	stream		*sp;	struct	recv_packet	*pp;	sp = &stream_array[index];	DBPRT((DBTO, "%s: stream %d in state %d addr %s\n",			id, index, sp->state, netaddr(&sp->addr)))	rpp_stale(sp);	switch (sp->state) {	case RPP_STALE:			/* need to report error */		return TRUE;	case RPP_CLOSE_PEND:		/* we haven't closed yet */	case RPP_CONNECT:		/* check for message */		break;	default:		return FALSE;	}	if (sp->msg_cnt > 0 && sp->recv_attempt <= sp->msg_cnt)		return TRUE;		/* message to read */	mesg = FALSE;	count = 0;	for (pp=sp->recv_head, seq=sp->recv_sequence; pp; pp=pp->next, seq++) {		count += pp->len;		if (pp->sequence != seq)			break;		if (pp->type != RPP_DATA) {	/* end of message */			mesg = TRUE;			break;		}	}	if (mesg)		sp->msg_cnt = count;	return mesg;}/***	Check some state before reading or skipping.  If it is**	okay to continue, return 1.  Otherwise, return <= 0.*/staticintrpp_okay(index)    int	index;{	struct	stream		*sp;	fd_set			fdset;	struct	timeval		tv;	FD_ZERO(&fdset);	while (rpp_attention(index) == FALSE) {		int	i;		tv.tv_sec = RPP_TIMEOUT;		tv.tv_usec = 0;		for (i=0; i<rpp_fd_num; i++)			FD_SET(rpp_fd_array[i], &fdset);		i = select(FD_SETSIZE, &fdset, NULL, NULL, &tv);		if (i == -1) 			return -1;		if (rpp_recv_all() == -1)			return -1;		rpp_send_out();	}  	sp = &stream_array[index];	if (sp->state == RPP_STALE) {		/* stale output */		errno = ETIMEDOUT;		return -1;	}	if (sp->recv_attempt == sp->msg_cnt) {	/* end of message */		if (sp->state == RPP_CLOSE_PEND)			return -2;		else			return 0;	}	return 1;}/***	Read a message.  Return data up to the end of a message**	or the end of the provided buffer.**	Return -1 on error, -2 if other side has closed, otherwise**	number of bytes read.*/intrpp_read(index, buf, len)    int		index;    void	*buf;    int		len;{	DOID("read")	int			hiwater, cpylen, hold, ret, xlen;	struct	recv_packet	*pp;	struct	stream		*sp;	DBPRT((DBTO, "%s: entered index %d\n", id, index))	if (index < 0 || index >= stream_num || len < 0) {		errno = EINVAL;		return -1;	}	if (len == 0)		return 0;	sp = &stream_array[index];	switch (sp->state) {	case RPP_DEAD:	case RPP_FREE:	case RPP_CLOSE_WAIT1:	case RPP_CLOSE_WAIT2:	case RPP_LAST_ACK:		errno = ENOTCONN;		return -1;	/* stream closed */	default:		break;	}	if ((ret = rpp_okay(index)) <= 0)		return ret;	sp = &stream_array[index];	cpylen = 0;				/* find packet to copy from */	for (pp=sp->recv_head; pp; pp=pp->next) {		int	bump = cpylen + pp->len;		if (sp->recv_attempt < bump)			break;		cpylen = bump;	}	hiwater = 0;	xlen = MIN(len, sp->msg_cnt);	hold = sp->recv_attempt - cpylen;	/* start point in pkt data */	while (pp && xlen > hiwater) {		/* got room */		cpylen = MIN(pp->len-hold, xlen-hiwater);		memcpy((char *)buf + hiwater, &pp->data[hold], cpylen);		hiwater += cpylen;		sp->recv_attempt += cpylen;		hold = 0;		pp = pp->next;	}	return hiwater;}/***	Commit data which has been read up to recv_attempt if flag**	is TRUE.  Otherwise, set recv_attempt back to the previous**	commit point recv_commit.**	Return -1 on error, FALSE on decommit or if end-of-message has**	not been reached, TRUE if end-of-message has been reached.*/intrpp_rcommit(index, flag)    int		index;    int		flag;{	DOID("rcommit")	struct	stream		*sp;	DBPRT((DBTO, "%s: entered index %d\n", id, index))	if (index < 0 || index >= stream_num) {		errno = EINVAL;		return -1;	}	sp = &stream_array[index];	switch (sp->state) {	case RPP_CLOSE_WAIT1:			/* stream closed */	case RPP_CLOSE_WAIT2:	case RPP_LAST_ACK:	case RPP_OPEN_PEND:			/* shouldn't happen */	case RPP_FREE:	case RPP_DEAD:		errno = ENOTCONN;		return -1;	default:		break;	}	if (flag == FALSE) {			/* no commit */		sp->recv_attempt = sp->recv_commit;		return 0;	}	sp->recv_commit = sp->recv_attempt;	return (sp->recv_commit == sp->msg_cnt);}/***	Reset end-of-message condition on a stream.  Any packets**	on the receive queue are freed.**	Return -1 on error, 0 otherwise.*/intrpp_eom(index)    int		index;{	DOID("eom")	struct	stream		*sp;	struct	recv_packet	*pp;	DBPRT((DBTO, "%s: entered index %d\n", id, index))	if (index < 0 || index >= stream_num) {		errno = EINVAL;		return -1;	}	sp = &stream_array[index];	switch (sp->state) {	case RPP_CLOSE_WAIT1:			/* stream closed */	case RPP_CLOSE_WAIT2:	case RPP_LAST_ACK:	case RPP_OPEN_PEND:			/* shouldn't happen */	case RPP_FREE:	case RPP_DEAD:		errno = ENOTCONN;		return -1;	default:		break;	}/***	work though recv packets*/	for (pp=sp->recv_head; pp; pp=sp->recv_head) {		if (pp->type == RPP_GOODBYE)	/* stream finished */			break;		if (sp->msg_cnt < pp->len)			break;		sp->recv_sequence++;		sp->msg_cnt -= pp->len;		if (pp->data)			free(pp->data);		sp->recv_head = pp->next;		free(pp);	}	if (sp->recv_head == NULL)		sp->recv_tail = NULL;	sp->recv_attempt = 0;	sp->recv_commit = 0;	return 0;}/***	Commit data which has been written up to pend_attempt if flag**	is TRUE.  Otherwise, set pend_attempt back to the previous**	commit point pend_commit.**	Return -1 on error, 0 otherwise.*/intrpp_wcommit(index, flag)    int		index;    int		flag;{	DOID("wcommit")	struct	pending		*pp, *next;	struct	stream		*sp;	DBPRT((DBTO, "%s: entered index %d\n", id, index))	if (index < 0 || index >= stream_num) {		errno = EINVAL;		return -1;	}	sp = &stream_array[index];	switch (sp->state) {	case RPP_CLOSE_PEND:		errno = EPIPE;		return -1;	case RPP_STALE:		errno = ETIMEDOUT;		return -1;	case RPP_CLOSE_WAIT1:			/* stream closed */	case RPP_CLOSE_WAIT2:	case RPP_LAST_ACK:	case RPP_OPEN_PEND:			/* shouldn't happen */	case RPP_FREE:	case RPP_DEAD:		errno = ENOTCONN;		return -1;	default:		break;	}	if (flag) {			/* commit */		if (rpp_dopending(index, FALSE))			return -1;		if (rpp_recv_all() == -1)			return -1;		rpp_send_out();		return 0;	}	sp->pend_attempt = sp->pend_commit;	if (sp->pend_head == NULL)		return 0;	for (pp=sp->pend_head->next; pp; pp=next) {		free(pp->data);		next = pp->next;		free(pp);	}	sp->pend_head->next = NULL;	sp->pend_tail = sp->pend_head;	return 0;}/***	Skip len characters of a message.*/intrpp_skip(index, len)    int		index;    int		len;{	DOID("skip")	struct	stream		*sp;	int			ret, hiwater;	DBPRT((DBTO, "%s: entered index %d\n", id, index))	if (index < 0 || index >= stream_num) {		errno = EINVAL;		return -1;	}	sp = &stream_array[index];	switch (sp->state) {	case RPP_DEAD:	case RPP_FREE:	case RPP_CLOSE_WAIT1:	case RPP_CLOSE_WAIT2:	case RPP_LAST_ACK:		errno = ENOTCONN;		return -1;	/* stream closed */	default:		break;	}	if ((ret = rpp_okay(index)) <= 0)		return ret;	sp = &stream_array[index];	hiwater = MIN(sp->msg_cnt - sp->recv_attempt, len);	sp->recv_attempt += hiwater;	return hiwater;}/***	Check for any stream with a message waiting and**	return the stream number or a -1 if there are none.*/intrpp_poll(){	DOID("poll")	int			i;	DBPRT((DBTO, "%s: entered streams %d\n", id, stream_num))	/*	** Read socket to get any packets	*/	for (;;) {		i = rpp_recv_all();		if (i == -1 || i == -3)			break;	}	if (i == -1)		return -1;	/*	** See if any stream has a message waiting.	*/	for (i=0; i<stream_num; i++) {		if (rpp_attention(i))			break;	}	if (i < stream_num)	/* found one */		return i;	rpp_send_out();	return -2;}/***	Process any stream i/o.**	Return 0 or a -1 if there was an error.*/intrpp_io(){	DOID("io")	int			i;	DBPRT((DBTO, "%s: entered streams %d\n", id, stream_num))	/*	** Read socket to get any packets	*/	for (;;) {		i = rpp_recv_all();		if (i == -1 || i == -3)			break;	}	if (i == -1)		return -1;	rpp_send_out();	return 0;}/***	Read a character.**	Returns  >=0	the char read**		  -1	error or EOD**		  -2	EOF*/intrpp_getc(index)    int		index;{	int	ret;	u_char	c;	if ((ret = rpp_read(index, &c, 1)) == 1)		return ((int)c);	return ((ret == -2) ? -2 : -1);}/***	Write a character.*/intrpp_putc(index, c)    int		index;    int		c;{	u_char	x = (u_char)c;	if (rpp_write(index, &x, 1) != 1)		return -1;	return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -