📄 socketpastrynodefactory.java
字号:
* blocks until a response is received, and then closes the socket and returns * the response. * * @param address The address to send to * @param message The message to send * @return The response * @exception IOException DESCRIBE THE EXCEPTION */ protected Message getResponse(InetSocketAddress address, Message message) throws IOException { // create reader and writer SocketChannelWriter writer; SocketChannelReader reader; writer = new SocketChannelWriter(environment, SourceRoute.build(new EpochInetSocketAddress(address, EpochInetSocketAddress.EPOCH_UNKNOWN))); reader = new SocketChannelReader(environment, SourceRoute.build(new EpochInetSocketAddress(address, EpochInetSocketAddress.EPOCH_UNKNOWN))); // bind to the appropriate port SocketChannel channel = SocketChannel.open(); channel.configureBlocking(true); channel.socket().connect(address, 20000); channel.socket().setSoTimeout(20000); writer.enqueue(TOTAL_HEADER); writer.enqueue(message); writer.write(channel); SocketBuffer o = null; while (o == null) { o = reader.read(channel); } if (logger.level <= Logger.FINER) { logger.log("SPNF.getResponse(): Closing " + channel); } channel.socket().shutdownOutput(); channel.socket().close(); channel.close(); if (logger.level <= Logger.FINER) { logger.log("SPNF.getResponse(): Closed " + channel); } return o.deserialize(deserializer); } /** * Gets the Response attribute of the SocketPastryNodeFactory object * * @param address DESCRIBE THE PARAMETER * @param message DESCRIBE THE PARAMETER * @param c DESCRIBE THE PARAMETER * @return The Response value */ protected CancellableTask getResponse(final InetSocketAddress address, final Message message, final Continuation c) { // create reader and writer final SocketChannelWriter writer; final SocketChannelReader reader; writer = new SocketChannelWriter(environment, SourceRoute.build(new EpochInetSocketAddress(address, EpochInetSocketAddress.EPOCH_UNKNOWN))); reader = new SocketChannelReader(environment, SourceRoute.build(new EpochInetSocketAddress(address, EpochInetSocketAddress.EPOCH_UNKNOWN))); writer.enqueue(TOTAL_HEADER); try { writer.enqueue(message); } catch (IOException ioe) { c.receiveException(ioe); return null; } // bind to the appropriate port try { final SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); final SelectionKey key = environment.getSelectorManager().register( channel, new SelectionKeyHandler() { public void connect(SelectionKey key) { if (logger.level <= Logger.FINE) { logger.log("SPNF.getResponse(" + address + "," + message + ").connect()"); } try { if (channel.finishConnect()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); } if (logger.level <= Logger.FINE) { logger.log("(SPNF) Found connectable channel - completed connection"); } // channel.socket().connect(address, 20000); // channel.socket().setSoTimeout(20000); } catch (IOException ioe) { handleException(ioe); } } public void read(SelectionKey key) { if (logger.level <= Logger.FINE) { logger.log("SPNF.getResponse(" + address + "," + message + ").read()"); } try { SocketBuffer o = null; while (o == null) { o = reader.read(channel); } channel.socket().close(); channel.close(); key.cancel(); c.receiveResult(o.deserialize(deserializer)); } catch (IOException ioe) { handleException(ioe); } } public void write(SelectionKey key) { if (logger.level <= Logger.FINE) { logger.log("SPNF.getResponse(" + address + "," + message + ").write()"); } try { if (writer.write(channel)) { key.interestOps(SelectionKey.OP_READ); } } catch (IOException ioe) { handleException(ioe); } } public void handleException(Exception e) { try { channel.socket().close(); channel.close(); channel.keyFor(environment.getSelectorManager().getSelector()) .cancel(); } catch (IOException ioe) { if (logger.level <= Logger.WARNING) { logger.logException("Error while trying requesting " + message + " from " + address, e); } } finally { c.receiveException(e); } } }, 0); if (logger.level <= Logger.FINE) { logger.log("(SPNF) Initiating socket connection to address " + address); } if (channel.connect(address)) { key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); } else { key.interestOps(SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE | SelectionKey.OP_READ); } return new CancellableTask() { public void run() { } public boolean cancel() { environment.getSelectorManager().invoke( new Runnable() { public void run() { try { synchronized (key) { channel.socket().close(); channel.close();// if (logger.level <= Logger.WARNING) {// if (!environment.getSelectorManager().isSelectorThread()) {// logger.logException("WARNING: cancelling key:"+key+" on the wrong thread.", new Exception("Stack Trace"));// }// } key.cancel(); }// return true; } catch (Exception ioe) { if (logger.level <= Logger.WARNING) { logger.logException("Error cancelling task.", ioe); }// return false; } } }); return true; } public long scheduledExecutionTime() { return 0; } }; } catch (IOException ioe) { c.receiveException(ioe); return null; } } /** * Method which constructs an InetSocketAddres for the local host with the * specifed port number. * * @param portNumber The port number to create the address at. * @param epoch DESCRIBE THE PARAMETER * @return An InetSocketAddress at the localhost with port portNumber. */ private EpochInetSocketAddress getEpochAddress(int portNumber, long epoch) { EpochInetSocketAddress result = null; result = new EpochInetSocketAddress(new InetSocketAddress(localAddress, portNumber), epoch); return result; } /** * Way to generate a NodeHandle with a maximum timeout to receive the result. * Helper funciton for using the non-blocking version. However this method * behaves as a blocking call. * * @param address * @param timeout maximum time in millis to return the result. <= 0 will use * the blocking version. * @return */ public NodeHandle generateNodeHandle(InetSocketAddress address, int timeout) { if (timeout <= 0) { return generateNodeHandle(address); } TimerContinuation c = new TimerContinuation(); CancellableTask task = generateNodeHandle(address, c); if (task == null) { return null; } synchronized (c) { try { c.wait(timeout); } catch (InterruptedException ie) { return null; } } task.cancel(); if (logger.level <= Logger.FINER) { logger.log("SPNF.generateNodeHandle() returning " + c.ret + " after trying to contact " + address); } return (NodeHandle) c.ret; } /** * Method which contructs a node handle (using the socket protocol) for the * node at address NodeHandle. * * @param address The address of the remote node. * @return A NodeHandle cooresponding to that address */ public NodeHandle generateNodeHandle(InetSocketAddress address) { // send nodeId request to remote node, wait for response // allocate enought bytes to read a node handle if (logger.level <= Logger.FINE) { logger.log("Socket: Contacting bootstrap node " + address); } try { NodeIdResponseMessage rm = (NodeIdResponseMessage) getResponse(address, new NodeIdRequestMessage()); return new SocketNodeHandle(new EpochInetSocketAddress(address, rm.getEpoch()), rm.getNodeId()); } catch (IOException e) { if (logger.level <= Logger.FINE) { logger.logException("Error connecting to address " + address + ": ", e); } else { if (logger.level <= Logger.WARNING) { logger.log("Error connecting to address " + address + ": " + e); } } return null; } } /** * DESCRIBE THE METHOD * * @param address DESCRIBE THE PARAMETER * @param c DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public CancellableTask generateNodeHandle(final InetSocketAddress address, final Continuation c) { if (logger.level <= Logger.FINE) { logger.log("Socket: Contacting bootstrap node " + address); } return getResponse(address, new NodeIdRequestMessage(), new Continuation() { public void receiveResult(Object result) { NodeIdResponseMessage rm = (NodeIdResponseMessage) result; c.receiveResult(new SocketNodeHandle(new EpochInetSocketAddress( address, rm.getEpoch()), rm.getNodeId())); } public void receiveException(Exception result) { if (logger.level <= Logger.WARNING) { logger.log("Error connecting to address " + address + ": " + result); } c.receiveException(result); } }); } /** * Method which creates a Pastry node from the next port with a randomly * generated NodeId. * * @param bootstrap Node handle to bootstrap from. * @return A node with a random ID and next port number. */ public PastryNode newNode(NodeHandle bootstrap) { // if (bootstrap == null) { // return newNode(bootstrap, NodeId.buildNodeId()); // } return newNode(bootstrap, nidFactory.generateNodeId()); } /** * Method which creates a Pastry node from the next port with a randomly
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -