📄 socketcollectionmanager.java
字号:
} channel2.close(); channel2 = null; } sourceRouteClosed(this); } catch (IOException e) { if (logger.level <= Logger.WARNING) { logger.log("ERROR: Recevied exception " + e + " while closing intermediateSourceRoute!"); } } } /** * Specified by the SelectionKeyHandler interface - calling this tells this * socket manager that the connection has completed and we can now * read/write. * * @param key The key which is connectable. */ public void connect(SelectionKey key) { if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " connecting to key " + (key.channel() == channel1 ? "1" : "2")); } try { // deregister interest in connecting to this socket if (((SocketChannel) key.channel()).finishConnect()) { removeInterestOp(key.channel(), SelectionKey.OP_CONNECT); } if (logger.level <= Logger.FINE) { logger.log("(SRM) Found connectable source route channel - completed connection"); } } catch (IOException e) { if (logger.level <= Logger.WARNING) { logger.log("(SRM) Got exception " + e + " on connect - killing off source route"); } close(); } } /** * Reads from the socket attached to this connector. * * @param key The selection key for this manager */ public void read(SelectionKey key) { String k = (key.channel() == channel1 ? "1" : "2"); if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " reading from key " + k + " " + key.interestOps()); } try { try { if (repeater.read((SocketChannel) key.channel())) { addInterestOp(otherChannel(key.channel()), SelectionKey.OP_WRITE); removeInterestOp(key.channel(), SelectionKey.OP_READ); } if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " done reading from key " + k); } } catch (ClosedChannelException e) { if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " reading from key " + k + " returned -1 - processing shutdown"); } // then determine if the sockets are now completely shut down, // or if only half is now closed if (otherChannel(key.channel()).socket().isInputShutdown()) { if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " other key is shut down - closing"); } close(); } else { // first, deregister in reading and writing to the appropriate sockets ((SocketChannel) key.channel()).socket().shutdownInput(); removeInterestOp(key.channel(), SelectionKey.OP_READ); removeInterestOp(otherChannel(key.channel()), SelectionKey.OP_WRITE); if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " other key not yet closed - shutting it down"); } shutdown(otherChannel(key.channel())); } } } catch (IOException e) { if (logger.level <= Logger.FINE) { logger.logException( "(SRM) ERROR " + e + " reading source route - cancelling.", e); } close(); } } /** * Writes to the socket attached to this socket manager. * * @param key The selection key for this manager */ public synchronized void write(SelectionKey key) { String k = (key.channel() == channel1 ? "1" : "2"); if (logger.level <= Logger.FINER) { logger.log("(SRM) " + this + " writing to key " + k + " " + key.interestOps()); } try { if (repeater.write((SocketChannel) key.channel())) { addInterestOp(otherChannel(key.channel()), SelectionKey.OP_READ); removeInterestOp(key.channel(), SelectionKey.OP_WRITE); } if (logger.level <= Logger.FINER) { logger.log("(SRM) " + this + " done writing to key " + k); } } catch (IOException e) { if (logger.level <= Logger.WARNING) { logger.log("ERROR " + e + " writing source route - cancelling."); } close(); } } /** * Accepts a new connection on the given key * * @param key DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ protected void acceptConnection(SelectionKey key) throws IOException { if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " accepted connection for key 1 as " + ((SocketChannel) key.channel()).socket().getRemoteSocketAddress()); } if (logger.level <= Logger.FINE) { logger.log("(SRM) Accepted source route connection from " + ((SocketChannel) key.channel()).socket().getRemoteSocketAddress()); } pastryNode.getEnvironment().getSelectorManager().register(key.channel(), this, SelectionKey.OP_READ); this.channel1 = (SocketChannel) key.channel(); } /** * Creates the outgoing socket to the remote handle * * @param address The accress to connect to * @exception IOException DESCRIBE THE EXCEPTION */ protected void createConnection(final EpochInetSocketAddress address) throws IOException { if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " creating connection for key 2 as " + address.getAddress()); } channel2 = SocketChannel.open(); channel2.socket().setSendBufferSize(SOCKET_BUFFER_SIZE); channel2.socket().setReceiveBufferSize(SOCKET_BUFFER_SIZE); channel2.configureBlocking(false); if (logger.level <= Logger.FINE) { logger.log("(SRM) " + "Initiating source route connection to " + address); } pastryNode.broadcastChannelOpened(address.address, NetworkListener.REASON_SR); boolean done = channel2.connect(address.getAddress()); if (done) { pastryNode.getEnvironment().getSelectorManager().register(channel2, this, SelectionKey.OP_READ); } else { pastryNode.getEnvironment().getSelectorManager().register(channel2, this, SelectionKey.OP_READ | SelectionKey.OP_CONNECT); } if (logger.level <= Logger.FINE) { logger.log("(SRM) " + this + " setting initial ops to " + SelectionKey.OP_READ + " for key 2"); } } /** * DESCRIBE THE METHOD * * @return DESCRIBE THE RETURN VALUE */ public String toString() { String s1 = null; if (channel1 != null) { if (channel1.socket() != null) { if (channel1.socket().getRemoteSocketAddress() != null) { s1 = channel1.socket().getRemoteSocketAddress().toString(); } else { s1 = channel1.socket().toString(); } } else { s1 = channel1.toString(); } } String s2 = null; if (channel2 != null) { if (channel2.socket() != null) { if (channel2.socket().getRemoteSocketAddress() != null) { s2 = channel2.socket().getRemoteSocketAddress().toString(); } else { s2 = channel2.socket().toString(); } } else { s2 = channel2.toString(); } } return "SourceRouteManager " + s1 + " to " + s2; } } /** * Internal class which reads the greeting message off of a newly-accepted * socket. This class determines whether this is a normal connection or a * source-route request, and then hands the connection off to the appropriate * handler (SocketManager or SourceRouteManager). * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ protected class SocketAccepter extends SelectionKeyHandler { // the key to read from private SelectionKey key; // the buffer used to read the header private ByteBuffer buffer; /** * Private method which is designed to examine the newly read buffer and * handoff the connection to the approriate handler * * @exception IOException DESCRIBE THE EXCEPTION */ ByteBuffer appTypeBuffer = null; byte[] array = null; /** * Constructor which accepts an incoming connection, represented by the * selection key. This constructor builds a new * IntermediateSourceRouteManager, and waits until the greeting message is * read from the other end. Once the greeting is received, the manager makes * sure that a socket for this handle is not already open, and then proceeds * as normal. * * @param key The server accepting key for the channel * @exception IOException DESCRIBE THE EXCEPTION */ public SocketAccepter(SelectionKey key) throws IOException { this.buffer = ByteBuffer.allocateDirect(TOTAL_HEADER_SIZE); acceptConnection(key); } /** * Method which closes down this socket manager, by closing the socket, * cancelling the key and setting the key to be interested in nothing */ public void close() { try { if (key != null) { key.channel().close(); key.cancel(); key.attach(null); key = null; } } catch (IOException e) { if (logger.level <= Logger.WARNING) { logger.log("(SA) " + "ERROR: Recevied exception " + e + " while closing just accepted socket!"); } } } /** * Accepts a new connection on the given key * * @param serverKey The server socket key * @exception IOException DESCRIBE THE EXCEPTION */ protected void acceptConnection(SelectionKey serverKey) throws IOException { final SocketChannel channel = (SocketChannel) ((ServerSocketChannel) serverKey.channel()).accept(); channel.socket().setSendBufferSize(SOCKET_BUFFER_SIZE); channel.socket().setReceiveBufferSize(SOCKET_BUFFER_SIZE); channel.configureBlocking(false); if (logger.level <= Logger.FINE) { logger.log("(SA) " + "Accepted incoming connection from " + channel.socket().getRemoteSocketAddress()); } pastryNode.broadcastChannelOpened((InetSocketAddress) channel.socket().getRemoteSocketAddress(), NetworkListener.REASON_ACC_NORMAL); key = pastryNode.getEnvironment().getSelectorManager().register(channel, this, SelectionKey.OP_READ); } /** * Reads from the socket attached to this connector. * * @param key The selection key for this manager */ public void read(SelectionKey key) { try { int read = ((SocketChannel) key.channel()).read(buffer); if (logger.level <= Logger.FINE) { logger.log("(SA)1 Read " + read + " bytes from newly accepted connection."); } // implies that the channel is closed if (read == -1) { throw new IOException("Error on read - the channel has been closed."); } // this could be a problem if a socket is opened and nothing, or not enough is being written if (buffer.remaining() == 0) { processBuffer(); } } catch (IOException e) { if (logger.level <= Logger.FINE) { logger.log("(SA) ERROR " + e + " reading source route - cancelling."); } close(); } } /** * DESCRIBE THE METHOD * * @exception IOException DESCRIBE THE EXCEPTION */ private void processBuffer() throws IOException { // NOTE: this is kind of a funky hack, the reason is that it is possible to // read the header without reading the appId bytes. So, this code makes // read/processBuffer just keep being called until both arrive // we don't want to touch buffer once we construct appTypeBuffer, and we // return until it finishes reading if (appTypeBuffer == null) { // flip the buffer buffer.flip(); array = new byte[HEADER_SIZE]; buffer.get(array, 0, HEADER_SIZE); if (!Arrays.equals(array, PASTRY_MAGIC_NUMBER)) { throw new IOException("Not a pastry socket:" + array[0] + "," + array[1] + "," + array[2] + "," + array[3]); } buffer.get(array, 0, HEADER_SIZE); int version = MathUtils.byteArrayToInt(array); if (!(version == 0)) { throw new IOException("Unknown Version:" + version); } // allocate space for the header buffer.get(array, 0, HEADER_SIZE); appTypeBuffer = ByteBuffer.allocateDirect(4); } // verify the buffer if (Arrays.equals(array, HEADER_DIRECT)) { int read = ((SocketChannel) key.channel()).read(appTypeBuffer); if (logger.level <= Logger.FINE) { logger.log("(SA)2 Read " + read + " bytes from newly accepted connection."); } if (appTypeBuffer.hasRemaining()) { return; } appTypeBuffer.flip(); byte[] appIDbytes = new byte[4]; appTypeBuffer.get(appIDbytes, 0, 4); int appId = MathUtils.byteArrayToInt(appIDbytes);// if (logger.level <= Logger.FINE) logger.log("Found connection with AppId "+appId); // TODO: make this level FINE when done if (appId == 0) { unIdentifiedSM.add(new SocketManager(SocketCollectionManager.this, key)); } else { if (logger.level <= Logger.FINE) { logger.log("Found connection with AppId " + appId); } appSockets.add(new SocketAppSocket(SocketCollectionManager.this, key, appId)); } } else if (Arrays.equals(array, HEADER_SOURCE_ROUTE)) { new SourceRouteManager(key); } else { if (logger.level <= Logger.WARNING) { logger.log("ERROR: Improperly formatted header received accepted connection - ignoring."); } if (logger.level <= Logger.WARNING) { logger.log("READ " + array[0] + " " + array[1] + " " + array[2] + " " + array[3]); } throw new IOException("Improperly formatted header received - unknown header."); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -