distributedlockmanager.java
来自「JGRoups源码」· Java 代码 · 共 798 行 · 第 1/2 页
JAVA
798 行
// we were unable to aquire local lock return false; } else if (decree instanceof MultiLockDecree) { // Here we abuse the voting mechanism for notifying the other lockManagers of multiple locked objects. MultiLockDecree multiLockDecree = (MultiLockDecree)decree; if(log.isDebugEnabled()) { log.debug("Marking " + multiLockDecree.getKey() + " as multilocked"); } LockDecree lockDecree = (LockDecree)heldLocks.get(multiLockDecree.getKey()); if (lockDecree != null) { lockDecree.setMultipleLocked(true); } return true; } // we should not be here return false; } /** * Commit phase for the lock acquisition or release. * * @param decree should be an instance <code>LockDecree</code>, if not, * we throw <code>VoteException</code> to be ignored by the * <code>VoteChannel</code>. * * @return <code>true</code> when commiting the lock operation succeeds. * * @throws VoteException if we should be ignored during voting. */ public synchronized boolean commit(Object decree) throws VoteException { if (!(decree instanceof LockDecree)) throw new VoteException("Uknown decree type. Ignore me."); if (decree instanceof AcquireLockDecree) { if(log.isDebugEnabled()) log.debug("Committing decree acquisition " + ((LockDecree)decree).lockId); if (!checkPrepared(preparedLocks, (LockDecree)decree)) // there is a prepared lock owned by third party return false; if (localLock((LockDecree)decree)) { preparedLocks.remove(((LockDecree)decree).getKey()); return true; } else return false; } else if (decree instanceof ReleaseLockDecree) { if(log.isDebugEnabled()) log.debug("Committing decree release " + ((LockDecree)decree).lockId); if (!checkPrepared(preparedReleases, (LockDecree)decree)) // there is a prepared release owned by third party return false; if (localRelease((LockDecree)decree)) { preparedReleases.remove(((LockDecree)decree).getKey()); return true; } else return false; } else if (decree instanceof MultiLockDecree) { return true; } // we should not be here return false; } /** * Abort phase for the lock acquisition or release. * * @param decree should be an instance <code>LockDecree</code>, if not, * we throw <code>VoteException</code> to be ignored by the * <code>VoteChannel</code>. * * @throws VoteException if we should be ignored during voting. */ public synchronized void abort(Object decree) throws VoteException { if (!(decree instanceof LockDecree)) throw new VoteException("Uknown decree type. Ignore me."); if (decree instanceof AcquireLockDecree) { if(log.isDebugEnabled()) log.debug("Aborting decree acquisition " + ((LockDecree)decree).lockId); if (!checkPrepared(preparedLocks, (LockDecree)decree)) // there is a prepared lock owned by third party return; preparedLocks.remove(((LockDecree)decree).getKey()); } else if (decree instanceof ReleaseLockDecree) { if(log.isDebugEnabled()) log.debug("Aborting decree release " + ((LockDecree)decree).lockId); if (!checkPrepared(preparedReleases, (LockDecree)decree)) // there is a prepared release owned by third party return; preparedReleases.remove(((LockDecree)decree).getKey()); } } /** * Processes the response list and votes like the default processResponses method with the consensusType VOTE_ALL * If the result of the voting is false, but this DistributedLockManager owns the lock, the result is changed to * true and the lock is released, but marked as multiple locked. (only in the prepare state to reduce traffic) * <p> * Note: we do not support voting in case of Byzantine failures, i.e. * when the node responds with the fault message. */ public boolean processResponses(RspList responses, int consensusType, Object decree) throws ChannelException { if (responses == null) { return false; } int totalPositiveVotes = 0; int totalNegativeVotes = 0; for (int i = 0; i < responses.size(); i++) { Rsp response = (Rsp) responses.elementAt(i); switch (checkResponse(response)) { case PROCESS_SKIP: continue; case PROCESS_BREAK: return false; } VoteResult result = (VoteResult) response.getValue(); totalPositiveVotes += result.getPositiveVotes(); totalNegativeVotes += result.getNegativeVotes(); } boolean voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0); if (decree instanceof TwoPhaseVotingAdapter.TwoPhaseWrapper) { TwoPhaseVotingAdapter.TwoPhaseWrapper wrappedDecree = (TwoPhaseVotingAdapter.TwoPhaseWrapper)decree; if (wrappedDecree.isPrepare()) { Object unwrappedDecree = wrappedDecree.getDecree(); if (unwrappedDecree instanceof ReleaseLockDecree) { ReleaseLockDecree releaseLockDecree = (ReleaseLockDecree)unwrappedDecree; LockDecree lock = null; if ((lock = (LockDecree)heldLocks.get(releaseLockDecree.getKey())) != null) { // If there is a local lock... if (!voteResult) { // ... and another DLM voted negatively, but this DLM owns the lock // we inform the other node, that it's lock is multiple locked if (informLockingNodes(releaseLockDecree)) { // we set the local lock to multiple locked lock.setMultipleLocked(true); voteResult = true; } } if (lock.isMultipleLocked()) { //... and the local lock is marked as multilocked // we mark the releaseLockDecree als multiple locked for evaluation when unlock returns releaseLockDecree.setMultipleLocked(true); } } } } } return voteResult; } /** * This method checks the response and says the processResponses() method * what to do. * @return PROCESS_CONTINUE to continue calculating votes, * PROCESS_BREAK to stop calculating votes from the nodes, * PROCESS_SKIP to skip current response. * @throws ChannelException when the response is fatal to the * current voting process. */ private int checkResponse(Rsp response) throws ChannelException { if (!response.wasReceived()) { if (log.isDebugEnabled()) log.debug("Response from node " + response.getSender() + " was not received."); throw new ChannelException("Node " + response.getSender() + " failed to respond."); } if (response.wasSuspected()) { if (log.isDebugEnabled()) log.debug("Node " + response.getSender() + " was suspected."); return PROCESS_SKIP; } Object object = response.getValue(); // we received exception/error, something went wrong // on one of the nodes... and we do not handle such faults if (object instanceof Throwable) { throw new ChannelException("Node " + response.getSender() + " is faulty."); } if (object == null) { return PROCESS_SKIP; } // it is always interesting to know the class that caused failure... if (!(object instanceof VoteResult)) { String faultClass = object.getClass().getName(); // ...but we do not handle byzantine faults throw new ChannelException("Node " + response.getSender() + " generated fault (class " + faultClass + ')'); } // what if we received the response from faulty node? if (object instanceof FailureVoteResult) { if (log.isErrorEnabled()) log.error(((FailureVoteResult) object).getReason()); return PROCESS_BREAK; } // everything is fine :) return PROCESS_CONTINUE; } private boolean informLockingNodes(ReleaseLockDecree releaseLockDecree) throws ChannelException { return votingAdapter.vote(new MultiLockDecree(releaseLockDecree), VOTE_TIMEOUT); } /** Remove all locks held by members who left the previous view */ public void viewAccepted(View new_view) { Vector prev_view=new Vector(current_members); current_members.clear(); current_members.addAll(new_view.getMembers()); System.out.println("-- VIEW: " + current_members + ", old view: " + prev_view); prev_view.removeAll(current_members); if(prev_view.size() > 0) { // we have left members, so we need to check for locks which are still held by them for(Iterator it=prev_view.iterator(); it.hasNext();) { Object mbr=it.next(); removeLocksHeldBy(preparedLocks, mbr); removeLocksHeldBy(preparedReleases, mbr); removeLocksHeldBy(heldLocks, mbr); } } } /** Remove from preparedLocks, preparedReleases and heldLocks */ private void removeLocksHeldBy(Map lock_table, Object mbr) { Map.Entry entry; LockDecree val; Object holder; for(Iterator it=lock_table.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); val=(LockDecree)entry.getValue(); holder=val.requester; if(holder != null && holder.equals(mbr)) { if(log.isTraceEnabled()) log.trace("removing a leftover lock held by " + mbr + " for " + entry.getKey() + ": " + val); it.remove(); } } } public void suspect(Address suspected_mbr) { } public void block() { } /** * This class represents the lock */ public static class LockDecree implements Serializable { protected final Object lockId; protected final Object requester; protected final Object managerId; protected boolean commited; private boolean multipleLocked = false; private static final long serialVersionUID = 7264104838035219212L; private LockDecree(Object lockId, Object requester, Object managerId) { this.lockId = lockId; this.requester = requester; this.managerId = managerId; } /** * Returns the key that should be used for Map lookup. */ public Object getKey() { return lockId; } /** * This is a place-holder for future lock expiration code. */ public boolean isValid() { return true; } public void commit() { this.commited = true; } /** * @return Returns the multipleLocked. */ public boolean isMultipleLocked() { return multipleLocked; } /** * @param multipleLocked The multipleLocked to set. */ public void setMultipleLocked(boolean multipleLocked) { this.multipleLocked = multipleLocked; } /** * This is hashcode from the java.lang.Long class. */ public int hashCode() { return lockId.hashCode(); } public boolean equals(Object other) { if (other instanceof LockDecree) { return ((LockDecree)other).lockId.equals(this.lockId); } else { return false; } } } /** * This class represents the lock to be released. */ public static class AcquireLockDecree extends LockDecree { private final long creationTime; private AcquireLockDecree(Object lockId, Object requester, Object managerId) { super(lockId, requester, managerId); this.creationTime = System.currentTimeMillis(); } /** * Lock aquire decree is valid for a <code>ACQUIRE_EXPIRATION</code> * time after creation and if the lock is still valid (in the * future locks will be leased for a predefined period of time). */ public boolean isValid() { boolean result = super.isValid(); if (!commited && result) result = ((creationTime + ACQUIRE_EXPIRATION) > System.currentTimeMillis()); return result; } } /** * This class represents the lock to be released. */ public static class ReleaseLockDecree extends LockDecree { ReleaseLockDecree(Object lockId, Object requester, Object managerId) { super(lockId, requester, managerId); } } /** * This class represents the lock that has to be marked as multilocked */ public static class MultiLockDecree extends LockDecree { MultiLockDecree(Object lockId, Object requester, Object managerId) { super(lockId, requester, managerId); } MultiLockDecree(ReleaseLockDecree releaseLockDecree) { super(releaseLockDecree.lockId, releaseLockDecree.requester, releaseLockDecree.managerId); } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?