📄 pastimpl.java
字号:
} //System.out.println("Closing "+socket); pendingSocketTransactions.remove(socket); socket.close(); } }); endpoint.register(); } /** * Gets the Environment attribute of the PastImpl object * * @return The Environment value */ public Environment getEnvironment() { return environment; } /** * Returns of the outstanding messages. This is a DEBUGGING method ONLY! * * @return The list of all the outstanding messages */ public Continuation[] getOutstandingMessages() { return (Continuation[]) outstanding.values().toArray(new Continuation[0]); } /** * Returns the endpoint associated with the Past - ONLY FOR TESTING - DO NOT * USE * * @return The endpoint */ public Endpoint getEndpoint() { return endpoint; } /** * Returns a new uid for a message * * @return A new id */ protected synchronized int getUID() { return id++; } /** * Returns a continuation which will respond to the given message. * * @param msg DESCRIBE THE PARAMETER * @return A new id */ protected Continuation getResponseContinuation(final PastMessage msg) { if (logger.level <= Logger.FINER) { logger.log("Getting the Continuation to respond to the message " + msg); } final ContinuationMessage cmsg = (ContinuationMessage) msg; return new Continuation() { public void receiveResult(Object o) { cmsg.receiveResult(o); endpoint.route(null, cmsg, msg.getSource()); } public void receiveException(Exception e) { cmsg.receiveException(e); endpoint.route(null, cmsg, msg.getSource()); } }; } /** * Do like above, but use a socket * * @param msg * @return */ protected Continuation getFetchResponseContinuation(final PastMessage msg) { final ContinuationMessage cmsg = (ContinuationMessage) msg; return new Continuation() { public void receiveResult(Object o) { cmsg.receiveResult(o); PastContent content = (PastContent) o; if (socketStrategy.sendAlongSocket(SocketStrategy.TYPE_FETCH, content)) { sendViaSocket(msg.getSource(), cmsg, null); } else { endpoint.route(null, cmsg, msg.getSource()); } } public void receiveException(Exception e) { cmsg.receiveException(e); endpoint.route(null, cmsg, msg.getSource()); } }; } /** * Internal method which returns the handles to an object. It first checks to * see if the handles can be determined locally, and if so, returns. * Otherwise, it sends a LookupHandles messsage out to find out the nodes. * * @param id The id to fetch the handles for * @param max The maximum number of handles to return * @param command The command to call with the result (NodeHandle[]) */ protected void getHandles(Id id, int max, Continuation command) { NodeHandleSet set = endpoint.replicaSet(id, max); if (set.size() == max) { command.receiveResult(set); } else { sendRequest(id, new LookupHandlesMessage(getUID(), id, max, getLocalNodeHandle(), id), new StandardContinuation(command) { public void receiveResult(Object o) { NodeHandleSet replicas = (NodeHandleSet) o; // check to make sure we've fetched the correct number of replicas if (endpoint.replicaSet(endpoint.getLocalNodeHandle().getId(), replicationFactor + 1).size() > replicas.size()) { parent.receiveException(new PastException("Only received " + replicas.size() + " replicas - cannot insert as we know about more nodes.")); } else { parent.receiveResult(replicas); } } }); } } /** * get the nodeHandle of the local Past node * * @return the nodehandle */ public NodeHandle getLocalNodeHandle() { return endpoint.getLocalNodeHandle(); } /** * Returns the number of replicas used in this Past * * @return the number of replicas for each object */ public int getReplicationFactor() { return replicationFactor; } // ----- UTILITY METHODS ----- /** * Returns the replica manager for this Past instance. Should *ONLY* be used * for testing. Messing with this will cause unknown behavior. * * @return This Past's replica manager */ public Replication getReplication() { return replicaManager.getReplication(); } /** * Returns this Past's storage manager. Should *ONLY* be used for testing. * Messing with this will cause unknown behavior. * * @return This Past's storage manager. */ public StorageManager getStorageManager() { return storage; } /** * Gets the Instance attribute of the PastImpl object * * @return The Instance value */ public String getInstance() { return instance; } /** * Sets the ContentDeserializer attribute of the PastImpl object * * @param deserializer The new ContentDeserializer value */ public void setContentDeserializer(PastContentDeserializer deserializer) { contentDeserializer = deserializer; } /** * Sets the ContentHandleDeserializer attribute of the PastImpl object * * @param deserializer The new ContentHandleDeserializer value */ public void setContentHandleDeserializer(PastContentHandleDeserializer deserializer) { contentHandleDeserializer = deserializer; } /** * DESCRIBE THE METHOD * * @return DESCRIBE THE RETURN VALUE */ public String toString() { if (endpoint == null) { return super.toString(); } return "PastImpl[" + endpoint.getInstance() + "]"; } // ----- INTERNAL METHODS ----- /** * Internal method which builds the replication manager. Can be overridden by * subclasses. * * @param node The node to base the RM off of * @param instance The instance name to use * @return The replication manager, ready for use */ protected ReplicationManager buildReplicationManager(Node node, String instance) { return new ReplicationManagerImpl(node, this, replicationFactor, instance); } /** * DESCRIBE THE METHOD * * @param handle DESCRIBE THE PARAMETER * @param m DESCRIBE THE PARAMETER * @param c DESCRIBE THE PARAMETER */ private void sendViaSocket(final NodeHandle handle, final PastMessage m, final Continuation c) { if (c != null) { CancellableTask timer = endpoint.scheduleMessage(new MessageLostMessage(m.getUID(), getLocalNodeHandle(), null, m, handle), MESSAGE_TIMEOUT); insertPending(m.getUID(), timer, c); } // create a bb[] to be written SimpleOutputBuffer sob = new SimpleOutputBuffer(); try { sob.writeInt(0); // place holder for size... sob.writeShort(m.getType()); m.serialize(sob); } catch (IOException ioe) { if (c != null) { c.receiveException(ioe); } } // add the size back to the beginning... int size = sob.getWritten() - 4; // remove the size of the size :) if (logger.level <= Logger.FINER) { logger.log("Sending size of " + size + " to " + handle + " to send " + m); } byte[] bytes = sob.getBytes(); MathUtils.intToByteArray(size, bytes, 0); // prepare the bytes for writing final ByteBuffer[] bb = new ByteBuffer[1]; bb[0] = ByteBuffer.wrap(bytes, 0, sob.getWritten()); // the whole thing if (logger.level <= Logger.FINE) { logger.log("Opening socket to " + handle + " to send " + m); } endpoint.connect(handle, new AppSocketReceiver() { public void receiveSocket(AppSocket socket) { if (logger.level <= Logger.FINER) { logger.log("Opened socket to " + handle + ":" + socket + " to send " + m); } socket.register(false, true, 10000, this); } public void receiveSelectResult(AppSocket socket, boolean canRead, boolean canWrite) { if (logger.level <= Logger.FINEST) { logger.log("Writing to " + handle + ":" + socket + " to send " + m); } try {// ByteBuffer[] outs = new ByteBuffer[1];// ByteBuffer out = ByteBuffer.wrap(endpoint.getLocalNodeHandle().getId().toByteArray());// outs[0] = out;// socket.write(outs, 0, 1); socket.write(bb, 0, 1); } catch (IOException ioe) { if (c != null) { c.receiveException(ioe); } else if (logger.level <= Logger.WARNING) { logger.logException("Error sending " + m, ioe); } return; // don't continue to try to send } if (bb[0].remaining() > 0) { socket.register(false, true, 10000, this); } else { socket.close(); } } public void receiveException(AppSocket socket, Exception e) { if (c != null) { c.receiveException(e); } } }, 10000); } /** * Sends a request message across the wire, and stores the appropriate * continuation. * * @param id The destination id * @param message The message to send. * @param command The command to run once a result is received */ protected void sendRequest(Id id, PastMessage message, Continuation command) { sendRequest(id, message, null, command); } /** * Sends a request message across the wire, and stores the appropriate * continuation. * * @param handle The node handle to send directly too * @param message The message to send. * @param command The command to run once a result is received */ protected void sendRequest(NodeHandle handle, PastMessage message, Continuation command) { sendRequest(null, message, handle, command); } /** * Sends a request message across the wire, and stores the appropriate * continuation. Sends the message using the provided handle as a hint. * * @param id The destination id * @param message The message to send. * @param command The command to run once a result is received * @param hint DESCRIBE THE PARAMETER */ protected void sendRequest(Id id, PastMessage message, NodeHandle hint, Continuation command) { if (logger.level <= Logger.FINER) { logger.log("Sending request message " + message + " {" + message.getUID() + "} to id " + id + " via " + hint); } CancellableTask timer = endpoint.scheduleMessage(new MessageLostMessage(message.getUID(), getLocalNodeHandle(), id, message, hint), MESSAGE_TIMEOUT); insertPending(message.getUID(), timer, command); endpoint.route(id, message, hint); } /** * Loads the provided continuation into the pending table * * @param uid The id of the message * @param command The continuation to run * @param timer DESCRIBE THE PARAMETER */ private void insertPending(int uid, CancellableTask timer, Continuation command) { if (logger.level <= Logger.FINER) { logger.log("Loading continuation " + uid + " into pending table"); } timers.put(new Integer(uid), timer); outstanding.put(new Integer(uid), command); } /** * Removes and returns the provided continuation from the pending table * * @param uid The id of the message * @return The continuation to run */ private Continuation removePending(int uid) { if (logger.level <= Logger.FINER) { logger.log("Removing and returning continuation " + uid + " from pending table"); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -