⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 groupchannel.java

📁 精通tomcat书籍原代码,希望大家共同学习
💻 JAVA
字号:
/* * Copyright 1999,2004-2006 The Apache Software Foundation. *  * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *  *      http://www.apache.org/licenses/LICENSE-2.0 *  * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.group;import java.io.Serializable;import java.util.ArrayList;import java.util.Iterator;import org.apache.catalina.tribes.ByteMessage;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelInterceptor;import org.apache.catalina.tribes.ChannelListener;import org.apache.catalina.tribes.ChannelMessage;import org.apache.catalina.tribes.ChannelReceiver;import org.apache.catalina.tribes.ChannelSender;import org.apache.catalina.tribes.ErrorHandler;import org.apache.catalina.tribes.ManagedChannel;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.MembershipListener;import org.apache.catalina.tribes.MembershipService;import org.apache.catalina.tribes.io.ClusterData;import org.apache.catalina.tribes.io.XByteBuffer;import java.io.ObjectInput;import java.io.Externalizable;import java.io.IOException;import java.io.ObjectOutput;/** * The GroupChannel manages the replication channel. It coordinates * message being sent and received with membership announcements. * The channel has an chain of interceptors that can modify the message or perform other logic. * It manages a complete cluster group, both membership and replication. * @author Filip Hanik * @version $Revision: 304032 $, $Date: 2005-07-27 10:11:55 -0500 (Wed, 27 Jul 2005) $ */public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel {            private ChannelCoordinator coordinator = new ChannelCoordinator();    private ChannelInterceptor interceptors = null;        private ArrayList membershipListeners = new ArrayList();    private ArrayList channelListeners = new ArrayList();    private boolean optionCheck = false;    public GroupChannel() {        addInterceptor(this);    }            /**     * Adds an interceptor to the stack for message processing     * @param interceptor ChannelInterceptorBase     */    public void addInterceptor(ChannelInterceptor interceptor) {         if ( interceptors == null ) {            interceptors = interceptor;            interceptors.setNext(coordinator);            interceptors.setPrevious(null);            coordinator.setPrevious(interceptors);        } else {            ChannelInterceptor last = interceptors;            while ( last.getNext() != coordinator ) {                last = last.getNext();            }            last.setNext(interceptor);            interceptor.setNext(coordinator);            interceptor.setPrevious(last);            coordinator.setPrevious(interceptor);        }    }        public void heartbeat() {        super.heartbeat();    }            /**     * Send a message to one or more members in the cluster     * @param destination Member[] - the destinations, null or zero length means all     * @param msg ClusterMessage - the message to send     * @param options int - sender options, see class documentation     * @return ClusterMessage[] - the replies from the members, if any.     */    public void send(Member[] destination, Serializable msg, int options) throws ChannelException {        send(destination,msg,options,null);    }    public void send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException {        if ( msg == null ) return;        try {            if ( destination == null ) throw new ChannelException("No destination given");            if ( destination.length == 0 ) return;            ClusterData data = new ClusterData();//generates a unique Id            data.setAddress(getLocalMember(false));            data.setTimestamp(System.currentTimeMillis());            byte[] b = null;            if ( msg instanceof ByteMessage ){                b = ((ByteMessage)msg).getMessage();                options = options | SEND_OPTIONS_BYTE_MESSAGE;            } else {                b = XByteBuffer.serialize(msg);            }            data.setOptions(options);            XByteBuffer buffer = new XByteBuffer(b.length+128,false);            buffer.append(b,0,b.length);            data.setMessage(buffer);            InterceptorPayload payload = null;            if ( handler != null ) {                payload = new InterceptorPayload();                payload.setErrorHandler(handler);            }            getFirstInterceptor().sendMessage(destination, data, payload);        }catch ( Exception x ) {            if ( x instanceof ChannelException ) throw (ChannelException)x;            throw new ChannelException(x);        }    }            public void messageReceived(ChannelMessage msg) {        if ( msg == null ) return;        try {                        Serializable fwd = null;            if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {                fwd = new ByteMessage(msg.getMessage().getBytes());            } else {                fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(),0,msg.getMessage().getLength());            }            //get the actual member with the correct alive time            Member source = msg.getAddress();                        for ( int i=0; i<channelListeners.size(); i++ ) {                ChannelListener channelListener = (ChannelListener)channelListeners.get(i);                if (channelListener != null && channelListener.accept(fwd, source))                    channelListener.messageReceived(fwd, source);            }//for        } catch ( Exception x ) {            log.error("Unable to deserialize channel message.",x);        }    }        public void memberAdded(Member member) {        //notify upwards        for (int i=0; i<membershipListeners.size(); i++ ) {            MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);            if (membershipListener != null) membershipListener.memberAdded(member);        }    }        public void memberDisappeared(Member member) {        //notify upwards        for (int i=0; i<membershipListeners.size(); i++ ) {            MembershipListener membershipListener = (MembershipListener)membershipListeners.get(i);            if (membershipListener != null) membershipListener.memberDisappeared(member);        }    }        protected void checkOptionFlags() throws ChannelException {        StringBuffer conflicts = new StringBuffer();        ChannelInterceptor first = interceptors;        while ( first != null ) {            int flag = first.getOptionFlag();            if ( flag != 0 ) {                ChannelInterceptor next = first.getNext();                while ( next != null ) {                    int nflag = next.getOptionFlag();                    if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) {                        conflicts.append("[");                        conflicts.append(first.getClass().getName());                        conflicts.append(":");                        conflicts.append(flag);                        conflicts.append(" == ");                        conflicts.append(next.getClass().getName());                        conflicts.append(":");                        conflicts.append(nflag);                        conflicts.append("] ");                    }//end if                    next = next.getNext();                }//while            }//end if            first = first.getNext();        }//while        if ( conflicts.length() > 0 ) throw new ChannelException("Interceptor option flag conflict: "+conflicts.toString());        }        public void start(int svc) throws ChannelException {        if (optionCheck) checkOptionFlags();        super.start(svc);    }        public ChannelInterceptor getFirstInterceptor() {        if (interceptors != null) return interceptors;        else return coordinator;    }        public ChannelReceiver getChannelReceiver() {        return coordinator.getClusterReceiver();    }    public ChannelSender getChannelSender() {        return coordinator.getClusterSender();    }    public MembershipService getMembershipService() {        return coordinator.getMembershipService();    }        public void setChannelReceiver(ChannelReceiver clusterReceiver) {        coordinator.setClusterReceiver(clusterReceiver);    }    public void setChannelSender(ChannelSender clusterSender) {        coordinator.setClusterSender(clusterSender);    }    public void setMembershipService(MembershipService membershipService) {        coordinator.setMembershipService(membershipService);    }    public void addMembershipListener(MembershipListener membershipListener) {        if (!this.membershipListeners.contains(membershipListener) )            this.membershipListeners.add(membershipListener);    }    public void removeMembershipListener(MembershipListener membershipListener) {        membershipListeners.remove(membershipListener);    }    public void addChannelListener(ChannelListener channelListener) {        if (!this.channelListeners.contains(channelListener) )            this.channelListeners.add(channelListener);    }        public void removeChannelListener(ChannelListener channelListener) {        channelListeners.remove(channelListener);    }    public Iterator getInterceptors() {         return new InterceptorIterator(this.getNext(),this.coordinator);    }    public static class InterceptorIterator implements Iterator {        private ChannelInterceptor end;        private ChannelInterceptor start;        public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) {            this.end = end;            this.start = start;        }                public boolean hasNext() {            return start!=null && start != end;        }                public Object next() {            Object result = null;            if ( hasNext() ) {                result = start;                start = start.getNext();            }            return result;        }                public void remove() {            //empty operation        }    }    public void setOptionCheck(boolean optionCheck) {        this.optionCheck = optionCheck;    }    public boolean getOptionCheck() {        return optionCheck;    }            public static class NoChannelReply extends RpcMessage {        public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {        }        public void writeExternal(ObjectOutput out) throws IOException {        }    }    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -