call.c

来自「Linux Kernel 2.6.9 for OMAP1710」· C语言 代码 · 共 2,276 行 · 第 1/4 页

C
2,276
字号
/*
 * work out what acknowledgement a received packet calls for, update the
 * call's ACK-window bookkeeping and, where required, transmit ACKs
 * - call: the call the packet arrived on; its ackr_* fields are updated
 * - hdr:  header of the received packet (seq, serial, type and flags are read)
 * - ack:  may be NULL; when non-NULL and its reason is RXRPC_ACK_PING a
 *         ping-response ACK is generated
 * - returns 0 if the packet was accepted into the ACK window (this includes
 *   the out-of-sequence case), 1 if a special ACK was selected (ping
 *   response, duplicate or exceeds-window), or a negative error passed back
 *   from __rxrpc_call_gen_normal_ACK(), rxrpc_conn_newmsg() or
 *   rxrpc_conn_sendmsg()
 */
static int rxrpc_call_generate_ACK(struct rxrpc_call *call,
				   struct rxrpc_header *hdr,
				   struct rxrpc_ackpacket *ack)
{
	struct rxrpc_message *msg;
	rxrpc_seq_t seq;
	unsigned offset;
	int ret = 0, err;
	u8 special_ACK, do_ACK, force;

	_enter("%p,%p { seq=%d tp=%d fl=%02x }",
	       call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags);

	seq = ntohl(hdr->seq);
	/* offset is unsigned and is computed before the window checks below;
	 * it is only used as an array index once those checks have passed */
	offset = seq - call->ackr_win_bot;
	do_ACK = RXRPC_ACK_DELAY;
	special_ACK = 0;
	/* packet #1 always requests immediate ACK generation */
	force = (seq == 1);

	/* track the highest sequence number seen (used for maxSkew below) */
	if (call->ackr_high_seq < seq)
		call->ackr_high_seq = seq;

	/* deal with generation of obvious special ACKs first */
	if (ack && ack->reason == RXRPC_ACK_PING) {
		special_ACK = RXRPC_ACK_PING_RESPONSE;
		ret = 1;
		goto gen_ACK;
	}

	if (seq < call->ackr_win_bot) {
		special_ACK = RXRPC_ACK_DUPLICATE;
		ret = 1;
		goto gen_ACK;
	}

	if (seq >= call->ackr_win_top) {
		special_ACK = RXRPC_ACK_EXCEEDS_WINDOW;
		ret = 1;
		goto gen_ACK;
	}

	/* a slot that is no longer NACK has already been filled */
	if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) {
		special_ACK = RXRPC_ACK_DUPLICATE;
		ret = 1;
		goto gen_ACK;
	}

	/* okay... it's a normal data packet inside the ACK window */
	call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK;

	if (offset < call->ackr_pend_cnt) {
		/* nothing to adjust; fall through to the flag checks below */
	}
	else if (offset > call->ackr_pend_cnt) {
		/* packet lies beyond the contiguous run of pending ACKs */
		do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE;
		call->ackr_pend_cnt = offset;
		goto gen_ACK;
	}

	if (hdr->flags & RXRPC_REQUEST_ACK) {
		do_ACK = RXRPC_ACK_REQUESTED;
	}

	/* generate an ACK on the final packet of a reply just received */
	if (hdr->flags & RXRPC_LAST_PACKET) {
		if (call->conn->out_clientflag)
			force = 1;
	}
	else if (!(hdr->flags & RXRPC_MORE_PACKETS)) {
		do_ACK = RXRPC_ACK_REQUESTED;
	}

	/* re-ACK packets previously received out-of-order */
	for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++)
		if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK)
			break;

	/* offset now indexes the first slot that is not ACK'd (or the window
	 * size if every slot is) */
	call->ackr_pend_cnt = offset;

	/* generate an ACK if we fill up the window */
	if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE)
		force = 1;

 gen_ACK:
	_debug("%05lu ACKs pend=%u norm=%s special=%s%s",
	       jiffies - call->cjif,
	       call->ackr_pend_cnt,
	       rxrpc_acks[do_ACK],
	       rxrpc_acks[special_ACK],
	       force ? " immediate" :
	       do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" :
	       hdr->flags & RXRPC_LAST_PACKET ? " finalise" :
	       " defer"
	       );

	/* send any pending normal ACKs if need be */
	if (call->ackr_pend_cnt > 0) {
		/* fill out the appropriate form */
		call->ackr.bufferSpace	= htons(RXRPC_CALL_ACK_WINDOW_SIZE);
		call->ackr.maxSkew	= htons(min(call->ackr_high_seq - seq,
						    65535U));
		call->ackr.firstPacket	= htonl(call->ackr_win_bot);
		/* ackr_prev_seq is stored unconverted from hdr->seq at "out:",
		 * so no byte swap is applied here */
		call->ackr.previousPacket = call->ackr_prev_seq;
		call->ackr.serial	= hdr->serial;
		call->ackr.nAcks	= call->ackr_pend_cnt;

		/* only RXRPC_ACK_REQUESTED is written to the reason here */
		if (do_ACK == RXRPC_ACK_REQUESTED)
			call->ackr.reason = do_ACK;

		/* generate the ACK immediately if necessary */
		if (special_ACK || force) {
			err = __rxrpc_call_gen_normal_ACK(
				call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq);
			if (err < 0) {
				ret = err;
				goto out;
			}
		}
	}

	/* remember the sequence number for the deferred ACK that
	 * rxrpc_call_do_stuff() generates */
	if (call->ackr.reason == RXRPC_ACK_REQUESTED)
		call->ackr_dfr_seq = seq;

	/* start the ACK timer if not running if there are any pending deferred
	 * ACKs */
	if (call->ackr_pend_cnt > 0 &&
	    call->ackr.reason != RXRPC_ACK_REQUESTED &&
	    !timer_pending(&call->ackr_dfr_timo)
	    ) {
		unsigned long timo;

		timo = rxrpc_call_dfr_ack_timeout + jiffies;

		_debug("START ACKR TIMER for cj=%lu", timo - call->cjif);

		spin_lock(&call->lock);
		mod_timer(&call->ackr_dfr_timo, timo);
		spin_unlock(&call->lock);
	}
	else if ((call->ackr_pend_cnt == 0 ||
		  call->ackr.reason == RXRPC_ACK_REQUESTED) &&
		 timer_pending(&call->ackr_dfr_timo)
		 ) {
		/* stop timer if no pending ACKs */
		_debug("CLEAR ACKR TIMER");
		del_timer_sync(&call->ackr_dfr_timo);
	}

	/* send a special ACK if one is required */
	if (special_ACK) {
		/* NOTE: this local shadows the 'ack' parameter for the rest of
		 * this scope */
		struct rxrpc_ackpacket ack;
		struct kvec diov[2];
		uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK };

		/* fill out the appropriate form */
		ack.bufferSpace	= htons(RXRPC_CALL_ACK_WINDOW_SIZE);
		ack.maxSkew	= htons(min(call->ackr_high_seq - seq,
					    65535U));
		ack.firstPacket	= htonl(call->ackr_win_bot);
		ack.previousPacket = call->ackr_prev_seq;
		ack.serial	= hdr->serial;
		ack.reason	= special_ACK;
		ack.nAcks	= 0;

		_proto("Rx Sending s-ACK"
		       " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
		       ntohs(ack.maxSkew),
		       ntohl(ack.firstPacket),
		       ntohl(ack.previousPacket),
		       ntohl(ack.serial),
		       rxrpc_acks[ack.reason],
		       ack.nAcks);

		diov[0].iov_len  = sizeof(struct rxrpc_ackpacket);
		diov[0].iov_base = &ack;
		diov[1].iov_len  = sizeof(acks);
		diov[1].iov_base = acks;

		/* build and send the message */
		/* NOTE(review): "hdr->seq ? 2 : 1" is presumably the number of
		 * diov entries to send (the acks[] byte is only included when
		 * the packet carried a sequence number) - confirm against
		 * rxrpc_conn_newmsg() */
		err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
					hdr->seq ? 2 : 1, diov,
					GFP_KERNEL,
					&msg);
		if (err < 0) {
			ret = err;
			goto out;
		}

		msg->seq = seq;
		msg->hdr.seq = htonl(seq);
		msg->hdr.flags |= RXRPC_SLOW_START_OK;

		err = rxrpc_conn_sendmsg(call->conn, msg);
		/* the message reference is dropped whether or not the send
		 * succeeded */
		rxrpc_put_message(msg);
		if (err < 0) {
			ret = err;
			goto out;
		}
		call->pkt_snd_count++;
	}

 out:
	/* record the sequence number of this packet (left unconverted, exactly
	 * as it appears in the header) for the previousPacket field of later
	 * ACKs; this also happens on the error paths */
	if (hdr->seq)
		call->ackr_prev_seq = hdr->seq;

	_leave(" = %d", ret);
	return ret;
} /* end rxrpc_call_generate_ACK() */

/*****************************************************************************/
/*
 * handle work to be done on a call
 * - includes packet reception and timeout processing
 * - the event bits in call->flags are examined in a fixed order (received
 *   packet, overdue ACKs, reception timeout, deferred ACKs) and each bit is
 *   cleared before its handler is invoked
 */
void rxrpc_call_do_stuff(struct rxrpc_call *call)
{
	_enter("%p{flags=%lx}", call, call->flags);

	/* handle packet reception */
	if (call->flags & RXRPC_CALL_RCV_PKT) {
		_debug("- receive packet");
		call->flags &= ~RXRPC_CALL_RCV_PKT;
		rxrpc_call_receive_packet(call);
	}

	/* handle overdue ACKs */
	if (call->flags & RXRPC_CALL_ACKS_TIMO) {
		_debug("- overdue ACK timeout");
		call->flags &= ~RXRPC_CALL_ACKS_TIMO;
		rxrpc_call_resend(call, call->snd_seq_count);
	}

	/* handle lack of reception */
	if (call->flags & RXRPC_CALL_RCV_TIMO) {
		_debug("- reception timeout");
		call->flags &= ~RXRPC_CALL_RCV_TIMO;
		rxrpc_call_abort(call, -EIO);
	}

	/* handle deferred ACKs */
	/* this branch is also taken, without the timer flag being set, when a
	 * requested ACK is pending */
	if (call->flags & RXRPC_CALL_ACKR_TIMO ||
	    (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED)
	    ) {
		_debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
		       jiffies - call->cjif,
		       rxrpc_acks[call->ackr.reason],
		       call->ackr.nAcks);

		call->flags &= ~RXRPC_CALL_ACKR_TIMO;

		if (call->ackr.nAcks > 0 &&
		    call->app_call_state != RXRPC_CSTATE_ERROR) {
			/* generate ACK */
			/* the return value is not checked here */
			__rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq);
			call->ackr_dfr_seq = 0;
		}
	}

	_leave("");

} /* end rxrpc_call_do_stuff() */

/*****************************************************************************/
/* * send an abort message at call or connection level * - must be called with call->lock held * - the supplied 
error code is sent as the packet data */
/*
 * additional notes on __rxrpc_call_abort():
 * - call->lock is released by this function on every path; it is never
 *   re-taken here
 * - errno is stored in call->app_errno; the value placed in the ABORT packet
 *   is call->app_abort_code
 *   NOTE(review): app_abort_code is presumably filled in by app_aemap_func()
 *   - confirm
 * - returns 0 if the call was already in the error state or if no packets
 *   have been exchanged on the call; otherwise the result of
 *   rxrpc_conn_newmsg() / rxrpc_conn_sendmsg()
 */
static int __rxrpc_call_abort(struct rxrpc_call *call, int errno)
{
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_message *msg;
	struct kvec diov[1];
	int ret;
	__be32 _error;

	_enter("%p{%08x},%p{%d},%d",
	       conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno);

	/* if this call is already aborted, then just wake up any waiters */
	if (call->app_call_state == RXRPC_CSTATE_ERROR) {
		spin_unlock(&call->lock);
		call->app_error_func(call);
		_leave(" = 0");
		return 0;
	}

	/* hold a reference on the call until the end of this function (paired
	 * with rxrpc_put_call() below) */
	rxrpc_get_call(call);

	/* change the state _with_ the lock still held */
	call->app_call_state	= RXRPC_CSTATE_ERROR;
	call->app_err_state	= RXRPC_ESTATE_LOCAL_ABORT;
	call->app_errno		= errno;
	call->app_mark		= RXRPC_APP_MARK_EOF;
	call->app_read_buf	= NULL;
	call->app_async_read	= 0;

	_state(call);

	/* ask the app to translate the error code */
	call->app_aemap_func(call);

	spin_unlock(&call->lock);

	/* flush any outstanding ACKs */
	/* all three call timers are stopped, after the lock has been dropped */
	del_timer_sync(&call->acks_timeout);
	del_timer_sync(&call->rcv_timeout);
	del_timer_sync(&call->ackr_dfr_timo);

	/* the return value of the ACK generation is not checked */
	if (rxrpc_call_is_ack_pending(call))
		__rxrpc_call_gen_normal_ACK(call, 0);

	/* send the abort packet only if we actually traded some other
	 * packets */
	ret = 0;
	if (call->pkt_snd_count || call->pkt_rcv_count) {
		/* actually send the abort */
		_proto("Rx Sending Call ABORT { data=%d }",
		       call->app_abort_code);

		/* the abort code forms the whole packet payload, converted
		 * with htonl() */
		_error = htonl(call->app_abort_code);

		diov[0].iov_len  = sizeof(_error);
		diov[0].iov_base = &_error;

		ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT,
					1, diov, GFP_KERNEL, &msg);
		if (ret == 0) {
			ret = rxrpc_conn_sendmsg(conn, msg);
			rxrpc_put_message(msg);
		}
	}

	/* tell the app layer to let go */
	/* called even if building or sending the ABORT packet failed */
	call->app_error_func(call);

	rxrpc_put_call(call);

	_leave(" = %d", ret);
	return ret;
} /* end __rxrpc_call_abort() */

/*****************************************************************************/
/* * send an abort message at call or connection level * - the supplied error code is sent as the 
packet data */
/*
 * additional notes on rxrpc_call_abort():
 * - takes call->lock and hands over to __rxrpc_call_abort(), which releases
 *   the lock on every path; the lock is therefore not held on return
 * - returns whatever __rxrpc_call_abort() returns
 */
int rxrpc_call_abort(struct rxrpc_call *call, int error)
{
	spin_lock(&call->lock);

	return __rxrpc_call_abort(call, error);

} /* end rxrpc_call_abort() */

/*****************************************************************************/
/*
 * process packets waiting for this call
 * - drains call->rcv_receiveq one message at a time, taking call->lock only
 *   around the dequeue
 * - DATA packets are passed through rxrpc_call_generate_ACK() first and only
 *   handed to rxrpc_call_receive_data_packet() if that returns 0
 * - ACK packets go to rxrpc_call_receive_ack_packet()
 * - ABORT packets put the call into the error state and notify the app layer
 * - a negative result from rxrpc_call_generate_ACK() stops processing, leaving
 *   any remaining messages on the queue
 */
static void rxrpc_call_receive_packet(struct rxrpc_call *call)
{
	struct rxrpc_message *msg;
	struct list_head *_p;

	_enter("%p", call);

	rxrpc_get_call(call); /* must not go away too soon if aborted by
			       * app-layer */

	while (!list_empty(&call->rcv_receiveq)) {
		/* try to get next packet */
		/* the emptiness test is repeated under the lock since the test
		 * in the loop condition is made without it */
		_p = NULL;
		spin_lock(&call->lock);
		if (!list_empty(&call->rcv_receiveq)) {
			_p = call->rcv_receiveq.next;
			list_del_init(_p);
		}
		spin_unlock(&call->lock);

		if (!_p)
			break;

		msg = list_entry(_p, struct rxrpc_message, link);

		_proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
		       jiffies - call->cjif,
		       rxrpc_pkts[msg->hdr.type],
		       ntohl(msg->hdr.serial),
		       msg->seq,
		       msg->hdr.flags & RXRPC_JUMBO_PACKET	? 'j' : '-',
		       msg->hdr.flags & RXRPC_MORE_PACKETS	? 'm' : '-',
		       msg->hdr.flags & RXRPC_LAST_PACKET	? 'l' : '-',
		       msg->hdr.flags & RXRPC_REQUEST_ACK	? 'r' : '-',
		       msg->hdr.flags & RXRPC_CLIENT_INITIATED	? 'C' : 'S'
		       );

		switch (msg->hdr.type) {
			/* deal with data packets */
		case RXRPC_PACKET_TYPE_DATA:
			/* ACK the packet if necessary */
			switch (rxrpc_call_generate_ACK(call, &msg->hdr,
							NULL)) {
			case 0: /* useful packet */
				rxrpc_call_receive_data_packet(call, msg);
				break;
			case 1: /* duplicate or out-of-window packet */
				break;
			default:
				/* error: drop our reference and stop
				 * processing the queue */
				rxrpc_put_message(msg);
				goto out;
			}
			break;

			/* deal with ACK packets */
		case RXRPC_PACKET_TYPE_ACK:
			rxrpc_call_receive_ack_packet(call, msg);
			break;

			/* deal with abort packets */
		case RXRPC_PACKET_TYPE_ABORT: {
			__be32 _dbuf, *dp;

			/* dp is NULL if the abort code could not be obtained
			 * from the packet; an abort code of 0 is then used */
			dp = skb_header_pointer(msg->pkt, msg->offset,
						sizeof(_dbuf), &_dbuf);
			if (dp == NULL)
				printk("Rx Received short ABORT packet\n");

			_proto("Rx Received Call ABORT { data=%d }",
			       (dp ? ntohl(*dp) : 0));

			/* the state change is made with call->lock held */
			spin_lock(&call->lock);
			call->app_call_state	= RXRPC_CSTATE_ERROR;
			call->app_err_state	= RXRPC_ESTATE_PEER_ABORT;
			call->app_abort_code	= (dp ? ntohl(*dp) : 0);
			call->app_errno		= -ECONNABORTED;
			call->app_mark		= RXRPC_APP_MARK_EOF;
			call->app_read_buf	= NULL;
			call->app_async_read	= 0;

			/* ask the app to translate the error code */
			call->app_aemap_func(call);
			_state(call);
			spin_unlock(&call->lock);
			/* the error callback is made after the lock is
			 * dropped */
			call->app_error_func(call);
			break;
		}
		default:
			/* deal with other packet types */
			_proto("Rx Unsupported packet type %u (#%u)",
			       msg->hdr.type, msg->seq);
			break;
		}

		/* drop the reference obtained with the dequeued message */
		rxrpc_put_message(msg);
	}

 out:
	rxrpc_put_call(call);
	_leave("");
} /* end rxrpc_call_receive_packet() */

/*****************************************************************************/
/* * process next data packet * - as the next data packet arrives: *   - it is queued on app_readyq _if_ it is the next one expected *     (app_ready_seq+1) *   - it is queued on app_unreadyq _if_ it is not the next one expected *   - if a packet placed on app_readyq completely fills a hole leading up to *     the first packet on app_unreadyq, then packets now in 
sequence are *     tranferred to app_readyq * - the application layer can only see packets on app_readyq *   (app_ready_qty bytes) * - the application layer is prodded every time a new packet arrives */static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,					   struct rxrpc_message *msg){	const struct rxrpc_operation *optbl, *op;	struct rxrpc_message *pmsg;	struct list_head *_p;	int ret, lo, hi, rmtimo;	__be32 opid;	_enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);	rxrpc_get_message(msg);	/* add to the unready queue if we'd have to create a hole in the ready	 * queue otherwise */	if (msg->seq != call->app_ready_seq + 1) {		_debug("Call add packet %d to unreadyq", msg->seq);		/* insert in seq order */		list_for_each(_p, &call->app_unreadyq) {			pmsg = list_entry(_p, struct rxrpc_message, link);			if (pmsg->seq > msg->seq)				break;		}		list_add_tail(&msg->link, _p);		_leave(" [unreadyq]");		return;	}	/* next in sequence - simply append into the call's ready queue */	_debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",	       msg->seq, msg->dsize, call->app_ready_qty);	spin_lock(&call->lock);	call->app_ready_seq = msg->seq;	call->app_ready_qty += msg->dsize;	list_add_tail(&msg->link, &call->app_readyq);	/* move unready packets to the readyq if we got rid of a hole */	while (!list_empty(&call->app_unreadyq)) {		pmsg = list_entry(call->app_unreadyq.next,				  struct rxrpc_message, link);		if (pmsg->seq != call->app_ready_seq + 1)			break;		/* next in sequence - just move list-to-list */		_debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",		       pmsg->seq, pmsg->dsize, call->app_ready_qty);		call->app_ready_seq = pmsg->seq;		call->app_ready_qty += pmsg->dsize;		list_del_init(&pmsg->link);		list_add_tail(&pmsg->link, &call->app_readyq);	}	/* see if we've got the last packet yet */	if (!list_empty(&call->app_readyq)) {		pmsg = list_entry(call->app_readyq.prev,				  struct rxrpc_message, link);		if (pmsg->hdr.flags & 
RXRPC_LAST_PACKET) {			call->app_last_rcv = 1;			_debug("Last packet on readyq");		}	}	switch (call->app_call_state) {		/* do nothing if call already aborted */	case RXRPC_CSTATE_ERROR:		spin_unlock(&call->lock);		_leave(" [error]");		return;		/* extract the operation ID from an incoming call if that's not		 * yet been done */	case RXRPC_CSTATE_SRVR_RCV_OPID:		spin_unlock(&call->lock);		/* handle as yet insufficient data for the operation ID */		if (call->app_ready_qty < 4) {			if (call->app_last_rcv)				/* trouble - last packet seen */				rxrpc_call_abort(call, -EINVAL);			_leave("");			return;		}		/* pull the operation ID out of the buffer */		ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0);		if (ret < 0) {			printk("Unexpected error from read-data: %d\n", ret);			if (call->app_call_state != RXRPC_CSTATE_ERROR)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?