📄 nioreplicationthread.java
字号:
/* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.transport.nio;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import org.apache.catalina.tribes.io.ObjectReader;import org.apache.catalina.tribes.transport.Constants;import org.apache.catalina.tribes.transport.WorkerThread;import org.apache.catalina.tribes.ChannelMessage;import org.apache.catalina.tribes.io.ListenCallback;import org.apache.catalina.tribes.io.ClusterData;/** * A worker thread class which can drain channels and echo-back the input. Each * instance is constructed with a reference to the owning thread pool object. * When started, the thread loops forever waiting to be awakened to service the * channel associated with a SelectionKey object. The worker is tasked by * calling its serviceChannel() method with a SelectionKey object. The * serviceChannel() method stores the key reference in the thread object then * calls notify() to wake it up. When the channel has been drained, the worker * thread returns itself to its parent pool. * * @author Filip Hanik * * @version $Revision: 378050 $, $Date: 2006-02-15 12:30:02 -0600 (Wed, 15 Feb 2006) $ */public class NioReplicationThread extends WorkerThread { private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( NioReplicationThread.class ); private ByteBuffer buffer = null; private SelectionKey key; private int rxBufSize; public NioReplicationThread (ListenCallback callback) { super(callback); } // loop forever waiting for work to do public synchronized void run() { if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) { buffer = ByteBuffer.allocateDirect(getRxBufSize()); }else { buffer = ByteBuffer.allocate (getRxBufSize()); } while (isDoRun()) { try { // sleep and release object lock this.wait(); } catch (InterruptedException e) { if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e); // clear interrupt status Thread.interrupted(); } if (key == null) { continue; // just in case } try { drainChannel (key); } catch (Exception e) { //this is common, since the sockets on the other //end expire after a certain time. if ( e instanceof IOException ) { //dont spew out stack traces for IO exceptions unless debug is enabled. if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed.", e); else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed."); } else if ( log.isErrorEnabled() ) { //this is a real error, log it. log.error("Exception caught in TcpReplicationThread.drainChannel.",e); } // close channel and nudge selector try { key.channel().close(); } catch (IOException ex) { log.error("Unable to close channel.",ex); } key.selector().wakeup(); } key = null; // done, ready for more, return to pool getPool().returnWorker (this); } } /** * Called to initiate a unit of work by this worker thread * on the provided SelectionKey object. This method is * synchronized, as is the run() method, so only one key * can be serviced at a given time. * Before waking the worker thread, and before returning * to the main selection loop, this key's interest set is * updated to remove OP_READ. This will cause the selector * to ignore read-readiness for this channel while the * worker thread is servicing it. */ public synchronized void serviceChannel (SelectionKey key) { this.key = key; key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); this.notify(); // awaken the thread } /** * The actual code which drains the channel associated with * the given key. This method assumes the key has been * modified prior to invocation to turn off selection * interest in OP_READ. When this method completes it * re-enables OP_READ and calls wakeup() on the selector * so the selector will resume watching this channel. */ protected void drainChannel (SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; buffer.clear(); // make buffer empty ObjectReader reader = (ObjectReader)key.attachment(); // loop while data available, channel is non-blocking while ((count = channel.read (buffer)) > 0) { buffer.flip(); // make buffer readable if ( buffer.hasArray() ) reader.append(buffer.array(),0,count,false); else reader.append(buffer,count,false); buffer.clear(); // make buffer empty } int pkgcnt = reader.count(); if ( pkgcnt > 0 ) { ChannelMessage[] msgs = reader.execute(); for ( int i=0; i<msgs.length; i++ ) { /** * Use send ack here if you want to ack the request to the remote * server before completing the request * This is considered an asynchronized request */ if (ClusterData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel); try { //process the message getCallback().messageDataReceived(msgs[i]); }catch ( Exception e ) { log.error("Processing of cluster message failed.",e); } /** * Use send ack here if you want the request to complete on this * server before sending the ack to the remote server * This is considered a synchronized request */ if (ClusterData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel); } } if (count < 0) { // close channel on EOF, invalidates the key if ( log.isDebugEnabled() ) log.debug("Channel closed on the remote end, disconnecting"); channel.close(); return; } //acquire the interestOps mutex Object mutex = this.getPool().getInterestOpsMutex(); synchronized (mutex) { // cycle the selector so this key is active again key.selector().wakeup(); // resume interest in OP_READ, OP_WRITE int resumeOps = key.interestOps() | SelectionKey.OP_READ; key.interestOps(resumeOps); } } /** * send a reply-acknowledgement (6,2,3) * @param key * @param channel */ protected void sendAck(SelectionKey key, SocketChannel channel) { try { channel.write(ByteBuffer.wrap(Constants.ACK_COMMAND)); if (log.isTraceEnabled()) { log.trace("ACK sent to " + channel.socket().getPort()); } } catch ( java.io.IOException x ) { log.warn("Unable to send ACK back through channel, channel disconnected?: "+x.getMessage()); } } public void setRxBufSize(int rxBufSize) { this.rxBufSize = rxBufSize; } public int getRxBufSize() { return rxBufSize; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -