⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pingmanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*************************************************************************

"FreePastry" Peer-to-Peer Application Development Substrate

Copyright 2002, Rice University. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

- Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

- Neither  the name  of Rice  University (RICE) nor  the names  of its
contributors may be  used to endorse or promote  products derived from
this software without specific prior written permission.

This software is provided by RICE and the contributors on an "as is"
basis, without any representations or warranties of any kind, express
or implied including, but not limited to, representations or
warranties of non-infringement, merchantability or fitness for a
particular purpose. In no event shall RICE or contributors be liable
for any direct, indirect, incidental, special, exemplary, or
consequential damages (including, but not limited to, procurement of
substitute goods or services; loss of use, data, or profits; or
business interruption) however caused and on any theory of liability,
whether in contract, strict liability, or tort (including negligence
or otherwise) arising in any way out of the use of this software, even
if advised of the possibility of such damage.

********************************************************************************/
package rice.pastry.socket;

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.environment.params.Parameters;
import rice.environment.time.TimeSource;
import rice.p2p.commonapi.*;
import rice.p2p.commonapi.rawserialization.*;
import rice.p2p.util.MathUtils;
import rice.pastry.*;
import rice.pastry.messaging.PRawMessage;
import rice.pastry.socket.messaging.*;
import rice.selector.*;

/**
 * Selection-key handler that sends pings over source routes through a
 * datagram channel registered with the environment's selector manager.
 *
 * @version $Id: PingManager.java 3274 2006-05-15 16:17:47Z jeffh $
 * @author jeffh
 */
public class PingManager extends SelectionKeyHandler {

  /**
   * Size of the buffer used to read incoming datagrams. It must be big enough
   * to encompass multiple datagram packets. Read from the
   * "pastry_socket_pingmanager_datagram_receive_buffer_size" parameter.
   */
  public final int DATAGRAM_RECEIVE_BUFFER_SIZE;

  /**
   * Size of the buffer used to send outgoing datagrams. This is also the
   * largest message size that can be sent via UDP. Read from the
   * "pastry_socket_pingmanager_datagram_send_buffer_size" parameter.
   */
  public final int DATAGRAM_SEND_BUFFER_SIZE;

  /**
   * Maps a SourceRoute to an ArrayList of PingResponseListener. Keys are held
   * weakly, as in any WeakHashMap.
   */
  protected WeakHashMap pingListeners = new WeakHashMap();

  /**
   * Maps a SourceRoute to a Long holding the time, as reported by the time
   * source, at which a ping was last sent over that route. Keys are held
   * weakly, as in any WeakHashMap.
   */
  protected WeakHashMap lastPingTime =
new WeakHashMap();  // The list of pending meesages  /**   * DESCRIBE THE FIELD   */  protected ArrayList pendingMsgs;  // the buffer used for writing datagrams  private ByteBuffer buffer;  // the channel used from talking to the network  private DatagramChannel channel;  // the key used to determine what has taken place  private SelectionKey key;  // the source route manager  private SocketSourceRouteManager manager;  // the local address of this node  private EpochInetSocketAddress localAddress;  // the local node  private SocketPastryNode spn;  private Logger logger;  private TimeSource timeSource;  private Environment environment;  private boolean testSourceRouting;  MessageDeserializer deserializer;  long lastTimePrinted = 0;  /**   * DESCRIBE THE FIELD   */  public final static int PING_THROTTLE = 1500;//  private static final short SHORT_PING_TYPE = 159;//  private static final short SHORT_PING_RESPONSE_TYPE = 160;  // whether or not we should use short pings//  public final boolean USE_SHORT_PINGS;// = false;  // the header which signifies a normal socket//  protected static byte[] HEADER_PING = new byte[] {0x49, 0x3A, 0x09, 0x5C};  // the header which signifies a new, shorter ping//  protected static byte[] HEADER_SHORT_PING = new byte[] {0x31, 0x1C, 0x0E, 0x11};  // the header which signifies a new, shorter ping//  protected static byte[] HEADER_SHORT_PING_RESPONSE = new byte[] {0x31, 0x1C, 0x0E, 0x12};  // the length of the ping header  /**   * DESCRIBE THE FIELD   */  public static int HEADER_SIZE = SocketCollectionManager.PASTRY_MAGIC_NUMBER.length;  /**   * @param manager DESCRIBE THE PARAMETER   * @param spn DESCRIBE THE PARAMETER   * @param bindAddress DESCRIBE THE PARAMETER   * @param proxyAddress DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  public PingManager(SocketPastryNode spn, SocketSourceRouteManager manager, EpochInetSocketAddress bindAddress, EpochInetSocketAddress proxyAddress) throws IOException {    
this.spn = spn;    this.environment = spn.getEnvironment();    this.logger = environment.getLogManager().getLogger(PingManager.class, null);    this.deserializer = new PMDeserializer(logger);    this.timeSource = environment.getTimeSource();    Parameters p = environment.getParameters();    this.manager = manager;    this.pendingMsgs = new ArrayList();    this.localAddress = proxyAddress;    testSourceRouting = p.getBoolean("pastry_socket_pingmanager_testSourceRouting");//    USE_SHORT_PINGS = p.getBoolean("pastry_socket_pingmanager_smallPings");    DATAGRAM_RECEIVE_BUFFER_SIZE = p.getInt("pastry_socket_pingmanager_datagram_receive_buffer_size");    DATAGRAM_SEND_BUFFER_SIZE = p.getInt("pastry_socket_pingmanager_datagram_send_buffer_size");    // allocate enought bytes to read data    this.buffer = ByteBuffer.allocateDirect(DATAGRAM_SEND_BUFFER_SIZE);    try {      // bind to the appropriate port      channel = DatagramChannel.open();      channel.configureBlocking(false);      channel.socket().setReuseAddress(true);      channel.socket().bind(bindAddress.getAddress());      channel.socket().setSendBufferSize(DATAGRAM_SEND_BUFFER_SIZE);      channel.socket().setReceiveBufferSize(DATAGRAM_RECEIVE_BUFFER_SIZE);      key = environment.getSelectorManager().register(channel, this, 0);      key.interestOps(SelectionKey.OP_READ);      if (logger.level <= Logger.INFO) {        logger.log("PingManager binding to " + bindAddress);      }    } catch (IOException e) {//      if (logger.level <= Logger.SEVERE) logger.log(//          "PANIC: Error binding datagram server to address " + localAddress + ": " + e);      throw e;    }  }  /**   * ----- EXTERNAL METHODS -----   *   * @param path DESCRIBE THE PARAMETER   * @param prl DESCRIBE THE PARAMETER   */  /**   * Method which actually sends a ping to over the specified path, and returns   * the result to the specified listener. Note that if no ping response is ever   * received, the listener is never called.   
*   * @param path The path to send the ping over   * @param prl The listener which should hear about the response   */  protected void ping(SourceRoute path, PingResponseListener prl) {    if (prl == null && path.getLastHop().equals(localAddress)) {      return;    }    // this code is to throttle pings    // I don't know what to do if there is a prl, because it is difficult to know    // if there is still an outstanding ping, so we can only throttle if there is no    // prl    long curTime = timeSource.currentTimeMillis();    if (prl == null) {      Long time = (Long) lastPingTime.get(path);      if (time != null) {        if ((time.longValue() + PING_THROTTLE) > curTime) {          if (logger.level <= Logger.FINE) {            logger.log(              "(PM) Suppressing ping via path " + path + " local " + localAddress);          }          return;        }      }    }    if (logger.level <= Logger.FINE) {      logger.log(        "(PM) Actually sending ping via path " + path + " local " + localAddress);    }    lastPingTime.put(path, new Long(curTime));    addPingResponseListener(path, prl);//    if (USE_SHORT_PINGS)//      sendShortPing(path);//    else    enqueue(path, new PingMessage(    /*     *  path, path.reverse(localAddress),     */      environment.getTimeSource().currentTimeMillis()));  }  /**   * Makes this node resign from the network. Is designed to be used for   * debugging and testing.   *   * @exception IOException DESCRIBE THE EXCEPTION   */  protected void resign() throws IOException {    if (key != null) {      if (key.channel() != null) {        key.channel().close();      }      key.cancel();    }  }  /**   * Internal testing method which simulates a stall. DO NOT USE!!!!!   
*
   * Removes OP_READ from the interest set of the selection key and leaves
   * all other interest bits untouched.
   */
  public void stall() {
    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
  }

  // ----- INTERNAL METHODS -----

  // Builds the data for a short ping (disabled)
//  protected void sendShortPing(SourceRoute route) {
//    try {
////      ByteArrayOutputStream baos = new ByteArrayOutputStream();
////      DataOutputStream dos = new DataOutputStream(baos);
//
//      SocketBuffer sb = new SocketBuffer(localAddress, route);
//      OutputBuffer dos = sb.o;
//
//      dos.write(HEADER_SHORT_PING, 0, HEADER_SHORT_PING.length);
//      dos.writeLong(environment.getTimeSource().currentTimeMillis());
//
//      sb.setType(SHORT_PING_TYPE);
//
//      enqueue(route, sb);
//    } catch (Exception canthappen) {
//      if (logger.level <= Logger.SEVERE) logger.logException(
//          "CANT HAPPEN: ",canthappen);
//    }
//  }

  // Builds the data for a short ping response (disabled)
//  protected void shortPingReceived(SourceRoute from, byte[] payload) throws IOException {
//    SourceRoute route = from.reverse();
//    SocketBuffer sb = new SocketBuffer(localAddress, route);
//    System.arraycopy(HEADER_SHORT_PING_RESPONSE, 0, payload, 0, HEADER_SHORT_PING_RESPONSE.length);
//    sb.o.write(payload,0,payload.length);
//    sb.setType(SHORT_PING_RESPONSE_TYPE);
//    enqueue(route, sb);
//  }

  // Processes a short ping response (disabled)
//  protected void shortPingResponseReceived(SourceRoute route, byte[] payload) throws IOException {
//    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(payload));
//    dis.readFully(new byte[HEADER_SHORT_PING_RESPONSE.length]);
//    long start = dis.readLong();
//    int ping = (int) (environment.getTimeSource().currentTimeMillis() - start);
//
//    SourceRoute from = route.reverse();
//

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -