📄 socketmanager.java
字号:
/*************************************************************************"FreePastry" Peer-to-Peer Application Development Substrate Copyright 2002, Rice University. All rights reserved.Redistribution and use in source and binary forms, with or withoutmodification, are permitted provided that the following conditions aremet:- Redistributions of source code must retain the above copyrightnotice, this list of conditions and the following disclaimer.- Redistributions in binary form must reproduce the above copyrightnotice, this list of conditions and the following disclaimer in thedocumentation and/or other materials provided with the distribution.- Neither the name of Rice University (RICE) nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.This software is provided by RICE and the contributors on an "as is"basis, without any representations or warranties of any kind, expressor implied including, but not limited to, representations orwarranties of non-infringement, merchantability or fitness for aparticular purpose. In no event shall RICE or contributors be liablefor any direct, indirect, incidental, special, exemplary, orconsequential damages (including, but not limited to, procurement ofsubstitute goods or services; loss of use, data, or profits; orbusiness interruption) however caused and on any theory of liability,whether in contract, strict liability, or tort (including negligenceor otherwise) arising in any way out of the use of this software, evenif advised of the possibility of such damage.********************************************************************************/package rice.pastry.socket;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.*;import java.util.*;import rice.environment.logging.Logger;import rice.p2p.commonapi.*;import rice.p2p.commonapi.rawserialization.*;import rice.pastry.*;import rice.pastry.messaging.*;import rice.pastry.messaging.Message;import rice.pastry.socket.messaging.*;import rice.selector.SelectionKeyHandler;import rice.pastry.NodeHandle;/** * Private class which is tasked with reading the greeting message off of a * newly connected socket. This greeting message says who the socket is coming * from, and allows the connected to hand the socket off the appropriate node * handle. * * @version $Id: SocketCollectionManager.java 3061 2006-02-14 00:56:04Z jeffh $ * @author jeffh */class SocketManager extends SelectionKeyHandler { /** */ private final SocketCollectionManager manager; // the key to read from /** * DESCRIBE THE FIELD */ protected SelectionKey key; // the channel we are associated with /** * DESCRIBE THE FIELD */ protected SocketChannel channel; // the reader reading data off of the stream /** * DESCRIBE THE FIELD */ protected SocketChannelReader reader; // the writer (in case it is necessary) /** * DESCRIBE THE FIELD */ protected SocketChannelWriter writer; // the timer we use to check for stalled nodes /** * DESCRIBE THE FIELD */ protected rice.selector.TimerTask timer; // the node handle we're talking to /** * DESCRIBE THE FIELD */ protected SourceRoute path; // whether or not this is a bootstrap socket - if so, we fake the address // and die once the the message has been sent /** * DESCRIBE THE FIELD */ protected boolean bootstrap; MessageDeserializer deserializer = new SMDeserializer(); /** * Constructor which accepts an incoming connection, represented by the * selection key. This constructor builds a new SocketManager, and waits until * the greeting message is read from the other end. Once the greeting is * received, the manager makes sure that a socket for this handle is not * already open, and then proceeds as normal. * * @param key The server accepting key for the channel * @param manager TODO * @exception IOException DESCRIBE THE EXCEPTION */ public SocketManager(SocketCollectionManager manager, SelectionKey key) throws IOException { this.manager = manager; this.reader = new SocketChannelReader(manager.pastryNode, null); this.writer = new SocketChannelWriter(manager.pastryNode, null); this.bootstrap = false; acceptConnection(key); } /** * Constructor which creates an outgoing connection to the given node handle * using the provided address as a source route intermediate node. This * creates the connection by building the socket and sending accross the * greeting message. Once the response greeting message is received, * everything proceeds as normal. * * @param manager TODO * @param path DESCRIBE THE PARAMETER * @param bootstrap DESCRIBE THE PARAMETER * @exception IOException An error */ public SocketManager(SocketCollectionManager manager, SourceRoute path, boolean bootstrap) throws IOException { this.manager = manager; this.reader = new SocketChannelReader(manager.pastryNode, path.reverse()); this.writer = new SocketChannelWriter(manager.pastryNode, path); this.bootstrap = bootstrap; if (manager.logger.level <= Logger.FINE) { manager.logger.log("Opening connection with path " + path); } // build the entire connection createConnection(path);// ArrayList tempList = new ArrayList();// for (int i=1; i<path.getNumHops(); i++) {// tempList.add(SocketCollectionManager.HEADER_SOURCE_ROUTE);// tempList.add(SocketChannelRepeater.encodeHeader(path.getHop(i)));// }// tempList.add(SocketCollectionManager.HEADER_DIRECT);// tempList.add(new byte[4]);//// int sizeToAllocate = 0;// Iterator i = tempList.iterator();// while(i.hasNext()) {// byte[] next = (byte[])i.next();// sizeToAllocate+=next.length;// }//// byte[] toWriteBytes = new byte[sizeToAllocate];// int ptr = 0;// i = tempList.iterator();// while(i.hasNext()) {// byte[] next = (byte[])i.next();// System.arraycopy(next, 0, toWriteBytes, ptr, next.length);// ptr+=next.length;// } send(new SocketBuffer(path, 0));// for (int i=1; i<path.getNumHops(); i++) {// send(SocketCollectionManager.HEADER_SOURCE_ROUTE);// send(SocketChannelRepeater.encodeHeader(path.getHop(i)));// }//// send(SocketCollectionManager.HEADER_DIRECT);// send(new byte[4]); if (!bootstrap) { send(new SocketBuffer(path.reverse(manager.localAddress))); } } /** * Internal method which sets the internal timer */ protected void setTimer() { if (this.timer == null) { this.timer = new rice.selector.TimerTask() { public void run() { if (manager.logger.level <= Logger.FINE) { SocketManager.this.manager.logger.log("WRITE_TIMER::Timer expired, checking liveness..."); } SocketManager.this.manager.manager.checkLiveness(path.getLastHop()); } }; manager.pastryNode.getEnvironment().getSelectorManager().schedule(this.timer, manager.WRITE_WAIT_TIME); } } /** * DESCRIBE THE METHOD * * @return DESCRIBE THE RETURN VALUE */ public String toString() { return "SM " + channel; } /** * Method which initiates a shutdown of this socket by calling * shutdownOutput(). This has the effect of removing the manager from the open * list. */ public void shutdown() { try { if (manager.logger.level <= Logger.FINE) { manager.logger.log("Shutting down output on connection with path " + path); } if (channel != null) { channel.socket().shutdownOutput(); } else if (manager.logger.level <= Logger.SEVERE) { manager.logger.log("ERROR: Unable to shutdown output on channel; channel is null!"); } manager.socketClosed(path, this); manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key); } catch (IOException e) { if (manager.logger.level <= Logger.SEVERE) { manager.logger.log("ERROR: Received exception " + e + " while shutting down output."); } close(); } } /** * Method which closes down this socket manager, by closing the socket, * cancelling the key and setting the key to be interested in nothing */ public void close() { try { if (manager.logger.level <= Logger.FINE) { if (path != null) { manager.logger.log("Closing connection with path " + path); } else { manager.logger.log("Closing connection to " + (InetSocketAddress) channel.socket().getRemoteSocketAddress()); } } // todo, need to monitor all openings, sourceroute, accepted, etc. if (manager.pastryNode != null) { manager.pastryNode.broadcastChannelClosed((InetSocketAddress) channel.socket().getRemoteSocketAddress()); } clearTimer(); if (key != null) { if (manager.logger.level <= Logger.WARNING) { if (!manager.pastryNode.getEnvironment().getSelectorManager().isSelectorThread()) { manager.logger.logException("WARNING: cancelling key:" + key + " on the wrong thread.", new Exception("Stack Trace")); } } key.cancel(); key.attach(null); key = null; } manager.unIdentifiedSM.remove(this); if (channel != null) { channel.close(); } if (path != null) { manager.socketClosed(path, this); Iterator i = writer.getQueue().iterator(); writer.reset(); /** * Here, if we have not been declared dead, then we attempt to resend * the messages. However, if we have been declared dead, we reroute the * route messages via the pastry node, but delete any messages routed * directly. */ while (i.hasNext()) { Object o = i.next(); if ((o instanceof Message) && (manager.manager != null)) { manager.manager.reroute(path.getLastHop(), (SocketBuffer) o); } } path = null; } } catch (IOException e) { if (manager.logger.level <= Logger.SEVERE) { manager.logger.log("ERROR: Recevied exception " + e + " while closing socket!"); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -