call.c
来自「Linux Kernel 2.6.9 for OMAP1710」· C语言 代码 · 共 2,276 行 · 第 1/4 页
C
2,276 行
/*
 * work out which ACK a received packet calls for, then send or defer it
 * - call: the call the packet belongs to; its ACK-window bookkeeping
 *   (ackr_array, ackr_pend_cnt, ackr_high_seq, ackr_prev_seq, ackr_dfr_seq
 *   and the pending "ackr" record) is updated here
 * - hdr: header of the received packet; hdr->seq is converted with ntohl()
 *   while hdr->serial is copied into the ACK unchanged
 * - ack: ACK body that came with the packet, or NULL; only ack->reason is
 *   read, to spot a PING
 * - returns 0 when the packet is a new data packet inside the ACK window,
 *   1 when a special ACK (ping response, duplicate, exceeds window) was
 *   chosen instead, or the negative error handed back by
 *   __rxrpc_call_gen_normal_ACK(), rxrpc_conn_newmsg() or
 *   rxrpc_conn_sendmsg()
 */
static int rxrpc_call_generate_ACK(struct rxrpc_call *call,
				   struct rxrpc_header *hdr,
				   struct rxrpc_ackpacket *ack)
{
	struct rxrpc_message *msg;
	rxrpc_seq_t seq;
	unsigned offset;
	int ret = 0, err;
	u8 special_ACK, do_ACK, force;

	_enter("%p,%p { seq=%d tp=%d fl=%02x }",
	       call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags);

	seq = ntohl(hdr->seq);
	/* index into ackr_array; it is only used after seq has been checked
	 * against both window bounds below */
	offset = seq - call->ackr_win_bot;
	do_ACK = RXRPC_ACK_DELAY;
	special_ACK = 0;
	/* sequence number 1 always asks for immediate generation */
	force = (seq == 1);

	/* track the highest sequence number seen (feeds maxSkew below) */
	if (call->ackr_high_seq < seq)
		call->ackr_high_seq = seq;

	/* deal with generation of obvious special ACKs first */
	if (ack && ack->reason == RXRPC_ACK_PING) {
		special_ACK = RXRPC_ACK_PING_RESPONSE;
		ret = 1;
		goto gen_ACK;
	}

	/* below the window: already dealt with */
	if (seq < call->ackr_win_bot) {
		special_ACK = RXRPC_ACK_DUPLICATE;
		ret = 1;
		goto gen_ACK;
	}

	/* at or beyond the top of the window */
	if (seq >= call->ackr_win_top) {
		special_ACK = RXRPC_ACK_EXCEEDS_WINDOW;
		ret = 1;
		goto gen_ACK;
	}

	/* inside the window, but the slot is no longer marked NACK */
	if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) {
		special_ACK = RXRPC_ACK_DUPLICATE;
		ret = 1;
		goto gen_ACK;
	}

	/* okay... it's a normal data packet inside the ACK window */
	call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK;

	if (offset < call->ackr_pend_cnt) {
		/* slot lies below the pending count: nothing is changed here
		 * and processing carries on with the flag checks below */
	}
	else if (offset > call->ackr_pend_cnt) {
		/* a gap precedes this packet: note it as out of sequence,
		 * move the pending count up to this slot and skip the
		 * remaining checks (ret stays 0) */
		do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE;
		call->ackr_pend_cnt = offset;
		goto gen_ACK;
	}

	if (hdr->flags & RXRPC_REQUEST_ACK) {
		do_ACK = RXRPC_ACK_REQUESTED;
	}

	/* generate an ACK on the final packet of a reply just received */
	if (hdr->flags & RXRPC_LAST_PACKET) {
		if (call->conn->out_clientflag)
			force = 1;
	}
	else if (!(hdr->flags & RXRPC_MORE_PACKETS)) {
		/* neither LAST_PACKET nor MORE_PACKETS is set */
		do_ACK = RXRPC_ACK_REQUESTED;
	}

	/* re-ACK packets previously received out-of-order */
	for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++)
		if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK)
			break;

	/* offset now indexes the first slot not marked ACK (or the window
	 * size if every following slot is marked ACK) */
	call->ackr_pend_cnt = offset;

	/* generate an ACK if we fill up the window */
	if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE)
		force = 1;

 gen_ACK:
	_debug("%05lu ACKs pend=%u norm=%s special=%s%s",
	       jiffies - call->cjif,
	       call->ackr_pend_cnt,
	       rxrpc_acks[do_ACK],
	       rxrpc_acks[special_ACK],
	       force ? " immediate" :
	       do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" :
	       hdr->flags & RXRPC_LAST_PACKET ? " finalise" :
	       " defer"
	       );

	/* send any pending normal ACKs if need be */
	if (call->ackr_pend_cnt > 0) {
		/* fill out the appropriate form */
		call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
		/* skew is capped at 65535 before conversion with htons() */
		call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq, 65535U));
		call->ackr.firstPacket = htonl(call->ackr_win_bot);
		/* ackr_prev_seq and hdr->serial are copied without byte
		 * swapping */
		call->ackr.previousPacket = call->ackr_prev_seq;
		call->ackr.serial = hdr->serial;
		call->ackr.nAcks = call->ackr_pend_cnt;

		/* the stored reason is only overwritten for REQUESTED; any
		 * other do_ACK value leaves the previous reason in place */
		if (do_ACK == RXRPC_ACK_REQUESTED)
			call->ackr.reason = do_ACK;

		/* generate the ACK immediately if necessary */
		if (special_ACK || force) {
			/* 0 is passed in place of seq while the normal reason
			 * is still DELAY */
			err = __rxrpc_call_gen_normal_ACK(
				call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq);
			if (err < 0) {
				ret = err;
				goto out;
			}
		}
	}

	/* remember which packet a still-pending REQUESTED ACK refers to;
	 * rxrpc_call_do_stuff() passes this on when it generates the ACK */
	if (call->ackr.reason == RXRPC_ACK_REQUESTED)
		call->ackr_dfr_seq = seq;

	/* start the ACK timer if not running if there are any pending deferred
	 * ACKs */
	if (call->ackr_pend_cnt > 0 &&
	    call->ackr.reason != RXRPC_ACK_REQUESTED &&
	    !timer_pending(&call->ackr_dfr_timo)
	    ) {
		unsigned long timo;

		timo = rxrpc_call_dfr_ack_timeout + jiffies;

		_debug("START ACKR TIMER for cj=%lu", timo - call->cjif);

		/* call->lock is held only around the mod_timer() call */
		spin_lock(&call->lock);
		mod_timer(&call->ackr_dfr_timo, timo);
		spin_unlock(&call->lock);
	}
	else if ((call->ackr_pend_cnt == 0 ||
		  call->ackr.reason == RXRPC_ACK_REQUESTED) &&
		 timer_pending(&call->ackr_dfr_timo)
		 ) {
		/* stop timer if no pending ACKs */
		_debug("CLEAR ACKR TIMER");
		del_timer_sync(&call->ackr_dfr_timo);
	}

	/* send a special ACK if one is required */
	if (special_ACK) {
		/* NOTE: this local deliberately or accidentally shadows the
		 * 'ack' parameter for the rest of this block */
		struct rxrpc_ackpacket ack;
		struct kvec diov[2];
		uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK };

		/* fill out the appropriate form */
		ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
		ack.maxSkew = htons(min(call->ackr_high_seq - seq, 65535U));
		ack.firstPacket = htonl(call->ackr_win_bot);
		ack.previousPacket = call->ackr_prev_seq;
		ack.serial = hdr->serial;
		ack.reason = special_ACK;
		ack.nAcks = 0;

		_proto("Rx Sending s-ACK"
		       " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
		       ntohs(ack.maxSkew),
		       ntohl(ack.firstPacket),
		       ntohl(ack.previousPacket),
		       ntohl(ack.serial),
		       rxrpc_acks[ack.reason],
		       ack.nAcks);

		/* diov[0] is the ACK header, diov[1] the one-byte acks[] */
		diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
		diov[0].iov_base = &ack;
		diov[1].iov_len = sizeof(acks);
		diov[1].iov_base = acks;

		/* build and send the message */
		/* NOTE(review): the "hdr->seq ? 2 : 1" argument is presumably
		 * the number of diov[] entries to use, i.e. acks[] is only
		 * included for a non-zero sequence number - confirm against
		 * rxrpc_conn_newmsg() */
		err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
					hdr->seq ? 2 : 1, diov,
					GFP_KERNEL,
					&msg);
		if (err < 0) {
			ret = err;
			goto out;
		}

		msg->seq = seq;
		msg->hdr.seq = htonl(seq);
		msg->hdr.flags |= RXRPC_SLOW_START_OK;

		err = rxrpc_conn_sendmsg(call->conn, msg);
		/* the message is released whether or not the send worked */
		rxrpc_put_message(msg);
		if (err < 0) {
			ret = err;
			goto out;
		}
		call->pkt_snd_count++;
	}

 out:
	/* remember this packet's sequence number, still in the byte order it
	 * arrived in, for the previousPacket field of later ACKs; a zero
	 * sequence number leaves the stored value alone */
	if (hdr->seq)
		call->ackr_prev_seq = hdr->seq;

	_leave(" = %d", ret);
	return ret;
} /* end rxrpc_call_generate_ACK() */

/*****************************************************************************/
/*
 * handle work to be done on a call
 * - includes packet reception and timeout processing
 * - the bits in call->flags are examined in a fixed order (RCV_PKT,
 *   ACKS_TIMO, RCV_TIMO, ACKR_TIMO); each one is cleared with a plain
 *   read-modify-write before its handler runs
 */
void rxrpc_call_do_stuff(struct rxrpc_call *call)
{
	_enter("%p{flags=%lx}", call, call->flags);

	/* handle packet reception */
	if (call->flags & RXRPC_CALL_RCV_PKT) {
		_debug("- receive packet");
		call->flags &= ~RXRPC_CALL_RCV_PKT;
		rxrpc_call_receive_packet(call);
	}

	/* handle overdue ACKs */
	if (call->flags & RXRPC_CALL_ACKS_TIMO) {
		_debug("- overdue ACK timeout");
		call->flags &= ~RXRPC_CALL_ACKS_TIMO;
		rxrpc_call_resend(call, call->snd_seq_count);
	}

	/* handle lack of reception */
	if (call->flags & RXRPC_CALL_RCV_TIMO) {
		_debug("- reception timeout");
		call->flags &= ~RXRPC_CALL_RCV_TIMO;
		rxrpc_call_abort(call, -EIO);
	}

	/* handle deferred ACKs */
	/* entered either because the deferred-ACK flag is set or because a
	 * REQUESTED ACK with at least one entry is still waiting */
	if (call->flags & RXRPC_CALL_ACKR_TIMO ||
	    (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED)
	    ) {
		_debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
		       jiffies - call->cjif,
		       rxrpc_acks[call->ackr.reason],
		       call->ackr.nAcks);

		call->flags &= ~RXRPC_CALL_ACKR_TIMO;

		/* nothing is sent for a call already in the error state */
		if (call->ackr.nAcks > 0 &&
		    call->app_call_state != RXRPC_CSTATE_ERROR) {
			/* generate ACK */
			/* the return value is not checked here */
			__rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq);
			call->ackr_dfr_seq = 0;
		}
	}

	_leave("");

} /* end rxrpc_call_do_stuff() */

/*****************************************************************************/
/*
 * send an abort message at call or connection level
 * - must be called with call->lock held
 * - call->lock is released on every path before this function returns
 * - the supplied error code is stored in call->app_errno; the packet data is
 *   call->app_abort_code (NOTE(review): presumably filled in from the error
 *   code by call->app_aemap_func() - confirm)
 * - returns 0 if the call was already aborted or no abort packet needed
 *   sending, otherwise the result of rxrpc_conn_newmsg() /
 *   rxrpc_conn_sendmsg()
 */
static int __rxrpc_call_abort(struct rxrpc_call *call, int errno)
{
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_message *msg;
	struct kvec diov[1];
	int ret;
	__be32 _error;

	_enter("%p{%08x},%p{%d},%d",
	       conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno);

	/* if this call is already aborted, then just wake up any waiters */
	if (call->app_call_state == RXRPC_CSTATE_ERROR) {
		spin_unlock(&call->lock);
		call->app_error_func(call);
		_leave(" = 0");
		return 0;
	}

	/* hold a reference across the unlocked section below; it is dropped
	 * by the rxrpc_put_call() at the end of this function */
	rxrpc_get_call(call);

	/* change the state _with_ the lock still held */
	call->app_call_state	= RXRPC_CSTATE_ERROR;
	call->app_err_state	= RXRPC_ESTATE_LOCAL_ABORT;
	call->app_errno		= errno;
	call->app_mark		= RXRPC_APP_MARK_EOF;
	call->app_read_buf	= NULL;
	call->app_async_read	= 0;

	_state(call);

	/* ask the app to translate the error code */
	call->app_aemap_func(call);

	spin_unlock(&call->lock);

	/* flush any outstanding ACKs */
	/* all three call timers are stopped first, without the lock held */
	del_timer_sync(&call->acks_timeout);
	del_timer_sync(&call->rcv_timeout);
	del_timer_sync(&call->ackr_dfr_timo);

	/* the return value of the ACK generation is not checked */
	if (rxrpc_call_is_ack_pending(call))
		__rxrpc_call_gen_normal_ACK(call, 0);

	/* send the abort packet only if we actually traded some other
	 * packets */
	ret = 0;
	if (call->pkt_snd_count || call->pkt_rcv_count) {
		/* actually send the abort */
		_proto("Rx Sending Call ABORT { data=%d }",
		       call->app_abort_code);

		/* the abort code travels as a single 32-bit big-endian word */
		_error = htonl(call->app_abort_code);

		diov[0].iov_len  = sizeof(_error);
		diov[0].iov_base = &_error;

		ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT,
					1, diov, GFP_KERNEL, &msg);
		if (ret == 0) {
			ret = rxrpc_conn_sendmsg(conn, msg);
			rxrpc_put_message(msg);
		}
	}

	/* tell the app layer to let go */
	call->app_error_func(call);

	rxrpc_put_call(call);

	_leave(" = %d", ret);
	return ret;
} /* end __rxrpc_call_abort() */

/*****************************************************************************/
/*
 * send an abort message at call or connection level
 * - the supplied error code is sent as the packet data
 * - takes call->lock itself; __rxrpc_call_abort() releases it, so the lock
 *   is not held on return
 * - returns whatever __rxrpc_call_abort() returns
 */
int rxrpc_call_abort(struct rxrpc_call *call, int error)
{
	spin_lock(&call->lock);

	return __rxrpc_call_abort(call, error);

} /* end rxrpc_call_abort() */

/*****************************************************************************/
/*
 * process packets waiting for this call
 * - messages are taken off call->rcv_receiveq one at a time under
 *   call->lock and dispatched on their packet type with the lock dropped
 * - each message taken off the queue is released with rxrpc_put_message()
 *   once it has been dealt with
 * - processing stops early if rxrpc_call_generate_ACK() reports an error
 */
static void rxrpc_call_receive_packet(struct rxrpc_call *call)
{
	struct rxrpc_message *msg;
	struct list_head *_p;

	_enter("%p", call);

	rxrpc_get_call(call); /* must not go away too soon if aborted by
			       * app-layer */

	while (!list_empty(&call->rcv_receiveq)) {
		/* try to get next packet */
		/* the queue is re-checked under the lock because the test in
		 * the loop condition was made without it */
		_p = NULL;
		spin_lock(&call->lock);
		if (!list_empty(&call->rcv_receiveq)) {
			_p = call->rcv_receiveq.next;
			list_del_init(_p);
		}
		spin_unlock(&call->lock);

		if (!_p)
			break;

		msg = list_entry(_p, struct rxrpc_message, link);

		_proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
		       jiffies - call->cjif,
		       rxrpc_pkts[msg->hdr.type],
		       ntohl(msg->hdr.serial),
		       msg->seq,
		       msg->hdr.flags & RXRPC_JUMBO_PACKET	? 'j' : '-',
		       msg->hdr.flags & RXRPC_MORE_PACKETS	? 'm' : '-',
		       msg->hdr.flags & RXRPC_LAST_PACKET	? 'l' : '-',
		       msg->hdr.flags & RXRPC_REQUEST_ACK	? 'r' : '-',
		       msg->hdr.flags & RXRPC_CLIENT_INITIATED	? 'C' : 'S'
		       );

		switch (msg->hdr.type) {
			/* deal with data packets */
		case RXRPC_PACKET_TYPE_DATA:
			/* ACK the packet if necessary */
			switch (rxrpc_call_generate_ACK(call, &msg->hdr,
							NULL)) {
			case 0: /* useful packet */
				rxrpc_call_receive_data_packet(call, msg);
				break;
			case 1: /* duplicate or out-of-window packet */
				break;
			default:
				/* negative error: release the message here
				 * because the jump skips the release at the
				 * bottom of the loop */
				rxrpc_put_message(msg);
				goto out;
			}
			break;

			/* deal with ACK packets */
		case RXRPC_PACKET_TYPE_ACK:
			rxrpc_call_receive_ack_packet(call, msg);
			break;

			/* deal with abort packets */
		case RXRPC_PACKET_TYPE_ABORT: {
			__be32 _dbuf, *dp;

			/* fetch the 32-bit abort code; dp is NULL if the
			 * packet is too short to hold one */
			dp = skb_header_pointer(msg->pkt, msg->offset,
						sizeof(_dbuf), &_dbuf);
			if (dp == NULL)
				printk("Rx Received short ABORT packet\n");

			_proto("Rx Received Call ABORT { data=%d }",
			       (dp ? ntohl(*dp) : 0));

			/* mark the call as aborted by the peer; a short
			 * packet yields an abort code of 0 */
			spin_lock(&call->lock);
			call->app_call_state	= RXRPC_CSTATE_ERROR;
			call->app_err_state	= RXRPC_ESTATE_PEER_ABORT;
			call->app_abort_code	= (dp ? ntohl(*dp) : 0);
			call->app_errno		= -ECONNABORTED;
			call->app_mark		= RXRPC_APP_MARK_EOF;
			call->app_read_buf	= NULL;
			call->app_async_read	= 0;

			/* ask the app to translate the error code */
			call->app_aemap_func(call);
			_state(call);
			spin_unlock(&call->lock);
			/* the error callback runs with the lock released */
			call->app_error_func(call);
			break;
		}
		default:
			/* deal with other packet types */
			_proto("Rx Unsupported packet type %u (#%u)",
			       msg->hdr.type, msg->seq);
			break;
		}

		rxrpc_put_message(msg);
	}

 out:
	/* drop the reference taken at the top of the function */
	rxrpc_put_call(call);
	_leave("");
} /* end rxrpc_call_receive_packet() */

/*****************************************************************************/
/*
 * process next data packet
 * - as the next data packet arrives:
 *   - it is queued on app_readyq _if_ it is the next one expected
 *     (app_ready_seq+1)
 *   - it is queued on app_unreadyq _if_ it is not the next one expected
 *   - if a packet placed on app_readyq completely fills a hole leading up to
 *     the first packet on app_unreadyq, then packets now in sequence are
 *     transferred to app_readyq
 * - the application layer can only see packets on app_readyq
 *   (app_ready_qty bytes)
 * - the application layer is prodded every time a new packet arrives
 * - NOTE(review): only the first part of this function is visible in this
 *   excerpt; the comments below describe the visible statements only
 */
static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
					   struct rxrpc_message *msg)
{
	const struct rxrpc_operation *optbl, *op;
	struct rxrpc_message *pmsg;
	struct list_head *_p;
	int ret, lo, hi, rmtimo;
	__be32 opid;

	_enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);

	/* take a reference on the message before it is put on a queue */
	rxrpc_get_message(msg);

	/* add to the unready queue if we'd have to create a hole in the ready
	 * queue otherwise */
	/* this branch runs without call->lock being taken */
	if (msg->seq != call->app_ready_seq + 1) {
		_debug("Call add packet %d to unreadyq", msg->seq);

		/* insert in seq order */
		list_for_each(_p, &call->app_unreadyq) {
			pmsg = list_entry(_p, struct rxrpc_message, link);
			if (pmsg->seq > msg->seq)
				break;
		}

		/* _p is the first entry with a larger seq, or the list head
		 * if there is none; the new message goes just before it */
		list_add_tail(&msg->link, _p);

		_leave(" [unreadyq]");
		return;
	}

	/* next in sequence - simply append into the call's ready queue */
	_debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",
	       msg->seq, msg->dsize, call->app_ready_qty);

	spin_lock(&call->lock);
	call->app_ready_seq = msg->seq;
	call->app_ready_qty += msg->dsize;
	list_add_tail(&msg->link, &call->app_readyq);

	/* move unready packets to the readyq if we got rid of a hole */
	while (!list_empty(&call->app_unreadyq)) {
		pmsg = list_entry(call->app_unreadyq.next,
				  struct rxrpc_message, link);

		/* stop at the first packet that is not the next expected */
		if (pmsg->seq != call->app_ready_seq + 1)
			break;

		/* next in sequence - just move list-to-list */
		_debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",
		       pmsg->seq, pmsg->dsize, call->app_ready_qty);

		call->app_ready_seq = pmsg->seq;
		call->app_ready_qty += pmsg->dsize;
		list_del_init(&pmsg->link);
		list_add_tail(&pmsg->link, &call->app_readyq);
	}

	/* see if we've got the last packet yet */
	if (!list_empty(&call->app_readyq)) {
		/* only the tail entry of the ready queue is inspected */
		pmsg = list_entry(call->app_readyq.prev,
				  struct rxrpc_message, link);
		if (pmsg->hdr.flags & RXRPC_LAST_PACKET) {
			call->app_last_rcv = 1;
			_debug("Last packet on readyq");
		}
	}

	/* call->lock is still held on entry to the switch; each visible case
	 * releases it as its first step */
	switch (call->app_call_state) {
		/* do nothing if call already aborted */
	case RXRPC_CSTATE_ERROR:
		spin_unlock(&call->lock);
		_leave(" [error]");
		return;

		/* extract the operation ID from an incoming call if that's not
		 * yet been done */
	case RXRPC_CSTATE_SRVR_RCV_OPID:
		spin_unlock(&call->lock);

		/* handle as yet insufficient data for the operation ID */
		/* 4 matches sizeof(opid), a __be32 */
		if (call->app_ready_qty < 4) {
			if (call->app_last_rcv)
				/* trouble - last packet seen */
				rxrpc_call_abort(call, -EINVAL);

			_leave("");
			return;
		}

		/* pull the operation ID out of the buffer */
		ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0);
		if (ret < 0) {
			printk("Unexpected error from read-data: %d\n", ret);
			if (call->app_call_state != RXRPC_CSTATE_ERROR)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?