messagedispatcher.java

来自「JGroups源码」· Java 代码 · 共 945 行 · 第 1/3 页

JAVA
945
字号
        membership_listener=l;    }    public final void setRequestHandler(RequestHandler rh) {        req_handler=rh;    }    /**     * Offers access to the underlying Channel.     * @return a reference to the underlying Channel.     */    public Channel getChannel() {        return channel;    }    public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {        if(channel != null) {            channel.send(msg);        }        else            if(adapter != null) {                try {                    if(id != null) {                        adapter.send(id, msg);                    }                    else {                        adapter.send(msg);                    }                }                catch(Throwable ex) {                    if(log.isErrorEnabled()) {                        log.error("exception=" + Util.print(ex));                    }                }            }            else {                if(log.isErrorEnabled()) {                    log.error("channel == null");                }            }    }    /**     * Cast a message to all members, and wait for <code>mode</code> responses. The responses are returned in a response     * list, where each response is associated with its sender.<p> Uses <code>GroupRequest</code>.     *     * @param dests   The members to which the message is to be sent. If it is null, then the message is sent to all     *                members     * @param msg     The message to be sent to n members     * @param mode    Defined in <code>GroupRequest</code>. The number of responses to wait for: <ol> <li>GET_FIRST:     *                return the first response received. 
<li>GET_ALL: wait for all responses (minus the ones from     *                suspected members) <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp     *                size) <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once) <li>GET_N: wait for n     *                responses (may block if n > group size) <li>GET_NONE: wait for no responses, return immediately     *                (non-blocking) </ol>     * @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses <em>or</em> timeout time.     * @return RspList A list of responses. Each response is an <code>Object</code> and associated to its sender.     */    public RspList castMessage(final Vector dests, Message msg, int mode, long timeout) {        GroupRequest _req=null;        Vector real_dests;        Channel tmp;        // we need to clone because we don't want to modify the original        // (we remove ourselves if LOCAL is false, see below) !        // real_dests=dests != null ? (Vector) dests.clone() : (members != null ? new Vector(members) : null);        if(dests != null) {            real_dests=(Vector)dests.clone();        }        else {            synchronized(members) {                real_dests=new Vector(members);            }        }        // if local delivery is off, then we should not wait for the message from the local member.        
// therefore remove it from the membership        tmp=channel;        if(tmp == null) {            if(adapter != null && adapter.getTransport() instanceof Channel) {                tmp=(Channel) adapter.getTransport();            }        }        if(tmp != null && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {            if(local_addr == null) {                local_addr=tmp.getLocalAddress();            }            if(local_addr != null && real_dests != null) {                real_dests.removeElement(local_addr);            }        }        // don't even send the message if the destination list is empty        if(log.isTraceEnabled())            log.trace("real_dests=" + real_dests);        if(real_dests == null || real_dests.size() == 0) {            if(log.isTraceEnabled())                log.trace("destination list is empty, won't send message");            return new RspList(); // return empty response list        }        _req=new GroupRequest(msg, corr, real_dests, mode, timeout, 0);        _req.setCaller(this.local_addr);        try {            _req.execute();        }        catch(Exception ex) {            throw new RuntimeException("failed executing request " + _req, ex);        }        return _req.getResults();    }    /**     * Multicast a message request to all members in <code>dests</code> and receive responses via the RspCollector     * interface. When done receiving the required number of responses, the caller has to call done(req_id) on the     * underlyinh RequestCorrelator, so that the resources allocated to that request can be freed.     *     * @param dests  The list of members from which to receive responses. Null means all members     * @param req_id The ID of the request. Used by the underlying RequestCorrelator to correlate responses with     *               requests     * @param msg    The request to be sent     * @param coll   The sender needs to provide this interface to collect responses. 
Call will return immediately if     *               this is null     */    public void castMessage(final Vector dests, long req_id, Message msg, RspCollector coll) {        Vector real_dests;        Channel tmp;        if(msg == null) {            if(log.isErrorEnabled())                log.error("request is null");            return;        }        if(coll == null) {            if(log.isErrorEnabled())                log.error("response collector is null (must be non-null)");            return;        }        // we need to clone because we don't want to modify the original        // (we remove ourselves if LOCAL is false, see below) !        //real_dests=dests != null ? (Vector) dests.clone() : (Vector) members.clone();        if(dests != null) {            real_dests=(Vector)dests.clone();        }        else {            synchronized(members) {                real_dests=new Vector(members);            }        }        // if local delivery is off, then we should not wait for the message from the local member.        
// therefore remove it from the membership        tmp=channel;        if(tmp == null) {            if(adapter != null && adapter.getTransport() instanceof Channel) {                tmp=(Channel) adapter.getTransport();            }        }        if(tmp != null && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {            if(local_addr == null) {                local_addr=tmp.getLocalAddress();            }            if(local_addr != null) {                real_dests.removeElement(local_addr);            }        }        // don't even send the message if the destination list is empty        if(real_dests.size() == 0) {            if(log.isDebugEnabled())                log.debug("destination list is empty, won't send message");            return;        }        try {            corr.sendRequest(req_id, real_dests, msg, coll);        }        catch(Exception e) {            throw new RuntimeException("failure sending request " + req_id + " to " + real_dests, e);        }    }    public void done(long req_id) {        corr.done(req_id);    }    /**     * Sends a message to a single member (destination = msg.dest) and returns the response. The message's destination     * must be non-zero !     
*/    public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {        Vector mbrs=new Vector();        RspList rsp_list=null;        Object dest=msg.getDest();        Rsp rsp;        GroupRequest _req=null;        if(dest == null) {            if(log.isErrorEnabled())                log.error("the message's destination is null, cannot send message");            return null;        }        mbrs.addElement(dest);   // dummy membership (of destination address)        _req=new GroupRequest(msg, corr, mbrs, mode, timeout, 0);        _req.setCaller(local_addr);        try {            _req.execute();        }        catch(Exception t) {            throw new RuntimeException("failed executing request " + _req, t);        }        if(mode == GroupRequest.GET_NONE) {            return null;        }        rsp_list=_req.getResults();        if(rsp_list.size() == 0) {            if(log.isWarnEnabled())                log.warn(" response list is empty");            return null;        }        if(rsp_list.size() > 1) {            if(log.isWarnEnabled())                log.warn("response list contains more that 1 response; returning first response !");        }        rsp=(Rsp)rsp_list.elementAt(0);        if(rsp.wasSuspected()) {            throw new SuspectedException(dest);        }        if(!rsp.wasReceived()) {            throw new TimeoutException();        }        return rsp.getValue();    }//    public void channelConnected(Channel channel) {//        if(channel != null) {//            Address new_local_addr=channel.getLocalAddress();//            if(new_local_addr != null) {//                this.local_addr=new_local_addr;////                    if(log.isInfoEnabled()) log.info("MessageDispatcher.channelConnected()", "new local address is " + this.local_addr);//            }//        }//    }////    public void channelDisconnected(Channel channel) {//    }////    public void channelClosed(Channel channel) {//    
}////    public void channelShunned() {//    }////    public void channelReconnected(Address addr) {//        if(channel != null) {//            Address new_local_addr=channel.getLocalAddress();//            if(new_local_addr != null) {//                this.local_addr=new_local_addr;////                    if(log.isInfoEnabled()) log.info("MessageDispatcher.channelReconnected()", "new local address is " + this.local_addr);//            }//        }//    }    /* ------------------------ RequestHandler Interface ---------------------- */    public Object handle(Message msg) {        if(req_handler != null) {            return req_handler.handle(msg);        }        else {            return null;        }    }    /* -------------------- End of RequestHandler Interface ------------------- */    class ProtocolAdapter extends Protocol implements UpHandler {        /* ------------------------- Protocol Interface --------------------------- */        public String getName() {            return "MessageDispatcher";        }        public void startUpHandler() {            // do nothing, DON'T REMOVE !!!!        }        public void startDownHandler() {            // do nothing, DON'T REMOVE !!!!        }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?