votingadapter.java
来自「JGRoups源码」· Java 代码 · 共 505 行 · 第 1/2 页
JAVA
505 行
package org.jgroups.blocks;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.*;import org.jgroups.util.Rsp;import org.jgroups.util.RspList;import java.io.Serializable;import java.util.*;/** * Voting adapter provides a voting functionality for an application. There * should be at most one {@link VotingAdapter} listening on one {@link Channel} * instance. Each adapter can have zero or more registered {@link VotingListener} * instances that will be called during voting process. * <p> * Decree is an object that has some semantic meaning within the application. * Each voting listener receives a decree and can respond with either * <code>true</code> or false. If the decree has no meaning for the voting * listener, it is required to throw {@link VoteException}. In this case * this specific listener will be excluded from the voting on the specified * decree. After performing local voting, this voting adapter sends the request * back to the originator of the voting process. Originator receives results * from each node and decides if all voting process succeeded or not depending * on the consensus type specified during voting. * * @author Roman Rokytskyy (rrokytskyy@acm.org) * @author Robert Schaffar-Taurok (robert@fusion.at) * @version $Id: VotingAdapter.java,v 1.10 2006/09/27 12:42:53 belaban Exp $ */public class VotingAdapter implements MessageListener, MembershipListener, VoteResponseProcessor { /** * This consensus type means that at least one positive vote is required * for the voting to succeed. */ public static final int VOTE_ANY = 0; /** * This consensus type means that at least one positive vote and no negative * votes are required for the voting to succeed. */ public static final int VOTE_ALL = 1; /** * This consensus type means that number of positive votes should be greater * than number of negative votes. */ public static final int VOTE_MAJORITY = 2; private static final int PROCESS_CONTINUE = 0; private static final int PROCESS_SKIP = 1; private static final int PROCESS_BREAK = 2; private final RpcDispatcher rpcDispatcher; protected final Log log=LogFactory.getLog(getClass()); private final HashSet suspectedNodes = new HashSet(); private boolean closed; private final List membership_listeners=new LinkedList(); /** * Creates an instance of the VoteChannel that uses JGroups * for communication between group members. * @param channel JGroups channel. */ public VotingAdapter(Channel channel) { rpcDispatcher = new RpcDispatcher(channel, this, this, this); } public VotingAdapter(PullPushAdapter adapter, Serializable id) { rpcDispatcher = new RpcDispatcher(adapter, id, this, this, this); } public Collection getMembers() { return rpcDispatcher != null? rpcDispatcher.getMembers() : null; } public void addMembershipListener(MembershipListener l) { if(l != null && !membership_listeners.contains(l)) membership_listeners.add(l); } public void removeMembershipListener(MembershipListener l) { if(l != null) membership_listeners.remove(l); } /** * Performs actual voting on the VoteChannel using the JGroups * facilities for communication. */ public boolean vote(Object decree, int consensusType, long timeout) throws ChannelException { return vote(decree, consensusType, timeout, null); } /** * Performs actual voting on the VoteChannel using the JGroups * facilities for communication. */ public boolean vote(Object decree, int consensusType, long timeout, VoteResponseProcessor voteResponseProcessor) throws ChannelException { if (closed) throw new ChannelException("Channel was closed."); if(log.isDebugEnabled()) log.debug("Conducting voting on decree " + decree + ", consensus type " + getConsensusStr(consensusType) + ", timeout " + timeout); int mode = GroupRequest.GET_ALL; // perform the consensus mapping switch (consensusType) { case VotingAdapter.VOTE_ALL : mode = GroupRequest.GET_ALL; break; case VotingAdapter.VOTE_ANY : mode = GroupRequest.GET_FIRST; break; case VotingAdapter.VOTE_MAJORITY : mode = GroupRequest.GET_MAJORITY; break; default : mode = GroupRequest.GET_ALL; } try { java.lang.reflect.Method method = this.getClass().getMethod( "localVote", new Class[] { Object.class }); MethodCall methodCall = new MethodCall(method, new Object[] {decree}); if(log.isDebugEnabled()) log.debug("Calling remote methods..."); // vote RspList responses = rpcDispatcher.callRemoteMethods( null, methodCall, mode, timeout); if(log.isDebugEnabled()) log.debug("Checking responses."); if (voteResponseProcessor == null) { voteResponseProcessor = this; } return voteResponseProcessor.processResponses(responses, consensusType, decree); } catch(NoSuchMethodException nsmex) { // UPS!!! How can this happen?! if(log.isErrorEnabled()) log.error("Could not find method localVote(Object). " + nsmex.toString()); throw new UnsupportedOperationException( "Cannot execute voting because of absence of " + this.getClass().getName() + ".localVote(Object) method."); } } /** * Processes the response list and makes a decision according to the * type of the consensus for current voting. * <p> * Note: we do not support voting in case of Byzantine failures, i.e. * when the node responds with the fault message. */ public boolean processResponses(RspList responses, int consensusType, Object decree) throws ChannelException { if (responses == null) { return false; } boolean voteResult = false; int totalPositiveVotes = 0; int totalNegativeVotes = 0; for(Iterator it=responses.values().iterator(); it.hasNext();) { Rsp response = (Rsp)it.next(); switch(checkResponse(response)) { case PROCESS_SKIP : continue; case PROCESS_BREAK : return false; } VoteResult result = (VoteResult)response.getValue(); totalPositiveVotes += result.getPositiveVotes(); totalNegativeVotes += result.getNegativeVotes(); } switch(consensusType) { case VotingAdapter.VOTE_ALL : voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0); break; case VotingAdapter.VOTE_ANY : voteResult = (totalPositiveVotes > 0); break; case VotingAdapter.VOTE_MAJORITY : voteResult = (totalPositiveVotes > totalNegativeVotes); } return voteResult; } /** * This method checks the response and says the processResponses() method * what to do. * @return PROCESS_CONTINUE to continue calculating votes, * PROCESS_BREAK to stop calculating votes from the nodes, * PROCESS_SKIP to skip current response. * @throws ChannelException when the response is fatal to the * current voting process. */ private int checkResponse(Rsp response) throws ChannelException { if (!response.wasReceived()) { if(log.isDebugEnabled()) log.debug("Response from node " + response.getSender() + " was not received."); // what do we do when one node failed to respond? //throw new ChannelException("Node " + response.GetSender() + // " failed to respond."); return PROCESS_BREAK ; } /**@todo check what to do here */ if (response.wasSuspected()) { if(log.isDebugEnabled()) log.debug("Node " + response.getSender() + " was suspected."); // wat do we do when one node is suspected? return PROCESS_SKIP ; } Object object = response.getValue(); // we received exception/error, something went wrong // on one of the nodes... and we do not handle such faults if (object instanceof Throwable) { throw new ChannelException("Node " + response.getSender() + " is faulty."); } if (object == null) { return PROCESS_SKIP;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?