📄 parallelniosender.java
字号:
/* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.transport.nio;import java.io.IOException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelMessage;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.io.ClusterData;import org.apache.catalina.tribes.io.XByteBuffer;import org.apache.catalina.tribes.transport.MultiPointSender;import org.apache.catalina.tribes.transport.SenderState;import org.apache.catalina.tribes.transport.AbstractSender;import java.net.UnknownHostException;import org.apache.catalina.tribes.Channel;import org.apache.catalina.tribes.group.RpcChannel;/** * <p>Title: </p> * * <p>Description: </p> * * <p>Copyright: Copyright (c) 2005</p> * * <p>Company: </p> * * @author not attributable * @version 1.0 */public class ParallelNioSender extends AbstractSender implements MultiPointSender { protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ParallelNioSender.class); protected long selectTimeout = 1000; //default 1 second protected Selector selector; protected HashMap nioSenders = new HashMap(); public ParallelNioSender() throws IOException { selector = Selector.open(); setConnected(true); } public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException { long start = System.currentTimeMillis(); byte[] data = XByteBuffer.createDataPackage((ClusterData)msg); NioSender[] senders = setupForSend(destination); connect(senders); setData(senders,data); int remaining = senders.length; ChannelException cx = null; try { //loop until complete, an error happens, or we timeout long delta = System.currentTimeMillis() - start; boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK; while ( (remaining>0) && (delta<getTimeout()) ) { try { remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck); } catch (Exception x ) { if ( cx == null ) { if ( x instanceof ChannelException ) cx = (ChannelException)x; else cx = new ChannelException("Parallel NIO send failed.", x); } else { if (x instanceof ChannelException) cx.addFaultyMember( ( (ChannelException) x).getFaultyMembers()); } } //bail out if all remaining senders are failing if ( cx != null && cx.getFaultyMembers().length == remaining ) throw cx; delta = System.currentTimeMillis() - start; } if ( remaining > 0 ) { //timeout has occured cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.)."); for (int i=0; i<senders.length; i++ ) { if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination()); } throw cx; } } catch (Exception x ) { try { this.disconnect(); } catch (Exception ignore) {} if ( x instanceof ChannelException ) throw (ChannelException)x; else throw new ChannelException(x); } } private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck) throws IOException, ChannelException { int completed = 0; int selectedKeys = selector.select(selectTimeOut); if (selectedKeys == 0) { return 0; } Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey sk = (SelectionKey) it.next(); it.remove(); int readyOps = sk.readyOps(); sk.interestOps(sk.interestOps() & ~readyOps); NioSender sender = (NioSender) sk.attachment(); try { if (sender.process(sk,waitForAck)) { completed++; sender.setComplete(true); SenderState.getSenderState(sender.getDestination()).setReady(); }//end if } catch (Exception x) { SenderState state = SenderState.getSenderState(sender.getDestination()); int attempt = sender.getAttempt()+1; boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0); synchronized (state) { //sk.cancel(); if (state.isSuspect()) state.setFailing(); if (state.isReady()) { state.setSuspect(); if ( retry ) log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying."); else log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x); } } if ( !isConnected() ) { log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected."); ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x); cx.addFaultyMember(sender.getDestination()); throw cx; } byte[] data = sender.getMessage(); if ( retry ) { try { sender.disconnect(); sender.connect(); sender.setAttempt(attempt); sender.setMessage(data); }catch ( Exception ignore){ state.setFailing(); } } else { ChannelException cx = new ChannelException("Send failed, attempt:"+sender.getAttempt()+" max:"+maxAttempts,x); cx.addFaultyMember(sender.getDestination()); throw cx; }//end if } } return completed; } private void connect(NioSender[] senders) throws ChannelException { ChannelException x = null; for (int i=0; i<senders.length; i++ ) { try { if (!senders[i].isConnected()) senders[i].connect(); }catch ( IOException io ) { if ( x==null ) x = new ChannelException(io); x.addFaultyMember(senders[i].getDestination()); } } if ( x != null ) throw x; } private void setData(NioSender[] senders, byte[] data) throws ChannelException { ChannelException x = null; for (int i=0; i<senders.length; i++ ) { try { senders[i].setMessage(data); }catch ( IOException io ) { if ( x==null ) x = new ChannelException(io); x.addFaultyMember(senders[i].getDestination()); } } if ( x != null ) throw x; } private NioSender[] setupForSend(Member[] destination) throws ChannelException { ChannelException cx = null; NioSender[] result = new NioSender[destination.length]; for ( int i=0; i<destination.length; i++ ) { NioSender sender = (NioSender)nioSenders.get(destination[i]); if ( sender == null ) { try { sender = new NioSender(destination[i]); nioSenders.put(destination[i], sender); }catch ( UnknownHostException x ) { if ( cx == null ) cx = new ChannelException("Unable to setup NioSender.",x); cx.addFaultyMember(destination[i]); } } if ( sender != null ) { sender.reset(); sender.setSelector(selector); sender.setDirectBuffer(getDirectBuffer()); sender.setRxBufSize(getRxBufSize()); sender.setTxBufSize(getTxBufSize()); sender.setTimeout(getTimeout()); sender.setKeepAliveCount(getKeepAliveCount()); sender.setKeepAliveTime(getKeepAliveTime()); result[i] = sender; } } if ( cx != null ) throw cx; else return result; } public void connect() { //do nothing, we connect on demand setConnected(true); } private synchronized void close() throws ChannelException { ChannelException x = null; Object[] members = nioSenders.keySet().toArray(); for (int i=0; i<members.length; i++ ) { Member mbr = (Member)members[i]; try { NioSender sender = (NioSender)nioSenders.get(mbr); sender.disconnect(); }catch ( Exception e ) { if ( x == null ) x = new ChannelException(e); x.addFaultyMember(mbr); } nioSenders.remove(mbr); } if ( x != null ) throw x; } public void memberAdded(Member member) { } public void memberDisappeared(Member member) { //disconnect senders NioSender sender = (NioSender)nioSenders.remove(member); if ( sender != null ) sender.disconnect(); } public synchronized void disconnect() { setConnected(false); try {close(); }catch (Exception x){} } public void finalize() { try {disconnect(); }catch ( Exception ignore){} } public boolean keepalive() { //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented"); boolean result = false; for ( Iterator i = nioSenders.entrySet().iterator(); i.hasNext(); ) { Map.Entry entry = (Map.Entry)i.next(); NioSender sender = (NioSender)entry.getValue(); if ( sender.keepalive() ) { nioSenders.remove(entry.getKey()); } } return result; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -