call.c

Source: "Linux Kernel 2.6.9 for OMAP1710" — C source file, 2,276 lines total, page 1 of 4.

C
2,276
字号
		/* NOTE(review): this chunk opens mid-way through
		 * __rxrpc_call_read_data() — the function head and the switch
		 * this printk/BUG() pair belongs to are on an earlier page. */
		printk("reading in unexpected state [[[ %u ]]]\n",
		       call->app_call_state);
		BUG();
	}

	/* handle the case of not having an async buffer */
	if (!call->app_async_read) {
		if (call->app_mark == RXRPC_APP_MARK_EOF) {
			ret = call->app_last_rcv ? 0 : -EAGAIN;
		}
		else {
			if (call->app_mark >= call->app_ready_qty) {
				call->app_mark = RXRPC_APP_MARK_EOF;
				ret = 0;
			}
			else {
				ret = call->app_last_rcv ? -EBADMSG : -EAGAIN;
			}
		}

		/* NOTE(review): the trace logs ret, but the function returns
		 * 0 unconditionally, silently discarding -EAGAIN/-EBADMSG —
		 * confirm against upstream whether this should be
		 * "return ret;" */
		_leave(" = %d [no buf]", ret);
		return 0;
	}

	/* drain packets off the ready queue until the read mark is satisfied
	 * or the queue runs dry */
	while (!list_empty(&call->app_readyq) && call->app_mark > 0) {
		msg = list_entry(call->app_readyq.next,
				 struct rxrpc_message, link);

		/* drag as much data as we need out of this packet */
		qty = min(call->app_mark, msg->dsize);

		_debug("reading %Zu from skb=%p off=%lu",
		       qty, msg->pkt, msg->offset);

		/* a NULL read buffer means "drain only, don't copy" */
		if (call->app_read_buf)
			if (skb_copy_bits(msg->pkt, msg->offset,
					  call->app_read_buf, qty) < 0)
				panic("%s: Failed to copy data from packet:"
				      " (%p,%p,%Zd)",
				      __FUNCTION__,
				      call, call->app_read_buf, qty);

		/* if that packet is now empty, discard it */
		call->app_ready_qty -= qty;
		msg->dsize -= qty;
		if (msg->dsize == 0) {
			list_del_init(&msg->link);
			rxrpc_put_message(msg);
		}
		else {
			msg->offset += qty;
		}

		call->app_mark -= qty;
		if (call->app_read_buf)
			call->app_read_buf += qty;
	}

	/* the read was fully satisfied: tear down the async-read state */
	if (call->app_mark == 0) {
		call->app_async_read = 0;
		call->app_mark = RXRPC_APP_MARK_EOF;
		call->app_read_buf = NULL;

		/* adjust the state if used up all packets */
		if (list_empty(&call->app_readyq) && call->app_last_rcv) {
			switch (call->app_call_state) {
			case RXRPC_CSTATE_SRVR_RCV_OPID:
				call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
				/* NOTE(review): app_mark was already set to
				 * EOF a few lines above; this store is
				 * redundant but harmless */
				call->app_mark = RXRPC_APP_MARK_EOF;
				_state(call);
				del_timer_sync(&call->rcv_timeout);
				break;
			case RXRPC_CSTATE_SRVR_GOT_ARGS:
				call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
				_state(call);
				del_timer_sync(&call->rcv_timeout);
				break;
			default:
				call->app_call_state = RXRPC_CSTATE_COMPLETE;
				_state(call);
				del_timer_sync(&call->acks_timeout);
				del_timer_sync(&call->ackr_dfr_timo);
				del_timer_sync(&call->rcv_timeout);
				break;
			}
		}

		_leave(" = 0");
		return 0;
	}

	/* the peer has sent its last packet but we still haven't reached the
	 * mark: the message on the wire was shorter than the app asked for */
	if (call->app_last_rcv) {
		_debug("Insufficient data (%Zu/%Zu)",
		       call->app_ready_qty, call->app_mark);
		call->app_async_read = 0;
		call->app_mark = RXRPC_APP_MARK_EOF;
		call->app_read_buf = NULL;

		_leave(" = -EBADMSG");
		return -EBADMSG;
	}

	/* more data is expected to arrive */
	_leave(" = -EAGAIN");
	return -EAGAIN;

} /* end __rxrpc_call_read_data() */

/*****************************************************************************/
/*
 * attempt to read the specified amount of data from the call's ready queue
 * into the buffer provided
 * - since this func is the only one going to look at packets queued on
 *   app_readyq, we don't need a lock to modify or access them, only to modify
 *   the queue pointers
 * - if the buffer pointer is NULL, then data is merely drained, not copied
 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is
 *   enough data or an error will be generated
 *   - note that the caller must have added the calling task to the call's wait
 *     queue beforehand
 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this
 *   function doesn't read all available data
 */
int rxrpc_call_read_data(struct rxrpc_call *call,
			 void *buffer, size_t size, int flags)
{
	int ret;

	_enter("%p{arq=%Zu},%p,%Zd,%x",
	       call, call->app_ready_qty, buffer, size, flags);

	spin_lock(&call->lock);

	/* only one async read may be outstanding per call at a time */
	if (unlikely(!!call->app_read_buf)) {
		spin_unlock(&call->lock);
		_leave(" = -EBUSY");
		return -EBUSY;
	}

	call->app_mark = size;
	call->app_read_buf = buffer;
	call->app_async_read = 1;
	call->app_read_count++;

	/* read as much data as possible */
	ret = __rxrpc_call_read_data(call);
	switch (ret) {
	case 0:
		/* READ_ALL demands the read consume everything available */
		if (flags & RXRPC_CALL_READ_ALL &&
		    (!call->app_last_rcv || call->app_ready_qty > 0)) {
			_leave(" = -EBADMSG");
			/* NOTE(review): no spin_unlock() on this path (nor on
			 * the default: path below) — presumably
			 * __rxrpc_call_abort() drops call->lock itself;
			 * verify against its definition */
			__rxrpc_call_abort(call, -EBADMSG);
			return -EBADMSG;
		}

		spin_unlock(&call->lock);
		call->app_attn_func(call);
		_leave(" = 0");
		return ret;

	case -ECONNABORTED:
		spin_unlock(&call->lock);
		_leave(" = %d [aborted]", ret);
		return ret;

	/* default: deliberately placed before -EAGAIN so that any other
	 * error aborts the call */
	default:
		__rxrpc_call_abort(call, ret);
		_leave(" = %d", ret);
		return ret;

	case -EAGAIN:
		spin_unlock(&call->lock);
		if (!(flags & RXRPC_CALL_READ_BLOCK)) {
			_leave(" = -EAGAIN");
			return -EAGAIN;
		}

		/* wait for the data to arrive; the caller is already on the
		 * call's wait queue, so the rx side will wake us */
		_debug("blocking for data arrival");
		for (;;) {
			set_current_state(TASK_INTERRUPTIBLE);
			if (!call->app_async_read || signal_pending(current))
				break;
			schedule();
		}
		set_current_state(TASK_RUNNING);

		if (signal_pending(current)) {
			_leave(" = -EINTR");
			return -EINTR;
		}

		if (call->app_call_state == RXRPC_CSTATE_ERROR) {
			_leave(" = -ECONNABORTED");
			return -ECONNABORTED;
		}

		_leave(" = 0");
		return 0;
	}

} /* end rxrpc_call_read_data() */

/*****************************************************************************/
/*
 * write data to a call
 * - the data may not be sent immediately if it doesn't fill a buffer
 * - if we can't queue all the data for buffering now, siov[] will have been
 *   adjusted to take account of what has been sent
 */
int rxrpc_call_write_data(struct rxrpc_call *call,
			  size_t sioc,
			  struct kvec *siov,
			  u8 rxhdr_flags,
			  int alloc_flags,
			  int dup_data,
			  size_t *size_sent)
{
	struct rxrpc_message *msg;
	struct kvec *sptr;
	size_t space, size, chunk, tmp;
	char *buf;
	int ret;

	_enter("%p,%Zu,%p,%02x,%x,%d,%p",
	       call, sioc, siov, rxhdr_flags, alloc_flags, dup_data,
	       size_sent);

	*size_sent = 0;
	size = 0;
	ret = -EINVAL;

	/* can't send more if we've sent last packet from this end */
	switch (call->app_call_state) {
	case RXRPC_CSTATE_SRVR_SND_REPLY:
	case RXRPC_CSTATE_CLNT_SND_ARGS:
		break;
	case RXRPC_CSTATE_ERROR:
		ret = call->app_errno;
		/* fall through */
	default:
		goto out;
	}

	/* calculate how much data we've been given; a NULL iov_base with a
	 * non-zero length is rejected as invalid */
	sptr = siov;
	for (; sioc > 0; sptr++, sioc--) {
		if (!sptr->iov_len)
			continue;
		if (!sptr->iov_base)
			goto out;
		size += sptr->iov_len;
	}

	_debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size);

	do {
		/* make sure there's a message under construction */
		if (!call->snd_nextmsg) {
			/* no - allocate a message with no data yet attached */
			ret = rxrpc_conn_newmsg(call->conn, call,
						RXRPC_PACKET_TYPE_DATA,
						0, NULL, alloc_flags,
						&call->snd_nextmsg);
			if (ret < 0)
				goto out;
			_debug("- allocated new message [ds=%Zu]",
			       call->snd_nextmsg->dsize);
		}

		msg = call->snd_nextmsg;
		msg->hdr.flags |= rxhdr_flags;

		/* deal with zero-length terminal packet */
		if (size == 0) {
			if (rxhdr_flags & RXRPC_LAST_PACKET) {
				ret = rxrpc_call_flush(call);
				if (ret < 0)
					goto out;
			}
			break;
		}

		/* work out how much space current packet has available */
		space = call->conn->mtu_size - msg->dsize;
		chunk = min(space, size);

		_debug("- [before] space=%Zu chunk=%Zu", space, chunk);

		/* skip over exhausted/empty iovecs */
		while (!siov->iov_len)
			siov++;

		/* if we are going to have to duplicate the data then coalesce
		 * it too */
		if (dup_data) {
			/* don't allocate more that 1 page at a time */
			if (chunk > PAGE_SIZE)
				chunk = PAGE_SIZE;

			/* allocate a data buffer and attach to the message */
			buf = kmalloc(chunk, alloc_flags);
			if (unlikely(!buf)) {
				if (msg->dsize ==
				    sizeof(struct rxrpc_header)) {
					/* discard an empty msg and wind back
					 * the seq counter */
					rxrpc_put_message(msg);
					call->snd_nextmsg = NULL;
					call->snd_seq_count--;
				}

				ret = -ENOMEM;
				goto out;
			}

			/* dfree bit marks this iovec's buffer as owned by the
			 * message (kfree'd when the message is released) */
			tmp = msg->dcount++;
			set_bit(tmp, &msg->dfree);
			msg->data[tmp].iov_base = buf;
			msg->data[tmp].iov_len = chunk;
			msg->dsize += chunk;
			*size_sent += chunk;
			size -= chunk;

			/* load the buffer with data */
			while (chunk > 0) {
				tmp = min(chunk, siov->iov_len);
				memcpy(buf, siov->iov_base, tmp);
				buf += tmp;
				siov->iov_base += tmp;
				siov->iov_len -= tmp;
				if (!siov->iov_len)
					siov++;
				chunk -= tmp;
			}
		}
		else {
			/* we want to attach the supplied buffers directly */
			/* NOTE(review): chunk and size are size_t, and whole
			 * iovecs are attached here; if siov->iov_len exceeds
			 * the remaining chunk these subtractions wrap to huge
			 * values — presumably bounded by RXRPC_MSG_MAX_IOCS
			 * and the final size>0 test, but confirm */
			while (chunk > 0 &&
			       msg->dcount < RXRPC_MSG_MAX_IOCS) {
				tmp = msg->dcount++;
				msg->data[tmp].iov_base = siov->iov_base;
				msg->data[tmp].iov_len = siov->iov_len;
				msg->dsize += siov->iov_len;
				*size_sent += siov->iov_len;
				size -= siov->iov_len;
				chunk -= siov->iov_len;
				siov++;
			}
		}

		_debug("- [loaded] chunk=%Zu size=%Zu", chunk, size);

		/* dispatch the message when full, final or requesting ACK */
		if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) {
			ret = rxrpc_call_flush(call);
			if (ret < 0)
				goto out;
		}

	} while(size > 0);

	ret = 0;
 out:
	_leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size);
	return ret;

} /* end rxrpc_call_write_data() */

/*****************************************************************************/
/*
 * flush outstanding packets to the network
 */
int rxrpc_call_flush(struct rxrpc_call *call)
{
	struct rxrpc_message *msg;
	int ret = 0;

	_enter("%p", call);

	/* pin the call for the duration of the send */
	rxrpc_get_call(call);

	/* if there's a packet under construction, then dispatch it now */
	if (call->snd_nextmsg) {
		msg = call->snd_nextmsg;
		call->snd_nextmsg = NULL;

		if (msg->hdr.flags & RXRPC_LAST_PACKET) {
			msg->hdr.flags &= ~RXRPC_MORE_PACKETS;
			if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS)
				msg->hdr.flags |= RXRPC_REQUEST_ACK;
		}
		else {
			msg->hdr.flags |= RXRPC_MORE_PACKETS;
		}

		_proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }",
		       msg->dsize, msg->dcount, msg->dfree);

		/* queue and adjust call state */
		spin_lock(&call->lock);
		list_add_tail(&msg->link, &call->acks_pendq);

		/* decide what to do depending on current state and if this is
		 * the last packet */
		ret = -EINVAL;
		switch (call->app_call_state) {
		case RXRPC_CSTATE_SRVR_SND_REPLY:
			if (msg->hdr.flags & RXRPC_LAST_PACKET) {
				call->app_call_state =
					RXRPC_CSTATE_SRVR_RCV_FINAL_ACK;
				_state(call);
			}
			break;

		case RXRPC_CSTATE_CLNT_SND_ARGS:
			if (msg->hdr.flags & RXRPC_LAST_PACKET) {
				call->app_call_state =
					RXRPC_CSTATE_CLNT_RCV_REPLY;
				_state(call);
			}
			break;

		case RXRPC_CSTATE_ERROR:
			ret = call->app_errno;
			/* fall through */
		default:
			spin_unlock(&call->lock);
			goto out;
		}

		call->acks_pend_cnt++;

		/* (re)arm the ACK timeout based on the measured RTT */
		mod_timer(&call->acks_timeout,
			  __rxrpc_rtt_based_timeout(call,
						    rxrpc_call_acks_timeout));

		spin_unlock(&call->lock);

		ret = rxrpc_conn_sendmsg(call->conn, msg);
		if (ret == 0)
			call->pkt_snd_count++;
	}

 out:
	rxrpc_put_call(call);

	_leave(" = %d", ret);
	return ret;

} /* end rxrpc_call_flush() */

/*****************************************************************************/
/*
 * resend NAK'd or unacknowledged packets up to the highest one specified
 */
static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest)
{
	struct rxrpc_message *msg;
	struct list_head *_p;
	rxrpc_seq_t seq = 0;

	_enter("%p,%u", call, highest);

	_proto("Rx Resend required");

	/* handle too many resends */
	if (call->snd_resend_cnt >= rxrpc_call_max_resend) {
		_debug("Aborting due to too many resends (rcv=%d)",
		       call->pkt_rcv_count);
		/* -EIO if we've heard from the peer at all, else timed out */
		rxrpc_call_abort(call,
				 call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT);
		_leave("");
		return;
	}

	spin_lock(&call->lock);
	call->snd_resend_cnt++;
	for (;;) {
		/* determine which the next packet we might need to ACK is */
		if (seq <= call->acks_dftv_seq)
			seq = call->acks_dftv_seq;
		seq++;

		if (seq > highest)
			break;

		/* look for the packet in the pending-ACK queue */
		list_for_each(_p, &call->acks_pendq) {
			msg = list_entry(_p, struct rxrpc_message, link);
			if (msg->seq == seq)
				goto found_msg;
		}

		/* every seq up to `highest` must still be queued awaiting
		 * an ACK; a miss means the queue is corrupt */
		panic("%s(%p,%d):"
		      " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n",
		      __FUNCTION__, call, highest,
		      call->acks_dftv_seq, call->snd_seq_count, seq);

	found_msg:
		if (msg->state != RXRPC_MSG_SENT)
			continue; /* only un-ACK'd packets */

		/* hold a ref so the message survives dropping the lock */
		rxrpc_get_message(msg);
		spin_unlock(&call->lock);

		/* send each message again (and ignore any errors we might
		 * incur) */
		_proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }",
		       msg->dsize, msg->dcount, msg->dfree);

		if (rxrpc_conn_sendmsg(call->conn, msg) == 0)
			call->pkt_snd_count++;

		rxrpc_put_message(msg);

		spin_lock(&call->lock);
	}

	/* reset the timeout */
	mod_timer(&call->acks_timeout,
		  __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout));

	spin_unlock(&call->lock);

	_leave("");
} /* end rxrpc_call_resend() */

/*****************************************************************************/
/*
 * handle an ICMP error being applied to a call
 */
void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno)
{
	_enter("%p{%u},%d", call, ntohl(call->call_id), errno);

	/* if this call is already aborted, then just wake up any waiters */
	if (call->app_call_state == RXRPC_CSTATE_ERROR) {
		call->app_error_func(call);
	}
	else {
		/* tell the app layer what happened */
		spin_lock(&call->lock);

		call->app_call_state = RXRPC_CSTATE_ERROR;
		_state(call);

		/* record whether the error was raised locally or reported by
		 * the remote end, and cancel any in-flight async read */
		if (local)
			call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR;
		else
			call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR;
		call->app_errno		= errno;
		call->app_mark		= RXRPC_APP_MARK_EOF;
		call->app_read_buf	= NULL;
		call->app_async_read	= 0;

		/* map the error */
		call->app_aemap_func(call);

		/* NOTE(review): del_timer_sync() under a spinlock — safe only
		 * if the timer handlers never take call->lock; confirm */
		del_timer_sync(&call->acks_timeout);
		del_timer_sync(&call->rcv_timeout);
		del_timer_sync(&call->ackr_dfr_timo);

		spin_unlock(&call->lock);

		call->app_error_func(call);
	}

	_leave("");
} /* end rxrpc_call_handle_error() */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?