⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pingmanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
//    } catch (NotSerializableException e) {//      if (logger.level <= Logger.SEVERE) logger.logException(//          "PANIC: Object to be serialized was not serializable! [" + message + "]",e);//      throw new IOException("Unserializable class " + message + " during attempt to serialize.");//    }//  }  /**   * Method which takes in a ByteBuffer read from a datagram, and deserializes   * the contained object.   *   * @param header DESCRIBE THE PARAMETER   * @return The deserialized object.   * @exception IOException if the buffer can't be deserialized   *///  public static Object deserialize(byte[] array, Environment env, SocketPastryNode spn, Logger logger) throws IOException {//    PastryObjectInputStream ois = new PastryObjectInputStream(new ByteArrayInputStream(array), spn);////    try {//      Object ret = ois.readObject();//      return ret;//    } catch (ClassNotFoundException e) {//      if (logger.level <= Logger.SEVERE) logger.logException(//          "PANIC: Unknown class type in serialized message!",e);//      throw new IOException("Unknown class type in message - closing channel.");//    } catch (InvalidClassException e) {//      if (logger.level <= Logger.SEVERE) logger.logException(//          "PANIC: Serialized message was an invalid class!",e);//      throw new IOException("Invalid class in message - closing channel.");//    }//  }  /**   * Method which adds a header for the provided path to the given data.   *   * @param header DESCRIBE THE PARAMETER   * @return The messag with a header attached   * @exception IOException DESCRIBE THE EXCEPTION   *///  public static SocketBuffer addHeader(SourceRoute path, SocketBuffer data, EpochInetSocketAddress localAddress, Environment env, Logger logger) throws IOException {////    ByteArrayOutputStream baos = new ByteArrayOutputStream();////    DataOutputStream dos = new DataOutputStream(baos);////    SocketBuffer sb = new SocketBuffer();//    OutputBuffer dos = sb.o;////    dos.write(HEADER_PING, 0, HEADER_PING.length);//    dos.writeByte((byte) 1);//    dos.writeByte((byte) (path.getNumHops() + 1));//    SocketChannelRepeater.encodeHeader(localAddress, dos);////    for (int i=0; i<path.getNumHops(); i++)//      SocketChannelRepeater.encodeHeader(path.getHop(i), dos);////    dos.write(data.buffer.array(), 0, data.buffer.limit());//////    dos.flush();//////    return new SocketBuffer(baos.toByteArray());//    return sb;//  }  /**   * Method which adds a header for the provided path to the given data.   *   * @param header DESCRIBE THE PARAMETER   * @return The messag with a header attached   * @exception IOException DESCRIBE THE EXCEPTION   */  public SourceRoute decodeHeader(byte[] header) throws IOException {    EpochInetSocketAddress[] route = new EpochInetSocketAddress[header.length / SocketChannelRepeater.HEADER_BUFFER_SIZE];    for (int i = 0; i < route.length; i++) {      route[i] = SocketChannelRepeater.decodeHeader(header, i);    }    return SourceRoute.build(route);  }  /**   * Method which processes an incoming message and hands it off to the   * appropriate handler.   *   * @param address DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  protected void readHeader(InetSocketAddress address) throws IOException {    byte[] header = new byte[HEADER_SIZE];    buffer.get(header, 0, HEADER_SIZE);    if (!Arrays.equals(header, SocketCollectionManager.PASTRY_MAGIC_NUMBER)) {      throw new IOException("Not a pastry message:" + header[0] + "," + header[1] + "," + header[2] + "," + header[3]);    }    buffer.get(header, 0, HEADER_SIZE);    int version = MathUtils.byteArrayToInt(header);    if (!(version == 0)) {      throw new IOException("Unknown Version:" + version);    }//    header = new byte[HEADER_SIZE];//    buffer.get(header);////    if (Arrays.equals(header, HEADER_PING)) {    byte[] metadata = new byte[2];    buffer.get(metadata);    // first, read all of the source route    byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * metadata[1]];    buffer.get(route);    // now, check to make sure our hop is correct    EpochInetSocketAddress eisa;    if (logger.level <= Logger.FINER) {      logger.log("readHeader(" + address + ") (" + metadata[0] + " " + metadata[1] + ") local " + localAddress);      for (int i = 0; i < metadata[1]; i++) {        logger.log("  " + SocketChannelRepeater.decodeHeader(route, i));      }    }    try {      eisa = SocketChannelRepeater.decodeHeader(route, metadata[0]);    } catch (IOException ioe) {      throw ioe;    }    // if so, process the packet    if ((eisa.equals(localAddress)) || (eisa.getAddress().equals(localAddress.getAddress()) &&      (eisa.getEpoch() == EpochInetSocketAddress.EPOCH_UNKNOWN))) {      // if the packet is at the end of the route, accept it      // otherwise, forward it to the next hop (and increment the stamp)      if (metadata[0] + 1 == metadata[1]) {        // The message was meant for me        byte[] array = new byte[buffer.remaining()];        buffer.get(array);        buffer.clear();//          byte[] test = new byte[HEADER_SHORT_PING.length];//          System.arraycopy(array, 0, test, 0, test.length);        SourceRoute inbound = decodeHeader(route);//          SourceRoute sr = inbound.removeLastHop();//          if (Arrays.equals(test, HEADER_SHORT_PING)) {//            // the PING was meant for me//            int len = (header.length + metadata.length + array.length + route.length);//            if (logger.level <= Logger.FINER) logger.log(//                "COUNT: Read message rice.pastry.socket.messaging.ShortPingMessage of size " + len  + " from " + sr);//            if (spn != null) {//              ((SocketPastryNode) spn).broadcastReceivedListeners(array, address, len, NetworkListener.TYPE_UDP);//            }////            shortPingReceived(sr, array);//          } else if (Arrays.equals(test, HEADER_SHORT_PING_RESPONSE)) {//            // the PING_RESPONSE was meant for me//            int len = (header.length + metadata.length + array.length + route.length);//            if (logger.level <= Logger.FINER) logger.log(//                "COUNT: Read message rice.pastry.socket.messaging.ShortPingResponseMessage of size " + len  + " from " + sr);////            if (spn != null) {//              ((SocketPastryNode) spn).broadcastReceivedListeners(array, address, len, NetworkListener.TYPE_UDP);//            }//            shortPingResponseReceived(sr, array);//          } else {        // a normal message        SocketBuffer delivery = new SocketBuffer(array, spn);        receiveMessage(inbound, (DatagramMessage) delivery.deserialize(deserializer), array.length, address);//          }      } else {        // sourceroute hop        EpochInetSocketAddress next = SocketChannelRepeater.decodeHeader(route, metadata[0] + 1);        buffer.position(0);        byte[] packet = new byte[buffer.remaining()];        buffer.get(packet);        // increment the hop count        packet[HEADER_SIZE + 4]++;        if (logger.level <= Logger.FINE) {          logger.log("Forwarding (" + metadata[0] + " " + metadata[1] + ") from " + address + " to " + next + " at " + localAddress);        }        if (spn != null) {          ((SocketPastryNode) spn).broadcastReceivedListeners(packet, address, packet.length, NetworkListener.TYPE_SR_UDP);          ((SocketPastryNode) spn).broadcastSentListeners(packet, next.address, packet.length, NetworkListener.TYPE_SR_UDP);        }        synchronized (pendingMsgs) {          pendingMsgs.add(new Envelope(next, new SocketBuffer(packet)));        }        environment.getSelectorManager().modifyKey(key);      }    } else {      // if this is an old epoch of ours, reply with an update      if (eisa.getAddress().equals(localAddress.getAddress())) {        SourceRoute back = SourceRoute.build(new EpochInetSocketAddress[0]);        SourceRoute outbound = SourceRoute.build(new EpochInetSocketAddress[0]);        for (int i = 0; i < metadata[0]; i++) {          back = back.append(SocketChannelRepeater.decodeHeader(route, i));          if (i > 0) {            outbound = outbound.append(SocketChannelRepeater.decodeHeader(route, i));          }        }        outbound = outbound.append(localAddress);//          if (spn != null) {//            ((SocketPastryNode) spn).broadcastReceivedListeners(packet, address, packet.length, NetworkListener.TYPE_SR_UDP);//          }        enqueue(back.reverse(), new WrongEpochMessage(        /*         *  outbound, back.reverse(),         */          eisa, localAddress, environment.getTimeSource().currentTimeMillis()));      } else {        if (logger.level <= Logger.WARNING) {          logger.log(            "WARNING: Received packet destined for EISA (" + metadata[0] + " " + metadata[1] + ") " + eisa + " but the local address is " + localAddress + " - dropping silently.");        }        throw new IOException("Received packet destined for EISA (" + metadata[0] + " " + metadata[1] + ") " + eisa + " but the local address is " + localAddress + " - dropping silently.");      }    }//    } else {//      if (logger.level <= Logger.WARNING) logger.log(//        "WARNING: Received unrecognized message header - ignoring from "+address+".");//      throw new IOException("Improper message header received - ignoring from "+address+". Read " + ((byte) header[0]) + " " + ((byte) header[1]) + " " + ((byte) header[2]) + " " + ((byte) header[3]));//    }  }  /**   * Internal class which holds a pending datagram   *   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $   * @author amislove   */  public class Envelope {    /**     * DESCRIBE THE FIELD     */    protected EpochInetSocketAddress destination;    /**     * DESCRIBE THE FIELD     */    protected SocketBuffer data;    /**     * Constructor for Envelope.     *     * @param destination DESCRIBE THE PARAMETER     * @param data DESCRIBE THE PARAMETER     */    public Envelope(EpochInetSocketAddress destination, SocketBuffer data) {      this.destination = destination;      this.data = data;      if (logger.level <= Logger.FINER) {        try {          byte[] metadata = new byte[2];          metadata[0] = data.getBuffer().get(HEADER_SIZE + 4);          metadata[1] = data.getBuffer().get(HEADER_SIZE + 5);          byte[] route = new byte[SocketChannelRepeater.HEADER_BUFFER_SIZE * metadata[1]];          System.arraycopy(data.getBuffer().array(), HEADER_SIZE + 6, route, 0, route.length);          logger.log("enqueue(" + destination + ") (" + metadata[0] + " " + metadata[1] + ") local " + localAddress);          for (int ii = 0; ii < metadata[1]; ii++) {            logger.log("  " + SocketChannelRepeater.decodeHeader(route, ii));          }//        if (metadata[1] == 3 && metadata[0] == 1) {//          EpochInetSocketAddress dest = SocketChannelRepeater.decodeHeader(route, metadata[1]-1);//          if (dest.equals(destination)) {//            System.out.println("Warning");//          }//        }        } catch (IOException ioe) {          logger.logException("", ioe);        }      }      if (data.getBuffer().get(HEADER_SIZE) != 0) {        throw new RuntimeException("Attempting to send Invalid version");      }    }  }  /**   * DESCRIBE THE CLASS   *   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $   * @author jeffh   */  static class PMDeserializer implements MessageDeserializer {    Logger logger;    /**     * Constructor for PMDeserializer.     *     * @param logger DESCRIBE THE PARAMETER     */    public PMDeserializer(Logger logger) {      this.logger = logger;    }    /**     * DESCRIBE THE METHOD     *     * @param buf DESCRIBE THE PARAMETER     * @param type DESCRIBE THE PARAMETER     * @param priority DESCRIBE THE PARAMETER     * @param sender DESCRIBE THE PARAMETER     * @return DESCRIBE THE RETURN VALUE     * @exception IOException DESCRIBE THE EXCEPTION     */    public rice.p2p.commonapi.Message deserialize(InputBuffer buf, short type, byte priority, rice.p2p.commonapi.NodeHandle sender) throws IOException {      switch (type) {        case IPAddressRequestMessage.TYPE:          return new IPAddressRequestMessage(buf);        case IPAddressResponseMessage.TYPE:          return new IPAddressResponseMessage(buf);        case PingMessage.TYPE:          return new PingMessage(buf);        case PingResponseMessage.TYPE:          return new PingResponseMessage(buf);        case WrongEpochMessage.TYPE:          return new WrongEpochMessage(buf);        default:          if (logger.level <= Logger.SEVERE) {            logger.logException("PM SERIOUS ERROR: Received unknown message address: " + 0 + " type:" + type, new Exception("stack trace"));          }      }      return null;    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -