⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpp.c

📁 OpenPBS
💻 C
📖 第 1 页 / 共 5 页
字号:
	}	free(hname);	return hp;}/***	Allocate a list of alternate address for a host and save**	them in the stream structure.*/staticvoidrpp_alist(hp, sp)    struct	hostent		*hp;    struct	stream		*sp;{	int	i, j;	for (i=1; hp->h_addr_list[i]; i++);	if (i == 1)		return;	sp->addr_array = (struct in_addr *)calloc(i, sizeof(struct in_addr));	for (j=i=0; hp->h_addr_list[i]; i++) {		if (memcmp(&sp->addr.sin_addr,				hp->h_addr_list[i], hp->h_length) == 0)			continue;		memcpy(&sp->addr_array[j++], hp->h_addr_list[i], hp->h_length);	}	sp->addr_array[j].s_addr = 0;	return;}staticintrpp_send_ack(sp, seq)    struct	stream	*sp;    int		seq;{	DOID("send_ack")	char	buf[RPP_PKT_HEAD];	u_long	xcrc;	if (sp->stream_id < 0) {		/* can't send yet */		DBPRT((DBTO, "%s: STREAM NOT OPEN seq %d\n", id, seq))		return 0;	}	I2TOH(RPP_ACK, buf)	I8TOH(sp->stream_id, &buf[2])	I8TOH(seq, &buf[10])	xcrc = crc((u_char *)buf, (u_long)RPP_PKT_CRC);	I8TOH(xcrc, &buf[RPP_PKT_CRC])	DBPRT((DBTO, "%s: seq %d to %s crc %lX\n",			id, seq, netaddr(&sp->addr), xcrc))	if (sendto(sp->fd, buf, RPP_PKT_HEAD, 0, (struct sockaddr *)&sp->addr,			sizeof(struct sockaddr_in)) == -1) {		DBPRT((DBTO, "%s: ACK error %d\n", id, errno))		if (errno != EWOULDBLOCK && errno != ENOBUFS)			return -1;	}	return 0;}/***	Take a packet off the send queue and free it.*/staticvoiddqueue(pp)    struct	send_packet	*pp;{	if (pp->down == NULL)		bottom = pp->up;	else		pp->down->up = pp->up;	if (pp->up == NULL)		top = pp->down;	else		pp->up->down = pp->down;	if (--pkts_sent < 0)		pkts_sent = 0;	free(pp->data);	free(pp);	return;}/***	Get rid of anything on the pend and send queue for a stream.*/staticvoidclear_send(sp)    struct	stream		*sp;{	struct	pending		*ppp, *pprev;	struct	send_packet	*spp, *sprev;	for (ppp=sp->pend_head; ppp; ppp=pprev) {		pprev=ppp->next;		free(ppp->data);		free(ppp);	}	sp->pend_head = NULL;	sp->pend_tail = NULL;	sp->pend_commit = 0;	sp->pend_attempt = 0;	for (spp=sp->send_head; spp; spp=sprev) {		sprev=spp->next;		if (sp->stream_id == -1) { 	    	/* not open yet */			struct	send_packet	*look;	/* might not be */							/* on send queue */			for (look = top; look; look = look->down) {				if (look == spp)					break;			}			if (look == NULL) {				free(spp->data);				free(spp);				continue;			}		}		dqueue(spp);	}	sp->send_head = NULL;	sp->send_tail = NULL;}/***	Remove packets from receive, pending and send queues for**	a stream, free all the memory and zero the stream_array**	entry.*/staticvoidclear_stream(sp)    struct	stream		*sp;{	struct	recv_packet	*rpp, *rprev;	DBPRT((DBTO, "CLEAR stream %ld\n",			((long)sp - (long)stream_array)/sizeof(struct stream)))	for (rpp=sp->recv_head; rpp; rpp=rprev) {		rprev=rpp->next;		if (rpp->data)			free(rpp->data);		free(rpp);	}	sp->recv_head = NULL;	sp->recv_tail = NULL;	clear_send(sp);	if (sp->addr_array) {		free(sp->addr_array);		sp->addr_array = NULL;	}	sp->state = RPP_DEAD;}/***	Do a recvfrom call to get a packet off of all file descriptors.**	Return the index of the stream the packet belonged to**	or -2 if it was not data, or -1 if there was an error.**	Return -3 if there was no data to read.**	MAY CAUSE STATE CHANGE!*/staticintrpp_recv_pkt(fd)	int	fd;{	DOID("recv_pkt")	int			len, flen;	struct	sockaddr_in	addr;	struct	hostent		*hp;	int			i, streamid;	struct	send_packet	*spp, *sprev;	struct	recv_packet	*rpp, *rprev;	struct	recv_packet	*pkt;	struct	stream		*sp;	char			*data;	int		type;	int		sequence;	u_long		pktcrc;	data = malloc(RPP_PKT_SIZE);	assert(data != NULL);	flen = sizeof(struct sockaddr_in);	/*	**	Loop so we can avoid failing on EINTR.  Thanks to	**	Pete Wyckoff for finding this.	*/	for (;;) {		len = recvfrom(fd, data, RPP_PKT_SIZE, 0,			(struct sockaddr *)&addr, &flen);		if (len != -1)			break;		if (errno == EINTR)			continue;		free(data);		if (errno == EWOULDBLOCK ||		    errno == EAGAIN      ||		    errno == ECONNREFUSED) {			errno = 0;			return -3;		}		return -1;	}	DBPRT((DBTO, "%s: addr %s len %d\n", id, netaddr(&addr), len))	if (len < RPP_PKT_HEAD)		/* less than minimum size */		goto err_out;	HTOI8(&data[len-RPP_CRC_LEN], pktcrc)	if (pktcrc != crc((u_char *)data, (u_long)(len-RPP_CRC_LEN))) {		DBPRT((DBTO, "%s: packet crc %08lX failed\n", id, pktcrc))		goto err_out;	}	HTOI2(&data[len-RPP_PKT_HEAD], type)	HTOI8(&data[len-RPP_PKT_HEAD+RPP_HDR_SID], streamid)	HTOI8(&data[len-RPP_PKT_HEAD+RPP_HDR_SEQ], sequence)	switch (type) {	case RPP_ACK:		DBPRT((DBTO, "%s: ACK stream %d sequence %d crc %08lX\n",				id, streamid, sequence, pktcrc))		free(data);		if ((sp = rpp_check_pkt(streamid, &addr)) == NULL)			return -2;		if (sp->state == RPP_OPEN_PEND) {			if (sequence != sp->open_key) {				DBPRT((DBTO,					"%s: WILD ACK in RPP_OPEN_PEND %d\n",					id, streamid))				return -2;			}			spp = sp->send_head;			assert(spp->type == RPP_HELLO2);			assert(spp->next == NULL);			sp->state = RPP_CONNECT;			sp->send_head = NULL;			sp->send_tail = NULL;			dqueue(spp);			return streamid;		}		else if (sp->stream_id == -1) {			DBPRT((DBTO, "%s: ACK for closed stream %d\n",				id, streamid))			return -2;		}		for (spp=sp->send_head, sprev=NULL; spp;				sprev=spp, spp=spp->next) {			if (spp->sequence == sequence)				break;		}		if (spp) {			DBPRT((DBTO, "%s: stream %d seq %d took %ld\n",				id, streamid, sequence,				(long)(time(NULL) - spp->time_sent)))			if (sp->state == RPP_CLOSE_WAIT1 &&					spp->type == RPP_GOODBYE)				sp->state = RPP_CLOSE_WAIT2;			if (sprev == NULL)				sp->send_head = spp->next;			else				sprev->next = spp->next;			if (sp->send_tail == spp)				sp->send_tail = sprev;			dqueue(spp);			if (sp->state == RPP_LAST_ACK &&					sp->send_head == NULL) {				clear_stream(sp);				return -2;			}		}		return streamid;	case RPP_GOODBYE:		DBPRT((DBTO, "%s: GOODBYE stream %d sequence %d crc %08lX\n",				id, streamid, sequence, pktcrc))		free(data);		if ((sp = rpp_check_pkt(streamid, &addr)) == NULL)			return -2;		if (rpp_send_ack(sp, sequence) == -1)			return -1;		switch (sp->state) {		case RPP_OPEN_PEND:		case RPP_OPEN_WAIT:		case RPP_CLOSE_PEND:		case RPP_LAST_ACK:			return -2;		case RPP_CLOSE_WAIT1:			sp->state = RPP_LAST_ACK;			return -2;		case RPP_CLOSE_WAIT2:			clear_stream(sp);			return -2;				default:			break;		}		sp->state = RPP_CLOSE_PEND;		clear_send(sp);		/* other side not reading now */		for (rpp=sp->recv_head, rprev=NULL; rpp;				rprev=rpp, rpp=rpp->next) {			if (rpp->sequence >= sequence)				break;		}		if (rpp == NULL || rpp->sequence > sequence) {			DBPRT((DBTO, "%s: GOOD seq %d\n", id, sequence))			pkt = (struct recv_packet *)				malloc(sizeof(struct recv_packet));			assert(pkt != NULL);			pkt->type = type;			pkt->sequence = sequence;			pkt->len = 0;			pkt->data = NULL;			if (rprev == NULL) {				pkt->next = sp->recv_head;				sp->recv_head = pkt;			}			else {				pkt->next = rprev->next;				rprev->next = pkt;			}			if (sp->recv_tail == rprev)				sp->recv_tail = pkt;		}		else {			DBPRT((DBTO, "%s: DUPLICATE seq %d MAX seen %d\n",				id, sequence, rpp->sequence))		}		return -2;	case RPP_DATA:	case RPP_EOD:		DBPRT((DBTO,			"%s: DATA stream %d sequence %d crc %08lX len %d\n",			id, streamid, sequence, pktcrc, len))		if ((sp = rpp_check_pkt(streamid, &addr)) == NULL)			goto err_out;		if (rpp_send_ack(sp, sequence) == -1) {			free(data);			return -1;		}		switch (sp->state) {		case RPP_OPEN_WAIT:			DBPRT((DBTO,				"INPUT on unconnected stream %d\n", streamid))			free(data);			return -2;		case RPP_CLOSE_WAIT1:		case RPP_CLOSE_WAIT2:		case RPP_LAST_ACK:			DBPRT((DBTO, "INPUT on closed stream %d\n", streamid))			free(data);			return -2;		default:			break;		}		if (sequence < sp->recv_sequence) {			DBPRT((DBTO, "%s: OLD seq %d\n", id, sequence))			free(data);			return -2;		}		for (rpp=sp->recv_head, rprev=NULL; rpp;				rprev=rpp, rpp=rpp->next) {			if (rpp->sequence >= sequence)				break;		}		if (rpp == NULL || rpp->sequence > sequence) {			DBPRT((DBTO, "%s: GOOD seq %d\n", id, sequence))			data = realloc(data, len);			assert(data != NULL);			pkt = (struct recv_packet *)				malloc(sizeof(struct recv_packet));			assert(pkt != NULL);			pkt->type = type;			pkt->sequence = sequence;			pkt->len = len-RPP_PKT_HEAD;			pkt->data = (u_char *)data;			if (rprev == NULL) {				pkt->next = sp->recv_head;				sp->recv_head = pkt;			}			else {				pkt->next = rprev->next;				rprev->next = pkt;			}			if (sp->recv_tail == rprev)				sp->recv_tail = pkt;			if (sp->state == RPP_OPEN_PEND)				return -2;			else				return streamid;		}		else {			DBPRT((DBTO, "%s: DUPLICATE seq %d MAX seen %d\n",				id, sequence, rpp->sequence))			free(data);		}		break;	case RPP_HELLO1:		/*		** HELLO1 packets have the remote side's stream index		** in the "streamid" field and open key in the sequence.		*/		DBPRT((DBTO, "%s: HELLO1 stream %d sequence %d\n",				id, streamid, sequence))		free(data);		for (i=0; i<stream_num; i++) {			sp = &stream_array[i];			if (sp->state <= RPP_FREE)				continue;			if (memcmp(&sp->addr, &addr, sizeof(addr)))				continue;			if (sp->open_key == sequence) {				rpp_send_out();				return -2;			}			DBPRT((DBTO, "OLD STREAM state %d reopened %d %d\n",				sp->state, sp->open_key, sequence))			clear_stream(sp);	/* old stream */		}		i = rpp_create_sp();		if (i == -1)			return -1;		sp = &stream_array[i];		sp->state = RPP_OPEN_PEND;		sp->fd = fd;		sp->retry = rpp_retry;		memcpy(&sp->addr, &addr, sizeof(addr));		if ((hp = rpp_get_cname(&addr)) != NULL)			rpp_alist(hp, sp);		sp->stream_id = streamid;		sp->open_key = sequence;		open_key = MAX(open_key, sequence);		rpp_form_pkt(i, RPP_HELLO2, i, NULL, 0);		rpp_send_out();		break;	case RPP_HELLO2:		/*		** HELLO2 packet has this side's stream index in		** "streamid" as usual and the remote side's		** stream index overloaded in the "sequence" field.		*/		DBPRT((DBTO, "%s: HELLO2 stream %d sequence %d\n",				id, streamid, sequence))		free(data);		if ((sp = rpp_check_pkt(streamid, &addr)) == NULL)			return -2;		switch (sp->state) {		case RPP_OPEN_WAIT:			sp->state = RPP_CONNECT;			break;		case RPP_CLOSE_WAIT1:	/* called close before open done */		case RPP_LAST_ACK:			break;		default:			if (sp->stream_id == sequence) {				DBPRT((DBTO,					"%s: stream %d got DUP HELLO2 %d\n",					id, streamid, sp->state))				if (rpp_send_ack(sp, sp->open_key) == -1)					return -1;			}			else {				DBPRT((DBTO, "%s: NON-DUP HELLO2\n", id))			}			return -2;		}		sp->stream_id = sequence;		if (rpp_send_ack(sp, sp->open_key) == -1)			return -1;		if ((spp = sp->send_head) == NULL) {			DBPRT((DBTO,				"%s: stream %d got HELLO2 but sendq NULL\n",				id, streamid))			return -2;		}		if (spp->type != RPP_HELLO1) {			DBPRT((DBTO,				"%s: stream %d sendq %d rather than HELLO1\n",				id, streamid, spp->type))			return -2;		}		sp->send_head = spp->next;	/* remove HELLO1 pkt */		if (sp->send_tail == spp)			sp->send_tail = NULL;		dqueue(spp);		/*		** Put any waitting packets onto the send queue		*/		for (spp=sp->send_head; spp; spp=spp->next) {			int	len = spp->len;			DBPRT((DBTO, "%s: idx %d link %d seq %d len %d to sendq\n",				id, streamid, spp->type, spp->sequence, len))			I2TOH(spp->type, (char *)&spp->data[len])

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -