distributedlockmanager.java

来自「JGRoups源码」· Java 代码 · 共 798 行 · 第 1/2 页

JAVA
798
字号
                // we were unable to aquire local lock                return false;        } else        if (decree instanceof MultiLockDecree) {            // Here we abuse the voting mechanism for notifying the other lockManagers of multiple locked objects.            MultiLockDecree multiLockDecree = (MultiLockDecree)decree;            if(log.isDebugEnabled()) {                log.debug("Marking " + multiLockDecree.getKey() + " as multilocked");            }            LockDecree lockDecree = (LockDecree)heldLocks.get(multiLockDecree.getKey());            if (lockDecree != null) {                lockDecree.setMultipleLocked(true);            }            return true;        }        // we should not be here        return false;    }    /**     * Commit phase for the lock acquisition or release.     *      * @param decree should be an instance <code>LockDecree</code>, if not,     * we throw <code>VoteException</code> to be ignored by the     * <code>VoteChannel</code>.     *      * @return <code>true</code> when commiting the lock operation succeeds.     *      * @throws VoteException if we should be ignored during voting.     */    public synchronized boolean commit(Object decree) throws VoteException {        if (!(decree instanceof LockDecree))            throw new VoteException("Uknown decree type. Ignore me.");        if (decree instanceof AcquireLockDecree) {                if(log.isDebugEnabled()) log.debug("Committing decree acquisition " + ((LockDecree)decree).lockId);            if (!checkPrepared(preparedLocks, (LockDecree)decree))                // there is a prepared lock owned by third party                return false;            if (localLock((LockDecree)decree)) {                preparedLocks.remove(((LockDecree)decree).getKey());                return true;            } else                return false;        } else        if (decree instanceof ReleaseLockDecree) {                if(log.isDebugEnabled()) log.debug("Committing decree release " + ((LockDecree)decree).lockId);            if (!checkPrepared(preparedReleases, (LockDecree)decree))                // there is a prepared release owned by third party                return false;            if (localRelease((LockDecree)decree)) {                preparedReleases.remove(((LockDecree)decree).getKey());                return true;            } else                return false;        } else        if (decree instanceof MultiLockDecree) {            return true;        }        // we should not be here        return false;    }    /**     * Abort phase for the lock acquisition or release.     *      * @param decree should be an instance <code>LockDecree</code>, if not,     * we throw <code>VoteException</code> to be ignored by the     * <code>VoteChannel</code>.     *      * @throws VoteException if we should be ignored during voting.     */    public synchronized void abort(Object decree) throws VoteException {        if (!(decree instanceof LockDecree))            throw new VoteException("Uknown decree type. Ignore me.");        if (decree instanceof AcquireLockDecree) {                if(log.isDebugEnabled()) log.debug("Aborting decree acquisition " + ((LockDecree)decree).lockId);            if (!checkPrepared(preparedLocks, (LockDecree)decree))                // there is a prepared lock owned by third party                return;            preparedLocks.remove(((LockDecree)decree).getKey());        } else        if (decree instanceof ReleaseLockDecree) {                if(log.isDebugEnabled()) log.debug("Aborting decree release " + ((LockDecree)decree).lockId);            if (!checkPrepared(preparedReleases, (LockDecree)decree))                // there is a prepared release owned by third party                return;            preparedReleases.remove(((LockDecree)decree).getKey());        }    }    /**     * Processes the response list and votes like the default processResponses method with the consensusType VOTE_ALL     * If the result of the voting is false, but this DistributedLockManager owns the lock, the result is changed to     * true and the lock is released, but marked as multiple locked. (only in the prepare state to reduce traffic)     * <p>     * Note: we do not support voting in case of Byzantine failures, i.e.     * when the node responds with the fault message.     */    public boolean processResponses(RspList responses, int consensusType, Object decree) throws ChannelException {        if (responses == null) {            return false;        }        int totalPositiveVotes = 0;        int totalNegativeVotes = 0;        for (int i = 0; i < responses.size(); i++) {            Rsp response = (Rsp) responses.elementAt(i);            switch (checkResponse(response)) {                case PROCESS_SKIP:                    continue;                case PROCESS_BREAK:                    return false;            }            VoteResult result = (VoteResult) response.getValue();            totalPositiveVotes += result.getPositiveVotes();            totalNegativeVotes += result.getNegativeVotes();        }        boolean voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0);        if (decree instanceof TwoPhaseVotingAdapter.TwoPhaseWrapper) {            TwoPhaseVotingAdapter.TwoPhaseWrapper wrappedDecree = (TwoPhaseVotingAdapter.TwoPhaseWrapper)decree;            if (wrappedDecree.isPrepare()) {                Object unwrappedDecree = wrappedDecree.getDecree();                if (unwrappedDecree instanceof ReleaseLockDecree) {                    ReleaseLockDecree releaseLockDecree = (ReleaseLockDecree)unwrappedDecree;                    LockDecree lock = null;                    if ((lock = (LockDecree)heldLocks.get(releaseLockDecree.getKey())) != null) {                        // If there is a local lock...                        if (!voteResult) {                            // ... and another DLM voted negatively, but this DLM owns the lock                            // we inform the other node, that it's lock is multiple locked                            if (informLockingNodes(releaseLockDecree)) {                                // we set the local lock to multiple locked                                lock.setMultipleLocked(true);                                voteResult = true;                            }                        }                        if (lock.isMultipleLocked()) {                            //... and the local lock is marked as multilocked                            // we mark the releaseLockDecree als multiple locked for evaluation when unlock returns                            releaseLockDecree.setMultipleLocked(true);                        }                    }                }            }        }        return voteResult;    }    /**     * This method checks the response and says the processResponses() method     * what to do.     * @return PROCESS_CONTINUE to continue calculating votes,     * PROCESS_BREAK to stop calculating votes from the nodes,     * PROCESS_SKIP to skip current response.     * @throws ChannelException when the response is fatal to the     * current voting process.     */    private int checkResponse(Rsp response) throws ChannelException {        if (!response.wasReceived()) {            if (log.isDebugEnabled())                log.debug("Response from node " + response.getSender() + " was not received.");            throw new ChannelException("Node " + response.getSender() + " failed to respond.");        }        if (response.wasSuspected()) {            if (log.isDebugEnabled())                log.debug("Node " + response.getSender() + " was suspected.");            return PROCESS_SKIP;        }        Object object = response.getValue();        // we received exception/error, something went wrong        // on one of the nodes... and we do not handle such faults        if (object instanceof Throwable) {            throw new ChannelException("Node " + response.getSender() + " is faulty.");        }        if (object == null) {            return PROCESS_SKIP;        }        // it is always interesting to know the class that caused failure...        if (!(object instanceof VoteResult)) {            String faultClass = object.getClass().getName();            // ...but we do not handle byzantine faults            throw new ChannelException("Node " + response.getSender() + " generated fault (class " + faultClass + ')');        }        // what if we received the response from faulty node?        if (object instanceof FailureVoteResult) {            if (log.isErrorEnabled())                log.error(((FailureVoteResult) object).getReason());            return PROCESS_BREAK;        }        // everything is fine :)        return PROCESS_CONTINUE;    }    private boolean informLockingNodes(ReleaseLockDecree releaseLockDecree) throws ChannelException {        return votingAdapter.vote(new MultiLockDecree(releaseLockDecree), VOTE_TIMEOUT);    }    /** Remove all locks held by members who left the previous view */    public void viewAccepted(View new_view) {        Vector prev_view=new Vector(current_members);        current_members.clear();        current_members.addAll(new_view.getMembers());        System.out.println("-- VIEW: " + current_members + ", old view: " + prev_view);        prev_view.removeAll(current_members);        if(prev_view.size() > 0) { // we have left members, so we need to check for locks which are still held by them            for(Iterator it=prev_view.iterator(); it.hasNext();) {                Object mbr=it.next();                removeLocksHeldBy(preparedLocks, mbr);                removeLocksHeldBy(preparedReleases, mbr);                removeLocksHeldBy(heldLocks, mbr);            }        }    }    /** Remove from preparedLocks, preparedReleases and heldLocks */    private void removeLocksHeldBy(Map lock_table, Object mbr) {        Map.Entry entry;        LockDecree val;        Object holder;        for(Iterator it=lock_table.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            val=(LockDecree)entry.getValue();            holder=val.requester;            if(holder != null && holder.equals(mbr)) {                if(log.isTraceEnabled())                    log.trace("removing a leftover lock held by " + mbr + " for " + entry.getKey() + ": " + val);                it.remove();            }        }    }    public void suspect(Address suspected_mbr) {    }    public void block() {    }    /**     * This class represents the lock     */    public static class LockDecree implements Serializable {        protected final Object lockId;        protected final Object requester;        protected final Object managerId;        protected boolean commited;        private boolean multipleLocked = false;        private static final long serialVersionUID = 7264104838035219212L;        private LockDecree(Object lockId, Object requester, Object managerId) {            this.lockId = lockId;            this.requester = requester;            this.managerId = managerId;        }        /**         * Returns the key that should be used for Map lookup.         */        public Object getKey() { return lockId; }        /**         * This is a place-holder for future lock expiration code.         */        public boolean isValid() { return true; }        public void commit() { this.commited = true; }        /**         * @return Returns the multipleLocked.         */        public boolean isMultipleLocked() {            return multipleLocked;        }        /**         * @param multipleLocked The multipleLocked to set.         */        public void setMultipleLocked(boolean multipleLocked) {            this.multipleLocked = multipleLocked;        }        /**         * This is hashcode from the java.lang.Long class.         */        public int hashCode() {            return lockId.hashCode();        }        public boolean equals(Object other) {            if (other instanceof LockDecree) {                return ((LockDecree)other).lockId.equals(this.lockId);            } else {                return false;            }        }    }    /**     * This class represents the lock to be released.     */    public static class AcquireLockDecree extends LockDecree {        private final long creationTime;        private AcquireLockDecree(Object lockId, Object requester, Object managerId) {            super(lockId, requester, managerId);            this.creationTime = System.currentTimeMillis();        }        /**         * Lock aquire decree is valid for a <code>ACQUIRE_EXPIRATION</code>         * time after creation and if the lock is still valid (in the         * future locks will be leased for a predefined period of time).         */        public boolean isValid() {            boolean result =  super.isValid();            if (!commited && result)                result = ((creationTime + ACQUIRE_EXPIRATION) > System.currentTimeMillis());            return result;        }    }    /**     * This class represents the lock to be released.     */    public static class ReleaseLockDecree extends LockDecree {        ReleaseLockDecree(Object lockId, Object requester, Object managerId) {            super(lockId, requester, managerId);        }    }    /**     * This class represents the lock that has to be marked as multilocked      */    public static class MultiLockDecree extends LockDecree {        MultiLockDecree(Object lockId, Object requester, Object managerId) {            super(lockId, requester, managerId);        }        MultiLockDecree(ReleaseLockDecree releaseLockDecree) {            super(releaseLockDecree.lockId, releaseLockDecree.requester, releaseLockDecree.managerId);        }    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?