⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketsourceroutemanager.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
   * Should be called while synchronized on nodeHandles   *   * @param address   * @param search DESCRIBE THE PARAMETER   * @return DESCRIBE THE RETURN VALUE   */  public AddressManager putAddressManager(EpochInetSocketAddress address,                                          boolean search) {    WeakReference wr = (WeakReference) nodeHandles.get(address);    SocketNodeHandle snh;    AddressManager manager;    if (wr == null) {      snh = new SocketNodeHandle(address, null);      snh.setLocalNode(spn);      wr = new WeakReference(snh);      nodeHandles.put(address, wr);    } else {      snh = (SocketNodeHandle) wr.get();      if (snh == null) {        // WARNING: this code must be repeated because of a very slight timing        // issue with the garbage collector        snh = new SocketNodeHandle(address, null);        snh.setLocalNode(spn);        wr = new WeakReference(snh);        nodeHandles.put(address, wr);      }    }    if (snh.addressManager != null) {      throw new IllegalStateException("Address manager for address " + address        + " already exists.");    }    manager = new AddressManager(snh, search);    // TODO make this time configurable    this.spn.getEnvironment().getSelectorManager().getTimer().schedule(      new HardLinkTimerTask(manager), 30000);    snh.addressManager = manager;    return manager;  }  /**   * Method which sends a bootstrap message across the wire.   *   * @param address The address to send the message to   * @param msg DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  public void bootstrap(EpochInetSocketAddress address, Message msg) throws IOException {//    PRawMessage rm;//    if (msg instanceof PRawMessage) {//      rm = (PRawMessage)msg;//    } else {//      rm = new PJavaSerializedMessage(msg);//    }//    // todo, pool//    final SocketBuffer message = new SocketBuffer(defaultDeserializer);//    message.serialize(rm, logger);    manager.bootstrap(SourceRoute.build(address), msg);  }  /**   * DESCRIBE THE METHOD   *   * @param address DESCRIBE THE PARAMETER   * @param msg DESCRIBE THE PARAMETER   * @exception IOException DESCRIBE THE EXCEPTION   */  public void send(EpochInetSocketAddress address, Message msg) throws IOException {    PRawMessage rm;    if (msg instanceof PRawMessage) {      rm = (PRawMessage) msg;    } else {      rm = new PJavaSerializedMessage(msg);    }    // todo, pool    final SocketBuffer buffer = new SocketBuffer(manager.defaultDeserializer, manager.pastryNode);    buffer.serialize(rm, true);    send(address, buffer);  }  /**   * Method which sends a message across the wire.   *   * @param message The message to send   * @param address The address to send the message to   */  public void send(final EpochInetSocketAddress address, final SocketBuffer message) {    if (spn.getEnvironment().getSelectorManager().isSelectorThread()) {      getAddressManager(address, true).send(message);    } else {      if (logger.level <= Logger.FINE) {        logger.log("Application attempted to send " + message + " to " + address + " on a non-selector thread.");      }      spn.getEnvironment().getSelectorManager().invoke(        new Runnable() {          public void run() {            getAddressManager(address, true).send(message);          }        });    }  }  /**   * Method which sends a message across the wire.   *   * @param address The address to send the message to   * @param appAddress DESCRIBE THE PARAMETER   * @param receiver DESCRIBE THE PARAMETER   * @param timeout DESCRIBE THE PARAMETER   */  public void connect(final EpochInetSocketAddress address, final int appAddress, final AppSocketReceiver receiver, final int timeout) {    if (spn.getEnvironment().getSelectorManager().isSelectorThread()) {      getAddressManager(address, true).connect(appAddress, receiver, timeout);    } else {      if (logger.level <= Logger.FINE) {        logger.log("Application " + appAddress + " attempted to open a connection to " + address + " on a non-selector thread.");      }      spn.getEnvironment().getSelectorManager().invoke(        new Runnable() {          public void run() {            getAddressManager(address, true).connect(appAddress, receiver, timeout);          }        });    }  }  /**   * Method which suggests a ping to the remote node.   *   * @param address DESCRIBE THE PARAMETER   */  public void ping(EpochInetSocketAddress address) {    AddressManager am = getAddressManager(address);    if (am == null) {      manager.ping(SourceRoute.build(address));    } else {      am.ping();    }  }  /**   * Method which FORCES a check of liveness of the remote node. Note that this   * method should ONLY be called by internal Pastry maintenance algorithms -   * this is NOT to be used by applications. Doing so will likely cause a blowup   * of liveness traffic.   *   * @param address DESCRIBE THE PARAMETER   */  public void checkLiveness(EpochInetSocketAddress address) {    getAddressManager(address, true).checkLiveness();  }  /**   * Method which returns the last cached proximity value for the given address.   * If there is no cached value, then DEFAULT_PROXIMITY is returned.   *   * @param address The address to return the value for   * @return The ping value to the remote address   */  public int proximity(EpochInetSocketAddress address) {    AddressManager am = getAddressManager(address);    if (am == null) {      return SocketNodeHandle.DEFAULT_PROXIMITY;    } else {      return am.proximity();    }  }  /**   * This method should be called when a known route is declared dead.   *   * @param route The now-dead route   */  protected void markDead(SourceRoute route) {    if (logger.level <= Logger.FINE) {      logger.log("(SSRM) Found route " + route + " to be dead");    }    AddressManager am = getAddressManager(route.getLastHop());    if (am != null) {      am.markDead(route);    }  }  /**   * This method should be called when a known node is declared dead - this is   * ONLY called when a new epoch of that node is detected. Note that this   * method is silent - no checks are done. Caveat emptor.   *   * @param address The now-dead address   */  protected void markDead(EpochInetSocketAddress address) {    AddressManager am = getAddressManager(address);    if (am != null) {      am.markDeadForever();    }  }  /**   * This method should be called when a known route is declared alive.   *   * @param route The now-live route   */  protected void markAlive(SourceRoute route) {    if (logger.level <= Logger.FINE) {      logger.log("(SSRM) Found route " + route + " to be alive");    }    getAddressManager(route.getLastHop(), false).markAlive(route);  }  /**   * DESCRIBE THE METHOD   *   * @param route DESCRIBE THE PARAMETER   * @return DESCRIBE THE RETURN VALUE   */  protected int proximity(SourceRoute route) {    return getAddressManager(route.getLastHop(), false).proximity();  }  /**   * This method should be called when a known route is declared suspected.   *   * @param route The now-live route   */  protected void markSuspected(SourceRoute route) {    if (logger.level <= Logger.FINE) {      logger.log("(SSRM) Found route " + route + " to be suspected");    }    getAddressManager(route.getLastHop(), false).markSuspected(route);  }  /**   * This method should be called when a known route has its proximity updated   *   * @param route The route   * @param proximity The proximity   */  protected synchronized void markProximity(SourceRoute route, int proximity) {    getAddressManager(route.getLastHop(), false).markProximity(route, proximity);  }  /**   * Reroutes the given message. If this node is alive, send() is called. If   * this node is not alive and the message is a route message, it is rerouted.   * Otherwise, the message is dropped.   *   * @param m The message   * @param address The address of the remote node   *///  protected void reroutee(EpochInetSocketAddress address, Message m) {//    if (getLiveness(address) == SocketNodeHandle.LIVENESS_ALIVE) {//      if (logger.level <= Logger.INFO) logger.log( "(SSRM) Attempting to resend message " + m + " to alive address " + address);//      send(address, m);//    } else {//      if (m instanceof RouteMessage) {//        if (((RouteMessage) m).getOptions().multipleHopsAllowed()) {//          if (logger.level <= Logger.INFO) logger.log( "(SSRM) Attempting to reroute route message " + m);//          ((RouteMessage) m).nextHop = null;//          // kick it back to pastry//          spn.receiveMessage(m);//        } else if (getLiveness(address) <= SocketNodeHandle.LIVENESS_SUSPECTED) {//          // it's required to go to this address only//          send(address, m);//        } else {//          // this address is dead, and the routemessage is not allowed to go anywhere else//          if (logger.level <= Logger.WARNING) logger.log("(SSRM) Dropping message " + m + " because next hop "+address+" is dead!");//        }//      } else {//        if (logger.level <= Logger.WARNING) logger.log("(SSRM) Dropping message " + m + " because next hop "+address+" is dead!");//      }//    }//  }  /**   * Reroutes the given message. If this node is alive, send() is called. If   * this node is not alive and the message is a route message, it is rerouted.   * Otherwise, the message is dropped. Can be called when a socket is closed,   * if for example a different source route is found. This is how   * non-routemessages may be called here For suspected/dead, it will get called   * with all RouteMessages   *   * @param m The message   * @param address The address of the remote node   */  protected void reroute(EpochInetSocketAddress address, SocketBuffer m) {    if (m.discard) {      if (logger.level <= Logger.FINE) {        logger.log("(SSRM) Dropping garbage in resend message " + m + " address " + address + " with liveness " + getLiveness(address));      }      return;    }    switch (getLiveness(address)) {      case SocketNodeHandle.LIVENESS_ALIVE:        if (logger.level <= Logger.INFO) {          logger.log("(SSRM) Attempting to resend message " + m + " to alive address " + address);        }        send(address, m);        return;      case SocketNodeHandle.LIVENESS_SUSPECTED:        if (m.isRouteMessage()) {          if (m.getOptions().multipleHopsAllowed() && m.getOptions().rerouteIfSuspected()) {            // kick it back to pastry            if (logger.level <= Logger.INFO) {              logger.log("(SSRM) Attempting to reroute route message " + m);            }            RouteMessage rm = m.getRouteMessage();            rm.nextHop = null;            spn.receiveMessage(rm);            return;          }        } else {          if (logger.level <= Logger.INFO) {            logger.log("(SSRM) Attempting to resend message " + m + " to alive address " + address);          }          send(address, m);          return;        }      case SocketNodeHandle.LIVENESS_DEAD:      case SocketNodeHandle.LIVENESS_DEAD_FOREVER:        if (m.isRouteMessage()) {          if (m.getOptions().multipleHopsAllowed()) {            if (logger.level <= Logger.INFO) {              logger.log("(SSRM) Attempting to reroute route message " + m);            }            RouteMessage rm = m.getRouteMessage();            rm.nextHop = null;            spn.receiveMessage(rm);            return;          }        }    }    if (logger.level <= Logger.WARNING) {      logger.log("(SSRM) Dropping message " + m + " because next hop " + address + " is dead!");    }  }  /**   * Internal class which is tasked with maintaining the status of a single   * remote address. This class is in charge of all source routes to that   * address, as well as declaring liveness/death of this address   *   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $   * @author jeffh   */  protected class AddressManager {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -