⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pingmanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
//    manager.markAlive(from);//    manager.markProximity(from, ping);//    notifyPingResponseListeners(from, ping, start);//  }  /**   * Adds a feature to the PingResponseListener attribute of the PingManager   * object   *   * @param prl The feature to be added to the PingResponseListener attribute   * @param path DESCRIBE THE PARAMETER   */  protected void removePingResponseListener(SourceRoute path, PingResponseListener prl) {    if (prl == null) {      return;    }    ArrayList list = (ArrayList) pingListeners.get(path);    if (list != null) {      // remove all      while (list.remove(prl)) {        ;      }    }  }  /**   * Adds a feature to the PingResponseListener attribute of the PingManager   * object   *   * @param prl The feature to be added to the PingResponseListener attribute   * @param path The feature to be added to the PingResponseListener attribute   */  protected void addPingResponseListener(SourceRoute path, PingResponseListener prl) {    if (prl == null) {      return;    }    ArrayList list = (ArrayList) pingListeners.get(path);    if (list == null) {      list = new ArrayList();      pingListeners.put(path, list);    }    list.add(prl);  }  /**   * caller must synchronized(pingResponseTimes)   *   * @param proximity   * @param lastTimePinged   * @param path DESCRIBE THE PARAMETER   */  protected void notifyPingResponseListeners(SourceRoute path, int proximity, long lastTimePinged) {    ArrayList list = (ArrayList) pingListeners.remove(path);    if (list != null) {      Iterator i = list.iterator();      while (i.hasNext()) {        ((PingResponseListener) i.next()).pingResponse(path, proximity, lastTimePinged);      }    }  }  /**   * DESCRIBE THE METHOD   *   * @param path DESCRIBE THE PARAMETER   * @param msg DESCRIBE THE PARAMETER   */  public void enqueue(SourceRoute path, PRawMessage msg) {    if (logger.level <= Logger.FINER) {      logger.log("enqueue(" + path + "," + msg + ")");    }    try {      enqueue(path, new SocketBuffer(localAddress, path, msg));    } catch (IOException e) {      if (logger.level <= Logger.SEVERE) {        logger.log(          "ERROR: Received exceptoin " + e + " while enqueuing ping " + msg);      }    }  }  /**   * DESCRIBE THE METHOD   *   * @param msg DESCRIBE THE PARAMETER   * @param path DESCRIBE THE PARAMETER   */  public void enqueue(SourceRoute path, SocketBuffer msg) {//      SocketBuffer data = addHeader(path, msg, localAddress, environment,logger);    synchronized (pendingMsgs) {      pendingMsgs.add(new Envelope(path.getFirstHop(), msg));    }    if (spn != null) {      ((SocketPastryNode) spn).broadcastSentListeners(msg, path.getLastHop().address, msg.getBuffer().limit(), NetworkListener.TYPE_UDP);    }//      if (logger.level <= Logger.FINER) {//        switch (msg.getType()) {//  //      if (! (msg instanceof byte[])) {//          case SHORT_PING_TYPE://            logger.log("COUNT: Sent message rice.pastry.socket.messaging.ShortPingMessage of size " + msg.getBuffer().limit()  + " to " + path);//            break;//          case SHORT_PING_RESPONSE_TYPE://            logger.log("COUNT: Sent message rice.pastry.socket.messaging.ShortPingResponseMessage of size " + msg.getBuffer().limit()  + " to " + path);//            break;//          default:    if (logger.level <= Logger.FINER) {      logger.log(        "COUNT: Sent message " + msg.getType() + " of size " + msg.getBuffer().limit() + " to " + path);    }//        }//      }    environment.getSelectorManager().modifyKey(key);  }  /**   * DESCRIBE THE METHOD   *   * @param sr DESCRIBE THE PARAMETER   * @param dm DESCRIBE THE PARAMETER   * @param size DESCRIBE THE PARAMETER   * @param from DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  public void receiveMessage(SourceRoute sr, DatagramMessage dm, int size, InetSocketAddress from) throws IOException {//    if (message instanceof DatagramMessage) {//      DatagramMessage dm = (DatagramMessage) message;    long start = dm.getStartTime();    SourceRoute inboundPath = sr.removeLastHop();    //dm.getInboundPath();    SourceRoute outboundPath = inboundPath.reverse();    //dm.getOutboundPath();    if (inboundPath == null) {      inboundPath = SourceRoute.build(new EpochInetSocketAddress(from));    }    if (spn != null) {      ((SocketPastryNode) spn).broadcastReceivedListeners(dm, inboundPath.reverse().getLastHop().address, size, NetworkListener.TYPE_UDP);    }    if (dm instanceof PingMessage) {      if (logger.level <= Logger.FINER) {        logger.log(          "COUNT: Read message(1) " + dm.getClass() + " of size " + size + " from " + inboundPath.reverse());      }      enqueue(inboundPath.reverse(), new PingResponseMessage(      /*       *  outboundPath, inboundPath,       */        start));    } else if (dm instanceof PingResponseMessage) {      if (logger.level <= Logger.FINER) {        logger.log(          "COUNT: Read message(2) " + dm.getClass() + " of size " + size + " from " + outboundPath.reverse());      }      int ping = (int) (environment.getTimeSource().currentTimeMillis() - start);      manager.markAlive(outboundPath);      manager.markProximity(outboundPath, ping);      notifyPingResponseListeners(outboundPath, ping, start);    } else if (dm instanceof WrongEpochMessage) {      WrongEpochMessage wem = (WrongEpochMessage) dm;      if (logger.level <= Logger.FINER) {        logger.log(          "COUNT: Read message(3) " + dm.getClass() + " of size " + size + " from " + outboundPath.reverse());      }      manager.markAlive(outboundPath);      manager.markDead(wem.getIncorrect());    } else if (dm instanceof IPAddressRequestMessage) {      if (logger.level <= Logger.FINER) {        logger.log(          "COUNT: Read message(4) " + dm.getClass() + " of size " + size + " from " + SourceRoute.build(new EpochInetSocketAddress(from)));      }      enqueue(SourceRoute.build(new EpochInetSocketAddress(from)), new IPAddressResponseMessage(from, environment.getTimeSource().currentTimeMillis()));    } else {      if (logger.level <= Logger.WARNING) {        logger.log(          "ERROR: Received unknown DatagramMessage " + dm);      }    }//    }  }  /**   * DESCRIBE THE METHOD   *   * @param key DESCRIBE THE PARAMETER   */  public void read(SelectionKey key) {    try {      InetSocketAddress address = null;      while ((address = (InetSocketAddress) channel.receive(buffer)) != null) {        buffer.flip();        if (testSourceRouting) {//          if (address.getPort() % 2 == localAddress.getAddress().getPort() % 2) {          if ((address.getPort() % 2 == 0) && (localAddress.getAddress().getPort() % 2 == 0)) {            buffer.clear();            if (logger.level <= Logger.INFO) {              logger.log("Dropping packet");            }            return;          }        }        if (buffer.remaining() > 0) {          readHeader(address);        } else {          if (logger.level <= Logger.INFO) {            logger.log(              "(PM) Read from datagram channel, but no bytes were there - no bad, but wierd.");          }          break;        }      }    } catch (IOException e) {      if (logger.level <= Logger.WARNING) {        logger.logException(          "ERROR (datagrammanager:read): ", e);      }    } finally {      buffer.clear();    }  }  /**   * DESCRIBE THE METHOD   *   * @param key DESCRIBE THE PARAMETER   */  public void write(SelectionKey key) {    Envelope write = null;    try {      synchronized (pendingMsgs) {        Iterator i = pendingMsgs.iterator();        while (i.hasNext()) {          write = (Envelope) i.next();          if (logger.level <= Logger.FINER) {            byte[] metadata = new byte[2];            metadata[0] = write.data.getBuffer().get(HEADER_SIZE + 4);            metadata[1] = write.data.getBuffer().get(HEADER_SIZE + 5);            byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * metadata[1]];            System.arraycopy(write.data.getBuffer().array(), HEADER_SIZE + 6, route, 0, route.length);            logger.log("write(" + write.destination + ") (" + metadata[0] + " " + metadata[1] + ") local " + localAddress);            for (int ii = 0; ii < metadata[1]; ii++) {              logger.log("  " + SocketChannelRepeater.decodeHeader(route, ii));            }          }          if (write.data.getBuffer().get(HEADER_SIZE) != 0) {            throw new IOException("Attempting to send Invalid version");          }          try {            if (channel.send(write.data.getBuffer(), write.destination.getAddress()) == write.data.getBuffer().limit()) {              i.remove();            } else {              break;            }          } catch (IOException e) {            i.remove();            throw e;          }        }      }    } catch (IOException e) {      if (logger.level <= Logger.WARNING) {        // This code prevents this line from filling up logs during some kinds of network outages        // it makes this error only be printed 1ce/second        long now = timeSource.currentTimeMillis();        if (lastTimePrinted + 1000 > now) {          return;        }        lastTimePrinted = now;        logger.logException(          "ERROR (datagrammanager:write) to " + (write == null ? null : write.destination.getAddress()), e);      }    } finally {      if (pendingMsgs.isEmpty()) {        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);      }    }  }  /**   * DESCRIBE THE METHOD   *   * @param key DESCRIBE THE PARAMETER   */  public void modifyKey(SelectionKey key) {    synchronized (pendingMsgs) {      if (!pendingMsgs.isEmpty()) {        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);      }    }  }  /**   * Method which serializes a given object into a ByteBuffer, in order to   * prepare it for writing.   *   * @param header DESCRIBE THE PARAMETER   * @return A ByteBuffer containing the object   * @exception IOException if the object can't be serialized   *///  public static byte[] serialize(Object message, Environment environment, Logger logger) throws IOException {//    try {////      ByteArrayOutputStream baos = new ByteArrayOutputStream();//      ObjectOutputStream oos = new ObjectOutputStream(baos);//      oos.writeObject(message);//      oos.close();////      byte[] ret = baos.toByteArray();//      return ret;//    } catch (InvalidClassException e) {//      if (logger.level <= Logger.SEVERE) logger.logException(//          "PANIC: Object to be serialized was an invalid class!",e);//      throw new IOException("Invalid class during attempt to serialize.");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -