⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netagent.java

📁 sea是一个基于seda模式的实现。这个设计模式将系统分为很多stage。每个stage分布不同的任务(基于线程池)。通过任务流的方式提高系统的效率。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* * Copyright (c) 2003, The Regents of the University of California, through * Lawrence Berkeley National Laboratory (subject to receipt of any required * approvals from the U.S. Dept. of Energy). All rights reserved. */package gov.lbl.dsd.sea.nio;import gov.lbl.dsd.sea.Stage;import gov.lbl.dsd.sea.event.IllegalEventException;import gov.lbl.dsd.sea.nio.auth.HostAuthorizer;import gov.lbl.dsd.sea.nio.auth.SmartHostAuthorizationRules;import gov.lbl.dsd.sea.nio.auth.SmartHostAuthorizer;import gov.lbl.dsd.sea.nio.event.AdminRequest;import gov.lbl.dsd.sea.nio.event.ChannelRequest;import gov.lbl.dsd.sea.nio.event.ChannelResponse;import gov.lbl.dsd.sea.nio.util.ByteBufferPool;import gov.lbl.dsd.sea.nio.util.NioUtil;import gov.lbl.dsd.sea.nio.util.SocketOpts;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.Channel;import java.nio.channels.ClosedChannelException;import java.nio.channels.DatagramChannel;import java.nio.channels.ReadableByteChannel;import java.nio.channels.SelectableChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.channels.WritableByteChannel;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.LinkedList;import java.util.List;import java.util.Map;import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;/** * Efficient and scalable NIO based non-blocking single-threaded network agent * that can be used both as a client and a server, both for TCP and UDP * transports; Can handle large amounts of concurrent client and server * connections in a single thread; Automatically deals with the subtle grunt * work of life-cycle issues, threading, I/O multiplexing and NIO gotchas; This * is the main work horse of this package. * <p> * General usage pattern: *  * <ol> * <li>Construct a new agent</li> * <li>Configure it via setters and getters. Convenience methods simplify * starting outgoing client connections and listening on ports for incoming * connections.</li> * <li>Start the agent via <code>start()</code> or by enqueuing a * {@link gov.lbl.dsd.sea.nio.event.AdminRequest.Start}</li> * <li>Have your application enqueue some request events from the * {@link gov.lbl.dsd.sea.nio.event}package via <code>enqueue(request)</code> * onto the agent. The agent will process the requests and enqueue responses to * the requests onto the (synchronous or asynchronous) observer stage of the * given network channel (connection), for example an {@link AgentEventHandler} * 's stage.</li> * <li>Finally, stop the agent via <code>stop()</code> or by enqueuing a * {@link gov.lbl.dsd.sea.nio.event.AdminRequest.Stop}</li> * </ol> * <p> * See the demo client and servers in the {@link gov.lbl.dsd.sea.nio.demo} * package for simple and complex example usage. * <p> * Whenever an IOException or EOS (end-of-stream) is encountered as a result of * a channel request, the following happens, in that order: * <ol> * <li>A ChannelResponse with the IOException is posted to the observer stage * (e.g. a {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed}or * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Write}or * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Read}or * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Connected}or * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Registered}, depending on * the type of request in use).</li> * <li>The agent automatically closes the channel</li> * <li>A {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed}response is * posted to the observer stage. The response contains the very same exception (unless * closing throws yet another exception).</li> * </ol> *  * Hence, most applications can ignore responses containing IOExceptions, * unless it is a {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed}. * <p> * For a nice introduction to java networking, see <a * href="http://lgl.epfl.ch/teaching/software_project/documentation/tutorials/networking.pdf">here * </a> and <a target="_blank" * href="http://www.theserverside.com/blogs/showblog.tss?id=DispellingNIOMyths">there * </a> and <a target="_blank" * href="http://www.ii.uib.no/~khalid/atij/atij-nio-web/atij-nio-2x1.pdf">also * there </a>. *  * @author whoschek@lbl.gov * @author $Author: hoschek3 $ * @version $Revision: 1.14 $, $Date: 2004/08/17 18:26:54 $ */public final class NetAgent { // class made final until internal API is clean	private volatile boolean shutdown;	// flag to indicate agent shutdown initiation	private Selector selector;  // interface to I/O signals from OS	private EDU.oswego.cs.dl.util.concurrent.Channel pendingEvents; // events to be handed from user thread to agent thread	private long selectTimeout; // the maximum time to block in Selector.select()		private ByteBufferPool readBufferPool; // buffers to read into from network	private Map writeQueues; // Map<SelectionKey, List<ByteBuffer>> 	// since a SelectionKey corresponds to a registered channel, the map	// maintains a separate write buffer queue for each channel	// We could simply use key.attach to register a write queue with a key, but	// we explicitly do not take that approach in order to not pollute	// key.attachment(). That way user space apps can use key attachments for their custom	// application specific associations.		private Map observerStages; // Map<SelectableChannel, Stage> 	private SocketOpts socketOptions; // options to set for new connections	private HostAuthorizer hostAuthorizer;		private final Object outerLock = new Object();	private final Object innerLock = new Object();	private int nopLoops;		private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NetAgent.class);	/**	 * Constructs a new agent with default configuration options; 	 * the agent is not yet started.	 */	public NetAgent() {		this.shutdown = true; // initially it is shutdown; run() toggles that		this.selector = null;		this.pendingEvents = new LinkedQueue();		this.selectTimeout = 0; // wait indefinitely		//this.selectTimeout = 1000; // wait for at most 1 sec		this.setSocketOptions(new SocketOpts());		this.setReadBufferPool(new ByteBufferPool(8 * 128 * 1024, 128 * 1024, true, null));				// default: allow unlimited accepts from all hosts		SmartHostAuthorizer auth = new SmartHostAuthorizer(				true,				new SmartHostAuthorizationRules().addHost(SmartHostAuthorizationRules.ALL),				new SmartHostAuthorizationRules()			);		this.setAcceptHostAuthorizer(auth);	}	/**	 * Tells the agent to start listening as a server for TCP connections on the	 * given port. To register multiple listen ports, call this method multiple	 * times.	 * 	 * @param observer	 *            the stage onto which responses to the new server channel	 *            should be enqueued (may be null)	 * @param port	 *            the port to listen on	 * @return the agent itself (for method chaining convenience)	 * @throws IOException	 */	public NetAgent addListenPort(Stage observer, int port) throws IOException {		return this.addChannel(observer, NioUtil				.createServerChannel(port), SelectionKey.OP_ACCEPT);	}	/** 	 * Tells the agent to start a TCP client connection to the given remote address.	 * To register multiple connections, call this method multiple times.	 * 	 * @param observer	 *            the stage onto which responses to the new client channel	 *            should be enqueued (may be null)	 * @param address the address to connect to	 * @return the agent itself (for method chaining convenience)	 * @throws IOException	 */	public NetAgent addConnectAddress(Stage observer, InetSocketAddress address) throws IOException {		SocketChannel channel = NioUtil.createClientChannel(address);		int interestOps = SelectionKey.OP_CONNECT;		if (! channel.isConnectionPending()) {			log.debug("socket channel already connected");			interestOps = 0;		}		else {			log.debug("socket channel not yet connected");		}		return this.addChannel(observer, channel, interestOps);	}	/** 	 * Add the given channel with the given interest set and attachment to the agent;	 * for example so that it starts listening on a given port. 	 * To register multiple channels, call this method multiple times.	 * 	 * @param observer	 *            the stage onto which responses to the channel	 *            should be enqueued (may be null)	 * @param channel the channel to register	 * @param interestOps the NIO interest set	 * @return the agent itself (for method chaining convenience)	 * @throws IOException	 */	protected NetAgent addChannel(Stage observer, SelectableChannel channel, int interestOps) {		this.enqueue(new ChannelRequest.Register(observer, channel, interestOps));		return this;	}	/**	 * Sets the host authorizer the server uses to allow/deny accepting	 * connections from the network.	 * 	 * @param hostAuthorizer	 *            the host authorizer to use.	 */	public void setAcceptHostAuthorizer(HostAuthorizer hostAuthorizer) {		if (hostAuthorizer == null) throw new IllegalArgumentException("hostAuthorizer must not be null");		this.checkIsShutdown();		this.hostAuthorizer = hostAuthorizer;	}		/**	 * Sets the buffer pool to be used on reading from the network. Use this	 * method for maximum performance; For casual usage you can safely and	 * entirely ignore ALL buffer pool related concerns.	 * 	 * @param readBufferPool	 *            the pool to use.	 */	public void setReadBufferPool(ByteBufferPool readBufferPool) {		if (readBufferPool == null) throw new IllegalArgumentException("readBufferPool must not be null");		this.checkIsShutdown();		this.readBufferPool = readBufferPool;	}		/**	 * Returns the buffer pool to be used on reading from the network.	 */	public ByteBufferPool getReadBufferPool() {		return this.readBufferPool;	}		/**	 * Set the socket options to be used for newly accepted connections as well	 * as client connections (on OP_ACCEPT and OP_CONNECT).	 * 	 * @param options	 *            the options to be set for the socket.	 */	public void setSocketOptions(SocketOpts options) {		if (options == null) throw new IllegalArgumentException("options must not be null");		this.checkIsShutdown();		this.socketOptions = options;	}			/**	 * Hand a request event from the {@link gov.lbl.dsd.sea.nio.event} package	 * to the agent; the request will be processed the next time the agent thread	 * comes around the select loop.	 * 	 * @param event	 *            the event to hand to the agent.	 */	public void enqueue(Object event) {		if (event instanceof AdminRequest.Start) {			if (this.shutdown) {				new Thread(					new Runnable() {						public void run() {							start();						}					}				).start();			}			else {				; // ignore (we are already running)			}		}		else {			try {				this.pendingEvents.put(event);			} catch (InterruptedException e) {				log.warn("interrupted", e);			}			Selector sel = this.selector; // avoid potential NPE race by caching in local var			if (sel != null) sel.wakeup();			//if (log.isTraceEnabled()) log.trace("handed event off to selector thread="+event);		}	}		/**	 * The main selector loop; Runs the agent selector in the current thread.	 */	public void start() {		try {			this.shutdown = false;			this.selector = Selector.open();			this.writeQueues = new HashMap();			this.observerStages = new HashMap();			this.nopLoops = 0;			// main selector loop			while (! this.shutdown) {				synchronized (outerLock) {} // sync up with toDebugString()				synchronized (innerLock) {					this.doEvents(); // do any selector updates within our thread					this.doSelect(this.selector); // this callback is the main workhorse				}			}		} catch (IOException e) {			log.fatal(e);		}		finally {			synchronized (innerLock) {							try {					this.doCloseAll();				} catch (IOException e1) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -