replicationmanager.java

来自「JGRoups源码」· Java 代码 · 共 343 行 · 第 1/2 页

JAVA
343
字号
     *                    resource to become available (for a certain amount of time). When a transaction is      *                    committed or rolled back, all resources associated with this transaction will be released.     * @param lock_info Information about resource(s) to be acquired. This may be null, e.g. if this information     *                  is already implied in <code>data</code>. Both <code>data</code> and <code>lock_info</code>     *                  may be used to define the set of resources to be acquired.     * @param lock_acquisition_timeout The number of milliseconds to wait until a lock acquisition request is     *                                 considered failed (causing a LockingException). If 0 we will wait forever.     *                                 (Note that this may lead to deadlocks).     * @param lock_lease_timeout The number of milliseconds we want to keep the lock for a resource. After     *                           this time has elapsed, the lock will be released. If 0 we won't release the lock(s)     * @param use_locks If this is false, we will ignore all lock information (even if it is specified) and     *                  not use locks at all.     * @return RspList A list of Rsps ({@link org.jgroups.util.Rsp}), one for each member. Each one is the result of     *                 {@link ReplicationReceiver#receive}. If a member didn't send a response, the <code>received</code>     *                 field will be false. If the member was suspected while waiting for a response, the <code>     *                 suspected</code> field will be true. If the <code>receive()</code> method in the receiver returned     *                 a value it will be in field <code>retval</code>. If the receiver threw an exception it will also     *                 be in this field.     */    public RspList send(Address dest,                        byte[]  data,                        boolean synchronous,                        long    synchronous_timeout,                        Xid     transaction,                        byte[]  lock_info,                        long    lock_acquisition_timeout,                        long    lock_lease_timeout,                        boolean use_locks) { // throws UpdateException, TimeoutException, LockingException {                Message         msg=null;        ReplicationData d=new ReplicationData(ReplicationData.SEND,                                              data,                                              transaction,                                              lock_info,                                              lock_acquisition_timeout,                                              lock_lease_timeout,                                              use_locks);            if(log.isInfoEnabled()) log.info("data is " + d + " (synchronous=" + synchronous + ')');        msg=new Message(dest, null, d);        if(synchronous) {            return disp.castMessage(null, msg, GroupRequest.GET_ALL, synchronous_timeout);        }        else {            disp.castMessage(null, msg, GroupRequest.GET_NONE, 0);            return null;        }    }        /**     * Commits all modifications sent to the receivers via {@link #send} and releases all locks associated with     * this transaction. If modifications were made to stable storage (but not to resource), those modifications     * would now need to be transferred to the resource (e.g. database).     */    public void commit(Xid transaction) {        sendMessage(ReplicationData.COMMIT, transaction);    }    /**     * Discards all modifications sent to the receivers via {@link #send} and releases all locks associated with     * this transaction.     */    public void rollback(Xid transaction) {        sendMessage(ReplicationData.ROLLBACK, transaction);    }    /* ------------------------------- RequestHandler interface ------------------------------ */    public Object handle(Message msg) {        Object          retval=null;        ReplicationData data;        if(msg == null) {            if(log.isErrorEnabled()) log.error("received message was null");            return null;        }        if(msg.getLength() == 0) {            if(log.isErrorEnabled()) log.error("payload of received message was null");            return null;        }                try {            data=(ReplicationData)msg.getObject();        }        catch(Throwable ex) {            if(log.isErrorEnabled()) log.error("failure unmarshalling message: " + ex);            return null;        }        switch(data.getType()) {        case ReplicationData.SEND:            try {                return handleSend(data);            }            catch(Throwable ex) {                if(log.isErrorEnabled()) log.error("failed handling update: " + ex);                return ex;            }        case ReplicationData.COMMIT:            handleCommit(data.getTransaction());            break;        case ReplicationData.ROLLBACK:            handleRollback(data.getTransaction());            break;        default:            if(log.isErrorEnabled()) log.error("received incorrect replication message: " + data);            return null;        }        return retval;    }        /* --------------------------- End of RequestHandler interface---------------------------- */    protected Object handleSend(ReplicationData data) throws UpdateException, LockingException {        try {            if(receiver == null) {                if(log.isWarnEnabled()) log.warn("receiver is not set");                return null;            }            return receiver.receive(data.getTransaction(),                                    data.getData(),                                    data.getLockInfo(),                                    data.getLockAcquisitionTimeout(),                                    data.getLockLeaseTimeout(),                                    data.useLocks());        }        catch(Throwable ex) {            return ex;                    }    }    protected void handleCommit(Xid transaction) {        if(receiver == null) {            if(log.isWarnEnabled()) log.warn("receiver is not set");        }        else            receiver.commit(transaction);    }    protected void handleRollback(Xid transaction) {        if(receiver == null) {            if(log.isWarnEnabled()) log.warn("receiver is not set");        }        else            receiver.rollback(transaction);    }    /* -------------------------------------- Private methods ------------------------------------ */        void sendMessage(int type, Xid transaction) {        ReplicationData data=new ReplicationData(type, null, transaction, null, 0, 0, false);        Message msg=new Message(null, null, data);        disp.castMessage(null, msg, GroupRequest.GET_NONE, 0); // send commit message asynchronously    }    /* ---------------------------------- End of Private methods --------------------------------- */	}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?