⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rpcchannel.java

📁 精通tomcat书籍原代码,希望大家共同学习
💻 JAVA
字号:
/* * Copyright 1999,2004-2006 The Apache Software Foundation. *  * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *  *      http://www.apache.org/licenses/LICENSE-2.0 *  * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.group;import java.io.Externalizable;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.io.Serializable;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import org.apache.catalina.tribes.Channel;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelListener;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.util.UUIDGenerator;import org.apache.catalina.tribes.tipis.*;/** * A channel to handle RPC messaging * @author Filip Hanik */public class RpcChannel implements ChannelListener{    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(RpcChannel.class);        public static final int FIRST_REPLY = 1;    public static final int MAJORITY_REPLY = 2;    public static final int ALL_REPLY = 3;        private Channel channel;    private RpcCallback callback;    private byte[] rpcId;        private HashMap responseMap = new HashMap();    /**     * Create an RPC channel. You can have several RPC channels attached to a group     * all separated out by the uniqueness     * @param rpcId - the unique Id for this RPC group     * @param channel Channel     * @param callback RpcCallback     */    public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {        this.channel = channel;        this.callback = callback;        this.rpcId = rpcId;        channel.addChannelListener(this);    }            /**     * Send a message and wait for the response.     * @param destination Member[] - the destination for the message, and the members you request a reply from     * @param message Serializable - the message you are sending out     * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY     * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned     * @return Response[] - an array of response objects.     * @throws ChannelException     */    public Response[] send(Member[] destination,                            Serializable message,                           int rpcOptions,                            int channelOptions,                           long timeout) throws ChannelException {                if ( destination==null || destination.length == 0 ) return new Response[0];                //avoid dead lock        channelOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;                RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));        RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length,timeout);        try {            synchronized (collector) {                responseMap.put(key, collector);                RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);                channel.send(destination, rmsg, channelOptions);                collector.wait(timeout);            }        } catch ( InterruptedException ix ) {            Thread.currentThread().interrupted();            throw new ChannelException(ix);        }finally {            responseMap.remove(key);        }        return collector.getResponses();    }        public void messageReceived(Serializable msg, Member sender) {        RpcMessage rmsg = (RpcMessage)msg;        RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);        if ( rmsg.reply ) {            RpcCollector collector = (RpcCollector)responseMap.get(key);            if (collector == null) {                callback.leftOver(rmsg.message, sender);            } else {                synchronized (collector) {                    //make sure it hasn't been removed                    if ( responseMap.containsKey(key) ) {                        collector.addResponse(rmsg.message, sender);                        if (collector.isComplete()) collector.notifyAll();                    } else {                        callback.leftOver(rmsg.message, sender);                    }                }//synchronized            }//end if        } else{            Serializable reply = callback.replyRequest(rmsg.message,sender);            rmsg.reply = true;            rmsg.message = reply;            try {                channel.send(new Member[] {sender}, rmsg,0);            }catch ( Exception x )  {                log.error("Unable to send back reply in RpcChannel.",x);            }        }//end if    }        public void breakdown() {        channel.removeChannelListener(this);    }        public void finalize() {        breakdown();    }        public boolean accept(Serializable msg, Member sender) {        if ( msg instanceof RpcMessage ) {            RpcMessage rmsg = (RpcMessage)msg;            return Arrays.equals(rmsg.rpcId,rpcId);        }else return false;    }        public Channel getChannel() {        return channel;    }    public RpcCallback getCallback() {        return callback;    }    public byte[] getRpcId() {        return rpcId;    }    public void setChannel(Channel channel) {        this.channel = channel;    }    public void setCallback(RpcCallback callback) {        this.callback = callback;    }    public void setRpcId(byte[] rpcId) {        this.rpcId = rpcId;    }        /**     *      * Class that holds all response.     * @author not attributable     * @version 1.0     */    public static class RpcCollector {        public ArrayList responses = new ArrayList();         public RpcCollectorKey key;        public int options;        public int destcnt;        public long timeout;                public RpcCollector(RpcCollectorKey key, int options, int destcnt, long timeout) {            this.key = key;            this.options = options;            this.destcnt = destcnt;            this.timeout = timeout;        }                public void addResponse(Serializable message, Member sender){            Response resp = new Response(sender,message);            responses.add(resp);        }                public boolean isComplete() {            switch (options) {                case ALL_REPLY:                    return destcnt == responses.size();                case MAJORITY_REPLY:                {                    float perc = ((float)responses.size()) / ((float)destcnt);                    return perc >= 0.50f;                }                case FIRST_REPLY:                    return responses.size()>0;                default:                    return false;            }        }                public int hashCode() {            return key.hashCode();        }                public boolean equals(Object o) {            if ( o instanceof RpcCollector ) {                RpcCollector r = (RpcCollector)o;                return r.key.equals(this.key);            } else return false;        }                public Response[] getResponses() {            return (Response[])responses.toArray(new Response[responses.size()]);        }    }        public static class RpcCollectorKey {        byte[] id;        public RpcCollectorKey(byte[] id) {            this.id = id;        }                public int hashCode() {            return id[0]+id[1]+id[2]+id[3];        }        public boolean equals(Object o) {            if ( o instanceof RpcCollectorKey ) {                RpcCollectorKey r = (RpcCollectorKey)o;                return Arrays.equals(id,r.id);            } else return false;        }            }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -