⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpcc.java

📁 High performance DB query
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*
 * @(#)$Id: UdpCC.java,v 1.13 2004/10/14 19:49:04 huebsch Exp $
 *
 * Copyright (c) 2001-2004 Regents of the University of California.
 * All rights reserved.
 *
 * This file is distributed under the terms in the attached BERKELEY-LICENSE
 * file. If you do not find these files, copies can be found by writing to:
 * Computer Science Division, Database Group, Universite of California,
 * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License
 *
 * Copyright (c) 2003-2004 Intel Corporation. All rights reserved.
 *
 * This file is distributed under the terms in the attached INTEL-LICENSE file.
 * If you do not find these files, copies can be found by writing to:
 * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300,
 * Berkeley, CA, 94704.  Attention:  Intel License Inquiry.
 */
/*
 * Copyright (c) 2001-2003 Regents of the University of California.
 * All rights reserved.
 *
 * See the file LICENSE included in this distribution for details.
 */
package runtime.services.network.udpcc;

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import runtime.schedulers.ASyncCore;
import util.PriorityQueue;

/**
 * A TCP-friendly datagram layer.
 *
 * <p>Messages are handed to this layer as plain objects; a
 * {@link Serializer} converts them to and from bytes, a {@link Sink}
 * receives incoming messages, and a {@link SendCB} reports the outcome of
 * each congestion-controlled send.
 *
 * @author Sean C. Rhea
 * @version $Id: UdpCC.java,v 1.13 2004/10/14 19:49:04 huebsch Exp $
 */
public class UdpCC {

    /** Debug flag; corresponds to bit <code>0x1</code> of the debug level. */
    protected boolean DEBUG_MIN = false;

    /** Debug flag; corresponds to bit <code>0x2</code> of the debug level. */
    protected boolean DEBUG_RTT = false;

    /** Debug flag; corresponds to bit <code>0x4</code> of the debug level. */
    protected boolean DEBUG = false;

    /**
     * Largest receive-buffer size that <code>set_sockbuf_size</code> will
     * try to apply; larger requests are ignored.
     */
    protected int SOCKBUF_MAX = 1048575;

    /** The log4j logger used by this object; assigned in the constructor. */
    protected Logger logger;

    /**
     * Converts messages to and from their byte representation.
     */
    public static interface Serializer {

        /**
         * Called to determine the size of the byte array needed to
         * serialize the given <code>msg</code> into.
         *
         * @param msg the message that is about to be serialized
         * @return the size of the byte array needed for <code>msg</code>
         */
        int serialize_size(Object msg);

        /**
         * Called to serialize the given <code>msg</code> into the provided
         * <code>buf</code>; the inverse of <code>deserialize</code>.
         *
         * @param msg the message to serialize
         * @param buf the buffer to write the serialized form into
         */
        void serialize(Object msg, ByteBuffer buf);

        /**
         * Called to deserialize the given <code>buf</code> into the message
         * it represents; the inverse of <code>serialize</code>.
         *
         * @param buf the buffer holding the serialized message
         * @return the message represented by <code>buf</code>
         * @throws Exception declared by the interface; the conditions under
         *                   which it is thrown are up to the implementation
         */
        Object deserialize(ByteBuffer buf) throws Exception;
    }

    /**
     * Receiver of incoming messages.
     */
    public static interface Sink {

        /**
         * Called when a message is received.
         *
         * @param msg        the message that was received
         *
         * @param src        the host and port from which the message was
         *                   sent.  The source is not authenticated; this
         *                   value is just what is read out of the IP packet
         *                   header
         *
         * @param local      the localhost and port on which the message was
         *                   received
         *
         * @param tries      the number of times the message has been sent,
         *                   including this one, or -1 if that information is
         *                   not available (for example, if the message was
         *                   sent with {@link #send_nocc})
         *
         * @param wait_ms    the time in milliseconds that the message sat in
         *                   the sender's outbound queue before being sent,
         *                   presumably because it was waiting behind other
         *                   messages, or -1 if that information is not
         *                   available
         *
         * @param est_rtt_ms the sender's estimate of the round trip time in
         *                   milliseconds between it and this host, or -1 if
         *                   that information is not available
         */
        void recv(Object msg, InetSocketAddress src, InetSocketAddress local,
                  int tries, long wait_ms, long est_rtt_ms);
    }

    /**
     * Completion callback for messages handed to <code>send</code>.
     */
    public static interface SendCB {

        /**
         * The callback indicating the UdpCC layer is done with a particular
         * message.
         *
         * @param user_data     the data supplied to <code>send</code>
         * @param success       whether the message was acknowledged by the
         *                      recipient
         */
        void cb(Object user_data, boolean success);
    }

    /**
     * Construct a new UdpCC object with a UDP socket bound to
     * <code>address</code> and start listening for messages.
*     * @param core     * @param address     * @param slz     * @param snk     */    public UdpCC(ASyncCore core, InetSocketAddress address, Serializer slz,                 Sink snk) {        logger = Logger.getLogger(getClass());        acore = core;        my_addr = address;        serializer = slz;        sink = snk;        next_msg_id = System.currentTimeMillis() >> 8;        if (next_msg_id < 0) {            BUG("next_msg_id=" + next_msg_id);        }        try {            channel = DatagramChannel.open();            sock = channel.socket();            int rxsize = sock.getReceiveBufferSize();            logger.info("initial rcv sobuf = " + rxsize);            sock.bind(my_addr);            channel.configureBlocking(false);            skey = acore.register_selectable(channel, SelectionKey.OP_READ,                                             new MySelectableCB(), null);        } catch (IOException e) {            logger.fatal("could not open " + my_addr, e);            System.exit(1);        }        // acore.register_timer (STATS_PERIOD, new StatsCB (), null);        acore.register_timer(BW_STATS_PERIOD, new BandwidthCB(), null);    }    /**     * Closes the socket associated with this object, removes its callbacks     * from ASyncCore, and causes it to stop responding to any outstanding     * timers it has registered--in other words, turns it off.  There is no     * way to turn it back on; instead, just create a new one.     */    public void close() {        acore.unregister_selectable(skey);        sock.close();    // closes channel with it        closed = true;    }    /**     * Method debug_level     * @return     */    public int debug_level() {        return ((DEBUG)                ? 0x4                : 0x0) | ((DEBUG_RTT)                          ? 0x2                          : 0x0) | ((DEBUG_MIN)                                    ? 
0x1                                    : 0x0);    }    /**     * Method set_debug_level     *     * @param value     */    public void set_debug_level(int value) {        DEBUG_MIN = ((value & 0x1) != 0);        DEBUG_RTT = ((value & 0x2) != 0);        DEBUG = ((value & 0x4) != 0);        if (value > 0) {            logger.setLevel(Level.DEBUG);        }    }    /**     * Method set_sockbuf_size     *     * @param value     */    public void set_sockbuf_size(int value) {        if ((value > 0) && (value <= SOCKBUF_MAX)) {            try {                sock.setReceiveBufferSize(value);                int rxsize = sock.getReceiveBufferSize();                logger.info("set rcv sobuf " + value + "; got " + rxsize);            } catch (SocketException e) {                logger.fatal("could not set socket buffer size " + value, e);                System.exit(1);            }        }    }    /**     * Method set_timeout_factor     *     * @param value     */    public void set_timeout_factor(double value) {        timeout_factor = value;        logger.info("timeout_factor=" + timeout_factor);    }    /**     * Method set_timeout_diff     *     * @param value     */    public void set_timeout_diff(double value) {        timeout_diff = value;        logger.info("timeout_diff=" + timeout_diff);    }    /**     * This is just a one-shot, minimal-delay send without an acknowledgement;     * <i>Do not use this function unless you are providing some     * application-level congestion control</i>.  Messages send with this     * function go into their own per-destination queue, separate from     * messages sent with {@link #send}.     
*     * @param msg     * @param dst     */    public void send_nocc(Object msg, InetSocketAddress dst) {        long now_ms = System.currentTimeMillis();        Connection conn = (Connection) conns.get(dst);        if (conn == null) {            conn = new Connection(dst, now_ms);            conns.put(dst, conn);        }        conn.probe_q.addLast(msg);        if (conn.writable()) {            add_to_rr(conn);        }    }    /**     * Send a congestion-controlled message to another host.  Use this     * function, and your communtication will be TCP-friendly, which is good.     * Once the message is sent, or when <code>tries</code> timeouts have     * occurred while trying to send it, the give callback will be called.     *     * @param msg             the message to send     * @param dst             the message's destination host and port     * @param timeout_sec     * @param cb              the callback to call when the message is     *                        sent, or when the number of tries is exhausted     * @param user_data       the application-specific data to call that     *                        callback with     */    public void send(Object msg, InetSocketAddress dst, long timeout_sec,                     SendCB cb, Object user_data) {        long now_ms = System.currentTimeMillis();        if (DEBUG) {            debugln("got " + msg + " to send to " + dst);        }        Connection conn = (Connection) conns.get(dst);        if (conn == null) {            conn = new Connection(dst, now_ms);            conns.put(dst, conn);        }        conn.send_q.addLast(new TimeoutInfo(conn, msg, now_ms,                                            timeout_sec * 1000, cb, user_data));        if (DEBUG) {            debugln("conn for " + dst + ": send_q.size=" + conn.send_q.size()                    + ", retry_q.size=" + conn.retry_q.size()                    + ", second_chance.size=" + second_chance.size());        }        if (conn.writable()) {            
add_to_rr(conn);        } else if (DEBUG) {            debugln("not yet writable");        }    }    /**     * Returns the number of milliseconds since the epoch of the last time we     * sent a message to this peer, or 0 if we haven't sent a message to them     * at all.     *     * @param peer     * @return     */    public long last_send(InetSocketAddress peer) {        Connection conn = (Connection) conns.get(peer);        if (conn == null) {            return 0;        } else {            return conn.lastsnd;        }    }    /**     * Returns the number of milliseconds since the epoch of the last time we     * received an acknowledgement from this peer, or 0 if we have yet to     * receive an acknowledgement from them.     *     * @param peer     * @return     */    public long last_recv(InetSocketAddress peer) {        Connection conn = (Connection) conns.get(peer);        if (conn == null) {            return 0;        } else {            return conn.lastrcv;        }    }    /**     * Returns the current estimate of the mean latency to this peer, or -1 if     * there is no current estimate.     *     * @param peer     * @return     */    public long latency_mean(InetSocketAddress peer) {        Connection conn = (Connection) conns.get(peer);        if (conn == null) {            return -1L;        } else {            return conn.sa >> 3;        }    }    /**     * Returns the number of messages waiting to be sent or currently in     * flight to this peer.     *     * @param peer     * @return     */    public int queued_msgs(InetSocketAddress peer) {        Connection conn = (Connection) conns.get(peer);        if (conn == null) {            return 0;        } else {            return conn.retry_q.size() + conn.send_q.size()                   + conn.probe_q.size() + conn.inf.size();        }    }    /**     * Returns the number of messages waiting to be sent or currently in     * flight to all peers.     
*
     * @return the total, over every connection record, of the sizes of its
     *         <code>retry_q</code>, <code>send_q</code>,
     *         <code>probe_q</code> and <code>inf</code>
     */
    public int queued_msgs() {
        int total = 0;
        for (Iterator peers = conns.keySet().iterator(); peers.hasNext(); ) {
            final InetSocketAddress peer = (InetSocketAddress) peers.next();
            final Connection peer_conn = (Connection) conns.get(peer);
            total += peer_conn.retry_q.size() + peer_conn.send_q.size()
                     + peer_conn.probe_q.size() + peer_conn.inf.size();
        }
        return total;
    }

    /**
     * Don't use this function; I'm trying to figure out how to get rid of it.
     *
     * @param peer the peer to look up
     * @return the connection's own <code>send_q</code> list (not a copy), or
     *         <code>null</code> when there is no connection record for
     *         <code>peer</code>
     */
    public LinkedList send_q(InetSocketAddress peer) {
        final Connection peer_conn = (Connection) conns.get(peer);
        if (peer_conn == null) {
            return null;
        }
        return peer_conn.send_q;
    }

    /**
     * Class TimeoutInfo
     *
     */
    public static class TimeoutInfo {
        public Object msg;
        public Connection conn;
        public long start_ms, send_ms;
        public long msg_id;
        public int attempt;
        public long timeout_ms;
        public boolean cut_ssthresh;
        public SendCB send_cb;
        public Object send_cb_data;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -