grouprequest.java
来自「JGRoups源码」· Java 代码 · 共 616 行 · 第 1/2 页
JAVA
616 行
// $Id: GroupRequest.java,v 1.21 2006/08/28 06:51:53 belaban Exp $package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.Message;import org.jgroups.Transport;import org.jgroups.View;import org.jgroups.util.Command;import org.jgroups.util.Rsp;import org.jgroups.util.RspList;import java.util.*;/** * Sends a message to all members of the group and waits for all responses (or timeout). Returns a * boolean value (success or failure). Results (if any) can be retrieved when done.<p> * The supported transport to send requests is currently either a RequestCorrelator or a generic * Transport. One of them has to be given in the constructor. It will then be used to send a * request. When a message is received by either one, the receiveResponse() of this class has to * be called (this class does not actively receive requests/responses itself). Also, when a view change * or suspicion is received, the methods viewChange() or suspect() of this class have to be called.<p> * When started, an array of responses, correlating to the membership, is created. Each response * is added to the corresponding field in the array. When all fields have been set, the algorithm * terminates. * This algorithm can optionally use a suspicion service (failure detector) to detect (and * exclude from the membership) fauly members. If no suspicion service is available, timeouts * can be used instead (see <code>execute()</code>). When done, a list of suspected members * can be retrieved.<p> * Because a channel might deliver requests, and responses to <em>different</em> requests, the * <code>GroupRequest</code> class cannot itself receive and process requests/responses from the * channel. A mechanism outside this class has to do this; it has to determine what the responses * are for the message sent by the <code>execute()</code> method and call <code>receiveResponse()</code> * to do so.<p> * <b>Requirements</b>: lossless delivery, e.g. acknowledgment-based message confirmation. * @author Bela Ban * @version $Revision: 1.21 $ */public class GroupRequest implements RspCollector, Command { /** return only first response */ public static final int GET_FIRST=1; /** return all responses */ public static final int GET_ALL=2; /** return majority (of all non-faulty members) */ public static final int GET_MAJORITY=3; /** return majority (of all members, may block) */ public static final int GET_ABS_MAJORITY=4; /** return n responses (may block) */ public static final int GET_N=5; /** return no response (async call) */ public static final int GET_NONE=6; private Address caller; /** Map<Address, Rsp>. Maps requests and responses */ private final Map requests=new HashMap(); /** bounded queue of suspected members */ private final Vector suspects=new Vector(); /** list of members, changed by viewChange() */ private final Collection members=new TreeSet(); /** keep suspects vector bounded */ private final int max_suspects=40; protected Message request_msg; protected RequestCorrelator corr; // either use RequestCorrelator or ... protected Transport transport; // Transport (one of them has to be non-null) protected int rsp_mode=GET_ALL; protected boolean done=false; protected long timeout=0; protected int expected_mbrs=0; private static final Log log=LogFactory.getLog(GroupRequest.class); /** to generate unique request IDs (see getRequestId()) */ private static long last_req_id=1; private long req_id=-1; // request ID for this request /** @param m The message to be sent @param corr The request correlator to be used. A request correlator sends requests tagged with a unique ID and notifies the sender when matching responses are received. The reason <code>GroupRequest</code> uses it instead of a <code>Transport</code> is that multiple requests/responses might be sent/received concurrently. @param members The initial membership. This value reflects the membership to which the request is sent (and from which potential responses are expected). Is reset by reset(). @param rsp_mode How many responses are expected. Can be <ol> <li><code>GET_ALL</code>: wait for all responses from non-suspected members. A suspicion service might warn us when a member from which a response is outstanding has crashed, so it can be excluded from the responses. If no suspision service is available, a timeout can be used (a value of 0 means wait forever). <em>If a timeout of 0 is used, no suspicion service is available and a member from which we expect a response has crashed, this methods blocks forever !</em>. <li><code>GET_FIRST</code>: wait for the first available response. <li><code>GET_MAJORITY</code>: wait for the majority of all responses. The majority is re-computed when a member is suspected. <li><code>GET_ABS_MAJORITY</code>: wait for the majority of <em>all</em> members. This includes failed members, so it may block if no timeout is specified. <li><code>GET_N</CODE>: wait for N members. Return if n is >= membership+suspects. <li><code>GET_NONE</code>: don't wait for any response. Essentially send an asynchronous message to the group members. </ol> */ public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode) { request_msg=m; this.corr=corr; this.rsp_mode=rsp_mode; reset(members); // suspects.removeAllElements(); // bela Aug 23 2002: made suspects bounded } /** @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely (e.g. if a suspicion service is available; timeouts are not needed). */ public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode, long timeout, int expected_mbrs) { this(m, corr, members, rsp_mode); if(timeout > 0) this.timeout=timeout; this.expected_mbrs=expected_mbrs; } public GroupRequest(Message m, Transport transport, Vector members, int rsp_mode) { request_msg=m; this.transport=transport; this.rsp_mode=rsp_mode; reset(members); // suspects.removeAllElements(); // bela Aug 23 2002: make suspects bounded } /** * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely * (e.g. if a suspicion service is available; timeouts are not needed). */ public GroupRequest(Message m, Transport transport, Vector members, int rsp_mode, long timeout, int expected_mbrs) { this(m, transport, members, rsp_mode); if(timeout > 0) this.timeout=timeout; this.expected_mbrs=expected_mbrs; } public Address getCaller() { return caller; } public void setCaller(Address caller) { this.caller=caller; } /** * Sends the message. Returns when n responses have been received, or a * timeout has occurred. <em>n</em> can be the first response, all * responses, or a majority of the responses. */ public boolean execute() throws Exception { if(corr == null && transport == null) { if(log.isErrorEnabled()) log.error("both corr and transport are null, cannot send group request"); return false; } try { done=false; boolean retval=doExecute(timeout); if(retval == false && log.isTraceEnabled()) log.trace("call did not execute correctly, request is " + this.toString()); return retval; } finally { done=true; } } /** * This method sets the <code>membership</code> variable to the value of * <code>members</code>. It requires that the caller already hold the * <code>rsp_mutex</code> lock. * @param mbrs The new list of members */ public final void reset(Vector mbrs) { if(mbrs != null) { Address mbr; synchronized(requests) { requests.clear(); for(int i=0; i < mbrs.size(); i++) { mbr=(Address)mbrs.elementAt(i); requests.put(mbr, new Rsp(mbr)); } } // maintain local membership synchronized(this.members) { this.members.clear(); this.members.addAll(mbrs); } } else { synchronized(requests) { Rsp rsp; for(Iterator it=requests.values().iterator(); it.hasNext();) { rsp=(Rsp)it.next(); rsp.setReceived(false); rsp.setValue(null); } } } } /* ---------------------- Interface RspCollector -------------------------- */ /** * <b>Callback</b> (called by RequestCorrelator or Transport). * Adds a response to the response table. When all responses have been received, * <code>execute()</code> returns. */ public void receiveResponse(Object response_value, Address sender) { if(done) { if(log.isWarnEnabled()) log.warn("command is done; cannot add response !"); return; } if(suspects != null && suspects.size() > 0 && suspects.contains(sender)) { if(log.isWarnEnabled()) log.warn("received response from suspected member " + sender + "; discarding"); return; } synchronized(requests) { Rsp rsp=(Rsp)requests.get(sender); if(rsp != null) { if(rsp.wasReceived() == false) { rsp.setValue(response_value); rsp.setReceived(true); if(log.isTraceEnabled()) log.trace(new StringBuffer("received response for request ").append(req_id).append(", sender="). append(sender).append(", val=").append(response_value)); requests.notifyAll(); // wakes up execute() } } } } /** * <b>Callback</b> (called by RequestCorrelator or Transport). * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected). * This method would probably be called when getting a suspect message from a failure detector * (where available). It is used to exclude faulty members from the response list. */ public void suspect(Address suspected_member) { Rsp rsp; if(suspected_member == null) return; addSuspect(suspected_member); synchronized(requests) { rsp=(Rsp)requests.get(suspected_member); if(rsp != null) { rsp.setSuspected(true); rsp.setValue(null); requests.notifyAll(); } } } /** * Any member of 'membership' that is not in the new view is flagged as * SUSPECTED. Any member in the new view that is <em>not</em> in the * membership (ie, the set of responses expected for the current RPC) will * <em>not</em> be added to it. If we did this we might run into the * following problem: * <ul> * <li>Membership is {A,B} * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the * invocation handler) * <li>C joins while A waits for responses from A and B * <li>If this would generate a new view {A,B,C} and if this expanded the * response set to {A,B,C}, A would wait forever on C's response because C * never received the request in the first place, therefore won't send a * response. * </ul>
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?