📄 socketappsocket.java
字号:
*/ public synchronized void modifyKey(SelectionKey key) { int flag = 0; if (reader != null) { flag |= SelectionKey.OP_READ; } if (writer != null) { flag |= SelectionKey.OP_WRITE; } if (toWrite != null) { flag |= SelectionKey.OP_WRITE; } if (connectResult == CONNECTION_UNKNOWN) { flag |= SelectionKey.OP_READ; } //System.out.println(this+"modifyKey()"+flag); key.interestOps(flag); } /** * Specified by the SelectionKeyHandler interface - calling this tells this * socket manager that the connection has completed and we can now read/write. * * @param key The key which is connectable. */ public void connect(SelectionKey key) { try { // deregister interest in connecting to this socket if (channel.finishConnect()) { key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); } if (manager.logger.level <= Logger.FINE) { manager.logger.log("(SM) Found connectable channel - completed connection"); } } catch (Exception e) { if (manager.logger.level <= Logger.FINE) { manager.logger.logException( "(SM) Unable to connect to " + this, e); } exceptionAndClose(e); } } /** * Reads from the socket attached to this connector. * * @param key The selection key for this manager */ public void read(SelectionKey key) { //System.out.println(this+"Reading"); if (connectResult == CONNECTION_UNKNOWN) { try { clearTimer(receiver); manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key); ByteBuffer answer = ByteBuffer.allocate(1); ((SocketChannel) key.channel()).read(answer); answer.clear(); connectResult = answer.get(); //System.out.println(this+"Read "+connectResult); switch (connectResult) { case CONNECTION_OK: receiver.receiveSocket(this); // on connector side return; case CONNECTION_NO_APP: exceptionAndClose(new AppNotRegisteredException()); return; case CONNECTION_NO_ACCEPTOR: exceptionAndClose(new NoReceiverAvailableException()); return; default: exceptionAndClose(new AppSocketException("Unknown error " + connectResult)); return; } } catch (IOException ioe) { exceptionAndClose(ioe); } return; } AppSocketReceiver temp = reader; clearTimer(reader); reader = null; temp.receiveSelectResult(this, true, false); manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key); } /** * DESCRIBE THE METHOD * * @param millis DESCRIBE THE PARAMETER * @param theReceiver DESCRIBE THE PARAMETER */ private void startTimer(int millis, final AppSocketReceiver theReceiver) { if (millis <= 0) { return; } clearTimer(theReceiver); TimerTask timer = new TimerTask() { public void run() { timers.remove(theReceiver); theReceiver.receiveException(SocketAppSocket.this, new TimeoutException()); close(); } };// System.out.println("startTimer()"+timer);// new Exception("stack trace").printStackTrace(System.out); timers.put(theReceiver, timer); manager.pastryNode.getEnvironment().getSelectorManager().getTimer().schedule(timer, millis); } /** * DESCRIBE THE METHOD * * @param theReceiver DESCRIBE THE PARAMETER */ private void clearTimer(AppSocketReceiver theReceiver) { TimerTask timer = (TimerTask) timers.remove(theReceiver);// System.out.println("Clearing "+timer); if (timer == null) { return; } timer.cancel(); timer = null; } /** * DESCRIBE THE METHOD * * @param e DESCRIBE THE PARAMETER */ private void exceptionAndClose(Exception e) { clearTimer(receiver); receiver.receiveException(SocketAppSocket.this, e); close(); } /** * Writes to the socket attached to this socket manager. * * @param key The selection key for this manager */ public synchronized void write(SelectionKey key) { if (toWrite != null) { try { // System.out.println(this+"SocketAppSocket.wroteHeader."+toWrite.remaining()); ((SocketChannel) key.channel()).write(toWrite); } catch (IOException ioe) { exceptionAndClose(ioe); } if (!toWrite.hasRemaining()) { toWrite = null; key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);// receiver.receiveSocket(this); moved to read } return; } AppSocketReceiver temp = writer; clearTimer(writer); writer = null; temp.receiveSelectResult(this, false, true); manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key); } /** * Accepts a new connection on the given key * * @param key DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ protected void acceptConnection(SelectionKey key) throws IOException { //System.out.println("accept connection"); connectResult = CONNECTION_OK; this.channel = (SocketChannel) key.channel(); this.key = manager.pastryNode.getEnvironment().getSelectorManager().register(key.channel(), this, 0); // lookup acceptor toWrite = ByteBuffer.allocate(1); try { manager.pastryNode.acceptAppSocket(this, appId); toWrite.put(CONNECTION_OK); } catch (AppNotRegisteredException anre) { if (manager.logger.level <= Logger.WARNING) { manager.logger.logException("Sending error to connecter " + channel + " ", anre); } toWrite.put(CONNECTION_NO_APP); } catch (NoReceiverAvailableException nrae) { if (manager.logger.level <= Logger.WARNING) { manager.logger.logException("Sending error to connecter " + channel + " ", nrae); } toWrite.put(CONNECTION_NO_ACCEPTOR); } catch (AppSocketException ase) { if (manager.logger.level <= Logger.WARNING) { manager.logger.logException("Sending error to connecter " + channel + " ", ase); } toWrite.put(CONNECTION_UNKNOWN_ERROR); } toWrite.clear(); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); if (manager.logger.level <= Logger.FINE) { manager.logger.log( "(SM) Accepted app connection from " + channel.socket().getRemoteSocketAddress()); } } /** * Creates the outgoing socket to the remote handle * * @param path DESCRIBE THE PARAMETER * @exception IOException DESCRIBE THE EXCEPTION */ protected void createConnection(final SourceRoute path) throws IOException { this.channel = SocketChannel.open(); this.channel.socket().setSendBufferSize(manager.SOCKET_BUFFER_SIZE); this.channel.socket().setReceiveBufferSize(manager.SOCKET_BUFFER_SIZE); this.channel.configureBlocking(false); this.key = manager.pastryNode.getEnvironment().getSelectorManager().register(channel, this, 0); if (manager.logger.level <= Logger.FINE) { manager.logger.log("(SM) Initiating socket connection to path " + path); } manager.pastryNode.broadcastChannelOpened(path.getFirstHop().getAddress(), NetworkListener.REASON_APP_SOCKET_NORMAL); if (this.channel.connect(path.getFirstHop().getAddress())) { this.key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); } else { this.key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ | SelectionKey.OP_CONNECT); } } /** * DESCRIBE THE METHOD * * @param dsts DESCRIBE THE PARAMETER * @param offset DESCRIBE THE PARAMETER * @param length DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { //System.out.println(this+"read"); return channel.read(dsts, offset, length); } /** * DESCRIBE THE METHOD * * @param srcs DESCRIBE THE PARAMETER * @param offset DESCRIBE THE PARAMETER * @param length DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { //System.out.println(this+"write("+srcs.length+","+offset+","+length+")"); return channel.write(srcs, offset, length); } /** * DESCRIBE THE METHOD * * @param wantToRead DESCRIBE THE PARAMETER * @param wantToWrite DESCRIBE THE PARAMETER * @param timeout DESCRIBE THE PARAMETER * @param receiver DESCRIBE THE PARAMETER */ public void register(boolean wantToRead, boolean wantToWrite, int timeout, AppSocketReceiver receiver) { if (wantToRead) { reader = receiver; } if (wantToWrite) { writer = receiver; } startTimer(timeout, receiver); manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -