call.c
来自「Linux Kernel 2.6.9 for OMAP1710」· C语言 代码 · 共 2,276 行 · 第 1/4 页
C
2,276 行
rxrpc_call_abort(call, ret); _leave(""); return; } call->app_opcode = ntohl(opid); /* locate the operation in the available ops table */ optbl = call->conn->service->ops_begin; lo = 0; hi = call->conn->service->ops_end - optbl; while (lo < hi) { int mid = (hi + lo) / 2; op = &optbl[mid]; if (call->app_opcode == op->id) goto found_op; if (call->app_opcode > op->id) lo = mid + 1; else hi = mid; } /* search failed */ kproto("Rx Client requested operation %d from %s service", call->app_opcode, call->conn->service->name); rxrpc_call_abort(call, -EINVAL); _leave(" [inval]"); return; found_op: _proto("Rx Client requested operation %s from %s service", op->name, call->conn->service->name); /* we're now waiting for the argument block (unless the call * was aborted) */ spin_lock(&call->lock); if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID || call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) { if (!call->app_last_rcv) call->app_call_state = RXRPC_CSTATE_SRVR_RCV_ARGS; else if (call->app_ready_qty > 0) call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; else call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY; call->app_mark = op->asize; call->app_user = op->user; } spin_unlock(&call->lock); _state(call); break; case RXRPC_CSTATE_SRVR_RCV_ARGS: /* change state if just received last packet of arg block */ if (call->app_last_rcv) call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS; spin_unlock(&call->lock); _state(call); break; case RXRPC_CSTATE_CLNT_RCV_REPLY: /* change state if just received last packet of reply block */ rmtimo = 0; if (call->app_last_rcv) { call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY; rmtimo = 1; } spin_unlock(&call->lock); if (rmtimo) { del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); } _state(call); break; default: /* deal with data reception in an unexpected state */ printk("Unexpected state [[[ %u ]]]\n", call->app_call_state); __rxrpc_call_abort(call, -EBADMSG); _leave(""); return; } if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY && call->app_last_rcv) BUG(); /* otherwise just invoke the data function whenever we can satisfy its desire for more * data */ _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s", call->app_call_state, call->app_ready_qty, call->app_mark, call->app_last_rcv ? " last-rcvd" : ""); spin_lock(&call->lock); ret = __rxrpc_call_read_data(call); switch (ret) { case 0: spin_unlock(&call->lock); call->app_attn_func(call); break; case -EAGAIN: spin_unlock(&call->lock); break; case -ECONNABORTED: spin_unlock(&call->lock); break; default: __rxrpc_call_abort(call, ret); break; } _state(call); _leave("");} /* end rxrpc_call_receive_data_packet() *//*****************************************************************************//* * received an ACK packet */static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call, struct rxrpc_message *msg){ struct rxrpc_ackpacket _ack, *ap; rxrpc_serial_net_t serial; rxrpc_seq_t seq; int ret; _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq); /* extract the basic ACK record */ ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack); if (ap == NULL) { printk("Rx Received short ACK packet\n"); return; } msg->offset += sizeof(_ack); serial = ap->serial; seq = ntohl(ap->firstPacket); _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }", ntohl(msg->hdr.serial), ntohs(ap->bufferSpace), ntohs(ap->maxSkew), seq, ntohl(ap->previousPacket), ntohl(serial), rxrpc_acks[ap->reason], call->ackr.nAcks ); /* check the other side isn't ACK'ing a sequence number I haven't sent * yet */ if (ap->nAcks > 0 && (seq > call->snd_seq_count || seq + ap->nAcks - 1 > call->snd_seq_count)) { printk("Received ACK (#%u-#%u) for unsent packet\n", seq, seq + ap->nAcks - 1); rxrpc_call_abort(call, -EINVAL); _leave(""); return; } /* deal with RTT calculation */ if (serial) { struct rxrpc_message *rttmsg; /* find the prompting packet */ spin_lock(&call->lock); if (call->snd_ping && call->snd_ping->hdr.serial == serial) { /* it was a ping packet */ rttmsg = call->snd_ping; call->snd_ping = NULL; spin_unlock(&call->lock); if (rttmsg) { rttmsg->rttdone = 1; rxrpc_peer_calculate_rtt(call->conn->peer, rttmsg, msg); rxrpc_put_message(rttmsg); } } else { struct list_head *_p; /* it ought to be a data packet - look in the pending * ACK list */ list_for_each(_p, &call->acks_pendq) { rttmsg = list_entry(_p, struct rxrpc_message, link); if (rttmsg->hdr.serial == serial) { if (rttmsg->rttdone) /* never do RTT twice without * resending */ break; rttmsg->rttdone = 1; rxrpc_peer_calculate_rtt( call->conn->peer, rttmsg, msg); break; } } spin_unlock(&call->lock); } } switch (ap->reason) { /* deal with negative/positive acknowledgement of data * packets */ case RXRPC_ACK_REQUESTED: case RXRPC_ACK_DELAY: case RXRPC_ACK_IDLE: rxrpc_call_definitively_ACK(call, seq - 1); case RXRPC_ACK_DUPLICATE: case RXRPC_ACK_OUT_OF_SEQUENCE: case RXRPC_ACK_EXCEEDS_WINDOW: call->snd_resend_cnt = 0; ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks); if (ret < 0) rxrpc_call_abort(call, ret); break; /* respond to ping packets immediately */ case RXRPC_ACK_PING: rxrpc_call_generate_ACK(call, &msg->hdr, ap); break; /* only record RTT on ping response packets */ case RXRPC_ACK_PING_RESPONSE: if (call->snd_ping) { struct rxrpc_message *rttmsg; /* only do RTT stuff if the response matches the * retained ping */ rttmsg = NULL; spin_lock(&call->lock); if (call->snd_ping && call->snd_ping->hdr.serial == ap->serial) { rttmsg = call->snd_ping; call->snd_ping = NULL; } spin_unlock(&call->lock); if (rttmsg) { rttmsg->rttdone = 1; rxrpc_peer_calculate_rtt(call->conn->peer, rttmsg, msg); rxrpc_put_message(rttmsg); } } break; default: printk("Unsupported ACK reason %u\n", ap->reason); break; } _leave("");} /* end rxrpc_call_receive_ack_packet() *//*****************************************************************************//* * record definitive ACKs for all messages up to and including the one with the * 'highest' seq */static void rxrpc_call_definitively_ACK(struct rxrpc_call *call, rxrpc_seq_t highest){ struct rxrpc_message *msg; int now_complete; _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest); while (call->acks_dftv_seq < highest) { call->acks_dftv_seq++; _proto("Definitive ACK on packet #%u", call->acks_dftv_seq); /* discard those at front of queue until message with highest * ACK is found */ spin_lock(&call->lock); msg = NULL; if (!list_empty(&call->acks_pendq)) { msg = list_entry(call->acks_pendq.next, struct rxrpc_message, link); list_del_init(&msg->link); /* dequeue */ if (msg->state == RXRPC_MSG_SENT) call->acks_pend_cnt--; } spin_unlock(&call->lock); /* insanity check */ if (!msg) panic("%s(): acks_pendq unexpectedly empty\n", __FUNCTION__); if (msg->seq != call->acks_dftv_seq) panic("%s(): Packet #%u expected at front of acks_pendq" " (#%u found)\n", __FUNCTION__, call->acks_dftv_seq, msg->seq); /* discard the message */ msg->state = RXRPC_MSG_DONE; rxrpc_put_message(msg); } /* if all sent packets are definitively ACK'd then prod any sleepers just in case */ now_complete = 0; spin_lock(&call->lock); if (call->acks_dftv_seq == call->snd_seq_count) { if (call->app_call_state != RXRPC_CSTATE_COMPLETE) { call->app_call_state = RXRPC_CSTATE_COMPLETE; _state(call); now_complete = 1; } } spin_unlock(&call->lock); if (now_complete) { del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); call->app_attn_func(call); } _leave("");} /* end rxrpc_call_definitively_ACK() *//*****************************************************************************//* * record the specified amount of ACKs/NAKs */static int rxrpc_call_record_ACK(struct rxrpc_call *call, struct rxrpc_message *msg, rxrpc_seq_t seq, size_t count){ struct rxrpc_message *dmsg; struct list_head *_p; rxrpc_seq_t highest; unsigned ix; size_t chunk; char resend, now_complete; u8 acks[16]; _enter("%p{apc=%u ads=%u},%p,%u,%Zu", call, call->acks_pend_cnt, call->acks_dftv_seq, msg, seq, count); /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order * ACKs) */ if (seq <= call->acks_dftv_seq) { unsigned delta = call->acks_dftv_seq - seq; if (count <= delta) { _leave(" = 0 [all definitively ACK'd]"); return 0; } seq += delta; count -= delta; msg->offset += delta; } highest = seq + count - 1; resend = 0; while (count > 0) { /* extract up to 16 ACK slots at a time */ chunk = min(count, sizeof(acks)); count -= chunk; memset(acks, 2, sizeof(acks)); if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) { printk("Rx Received short ACK packet\n"); _leave(" = -EINVAL"); return -EINVAL; } msg->offset += chunk; /* check that the ACK set is valid */ for (ix = 0; ix < chunk; ix++) { switch (acks[ix]) { case RXRPC_ACK_TYPE_ACK: break; case RXRPC_ACK_TYPE_NACK: resend = 1; break; default: printk("Rx Received unsupported ACK state" " %u\n", acks[ix]); _leave(" = -EINVAL"); return -EINVAL; } } _proto("Rx ACK of packets #%u-#%u " "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)", seq, (unsigned) (seq + chunk - 1), _acktype[acks[0x0]], _acktype[acks[0x1]], _acktype[acks[0x2]], _acktype[acks[0x3]], _acktype[acks[0x4]], _acktype[acks[0x5]], _acktype[acks[0x6]], _acktype[acks[0x7]], _acktype[acks[0x8]], _acktype[acks[0x9]], _acktype[acks[0xA]], _acktype[acks[0xB]], _acktype[acks[0xC]], _acktype[acks[0xD]], _acktype[acks[0xE]], _acktype[acks[0xF]], call->acks_pend_cnt ); /* mark the packets in the ACK queue as being provisionally * ACK'd */ ix = 0; spin_lock(&call->lock); /* find the first packet ACK'd/NAK'd here */ list_for_each(_p, &call->acks_pendq) { dmsg = list_entry(_p, struct rxrpc_message, link); if (dmsg->seq == seq) goto found_first; _debug("- %u: skipping #%u", ix, dmsg->seq); } goto bad_queue; found_first: do { _debug("- %u: processing #%u (%c) apc=%u", ix, dmsg->seq, _acktype[acks[ix]], call->acks_pend_cnt); if (acks[ix] == RXRPC_ACK_TYPE_ACK) { if (dmsg->state == RXRPC_MSG_SENT) call->acks_pend_cnt--; dmsg->state = RXRPC_MSG_ACKED; } else { if (dmsg->state == RXRPC_MSG_ACKED) call->acks_pend_cnt++; dmsg->state = RXRPC_MSG_SENT; } ix++; seq++; _p = dmsg->link.next; dmsg = list_entry(_p, struct rxrpc_message, link); } while(ix < chunk && _p != &call->acks_pendq && dmsg->seq == seq); if (ix < chunk) goto bad_queue; spin_unlock(&call->lock); } if (resend) rxrpc_call_resend(call, highest); /* if all packets are provisionally ACK'd, then wake up anyone who's * waiting for that */ now_complete = 0; spin_lock(&call->lock); if (call->acks_pend_cnt == 0) { if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) { call->app_call_state = RXRPC_CSTATE_COMPLETE; _state(call); } now_complete = 1; } spin_unlock(&call->lock); if (now_complete) { _debug("- wake up waiters"); del_timer_sync(&call->acks_timeout); del_timer_sync(&call->rcv_timeout); del_timer_sync(&call->ackr_dfr_timo); call->app_attn_func(call); } _leave(" = 0 (apc=%u)", call->acks_pend_cnt); return 0; bad_queue: panic("%s(): acks_pendq in bad state (packet #%u absent)\n", __FUNCTION__, seq);} /* end rxrpc_call_record_ACK() *//*****************************************************************************//* * transfer data from the ready packet queue to the asynchronous read buffer * - since this func is the only one going to look at packets queued on * app_readyq, we don't need a lock to modify or access them, only to modify * the queue pointers * - called with call->lock held * - the buffer must be in kernel space * - returns: * 0 if buffer filled * -EAGAIN if buffer not filled and more data to come * -EBADMSG if last packet received and insufficient data left * -ECONNABORTED if the call has in an error state */static int __rxrpc_call_read_data(struct rxrpc_call *call){ struct rxrpc_message *msg; size_t qty; int ret; _enter("%p{as=%d buf=%p qty=%Zu/%Zu}", call, call->app_async_read, call->app_read_buf, call->app_ready_qty, call->app_mark); /* check the state */ switch (call->app_call_state) { case RXRPC_CSTATE_SRVR_RCV_ARGS: case RXRPC_CSTATE_CLNT_RCV_REPLY: if (call->app_last_rcv) { printk("%s(%p,%p,%Zd):" " Inconsistent call state (%s, last pkt)", __FUNCTION__, call, call->app_read_buf, call->app_mark, rxrpc_call_states[call->app_call_state]); BUG(); } break; case RXRPC_CSTATE_SRVR_RCV_OPID: case RXRPC_CSTATE_SRVR_GOT_ARGS: case RXRPC_CSTATE_CLNT_GOT_REPLY: break; case RXRPC_CSTATE_SRVR_SND_REPLY: if (!call->app_last_rcv) { printk("%s(%p,%p,%Zd):" " Inconsistent call state (%s, not last pkt)", __FUNCTION__, call, call->app_read_buf, call->app_mark, rxrpc_call_states[call->app_call_state]); BUG(); } _debug("Trying to read data from call in SND_REPLY state"); break; case RXRPC_CSTATE_ERROR: _leave(" = -ECONNABORTED"); return -ECONNABORTED; default:
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?