📄 netagent.java
字号:
((DatagramChannel) channel).socket().close(); ((DatagramChannel) channel).socket().disconnect(); } } this.onCloseDone(key, reason); } catch (IOException e) { if (log.isErrorEnabled()) log.error("closing itself threw exception", e); this.onCloseDone(key, e); } } /** * Called when a close signal has been successfully processed. * Override this method for custom behaviour (e.g forwarding to another stage). * * @param key the selection key that has become ready. */ protected void onCloseDone(SelectionKey key, IOException exception) { this.onException(exception); if (log.isTraceEnabled()) log.trace("Closed channel=" + key.channel()); ChannelResponse.Closed event = new ChannelResponse.Closed(this, key, exception); this.notifyObservers(key, event); } /** * Handle a read-ready signal selected from OS, for example by reading bytes * from the key's channel. * * @param key * the selection key that has become ready. */ protected void onReadReady(SelectionKey key) { ReadableByteChannel channel = (ReadableByteChannel) key.channel(); //ByteBuffer readBuffer = ByteBuffer.allocate(this.readBufferPool.minBufferCapacity); ByteBuffer readBuffer = this.readBufferPool.take(); int n; try { n = NioUtil.readMany(channel, readBuffer); } catch (IOException e) { if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_READ); readBuffer.flip(); // prepare for user reads this.onReadDone(key, readBuffer, e); this.onClose(key, e); return; } boolean eos = false; if (n < 0) { eos = true; n = -(n + 1); } if (n > 0) { readBuffer.flip(); // prepare for user reads this.onReadDone(key, readBuffer, null); } else { // assert: n == 0, buffer does not get handed to user, so reuse immediately this.readBufferPool.put(readBuffer); } if (eos) { log.debug("Reached end-of-stream; remote host seems to have closed or lost connection"); if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_READ); IOException e = new IOException("Agent reached EOS"); this.onReadDone(key, ByteBuffer.allocate(0), e); this.onClose(key, e); } } /** * Called when a read-ready signal has been successfully processed. * * @param key the selection key that has become ready. */ protected void onReadDone(SelectionKey key, ByteBuffer buffer, IOException exception) { this.onException(exception); if (exception == null && log.isTraceEnabled()) { log.trace("Read " + buffer.position() + " bytes from channel=" + key.channel() + ", into buffer=" + buffer); } ChannelResponse.Read event = new ChannelResponse.Read(this, key, exception, buffer); this.notifyObservers(key, event); } /** * Schedules the given buffer to be written to the key's channel once it * becomes write-ready. Call this method repeatadly to schedule multiple * buffers for later writing. As usual with NIO buffers, the buffer contents * between index 0 and buffer.limit() are written in relative mode. After * invocation of this method, you MUST NOT modify the buffer in user space * until onWriteDone with buffer.hasRemaining()==false is called back. Once * that is called back you MAY again modify and/or reuse the buffer (e.g. * with a buffer pool). * * @param key * the selection key to write to. * @param buffer * the buffer to read from. */ protected void onWriteRequest(SelectionKey key, ByteBuffer buffer) { if (key == null || !key.isValid()) return; // ignore if (log.isTraceEnabled()) { log.trace("adding write request to key" + NioUtil.toString(key)); } List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer> if (buffersToWrite == null) { buffersToWrite = new LinkedList(); // linked list for efficiency this.writeQueues.put(key, buffersToWrite); } buffersToWrite.add(buffer); if (key.isValid()) NioUtil.addInterestBits(key, SelectionKey.OP_WRITE); //if (buffersToWrite.size() == 1) { this.onWriteReady(key); // optimization: try to write immediately without waiting for write ready to bubble up from selector //} } /** * Handle a write-ready signal selected from OS, for examply by writing * bytes to the key's channel. * * @param key * the selection key that has become ready. */ protected void onWriteReady(SelectionKey key) { List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer> if (buffersToWrite == null) { log.warn("Nothing to write - really should not happen"); return; } WritableByteChannel channel = (WritableByteChannel) key.channel(); // try to write as many buffers as possible while (buffersToWrite.size() > 0) { ByteBuffer buffer = (ByteBuffer) buffersToWrite.get(0); if (! buffer.hasRemaining()) { // notify empty buffer write (correctly handle pathological case)! buffersToWrite.remove(0); this.onWriteDone(key, buffer, 0, null); continue; } int n; try { n = NioUtil.writeMany(channel, buffer); } catch (IOException e) { if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_WRITE); this.onWriteDone(key, buffer, 0, e); this.onClose(key, e); return; } if (n == 0) { // apparently not much can be written right now. // wait for next write-ready signal and then resume writing break; } else { if (buffer.hasRemaining()) { // just a little could be written break; } else { buffersToWrite.remove(0); // remove fully written buffer this.onWriteDone(key, buffer, n, null); } } } // nothing more to write? if so deregister WRITE interest if (buffersToWrite.size() == 0 && key.isValid()) { NioUtil.removeInterestBits(key, SelectionKey.OP_WRITE); } } /** * Called when N bytes have been successfully written from the given buffer * to the key's channel. There MAY still be remaining bytes in the buffer * waiting to be written in the future. In such a "partial write" case * <code>buffer.hasRemaining()</code> will return true. * * @param key * the selection key that has become ready. * @param buffer * the buffer read from * @param n * the number of bytes written */ protected void onWriteDone(SelectionKey key, ByteBuffer buffer, int n, IOException exception) { this.onException(exception); if (exception == null && log.isTraceEnabled()) { log.trace("Fully wrote " + n + " bytes to channel=" + key.channel() + ", from buffer=" + buffer); } ChannelResponse.Write event = new ChannelResponse.Write(this, key, exception, buffer); this.notifyObservers(key, event); } /** * Handle channel interest registration request. */ protected void onRegisterSelectorInterest(SelectableChannel channel, int ops, Object attachment, boolean hasAttachment) { ops = ops & ~SelectionKey.OP_WRITE; // enqueuing a write toggles that automatically SelectionKey key = channel.keyFor(this.selector); try { if (key != null) { if (hasAttachment) key.attach(attachment); if (!key.isValid()) throw new ClosedChannelException(); List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer> if (buffersToWrite != null && buffersToWrite.size() > 0) { ops = ops | SelectionKey.OP_WRITE; // we still need to write stuff to the network } key.interestOps(ops); } else { key = channel.register(this.selector, ops); if (hasAttachment) key.attach(attachment); } this.onRegisterSelectorInterestDone(key, ops, null); } catch (ClosedChannelException e) { this.onRegisterSelectorInterestDone(key, ops, e); if (key != null) { this.onClose(key, e); } } } /** * Done registering interest */ protected void onRegisterSelectorInterestDone(SelectionKey key, int ops, ClosedChannelException exception) { this.onException(exception); if (exception == null && log.isTraceEnabled()) { log.trace("Registered interest = " + NioUtil.toString(ops) + ", key=" + (key == null ? "null" : NioUtil.toString(key))); } ChannelResponse.Registered event = new ChannelResponse.Registered(this, key, exception, ops); this.notifyObservers(key, event); } /** * Take care of exception * @param exception */ protected void onException(IOException exception) { if (exception != null && log.isTraceEnabled()) log.trace("Gracefully forwarding exception = ", exception); } /** * Cleanly shut the agent down, releasing acquired resources. * @throws IOException */ protected void doCloseAll() throws IOException { if (log.isTraceEnabled()) log.trace("doCloseAll"); List channels = this.getRegisteredChannels(); for (int i = 0; i < channels.size(); i++) { this.onClose(((SelectableChannel) channels.get(i)).keyFor(this.selector)); } if (log.isTraceEnabled()) log.trace("selector before selector.close()=" + NioUtil.toString(this.selector)); try { this.selector.close(); } catch (IOException e) { // vm bug on MacOSX & FreeBSD produces BadFileDescriptor exception, see // http://freepastry.rice.edu/FreePastry/README-1.3.2.html // http://list.droso.net/15/15756 if (System.getProperty("os.name").startsWith("Mac") || System.getProperty("os.name").startsWith("Free") && "Bad file descriptor".equals(e.getMessage())) { ; // ignore } else throw e; // rethrow } if (log.isTraceEnabled()) log.trace("selector after selector.close()=" + NioUtil.toString(this.selector)); this.selector = null; // help garbage collector this.pendingEvents = new LinkedQueue(); this.readBufferPool.clear(); this.writeQueues = null; this.observerStages = null; } /** * Enqueues the given event onto the observer stage associated with the given channel. * * @param key * @param event */ protected void notifyObservers(SelectionKey key, Object event) { Channel channel = key.channel(); Object observer = this.observerStages.get(channel); if (observer instanceof Stage) { if (log.isTraceEnabled()) log.trace("Agent enqueuing to observer: event="+event+", observerStage=" + observer); ((Stage) observer).enqueue(event); } //else { // if (log.isTraceEnabled()) log.trace("No observer defined for channel="+channel); //} } /** * Returns all selectable channels registered with this agent, * excluding channels with invalid keys. * * @return the channels */ protected List getRegisteredChannels() { Selector sel = this.selector; if (sel == null) return new ArrayList(0); else return NioUtil.getRegisteredChannels(sel); } /** * Checks if the agent is running in the selector loop, and throws an * exception if it is runnning. */ protected void checkIsShutdown() { if (this.shutdown == false) throw new IllegalStateException("must not be invoked on running agent."); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -