votingadapter.java

来自「JGRoups源码」· Java 代码 · 共 505 行 · 第 1/2 页

JAVA
505
字号
package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.Rsp;import org.jgroups.util.RspList;import java.io.Serializable;import java.util.*;/** * Voting adapter provides a voting functionality for an application. There  * should be at most one {@link VotingAdapter} listening on one {@link Channel} * instance. Each adapter can have zero or more registered {@link VotingListener}  * instances that will be called during voting process.  * <p> * Decree is an object that has some semantic meaning within the application.  * Each voting listener receives a decree and can respond with either  * <code>true</code> or false. If the decree has no meaning for the voting * listener, it is required to throw {@link VoteException}. In this case * this specific listener will be excluded from the voting on the specified * decree. After performing local voting, this voting adapter sends the request * back to the originator of the voting process. Originator receives results * from each node and decides if all voting process succeeded or not depending  * on the consensus type specified during voting. *  * @author Roman Rokytskyy (rrokytskyy@acm.org) * @author Robert Schaffar-Taurok (robert@fusion.at) * @version $Id: VotingAdapter.java,v 1.10 2006/09/27 12:42:53 belaban Exp $ */public class VotingAdapter implements MessageListener, MembershipListener, VoteResponseProcessor {    /**     * This consensus type means that at least one positive vote is required     * for the voting to succeed.     */    public static final	int VOTE_ANY = 0;    /**     * This consensus type means that at least one positive vote and no negative     * votes are required for the voting to succeed.     */    public static final int VOTE_ALL = 1;    /**     * This consensus type means that number of positive votes should be greater     * than number of negative votes.     */    public static final int VOTE_MAJORITY = 2;    private static final int PROCESS_CONTINUE = 0;    private static final int PROCESS_SKIP = 1;    private static final int PROCESS_BREAK = 2;    private final RpcDispatcher rpcDispatcher;    protected final Log log=LogFactory.getLog(getClass());    private final HashSet suspectedNodes = new HashSet();    private boolean closed;    private final List membership_listeners=new LinkedList();    /**     * Creates an instance of the VoteChannel that uses JGroups     * for communication between group members.     * @param channel JGroups channel.     */    public VotingAdapter(Channel channel) {        rpcDispatcher = new RpcDispatcher(channel, this, this, this);    }    public VotingAdapter(PullPushAdapter adapter, Serializable id) {        rpcDispatcher = new RpcDispatcher(adapter, id, this, this, this);    }    public Collection getMembers() {        return rpcDispatcher != null? rpcDispatcher.getMembers() : null;    }    public void addMembershipListener(MembershipListener l) {        if(l != null && !membership_listeners.contains(l))            membership_listeners.add(l);    }    public void removeMembershipListener(MembershipListener l) {        if(l != null)            membership_listeners.remove(l);    }    /**     * Performs actual voting on the VoteChannel using the JGroups     * facilities for communication.     */    public boolean vote(Object decree, int consensusType, long timeout)    throws ChannelException {        return vote(decree, consensusType, timeout, null);    }    /**     * Performs actual voting on the VoteChannel using the JGroups     * facilities for communication.     */    public boolean vote(Object decree, int consensusType, long timeout, VoteResponseProcessor voteResponseProcessor)    throws ChannelException    {        if (closed)            throw new ChannelException("Channel was closed.");            if(log.isDebugEnabled()) log.debug("Conducting voting on decree " + decree + ", consensus type " +            getConsensusStr(consensusType) + ", timeout " + timeout);        int mode = GroupRequest.GET_ALL;        // perform the consensus mapping        switch (consensusType) {    case VotingAdapter.VOTE_ALL : mode = GroupRequest.GET_ALL; break;    case VotingAdapter.VOTE_ANY : mode = GroupRequest.GET_FIRST; break;    case VotingAdapter.VOTE_MAJORITY : mode = GroupRequest.GET_MAJORITY; break;    default : mode = GroupRequest.GET_ALL;        }        try {            java.lang.reflect.Method method = this.getClass().getMethod(                                    "localVote", new Class[] { Object.class });            MethodCall methodCall = new MethodCall(method, new Object[] {decree});            if(log.isDebugEnabled()) log.debug("Calling remote methods...");            // vote            RspList responses = rpcDispatcher.callRemoteMethods(                                     null, methodCall, mode, timeout);            if(log.isDebugEnabled()) log.debug("Checking responses.");            if (voteResponseProcessor == null) {                voteResponseProcessor = this;            }            return voteResponseProcessor.processResponses(responses, consensusType, decree);        } catch(NoSuchMethodException nsmex) {            // UPS!!! How can this happen?!            if(log.isErrorEnabled()) log.error("Could not find method localVote(Object). " +            nsmex.toString());            throw new UnsupportedOperationException(                            "Cannot execute voting because of absence of " +                            this.getClass().getName() + ".localVote(Object) method.");        }    }    /**     * Processes the response list and makes a decision according to the     * type of the consensus for current voting.     * <p>     * Note: we do not support voting in case of Byzantine failures, i.e.     * when the node responds with the fault message.     */    public boolean processResponses(RspList responses, int consensusType, Object decree)    throws ChannelException    {        if (responses == null) {            return false;        }        boolean voteResult = false;        int totalPositiveVotes = 0;        int totalNegativeVotes = 0;        for(Iterator it=responses.values().iterator(); it.hasNext();) {            Rsp response = (Rsp)it.next();            switch(checkResponse(response)) {        case PROCESS_SKIP : continue;        case PROCESS_BREAK : return false;            }            VoteResult result = (VoteResult)response.getValue();            totalPositiveVotes += result.getPositiveVotes();            totalNegativeVotes += result.getNegativeVotes();        }        switch(consensusType) {    case VotingAdapter.VOTE_ALL :        voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0);        break;    case VotingAdapter.VOTE_ANY :        voteResult = (totalPositiveVotes > 0);        break;    case VotingAdapter.VOTE_MAJORITY :        voteResult = (totalPositiveVotes > totalNegativeVotes);        }        return voteResult;    }    /**     * This method checks the response and says the processResponses() method     * what to do.     * @return PROCESS_CONTINUE to continue calculating votes,     * PROCESS_BREAK to stop calculating votes from the nodes,     * PROCESS_SKIP to skip current response.     * @throws ChannelException when the response is fatal to the     * current voting process.     */    private int checkResponse(Rsp response) throws ChannelException {        if (!response.wasReceived()) {                if(log.isDebugEnabled()) log.debug("Response from node " + response.getSender() +                " was not received.");            // what do we do when one node failed to respond?            //throw new ChannelException("Node " + response.GetSender() +            //	" failed to respond.");            return PROCESS_BREAK ;        }        /**@todo check what to do here */        if (response.wasSuspected()) {            if(log.isDebugEnabled()) log.debug("Node " + response.getSender() + " was suspected.");            // wat do we do when one node is suspected?            return PROCESS_SKIP ;        }        Object object = response.getValue();        // we received exception/error, something went wrong        // on one of the nodes... and we do not handle such faults        if (object instanceof Throwable) {            throw new ChannelException("Node " + response.getSender() +                       " is faulty.");        }        if (object == null) {            return PROCESS_SKIP;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?