📄 pastryendpoint.java
字号:
* replica set size, then its maximum replica set is returned. The returned * nodes may be used for replicating data since they are precisely the nodes * which become roots for the key when the local node fails. * * @param id DESCRIBE THE PARAMETER * @param maxRank DESCRIBE THE PARAMETER * @return the replica set */ public NodeHandleSet replicaSet(Id id, int maxRank) { LeafSet leafset = getLeafSet(); if (maxRank > leafset.maxSize() / 2 + 1) { throw new IllegalArgumentException("maximum replicaSet size for this configuration exceeded; asked for " + maxRank + " but max is " + leafset.maxSize() / 2 + 1); } if (maxRank > leafset.size()) { if (logger.level <= Logger.FINER) { logger.log( "trying to get a replica set of size " + maxRank + " but only " + leafset.size() + " nodes in leafset"); } } return leafset.replicaSet((rice.pastry.Id) id, maxRank); } /** * This methods returns an ordered set of nodehandles on which replicas of an * object with a given id can be stored. The call returns nodes up to and * including a node with maxRank. This call also allows the application to * provide a remote "center" node, as well as other nodes in the vicinity. * * @param id The object's id. * @param maxRank The number of desired replicas. * @param set The set of other nodes around the root handle * @param root DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public NodeHandleSet replicaSet(Id id, int maxRank, NodeHandle root, NodeHandleSet set) { LeafSet leaf = new LeafSet((rice.pastry.NodeHandle) root, getLeafSet().maxSize(), false); for (int i = 0; i < set.size(); i++) { leaf.put((rice.pastry.NodeHandle) set.getHandle(i)); } return leaf.replicaSet((rice.pastry.Id) id, maxRank); } /** * This method provides information about ranges of keys for which the node n * is currently a r-root. The operations returns null if the range could not * be determined. It is an error to query the range of a node not present in * the neighbor set as returned by the update upcall or the neighborSet call. * Some implementations may have multiple, disjoint ranges of keys for which a * given node is responsible (Pastry has two). The parameter key allows the * caller to specify which range should be returned. If the node referenced by * n is the r-root for key, then the resulting range includes key. Otherwise, * the result is the nearest range clockwise from key for which n is * responsible. * * @param n nodeHandle of the node whose range is being queried * @param r the rank * @param key the key * @param cumulative if true, returns ranges for which n is an i-root for 0<i * <=r * @return the range of keys, or null if range could not be determined for the * given node and rank */ public IdRange range(NodeHandle n, int r, Id key, boolean cumulative) { rice.pastry.Id pKey = (rice.pastry.Id) key; if (cumulative) { return getLeafSet().range((rice.pastry.NodeHandle) n, r); } rice.pastry.IdRange ccw = getLeafSet().range((rice.pastry.NodeHandle) n, r, false); rice.pastry.IdRange cw = getLeafSet().range((rice.pastry.NodeHandle) n, r, true); if (cw == null || ccw.contains(pKey) || pKey.isBetween(cw.getCW(), ccw.getCCW())) { return ccw; } else { return cw; } } /** * This method provides information about ranges of keys for which the node n * is currently a r-root. The operations returns null if the range could not * be determined. It is an error to query the range of a node not present in * the neighbor set as returned by the update upcall or the neighborSet call. * Some implementations may have multiple, disjoint ranges of keys for which a * given node is responsible (Pastry has two). The parameter key allows the * caller to specify which range should be returned. If the node referenced by * n is the r-root for key, then the resulting range includes key. Otherwise, * the result is the nearest range clockwise from key for which n is * responsible. * * @param n nodeHandle of the node whose range is being queried * @param r the rank * @param key the key * @return the range of keys, or null if range could not be determined for the * given node and rank */ public IdRange range(NodeHandle n, int r, Id key) { return range(n, r, key, false); } // Upcall to Application support /** * DESCRIBE THE METHOD * * @param msg DESCRIBE THE PARAMETER */ public final void messageForAppl(rice.pastry.messaging.Message msg) { if (logger.level <= Logger.FINER) { logger.log( "[" + thePastryNode + "] deliver " + msg + " from " + msg.getSenderId()); } if (msg instanceof PastryEndpointMessage) { // null for now, when RouteMessage stuff is completed, then it will be different! application.deliver(null, ((PastryEndpointMessage) msg).getMessage()); } else { if (logger.level <= Logger.WARNING) { logger.log( "Received unknown message " + msg + " - dropping on floor"); } } } /** * DESCRIBE THE METHOD * * @param msg DESCRIBE THE PARAMETER * @param key DESCRIBE THE PARAMETER * @param nextHop DESCRIBE THE PARAMETER * @param opt DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public final boolean enrouteMessage(Message msg, Id key, NodeHandle nextHop, SendOptions opt) { if (msg instanceof RouteMessage) { if (logger.level <= Logger.FINER) { logger.log( "[" + thePastryNode + "] forward " + msg); } return application.forward((RouteMessage) msg); } else { return true; } } /** * DESCRIBE THE METHOD * * @param nh DESCRIBE THE PARAMETER * @param wasAdded DESCRIBE THE PARAMETER */ public void leafSetChange(rice.pastry.NodeHandle nh, boolean wasAdded) { application.update(nh, wasAdded); } // PastryAppl support /** * Called by pastry to deliver a message to this client. Not to be overridden. * * @param msg the message that is arriving. */ public void receiveMessage(rice.pastry.messaging.Message msg) { if (logger.level <= Logger.FINER) { logger.log( "[" + thePastryNode + "] recv " + msg); } if (msg instanceof rice.pastry.routing.RouteMessage) { try { rice.pastry.routing.RouteMessage rm = (rice.pastry.routing.RouteMessage) msg; // call application if (logger.level <= Logger.FINER) { logger.log( "[" + thePastryNode + "] forward " + msg); } if (application.forward(rm)) { if (rm.nextHop != null) { rice.pastry.NodeHandle nextHop = rm.nextHop; // if the message is for the local node, deliver it here if (getNodeId().equals(nextHop.getNodeId())) { PastryEndpointMessage pMsg = (PastryEndpointMessage) rm.unwrap(deserializer); if (logger.level <= Logger.FINER) { logger.log( "[" + thePastryNode + "] deliver " + pMsg + " from " + pMsg.getSenderId()); } application.deliver(rm.getTarget(), pMsg.getMessage()); } else { // route the message rm.routeMessage((rice.pastry.NodeHandle) getLocalNodeHandle()); } } } } catch (IOException ioe) { if (logger.level <= Logger.SEVERE) { logger.logException(this.toString(), ioe); } } } else { // if the message is not a RouteMessage, then it is for the local node and // was sent with a PastryAppl.routeMsgDirect(); we deliver it for backward compatibility messageForAppl(msg); } } /** * Schedules a job for processing on the dedicated processing thread. CPU * intensive jobs, such as encryption, erasure encoding, or bloom filter * creation should never be done in the context of the underlying node's * thread, and should only be done via this method. * * @param task The task to run on the processing thread * @param command The command to return the result to once it's done */ public void process(Executable task, Continuation command) { thePastryNode.process(task, command); } /** * Translate to a pastry.NodeHandle, otherwise, this is a passthrough * function. * * @param handle DESCRIBE THE PARAMETER * @param receiver DESCRIBE THE PARAMETER * @param timeout DESCRIBE THE PARAMETER */ public void connect(NodeHandle handle, AppSocketReceiver receiver, int timeout) { connect((rice.pastry.NodeHandle) handle, receiver, timeout); } /** * DESCRIBE THE METHOD * * @return DESCRIBE THE RETURN VALUE */ public String toString() { return "PastryEndpoint " + application + " " + instance + " " + getAddress(); } /** * DESCRIBE THE METHOD * * @param buf DESCRIBE THE PARAMETER * @param type DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public Id readId(InputBuffer buf, short type) throws IOException { if (type != rice.pastry.Id.TYPE) { throw new IllegalArgumentException("Invalid type:" + type); } return rice.pastry.Id.build(buf); } /** * DESCRIBE THE METHOD * * @param buf DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public NodeHandle readNodeHandle(InputBuffer buf) throws IOException { return thePastryNode.readNodeHandle(buf); } /** * DESCRIBE THE METHOD * * @param buf DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public IdRange readIdRange(InputBuffer buf) throws IOException { return new rice.pastry.IdRange(buf); } /** * DESCRIBE THE METHOD * * @param newHandle DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public NodeHandle coalesce(NodeHandle newHandle) { return thePastryNode.coalesce((rice.pastry.NodeHandle) newHandle); } /** * DESCRIBE THE METHOD * * @param buf DESCRIBE THE PARAMETER * @param type DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public NodeHandleSet readNodeHandleSet(InputBuffer buf, short type) throws IOException { switch (type) { case NodeSet.TYPE: return new NodeSet(buf, thePastryNode); case RouteSet.TYPE: return new RouteSet(buf, thePastryNode); } throw new IllegalArgumentException("Unknown type: " + type); } /** * DESCRIBE THE CLASS * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ class PEDeserializer implements MessageDeserializer { /** * DESCRIBE THE METHOD * * @param buf DESCRIBE THE PARAMETER * @param type DESCRIBE THE PARAMETER * @param priority DESCRIBE THE PARAMETER * @param sender DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public Message deserialize(InputBuffer buf, short type, byte priority, NodeHandle sender) throws IOException { if (type == PastryEndpointMessage.TYPE) { return new PastryEndpointMessage(getAddress(), buf, appDeserializer, (rice.pastry.NodeHandle) sender); } return null; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -