📄 nioreceiver.java
字号:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.transport.nio;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.transport.Constants;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.transport.RxTaskPool;
import org.apache.catalina.tribes.transport.AbstractRxTask;
import org.apache.catalina.tribes.util.StringManager;
import java.util.LinkedList;
import java.util.Set;
import java.nio.channels.CancelledKeyException;
/**
* @author Filip Hanik
* @version $Revision: 487423 $ $Date: 2006-12-15 02:58:56 +0100 (ven., 15 déc. 2006) $
*/
public class NioReceiver extends ReceiverBase implements Runnable, ChannelReceiver, ListenCallback {
/** Logger used by this receiver (shared by all instances). */
protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(NioReceiver.class);
/**
* The string manager for this package.
*/
protected StringManager sm = StringManager.getManager(Constants.Package);
/**
* The descriptive information about this implementation.
*/
private static final String info = "NioReceiver/1.0";
/** Selector opened by bind(); null until bind() has been called. */
private Selector selector = null;
/** Listening channel opened by bind() and registered with the selector for OP_ACCEPT. */
private ServerSocketChannel serverChannel = null;
/**
* Pending Runnable events, appended by addEvent() and drained by events().
* Additions and removals are performed while synchronized on this list.
*/
protected LinkedList events = new LinkedList();
// private Object interestOpsMutex = new Object();
/**
 * Creates a receiver. The selector and server channel are not opened here;
 * they stay null until bind() is invoked.
 */
public NioReceiver() {
    super();
}
/**
 * Returns descriptive information about this implementation and the
 * corresponding version number, in the format
 * <code>&lt;description&gt;/&lt;version&gt;</code>.
 *
 * @return the implementation info string
 */
public String getInfo() {
    return info;
}
// public Object getInterestOpsMutex() {
// return interestOpsMutex;
// }
/**
 * Stops this receiver: listening is stopped first, then the superclass
 * stop logic runs.
 */
public void stop() {
    stopListening();
    super.stop();
}
/**
 * Starts the cluster receiver: runs the superclass start-up, installs the
 * receive task pool, binds the listening socket and launches a daemon
 * thread named "NioReceiver" that executes this receiver.
 *
 * @throws IOException if the task pool cannot be created or the receiver
 *         cannot be bound or started; a failure that is not already an
 *         IOException is wrapped, with the original exception kept as the
 *         cause
 * @see org.apache.catalina.tribes.ChannelReceiver#start()
 */
public void start() throws IOException {
    super.start();
    try {
        setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
    } catch (Exception x) {
        log.fatal("ThreadPool can't be initialized. Listener not started", x);
        throw toIOException(x);
    }
    try {
        // NOTE(review): the return value is discarded; presumably called for
        // a side effect such as resolving the bind address -- confirm.
        getBind();
        bind();
        Thread t = new Thread(this, "NioReceiver");
        t.setDaemon(true);
        t.start();
    } catch (Exception x) {
        log.fatal("Unable to start cluster receiver", x);
        throw toIOException(x);
    }
}

/**
 * Returns the given exception as an IOException. An IOException is returned
 * unchanged; anything else is wrapped in a new IOException carrying the same
 * message and the original exception as its cause, so the stack trace of the
 * real failure is not lost.
 */
private static IOException toIOException(Exception x) {
    if (x instanceof IOException) {
        return (IOException) x;
    }
    IOException wrapped = new IOException(x.getMessage());
    // initCause is used instead of the (String, Throwable) constructor so
    // the code also compiles on older Java versions.
    wrapped.initCause(x);
    return wrapped;
}
/**
 * Builds a new receive task for the task pool.
 * The task is a NioReplicationTask constructed with this receiver passed
 * for both constructor arguments, and configured with this receiver's
 * buffer-pool flag, receive buffer size and worker thread options.
 *
 * @return the configured task
 */
public AbstractRxTask createRxTask() {
    final NioReplicationTask task = new NioReplicationTask(this, this);
    task.setUseBufferPool(getUseBufferPool());
    task.setRxBufSize(getRxBufSize());
    task.setOptions(getWorkerThreadOptions());
    return task;
}
/**
 * Opens the listening channel and the selector, binds the server socket
 * using the configured TCP listen port and auto-bind setting, switches the
 * channel to non-blocking mode and registers it for OP_ACCEPT.
 * <p>
 * The <code>serverChannel</code> and <code>selector</code> fields are only
 * assigned once every step has succeeded. If any step fails, whatever was
 * opened is closed again so a failed bind does not leak a channel or a
 * selector, and the fields keep their previous values.
 *
 * @throws IOException if opening, binding, configuring or registering fails
 */
protected void bind() throws IOException {
    ServerSocketChannel channel = null;
    Selector sel = null;
    boolean bound = false;
    try {
        // allocate an unbound server socket channel
        channel = ServerSocketChannel.open();
        // get the associated ServerSocket to bind it with
        ServerSocket serverSocket = channel.socket();
        // create the selector the channel will be registered with
        sel = Selector.open();
        // bind to the configured port
        bind(serverSocket,getTcpListenPort(),getAutoBind());
        // set non-blocking mode for the listening socket
        channel.configureBlocking(false);
        // register the ServerSocketChannel with the Selector
        channel.register(sel, SelectionKey.OP_ACCEPT);
        serverChannel = channel;
        selector = sel;
        bound = true;
    } finally {
        if (!bound) {
            // best-effort cleanup; the original failure is what propagates
            if (sel != null) {
                try {
                    sel.close();
                } catch (IOException e) {
                    if (log.isDebugEnabled()) log.debug("", e);
                }
            }
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    if (log.isDebugEnabled()) log.debug("", e);
                }
            }
        }
    }
}
/**
 * Queues an event and, if the receiver is listening, wakes up the selector.
 * If no selector exists the event is silently dropped.
 *
 * @param event the task to queue
 */
public void addEvent(Runnable event) {
    // Read the field once. The previous code null-checked the field and then
    // read it again before wakeup(), which could throw a NullPointerException
    // if the field were reset in between. socketTimeouts() takes the same
    // kind of snapshot.
    Selector sel = selector;
    if (sel == null) {
        return;
    }
    synchronized (events) {
        events.add(event);
    }
    if (log.isTraceEnabled()) {
        log.trace("Adding event to selector:"+event);
    }
    if (isListening()) {
        // Selector.wakeup() is documented as safe on a closed selector
        sel.wakeup();
    }
}
/**
 * Runs the queued events in insertion order.
 * Returns immediately when the queue is empty. Otherwise, while holding the
 * lock on the queue, events are removed from the front and run one by one
 * until the queue is empty or a null entry is found; the queue is then
 * cleared. An Exception thrown by an event is logged and does not stop the
 * remaining events from running.
 */
public void events() {
    if (events.isEmpty()) {
        return;
    }
    synchronized (events) {
        while (!events.isEmpty()) {
            Runnable task = (Runnable) events.removeFirst();
            if (task == null) {
                // a null entry ends processing; the rest is discarded below
                break;
            }
            try {
                if (log.isTraceEnabled()) {
                    log.trace("Processing event in selector:"+task);
                }
                task.run();
            } catch (Exception x) {
                log.error("",x);
            }
        }
        events.clear();
    }
}
/**
 * Cancels a selection key and closes its channel.
 * If the key has an ObjectReader attachment, it is marked cancelled and
 * finished first. The key is then cancelled, its attachment cleared, and
 * the channel closed. For a SocketChannel the underlying socket is closed
 * as well.
 * <p>
 * The socket close is guarded by an instanceof check: the key of any other
 * channel type (for example the registered ServerSocketChannel) would
 * otherwise fail with a ClassCastException before its channel was closed.
 * IOExceptions raised while closing are logged at debug level and ignored.
 *
 * @param key the selection key to cancel
 */
public static void cancelledKey(SelectionKey key) {
    ObjectReader reader = (ObjectReader)key.attachment();
    if (reader != null) {
        reader.setCancelled(true);
        reader.finish();
    }
    key.cancel();
    key.attach(null);
    SelectableChannel channel = key.channel();
    if (channel instanceof SocketChannel) {
        try {
            ((SocketChannel) channel).socket().close();
        } catch (IOException e) {
            if (log.isDebugEnabled()) log.debug("", e);
        }
    }
    try {
        channel.close();
    } catch (IOException e) {
        if (log.isDebugEnabled()) log.debug("", e);
    }
}
protected void socketTimeouts() {
//timeout
Selector tmpsel = selector;
Set keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null;
if ( keys == null ) return;
long now = System.currentTimeMillis();
for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
SelectionKey key = (SelectionKey) iter.next();
try {
// if (key.interestOps() == SelectionKey.OP_READ) {
// //only timeout sockets that we are waiting for a read from
// ObjectReader ka = (ObjectReader) key.attachment();
// long delta = now - ka.getLastAccess();
// if (delta > (long) getTimeout()) {
// cancelledKey(key);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -