📄 nioreceiver.java
字号:
/* * Copyright 1999,2004-2005 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.transport.nio;import java.io.IOException;import java.net.ServerSocket;import java.nio.channels.SelectableChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import org.apache.catalina.tribes.ChannelReceiver;import org.apache.catalina.tribes.io.ListenCallback;import org.apache.catalina.tribes.io.ObjectReader;import org.apache.catalina.tribes.transport.Constants;import org.apache.catalina.tribes.transport.ReceiverBase;import org.apache.catalina.tribes.transport.ThreadPool;import org.apache.catalina.tribes.transport.WorkerThread;import org.apache.catalina.tribes.util.StringManager;/** * @author Filip Hanik * @version $Revision: 379904 $ $Date: 2006-02-22 15:16:25 -0600 (Wed, 22 Feb 2006) $ */public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback { protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioReceiver.class); /** * The string manager for this package. */ protected StringManager sm = StringManager.getManager(Constants.Package); /** * The descriptive information about this implementation. */ private static final String info = "NioReceiver/1.0"; private Selector selector = null; private ServerSocketChannel serverChannel = null; private Object interestOpsMutex = new Object(); public NioReceiver() { } /** * Return descriptive information about this implementation and the * corresponding version number, in the format * <code><description>/<version></code>. */ public String getInfo() { return (info); } public Object getInterestOpsMutex() { return interestOpsMutex; } public void stop() { this.stopListening(); } /** * start cluster receiver * @throws Exception * @see org.apache.catalina.tribes.ClusterReceiver#start() */ public void start() { try { setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); } catch (Exception e) { log.error("ThreadPool can initilzed. Listener not started", e); return; } try { getBind(); bind(); Thread t = new Thread(this, "NioReceiver"); t.setDaemon(true); t.start(); } catch (Exception x) { log.fatal("Unable to start cluster receiver", x); } } public WorkerThread getWorkerThread() { NioReplicationThread thread = new NioReplicationThread(this); thread.setRxBufSize(getRxBufSize()); thread.setOptions(getWorkerThreadOptions()); return thread; } protected void bind() throws IOException { // allocate an unbound server socket channel serverChannel = ServerSocketChannel.open(); // Get the associated ServerSocket to bind it with ServerSocket serverSocket = serverChannel.socket(); // create a new Selector for use below selector = Selector.open(); // set the port the server channel will listen to //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); bind(serverSocket,getTcpListenPort(),getAutoBind()); // set non-blocking mode for the listening socket serverChannel.configureBlocking(false); // register the ServerSocketChannel with the Selector serverChannel.register(selector, SelectionKey.OP_ACCEPT); } /** * get data from channel and store in byte array * send it to cluster * @throws IOException * @throws java.nio.channels.ClosedChannelException */ protected void listen() throws Exception { if (doListen()) { log.warn("ServerSocketChannel already started"); return; } setListen(true); while (doListen() && selector != null) { // this may block for a long time, upon return the // selected set contains keys of the ready channels try { int n = selector.select(getTcpSelectorTimeout()); if (n == 0) { //there is a good chance that we got here //because the TcpReplicationThread called //selector wakeup(). //if that happens, we must ensure that that //thread has enough time to call interestOps synchronized (interestOpsMutex) { //if we got the lock, means there are no //keys trying to register for the //interestOps method } continue; // nothing to do } // get an iterator over the set of selected keys Iterator it = selector.selectedKeys().iterator(); // look at each key in the selected set while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); // Is a new connection coming in? if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); channel.socket().setReceiveBufferSize(getRxBufSize()); channel.socket().setSendBufferSize(getTxBufSize()); channel.socket().setTcpNoDelay(getTcpNoDelay()); channel.socket().setKeepAlive(getSoKeepAlive()); channel.socket().setOOBInline(getOoBInline()); channel.socket().setReuseAddress(getSoReuseAddress()); channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); channel.socket().setTrafficClass(getSoTrafficClass()); channel.socket().setSoTimeout(getTimeout()); Object attach = new ObjectReader(channel); registerChannel(selector, channel, SelectionKey.OP_READ, attach); } // is there data to read on this channel? if (key.isReadable()) { readDataFromSocket(key); } else { key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } // remove key from selected set, it's been handled it.remove(); } } catch (java.nio.channels.ClosedSelectorException cse) { // ignore is normal at shutdown or stop listen socket } catch (java.nio.channels.CancelledKeyException nx) { log.warn( "Replication client disconnected, error when polling key. Ignoring client."); } catch (Exception x) { log.error("Unable to process request in NioReceiver", x); } } serverChannel.close(); if (selector != null) selector.close(); } /** * Close Selector. * * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening() */ protected void stopListening() { // Bugzilla 37529: http://issues.apache.org/bugzilla/show_bug.cgi?id=37529 setListen(false); if (selector != null) { try { for (int i = 0; i < getMaxThreads(); i++) { selector.wakeup(); } selector.close(); } catch (Exception x) { log.error("Unable to close cluster receiver selector.", x); } finally { selector = null; } } } // ---------------------------------------------------------- /** * Register the given channel with the given selector for * the given operations of interest */ protected void registerChannel(Selector selector, SelectableChannel channel, int ops, Object attach) throws Exception { if (channel == null)return; // could happen // set the new channel non-blocking channel.configureBlocking(false); // register it with the selector channel.register(selector, ops, attach); } /** * Start thread and listen */ public void run() { try { listen(); } catch (Exception x) { log.error("Unable to run replication listener.", x); } } // ---------------------------------------------------------- /** * Sample data handler method for a channel with data ready to read. * @param key A SelectionKey object associated with a channel * determined by the selector to be ready for reading. If the * channel returns an EOF condition, it is closed here, which * automatically invalidates the associated key. The selector * will then de-register the channel on the next select call. */ protected void readDataFromSocket(SelectionKey key) throws Exception { NioReplicationThread worker = (NioReplicationThread) getPool().getWorker(); if (worker == null) { // No threads available, do nothing, the selection // loop will keep calling this method until a // thread becomes available. // FIXME: This design could be improved. if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available"); } else { // invoking this wakes up the worker thread then returns worker.serviceChannel(key); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -