📄 udpcc.java
字号:
/* * @(#)$Id: UdpCC.java,v 1.13 2004/10/14 19:49:04 huebsch Exp $ * * Copyright (c) 2001-2004 Regents of the University of California. * All rights reserved. * * This file is distributed under the terms in the attached BERKELEY-LICENSE * file. If you do not find these files, copies can be found by writing to: * Computer Science Division, Database Group, Universite of California, * 617 Soda Hall #1776, Berkeley, CA 94720-1776. Attention: Berkeley License * * Copyright (c) 2003-2004 Intel Corporation. All rights reserved. * * This file is distributed under the terms in the attached INTEL-LICENSE file. * If you do not find these files, copies can be found by writing to: * Intel Research Berkeley, 2150 Shattuck Avenue, Suite 1300, * Berkeley, CA, 94704. Attention: Intel License Inquiry. *//* * Copyright (c) 2001-2003 Regents of the University of California. * All rights reserved. * * See the file LICENSE included in this distribution for details. */package runtime.services.network.udpcc;import java.io.IOException;import java.net.DatagramSocket;import java.net.InetSocketAddress;import java.net.SocketException;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;import java.nio.channels.SelectionKey;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.LinkedList;import java.util.Map;import org.apache.log4j.Level;import org.apache.log4j.Logger;import runtime.schedulers.ASyncCore;import util.PriorityQueue;/** * A TCP-friendly datagram layer. * * @author Sean C. Rhea * @version $Id: UdpCC.java,v 1.13 2004/10/14 19:49:04 huebsch Exp $ */public class UdpCC { protected boolean DEBUG_MIN = false; protected boolean DEBUG_RTT = false; protected boolean DEBUG = false; protected int SOCKBUF_MAX = 1048575; protected Logger logger; /** * Interface Serializer * */ public static interface Serializer { /** * Called to determine the size of the byte array needed to * serialize the given <code>msg</code> into. * * @param msg * @return */ int serialize_size(Object msg); /** * Called to serialize the given <code>msg</code> into the provided * <code>buf</code>; the inverse of <code>deserialize</code>. * * @param msg * @param buf */ void serialize(Object msg, ByteBuffer buf); /** * Called to deserialize the given <code>buf</code> into the message * it represents; the inverse of <code>serialize</code>. * * @param buf * @return * @throws Exception */ Object deserialize(ByteBuffer buf) throws Exception; } /** * Interface Sink * */ public static interface Sink { /** * Called when a message is received. * * @param msg the message that was received * * @param src the host and port from which the message was * sent. The source is not authenticated; this * value is just what is read out of the IP packet * header * * @param local the localhost and port on which the message was * received * * @param tries the number of times the message has been sent, * including this one, or -1 if that information is * not available (for example, if the message was * sent with {@link #send_nocc}) * * @param wait_ms the time in milliseconds that the message sat in * the sender's outbound queue before being sent, * presumably because it was waiting behind other * messages, or -1 if that information is not * available * * @param est_rtt_ms the sender's estimate of the round trip time in * milliseconds between it and this host, or -1 if * that information is not available */ void recv(Object msg, InetSocketAddress src, InetSocketAddress local, int tries, long wait_ms, long est_rtt_ms); } /** * Interface SendCB * */ public static interface SendCB { /** * The callback indicating the UdpCC layer is done with a particular * message. * * @param user_data the data supplied to <code>send</code> * @param success whether the message was acknowledged by the * recipient */ void cb(Object user_data, boolean success); } /** * Construct a new UdpCC object with a UDP socket bound to * <code>address<code> and start listening for messages. * * @param core * @param address * @param slz * @param snk */ public UdpCC(ASyncCore core, InetSocketAddress address, Serializer slz, Sink snk) { logger = Logger.getLogger(getClass()); acore = core; my_addr = address; serializer = slz; sink = snk; next_msg_id = System.currentTimeMillis() >> 8; if (next_msg_id < 0) { BUG("next_msg_id=" + next_msg_id); } try { channel = DatagramChannel.open(); sock = channel.socket(); int rxsize = sock.getReceiveBufferSize(); logger.info("initial rcv sobuf = " + rxsize); sock.bind(my_addr); channel.configureBlocking(false); skey = acore.register_selectable(channel, SelectionKey.OP_READ, new MySelectableCB(), null); } catch (IOException e) { logger.fatal("could not open " + my_addr, e); System.exit(1); } // acore.register_timer (STATS_PERIOD, new StatsCB (), null); acore.register_timer(BW_STATS_PERIOD, new BandwidthCB(), null); } /** * Closes the socket associated with this object, removes its callbacks * from ASyncCore, and causes it to stop responding to any outstanding * timers it has registered--in other words, turns it off. There is no * way to turn it back on; instead, just create a new one. */ public void close() { acore.unregister_selectable(skey); sock.close(); // closes channel with it closed = true; } /** * Method debug_level * @return */ public int debug_level() { return ((DEBUG) ? 0x4 : 0x0) | ((DEBUG_RTT) ? 0x2 : 0x0) | ((DEBUG_MIN) ? 0x1 : 0x0); } /** * Method set_debug_level * * @param value */ public void set_debug_level(int value) { DEBUG_MIN = ((value & 0x1) != 0); DEBUG_RTT = ((value & 0x2) != 0); DEBUG = ((value & 0x4) != 0); if (value > 0) { logger.setLevel(Level.DEBUG); } } /** * Method set_sockbuf_size * * @param value */ public void set_sockbuf_size(int value) { if ((value > 0) && (value <= SOCKBUF_MAX)) { try { sock.setReceiveBufferSize(value); int rxsize = sock.getReceiveBufferSize(); logger.info("set rcv sobuf " + value + "; got " + rxsize); } catch (SocketException e) { logger.fatal("could not set socket buffer size " + value, e); System.exit(1); } } } /** * Method set_timeout_factor * * @param value */ public void set_timeout_factor(double value) { timeout_factor = value; logger.info("timeout_factor=" + timeout_factor); } /** * Method set_timeout_diff * * @param value */ public void set_timeout_diff(double value) { timeout_diff = value; logger.info("timeout_diff=" + timeout_diff); } /** * This is just a one-shot, minimal-delay send without an acknowledgement; * <i>Do not use this function unless you are providing some * application-level congestion control</i>. Messages send with this * function go into their own per-destination queue, separate from * messages sent with {@link #send}. * * @param msg * @param dst */ public void send_nocc(Object msg, InetSocketAddress dst) { long now_ms = System.currentTimeMillis(); Connection conn = (Connection) conns.get(dst); if (conn == null) { conn = new Connection(dst, now_ms); conns.put(dst, conn); } conn.probe_q.addLast(msg); if (conn.writable()) { add_to_rr(conn); } } /** * Send a congestion-controlled message to another host. Use this * function, and your communtication will be TCP-friendly, which is good. * Once the message is sent, or when <code>tries</code> timeouts have * occurred while trying to send it, the give callback will be called. * * @param msg the message to send * @param dst the message's destination host and port * @param timeout_sec * @param cb the callback to call when the message is * sent, or when the number of tries is exhausted * @param user_data the application-specific data to call that * callback with */ public void send(Object msg, InetSocketAddress dst, long timeout_sec, SendCB cb, Object user_data) { long now_ms = System.currentTimeMillis(); if (DEBUG) { debugln("got " + msg + " to send to " + dst); } Connection conn = (Connection) conns.get(dst); if (conn == null) { conn = new Connection(dst, now_ms); conns.put(dst, conn); } conn.send_q.addLast(new TimeoutInfo(conn, msg, now_ms, timeout_sec * 1000, cb, user_data)); if (DEBUG) { debugln("conn for " + dst + ": send_q.size=" + conn.send_q.size() + ", retry_q.size=" + conn.retry_q.size() + ", second_chance.size=" + second_chance.size()); } if (conn.writable()) { add_to_rr(conn); } else if (DEBUG) { debugln("not yet writable"); } } /** * Returns the number of milliseconds since the epoch of the last time we * sent a message to this peer, or 0 if we haven't sent a message to them * at all. * * @param peer * @return */ public long last_send(InetSocketAddress peer) { Connection conn = (Connection) conns.get(peer); if (conn == null) { return 0; } else { return conn.lastsnd; } } /** * Returns the number of milliseconds since the epoch of the last time we * received an acknowledgement from this peer, or 0 if we have yet to * receive an acknowledgement from them. * * @param peer * @return */ public long last_recv(InetSocketAddress peer) { Connection conn = (Connection) conns.get(peer); if (conn == null) { return 0; } else { return conn.lastrcv; } } /** * Returns the current estimate of the mean latency to this peer, or -1 if * there is no current estimate. * * @param peer * @return */ public long latency_mean(InetSocketAddress peer) { Connection conn = (Connection) conns.get(peer); if (conn == null) { return -1L; } else { return conn.sa >> 3; } } /** * Returns the number of messages waiting to be sent or currently in * flight to this peer. * * @param peer * @return */ public int queued_msgs(InetSocketAddress peer) { Connection conn = (Connection) conns.get(peer); if (conn == null) { return 0; } else { return conn.retry_q.size() + conn.send_q.size() + conn.probe_q.size() + conn.inf.size(); } } /** * Returns the number of messages waiting to be sent or currently in * flight to all peers. * @return */ public int queued_msgs() { int result = 0; Iterator i = conns.keySet().iterator(); while (i.hasNext()) { InetSocketAddress peer = (InetSocketAddress) i.next(); Connection conn = (Connection) conns.get(peer); result += conn.retry_q.size() + conn.send_q.size() + conn.probe_q.size() + conn.inf.size(); } return result; } /** * Don't use this function; I'm trying to figure out how to get rid of it. * * @param peer * @return */ public LinkedList send_q(InetSocketAddress peer) { Connection conn = (Connection) conns.get(peer); if (conn == null) { return null; } else { return conn.send_q; } } /** * Class TimeoutInfo * */ public static class TimeoutInfo { public Object msg; public Connection conn; public long start_ms, send_ms; public long msg_id; public int attempt; public long timeout_ms; public boolean cut_ssthresh; public SendCB send_cb; public Object send_cb_data;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -