📄 socketappsocket.java
字号:
/*************************************************************************"FreePastry" Peer-to-Peer Application Development Substrate Copyright 2002, Rice University. All rights reserved.Redistribution and use in source and binary forms, with or withoutmodification, are permitted provided that the following conditions aremet:- Redistributions of source code must retain the above copyrightnotice, this list of conditions and the following disclaimer.- Redistributions in binary form must reproduce the above copyrightnotice, this list of conditions and the following disclaimer in thedocumentation and/or other materials provided with the distribution.- Neither the name of Rice University (RICE) nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.This software is provided by RICE and the contributors on an "as is"basis, without any representations or warranties of any kind, expressor implied including, but not limited to, representations orwarranties of non-infringement, merchantability or fitness for aparticular purpose. 
In no event shall RICE or contributors be liablefor any direct, indirect, incidental, special, exemplary, orconsequential damages (including, but not limited to, procurement ofsubstitute goods or services; loss of use, data, or profits; orbusiness interruption) however caused and on any theory of liability,whether in contract, strict liability, or tort (including negligenceor otherwise) arising in any way out of the use of this software, evenif advised of the possibility of such damage.********************************************************************************/package rice.pastry.socket;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.*;import rice.environment.logging.Logger;import rice.p2p.commonapi.*;import rice.p2p.commonapi.appsocket.*;import rice.p2p.commonapi.exception.*;import rice.p2p.util.MathUtils;import rice.pastry.NetworkListener;import rice.pastry.messaging.Message;import rice.pastry.socket.messaging.*;import rice.selector.*;import rice.selector.TimerTask;/** * Private class which is tasked with reading the greeting message off of a * newly connected socket. This greeting message says who the socket is coming * from, and allows the connected to hand the socket off the appropriate node * handle. * * @version $Id: SocketCollectionManager.java 3061 2006-02-14 00:56:04Z jeffh $ * @author jeffh */class SocketAppSocket extends SelectionKeyHandler implements AppSocket { /** * The manager. 
*/ private final SocketCollectionManager manager; // the key to read from /** * DESCRIBE THE FIELD */ protected SelectionKey key; // the channel we are associated with /** * DESCRIBE THE FIELD */ protected SocketChannel channel; /** * DESCRIBE THE FIELD */ protected AppSocketReceiver receiver, reader, writer; ByteBuffer toWrite; byte connectResult = CONNECTION_UNKNOWN; int appId; /** * AppSocketReceiver -> Timer */ Hashtable timers = new Hashtable(); /** * DESCRIBE THE FIELD */ public final static byte CONNECTION_UNKNOWN_ERROR = -1; /** * DESCRIBE THE FIELD */ public final static byte CONNECTION_UNKNOWN = -100; /** * DESCRIBE THE FIELD */ public final static byte CONNECTION_OK = 0; /** * DESCRIBE THE FIELD */ public final static byte CONNECTION_NO_APP = 1; /** * DESCRIBE THE FIELD */ public final static byte CONNECTION_NO_ACCEPTOR = 2; /** * Constructor which accepts an incoming connection, represented by the * selection key. This constructor builds a new SocketManager, and waits until * the greeting message is read from the other end. Once the greeting is * received, the manager makes sure that a socket for this handle is not * already open, and then proceeds as normal. * * @param key The server accepting key for the channel * @param manager TODO * @param appId DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ public SocketAppSocket(SocketCollectionManager manager, SelectionKey key, int appId) throws IOException { this.appId = appId; this.manager = manager; acceptConnection(key); } /** * Constructor which creates an outgoing connection to the given node handle * using the provided address as a source route intermediate node. This * creates the connection by building the socket and sending accross the * greeting message. Once the response greeting message is received, * everything proceeds as normal. 
* * @param manager TODO * @param path DESCRIBE THE PARAMETER * @param appId DESCRIBE THE PARAMETER * @param receiver DESCRIBE THE PARAMETER * @param timeout DESCRIBE THE PARAMETER * @exception IOException An error */ public SocketAppSocket(SocketCollectionManager manager, SourceRoute path, int appId, AppSocketReceiver receiver, int timeout) throws IOException { this.appId = appId; this.receiver = receiver; this.manager = manager; if (manager.logger.level <= Logger.FINE) { manager.logger.log("Opening connection with path " + path); } // this is a bit lazy, and could be optimized, but I'm just going to do the exact thing that a SocketManager does, but then aggregate // them into a big byte[]// ArrayList tempList = new ArrayList(); SocketBuffer sb = new SocketBuffer(path, appId);// int sizeToAllocate = 0;// Iterator i = tempList.iterator();// while(i.hasNext()) {// byte[] next = (byte[])i.next();// sizeToAllocate+=next.length;// }// byte[] toWriteBytes = new byte[sizeToAllocate];// int ptr = 0;// i = tempList.iterator();// while(i.hasNext()) {// byte[] next = (byte[])i.next();// System.arraycopy(next, 0, toWriteBytes, ptr, next.length);// ptr+=next.length;// } toWrite = sb.getBuffer(); //ByteBuffer.wrap(toWriteBytes); // build the entire connection startTimer(timeout, receiver); createConnection(path); } /** * DESCRIBE THE METHOD * * @return DESCRIBE THE RETURN VALUE */ public String toString() { return "SAS{" + appId + "}" + channel; } /** * Method which initiates a shutdown of this socket by calling * shutdownOutput(). This has the effect of removing the manager from the open * list. 
*/
  public void shutdownOutput() {
    try {
      if (manager.logger.level <= Logger.FINE) {
        manager.logger.log("Shutting down output on app connection " + this);
      }

      if (channel != null) {
        channel.socket().shutdownOutput();
      } else if (manager.logger.level <= Logger.SEVERE) {
        manager.logger.log("ERROR: Unable to shutdown output on channel; channel is null!");
      }

      manager.appSocketClosed(this);
      manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key);
    } catch (IOException e) {
      if (manager.logger.level <= Logger.SEVERE) {
        manager.logger.log("ERROR: Received exception " + e + " while shutting down output.");
      }

      // a half-close that fails falls back to a full close
      close();
    }
  }

  /**
   * Method which closes down this socket, by firing all pending timers,
   * cancelling the key, closing the channel and telling the manager that this
   * socket is closed.
   *
   * The channel and the pastry node are both treated as optional: a socket
   * whose channel was never set can still be closed, and the manager is
   * notified even when closing the channel throws. An IOException raised while
   * closing is logged and not rethrown.
   */
  public void close() {
    try {
      if (manager.logger.level <= Logger.FINE) {
        manager.logger.log("Closing connection with " + this);
      }

      fireAllTimers();

      // the remote address can only be read from an existing channel
      if ((manager.pastryNode != null) && (channel != null)) {
        manager.pastryNode.broadcastChannelClosed((InetSocketAddress) channel.socket().getRemoteSocketAddress());
      }

      if (key != null) {
        // the thread check needs the node's environment, so skip it when there is no node
        if ((manager.logger.level <= Logger.WARNING) && (manager.pastryNode != null)) {
          if (!manager.pastryNode.getEnvironment().getSelectorManager().isSelectorThread()) {
            manager.logger.logException("WARNING: cancelling key:" + key + " on the wrong thread.", new Exception("Stack Trace"));
          }
        }

        key.cancel();
        key.attach(null);
        key = null;
      }

      manager.unIdentifiedSM.remove(this);

      try {
        if (channel != null) {
          channel.close();
        }
      } finally {
        // the manager must hear about the close even if the channel refuses to close cleanly
        manager.appSocketClosed(this);
      }
    } catch (IOException e) {
      if (manager.logger.level <= Logger.SEVERE) {
        manager.logger.log("ERROR: Recevied exception " + e + " while closing socket!");
      }
    }
  }

  /**
   * Cancels every registered timer and notifies the associated receiver with a
   * TimeoutException.
   *
   * The table is copied and emptied before any receiver is called. A receiver
   * that reacts by closing this socket again, or by registering a new timer,
   * therefore neither gets notified a second time nor disturbs the iteration.
   */
  private void fireAllTimers() {
    if (timers.isEmpty()) {
      return;
    }

    // detach the pending entries first; callbacks below may touch 'timers'
    Hashtable pending = new Hashtable(timers);
    timers.clear();

    Iterator i = pending.entrySet().iterator();
    while (i.hasNext()) {
      Map.Entry entry = (Map.Entry) i.next();
      AppSocketReceiver rec = (AppSocketReceiver) entry.getKey();
      TimerTask timer = (TimerTask) entry.getValue();

      // stop the timer before notifying, so it cannot fire on its own afterwards
      timer.cancel();

      if (manager.logger.level <= Logger.FINE) {
        manager.logger.log("fired" + timer);
      }

      rec.receiveException(this, new TimeoutException());
    }
  }

  /**
   * Method which should change the interestOps of the handler's key. This
   * method should *ONLY* be called by the selection thread in the context of a
   * select().
   *
   * @param key The key in question
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -