📄 udpcc.java
字号:
if (tinfo.conn.writable()) { add_to_rr(tinfo.conn); } } // Clean out the second_chance set. while (( !second_chance_timeouts.isEmpty()) && (second_chance_timeouts.getFirstPriority() + 60 * 1000 < now_ms)) { TimeoutInfoAndSeq p = (TimeoutInfoAndSeq) second_chance_timeouts.removeFirst(); second_chance.remove(p.seq); } } } /** * Class BandwidthCB * */ protected class BandwidthCB implements ASyncCore.TimerCB { /** * Method timer_cb * * @param user_data */ public void timer_cb(Object user_data) { long now_ms = System.currentTimeMillis(); if (DEBUG_MIN) { logger.info(" ib=" + in_bytes + " ip=" + in_pkts + " ob=" + out_bytes + " op=" + out_pkts); } in_bytes = in_pkts = out_bytes = out_pkts = 0; if ( !closed) { acore.register_timer(BW_STATS_PERIOD, this, null); } } } /** * Class StatsCB * */ protected class StatsCB implements ASyncCore.TimerCB { /** * Method timer_cb * * @param user_data */ public void timer_cb(Object user_data) { long now_ms = System.currentTimeMillis(); Iterator i = conns.keySet().iterator(); while (i.hasNext()) { InetSocketAddress addr = (InetSocketAddress) i.next(); Connection conn = (Connection) conns.get(addr); double ttfs = (double) conn.time_to_first_send; if (conn.time_to_first_send_cnt > 0) { ttfs /= conn.time_to_first_send_cnt; } else { ttfs = 0.0; } conn.time_to_first_send = 0; conn.time_to_first_send_cnt = 0; double tta = (double) conn.time_to_ack; if (conn.time_to_ack_cnt > 0) { tta /= conn.time_to_ack_cnt; } else { tta = 0.0; } conn.time_to_ack = 0; conn.time_to_ack_cnt = 0; if (conn.lastrcv + 30 * 1000 > now_ms) { logger.info(addr + " sa=" + (conn.sa >> 3) + " sv=" + (conn.sv >> 2) + " rto=" + conn.rto + " cwnd=" + conn.cwnd + " ssthresh=" + conn.ssthresh + " ttfs=" + ttfs + " tta=" + tta); } } if ( !closed) { acore.register_timer(STATS_PERIOD, this, null); } } } protected static final boolean USE_DIRECT = true; protected static final long MAX_RTO = 5 * 1000; protected static long BW_STATS_PERIOD; protected static final long STATS_PERIOD = 30 * 1000; protected static final double MAX_WND = 1000.0 * 1000.0; protected static final int MAX_MSG_SIZE = 16 * 1024; protected static final boolean LOG_BAD_NETWORK_EVENTS = true; protected static final boolean REUSE = true; protected static double timeout_factor; protected static double timeout_diff; protected boolean closed; protected ASyncCore acore; protected Serializer serializer; protected Sink sink; protected MyAckTimeoutCB ack_timeout_cb = new MyAckTimeoutCB(); protected SelectionKey skey; protected DatagramChannel channel; protected DatagramSocket sock; protected InetSocketAddress my_addr; protected Map unacked = new HashMap(); protected Map second_chance = new HashMap(); protected PriorityQueue second_chance_timeouts = new PriorityQueue(10); protected long next_msg_id; protected Connection rr_first, rr_last; protected Map conns = new HashMap(); protected ByteBuffer reuse_buf = USE_DIRECT ? ByteBuffer.allocateDirect(MAX_MSG_SIZE) : ByteBuffer.allocate(MAX_MSG_SIZE); protected long in_bytes, in_pkts, out_bytes, out_pkts; protected HashSet recently_seen_set = new HashSet(); protected LinkedList recently_seen_list = new LinkedList(); protected static int MAX_RECENTLY_SEEN_SIZE; /** * Method debugln * * @param msg */ protected void debugln(String msg) { logger.debug(msg); } /** * Class SrcAndMsgId * */ protected static class SrcAndMsgId { public InetSocketAddress src; public long msg_id; /** * Constructor SrcAndMsgId * * @param s * @param m */ public SrcAndMsgId(InetSocketAddress s, long m) { src = s; msg_id = m; } /** * Method hashCode * @return */ public int hashCode() { return src.hashCode() ^ (int) msg_id; } /** * Method equals * * @param rhs * @return */ public boolean equals(Object rhs) { SrcAndMsgId other = (SrcAndMsgId) rhs; return (msg_id == other.msg_id) && src.equals(other.src); } } /** * Method recently_seen * * @param src * @param msg_id * @return */ protected final boolean recently_seen(InetSocketAddress src, long msg_id) { SrcAndMsgId k = new SrcAndMsgId(src, msg_id); if (recently_seen_set.contains(k)) { return true; } else { recently_seen_list.addLast(k); recently_seen_set.add(k); if (recently_seen_list.size() > MAX_RECENTLY_SEEN_SIZE) { recently_seen_set.remove(recently_seen_list.removeFirst()); } return false; } } /** * Method inc_next_msg_id * @return */ protected final long inc_next_msg_id() { if (next_msg_id < 0) { BUG("next_msg_id=" + Long.toHexString(next_msg_id)); } if (next_msg_id == (Long.MAX_VALUE >>> 8)) { next_msg_id = 0; } else { ++next_msg_id; } return next_msg_id; } /** * Method msg_id * * @param seq * @return */ protected final long msg_id(long seq) { if (seq < 0) { throw new IllegalArgumentException("seq=" + Long.toHexString(seq) + " in msg_id"); } return seq >> 8; } /** * Method attempt * * @param seq * @return */ protected final int attempt(long seq) { if (seq < 0) { throw new IllegalArgumentException("seq=" + Long.toHexString(seq) + " in make_seq"); } return (int) (seq & 0x7fL); } /** * Method make_seq * * @param msg_id * @param attempt * @return */ protected final long make_seq(long msg_id, int attempt) { if (msg_id > (Long.MAX_VALUE >> 8)) { throw new IllegalArgumentException("msg_id=" + Long.toHexString(msg_id) + " in make_seq"); } if (attempt > 0x7f) { throw new IllegalArgumentException("attempt=" + Long.toHexString(attempt) + " in make_seq"); } long seq = (msg_id << 8) | attempt; if (seq < 0) { BUG("seq=" + Long.toHexString(seq) + " in make_seq"); } return seq; } /** * Method BUG * * @param e */ protected void BUG(Exception e) { logger.fatal("unhandled exception", e); System.exit(1); } /** * Method BUG * * @param msg */ protected void BUG(String msg) { Exception e = null; try { throw new Exception(); } catch (Exception c) { e = c; } logger.fatal(msg, e); System.exit(1); } /** * Method alloc_bb * * @param sz * @return */ protected final ByteBuffer alloc_bb(int sz) { if (REUSE) { reuse_buf.clear(); reuse_buf.limit(sz); return reuse_buf; } else { if (USE_DIRECT) { return ByteBuffer.allocateDirect(sz); } else { return ByteBuffer.allocate(sz); } } } /** * Method handle_ack * * @param seq */ protected final void handle_ack(Long seq) { TimeoutInfo tinfo = (TimeoutInfo) unacked.remove(seq); long now_ms = System.currentTimeMillis(); if (tinfo == null) { tinfo = (TimeoutInfo) second_chance.remove(seq); if (tinfo == null) { if (DEBUG) { debugln("got unexpected ack for seq 0x" + Long.toHexString(seq.longValue())); } } else { tinfo.conn.lastrcv = System.currentTimeMillis(); if (DEBUG_RTT) { debugln("2nd chance ack seq 0x" + Long.toHexString(seq.longValue()) + ", peer=" + tinfo.conn.addr + ", rtt=" + (now_ms - tinfo.send_ms) + ", now=" + now_ms); } tinfo.conn.add_rtt_meas(System.currentTimeMillis() - tinfo.send_ms); } return; } Connection conn = tinfo.conn; conn.time_to_ack += (now_ms - tinfo.start_ms); conn.time_to_ack_cnt += 1; conn.lastrcv = now_ms; if (DEBUG) { debugln("got ack for seq 0x" + Long.toHexString(seq.longValue())); } conn.add_rtt_meas(System.currentTimeMillis() - tinfo.send_ms); conn.inf.remove(seq); if (DEBUG) { debugln("conn=" + conn.addr + " cwnd=" + ((int) conn.cwnd) + " inf=" + conn.inf.size()); } tinfo.conn.consecutive_timeouts = 0; if (conn.writable()) { add_to_rr(conn); } if (tinfo.send_cb != null) { tinfo.send_cb.cb(tinfo.send_cb_data, true /* success */); } } /** * Method handle_inb_msg * * @param bb * @param src */ protected final void handle_inb_msg(ByteBuffer bb, InetSocketAddress src) { long ack = bb.getLong(); Long seq = new Long(bb.getLong()); long wait_ms = -1L; long est_rtt_ms = -1L; if (bb.limit() - bb.position() >= 8) { wait_ms = bb.getInt(); est_rtt_ms = bb.getInt(); } if (ack != -1L) { handle_ack(new Long(ack)); } long now_ms = System.currentTimeMillis(); Connection conn = (Connection) conns.get(src); if (conn == null) { conn = new Connection(src, now_ms); conns.put(src, conn); } // Send an ack. int tries = -1; if (seq.longValue() != -1L) { conn.ack_q.addLast(seq); tries = attempt(seq.longValue()); } if (conn.writable()) { add_to_rr(conn); } if ((seq.longValue() != -1L) && recently_seen(src, msg_id(seq.longValue()))) { if (DEBUG_RTT) { debugln("received duplicate for msg_id=0x" + Long.toHexString(msg_id(seq.longValue()))); } } else { Object msg = null; try { msg = serializer.deserialize(bb); } catch (Exception e) { if (DEBUG_MIN) { logger.debug("caught " + e + " deserializing a message from " + src, e); } return; } catch (OutOfMemoryError e) { System.gc(); StringBuffer bytes = new StringBuffer(bb.position() * 3); bb.position(0); int col = 0; while (bb.position() < bb.limit()) { String bs = Integer.toHexString(bb.get() & 0xff); if (bs.length() == 1) { bs = "0" + bs; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -