grouprequest.java

来自「JGRoups源码」· Java 代码 · 共 616 行 · 第 1/2 页

JAVA
616
字号
// $Id: GroupRequest.java,v 1.21 2006/08/28 06:51:53 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.Message;import org.jgroups.Transport;import org.jgroups.View;import org.jgroups.util.Command;import org.jgroups.util.Rsp;import org.jgroups.util.RspList;import java.util.*;/** * Sends a message to all members of the group and waits for all responses (or timeout). Returns a * boolean value (success or failure). Results (if any) can be retrieved when done.<p> * The supported transport to send requests is currently either a RequestCorrelator or a generic * Transport. One of them has to be given in the constructor. It will then be used to send a * request. When a message is received by either one, the receiveResponse() of this class has to * be called (this class does not actively receive requests/responses itself). Also, when a view change * or suspicion is received, the methods viewChange() or suspect() of this class have to be called.<p> * When started, an array of responses, correlating to the membership, is created. Each response * is added to the corresponding field in the array. When all fields have been set, the algorithm * terminates. * This algorithm can optionally use a suspicion service (failure detector) to detect (and * exclude from the membership) fauly members. If no suspicion service is available, timeouts * can be used instead (see <code>execute()</code>). When done, a list of suspected members * can be retrieved.<p> * Because a channel might deliver requests, and responses to <em>different</em> requests, the * <code>GroupRequest</code> class cannot itself receive and process requests/responses from the * channel. A mechanism outside this class has to do this; it has to determine what the responses * are for the message sent by the <code>execute()</code> method and call <code>receiveResponse()</code> * to do so.<p> * <b>Requirements</b>: lossless delivery, e.g. acknowledgment-based message confirmation. * @author Bela Ban * @version $Revision: 1.21 $ */public class GroupRequest implements RspCollector, Command {    /** return only first response */    public static final int GET_FIRST=1;    /** return all responses */    public static final int GET_ALL=2;    /** return majority (of all non-faulty members) */    public static final int GET_MAJORITY=3;    /** return majority (of all members, may block) */    public static final int GET_ABS_MAJORITY=4;    /** return n responses (may block) */    public static final int GET_N=5;    /** return no response (async call) */    public static final int GET_NONE=6;    private Address caller;    /** Map<Address, Rsp>. Maps requests and responses */    private final Map requests=new HashMap();    /** bounded queue of suspected members */    private final Vector suspects=new Vector();    /** list of members, changed by viewChange() */    private final Collection members=new TreeSet();    /** keep suspects vector bounded */    private final int max_suspects=40;    protected Message request_msg;    protected RequestCorrelator corr; // either use RequestCorrelator or ...    protected Transport transport;    // Transport (one of them has to be non-null)    protected int rsp_mode=GET_ALL;    protected boolean done=false;    protected long timeout=0;    protected int expected_mbrs=0;    private static final Log log=LogFactory.getLog(GroupRequest.class);    /** to generate unique request IDs (see getRequestId()) */    private static long last_req_id=1;    private long req_id=-1; // request ID for this request    /**     @param m The message to be sent     @param corr The request correlator to be used. A request correlator sends requests tagged with     a unique ID and notifies the sender when matching responses are received. The     reason <code>GroupRequest</code> uses it instead of a <code>Transport</code> is     that multiple requests/responses might be sent/received concurrently.     @param members The initial membership. This value reflects the membership to which the request     is sent (and from which potential responses are expected). Is reset by reset().     @param rsp_mode How many responses are expected. Can be     <ol>     <li><code>GET_ALL</code>: wait for all responses from non-suspected members.     A suspicion service might warn     us when a member from which a response is outstanding has crashed, so it can     be excluded from the responses. If no suspision service is available, a     timeout can be used (a value of 0 means wait forever). <em>If a timeout of     0 is used, no suspicion service is available and a member from which we     expect a response has crashed, this methods blocks forever !</em>.     <li><code>GET_FIRST</code>: wait for the first available response.     <li><code>GET_MAJORITY</code>: wait for the majority of all responses. The     majority is re-computed when a member is suspected.     <li><code>GET_ABS_MAJORITY</code>: wait for the majority of     <em>all</em> members.     This includes failed members, so it may block if no timeout is specified.     <li><code>GET_N</CODE>: wait for N members.     Return if n is >= membership+suspects.     <li><code>GET_NONE</code>: don't wait for any response. Essentially send an     asynchronous message to the group members.     </ol>     */    public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode) {        request_msg=m;        this.corr=corr;        this.rsp_mode=rsp_mode;        reset(members);        // suspects.removeAllElements(); // bela Aug 23 2002: made suspects bounded    }    /**     @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely     (e.g. if a suspicion service is available; timeouts are not needed).     */    public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode,                        long timeout, int expected_mbrs) {        this(m, corr, members, rsp_mode);        if(timeout > 0)            this.timeout=timeout;        this.expected_mbrs=expected_mbrs;    }    public GroupRequest(Message m, Transport transport, Vector members, int rsp_mode) {        request_msg=m;        this.transport=transport;        this.rsp_mode=rsp_mode;        reset(members);        // suspects.removeAllElements(); // bela Aug 23 2002: make suspects bounded    }    /**     * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely     *                       (e.g. if a suspicion service is available; timeouts are not needed).     */    public GroupRequest(Message m, Transport transport, Vector members,                        int rsp_mode, long timeout, int expected_mbrs) {       this(m, transport, members, rsp_mode);       if(timeout > 0)          this.timeout=timeout;       this.expected_mbrs=expected_mbrs;    }    public Address getCaller() {        return caller;    }    public void setCaller(Address caller) {        this.caller=caller;    }    /**     * Sends the message. Returns when n responses have been received, or a     * timeout  has occurred. <em>n</em> can be the first response, all     * responses, or a majority  of the responses.     */    public boolean execute() throws Exception {        if(corr == null && transport == null) {            if(log.isErrorEnabled()) log.error("both corr and transport are null, cannot send group request");            return false;        }        try {            done=false;            boolean retval=doExecute(timeout);            if(retval == false && log.isTraceEnabled())                log.trace("call did not execute correctly, request is " + this.toString());            return retval;        }        finally {            done=true;        }    }    /**     * This method sets the <code>membership</code> variable to the value of     * <code>members</code>. It requires that the caller already hold the     * <code>rsp_mutex</code> lock.     * @param mbrs The new list of members     */    public final void reset(Vector mbrs) {        if(mbrs != null) {            Address mbr;            synchronized(requests) {                requests.clear();                for(int i=0; i < mbrs.size(); i++) {                    mbr=(Address)mbrs.elementAt(i);                    requests.put(mbr, new Rsp(mbr));                }            }            // maintain local membership            synchronized(this.members) {                this.members.clear();                this.members.addAll(mbrs);            }        }        else {            synchronized(requests) {                Rsp rsp;                for(Iterator it=requests.values().iterator(); it.hasNext();) {                    rsp=(Rsp)it.next();                    rsp.setReceived(false);                    rsp.setValue(null);                }            }        }    }    /* ---------------------- Interface RspCollector -------------------------- */    /**     * <b>Callback</b> (called by RequestCorrelator or Transport).     * Adds a response to the response table. When all responses have been received,     * <code>execute()</code> returns.     */    public void receiveResponse(Object response_value, Address sender) {        if(done) {            if(log.isWarnEnabled()) log.warn("command is done; cannot add response !");            return;        }        if(suspects != null && suspects.size() > 0 && suspects.contains(sender)) {            if(log.isWarnEnabled()) log.warn("received response from suspected member " + sender + "; discarding");            return;        }        synchronized(requests) {            Rsp rsp=(Rsp)requests.get(sender);            if(rsp != null) {                if(rsp.wasReceived() == false) {                    rsp.setValue(response_value);                    rsp.setReceived(true);                    if(log.isTraceEnabled())                        log.trace(new StringBuffer("received response for request ").append(req_id).append(", sender=").                                  append(sender).append(", val=").append(response_value));                    requests.notifyAll(); // wakes up execute()                }            }        }    }    /**     * <b>Callback</b> (called by RequestCorrelator or Transport).     * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected).     * This method would probably be called when getting a suspect message from a failure detector     * (where available). It is used to exclude faulty members from the response list.     */    public void suspect(Address suspected_member) {        Rsp rsp;        if(suspected_member == null)            return;        addSuspect(suspected_member);        synchronized(requests) {            rsp=(Rsp)requests.get(suspected_member);            if(rsp != null) {                rsp.setSuspected(true);                rsp.setValue(null);                requests.notifyAll();            }        }    }    /**     * Any member of 'membership' that is not in the new view is flagged as     * SUSPECTED. Any member in the new view that is <em>not</em> in the     * membership (ie, the set of responses expected for the current RPC) will     * <em>not</em> be added to it. If we did this we might run into the     * following problem:     * <ul>     * <li>Membership is {A,B}     * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the     * invocation handler)     * <li>C joins while A waits for responses from A and B     * <li>If this would generate a new view {A,B,C} and if this expanded the     * response set to {A,B,C}, A would wait forever on C's response because C     * never received the request in the first place, therefore won't send a     * response.     * </ul>

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?