⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 parallelniosender.java

📁 精通tomcat书籍原代码,希望大家共同学习
💻 JAVA
字号:
/* * Copyright 1999,2004 The Apache Software Foundation. *  * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *  *      http://www.apache.org/licenses/LICENSE-2.0 *  * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.transport.nio;import java.io.IOException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelMessage;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.io.ClusterData;import org.apache.catalina.tribes.io.XByteBuffer;import org.apache.catalina.tribes.transport.MultiPointSender;import org.apache.catalina.tribes.transport.SenderState;import org.apache.catalina.tribes.transport.AbstractSender;import java.net.UnknownHostException;import org.apache.catalina.tribes.Channel;import org.apache.catalina.tribes.group.RpcChannel;/** * <p>Title: </p> * * <p>Description: </p> * * <p>Copyright: Copyright (c) 2005</p> * * <p>Company: </p> * * @author not attributable * @version 1.0 */public class ParallelNioSender extends AbstractSender implements MultiPointSender {        protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ParallelNioSender.class);        protected long selectTimeout = 1000; //default 1 second    protected Selector selector;    protected HashMap nioSenders = new HashMap();    public ParallelNioSender() throws IOException {        selector = Selector.open();        setConnected(true);    }            public synchronized void sendMessage(Member[] destination, ChannelMessage msg) throws ChannelException {        long start = System.currentTimeMillis();        byte[] data = XByteBuffer.createDataPackage((ClusterData)msg);        NioSender[] senders = setupForSend(destination);        connect(senders);        setData(senders,data);                int remaining = senders.length;        ChannelException cx = null;        try {            //loop until complete, an error happens, or we timeout            long delta = System.currentTimeMillis() - start;            boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;            while ( (remaining>0) && (delta<getTimeout()) ) {                try {                    remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck);                } catch (Exception x ) {                    if ( cx == null ) {                        if ( x instanceof ChannelException ) cx = (ChannelException)x;                        else cx = new ChannelException("Parallel NIO send failed.", x);                    } else {                        if (x instanceof ChannelException) cx.addFaultyMember( ( (ChannelException) x).getFaultyMembers());                    }                }                //bail out if all remaining senders are failing                if ( cx != null && cx.getFaultyMembers().length == remaining ) throw cx;                delta = System.currentTimeMillis() - start;            }            if ( remaining > 0 ) {                //timeout has occured                cx = new ChannelException("Operation has timed out("+getTimeout()+" ms.).");                for (int i=0; i<senders.length; i++ ) {                    if (!senders[i].isComplete() ) cx.addFaultyMember(senders[i].getDestination());                }                throw cx;            }        } catch (Exception x ) {            try { this.disconnect(); } catch (Exception ignore) {}            if ( x instanceof ChannelException ) throw (ChannelException)x;            else throw new ChannelException(x);        }            }        private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck) throws IOException, ChannelException {        int completed = 0;        int selectedKeys = selector.select(selectTimeOut);                if (selectedKeys == 0) {            return 0;        }                Iterator it = selector.selectedKeys().iterator();        while (it.hasNext()) {            SelectionKey sk = (SelectionKey) it.next();            it.remove();            int readyOps = sk.readyOps();            sk.interestOps(sk.interestOps() & ~readyOps);            NioSender sender = (NioSender) sk.attachment();            try {                if (sender.process(sk,waitForAck)) {                    completed++;                    sender.setComplete(true);                    SenderState.getSenderState(sender.getDestination()).setReady();                }//end if            } catch (Exception x) {                SenderState state = SenderState.getSenderState(sender.getDestination());                int attempt = sender.getAttempt()+1;                boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0);                synchronized (state) {                                    //sk.cancel();                    if (state.isSuspect()) state.setFailing();                    if (state.isReady()) {                        state.setSuspect();                        if ( retry )                             log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying.");                        else                             log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x);                    }                                    }                if ( !isConnected() ) {                    log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected.");                    ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x);                    cx.addFaultyMember(sender.getDestination());                    throw cx;                }                                byte[] data = sender.getMessage();                if ( retry ) {                    try {                         sender.disconnect();                         sender.connect();                        sender.setAttempt(attempt);                        sender.setMessage(data);                    }catch ( Exception ignore){                        state.setFailing();                    }                } else {                    ChannelException cx = new ChannelException("Send failed, attempt:"+sender.getAttempt()+" max:"+maxAttempts,x);                    cx.addFaultyMember(sender.getDestination());                    throw cx;                }//end if            }        }        return completed;    }        private void connect(NioSender[] senders) throws ChannelException {        ChannelException x = null;        for (int i=0; i<senders.length; i++ ) {            try {                if (!senders[i].isConnected()) senders[i].connect();            }catch ( IOException io ) {                if ( x==null ) x = new ChannelException(io);                x.addFaultyMember(senders[i].getDestination());            }        }        if ( x != null ) throw x;    }        private void setData(NioSender[] senders, byte[] data) throws ChannelException {        ChannelException x = null;        for (int i=0; i<senders.length; i++ ) {            try {                senders[i].setMessage(data);            }catch ( IOException io ) {                if ( x==null ) x = new ChannelException(io);                x.addFaultyMember(senders[i].getDestination());            }        }        if ( x != null ) throw x;    }            private NioSender[] setupForSend(Member[] destination) throws ChannelException {        ChannelException cx = null;        NioSender[] result = new NioSender[destination.length];        for ( int i=0; i<destination.length; i++ ) {            NioSender sender = (NioSender)nioSenders.get(destination[i]);            if ( sender == null ) {                try {                    sender = new NioSender(destination[i]);                    nioSenders.put(destination[i], sender);                }catch ( UnknownHostException x ) {                    if ( cx == null ) cx = new ChannelException("Unable to setup NioSender.",x);                    cx.addFaultyMember(destination[i]);                }            }            if ( sender != null ) {                sender.reset();                sender.setSelector(selector);                sender.setDirectBuffer(getDirectBuffer());                sender.setRxBufSize(getRxBufSize());                sender.setTxBufSize(getTxBufSize());                sender.setTimeout(getTimeout());                sender.setKeepAliveCount(getKeepAliveCount());                sender.setKeepAliveTime(getKeepAliveTime());                result[i] = sender;            }        }        if ( cx != null ) throw cx;        else return result;    }        public void connect() {        //do nothing, we connect on demand        setConnected(true);    }            private synchronized void close() throws ChannelException  {        ChannelException x = null;        Object[] members = nioSenders.keySet().toArray();        for (int i=0; i<members.length; i++ ) {            Member mbr = (Member)members[i];            try {                NioSender sender = (NioSender)nioSenders.get(mbr);                sender.disconnect();            }catch ( Exception e ) {                if ( x == null ) x = new ChannelException(e);                x.addFaultyMember(mbr);            }            nioSenders.remove(mbr);        }        if ( x != null ) throw x;    }        public void memberAdded(Member member) {            }        public void memberDisappeared(Member member) {        //disconnect senders        NioSender sender = (NioSender)nioSenders.remove(member);        if ( sender != null ) sender.disconnect();    }        public synchronized void disconnect() {        setConnected(false);        try {close(); }catch (Exception x){}            }        public void finalize() {        try {disconnect(); }catch ( Exception ignore){}    }    public boolean keepalive() {        //throw new UnsupportedOperationException("Method ParallelNioSender.checkKeepAlive() not implemented");        boolean result = false;        for ( Iterator i = nioSenders.entrySet().iterator(); i.hasNext();  ) {            Map.Entry entry = (Map.Entry)i.next();            NioSender sender = (NioSender)entry.getValue();            if ( sender.keepalive() ) {                nioSenders.remove(entry.getKey());            }        }        return result;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -