call.c
来自「Linux Kernel 2.6.9 for OMAP1710」· C语言 代码 · 共 2,276 行 · 第 1/4 页
C
2,276 行
printk("reading in unexpected state [[[ %u ]]]\n", call->app_call_state); BUG(); } /* handle the case of not having an async buffer */ if (!call->app_async_read) { if (call->app_mark == RXRPC_APP_MARK_EOF) { ret = call->app_last_rcv ? 0 : -EAGAIN; } else { if (call->app_mark >= call->app_ready_qty) { call->app_mark = RXRPC_APP_MARK_EOF; ret = 0; } else { ret = call->app_last_rcv ? -EBADMSG : -EAGAIN; } } _leave(" = %d [no buf]", ret); return 0; } while (!list_empty(&call->app_readyq) && call->app_mark > 0) { msg = list_entry(call->app_readyq.next, struct rxrpc_message, link); /* drag as much data as we need out of this packet */ qty = min(call->app_mark, msg->dsize); _debug("reading %Zu from skb=%p off=%lu", qty, msg->pkt, msg->offset); if (call->app_read_buf) if (skb_copy_bits(msg->pkt, msg->offset, call->app_read_buf, qty) < 0) panic("%s: Failed to copy data from packet:" " (%p,%p,%Zd)", __FUNCTION__, call, call->app_read_buf, qty); /* if that packet is now empty, discard it */ call->app_ready_qty -= qty; msg->dsize -= qty; if (msg->dsize == 0) { list_del_init(&msg->link); rxrpc_put_message(msg); } else { msg->offset += qty; } call->app_mark -= qty; if (call->app_read_buf) call->app_read_buf += qty; } if (call->app_mark == 0) { call->app_async_read = 0; call->app_mark = RXRPC_APP_MARK_EOF; call->app_read_buf = NULL; /* adjust the state if used up all packets */ if (list_empty(&call->app_readyq) && call->app_last_rcv) { switch (call->app_call_state) { case RXRPC_CSTATE_SRVR_RCV_OPID: call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; call->app_mark = RXRPC_APP_MARK_EOF; _state(call); del_timer_sync(&call->rcv_timeout); break; case RXRPC_CSTATE_SRVR_GOT_ARGS: call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; _state(call); del_timer_sync(&call->rcv_timeout); break; default: call->app_call_state = RXRPC_CSTATE_COMPLETE; _state(call); del_timer_sync(&call->acks_timeout); del_timer_sync(&call->ackr_dfr_timo); del_timer_sync(&call->rcv_timeout); break; } } _leave(" = 0"); return 0; } if (call->app_last_rcv) { _debug("Insufficient data (%Zu/%Zu)", call->app_ready_qty, call->app_mark); call->app_async_read = 0; call->app_mark = RXRPC_APP_MARK_EOF; call->app_read_buf = NULL; _leave(" = -EBADMSG"); return -EBADMSG; } _leave(" = -EAGAIN"); return -EAGAIN;} /* end __rxrpc_call_read_data() *//*****************************************************************************//* * attempt to read the specified amount of data from the call's ready queue * into the buffer provided * - since this func is the only one going to look at packets queued on * app_readyq, we don't need a lock to modify or access them, only to modify * the queue pointers * - if the buffer pointer is NULL, then data is merely drained, not copied * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is * enough data or an error will be generated * - note that the caller must have added the calling task to the call's wait * queue beforehand * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this * function doesn't read all available data */int rxrpc_call_read_data(struct rxrpc_call *call, void *buffer, size_t size, int flags){ int ret; _enter("%p{arq=%Zu},%p,%Zd,%x", call, call->app_ready_qty, buffer, size, flags); spin_lock(&call->lock); if (unlikely(!!call->app_read_buf)) { spin_unlock(&call->lock); _leave(" = -EBUSY"); return -EBUSY; } call->app_mark = size; call->app_read_buf = buffer; call->app_async_read = 1; call->app_read_count++; /* read as much data as possible */ ret = __rxrpc_call_read_data(call); switch (ret) { case 0: if (flags & RXRPC_CALL_READ_ALL && (!call->app_last_rcv || call->app_ready_qty > 0)) { _leave(" = -EBADMSG"); __rxrpc_call_abort(call, -EBADMSG); return -EBADMSG; } spin_unlock(&call->lock); call->app_attn_func(call); _leave(" = 0"); return ret; case -ECONNABORTED: spin_unlock(&call->lock); _leave(" = %d [aborted]", ret); return ret; default: __rxrpc_call_abort(call, ret); _leave(" = %d", ret); return ret; case -EAGAIN: spin_unlock(&call->lock); if (!(flags & RXRPC_CALL_READ_BLOCK)) { _leave(" = -EAGAIN"); return -EAGAIN; } /* wait for the data to arrive */ _debug("blocking for data arrival"); for (;;) { set_current_state(TASK_INTERRUPTIBLE); if (!call->app_async_read || signal_pending(current)) break; schedule(); } set_current_state(TASK_RUNNING); if (signal_pending(current)) { _leave(" = -EINTR"); return -EINTR; } if (call->app_call_state == RXRPC_CSTATE_ERROR) { _leave(" = -ECONNABORTED"); return -ECONNABORTED; } _leave(" = 0"); return 0; }} /* end rxrpc_call_read_data() *//*****************************************************************************//* * write data to a call * - the data may not be sent immediately if it doesn't fill a buffer * - if we can't queue all the data for buffering now, siov[] will have been * adjusted to take account of what has been sent */int rxrpc_call_write_data(struct rxrpc_call *call, size_t sioc, struct kvec *siov, u8 rxhdr_flags, int alloc_flags, int dup_data, size_t *size_sent){ struct rxrpc_message *msg; struct kvec *sptr; size_t space, size, chunk, tmp; char *buf; int ret; _enter("%p,%Zu,%p,%02x,%x,%d,%p", call, sioc, siov, rxhdr_flags, alloc_flags, dup_data, size_sent); *size_sent = 0; size = 0; ret = -EINVAL; /* can't send more if we've sent last packet from this end */ switch (call->app_call_state) { case RXRPC_CSTATE_SRVR_SND_REPLY: case RXRPC_CSTATE_CLNT_SND_ARGS: break; case RXRPC_CSTATE_ERROR: ret = call->app_errno; default: goto out; } /* calculate how much data we've been given */ sptr = siov; for (; sioc > 0; sptr++, sioc--) { if (!sptr->iov_len) continue; if (!sptr->iov_base) goto out; size += sptr->iov_len; } _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size); do { /* make sure there's a message under construction */ if (!call->snd_nextmsg) { /* no - allocate a message with no data yet attached */ ret = rxrpc_conn_newmsg(call->conn, call, RXRPC_PACKET_TYPE_DATA, 0, NULL, alloc_flags, &call->snd_nextmsg); if (ret < 0) goto out; _debug("- allocated new message [ds=%Zu]", call->snd_nextmsg->dsize); } msg = call->snd_nextmsg; msg->hdr.flags |= rxhdr_flags; /* deal with zero-length terminal packet */ if (size == 0) { if (rxhdr_flags & RXRPC_LAST_PACKET) { ret = rxrpc_call_flush(call); if (ret < 0) goto out; } break; } /* work out how much space current packet has available */ space = call->conn->mtu_size - msg->dsize; chunk = min(space, size); _debug("- [before] space=%Zu chunk=%Zu", space, chunk); while (!siov->iov_len) siov++; /* if we are going to have to duplicate the data then coalesce * it too */ if (dup_data) { /* don't allocate more that 1 page at a time */ if (chunk > PAGE_SIZE) chunk = PAGE_SIZE; /* allocate a data buffer and attach to the message */ buf = kmalloc(chunk, alloc_flags); if (unlikely(!buf)) { if (msg->dsize == sizeof(struct rxrpc_header)) { /* discard an empty msg and wind back * the seq counter */ rxrpc_put_message(msg); call->snd_nextmsg = NULL; call->snd_seq_count--; } ret = -ENOMEM; goto out; } tmp = msg->dcount++; set_bit(tmp, &msg->dfree); msg->data[tmp].iov_base = buf; msg->data[tmp].iov_len = chunk; msg->dsize += chunk; *size_sent += chunk; size -= chunk; /* load the buffer with data */ while (chunk > 0) { tmp = min(chunk, siov->iov_len); memcpy(buf, siov->iov_base, tmp); buf += tmp; siov->iov_base += tmp; siov->iov_len -= tmp; if (!siov->iov_len) siov++; chunk -= tmp; } } else { /* we want to attach the supplied buffers directly */ while (chunk > 0 && msg->dcount < RXRPC_MSG_MAX_IOCS) { tmp = msg->dcount++; msg->data[tmp].iov_base = siov->iov_base; msg->data[tmp].iov_len = siov->iov_len; msg->dsize += siov->iov_len; *size_sent += siov->iov_len; size -= siov->iov_len; chunk -= siov->iov_len; siov++; } } _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size); /* dispatch the message when full, final or requesting ACK */ if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) { ret = rxrpc_call_flush(call); if (ret < 0) goto out; } } while(size > 0); ret = 0; out: _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size); return ret;} /* end rxrpc_call_write_data() *//*****************************************************************************//* * flush outstanding packets to the network */int rxrpc_call_flush(struct rxrpc_call *call){ struct rxrpc_message *msg; int ret = 0; _enter("%p", call); rxrpc_get_call(call); /* if there's a packet under construction, then dispatch it now */ if (call->snd_nextmsg) { msg = call->snd_nextmsg; call->snd_nextmsg = NULL; if (msg->hdr.flags & RXRPC_LAST_PACKET) { msg->hdr.flags &= ~RXRPC_MORE_PACKETS; if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS) msg->hdr.flags |= RXRPC_REQUEST_ACK; } else { msg->hdr.flags |= RXRPC_MORE_PACKETS; } _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }", msg->dsize, msg->dcount, msg->dfree); /* queue and adjust call state */ spin_lock(&call->lock); list_add_tail(&msg->link, &call->acks_pendq); /* decide what to do depending on current state and if this is * the last packet */ ret = -EINVAL; switch (call->app_call_state) { case RXRPC_CSTATE_SRVR_SND_REPLY: if (msg->hdr.flags & RXRPC_LAST_PACKET) { call->app_call_state = RXRPC_CSTATE_SRVR_RCV_FINAL_ACK; _state(call); } break; case RXRPC_CSTATE_CLNT_SND_ARGS: if (msg->hdr.flags & RXRPC_LAST_PACKET) { call->app_call_state = RXRPC_CSTATE_CLNT_RCV_REPLY; _state(call); } break; case RXRPC_CSTATE_ERROR: ret = call->app_errno; default: spin_unlock(&call->lock); goto out; } call->acks_pend_cnt++; mod_timer(&call->acks_timeout, __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout)); spin_unlock(&call->lock); ret = rxrpc_conn_sendmsg(call->conn, msg); if (ret == 0) call->pkt_snd_count++; } out: rxrpc_put_call(call); _leave(" = %d", ret); return ret;} /* end rxrpc_call_flush() *//*****************************************************************************//* * resend NAK'd or unacknowledged packets up to the highest one specified */static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest){ struct rxrpc_message *msg; struct list_head *_p; rxrpc_seq_t seq = 0; _enter("%p,%u", call, highest); _proto("Rx Resend required"); /* handle too many resends */ if (call->snd_resend_cnt >= rxrpc_call_max_resend) { _debug("Aborting due to too many resends (rcv=%d)", call->pkt_rcv_count); rxrpc_call_abort(call, call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT); _leave(""); return; } spin_lock(&call->lock); call->snd_resend_cnt++; for (;;) { /* determine which the next packet we might need to ACK is */ if (seq <= call->acks_dftv_seq) seq = call->acks_dftv_seq; seq++; if (seq > highest) break; /* look for the packet in the pending-ACK queue */ list_for_each(_p, &call->acks_pendq) { msg = list_entry(_p, struct rxrpc_message, link); if (msg->seq == seq) goto found_msg; } panic("%s(%p,%d):" " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n", __FUNCTION__, call, highest, call->acks_dftv_seq, call->snd_seq_count, seq); found_msg: if (msg->state != RXRPC_MSG_SENT) continue; /* only un-ACK'd packets */ rxrpc_get_message(msg); spin_unlock(&call->lock); /* send each message again (and ignore any errors we might * incur) */ _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }", msg->dsize, msg->dcount, msg->dfree); if (rxrpc_conn_sendmsg(call->conn, msg) == 0) call->pkt_snd_count++; rxrpc_put_message(msg); spin_lock(&call->lock); } /* reset the timeout */ mod_timer(&call->acks_timeout, __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout)); spin_unlock(&call->lock); _leave("");} /* end rxrpc_call_resend() *//*****************************************************************************//* * handle an ICMP error being applied to a call */void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno){ _enter("%p{%u},%d", call, ntohl(call->call_id), errno); /* if this call is already aborted, then just wake up any waiters */ if (call->app_call_state == RXRPC_CSTATE_ERROR) { call->app_error_func(call); } else { /* tell the app layer what happened */ spin_lock(&call->lock); call->app_call_state = RXRPC_CSTATE_ERROR; _state(call); if (local) call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR; else call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR; call->app_errno = errno; call->app_mark = RXRPC_APP_MARK_EOF; call->app_read_buf = NULL; call->app_async_read = 0; /* map the error */ call->app_aemap_func(call); del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); spin_unlock(&call->lock); call->app_error_func(call); } _leave("");} /* end rxrpc_call_handle_error() */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?