⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messageprotocol.java

📁 JGRoups源码
💻 JAVA
字号:
// $Id: MessageProtocol.java,v 1.6 2006/04/05 05:32:24 belaban Exp $package org.jgroups.stack;import org.jgroups.*;import org.jgroups.blocks.GroupRequest;import org.jgroups.blocks.RequestCorrelator;import org.jgroups.blocks.RequestHandler;import org.jgroups.util.Rsp;import org.jgroups.util.RspList;import org.jgroups.util.Util;import java.util.Vector;/** * Based on Protocol, but incorporates RequestCorrelator and GroupRequest: the latter can * be used to mcast messages to all members and receive their reponses.<p> * A protocol based on this template can send messages to all members and receive all, a single, * n, or none responses. Requests directed towards the protocol can be handled by overriding * method <code>Handle</code>.<p> * Requests and responses are in the form of <code>Message</code>s, which would typically need to * contain information pertaining to the request/response, e.g. in the form of objects contained * in the message. To use remote method calls, use <code>RpcProtocol</code> instead.<p> * Typical use of of a <code>MessageProtocol</code> would be when a protocol needs to interact with * its peer protocols at each of the members' protocol stacks. A simple protocol like fragmentation, * which does not need to interact with other instances of fragmentation, may simply subclass * <code>Protocol</code> instead. * @author Bela Ban */public abstract class MessageProtocol extends Protocol implements RequestHandler {    protected RequestCorrelator _corr=null;    protected final Vector members=new Vector();    public void start() throws Exception {        if(_corr == null)            _corr=new RequestCorrelator(getName(), this, this);        _corr.start();    }    public void stop() {        if(_corr != null) {            _corr.stop();            // _corr=null;        }    }    /**     Cast a message to all members, and wait for <code>mode</code> responses. The responses are     returned in a response list, where each response is associated with its sender.<p>     Uses <code>GroupRequest</code>.     @param dests The members from which responses are expected. If it is null, replies from all members     are expected. The request itself is multicast to all members.     @param msg The message to be sent to n members     @param mode Defined in <code>GroupRequest</code>. The number of responses to wait for:     <ol>     <li>GET_FIRST: return the first response received.     <li>GET_ALL: wait for all responses (minus the ones from suspected members)     <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp size)     <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once)     <li>GET_N: wait for n responses (may block if n > group size)     <li>GET_NONE: wait for no responses, return immediately (non-blocking)     </ol>     @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses     <em>or</em> timeout time.     @return RspList A list of responses. Each response is an <code>Object</code> and associated     to its sender.     */    public RspList castMessage(Vector dests, Message msg, int mode, long timeout) {        GroupRequest _req=null;        Vector real_dests=dests != null? (Vector)dests.clone() : (Vector)members.clone();        // This marks message as sent by us ! (used in up()        // msg.addHeader(new MsgProtHeader(getName()));   ++ already done by RequestCorrelator        _req=new GroupRequest(msg, _corr, real_dests, mode, timeout, 0);        try {            _req.execute();        }        catch(Exception e) {            throw new RuntimeException("failed executing request " + _req, e);        }        return _req.getResults();    }    /**     Sends a message to a single member (destination = msg.dest) and returns the response.     The message's destination must be non-zero !     */    public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {        Vector mbrs=new Vector();        RspList rsp_list=null;        Object dest=msg.getDest();        Rsp rsp;        GroupRequest _req=null;        if(dest == null) {            System.out.println("MessageProtocol.sendMessage(): the message's destination is null ! " +                               "Cannot send message !");            return null;        }        mbrs.addElement(dest);   // dummy membership (of destination address)        _req=new GroupRequest(msg, _corr, mbrs, mode, timeout, 0);        try {            _req.execute();        }        catch(Exception e) {            throw new RuntimeException("failed executing request " + _req, e);        }        if(mode == GroupRequest.GET_NONE)            return null;        rsp_list=_req.getResults();        if(rsp_list.size() == 0) {            if(log.isErrorEnabled()) log.error("response list is empty");            return null;        }        if(rsp_list.size() > 1)            if(log.isErrorEnabled()) log.error("response list contains " +                                                         "more that 1 response; returning first response");        rsp=(Rsp)rsp_list.elementAt(0);        if(rsp.wasSuspected())            throw new SuspectedException(dest);        if(!rsp.wasReceived())            throw new TimeoutException();        return rsp.getValue();    }    /**     Processes a request destined for this layer. The return value is sent as response.     */    public Object handle(Message req) {        System.out.println("MessageProtocol.handle(): this method should be overridden !");        return null;    }    /**     * Handle an event coming from the layer above     */    public final void up(Event evt) {        Message msg;        Object hdr;        switch(evt.getType()) {            case Event.VIEW_CHANGE:                updateView((View)evt.getArg());                break;            default:                if(!handleUpEvent(evt)) return;                if(evt.getType() == Event.MSG) {                    msg=(Message)evt.getArg();                    hdr=msg.getHeader(getName());                    if(!(hdr instanceof RequestCorrelator.Header))                        break;                }                // [[[ TODO                // RequestCorrelator.receive() is currently calling passUp()                // itself. Only _this_ method should call passUp()!                // So return instead of breaking until fixed (igeorg)                // ]]] TODO                if(_corr != null) {                    _corr.receive(evt);                    return;                }                else                    if(log.isWarnEnabled()) log.warn("Request correlator is null, evt=" + Util.printEvent(evt));                break;        }        passUp(evt);    }    /**     * This message is not originated by this layer, therefore we can just     * pass it down without having to go through the request correlator.     * We do this ONLY for messages !     */    public final void down(Event evt) {        switch(evt.getType()) {            case Event.VIEW_CHANGE:                updateView((View)evt.getArg());                if(!handleDownEvent(evt)) return;                break;            case Event.MSG:                if(!handleDownEvent(evt)) return;                break;            default:                if(!handleDownEvent(evt)) return;                break;        }        passDown(evt);    }    protected void updateView(View new_view) {        Vector new_mbrs=new_view.getMembers();        if(new_mbrs != null) {            synchronized(members) {                members.removeAllElements();                members.addAll(new_mbrs);            }        }    }    /**     Handle up event. Return false if it should not be passed up the stack.     */    protected boolean handleUpEvent(Event evt) {        // override in subclasses        return true;    }    /**     Handle down event. Return false if it should not be passed down the stack.     */    protected boolean handleDownEvent(Event evt) {        // override in subclasses        return true;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -