⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pastimpl.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
          }          //System.out.println("Closing "+socket);          pendingSocketTransactions.remove(socket);          socket.close();        }      });    endpoint.register();  }  /**   * Gets the Environment attribute of the PastImpl object   *   * @return The Environment value   */  public Environment getEnvironment() {    return environment;  }  /**   * Returns of the outstanding messages. This is a DEBUGGING method ONLY!   *   * @return The list of all the outstanding messages   */  public Continuation[] getOutstandingMessages() {    return (Continuation[]) outstanding.values().toArray(new Continuation[0]);  }  /**   * Returns the endpoint associated with the Past - ONLY FOR TESTING - DO NOT   * USE   *   * @return The endpoint   */  public Endpoint getEndpoint() {    return endpoint;  }  /**   * Returns a new uid for a message   *   * @return A new id   */  protected synchronized int getUID() {    return id++;  }  /**   * Returns a continuation which will respond to the given message.   *   * @param msg DESCRIBE THE PARAMETER   * @return A new id   */  protected Continuation getResponseContinuation(final PastMessage msg) {    if (logger.level <= Logger.FINER) {      logger.log("Getting the Continuation to respond to the message " + msg);    }    final ContinuationMessage cmsg = (ContinuationMessage) msg;    return      new Continuation() {        public void receiveResult(Object o) {          cmsg.receiveResult(o);          endpoint.route(null, cmsg, msg.getSource());        }        public void receiveException(Exception e) {          cmsg.receiveException(e);          endpoint.route(null, cmsg, msg.getSource());        }      };  }  /**   * Do like above, but use a socket   *   * @param msg   * @return   */  protected Continuation getFetchResponseContinuation(final PastMessage msg) {    final ContinuationMessage cmsg = (ContinuationMessage) msg;    return      new Continuation() {        public void receiveResult(Object o) {          cmsg.receiveResult(o);          PastContent content = (PastContent) o;          if (socketStrategy.sendAlongSocket(SocketStrategy.TYPE_FETCH, content)) {            sendViaSocket(msg.getSource(), cmsg, null);          } else {            endpoint.route(null, cmsg, msg.getSource());          }        }        public void receiveException(Exception e) {          cmsg.receiveException(e);          endpoint.route(null, cmsg, msg.getSource());        }      };  }  /**   * Internal method which returns the handles to an object. It first checks to   * see if the handles can be determined locally, and if so, returns.   * Otherwise, it sends a LookupHandles messsage out to find out the nodes.   *   * @param id The id to fetch the handles for   * @param max The maximum number of handles to return   * @param command The command to call with the result (NodeHandle[])   */  protected void getHandles(Id id, int max, Continuation command) {    NodeHandleSet set = endpoint.replicaSet(id, max);    if (set.size() == max) {      command.receiveResult(set);    } else {      sendRequest(id, new LookupHandlesMessage(getUID(), id, max, getLocalNodeHandle(), id),        new StandardContinuation(command) {          public void receiveResult(Object o) {            NodeHandleSet replicas = (NodeHandleSet) o;            // check to make sure we've fetched the correct number of replicas            if (endpoint.replicaSet(endpoint.getLocalNodeHandle().getId(), replicationFactor + 1).size() > replicas.size()) {              parent.receiveException(new PastException("Only received " + replicas.size() + " replicas - cannot insert as we know about more nodes."));            } else {              parent.receiveResult(replicas);            }          }        });    }  }  /**   * get the nodeHandle of the local Past node   *   * @return the nodehandle   */  public NodeHandle getLocalNodeHandle() {    return endpoint.getLocalNodeHandle();  }  /**   * Returns the number of replicas used in this Past   *   * @return the number of replicas for each object   */  public int getReplicationFactor() {    return replicationFactor;  }  // ----- UTILITY METHODS -----  /**   * Returns the replica manager for this Past instance. Should *ONLY* be used   * for testing. Messing with this will cause unknown behavior.   *   * @return This Past's replica manager   */  public Replication getReplication() {    return replicaManager.getReplication();  }  /**   * Returns this Past's storage manager. Should *ONLY* be used for testing.   * Messing with this will cause unknown behavior.   *   * @return This Past's storage manager.   */  public StorageManager getStorageManager() {    return storage;  }  /**   * Gets the Instance attribute of the PastImpl object   *   * @return The Instance value   */  public String getInstance() {    return instance;  }  /**   * Sets the ContentDeserializer attribute of the PastImpl object   *   * @param deserializer The new ContentDeserializer value   */  public void setContentDeserializer(PastContentDeserializer deserializer) {    contentDeserializer = deserializer;  }  /**   * Sets the ContentHandleDeserializer attribute of the PastImpl object   *   * @param deserializer The new ContentHandleDeserializer value   */  public void setContentHandleDeserializer(PastContentHandleDeserializer deserializer) {    contentHandleDeserializer = deserializer;  }  /**   * DESCRIBE THE METHOD   *   * @return DESCRIBE THE RETURN VALUE   */  public String toString() {    if (endpoint == null) {      return super.toString();    }    return "PastImpl[" + endpoint.getInstance() + "]";  }  // ----- INTERNAL METHODS -----  /**   * Internal method which builds the replication manager. Can be overridden by   * subclasses.   *   * @param node The node to base the RM off of   * @param instance The instance name to use   * @return The replication manager, ready for use   */  protected ReplicationManager buildReplicationManager(Node node, String instance) {    return new ReplicationManagerImpl(node, this, replicationFactor, instance);  }  /**   * DESCRIBE THE METHOD   *   * @param handle DESCRIBE THE PARAMETER   * @param m DESCRIBE THE PARAMETER   * @param c DESCRIBE THE PARAMETER   */  private void sendViaSocket(final NodeHandle handle, final PastMessage m, final Continuation c) {    if (c != null) {      CancellableTask timer = endpoint.scheduleMessage(new MessageLostMessage(m.getUID(), getLocalNodeHandle(), null, m, handle), MESSAGE_TIMEOUT);      insertPending(m.getUID(), timer, c);    }    // create a bb[] to be written    SimpleOutputBuffer sob = new SimpleOutputBuffer();    try {      sob.writeInt(0);      // place holder for size...      sob.writeShort(m.getType());      m.serialize(sob);    } catch (IOException ioe) {      if (c != null) {        c.receiveException(ioe);      }    }    // add the size back to the beginning...    int size = sob.getWritten() - 4;    // remove the size of the size :)    if (logger.level <= Logger.FINER) {      logger.log("Sending size of " + size + " to " + handle + " to send " + m);    }    byte[] bytes = sob.getBytes();    MathUtils.intToByteArray(size, bytes, 0);    // prepare the bytes for writing    final ByteBuffer[] bb = new ByteBuffer[1];    bb[0] = ByteBuffer.wrap(bytes, 0, sob.getWritten());    // the whole thing    if (logger.level <= Logger.FINE) {      logger.log("Opening socket to " + handle + " to send " + m);    }    endpoint.connect(handle,      new AppSocketReceiver() {        public void receiveSocket(AppSocket socket) {          if (logger.level <= Logger.FINER) {            logger.log("Opened socket to " + handle + ":" + socket + " to send " + m);          }          socket.register(false, true, 10000, this);        }        public void receiveSelectResult(AppSocket socket, boolean canRead,                                        boolean canWrite) {          if (logger.level <= Logger.FINEST) {            logger.log("Writing to " + handle + ":" + socket + " to send " + m);          }          try {//          ByteBuffer[] outs = new ByteBuffer[1];//          ByteBuffer out = ByteBuffer.wrap(endpoint.getLocalNodeHandle().getId().toByteArray());//          outs[0] = out;//          socket.write(outs, 0, 1);            socket.write(bb, 0, 1);          } catch (IOException ioe) {            if (c != null) {              c.receiveException(ioe);            } else              if (logger.level <= Logger.WARNING) {              logger.logException("Error sending " + m, ioe);            }            return;            // don't continue to try to send          }          if (bb[0].remaining() > 0) {            socket.register(false, true, 10000, this);          } else {            socket.close();          }        }        public void receiveException(AppSocket socket, Exception e) {          if (c != null) {            c.receiveException(e);          }        }      },      10000);  }  /**   * Sends a request message across the wire, and stores the appropriate   * continuation.   *   * @param id The destination id   * @param message The message to send.   * @param command The command to run once a result is received   */  protected void sendRequest(Id id, PastMessage message, Continuation command) {    sendRequest(id, message, null, command);  }  /**   * Sends a request message across the wire, and stores the appropriate   * continuation.   *   * @param handle The node handle to send directly too   * @param message The message to send.   * @param command The command to run once a result is received   */  protected void sendRequest(NodeHandle handle, PastMessage message, Continuation command) {    sendRequest(null, message, handle, command);  }  /**   * Sends a request message across the wire, and stores the appropriate   * continuation. Sends the message using the provided handle as a hint.   *   * @param id The destination id   * @param message The message to send.   * @param command The command to run once a result is received   * @param hint DESCRIBE THE PARAMETER   */  protected void sendRequest(Id id, PastMessage message, NodeHandle hint, Continuation command) {    if (logger.level <= Logger.FINER) {      logger.log("Sending request message " + message + " {" + message.getUID() + "} to id " + id + " via " + hint);    }    CancellableTask timer = endpoint.scheduleMessage(new MessageLostMessage(message.getUID(), getLocalNodeHandle(), id, message, hint), MESSAGE_TIMEOUT);    insertPending(message.getUID(), timer, command);    endpoint.route(id, message, hint);  }  /**   * Loads the provided continuation into the pending table   *   * @param uid The id of the message   * @param command The continuation to run   * @param timer DESCRIBE THE PARAMETER   */  private void insertPending(int uid, CancellableTask timer, Continuation command) {    if (logger.level <= Logger.FINER) {      logger.log("Loading continuation " + uid + " into pending table");    }    timers.put(new Integer(uid), timer);    outstanding.put(new Integer(uid), command);  }  /**   * Removes and returns the provided continuation from the pending table   *   * @param uid The id of the message   * @return The continuation to run   */  private Continuation removePending(int uid) {    if (logger.level <= Logger.FINER) {      logger.log("Removing and returning continuation " + uid + " from pending table");    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -