⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpcc.java

📁 High performance DB query
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        /**         * Constructor TimeoutInfo         *         * @param other         */        public TimeoutInfo(TimeoutInfo other) {            msg = other.msg;            conn = other.conn;            send_ms = other.send_ms;            attempt = other.attempt;            timeout_ms = other.timeout_ms;            cut_ssthresh = other.cut_ssthresh;            send_cb = other.send_cb;            send_cb_data = other.send_cb_data;            msg_id = other.msg_id;        }        /**         * Constructor TimeoutInfo         *         * @param c         * @param m         * @param n         * @param to         * @param scb         * @param scbd         */        public TimeoutInfo(Connection c, Object m, long n, long to, SendCB scb,                           Object scbd) {            cut_ssthresh = true;            conn = c;            msg = m;            attempt = 0;            start_ms = n;            timeout_ms = to;            msg_id = -1;            send_ms = 0;            send_cb = scb;            send_cb_data = scbd;        }    }    /**     * Keep track of all of the relavent information about another node we are     * talking to.     */    protected class Connection {        public InetSocketAddress addr;        /**         * The mean RTT, variance of the RTT, and the round-trip timeout to         * this host, all in milliseconds.  The mean is scaled by a factor of         * 8, and the variance is scaled by a factor of 4.  See [Jac88].         */        public long sa, sv, rto;        public int consecutive_timeouts;        /**         * The congestion window size and slow-start threshold.  See [Jac88].         */        public double cwnd, ssthresh;        /**         * TimeoutInfo objects for each message in flight, indexed by sequence         * number.         */        public Map inf = new HashMap();        /**         * The sequence numbers of acknowledgements we need to send out.         */        public LinkedList ack_q = new LinkedList();        /**         * TimeoutInfo objects for new messages to be sent out.         */        public LinkedList send_q = new LinkedList();        /**         * Non-conjestion controlled messages that need to be sent.         */        public LinkedList probe_q = new LinkedList();        /**         * TimeoutInfo objects for messages that need to be resent.         */        public LinkedList retry_q = new LinkedList();        /**         * The time in milliseconds since the epoch since we last sent a         * message to this host and since we last received an acknowledgement         * from them.         */        public long lastsnd, lastrcv;        /**         * We use these to keep track of the time that the average message         * spends in the send_q before being sent out on the wire.         */        public long time_to_first_send;        public int time_to_first_send_cnt;        /**         * We use these to keep track of the time that the average message         * takes to be acknowledged.         */        public long time_to_ack;        public int time_to_ack_cnt;        /**         * We put messages on the wire from the send_q, ack_q, probe_q, and         * retry_q in round-robin fashion, and this integer keeps track of         * which one to pull out of next time we can write the socket.         */        public int next_q;        /**         * Which connection is after us in the round-robin queue?         */        public Connection next;        /**         * Are we currently in the round-robin queue?         */        public boolean in_rr;        /**         * Constructor Connection         *         * @param a         * @param now_ms         */        public Connection(InetSocketAddress a, long now_ms) {            addr = a;            sa = -1;            sv = 0;            rto = MAX_RTO;            lastrcv = now_ms;    // so we don't immediately think they're down            lastsnd = now_ms;    // so we don't immediately throw them out            cwnd = 1.0;            ssthresh = MAX_WND;        }        /**         * Do we have a regular msg to send, and are we within the congestion         * window?         * @return         */        public final boolean can_send_msg() {            return (( !send_q.isEmpty()) && (inf.size() < (int) cwnd));        }        /**         * Do we have a regular msg to retry, and are we within the congestion         * window?         * @return         */        public final boolean can_send_retry() {            return (( !retry_q.isEmpty()) && (inf.size() < (int) cwnd));        }        /**         * Do we have a regular msg to send or retry, and are we within the         * congestion window?         * @return         */        public final boolean can_send_either() {            if (DEBUG) {                if (send_q.isEmpty() && retry_q.isEmpty()) {                    debugln("send_q and retry_q both empty");                } else if (inf.size() >= (int) cwnd) {                    debugln("inf.size >= cwnd");                }            }            return ((( !send_q.isEmpty()) || ( !retry_q.isEmpty()))                    && (inf.size() < (int) cwnd));        }        /**         * Do we have a nocc msg to send?         * @return         */        public final boolean can_send_probe() {            return !probe_q.isEmpty();        }        /**         * Do we have an ack to send?         * @return         */        public final boolean can_send_ack() {            return !ack_q.isEmpty();        }        /**         * Method writable         * @return         */        public final boolean writable() {            // If we still have something to send on this connection, and            // it's conjestion window is not yet full, or if we have an ack            // to send, it is writable.            if ( !ack_q.isEmpty()) {                return true;            }            if ( !probe_q.isEmpty()) {                return true;            }            if (retry_q.isEmpty() && send_q.isEmpty()) {                return false;            }            return inf.size() < (int) cwnd;        }        /**         * Method add_rtt_meas         *         * @param m         */        public final void add_rtt_meas(long m) {            long orig_m = m;            if (sa == -1) {                // After the first measurement, set the timeout to four                // times the RTT.                sa = m << 3;                sv = 0;                rto = (m << 2) + 10;          // the 10 is to accont for GC            } else {                m -= (sa >> 3);                sa += m;                if (m < 0) {                    m = -1 * m;                }                m -= (sv >> 2);                sv += m;                rto = (sa >> 3) + sv + 10;    // the 10 is to accont for GC            }            // Don't backoff past 1 second.            if (rto > MAX_RTO) {                if (DEBUG) {                    debugln("huge rto: conn=" + addr + " m=" + orig_m + " sa="                            + sa + " sv=" + sv + " rto=" + rto);                }                rto = MAX_RTO;            }            if (cwnd < ssthresh) {    // slow start                cwnd += 1.0;            } else {                  // increment by one                cwnd += 1.0 / cwnd;            }            if (cwnd > MAX_WND) {                cwnd = MAX_WND;            }        }        /**         * Method timeout         */        public final void timeout() {            rto <<= 1;            // Don't backoff past 1 second.            if (rto > MAX_RTO) {                rto = MAX_RTO;            }            ssthresh = cwnd / 2.0;            cwnd = 1.0;        }    }    /**     * Class MySelectableCB     *     */    protected class MySelectableCB implements ASyncCore.SelectableCB {        /**         * Method select_cb         *         * @param skey         * @param user_data         */        public void select_cb(SelectionKey skey, Object user_data) {            // We want to do any available reads first, because if a message            // has come in, it may generate a response.  If so, we want it to            // do so before we check for writability, so that if the socket is            // writable, we can piggyback the ack for the incoming message on            // the application-level response.            if (DEBUG) {                debugln("MySelectableCB readable=" + skey.isReadable()                        + ", writable=" + skey.isWritable() + ", want write="                        + ((skey.interestOps() & SelectionKey.OP_WRITE) != 0));            }            if (skey.isReadable()) {                handle_readable();            }            // UdpCC.close () may have been called in the last handle_inb_msg            // of handle readable.            if (closed) {                return;            }            if (((skey.interestOps() & SelectionKey.OP_WRITE) != 0)                    && skey.isWritable()) {                handle_writable();            }        }    }    /**     * Class TimeoutInfoAndSeq     *     */    protected static class TimeoutInfoAndSeq {        public TimeoutInfo tinfo;        public Long seq;        /**         * Constructor TimeoutInfoAndSeq         *         * @param t         * @param s         */        public TimeoutInfoAndSeq(TimeoutInfo t, Long s) {            tinfo = t;            seq = s;        }    }    /**     * Class MyAckTimeoutCB     *     */    protected class MyAckTimeoutCB implements ASyncCore.TimerCB {        /**         * Method timer_cb         *         * @param user_data         */        public void timer_cb(Object user_data) {            if (closed) {                return;            }            Long seq = (Long) user_data;            TimeoutInfo tinfo = (TimeoutInfo) unacked.remove(seq);            if (DEBUG) {                debugln("check timeout " + Long.toHexString(seq.longValue()));            }            long now_ms = System.currentTimeMillis();            if (tinfo == null) {                // already acked                if (DEBUG) {                    debugln("seq 0x" + Long.toHexString(seq.longValue())                            + " already acked");                }            } else {                // timeout                TimeoutInfo scti = new TimeoutInfo(tinfo);                second_chance.put(seq, scti);                second_chance_timeouts.add(new TimeoutInfoAndSeq(scti, seq),                                           now_ms);                if (DEBUG_RTT) {                    debugln("timeout seq 0x"                            + Long.toHexString(seq.longValue()) + ", peer="                            + tinfo.conn.addr + ", rtt="                            + (now_ms - tinfo.send_ms) + ", now=" + now_ms);                }                if (tinfo.cut_ssthresh) {                    ++tinfo.conn.consecutive_timeouts;                    tinfo.conn.timeout();                    Iterator j = tinfo.conn.inf.keySet().iterator();                    while (j.hasNext()) {                        Long s2 = (Long) j.next();                        TimeoutInfo t2 = (TimeoutInfo) tinfo.conn.inf.get(s2);                        t2.cut_ssthresh = false;                    }                }                tinfo.conn.inf.remove(seq);                LinkedList to_callback = null;                if (tinfo.start_ms + tinfo.timeout_ms > now_ms) {                    tinfo.conn.retry_q.addLast(tinfo);                } else if (tinfo.send_cb != null) {                    to_callback = new LinkedList();                    to_callback.addLast(tinfo);                }                for (int queue = 0; queue < 2; ++queue) {                    Iterator i = (queue == 0)                                 ? tinfo.conn.retry_q.iterator()                                 : tinfo.conn.send_q.iterator();                    while (i.hasNext()) {                        TimeoutInfo tmp = (TimeoutInfo) i.next();                        if (tmp.start_ms + tmp.timeout_ms <= now_ms) {                            i.remove();                            if (tmp.send_cb != null) {                                if (to_callback == null) {                                    to_callback = new LinkedList();                                }                                to_callback.addLast(tmp);                            }                        }                    }                }                if (to_callback != null) {                    if (DEBUG_MIN) {                        debugln("After " + tinfo.conn.consecutive_timeouts                                + " consecutive timeouts to " + tinfo.conn.addr                                + ", cancelling the following messages:");                    }                    Iterator i = to_callback.iterator();                    while (i.hasNext()) {                        TimeoutInfo tmp = (TimeoutInfo) i.next();                        if (DEBUG_MIN) {                            debugln("    " + tmp.msg);                        }                    }                    while ( !to_callback.isEmpty()) {                        TimeoutInfo tmp =                            (TimeoutInfo) to_callback.removeFirst();                        tmp.send_cb.cb(tmp.send_cb_data, false /* failure */);                    }                }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -