⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netagent.java

📁 sea是一个基于seda模式的实现。这个设计模式将系统分为很多stage。每个stage分布不同的任务(基于线程池)。通过任务流的方式提高系统的效率。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
					log.fatal(e1);				}			}		}	}	/**	 * Cleanly shut the agent down, releasing acquired resources.	 */	public void stop() {		if (this.shutdown) return; // nothing to do anymore		this.shutdown = true;		this.selector.wakeup(); // break out of blocking select() within doSelect() of start()	}		/** 	 * Returns a summary string representation of the receiver.	 */	public String toDebugString() {		String s = this.getClass().getName() + ": ";		Selector sel = this.selector;		if (sel != null) {			synchronized (outerLock) { // sync up with start() and select()				sel.wakeup(); // ensure we can acquire the lock below to safely access selector state				synchronized (innerLock) {					s += "writeQueues=" + this.writeQueues;					s += ", observerStages=" + this.observerStages;					s += "\nselector=" + NioUtil.toString(sel);				}			}		}		s += "\n\nreadBufferPool=" + this.getReadBufferPool().toString();		return s;	}	/**	 * Handle events in the selector thread (and ONLY in the selector thread)	 * Otherwise the selector methods may block and deadlock us! Takes user	 * level events from the given queue (without ever blocking), and processes	 * them.	 */	protected void doEvents() {		try {			Object event;			this.pendingEvents.put(this.innerLock); // terminator flag prevents potentially infinite race loops			while ((event = this.pendingEvents.poll(0)) != null && event != this.innerLock) {				this.doEvent(event);				this.nopLoops = 0;			}		//			while ((event = this.pendingEvents.poll(0)) != null) {//				this.doEvent(event);//				this.nopLoops = 0;//			}		} catch (InterruptedException e) {			log.warn("interrupted", e);		}	}	/**	 * Handle given event in the selector thread (and ONLY in the selector thread)	 * 	 * @param event	 *            the event to handle.	 
*/	protected void doEvent(Object event) {		if (log.isTraceEnabled()) log.trace("doEvent=" + event);		// shut the agent down		if (event instanceof AdminRequest.Stop) {			this.stop();		}				// close a channel		else if (event instanceof ChannelRequest.Close) {			ChannelRequest.Close req = (ChannelRequest.Close) event;			//this.observerStages.put(req.getChannel(), req.getSource());			this.onClose(req.getChannel().keyFor(this.selector));		}				// schedule writing data to a channel		else if (event instanceof ChannelRequest.WriteData) {			ChannelRequest.WriteData req = (ChannelRequest.WriteData) event;			this.onWriteRequest(req.getChannel().keyFor(this.selector), req.getBuffer());		}				// register event interest in a channel		else if (event instanceof ChannelRequest.Register) {			ChannelRequest.Register req = (ChannelRequest.Register) event;			this.observerStages.put(req.getChannel(), req.getSource());			Object attachment = req.hasAttachment() ? req.getAttachment() : null;			this.onRegisterSelectorInterest(req.getChannel(), req.getInterestOps(), attachment, req.hasAttachment());		}				// OOPSLA		else throw new IllegalEventException("Illegal event enqueued", event, null);	}	/**	 * Wait for I/O signals from OS, then dispatch them via selection key.	 * Override for custom behaviour.	 
* 	 * @param selector	 *            the IO multiplexer interface to the OS	 */	protected void doSelect(Selector selector) throws IOException	{			// block until OS ready event or wakeup() or timeout or thread interruption occurs				int 	numUpdatedReadyKeys = selector.select(this.selectTimeout);				boolean isFineTraceEnabled = false;		if (isFineTraceEnabled) {			log.error("num="+numUpdatedReadyKeys + ", readySet.size="+selector.selectedKeys().size() + ", keyset.size="+selector.keys().size()); 			//log.trace("selector=" + NioUtil.toString(selector));			//log.trace("channels=" + this.getRegisteredChannels());			//log.trace("selKeys="+selector.selectedKeys());			//log.trace("selector="+selector);		}		if (numUpdatedReadyKeys == 0) { // nothing to do			this.nopLoops++;			if (this.nopLoops > 100) { // FIXME TODO				log.fatal("no operation loop detected; unnecessarily eating CPU");				log.error("num="+numUpdatedReadyKeys + ", readySet.size="+selector.selectedKeys().size() + ", keyset.size="+selector.keys().size()); 				this.nopLoops = 0;			}			//return;		}		Iterator iter = selector.selectedKeys().iterator();		while (iter.hasNext()) {			SelectionKey key = (SelectionKey) iter.next();						if (key.isValid()) {				onKeyReady(key);			}			iter.remove(); // remove key from readySet !!			if (!key.isValid()) {				this.onKeyInvalidation(key);			}		}	}	protected void onKeyInvalidation(SelectionKey key) {		if (log.isTraceEnabled()) {			log.trace("******** Garbage collecting write buffers for invalid key=" + NioUtil.toString(key));		}		this.writeQueues.remove(key);		this.observerStages.remove(key.channel());	}		/**	 * Handle a selection key that has become ready.	 * 	 * @param key	 *            the selection key that has become ready.	 * @return true if the key should be removed from the selector's ready-set,	 *         false otherwise.	 
*/	protected void onKeyReady(SelectionKey key) {		// potentially handle multiple ready ops together		if (log.isTraceEnabled()) log.trace("onKeyReady: key=" + NioUtil.toString(key));		if (key.isValid() && key.isWritable()) {			this.onWriteReady(key);		}		if (key.isValid() && key.isReadable()) {			this.onReadReady(key);		}		if (key.isValid() && key.isAcceptable()) { 			this.onAcceptReady(key);		}		if (key.isValid() && key.isConnectable()) {			this.onConnectReady(key);		}	}			/**	 * Handle an accept-ready signal selected from OS, for examply by accepting	 * the new connection.	 * 	 * @param key	 *            the selection key that has become ready.	 */	protected void onAcceptReady(SelectionKey key) {		try {			if (log.isDebugEnabled()) 				log.debug("Accepting a new channel on server channel=" + key.channel());						ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();			SocketChannel clientChannel = serverChannel.accept();			if (clientChannel == null) {				log.error("null channel accepted (should never happen)");			}			else {				synchronized (this.hostAuthorizer) {					if (! this.hostAuthorizer.isAllowed(clientChannel.socket().getInetAddress())) 	{						if (log.isWarnEnabled()) log.warn("Accept authorization denied to " + clientChannel.socket().getInetAddress());						clientChannel.close();						return;					}				}				this.socketOptions.copyTo(clientChannel.socket());				clientChannel.configureBlocking(false);				SelectionKey newKey = clientChannel.register(this.selector, 0);								if (this.observerStages.get(serverChannel) != null) {					// Initially the client socket inherits its observer stage from the server socket.					// It will stay that way until a ChannelRequest.Register					// for the client socket is later received.					
this.observerStages.put(clientChannel, this.observerStages.get(serverChannel));				}				newKey.attach(key.attachment()); // initially inherited from server channel								this.onAcceptDone(newKey, null);			}		} catch (IOException e) {			this.onAcceptDone(key, e);		}	}		/**	 * Called when an accept-ready signal has been successfully processed.	 * Override this method for custom behaviour (e.g forwarding to another	 * stage).	 * 	 * @param key	 *            the selection key that has become ready.	 */	protected void onAcceptDone(SelectionKey key, IOException exception) {		this.onException(exception);		if (log.isTraceEnabled()) {			if (key == null) 				log.warn("Oopsla, new null channel accepted!");			else				log.trace("Accepted new channel=" + key.channel());						}		ChannelResponse.Accepted event = new ChannelResponse.Accepted(this, key, exception);		this.notifyObservers(key, event);	}	/**	 * Handle a connect-ready signal selected from OS, for example by completing	 * to connect.	 * 	 * @param key	 *            the selection key that has become ready.	 */	protected void onConnectReady(SelectionKey key) {		if (log.isTraceEnabled()) log.trace("now connecting channel with key=" + NioUtil.toString(key));		SocketChannel channel = (SocketChannel) key.channel();		try {			long start = System.currentTimeMillis();			if (channel.finishConnect()) {				long end = System.currentTimeMillis();				log.debug("finishConnect took [ms] = " + (end-start));				this.socketOptions.copyTo(channel.socket());				if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_CONNECT);				this.onConnectDone(key, null);			}			else { // connection process is not yet complete				;  // wait for completion			}		} catch (IOException e) {			if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_CONNECT);			this.onConnectDone(key, e);			this.onClose(key, e);		}	}			/**	 * Called when a connect-ready signal has been successfully processed.	 
* Override this method for custom behaviour (e.g forwarding to another
	 * stage).
	 * 
	 * @param key
	 *            the selection key that has become ready.
	 * @param exception
	 *            the failure that occurred while connecting, or null if the
	 *            connection was established.
	 */
	protected void onConnectDone(SelectionKey key, IOException exception) {
		this.onException(exception);
		if (exception == null && log.isTraceEnabled()) {
			log.trace("Connected to channel=" + key.channel());
		}
		// observers are notified in both the success and the failure case
		ChannelResponse.Connected event = new ChannelResponse.Connected(this, key, exception);
		this.notifyObservers(key, event);
	}

	/**
	 * Closes the channel of the given key, without a failure reason.
	 * 
	 * @param key
	 *            the selection key whose channel to close; may be null, in
	 *            which case nothing happens.
	 */
	protected void onClose(SelectionKey key) {
		this.onClose(key, null);
	}

	/**
	 * Cancels the given key and closes its channel if it is still open.
	 * 
	 * @param key
	 *            the selection key whose channel to close; a null key is
	 *            ignored.
	 * @param reason
	 *            the failure that led to closing, or null if there is none.
	 */
	protected void onClose(SelectionKey key, IOException reason) {
		if (log.isTraceEnabled())
			log.trace("onClose with key=" + (key==null ? "null" : NioUtil.toString(key)) + ", reason=" + reason);

		if (key == null) return; // ignore
		try {
			// clear the interest set first, then cancel the key
			if (key.isValid()) key.interestOps(0);
			key.cancel();
			Channel channel = key.channel();
			if (channel.isOpen()) {
				if (log.isTraceEnabled()) log.trace("Closing channel=" + channel);
				channel.close();
				// additionally close the underlying socket of the channel
				if (channel instanceof SocketChannel) {
					((SocketChannel) channel).socket().close();
					//((SocketChannel) channel).socket().shutdownOutput();
					//((SocketChannel) channel).socket().shutdownInput();
				}
				if (channel instanceof ServerSocketChannel) {
					// Even with this conservative code, on MacOSX, the server socket is
					// sometimes not closed properly (probably yet another vm bug).
					// On Linux it seems to work fine.
					((ServerSocketChannel) channel).socket().close();
				}
				if (channel instanceof DatagramChannel) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -