📄 udpcc.java
字号:
/** * Constructor TimeoutInfo * * @param other */ public TimeoutInfo(TimeoutInfo other) { msg = other.msg; conn = other.conn; send_ms = other.send_ms; attempt = other.attempt; timeout_ms = other.timeout_ms; cut_ssthresh = other.cut_ssthresh; send_cb = other.send_cb; send_cb_data = other.send_cb_data; msg_id = other.msg_id; } /** * Constructor TimeoutInfo * * @param c * @param m * @param n * @param to * @param scb * @param scbd */ public TimeoutInfo(Connection c, Object m, long n, long to, SendCB scb, Object scbd) { cut_ssthresh = true; conn = c; msg = m; attempt = 0; start_ms = n; timeout_ms = to; msg_id = -1; send_ms = 0; send_cb = scb; send_cb_data = scbd; } } /** * Keep track of all of the relavent information about another node we are * talking to. */ protected class Connection { public InetSocketAddress addr; /** * The mean RTT, variance of the RTT, and the round-trip timeout to * this host, all in milliseconds. The mean is scaled by a factor of * 8, and the variance is scaled by a factor of 4. See [Jac88]. */ public long sa, sv, rto; public int consecutive_timeouts; /** * The congestion window size and slow-start threshold. See [Jac88]. */ public double cwnd, ssthresh; /** * TimeoutInfo objects for each message in flight, indexed by sequence * number. */ public Map inf = new HashMap(); /** * The sequence numbers of acknowledgements we need to send out. */ public LinkedList ack_q = new LinkedList(); /** * TimeoutInfo objects for new messages to be sent out. */ public LinkedList send_q = new LinkedList(); /** * Non-conjestion controlled messages that need to be sent. */ public LinkedList probe_q = new LinkedList(); /** * TimeoutInfo objects for messages that need to be resent. */ public LinkedList retry_q = new LinkedList(); /** * The time in milliseconds since the epoch since we last sent a * message to this host and since we last received an acknowledgement * from them. */ public long lastsnd, lastrcv; /** * We use these to keep track of the time that the average message * spends in the send_q before being sent out on the wire. */ public long time_to_first_send; public int time_to_first_send_cnt; /** * We use these to keep track of the time that the average message * takes to be acknowledged. */ public long time_to_ack; public int time_to_ack_cnt; /** * We put messages on the wire from the send_q, ack_q, probe_q, and * retry_q in round-robin fashion, and this integer keeps track of * which one to pull out of next time we can write the socket. */ public int next_q; /** * Which connection is after us in the round-robin queue? */ public Connection next; /** * Are we currently in the round-robin queue? */ public boolean in_rr; /** * Constructor Connection * * @param a * @param now_ms */ public Connection(InetSocketAddress a, long now_ms) { addr = a; sa = -1; sv = 0; rto = MAX_RTO; lastrcv = now_ms; // so we don't immediately think they're down lastsnd = now_ms; // so we don't immediately throw them out cwnd = 1.0; ssthresh = MAX_WND; } /** * Do we have a regular msg to send, and are we within the congestion * window? * @return */ public final boolean can_send_msg() { return (( !send_q.isEmpty()) && (inf.size() < (int) cwnd)); } /** * Do we have a regular msg to retry, and are we within the congestion * window? * @return */ public final boolean can_send_retry() { return (( !retry_q.isEmpty()) && (inf.size() < (int) cwnd)); } /** * Do we have a regular msg to send or retry, and are we within the * congestion window? * @return */ public final boolean can_send_either() { if (DEBUG) { if (send_q.isEmpty() && retry_q.isEmpty()) { debugln("send_q and retry_q both empty"); } else if (inf.size() >= (int) cwnd) { debugln("inf.size >= cwnd"); } } return ((( !send_q.isEmpty()) || ( !retry_q.isEmpty())) && (inf.size() < (int) cwnd)); } /** * Do we have a nocc msg to send? * @return */ public final boolean can_send_probe() { return !probe_q.isEmpty(); } /** * Do we have an ack to send? * @return */ public final boolean can_send_ack() { return !ack_q.isEmpty(); } /** * Method writable * @return */ public final boolean writable() { // If we still have something to send on this connection, and // it's conjestion window is not yet full, or if we have an ack // to send, it is writable. if ( !ack_q.isEmpty()) { return true; } if ( !probe_q.isEmpty()) { return true; } if (retry_q.isEmpty() && send_q.isEmpty()) { return false; } return inf.size() < (int) cwnd; } /** * Method add_rtt_meas * * @param m */ public final void add_rtt_meas(long m) { long orig_m = m; if (sa == -1) { // After the first measurement, set the timeout to four // times the RTT. sa = m << 3; sv = 0; rto = (m << 2) + 10; // the 10 is to accont for GC } else { m -= (sa >> 3); sa += m; if (m < 0) { m = -1 * m; } m -= (sv >> 2); sv += m; rto = (sa >> 3) + sv + 10; // the 10 is to accont for GC } // Don't backoff past 1 second. if (rto > MAX_RTO) { if (DEBUG) { debugln("huge rto: conn=" + addr + " m=" + orig_m + " sa=" + sa + " sv=" + sv + " rto=" + rto); } rto = MAX_RTO; } if (cwnd < ssthresh) { // slow start cwnd += 1.0; } else { // increment by one cwnd += 1.0 / cwnd; } if (cwnd > MAX_WND) { cwnd = MAX_WND; } } /** * Method timeout */ public final void timeout() { rto <<= 1; // Don't backoff past 1 second. if (rto > MAX_RTO) { rto = MAX_RTO; } ssthresh = cwnd / 2.0; cwnd = 1.0; } } /** * Class MySelectableCB * */ protected class MySelectableCB implements ASyncCore.SelectableCB { /** * Method select_cb * * @param skey * @param user_data */ public void select_cb(SelectionKey skey, Object user_data) { // We want to do any available reads first, because if a message // has come in, it may generate a response. If so, we want it to // do so before we check for writability, so that if the socket is // writable, we can piggyback the ack for the incoming message on // the application-level response. if (DEBUG) { debugln("MySelectableCB readable=" + skey.isReadable() + ", writable=" + skey.isWritable() + ", want write=" + ((skey.interestOps() & SelectionKey.OP_WRITE) != 0)); } if (skey.isReadable()) { handle_readable(); } // UdpCC.close () may have been called in the last handle_inb_msg // of handle readable. if (closed) { return; } if (((skey.interestOps() & SelectionKey.OP_WRITE) != 0) && skey.isWritable()) { handle_writable(); } } } /** * Class TimeoutInfoAndSeq * */ protected static class TimeoutInfoAndSeq { public TimeoutInfo tinfo; public Long seq; /** * Constructor TimeoutInfoAndSeq * * @param t * @param s */ public TimeoutInfoAndSeq(TimeoutInfo t, Long s) { tinfo = t; seq = s; } } /** * Class MyAckTimeoutCB * */ protected class MyAckTimeoutCB implements ASyncCore.TimerCB { /** * Method timer_cb * * @param user_data */ public void timer_cb(Object user_data) { if (closed) { return; } Long seq = (Long) user_data; TimeoutInfo tinfo = (TimeoutInfo) unacked.remove(seq); if (DEBUG) { debugln("check timeout " + Long.toHexString(seq.longValue())); } long now_ms = System.currentTimeMillis(); if (tinfo == null) { // already acked if (DEBUG) { debugln("seq 0x" + Long.toHexString(seq.longValue()) + " already acked"); } } else { // timeout TimeoutInfo scti = new TimeoutInfo(tinfo); second_chance.put(seq, scti); second_chance_timeouts.add(new TimeoutInfoAndSeq(scti, seq), now_ms); if (DEBUG_RTT) { debugln("timeout seq 0x" + Long.toHexString(seq.longValue()) + ", peer=" + tinfo.conn.addr + ", rtt=" + (now_ms - tinfo.send_ms) + ", now=" + now_ms); } if (tinfo.cut_ssthresh) { ++tinfo.conn.consecutive_timeouts; tinfo.conn.timeout(); Iterator j = tinfo.conn.inf.keySet().iterator(); while (j.hasNext()) { Long s2 = (Long) j.next(); TimeoutInfo t2 = (TimeoutInfo) tinfo.conn.inf.get(s2); t2.cut_ssthresh = false; } } tinfo.conn.inf.remove(seq); LinkedList to_callback = null; if (tinfo.start_ms + tinfo.timeout_ms > now_ms) { tinfo.conn.retry_q.addLast(tinfo); } else if (tinfo.send_cb != null) { to_callback = new LinkedList(); to_callback.addLast(tinfo); } for (int queue = 0; queue < 2; ++queue) { Iterator i = (queue == 0) ? tinfo.conn.retry_q.iterator() : tinfo.conn.send_q.iterator(); while (i.hasNext()) { TimeoutInfo tmp = (TimeoutInfo) i.next(); if (tmp.start_ms + tmp.timeout_ms <= now_ms) { i.remove(); if (tmp.send_cb != null) { if (to_callback == null) { to_callback = new LinkedList(); } to_callback.addLast(tmp); } } } } if (to_callback != null) { if (DEBUG_MIN) { debugln("After " + tinfo.conn.consecutive_timeouts + " consecutive timeouts to " + tinfo.conn.addr + ", cancelling the following messages:"); } Iterator i = to_callback.iterator(); while (i.hasNext()) { TimeoutInfo tmp = (TimeoutInfo) i.next(); if (DEBUG_MIN) { debugln(" " + tmp.msg); } } while ( !to_callback.isEmpty()) { TimeoutInfo tmp = (TimeoutInfo) to_callback.removeFirst(); tmp.send_cb.cb(tmp.send_cb_data, false /* failure */); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -