📄 udpcc.java
字号:
bytes.append(bs); if (++col == 24) { bytes.append("\n"); col = 0; } else if ((col > 0) && (col % 4 == 0)) { bytes.append(" "); } } logger.error("out of memory error deserializing bytes:\n" + bytes); } if (DEBUG && (seq.longValue() != -1L)) { if (DEBUG_MIN) { debugln("received " + msg + " seq=0x" + Long.toHexString(seq.longValue()) + " from " + src); } } sink.recv(msg, src, my_addr, tries, wait_ms, est_rtt_ms); } } /** * Method handle_readable */ protected final void handle_readable() { while (true) { ByteBuffer bb = alloc_bb(MAX_MSG_SIZE); InetSocketAddress src = null; try { src = (InetSocketAddress) channel.receive(bb); } catch (SocketException e) { // For some reason, Sun's j2sdk1.4.2 will occasionally throw // one of these with the message "Connection reset by peer: no // further information". That doesn't make any sense to me, // but I think we can safely ignore it. We'll get the data // for the next packet on the next select loop. return; } catch (IOException e) { BUG(e); } if (bb.position() == 0) { return; } bb.flip(); in_pkts += 1; in_bytes += bb.limit() + 20 /* account for IP header */ ; if (bb.limit() == 8) { handle_ack(new Long(bb.getLong())); } else { handle_inb_msg(bb, src); } // UdpCC.close () may have been called from the user's handler // in handle_ack or handle_inb_msg. if (closed) { return; } } } /** * Method send_ack * * @param conn * @return */ protected final boolean send_ack(Connection conn) { long now_ms = System.currentTimeMillis(); Long seq = (Long) conn.ack_q.getFirst(); if (DEBUG) { debugln("sending ack seq 0x" + Long.toHexString(seq.longValue())); } ByteBuffer bb = alloc_bb(8); bb.putLong(seq.longValue()); bb.rewind(); if (DEBUG) { debugln("acking " + seq); } int n = 0; try { n = channel.send(bb, conn.addr); } catch (IOException e) { BUG(e); } if (n == 0) { return false; } // Send was successful. out_pkts += 1; out_bytes += bb.limit() + 20 /* account for IP header */ ; conn.ack_q.removeFirst(); return true; } /** * Method send_probe * * @param conn * @return */ protected final boolean send_probe(Connection conn) { long seq = -1L; Object msg = conn.probe_q.getFirst(); if (DEBUG) { debugln("sending probe"); } // Piggyback an ACK if we have one. long ack = -1L; if ( !conn.ack_q.isEmpty()) { ack = ((Long) conn.ack_q.getFirst()).longValue(); if (DEBUG) { debugln("piggybacking ack " + ack); } } int sz = serializer.serialize_size(msg) + 24 /* for ack+seq+wait_ms+est_rtt_ms */ ; if (sz > MAX_MSG_SIZE) { BUG("size=" + sz + " is greater than max size=" + MAX_MSG_SIZE + " for msg " + msg); } long now_ms = System.currentTimeMillis(); ByteBuffer bb = alloc_bb(sz); bb.putLong(ack); bb.putLong(seq); bb.putInt((int) 0); // don't know how long it's been waiting bb.putInt((int) (conn.sa >> 3)); serializer.serialize(msg, bb); bb.rewind(); int n = 0; try { n = channel.send(bb, conn.addr); } catch (IOException e) { BUG(e); } if (n == 0) { // Undo if (DEBUG) { debugln("send failed, will retry later"); } return false; } // Send was successful. if (ack != -1) { conn.ack_q.removeFirst(); } out_pkts += 1; out_bytes += bb.limit() + 20 /* account for IP header */ ; conn.probe_q.removeFirst(); return true; } /** * Method send_msg * * @param conn * @param retry * @return */ protected final boolean send_msg(Connection conn, boolean retry) { TimeoutInfo tinfo = (TimeoutInfo) (retry ? conn.retry_q.getFirst() : conn.send_q.getFirst()); if (tinfo.msg_id == -1) { tinfo.msg_id = inc_next_msg_id(); } tinfo.attempt++; long seq = make_seq(tinfo.msg_id, tinfo.attempt); long now_ms = System.currentTimeMillis(); if (DEBUG_RTT) { debugln("sending " + my_addr + " seq=0x" + Long.toHexString(seq) + ", now=" + now_ms); } // Piggyback an ACK if we have one. long ack = -1L; if ( !conn.ack_q.isEmpty()) { ack = ((Long) conn.ack_q.getFirst()).longValue(); if (DEBUG) { debugln("piggybacking ack " + ack); } } long wait_ms = now_ms - tinfo.start_ms; int sz = serializer.serialize_size(tinfo.msg) + 24 /* for ack+seq+wait_ms+est_rtt_ms */ ; if (sz > MAX_MSG_SIZE) { BUG("size=" + sz + " is greater than max size=" + MAX_MSG_SIZE + " for msg " + tinfo.msg); } ByteBuffer bb = alloc_bb(sz); bb.putLong(ack); bb.putLong(seq); bb.putInt((int) wait_ms); bb.putInt((int) (conn.sa >> 3)); serializer.serialize(tinfo.msg, bb); bb.rewind(); int n = 0; try { n = channel.send(bb, conn.addr); } catch (IOException e) { bb.rewind(); byte[] data = new byte[bb.limit() - bb.position()]; bb.get(data, 0, data.length); logger.fatal("bb.pos=" + bb.position() + " bb.lim=" + bb.limit() + " addr=" + conn.addr.getAddress().getHostAddress(), e); System.exit(1); } if (n == 0) { // Undo if (DEBUG) { debugln("send failed, will retry later"); } return false; } // Send was successful. out_pkts += 1; out_bytes += bb.limit() + 20 /* account for IP header */ ; if (ack != -1) { conn.ack_q.removeFirst(); } if (retry) { conn.retry_q.removeFirst(); TimeoutInfo tnew = new TimeoutInfo(conn, tinfo.msg, tinfo.start_ms, tinfo.timeout_ms, tinfo.send_cb, tinfo.send_cb_data); tnew.attempt = tinfo.attempt; tnew.msg_id = tinfo.msg_id; tinfo = tnew; } else { conn.send_q.removeFirst(); conn.time_to_first_send += (tinfo.send_ms - tinfo.start_ms); conn.time_to_first_send_cnt += 1; } tinfo.send_ms = now_ms; Long Seq = new Long(seq); unacked.put(Seq, tinfo); conn.inf.put(Seq, tinfo); conn.lastsnd = now_ms; long timeout_ms = Math.round(conn.rto * timeout_factor + timeout_diff); if (DEBUG) { debugln("setting timeout for " + timeout_ms); } acore.register_timer(timeout_ms, ack_timeout_cb, Seq); return true; } /** * Method add_to_rr * * @param conn */ protected final void add_to_rr(Connection conn) { // A connection can be added to the round robin queue waiting on // writability, but then no longer be writable when it gets its turn, // because in the interim a timeout has occurred and cwnd has been // lowered. As a result, conn.writable is not a good indicator of // whether a connection is in the round robin queue already or // not. Instead, we check conn.in_rr. if ( !conn.in_rr) { if (DEBUG) { debugln("adding conn=" + conn.addr + " to rr"); } conn.in_rr = true; if (rr_first == null) { rr_first = rr_last = conn; } else { rr_last.next = conn; rr_last = conn; } conn.next = null; skey.interestOps(skey.interestOps() | SelectionKey.OP_WRITE); } else { if (DEBUG) { debugln("conn=" + conn.addr + " already in rr"); } } } /** * Method handle_writable */ protected void handle_writable() { // For some reason, when the round-robin queue was implemented with // java.util.LinkedList, the removeFirst statement kept throwing a // NoSuchElement exception which--given that it immediately followed a // !isEmpty check in a single-threaded system--made no sense. So I've // implemented the list by hand and it seems to have made the problem // go away. I love compiler bugs... while (rr_first != null) { Connection conn = rr_first; rr_first = rr_first.next; conn.in_rr = false; if (conn.writable()) { if (DEBUG) { debugln("handle_writable conn=" + conn.addr); } if (conn.can_send_either() || conn.can_send_probe()) { // If we have anything other than an ack to send, send // it, since the ack will get piggybacked along. // Cycle between congestion-controlled and non-cc msgs, // but only send one of the two before going on to the // next connection in the rr queue. boolean done = false; while ( !done) { switch (conn.next_q) { case 0: // Try the retry queue first. if (conn.can_send_retry()) { if ( !send_msg(conn, true /* retry */)) { return; } done = true; } else if (conn.can_send_msg()) { if ( !send_msg(conn, false /* !retry */)) { return; } done = true; } else if (DEBUG) { if (conn.send_q.isEmpty() && conn.retry_q.isEmpty()) { debugln("no msgs"); } else { debugln("cwnd full"); } } break; case 1: if (conn.can_send_probe()) { if ( !send_probe(conn)) { return; } done = true; } else { if (DEBUG) { debugln("no probes"); } } break; } conn.next_q = (conn.next_q + 1) % 2; } } else { if (conn.can_send_ack()) { if ( !send_ack(conn)) { return; } } else if (DEBUG) { debugln("no acks"); } } } // If the connection is still writable, leave it in the round // robin list. if (conn.writable()) { if (DEBUG) { debugln("still writable"); } add_to_rr(conn); } // If there is nothing in the round robin list, stop selecting // for writability. if (rr_first == null) { if (DEBUG) { debugln("rr empty"); } skey.interestOps(SelectionKey.OP_READ); } else { if (DEBUG) { debugln("rr not empty"); } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -