📄 netagent.java
字号:
/* * Copyright (c) 2003, The Regents of the University of California, through * Lawrence Berkeley National Laboratory (subject to receipt of any required * approvals from the U.S. Dept. of Energy). All rights reserved. */package gov.lbl.dsd.sea.nio;import gov.lbl.dsd.sea.Stage;import gov.lbl.dsd.sea.event.IllegalEventException;import gov.lbl.dsd.sea.nio.auth.HostAuthorizer;import gov.lbl.dsd.sea.nio.auth.SmartHostAuthorizationRules;import gov.lbl.dsd.sea.nio.auth.SmartHostAuthorizer;import gov.lbl.dsd.sea.nio.event.AdminRequest;import gov.lbl.dsd.sea.nio.event.ChannelRequest;import gov.lbl.dsd.sea.nio.event.ChannelResponse;import gov.lbl.dsd.sea.nio.util.ByteBufferPool;import gov.lbl.dsd.sea.nio.util.NioUtil;import gov.lbl.dsd.sea.nio.util.SocketOpts;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.Channel;import java.nio.channels.ClosedChannelException;import java.nio.channels.DatagramChannel;import java.nio.channels.ReadableByteChannel;import java.nio.channels.SelectableChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.channels.WritableByteChannel;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.LinkedList;import java.util.List;import java.util.Map;import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;/** * Efficient and scalable NIO based non-blocking single-threaded network agent * that can be used both as a client and a server, both for TCP and UDP * transports; Can handle large amounts of concurrent client and server * connections in a single thread; Automatically deals with the subtle grunt * work of life-cycle issues, threading, I/O multiplexing and NIO gotchas; This * is the main work horse of this package. * <p> * General usage pattern: * * <ol> * <li>Construct a new agent</li> * <li>Configure it via setters and getters. 
Convenience methods simplify * starting outgoing client connections and listening on ports for incoming * connections.</li> * <li>Start the agent via <code>start()</code> or by enqueuing a * {@link gov.lbl.dsd.sea.nio.event.AdminRequest.Start}</li> * <li>Have your application enqueue some request events from the * {@link gov.lbl.dsd.sea.nio.event}package via <code>enqueue(request)</code> * onto the agent. The agent will process the requests and enqueue responses to * the requests onto the (synchronous or asynchronous) observer stage of the * given network channel (connection), for example an {@link AgentEventHandler} * 's stage.</li> * <li>Finally, stop the agent via <code>stop()</code> or by enqueuing a * {@link gov.lbl.dsd.sea.nio.event.AdminRequest.Stop}</li> * </ol> * <p> * See the demo client and servers in the {@link gov.lbl.dsd.sea.nio.demo} * package for simple and complex example usage. * <p> * Whenever an IOException or EOS (end-of-stream) is encountered as a result of * a channel request, the following happens, in that order: * <ol> * <li>A ChannelResponse with the IOException is posted to the observer stage * (e.g. a {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed}or * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Write}or * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Read}or * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Connected}or * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Registered}, depending on * the type of request in use).</li> * <li>The agent automatically closes the channel</li> * <li>A {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed}response is * posted to the observer stage. The response contains the very same exception (unless * closing throws yet another exception).</li> * </ol> * * Hence, most applications can ignore responses containing IOExceptions, * unless it is a {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed}. 
* <p> * For a nice introduction to java networking, see <a * href="http://lgl.epfl.ch/teaching/software_project/documentation/tutorials/networking.pdf">here * </a> and <a target="_blank" * href="http://www.theserverside.com/blogs/showblog.tss?id=DispellingNIOMyths">there * </a> and <a target="_blank" * href="http://www.ii.uib.no/~khalid/atij/atij-nio-web/atij-nio-2x1.pdf">also * there </a>. * * @author whoschek@lbl.gov * @author $Author: hoschek3 $ * @version $Revision: 1.14 $, $Date: 2004/08/17 18:26:54 $ */public final class NetAgent { // class made final until internal API is clean private volatile boolean shutdown; // flag to indicate agent shutdown initiation private Selector selector; // interface to I/O signals from OS private EDU.oswego.cs.dl.util.concurrent.Channel pendingEvents; // events to be handed from user thread to agent thread private long selectTimeout; // the maximum time to block in Selector.select() private ByteBufferPool readBufferPool; // buffers to read into from network private Map writeQueues; // Map<SelectionKey, List<ByteBuffer>> // since a SelectionKey corresponds to a registered channel, the map // maintains a separate write buffer queue for each channel // We could simply use key.attach to register a write queue with a key, but // we explicitly do not take that approach in order to not pollute // key.attachment(). That way user space apps can use key attachments for their custom // application specific associations. private Map observerStages; // Map<SelectableChannel, Stage> private SocketOpts socketOptions; // options to set for new connections private HostAuthorizer hostAuthorizer; private final Object outerLock = new Object(); private final Object innerLock = new Object(); private int nopLoops; private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NetAgent.class); /** * Constructs a new agent with default configuration options; * the agent is not yet started. 
*/ public NetAgent() { this.shutdown = true; // initially it is shutdown; run() toggles that this.selector = null; this.pendingEvents = new LinkedQueue(); this.selectTimeout = 0; // wait indefinitely //this.selectTimeout = 1000; // wait for at most 1 sec this.setSocketOptions(new SocketOpts()); this.setReadBufferPool(new ByteBufferPool(8 * 128 * 1024, 128 * 1024, true, null)); // default: allow unlimited accepts from all hosts SmartHostAuthorizer auth = new SmartHostAuthorizer( true, new SmartHostAuthorizationRules().addHost(SmartHostAuthorizationRules.ALL), new SmartHostAuthorizationRules() ); this.setAcceptHostAuthorizer(auth); } /** * Tells the agent to start listening as a server for TCP connections on the * given port. To register multiple listen ports, call this method multiple * times. * * @param observer * the stage onto which responses to the new server channel * should be enqueued (may be null) * @param port * the port to listen on * @return the agent itself (for method chaining convenience) * @throws IOException */ public NetAgent addListenPort(Stage observer, int port) throws IOException { return this.addChannel(observer, NioUtil .createServerChannel(port), SelectionKey.OP_ACCEPT); } /** * Tells the agent to start a TCP client connection to the given remote address. * To register multiple connections, call this method multiple times. * * @param observer * the stage onto which responses to the new client channel * should be enqueued (may be null) * @param address the address to connect to * @return the agent itself (for method chaining convenience) * @throws IOException */ public NetAgent addConnectAddress(Stage observer, InetSocketAddress address) throws IOException { SocketChannel channel = NioUtil.createClientChannel(address); int interestOps = SelectionKey.OP_CONNECT; if (! 
channel.isConnectionPending()) { log.debug("socket channel already connected"); interestOps = 0; } else { log.debug("socket channel not yet connected"); } return this.addChannel(observer, channel, interestOps); } /** * Add the given channel with the given interest set and attachment to the agent; * for example so that it starts listening on a given port. * To register multiple channels, call this method multiple times. * * @param observer * the stage onto which responses to the channel * should be enqueued (may be null) * @param channel the channel to register * @param interestOps the NIO interest set * @return the agent itself (for method chaining convenience) * @throws IOException */ protected NetAgent addChannel(Stage observer, SelectableChannel channel, int interestOps) { this.enqueue(new ChannelRequest.Register(observer, channel, interestOps)); return this; } /** * Sets the host authorizer the server uses to allow/deny accepting * connections from the network. * * @param hostAuthorizer * the host authorizer to use. */ public void setAcceptHostAuthorizer(HostAuthorizer hostAuthorizer) { if (hostAuthorizer == null) throw new IllegalArgumentException("hostAuthorizer must not be null"); this.checkIsShutdown(); this.hostAuthorizer = hostAuthorizer; } /** * Sets the buffer pool to be used on reading from the network. Use this * method for maximum performance; For casual usage you can safely and * entirely ignore ALL buffer pool related concerns. * * @param readBufferPool * the pool to use. */ public void setReadBufferPool(ByteBufferPool readBufferPool) { if (readBufferPool == null) throw new IllegalArgumentException("readBufferPool must not be null"); this.checkIsShutdown(); this.readBufferPool = readBufferPool; } /** * Returns the buffer pool to be used on reading from the network. 
*/
    public ByteBufferPool getReadBufferPool() {
        return this.readBufferPool;
    }

    /**
     * Set the socket options to be used for newly accepted connections as well
     * as client connections (on OP_ACCEPT and OP_CONNECT).
     *
     * @param options
     *            the options to be set for the socket.
     * @throws IllegalArgumentException
     *             if <code>options</code> is null
     */
    public void setSocketOptions(SocketOpts options) {
        if (options == null) {
            throw new IllegalArgumentException("options must not be null");
        }
        // NOTE(review): checkIsShutdown() is defined outside this excerpt;
        // presumably it rejects reconfiguration while the agent is running - confirm.
        this.checkIsShutdown();
        this.socketOptions = options;
    }

    /**
     * Hand a request event from the {@link gov.lbl.dsd.sea.nio.event} package
     * to the agent; the request will be processed the next time the agent thread
     * comes around the select loop.
     * <p>
     * An {@link AdminRequest.Start} is handled specially: if the agent is
     * currently shut down, a new thread is spawned that runs
     * <code>start()</code>; otherwise the request is ignored. Every other
     * event is put onto the pending-events queue and the selector (if one is
     * open) is woken up.
     * <p>
     * If the calling thread is interrupted while enqueuing, a warning is
     * logged and the thread's interrupt status is restored so that the caller
     * can still observe the interruption.
     *
     * @param event
     *            the event to hand to the agent.
     */
    public void enqueue(Object event) {
        if (event instanceof AdminRequest.Start) {
            if (this.shutdown) {
                new Thread(
                    new Runnable() {
                        public void run() {
                            start();
                        }
                    }
                ).start();
            }
            // else: ignore (we are already running)
        } else {
            try {
                this.pendingEvents.put(event);
            } catch (InterruptedException e) {
                log.warn("interrupted", e);
                // catching InterruptedException clears the interrupt status;
                // restore it so code further up the stack can react to it
                Thread.currentThread().interrupt();
            }

            Selector sel = this.selector; // avoid potential NPE race by caching in local var
            if (sel != null) {
                sel.wakeup();
            }
            //if (log.isTraceEnabled()) log.trace("handed event off to selector thread="+event);
        }
    }

    /**
     * The main selector loop; Runs the agent selector in the current thread.
     */
    public void start() {
        try {
            this.shutdown = false;
            this.selector = Selector.open();
            this.writeQueues = new HashMap();
            this.observerStages = new HashMap();
            this.nopLoops = 0;

            // main selector loop
            while (! this.shutdown) {
                synchronized (outerLock) {} // sync up with toDebugString()
                synchronized (innerLock) {
                    this.doEvents(); // do any selector updates within our thread
                    this.doSelect(this.selector); // this callback is the main workhorse
                }
            }
        } catch (IOException e) {
            log.fatal(e);
        } finally {
            synchronized (innerLock) {
                try {
                    this.doCloseAll();
                } catch (IOException e1) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -