📄 netagent.java
字号:
log.fatal(e1); } } } } /** * Cleanly shut the agent down, releasing acquired resources. */ public void stop() { if (this.shutdown) return; // nothing to do anymore this.shutdown = true; this.selector.wakeup(); // break out of blocking select() within doSelect() of start() } /** * Returns a summary string representation of the receiver. */ public String toDebugString() { String s = this.getClass().getName() + ": "; Selector sel = this.selector; if (sel != null) { synchronized (outerLock) { // sync up with start() and select() sel.wakeup(); // ensure we can acquire the lock below to safely access selector state synchronized (innerLock) { s += "writeQueues=" + this.writeQueues; s += ", observerStages=" + this.observerStages; s += "\nselector=" + NioUtil.toString(sel); } } } s += "\n\nreadBufferPool=" + this.getReadBufferPool().toString(); return s; } /** * Handle events in the selector thread (and ONLY in the selector thread) * Otherwise the selector methods may block and deadlock us! Takes user * level events from the given queue (without ever blocking), and processes * them. */ protected void doEvents() { try { Object event; this.pendingEvents.put(this.innerLock); // terminator flag prevents potentially infinite race loops while ((event = this.pendingEvents.poll(0)) != null && event != this.innerLock) { this.doEvent(event); this.nopLoops = 0; } // while ((event = this.pendingEvents.poll(0)) != null) {// this.doEvent(event);// this.nopLoops = 0;// } } catch (InterruptedException e) { log.warn("interrupted", e); } } /** * Handle given event in the selector thread (and ONLY in the selector thread) * * @param event * the event to handle. */ protected void doEvent(Object event) { if (log.isTraceEnabled()) log.trace("doEvent=" + event); // shut the agent down if (event instanceof AdminRequest.Stop) { this.stop(); } // close a channel else if (event instanceof ChannelRequest.Close) { ChannelRequest.Close req = (ChannelRequest.Close) event; //this.observerStages.put(req.getChannel(), req.getSource()); this.onClose(req.getChannel().keyFor(this.selector)); } // schedule writing data to a channel else if (event instanceof ChannelRequest.WriteData) { ChannelRequest.WriteData req = (ChannelRequest.WriteData) event; this.onWriteRequest(req.getChannel().keyFor(this.selector), req.getBuffer()); } // register event interest in a channel else if (event instanceof ChannelRequest.Register) { ChannelRequest.Register req = (ChannelRequest.Register) event; this.observerStages.put(req.getChannel(), req.getSource()); Object attachment = req.hasAttachment() ? req.getAttachment() : null; this.onRegisterSelectorInterest(req.getChannel(), req.getInterestOps(), attachment, req.hasAttachment()); } // OOPSLA else throw new IllegalEventException("Illegal event enqueued", event, null); } /** * Wait for I/O signals from OS, then dispatch them via selection key. * Override for custom behaviour. * * @param selector * the IO multiplexer interface to the OS */ protected void doSelect(Selector selector) throws IOException { // block until OS ready event or wakeup() or timeout or thread interruption occurs int numUpdatedReadyKeys = selector.select(this.selectTimeout); boolean isFineTraceEnabled = false; if (isFineTraceEnabled) { log.error("num="+numUpdatedReadyKeys + ", readySet.size="+selector.selectedKeys().size() + ", keyset.size="+selector.keys().size()); //log.trace("selector=" + NioUtil.toString(selector)); //log.trace("channels=" + this.getRegisteredChannels()); //log.trace("selKeys="+selector.selectedKeys()); //log.trace("selector="+selector); } if (numUpdatedReadyKeys == 0) { // nothing to do this.nopLoops++; if (this.nopLoops > 100) { // FIXME TODO log.fatal("no operation loop detected; unnecessarily eating CPU"); log.error("num="+numUpdatedReadyKeys + ", readySet.size="+selector.selectedKeys().size() + ", keyset.size="+selector.keys().size()); this.nopLoops = 0; } //return; } Iterator iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = (SelectionKey) iter.next(); if (key.isValid()) { onKeyReady(key); } iter.remove(); // remove key from readySet !! if (!key.isValid()) { this.onKeyInvalidation(key); } } } protected void onKeyInvalidation(SelectionKey key) { if (log.isTraceEnabled()) { log.trace("******** Garbage collecting write buffers for invalid key=" + NioUtil.toString(key)); } this.writeQueues.remove(key); this.observerStages.remove(key.channel()); } /** * Handle a selection key that has become ready. * * @param key * the selection key that has become ready. * @return true if the key should be removed from the selector's ready-set, * false otherwise. */ protected void onKeyReady(SelectionKey key) { // potentially handle multiple ready ops together if (log.isTraceEnabled()) log.trace("onKeyReady: key=" + NioUtil.toString(key)); if (key.isValid() && key.isWritable()) { this.onWriteReady(key); } if (key.isValid() && key.isReadable()) { this.onReadReady(key); } if (key.isValid() && key.isAcceptable()) { this.onAcceptReady(key); } if (key.isValid() && key.isConnectable()) { this.onConnectReady(key); } } /** * Handle an accept-ready signal selected from OS, for examply by accepting * the new connection. * * @param key * the selection key that has become ready. */ protected void onAcceptReady(SelectionKey key) { try { if (log.isDebugEnabled()) log.debug("Accepting a new channel on server channel=" + key.channel()); ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel clientChannel = serverChannel.accept(); if (clientChannel == null) { log.error("null channel accepted (should never happen)"); } else { synchronized (this.hostAuthorizer) { if (! this.hostAuthorizer.isAllowed(clientChannel.socket().getInetAddress())) { if (log.isWarnEnabled()) log.warn("Accept authorization denied to " + clientChannel.socket().getInetAddress()); clientChannel.close(); return; } } this.socketOptions.copyTo(clientChannel.socket()); clientChannel.configureBlocking(false); SelectionKey newKey = clientChannel.register(this.selector, 0); if (this.observerStages.get(serverChannel) != null) { // Initially the client socket inherits its observer stage from the server socket. // It will stay that way until a ChannelRequest.Register // for the client socket is later received. this.observerStages.put(clientChannel, this.observerStages.get(serverChannel)); } newKey.attach(key.attachment()); // initially inherited from server channel this.onAcceptDone(newKey, null); } } catch (IOException e) { this.onAcceptDone(key, e); } } /** * Called when an accept-ready signal has been successfully processed. * Override this method for custom behaviour (e.g forwarding to another * stage). * * @param key * the selection key that has become ready. */ protected void onAcceptDone(SelectionKey key, IOException exception) { this.onException(exception); if (log.isTraceEnabled()) { if (key == null) log.warn("Oopsla, new null channel accepted!"); else log.trace("Accepted new channel=" + key.channel()); } ChannelResponse.Accepted event = new ChannelResponse.Accepted(this, key, exception); this.notifyObservers(key, event); } /** * Handle a connect-ready signal selected from OS, for example by completing * to connect. * * @param key * the selection key that has become ready. */ protected void onConnectReady(SelectionKey key) { if (log.isTraceEnabled()) log.trace("now connecting channel with key=" + NioUtil.toString(key)); SocketChannel channel = (SocketChannel) key.channel(); try { long start = System.currentTimeMillis(); if (channel.finishConnect()) { long end = System.currentTimeMillis(); log.debug("finishConnect took [ms] = " + (end-start)); this.socketOptions.copyTo(channel.socket()); if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_CONNECT); this.onConnectDone(key, null); } else { // connection process is not yet complete ; // wait for completion } } catch (IOException e) { if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_CONNECT); this.onConnectDone(key, e); this.onClose(key, e); } } /** * Called when a connect-ready signal has been successfully processed. * Override this method for custom behaviour (e.g forwarding to another * stage). * * @param key * the selection key that has become ready. */ protected void onConnectDone(SelectionKey key, IOException exception) { this.onException(exception); if (exception == null && log.isTraceEnabled()) { log.trace("Connected to channel=" + key.channel()); } ChannelResponse.Connected event = new ChannelResponse.Connected(this, key, exception); this.notifyObservers(key, event); } /** * Closes the given channel. * @param channel the channel to close. */ protected void onClose(SelectionKey key) { this.onClose(key, null); } protected void onClose(SelectionKey key, IOException reason) { if (log.isTraceEnabled()) log.trace("onClose with key=" + (key==null ? "null" : NioUtil.toString(key)) + ", reason=" + reason); if (key == null) return; // ignore try { if (key.isValid()) key.interestOps(0); key.cancel(); Channel channel = key.channel(); if (channel.isOpen()) { if (log.isTraceEnabled()) log.trace("Closing channel=" + channel); channel.close(); if (channel instanceof SocketChannel) { ((SocketChannel) channel).socket().close(); //((SocketChannel) channel).socket().shutdownOutput(); //((SocketChannel) channel).socket().shutdownInput(); } if (channel instanceof ServerSocketChannel) { // Even with this conservative code, on MacOSX, the server socket is // sometimes not closed properly (probably yet another vm bug). // On Linux it seems to work fine. ((ServerSocketChannel) channel).socket().close(); } if (channel instanceof DatagramChannel) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -