⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netagent.java

📁 sea是一个基于seda模式的实现。这个设计模式将系统分为很多stage。每个stage分布不同的任务(基于线程池)。通过任务流的方式提高系统的效率。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
					((DatagramChannel) channel).socket().close();					((DatagramChannel) channel).socket().disconnect();				}			}			this.onCloseDone(key, reason);		} catch (IOException e) {			if (log.isErrorEnabled()) log.error("closing itself threw exception", e);			this.onCloseDone(key, e);		}	}		/**	 * Called when a close signal has been successfully processed.	 * Override this method for custom behaviour (e.g forwarding to another stage).	 * 	 * @param key the selection key that has become ready.	 */	protected void onCloseDone(SelectionKey key, IOException exception) {		this.onException(exception);		if (log.isTraceEnabled()) log.trace("Closed channel=" + key.channel());		ChannelResponse.Closed event = new ChannelResponse.Closed(this, key, exception);		this.notifyObservers(key, event);	}		/**	 * Handle a read-ready signal selected from OS, for example by reading bytes	 * from the key's channel.	 * 	 * @param key	 *            the selection key that has become ready.	 */	protected void onReadReady(SelectionKey key) {		ReadableByteChannel channel = (ReadableByteChannel) key.channel();		//ByteBuffer readBuffer = ByteBuffer.allocate(this.readBufferPool.minBufferCapacity);		ByteBuffer readBuffer = this.readBufferPool.take();				int n;		try {			n = NioUtil.readMany(channel, readBuffer);		} catch (IOException e) {			if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_READ);			readBuffer.flip(); // prepare for user reads			this.onReadDone(key, readBuffer, e);			this.onClose(key, e);			return;		}		boolean eos = false;		if (n < 0) {			eos = true;			n = -(n + 1);		}				if (n > 0) {			readBuffer.flip(); // prepare for user reads			this.onReadDone(key, readBuffer, null);		}		else { // assert: n == 0, buffer does not get handed to user, so reuse immediately 			this.readBufferPool.put(readBuffer);		}				if (eos) {  			log.debug("Reached end-of-stream; remote host seems to have closed or lost connection");			if (key.isValid()) NioUtil.removeInterestBits(key, 
SelectionKey.OP_READ);			IOException e = new IOException("Agent reached EOS");			this.onReadDone(key, ByteBuffer.allocate(0), e);			this.onClose(key, e);					}	}		/**	 * Called when a read-ready signal has been successfully processed.	 * 	 * @param key the selection key that has become ready.	 */	protected void onReadDone(SelectionKey key, ByteBuffer buffer, IOException exception) {		this.onException(exception);		if (exception == null && log.isTraceEnabled()) {			log.trace("Read " + buffer.position() + " bytes from channel=" + key.channel() + ", into buffer=" + buffer);		}		ChannelResponse.Read event = new ChannelResponse.Read(this, key, exception, buffer);		this.notifyObservers(key, event);	}		/**	 * Schedules the given buffer to be written to the key's channel once it	 * becomes write-ready. Call this method repeatadly to schedule multiple	 * buffers for later writing. As usual with NIO buffers, the buffer contents	 * between index 0 and buffer.limit() are written in relative mode. After	 * invocation of this method, you MUST NOT modify the buffer in user space	 * until onWriteDone with buffer.hasRemaining()==false is called back. Once	 * that is called back you MAY again modify and/or reuse the buffer (e.g.	 * with a buffer pool).	 * 	 * @param key	 *            the selection key to write to.	 * @param buffer	 *            the buffer to read from.	 
*/	protected void onWriteRequest(SelectionKey key, ByteBuffer buffer) {		if (key == null || !key.isValid()) return; // ignore		if (log.isTraceEnabled()) {			log.trace("adding write request to key" + NioUtil.toString(key));		}		List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>		if (buffersToWrite == null) {			buffersToWrite = new LinkedList(); // linked list for efficiency			this.writeQueues.put(key, buffersToWrite); 		}		buffersToWrite.add(buffer);				if (key.isValid()) NioUtil.addInterestBits(key, SelectionKey.OP_WRITE);		//if (buffersToWrite.size() == 1) {		this.onWriteReady(key); // optimization: try to write immediately without waiting for write ready to bubble up from selector		//}	}		/**	 * Handle a write-ready signal selected from OS, for examply by writing	 * bytes to the key's channel.	 * 	 * @param key	 *            the selection key that has become ready.	 */	protected void onWriteReady(SelectionKey key) {		List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>		if (buffersToWrite == null) { 			log.warn("Nothing to write - really should not happen");			return;		}		WritableByteChannel channel = (WritableByteChannel) key.channel();		// try to write as many buffers as possible		while (buffersToWrite.size() > 0) {			ByteBuffer buffer = (ByteBuffer) buffersToWrite.get(0);			if (! buffer.hasRemaining()) { // notify empty buffer write (correctly handle pathological case)!				buffersToWrite.remove(0);				this.onWriteDone(key, buffer, 0, null);				continue;			}			int n;			try {				n = NioUtil.writeMany(channel, buffer);			} catch (IOException e) {				if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_WRITE);				this.onWriteDone(key, buffer, 0, e);									this.onClose(key, e);				return;			}			if (n == 0) {				// apparently not much can be written right now.				
// wait for next write-ready signal and then resume writing				break;			}			else {				if (buffer.hasRemaining()) { // just a little could be written					break;				}				else {					buffersToWrite.remove(0); // remove fully written buffer					this.onWriteDone(key, buffer, n, null);				}			}		}				// nothing more to write? if so deregister WRITE interest		if (buffersToWrite.size() == 0 && key.isValid()) {			NioUtil.removeInterestBits(key, SelectionKey.OP_WRITE);		}	}	/**	 * Called when N bytes have been successfully written from the given buffer	 * to the key's channel. There MAY still be remaining bytes in the buffer	 * waiting to be written in the future. In such a "partial write" case	 * <code>buffer.hasRemaining()</code> will return true. 	 * 	 * @param key	 *            the selection key that has become ready.	 * @param buffer	 *            the buffer read from	 * @param n	 *            the number of bytes written	 */	protected void onWriteDone(SelectionKey key, ByteBuffer buffer, int n, IOException exception) {		this.onException(exception);		if (exception == null && log.isTraceEnabled()) {			log.trace("Fully wrote " + n + " bytes to channel=" + key.channel() + ", from buffer=" + buffer);		}		ChannelResponse.Write event = new ChannelResponse.Write(this, key, exception, buffer);		this.notifyObservers(key, event);	}	/**	 * Handle channel interest registration request.	 
*/	protected void onRegisterSelectorInterest(SelectableChannel channel, int ops, Object attachment, boolean hasAttachment) {		ops = ops & ~SelectionKey.OP_WRITE; // enqueuing a write toggles that automatically		SelectionKey key = channel.keyFor(this.selector);		try {			if (key != null) {				if (hasAttachment) key.attach(attachment);				if (!key.isValid()) throw new ClosedChannelException();				List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>				if (buffersToWrite != null && buffersToWrite.size() > 0) {					ops = ops | SelectionKey.OP_WRITE; // we still need to write stuff to the network				}				key.interestOps(ops);			}			else {				key = channel.register(this.selector, ops);				if (hasAttachment) key.attach(attachment);			}						this.onRegisterSelectorInterestDone(key, ops, null);		} catch (ClosedChannelException e) {			this.onRegisterSelectorInterestDone(key, ops, e);			if (key != null) {				this.onClose(key, e);			}		}	}			/**	 * Done registering interest 	 */	protected void onRegisterSelectorInterestDone(SelectionKey key, int ops, ClosedChannelException exception) {		this.onException(exception);		if (exception == null && log.isTraceEnabled()) {			log.trace("Registered interest = " + NioUtil.toString(ops) + ", key=" + (key == null ? "null" : NioUtil.toString(key)));		}		ChannelResponse.Registered event = new ChannelResponse.Registered(this, key, exception, ops);		this.notifyObservers(key, event);	}	/**	 * Take care of exception	 * @param exception	 */	protected void onException(IOException exception) {		if (exception != null && log.isTraceEnabled()) 			log.trace("Gracefully forwarding exception = ", exception);	}		/**	 * Cleanly shut the agent down, releasing acquired resources.	 
* @throws IOException	 */	protected void doCloseAll() throws IOException {		if (log.isTraceEnabled()) log.trace("doCloseAll");		List channels = this.getRegisteredChannels();					for (int i = 0; i < channels.size(); i++) {			this.onClose(((SelectableChannel) channels.get(i)).keyFor(this.selector));		}				if (log.isTraceEnabled()) log.trace("selector before selector.close()=" + NioUtil.toString(this.selector));		try {			this.selector.close(); 		} catch (IOException e) {			// vm bug on MacOSX & FreeBSD produces BadFileDescriptor exception, see			// http://freepastry.rice.edu/FreePastry/README-1.3.2.html			// http://list.droso.net/15/15756			if (System.getProperty("os.name").startsWith("Mac") || 					System.getProperty("os.name").startsWith("Free") 					&& "Bad file descriptor".equals(e.getMessage())) {				; // ignore			}			else throw e; // rethrow		}		if (log.isTraceEnabled()) log.trace("selector after selector.close()=" + NioUtil.toString(this.selector));		this.selector = null; // help garbage collector		this.pendingEvents = new LinkedQueue();		this.readBufferPool.clear();		this.writeQueues = null;		this.observerStages = null;	}		/**	 * Enqueues the given event onto the observer stage associated with the given channel.	 * 	 * @param key	 * @param event	 */	protected void notifyObservers(SelectionKey key, Object event) {		Channel channel = key.channel();		Object observer = this.observerStages.get(channel);		if (observer instanceof Stage) {			if (log.isTraceEnabled()) log.trace("Agent enqueuing to observer: event="+event+", observerStage=" + observer);			((Stage) observer).enqueue(event);		}		//else {		//	if (log.isTraceEnabled()) log.trace("No observer defined for channel="+channel);		//}	}		/**	 * Returns all selectable channels registered with this agent,	 * excluding channels with invalid keys.	 
*
	 * @return the channels; an empty list if this agent currently has no selector
	 */
	protected List getRegisteredChannels() {
		// work on a local copy of the field so the null check and the use see the same selector
		Selector current = this.selector;
		if (current != null) {
			return NioUtil.getRegisteredChannels(current);
		}
		return new ArrayList(0);
	}

	/**
	 * Verifies that the agent has been shut down.
	 * 
	 * @throws IllegalStateException
	 *             if the shutdown flag is not set, i.e. the agent is still
	 *             running.
	 */
	protected void checkIsShutdown() {
		if (!this.shutdown) {
			throw new IllegalStateException("must not be invoked on running agent.");
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -