⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpcc.java

📁 High performance DB query
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                    bytes.append(bs);                    if (++col == 24) {                        bytes.append("\n");                        col = 0;                    } else if ((col > 0) && (col % 4 == 0)) {                        bytes.append(" ");                    }                }                logger.error("out of memory error deserializing bytes:\n"                             + bytes);            }            if (DEBUG && (seq.longValue() != -1L)) {                if (DEBUG_MIN) {                    debugln("received " + msg + " seq=0x"                            + Long.toHexString(seq.longValue()) + " from "                            + src);                }            }            sink.recv(msg, src, my_addr, tries, wait_ms, est_rtt_ms);        }    }    /**     * Method handle_readable     */    protected final void handle_readable() {        while (true) {            ByteBuffer bb = alloc_bb(MAX_MSG_SIZE);            InetSocketAddress src = null;            try {                src = (InetSocketAddress) channel.receive(bb);            } catch (SocketException e) {                // For some reason, Sun's j2sdk1.4.2 will occasionally throw                // one of these with the message "Connection reset by peer: no                // further information".  That doesn't make any sense to me,                // but I think we can safely ignore it.  We'll get the data                // for the next packet on the next select loop.                return;            } catch (IOException e) {                BUG(e);            }            if (bb.position() == 0) {                return;            }            bb.flip();            in_pkts += 1;            in_bytes += bb.limit() + 20    /* account for IP header */            ;            if (bb.limit() == 8) {                handle_ack(new Long(bb.getLong()));            } else {                handle_inb_msg(bb, src);            }            // UdpCC.close () may have been called from the user's handler            // in handle_ack or handle_inb_msg.            if (closed) {                return;            }        }    }    /**     * Method send_ack     *     * @param conn     * @return     */    protected final boolean send_ack(Connection conn) {        long now_ms = System.currentTimeMillis();        Long seq = (Long) conn.ack_q.getFirst();        if (DEBUG) {            debugln("sending ack seq 0x" + Long.toHexString(seq.longValue()));        }        ByteBuffer bb = alloc_bb(8);        bb.putLong(seq.longValue());        bb.rewind();        if (DEBUG) {            debugln("acking " + seq);        }        int n = 0;        try {            n = channel.send(bb, conn.addr);        } catch (IOException e) {            BUG(e);        }        if (n == 0) {            return false;        }        // Send was successful.        out_pkts += 1;        out_bytes += bb.limit() + 20    /* account for IP header */        ;        conn.ack_q.removeFirst();        return true;    }    /**     * Method send_probe     *     * @param conn     * @return     */    protected final boolean send_probe(Connection conn) {        long seq = -1L;        Object msg = conn.probe_q.getFirst();        if (DEBUG) {            debugln("sending probe");        }        // Piggyback an ACK if we have one.        long ack = -1L;        if ( !conn.ack_q.isEmpty()) {            ack = ((Long) conn.ack_q.getFirst()).longValue();            if (DEBUG) {                debugln("piggybacking ack " + ack);            }        }        int sz = serializer.serialize_size(msg)                 + 24    /* for ack+seq+wait_ms+est_rtt_ms */        ;        if (sz > MAX_MSG_SIZE) {            BUG("size=" + sz + " is greater than max size=" + MAX_MSG_SIZE                + " for msg " + msg);        }        long now_ms = System.currentTimeMillis();        ByteBuffer bb = alloc_bb(sz);        bb.putLong(ack);        bb.putLong(seq);        bb.putInt((int) 0);    // don't know how long it's been waiting        bb.putInt((int) (conn.sa >> 3));        serializer.serialize(msg, bb);        bb.rewind();        int n = 0;        try {            n = channel.send(bb, conn.addr);        } catch (IOException e) {            BUG(e);        }        if (n == 0) {            // Undo            if (DEBUG) {                debugln("send failed, will retry later");            }            return false;        }        // Send was successful.        if (ack != -1) {            conn.ack_q.removeFirst();        }        out_pkts += 1;        out_bytes += bb.limit() + 20    /* account for IP header */        ;        conn.probe_q.removeFirst();        return true;    }    /**     * Method send_msg     *     * @param conn     * @param retry     * @return     */    protected final boolean send_msg(Connection conn, boolean retry) {        TimeoutInfo tinfo = (TimeoutInfo) (retry                                           ? conn.retry_q.getFirst()                                           : conn.send_q.getFirst());        if (tinfo.msg_id == -1) {            tinfo.msg_id = inc_next_msg_id();        }        tinfo.attempt++;        long seq = make_seq(tinfo.msg_id, tinfo.attempt);        long now_ms = System.currentTimeMillis();        if (DEBUG_RTT) {            debugln("sending " + my_addr + " seq=0x" + Long.toHexString(seq)                    + ", now=" + now_ms);        }        // Piggyback an ACK if we have one.        long ack = -1L;        if ( !conn.ack_q.isEmpty()) {            ack = ((Long) conn.ack_q.getFirst()).longValue();            if (DEBUG) {                debugln("piggybacking ack " + ack);            }        }        long wait_ms = now_ms - tinfo.start_ms;        int sz = serializer.serialize_size(tinfo.msg)                 + 24    /* for ack+seq+wait_ms+est_rtt_ms */        ;        if (sz > MAX_MSG_SIZE) {            BUG("size=" + sz + " is greater than max size=" + MAX_MSG_SIZE                + " for msg " + tinfo.msg);        }        ByteBuffer bb = alloc_bb(sz);        bb.putLong(ack);        bb.putLong(seq);        bb.putInt((int) wait_ms);        bb.putInt((int) (conn.sa >> 3));        serializer.serialize(tinfo.msg, bb);        bb.rewind();        int n = 0;        try {            n = channel.send(bb, conn.addr);        } catch (IOException e) {            bb.rewind();            byte[] data = new byte[bb.limit() - bb.position()];            bb.get(data, 0, data.length);            logger.fatal("bb.pos=" + bb.position() + " bb.lim=" + bb.limit()                         + " addr="                         + conn.addr.getAddress().getHostAddress(), e);            System.exit(1);        }        if (n == 0) {            // Undo            if (DEBUG) {                debugln("send failed, will retry later");            }            return false;        }        // Send was successful.        out_pkts += 1;        out_bytes += bb.limit() + 20    /* account for IP header */        ;        if (ack != -1) {            conn.ack_q.removeFirst();        }        if (retry) {            conn.retry_q.removeFirst();            TimeoutInfo tnew = new TimeoutInfo(conn, tinfo.msg, tinfo.start_ms,                                               tinfo.timeout_ms, tinfo.send_cb,                                               tinfo.send_cb_data);            tnew.attempt = tinfo.attempt;            tnew.msg_id = tinfo.msg_id;            tinfo = tnew;        } else {            conn.send_q.removeFirst();            conn.time_to_first_send += (tinfo.send_ms - tinfo.start_ms);            conn.time_to_first_send_cnt += 1;        }        tinfo.send_ms = now_ms;        Long Seq = new Long(seq);        unacked.put(Seq, tinfo);        conn.inf.put(Seq, tinfo);        conn.lastsnd = now_ms;        long timeout_ms = Math.round(conn.rto * timeout_factor + timeout_diff);        if (DEBUG) {            debugln("setting timeout for " + timeout_ms);        }        acore.register_timer(timeout_ms, ack_timeout_cb, Seq);        return true;    }    /**     * Method add_to_rr     *     * @param conn     */    protected final void add_to_rr(Connection conn) {        // A connection can be added to the round robin queue waiting on        // writability, but then no longer be writable when it gets its turn,        // because in the interim a timeout has occurred and cwnd has been        // lowered.  As a result, conn.writable is not a good indicator of        // whether a connection is in the round robin queue already or        // not.  Instead, we check conn.in_rr.        if ( !conn.in_rr) {            if (DEBUG) {                debugln("adding conn=" + conn.addr + " to rr");            }            conn.in_rr = true;            if (rr_first == null) {                rr_first = rr_last = conn;            } else {                rr_last.next = conn;                rr_last = conn;            }            conn.next = null;            skey.interestOps(skey.interestOps() | SelectionKey.OP_WRITE);        } else {            if (DEBUG) {                debugln("conn=" + conn.addr + " already in rr");            }        }    }    /**     * Method handle_writable     */    protected void handle_writable() {        // For some reason, when the round-robin queue was implemented with        // java.util.LinkedList, the removeFirst statement kept throwing a        // NoSuchElement exception which--given that it immediately followed a        // !isEmpty check in a single-threaded system--made no sense.  So I've        // implemented the list by hand and it seems to have made the problem        // go away.  I love compiler bugs...        while (rr_first != null) {            Connection conn = rr_first;            rr_first = rr_first.next;            conn.in_rr = false;            if (conn.writable()) {                if (DEBUG) {                    debugln("handle_writable conn=" + conn.addr);                }                if (conn.can_send_either() || conn.can_send_probe()) {                    // If we have anything other than an ack to send, send                    // it, since the ack will get piggybacked along.                    // Cycle between congestion-controlled and non-cc msgs,                    // but only send one of the two before going on to the                    // next connection in the rr queue.                    boolean done = false;                    while ( !done) {                        switch (conn.next_q) {                        case 0:                            // Try the retry queue first.                            if (conn.can_send_retry()) {                                if ( !send_msg(conn, true /* retry */)) {                                    return;                                }                                done = true;                            } else if (conn.can_send_msg()) {                                if ( !send_msg(conn, false /* !retry */)) {                                    return;                                }                                done = true;                            } else if (DEBUG) {                                if (conn.send_q.isEmpty()                                        && conn.retry_q.isEmpty()) {                                    debugln("no msgs");                                } else {                                    debugln("cwnd full");                                }                            }                            break;                        case 1:                            if (conn.can_send_probe()) {                                if ( !send_probe(conn)) {                                    return;                                }                                done = true;                            } else {                                if (DEBUG) {                                    debugln("no probes");                                }                            }                            break;                        }                        conn.next_q = (conn.next_q + 1) % 2;                    }                } else {                    if (conn.can_send_ack()) {                        if ( !send_ack(conn)) {                            return;                        }                    } else if (DEBUG) {                        debugln("no acks");                    }                }            }            // If the connection is still writable, leave it in the round            // robin list.            if (conn.writable()) {                if (DEBUG) {                    debugln("still writable");                }                add_to_rr(conn);            }            // If there is nothing in the round robin list, stop selecting            // for writability.            if (rr_first == null) {                if (DEBUG) {                    debugln("rr empty");                }                skey.interestOps(SelectionKey.OP_READ);            } else {                if (DEBUG) {                    debugln("rr not empty");                }            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -